The Workflow Engine That Lives Inside Your App
A Python library — not a platform. Embed DAG-based orchestration directly in your code. All 43 academic workflow patterns, event sourcing, crash recovery, and native AI agent integration.
pip install stabilize
Message-Driven DAG
Stages form a directed acyclic graph. Execution progresses via queued messages processed by handlers. Parallel fan-out, join synchronization, and complex multi-level DAGs supported natively.
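The message-driven model is easy to picture with a small stand-alone sketch in plain Python (illustrative only, not Stabilize's internal API): a stage becomes ready once every upstream stage has completed, and a queue of completion messages drives execution through fan-out and join points.

```python
from collections import deque

# A toy DAG: fan-out from "fetch" into two parallel stages, then a join.
dag = {
    "fetch": set(),                   # no upstream requirements
    "parse_a": {"fetch"},
    "parse_b": {"fetch"},
    "merge": {"parse_a", "parse_b"},  # join: waits for both parsers
}

def run(dag):
    """Process 'stage complete' messages from a queue; enqueue downstream
    stages whose upstream requirements are now all satisfied."""
    done, order = set(), []
    queue = deque(ref for ref, ups in dag.items() if not ups)
    while queue:
        stage = queue.popleft()
        if stage in done:
            continue  # deduplicate redelivered messages
        done.add(stage)
        order.append(stage)
        for ref, ups in dag.items():
            if ref not in done and ups <= done:
                queue.append(ref)  # all requisites complete: enqueue
    return order

print(run(dag))  # ['fetch', 'parse_a', 'parse_b', 'merge']
```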
43 Workflow Patterns
Complete implementation of van der Aalst's control-flow patterns. AND/OR joins, discriminator, N-of-M, deferred choice, mutex, milestones, structured loops, multi-instance, and sub-workflows.
Event Sourcing
Every state transition recorded as an immutable event. Replay execution from any point. Time-travel state reconstruction. Build analytics projections from the event stream.
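The replay idea can be sketched in a few lines of plain Python (a concept illustration; the event fields here are assumptions, not Stabilize's actual event schema): state is rebuilt by folding the immutable event stream, and stopping early gives time-travel reconstruction.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Event:
    seq: int      # monotonically increasing position in the stream
    stage: str
    status: str   # e.g. "RUNNING", "SUCCEEDED"

def replay(events, up_to=None):
    """Rebuild per-stage state by folding immutable events, optionally
    stopping at a sequence number (time-travel reconstruction)."""
    state = {}
    for ev in sorted(events, key=lambda e: e.seq):
        if up_to is not None and ev.seq > up_to:
            break
        state[ev.stage] = ev.status
    return state

stream = [
    Event(1, "analyze", "RUNNING"),
    Event(2, "analyze", "SUCCEEDED"),
    Event(3, "route", "RUNNING"),
]
print(replay(stream))           # {'analyze': 'SUCCEEDED', 'route': 'RUNNING'}
print(replay(stream, up_to=1))  # {'analyze': 'RUNNING'}
```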
Signals and Suspend/Resume
External signals pause and resume stage execution. Buffered or transient delivery modes. Human-in-the-loop approvals and inter-workflow communication built in.
Crash Recovery
Automatic workflow recovery on process restart. Message deduplication prevents duplicate side effects. Optimistic locking handles concurrent execution safely.
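The deduplication guarantee boils down to idempotent message handling keyed by message ID. A minimal sketch (in-memory stand-in for what Stabilize persists in its store):

```python
processed = set()  # stand-in for durable storage of handled message IDs
side_effects = []

def handle(message_id, payload):
    """Skip messages already processed, so redelivery after a crash
    cannot trigger the same side effect twice."""
    if message_id in processed:
        return False  # duplicate: already handled before the restart
    processed.add(message_id)
    side_effects.append(payload)  # the side effect runs exactly once
    return True

assert handle("msg-1", "charge card") is True
assert handle("msg-1", "charge card") is False  # redelivered after restart
assert side_effects == ["charge card"]
```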
Dynamic Routing
TaskResult.jump_to() enables conditional branching, retry loops, and AI-driven flow control within executing workflows. Jump count tracking prevents infinite loops.
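The loop guard works as you would expect: every jump is counted, and execution aborts once a cap is exceeded. A stand-alone sketch of the idea (not Stabilize's internals; the cap and error message are illustrative):

```python
def run_with_jumps(handlers, start, max_jumps=10):
    """Follow jump_to-style redirects, counting jumps so a bad routing
    decision cannot loop forever."""
    stage, jumps = start, 0
    while stage is not None:
        nxt = handlers[stage]()  # handler returns the next stage, or None to stop
        if nxt is not None:
            jumps += 1
            if jumps > max_jumps:
                raise RuntimeError("jump limit exceeded; possible routing loop")
        stage = nxt
    return jumps

attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    return "flaky" if attempts["n"] < 3 else None  # jump back twice, then succeed

print(run_with_jumps({"flaky": flaky}, "flaky"))  # 2
```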
Synthetic Stages
Before/after/onFailure stages injected automatically around any stage. Setup, cleanup, validation, and rollback without polluting your core workflow logic.
Enterprise Reliability
Transient vs permanent error classification. Exponential backoff with jitter. Dead letter queue. Stateful retries that preserve progress across failures via context_update.
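The retry delay schedule itself is simple to state. Here is a full-jitter variant of exponential backoff (an illustrative formula; the parameter names and defaults are assumptions, not Stabilize's configuration surface):

```python
import random

def backoff_delay(attempt, base=1.0, cap=60.0, rng=random.random):
    """Full-jitter backoff: a uniform delay in [0, min(cap, base * 2**attempt)].

    Doubling the window each attempt spreads out retries; the jitter keeps
    many failing workflows from retrying in lockstep."""
    return rng() * min(cap, base * (2 ** attempt))

# Sample one schedule of six retry delays (values vary run to run).
delays = [backoff_delay(a) for a in range(6)]
print([round(d, 2) for d in delays])
```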
Built for AI Agents
Stabilize is designed from the ground up for programmatic workflow creation. Every workflow is defined in Python code — no YAML, no UI, no drag-and-drop. AI coding agents generate correct Stabilize code on the first try.
- ✓ MCP documentation server with 26 tools for AI-assisted development
- ✓ The stabilize prompt CLI outputs LLM-optimized docs for context windows
- ✓ Dynamic routing lets AI agents make decisions during execution
- ✓ Event sourcing gives agents full observability into execution history
- ✓ Sub-workflows enable recursive, agent-orchestrated pipeline composition
MCP Documentation Server
26 tools

```python
from stabilize import (
    Workflow,
    StageExecution,
    TaskExecution,
    Task,
    TaskResult,
    TaskRegistry,
    SqliteQueue,
    SqliteWorkflowStore,
    Orchestrator,
    QueueProcessor,
)
from stabilize.events import configure_event_sourcing, SqliteEventStore


class RouteTask(Task):
    """AI agent decides the next step based on analysis."""

    def execute(self, stage):
        if stage.context.get("sentiment") == "negative":
            return TaskResult.jump_to("escalate")  # Dynamic routing
        return TaskResult.success()


# One-line event sourcing setup
db = "./workflows.db"
store = SqliteWorkflowStore(f"sqlite:///{db}", create_tables=True)
queue = SqliteQueue(f"sqlite:///{db}", table_name="queue")
configure_event_sourcing(SqliteEventStore(f"sqlite:///{db}", create_tables=True))

workflow = Workflow.create(
    application="ai-agent",
    name="Intelligent Routing",
    stages=[
        StageExecution(
            ref_id="analyze",
            type="python",
            name="Analyze Input",
            context={"script": "RESULT = analyze(INPUT['data'])"},
            tasks=[TaskExecution.create("Run", "python", stage_start=True, stage_end=True)],
        ),
        StageExecution(
            ref_id="route",
            type="route",
            name="AI Decision",
            requisite_stage_ref_ids={"analyze"},
            context={},
            tasks=[TaskExecution.create("Route", "route", stage_start=True, stage_end=True)],
        ),
        StageExecution(
            ref_id="respond",
            type="http",
            name="Send Response",
            requisite_stage_ref_ids={"route"},
            context={"url": "https://api.example.com/respond", "method": "POST"},
            tasks=[TaskExecution.create("Send", "http", stage_start=True, stage_end=True)],
        ),
        StageExecution(
            ref_id="escalate",
            type="http",
            name="Escalate Issue",
            context={"url": "https://api.example.com/escalate", "method": "POST"},
            tasks=[TaskExecution.create("Alert", "http", stage_start=True, stage_end=True)],
        ),
    ],
)
```
Discriminator Join
First upstream to complete fires the downstream stage. The rest are ignored.
```python
StageExecution(
    ref_id="triage",
    join_type=JoinType.DISCRIMINATOR,
    requisite_stage_ref_ids={"check_a", "check_b"},
)
```
N-of-M Join
Proceed once 3 of 5 reviewers approve; the remaining branches are cancelled.
```python
StageExecution(
    ref_id="approved",
    join_type=JoinType.N_OF_M,
    join_threshold=3,
    requisite_stage_ref_ids={"r1", "r2", "r3", "r4", "r5"},
)
```
Deferred Choice
Race between branches. First to start wins, others are cancelled.
```python
StageExecution(
    ref_id="agent_reply",
    deferred_choice_group="response",
)
StageExecution(
    ref_id="escalate",
    deferred_choice_group="response",
)
```
Signals
External signals suspend and resume execution. Buffered delivery supported.
```python
# Task suspends, waiting for a signal
return TaskResult.suspend()

# External system sends the signal
queue.push(SignalStage(
    stage_id=stage.id,
    signal_name="approved",
    signal_data={"user": "alice"},
    persistent=True,
))
```
ShellTask
Execute commands with timeout, env vars, secrets masking, and output capture
HTTPTask
Requests with auth, retries, file uploads, downloads, and JSON parsing
PythonTask
Run scripts inline or from files with INPUT/RESULT data convention
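The INPUT/RESULT convention can be pictured with plain exec(): the script reads an injected INPUT mapping and assigns its output to RESULT. This is a stand-alone sketch of the convention only, not how PythonTask actually executes scripts:

```python
def run_script(script, input_data):
    """Execute a script with INPUT bound to the stage's data; return RESULT."""
    scope = {"INPUT": input_data}
    exec(script, scope)  # illustrative only; not PythonTask's execution path
    return scope.get("RESULT")

print(run_script("RESULT = INPUT['x'] * 2", {"x": 21}))  # 42
```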
DockerTask
Containers, image builds, resource limits, GPU support, and volume mounts
SSHTask
Remote command execution via SSH with key auth and timeouts
WaitTask
Configurable timed delays between workflow stages
SubWorkflowTask
Nest workflows within workflows with recursion depth tracking
Custom Tasks
Extend Task or RetryableTask for polling, custom integrations, anything