I was inspired to replicate the functionality of AWS Step Functions in my own code when I couldn’t find any libraries that suited my needs for orchestrating complex workflows. This led me to design a custom Step Function library to handle sequential and parallel execution, branch logic, error handling, and more.
What is a Step Function?
A Step Function helps developers orchestrate complex workflows and track the execution status at each step. It essentially functions as a directed acyclic graph (DAG) where each node represents a function call, and the connections between nodes define the order of execution. Every node in the graph points to the next step, unidirectionally, and each step depends on the result of the previous step—except for the first.
You can install the package directly from PyPI.
pip install stepfunction
Example
The below example handles car purchase transactions by validating the transaction, updating shipping, payment, and inventory records in parallel, and notifying the customer. If any step fails, it gracefully triggers a fallback to manage the error without halting the workflow.

Validate Car Purchase Transaction Workflow
from stepfunction.core.step_function import StepFunction
# Simulating the workflow steps for handling a car purchase transaction
def parse_car_purchase_transaction():
"""Simulate parsing the car purchase transaction."""
pass
def check_if_car_purchase_transaction_is_valid():
"""Simulate checking if the transaction details are valid."""
pass
def send_notification_to_user_email():
"""Simulate sending a notification to the user via email."""
pass
def update_car_purchase_transaction_status():
"""Simulate updating the status of the car purchase transaction."""
pass
def update_shipping_db_record():
"""Simulate updating the shipping database record."""
pass
def update_payment_db_record():
"""Simulate updating the payment database record."""
pass
def update_inventory_db_record():
"""Simulate updating the inventory database record."""
pass
def end_step():
"""Simulate the final step to conclude the workflow."""
pass
def generic_failure_handler():
"""Simulate handling a generic failure during the workflow."""
pass
async def validate_car_purchase_transaction_workflow():
step_function = StepFunction(
name="Validate_Car_Purchase_Transaction_Workflow")
step_function.add_step("Parse_Car_Purchase_Transaction", func=parse_car_purchase_transaction,
next_step="Check_If_Car_Purchase_Transaction_Is_Valid", on_failure="Generic_Failure_Handler")
step_function.add_step("Check_If_Car_Purchase_Transaction_Is_Valid", func=check_if_car_purchase_transaction_is_valid, branch={
"valid": "Update_Company_DB_Records",
"invalid": "Send_Notification_To_User_Email"
}, on_failure="Generic_Failure_Handler")
# Parallel validation steps
step_function.add_step(
"Update_Company_DB_Records",
{
"update_shipping_db_record": update_shipping_db_record,
"update_payment_db_record": update_payment_db_record,
"update_inventory_db_record": update_inventory_db_record
},
next_step="Update_Car_Purchase_Transaction_Status",
parallel=True,
stop_on_failure=False
)
step_function.add_step("Update_Car_Purchase_Transaction_Status",
func=update_car_purchase_transaction_status, next_step="Send_Notification_To_User_Email", on_failure="Generic_Failure_Handler")
step_function.add_step("Send_Notification_To_User_Email",
func=send_notification_to_user_email, next_step="End", on_failure="Generic_Failure_Handler")
step_function.add_step("End", func=end_step)
step_function.add_step("Generic_Failure_Handler",
func=generic_failure_handler)
step_function.set_start_step("Parse_Car_Purchase_Transaction")
await step_function.execute()
step_function.visualize()
Key Features of the Step Function Library
1. Sequential and Parallel Execution
- The library supports both sequential execution, where each step follows the previous one, and parallel execution, where multiple steps can run concurrently.
2. Error Handling and Branching
- Each step can specify an
on_failure
handler, which is executed when a step fails. You can also define branching logic that allows a step to branch to different steps based on its result.
3. Context Management
- The library keeps track of the context of each step, storing the results or exceptions encountered during execution. This context can be retrieved for further analysis.
4. Sub-Step Functions
- Sub-step functions allow you to nest workflows within workflows, adding modularity and flexibility to your design. This is particularly useful when you have reusable workflows that can be applied in different contexts. By breaking larger workflows into smaller sub-step functions, you enhance code readability and maintainability.
Anatomy of a Step
Each step in the workflow is defined with the following parameters:
Argument | Purpose | Type |
---|---|---|
name |
Name of the step | str |
func |
Function to be executed | Union[Callable[[Any], Any], Dict[str, Callable[[Any], Any]]] |
next_step |
Name of the next step to be executed (if no branching) | Optional[str] |
on_failure |
Name of the step to execute if the current step fails | Optional[str] |
branch |
Logic to decide the next step based on the result of the current step | Optional[Dict[Any, str]] |
parallel |
Whether to execute the step in parallel with other steps | bool (default: False ) |
stop_on_failure |
Whether to stop all parallel executions if one step fails | bool (default: False ) |
This flexible structure makes it easy to define workflows that can handle different execution paths and failure scenarios.
Key Properties of the Step Function
Once a Step Function is defined, you can access various properties during workflow execution:
Property | Purpose |
---|---|
name |
The name of the Step Function |
steps |
A dictionary containing all the steps defined in the workflow |
last_result |
The result of the most recently executed step |
context |
A dictionary that stores the results of all steps, including exceptions |
status |
The current status of the Step Function (e.g., INITIALIZED , RUNNING , COMPLETED , FAILED ) |