PipelineOrchestrator

Runs actions sequentially with typed shared state and explicit results.

When to Use

Use PipelineOrchestrator when your actions form a data-processing flow — action A loads data, action B filters it, action C renders it. Each action can read what previous ones stored via PipelineContext.

Key difference from SequenceOrchestrator: pipeline actions take a PipelineContext parameter and return an ActionResult. This gives downstream actions typed access to upstream data, and the orchestrator returns a full results array you can observe.

Basic Usage

import ForgeOrchestrator

let pipeline = PipelineOrchestrator()
pipeline.register(LoadPostsAction())
pipeline.register(FilterPostsAction())
pipeline.register(ShowBannerAction())

let results: [ActionResult] = await pipeline.evaluate()

Sharing Data via Context

Each evaluate() call creates a fresh PipelineContext. Actions write to it with set, and subsequent actions read with get. The context is typed via generics — reading with the wrong type returns nil.

struct LoadPostsAction: PipelineAction {
    let id: ActionID = "load-posts"
    let priority: ActionPriority = .high

    func shouldRun() async -> Bool { true }

    func execute(context: PipelineContext) async -> ActionResult {
        do {
            // `api` stands for your own networking client, assumed to be in scope.
            let posts = try await api.fetchPosts()
            context.set("posts", posts)
            return .completed
        } catch {
            return .failed(error.localizedDescription)
        }
    }
}

struct FilterPostsAction: PipelineAction {
    let id: ActionID = "filter-posts"
    let priority: ActionPriority = .medium

    func shouldRun() async -> Bool { true }

    func execute(context: PipelineContext) async -> ActionResult {
        guard let posts: [Post] = context.get("posts") else {
            return .skipped  // upstream action didn't populate posts
        }
        let filtered = posts.filter { $0.isPublished }
        context.set("filtered-posts", filtered)
        return .completed
    }
}

Context API

let context = PipelineContext()

// Set — accepts any Sendable value
context.set("posts", loadedPosts)  // loadedPosts: an existing [Post] value
context.set("filter-count", 42)

// Get — strongly typed via generic, returns nil on missing or wrong type
let posts: [Post]? = context.get("posts")
let count: Int? = context.get("filter-count")

// Remove
context.remove("posts")

Fresh per run: a new PipelineContext is created at the start of every evaluate() call. Data does not leak between runs.
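
To make the "nil on missing or wrong type" behavior concrete, here is a minimal, self-contained sketch of how such a typed store could work. SketchContext is a hypothetical stand-in written for this example, not the library's PipelineContext; it uses NSLock and a conditional cast, which may differ from the real implementation.

```swift
import Foundation

// Hypothetical stand-in for PipelineContext; the real type may differ.
final class SketchContext: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String: any Sendable] = [:]

    // Stores any Sendable value under a string key, replacing any earlier value.
    func set<T: Sendable>(_ key: String, _ value: T) {
        lock.lock(); defer { lock.unlock() }
        storage[key] = value
    }

    // Returns the value only if the key exists and the stored value casts to T.
    func get<T>(_ key: String) -> T? {
        lock.lock(); defer { lock.unlock() }
        return storage[key] as? T
    }

    // Deletes the key if present.
    func remove(_ key: String) {
        lock.lock(); defer { lock.unlock() }
        storage[key] = nil
    }
}

let context = SketchContext()
context.set("filter-count", 42)

let count: Int? = context.get("filter-count")         // stored type matches
let wrongType: String? = context.get("filter-count")  // stored Int, requested String
let missing: Int? = context.get("no-such-key")        // key was never set

context.remove("filter-count")
let afterRemove: Int? = context.get("filter-count")   // key was removed
```

Because a missing key and a type mismatch both come back as nil, a reader cannot tell the two apart; that is one reason to keep key names and value types documented per action.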

Handling Results

evaluate() returns an array of ActionResult values in priority order. You can use this to log, report, or branch on the outcome.

let results = await pipeline.evaluate()

for result in results {
    switch result {
    case .completed:
        logger.info("Action completed")
    case .skipped:
        logger.info("Action skipped")
    case .failed(let reason):
        logger.error("Action failed: \(reason)")
    }
}
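
If you want a summary rather than per-result logging, the same switch can feed a small aggregation. The sketch below is self-contained: SketchResult is a hypothetical mirror of ActionResult based on the three cases described in this guide, and summarize is a helper invented for this example.

```swift
// Hypothetical mirror of ActionResult; the real enum may carry different payloads.
enum SketchResult: Equatable {
    case completed
    case skipped
    case failed(String)
}

// Counts completed and skipped outcomes and collects the failure reasons in order.
func summarize(_ results: [SketchResult]) -> (completed: Int, skipped: Int, failures: [String]) {
    var completed = 0
    var skipped = 0
    var failures: [String] = []
    for result in results {
        switch result {
        case .completed:
            completed += 1
        case .skipped:
            skipped += 1
        case .failed(let reason):
            failures.append(reason)
        }
    }
    return (completed, skipped, failures)
}

let summary = summarize([.completed, .failed("network timeout"), .skipped])
```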

Result semantics

  • .completed — the action did its job. Downstream actions can rely on its side effects (e.g., context keys it wrote).
  • .skipped — the action started, checked a runtime precondition, and bailed out. Typically used when an upstream action didn't populate a required context key.
  • .failed(reason) — the action hit an error. Downstream actions still run by default — pipeline does not short-circuit on failure.

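Pipelines don't short-circuit: a failed action does not stop the pipeline. If you need abort-on-failure semantics, have downstream actions check the context (for example, a missing upstream key or an explicit failure flag) and return .skipped. Actions receive only the PipelineContext, so earlier results are visible to them only if an upstream action records them there.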
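
One way to get abort-on-failure behavior is a flag in the context. The sketch below is self-contained and uses hypothetical stand-ins (SketchContext, SketchAction, SketchResult, the run helper, and the "pipeline.aborted" key are all invented here). It is synchronous for brevity, whereas the documented execute(context:) is async.

```swift
import Foundation

// Hypothetical stand-ins; the real ForgeOrchestrator types are async and may differ.
enum SketchResult: Equatable { case completed, skipped, failed(String) }

final class SketchContext: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String: any Sendable] = [:]
    func set<T: Sendable>(_ key: String, _ value: T) {
        lock.lock(); defer { lock.unlock() }
        storage[key] = value
    }
    func get<T>(_ key: String) -> T? {
        lock.lock(); defer { lock.unlock() }
        return storage[key] as? T
    }
}

protocol SketchAction {
    func execute(context: SketchContext) -> SketchResult
}

enum Keys {
    static let aborted = "pipeline.aborted"  // hypothetical key name
}

// Simulates an upstream failure and records it for downstream actions.
struct FailingLoad: SketchAction {
    func execute(context: SketchContext) -> SketchResult {
        context.set(Keys.aborted, true)
        return .failed("fetch failed")
    }
}

// Checks the flag first and bails out with .skipped if an earlier action failed.
struct Render: SketchAction {
    func execute(context: SketchContext) -> SketchResult {
        let aborted: Bool = context.get(Keys.aborted) ?? false
        if aborted { return .skipped }
        return .completed
    }
}

// Runs every action in order against one fresh context; it never stops early.
func run(_ actions: [any SketchAction]) -> [SketchResult] {
    let context = SketchContext()
    return actions.map { $0.execute(context: context) }
}

let abortedRun = run([FailingLoad(), Render()])
let cleanRun = run([Render()])
```

Every action still executes, so the results array keeps one entry per action; the flag only changes what the downstream actions decide to do.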

Observable State

PipelineOrchestrator exposes the same observable properties as SequenceOrchestrator: isProcessing, currentActionId, eligibleCount, and completedCount. Bind them to SwiftUI views to drive progress indicators.

Thread Safety

PipelineContext is backed by a LockedState (internally Mutex<[String: Any]>), so actions can safely read and write it from any actor. Because pipeline actions run sequentially, the pipeline itself never accesses the context concurrently.

Best Practices

  • Use stable, namespaced context keys. "feed.posts" or "user.profile" beats "data". Collisions are silent: nothing warns you when two actions use the same key for different data.
  • Document context contracts. Add a comment to each action listing the keys it reads and writes. The compiler won't enforce it.
  • Return .skipped, not .failed, for missing upstream data. .failed is for actual errors; missing data is a normal control-flow case.
  • Don't store non-Sendable values. The set API requires T: Sendable. Wrap a non-Sendable value in a Sendable box that synchronizes access to it, or refactor the data into Sendable value types.
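
Two of these practices can be sketched in a few lines. FeedKeys, LockedBox, and LegacyCounter are hypothetical names invented for this example. The box is marked @unchecked Sendable, so the compiler does not verify it; it is only safe as long as every access goes through the lock and the wrapped value is never shared outside the box.

```swift
import Foundation

// Central, namespaced key constants, so a typo becomes a compile error
// instead of a silent nil at read time. The key strings are illustrative.
enum FeedKeys {
    static let posts = "feed.posts"
    static let filteredPosts = "feed.filtered-posts"
}

// A lock-protected wrapper that lets a non-Sendable value satisfy T: Sendable.
final class LockedBox<Value>: @unchecked Sendable {
    private let lock = NSLock()
    private var value: Value

    init(_ value: Value) { self.value = value }

    // Grants exclusive access to the wrapped value for the duration of body.
    func withValue<R>(_ body: (inout Value) -> R) -> R {
        lock.lock(); defer { lock.unlock() }
        return body(&value)
    }
}

// A mutable class with no synchronization of its own, so it is not Sendable.
final class LegacyCounter {
    var n = 0
}

let box = LockedBox(LegacyCounter())
box.withValue { $0.n += 1 }
let current = box.withValue { $0.n }
```

Refactoring into Sendable structs is usually the simpler option; a box like this is a fallback for types you cannot change.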