The ParallelRunner class enables running multiple tasks in parallel with controlled concurrency.

Basic Usage

import asyncio
from helios import ParallelRunner, discover_tasks

async def main():
    task_paths = discover_tasks("tasks/")

    runner = ParallelRunner(
        task_paths=task_paths,
        n_concurrent=4,
        model="claude-sonnet-4-20250514"
    )

    result = await runner.run()
    print(f"Passed: {result.passed}/{result.total_tasks}")

asyncio.run(main())

Constructor

ParallelRunner(
    task_paths: list[Path],
    n_concurrent: int = 2,
    model: str = "gemini/gemini-2.5-computer-use-preview-10-2025",
    provider: str = "docker",
    output_dir: str | Path = "output",
    quiet: bool = False,
    on_complete: Callable | None = None
)

Parameters

Parameter     Type         Default          Description
task_paths    list[Path]   Required         List of task directories
n_concurrent  int          2                Max concurrent tasks
model         str          Gemini default   Model identifier
provider      str          "docker"         Environment provider
output_dir    str | Path   "output"         Where to save outputs
quiet         bool         False            Suppress individual task output
on_complete   Callable     None             Callback for each completed task
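
All parameters except task_paths are optional. For reference, a constructor call that sets each one explicitly (the output path and flag values here are illustrative, not defaults):

runner = ParallelRunner(
    task_paths=task_paths,
    n_concurrent=4,
    model="claude-sonnet-4-20250514",
    provider="docker",
    output_dir="runs/batch-01",  # illustrative output location
    quiet=True,                  # suppress per-task logs
    on_complete=None,            # or an async callback; see Best Practices
)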

Helper Functions

discover_tasks()

Find all tasks in a directory.

from helios import discover_tasks

# Find all tasks
tasks = discover_tasks("tasks/")

# Find tasks matching a pattern
tasks = discover_tasks("tasks/pdfbench/", pattern="**/pdfbench_eyemed*/task.toml")

Methods

run()

Execute all tasks and return aggregate results.

async def run(self) -> BatchResult

Returns: BatchResult with aggregate statistics
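
Because run() is a coroutine, standard asyncio tooling applies to it. For instance, a sketch that bounds total wall time (the one-hour timeout is arbitrary):

result = await asyncio.wait_for(runner.run(), timeout=3600)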

BatchResult

The result object returned by run().

@dataclass
class BatchResult:
    total_tasks: int        # Number of tasks run
    passed: int             # Tasks with reward >= 1.0
    failed: int             # Tasks with reward < 1.0
    mean_reward: float      # Average reward
    duration: float         # Total execution time
    tasks: list[TaskResult] # Individual results
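
Derived metrics are easy to compute from these fields. For instance, a pass rate, guarding against an empty batch:

pass_rate = result.passed / result.total_tasks if result.total_tasks else 0.0
print(f"Pass rate: {pass_rate:.1%}")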

TaskResult

@dataclass
class TaskResult:
    name: str               # Task name
    status: str             # "passed" or "failed"
    reward: float           # Reward value
    duration: float         # Execution time
    error: str | None       # Error if failed

Examples

Basic Batch Run

import asyncio
from helios import ParallelRunner, discover_tasks

async def run_batch():
    tasks = discover_tasks("tasks/benchmark/")

    runner = ParallelRunner(
        task_paths=tasks,
        n_concurrent=4,
        model="claude-sonnet-4-20250514"
    )

    result = await runner.run()

    print(f"Total: {result.total_tasks}")
    print(f"Passed: {result.passed}")
    print(f"Failed: {result.failed}")
    print(f"Mean reward: {result.mean_reward:.3f}")
    print(f"Duration: {result.duration:.1f}s")

asyncio.run(run_batch())

With Progress Callback

import asyncio
from helios import ParallelRunner, discover_tasks

async def on_task_complete(task_name: str, result):
    status = "PASS" if result.reward >= 1.0 else "FAIL"
    print(f"[{status}] {task_name}: {result.reward:.2f}")

async def run_with_callback():
    tasks = discover_tasks("tasks/")

    runner = ParallelRunner(
        task_paths=tasks,
        n_concurrent=4,
        model="claude-sonnet-4-20250514",
        on_complete=on_task_complete
    )

    await runner.run()

asyncio.run(run_with_callback())

Analyzing Results

import asyncio
import statistics
from helios import ParallelRunner, discover_tasks

async def analyze_results():
    tasks = discover_tasks("tasks/benchmark/")

    runner = ParallelRunner(
        task_paths=tasks,
        n_concurrent=4,
        model="claude-sonnet-4-20250514"
    )

    result = await runner.run()

    # Group by result
    passed = [t for t in result.tasks if t.status == "passed"]
    failed = [t for t in result.tasks if t.status == "failed"]

    print("PASSED:")
    for t in passed:
        print(f"  {t.name}: {t.duration:.1f}s")

    print("\nFAILED:")
    for t in failed:
        print(f"  {t.name}: {t.error or 'No error message'}")

    # Statistics
    rewards = [t.reward for t in result.tasks]
    print(f"\nMin reward: {min(rewards):.2f}")
    print(f"Max reward: {max(rewards):.2f}")
    print(f"Median: {sorted(rewards)[len(rewards)//2]:.2f}")

asyncio.run(analyze_results())

Custom Task Selection

import asyncio
from pathlib import Path
from helios import ParallelRunner

async def run_specific_tasks():
    # Manually specify tasks
    task_paths = [
        Path("tasks/easy-task"),
        Path("tasks/medium-task"),
        Path("tasks/hard-task"),
    ]

    runner = ParallelRunner(
        task_paths=task_paths,
        n_concurrent=3,
        model="claude-sonnet-4-20250514"
    )

    result = await runner.run()
    print(f"Mean reward: {result.mean_reward:.3f}")

asyncio.run(run_specific_tasks())

Comparing Models

import asyncio
from helios import ParallelRunner, discover_tasks

async def compare_models():
    tasks = discover_tasks("tasks/benchmark/")
    models = [
        "gemini/gemini-2.5-computer-use-preview-10-2025",
        "claude-sonnet-4-20250514",
    ]

    results = {}

    for model in models:
        runner = ParallelRunner(
            task_paths=tasks,
            n_concurrent=4,
            model=model,
            output_dir=f"results/{model.replace('/', '_')}"
        )

        result = await runner.run()
        results[model] = {
            "mean_reward": result.mean_reward,
            "passed": result.passed,
            "duration": result.duration
        }

    print("\nModel Comparison:")
    for model, stats in results.items():
        print(f"\n{model}:")
        print(f"  Mean reward: {stats['mean_reward']:.3f}")
        print(f"  Passed: {stats['passed']}/{len(tasks)}")
        print(f"  Duration: {stats['duration']:.1f}s")

asyncio.run(compare_models())

Using Daytona

import asyncio
from helios import ParallelRunner, discover_tasks

async def run_in_cloud():
    tasks = discover_tasks("tasks/pdfbench/")

    runner = ParallelRunner(
        task_paths=tasks,
        n_concurrent=20,  # Higher concurrency with cloud
        model="claude-sonnet-4-20250514",
        provider="daytona"
    )

    result = await runner.run()
    print(f"Cloud execution: {result.mean_reward:.3f} mean reward")

asyncio.run(run_in_cloud())

Saving Results to JSON

import asyncio
import json
from datetime import datetime
from helios import ParallelRunner, discover_tasks

async def run_and_export():
    tasks = discover_tasks("tasks/")

    runner = ParallelRunner(
        task_paths=tasks,
        n_concurrent=4,
        model="claude-sonnet-4-20250514"
    )

    result = await runner.run()

    # Export to JSON
    export = {
        "timestamp": datetime.now().isoformat(),
        "model": "claude-sonnet-4-20250514",
        "total_tasks": result.total_tasks,
        "passed": result.passed,
        "failed": result.failed,
        "mean_reward": result.mean_reward,
        "duration": result.duration,
        "tasks": [
            {
                "name": t.name,
                "status": t.status,
                "reward": t.reward,
                "duration": t.duration,
                "error": t.error
            }
            for t in result.tasks
        ]
    }

    with open("results.json", "w") as f:
        json.dump(export, f, indent=2)

asyncio.run(run_and_export())

Best Practices

Match concurrency to your resources (a sizing sketch follows the list):
  • Local Docker: 2-8, depending on available RAM
  • Daytona: 10-50+ for cloud scaling
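
A rough way to size n_concurrent from free memory. This is a sketch, not a helios API: psutil is a third-party library, and the ~2 GB-per-task figure is an assumed container footprint.

import psutil  # third-party: pip install psutil

def pick_concurrency(per_task_gb: float = 2.0, cap: int = 8) -> int:
    # Fit as many tasks as available RAM allows, within a hard cap.
    available_gb = psutil.virtual_memory().available / 1e9
    return max(1, min(cap, int(available_gb // per_task_gb)))

runner = ParallelRunner(task_paths=tasks, n_concurrent=pick_concurrency())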

The on_complete callback provides real-time progress:

async def progress(name, result):
    print(f"Completed: {name}")

runner = ParallelRunner(..., on_complete=progress)

Check individual task results, not just aggregates:

for task in result.tasks:
    if task.status == "failed":
        investigate(task)  # investigate() stands in for your own triage logic
