v0.15.7 - Production Ready

DAG-Based Workflow Orchestration Engine

Build robust, fault-tolerant pipelines with Python. Stabilize handles task execution, error recovery, and state management so you can focus on your business logic.

🔀

DAG-Based Workflows

Define complex workflows as directed acyclic graphs. Run stages sequentially or in parallel based on dependencies.
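As a rough illustration of how dependencies decide what runs in parallel, the sketch below groups stages into batches using Python's standard `graphlib` module. It does not use Stabilize itself, and the stage names and the `parallel_batches` helper are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical stage graph: each stage maps to the set of stages it depends on.
dependencies = {
    "build": set(),
    "lint": set(),
    "test": {"build"},
    "deploy": {"test", "lint"},
}


def parallel_batches(deps):
    """Group stages into batches; stages in the same batch could run concurrently."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every stage whose dependencies are done
        batches.append(ready)
        sorter.done(*ready)  # mark the batch finished so dependents become ready
    return batches


batches = parallel_batches(dependencies)
print(batches)  # [['build', 'lint'], ['test'], ['deploy']]
```

Here `build` and `lint` have no dependencies, so they share the first batch, while `deploy` waits until both `test` and `lint` have finished.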

🔄

Automatic Recovery

Built-in crash recovery, message deduplication, and optimistic locking ensure your workflows complete reliably.
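Optimistic locking is commonly implemented with a version column that every write must match. The sketch below shows that general pattern with the standard `sqlite3` module; the `stages` table, its columns, and `update_status` are assumptions for illustration and are not Stabilize's actual schema or API.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE stages (id TEXT PRIMARY KEY, status TEXT, version INTEGER)"
)
conn.execute("INSERT INTO stages VALUES ('build', 'RUNNING', 1)")
conn.commit()


def update_status(conn, stage_id, new_status, expected_version):
    """Write only if the row still has the version we originally read."""
    cur = conn.execute(
        "UPDATE stages SET status = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_status, stage_id, expected_version),
    )
    conn.commit()
    return cur.rowcount == 1  # False means another writer got there first


first = update_status(conn, "build", "SUCCEEDED", expected_version=1)
stale = update_status(conn, "build", "FAILED", expected_version=1)
print(first, stale)  # True False
```

The second call carries a stale version number, so it changes nothing and the caller can reload the row and decide whether to retry.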

Built-in Tasks

Shell, HTTP, Python, Docker, and SSH tasks are ready to use, and you can create custom tasks for your specific needs.

🗄️

Flexible Storage

SQLite for development, PostgreSQL for production. Full transaction support and state persistence.

🎯

Dynamic Routing

Jump to stages dynamically, implement retry loops, and build complex conditional workflows.

🛡️

Enterprise Ready

Transient error handling, exponential backoff, verification systems, and structured conditions.
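Exponential backoff for transient errors can be sketched in a few lines. The example below is a generic illustration under stated assumptions: `TransientError` and `retry_with_backoff` are hypothetical names, the delays carry no jitter, and none of it reflects how Stabilize implements retries internally.

```python
import time


class TransientError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


def retry_with_backoff(func, max_attempts=5, base_delay=0.5, factor=2.0,
                       sleep=time.sleep):
    """Call func, waiting base_delay * factor**attempt after each transient failure."""
    delays = []
    for attempt in range(max_attempts):
        try:
            return func(), delays
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            delay = base_delay * factor ** attempt
            delays.append(delay)
            sleep(delay)


calls = {"count": 0}


def flaky():
    """Fail twice with a transient error, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("temporary failure")
    return "ok"


# Pass a no-op sleep so the example finishes instantly.
result, delays = retry_with_backoff(flaky, sleep=lambda seconds: None)
print(result, delays)  # ok [0.5, 1.0]
```

Non-transient exceptions are not caught, so they propagate immediately instead of being retried.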

Simple, Powerful API

Define workflows with a clean Python API. Each stage is a node in the DAG.

from stabilize import (
    Workflow, StageExecution, TaskExecution,
    Orchestrator, QueueProcessor,
    SqliteQueue, SqliteWorkflowStore,
    ShellTask, TaskRegistry,
)

# Create workflow with parallel stages
workflow = Workflow.create(
    application="my-pipeline",
    name="Build and Deploy",
    stages=[
        StageExecution(
            ref_id="build",
            type="shell",
            name="Build Application",
            context={"command": "npm run build"},
            tasks=[TaskExecution.create(
                "Build", "shell",
                stage_start=True, stage_end=True
            )],
        ),
        StageExecution(
            ref_id="deploy",
            type="shell",
            name="Deploy",
            requisite_stage_ref_ids={"build"},  # Runs after build
            context={"command": "./deploy.sh"},
            tasks=[TaskExecution.create(
                "Deploy", "shell",
                stage_start=True, stage_end=True
            )],
        ),
    ],
)

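# Execute the workflow.
# Setup is omitted here: `orchestrator` (an Orchestrator) and `processor`
# (a QueueProcessor) are assumed to have been created beforehand, together
# with the workflow store, queue, and task registry imported above.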
orchestrator.start(workflow)
processor.process_all(timeout=60.0)

MCP Server Integration

Connect AI agents directly to Stabilize workflows using the Model Context Protocol. Create, monitor, and manage workflows programmatically.

  • Create and execute workflows via AI assistants
  • Monitor workflow status and retrieve results
  • Access workflow context and stage outputs
  • Full JSON-RPC 2.0 protocol support

Available Tools

  • create_workflow
  • get_workflow_status
  • list_workflows
  • execute_shell_workflow
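In the Model Context Protocol, a client invokes a tool by sending a JSON-RPC 2.0 request with the method `tools/call`. The sketch below builds such a request for one of the tools listed above. The `make_tool_call` helper and the `workflow_id` argument name are assumptions, since the tools' actual input schemas are not shown here.

```python
import json


def make_tool_call(request_id, tool_name, arguments):
    """Build a JSON-RPC 2.0 request that invokes an MCP tool."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments},
    }


# "workflow_id" is a hypothetical argument name; check the tool's schema.
request = make_tool_call(1, "get_workflow_status", {"workflow_id": "example-id"})
payload = json.dumps(request)
print(payload)
```

An MCP client library would normally build and send this message for you; the raw form is shown only to make the protocol layer visible.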

Built-in Task Types

ShellTask

Execute shell commands with timeout, env vars, and output capture

HTTPTask

HTTP requests with auth, retries, file uploads and downloads

PythonTask

Run Python scripts inline or from files, exchanging data through INPUT and RESULT

DockerTask

Run containers, build images, manage Docker operations

SSHTask

Execute commands on remote hosts via SSH

Custom Tasks

Extend the Task class for your own implementations