Async Streams
How ForgeObservers uses AsyncStream for reactive system event delivery, and how to consume streams in your code.
How Streams Work
Every observer exposes one or more AsyncStream properties. Each access creates a fresh stream with its own system subscription. When the stream is terminated (via task cancellation or scope exit), the onTermination handler removes system observers and cleans up resources.
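To make this lifecycle concrete, the following is a minimal, self-contained sketch of the pattern, not the library's actual implementation. `FakeMonitor`, `addHandler`, `removeHandler`, `send`, and `values` are hypothetical names standing in for a system event source such as NWPathMonitor; none of them are part of ForgeObservers.

```swift
import Foundation

// Hypothetical stand-in for a system event source; not a ForgeObservers type.
final class FakeMonitor: @unchecked Sendable {
    private let lock = NSLock()
    private var handlers: [UUID: (Int) -> Void] = [:]

    // Number of currently registered handlers (useful for checking cleanup).
    var handlerCount: Int {
        lock.lock(); defer { lock.unlock() }
        return handlers.count
    }

    func addHandler(_ handler: @escaping (Int) -> Void) -> UUID {
        let id = UUID()
        lock.lock(); defer { lock.unlock() }
        handlers[id] = handler
        return id
    }

    func removeHandler(_ id: UUID) {
        lock.lock(); defer { lock.unlock() }
        handlers[id] = nil
    }

    // Delivers a value to every registered handler.
    func send(_ value: Int) {
        lock.lock()
        let current = Array(handlers.values)
        lock.unlock()
        for handler in current { handler(value) }
    }

    // Each access builds a new stream with its own handler registration.
    var values: AsyncStream<Int> {
        AsyncStream { continuation in
            let id = self.addHandler { value in
                continuation.yield(value)
            }
            // Runs when the stream finishes or its consuming task is cancelled.
            continuation.onTermination = { [weak self] _ in
                self?.removeHandler(id)
            }
        }
    }
}
```

In this sketch, accessing `values` twice registers two independent handlers, and cancelling the task that iterates one of the streams removes only that stream's handler.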
// Every observer exposes a stream as a computed property.
// Each access creates a new AsyncStream with its own subscription.
public var statusStream: AsyncStream<ConnectivityStatus> {
    AsyncStream { continuation in
        // Set up system listener (NWPathMonitor, NotificationCenter, etc.)
        // Call continuation.yield(value) on each change
        continuation.onTermination = { _ in
            // Clean up: remove observers, cancel monitors
        }
    }
}
Current Value vs. Stream
All observers provide both a synchronous property for the current state and an async stream for changes. The synchronous property is thread-safe and returns immediately.
All streams emit the current value immediately on subscription, then subsequent changes. You don't need to read the synchronous property separately before subscribing — the first emission gives you the current state.
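One way such "current value first, then changes" behavior could be built is sketched below. This is an assumption about the general technique, not the ForgeObservers source; `CurrentValueSource`, `update`, and `stream` are hypothetical names introduced for illustration.

```swift
import Foundation

// Hypothetical helper illustrating "current value first, then changes".
final class CurrentValueSource<Value: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var current: Value
    private var subscribers: [UUID: AsyncStream<Value>.Continuation] = [:]

    init(_ initial: Value) {
        current = initial
    }

    // Synchronous, lock-protected read of the latest value.
    var value: Value {
        lock.lock(); defer { lock.unlock() }
        return current
    }

    // Stores the new value and forwards it to every active stream.
    func update(_ newValue: Value) {
        lock.lock(); defer { lock.unlock() }
        current = newValue
        for continuation in subscribers.values {
            continuation.yield(newValue)
        }
    }

    var stream: AsyncStream<Value> {
        AsyncStream { continuation in
            let id = UUID()
            self.lock.lock()
            // Emit the current value and register for changes under the
            // same lock, so no update can slip in between the two steps.
            continuation.yield(self.current)
            self.subscribers[id] = continuation
            self.lock.unlock()
            continuation.onTermination = { [weak self] _ in
                self?.removeSubscriber(id)
            }
        }
    }

    private func removeSubscriber(_ id: UUID) {
        lock.lock(); defer { lock.unlock() }
        subscribers[id] = nil
    }
}
```

Because the initial emission and the registration happen under one lock, a subscriber in this sketch sees the state at subscription time followed by every later update, in order.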
let connectivity = ConnectivityObserver()
// Synchronous read: returns the current value immediately.
// Thread-safe, no suspension.
let isOnline = connectivity.status.isConnected
// Async stream: emits the current value immediately on subscription,
// then delivers future changes as they occur.
for await status in connectivity.statusStream {
    // First emission is the current state, subsequent emissions are changes.
    isOffline = !status.isConnected
}
Iterating with for await
The for await loop is the primary way to consume an AsyncStream. The loop suspends between emissions and resumes each time a new value arrives. Code after the loop only runs when the stream finishes or the task is cancelled.
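The point about code after the loop is easiest to see with a stream that actually ends. The sketch below uses a plain finite `AsyncStream<Int>` rather than an observer stream; `collect` and `makeFiniteStream` are illustrative names only.

```swift
// Collects every value, returning only once the stream has ended.
func collect(_ stream: AsyncStream<Int>) async -> [Int] {
    var values: [Int] = []
    for await value in stream {
        values.append(value) // runs once per emission
    }
    // Reached only after the stream finishes or the task is cancelled.
    return values
}

// A finite stream: three values, then finish() ends the loop above.
func makeFiniteStream() -> AsyncStream<Int> {
    AsyncStream { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.yield(3)
        continuation.finish()
    }
}
```

Observer streams normally never call `finish()` on their own, so with them the equivalent of the `return` line is reached only through cancellation.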
// The most direct way to consume a stream
for await status in connectivity.statusStream {
    if status.isConnected {
        await resumeSync()
    } else {
        pauseSync()
    }
}
// This line is never reached while the stream is active.
// It runs only after the stream finishes or the task is cancelled.
Using with SwiftUI .task
SwiftUI's .task modifier is the ideal place to start stream subscriptions. The task starts when the view appears and is automatically cancelled when the view disappears, which triggers the stream's cleanup handler.
Use separate .task modifiers for independent streams. Each stream runs in its own cooperative task.
struct ContentView: View {
    let connectivity: ConnectivityObserving
    let lifecycle: AppLifecycleObserving
    @State private var isOffline = false
    @State private var appState: AppLifecycleState = .active
    var body: some View {
        VStack {
            if isOffline {
                OfflineBanner()
            }
            MainContent()
        }
        .task {
            // This task is cancelled when the view disappears.
            // The stream's onTermination handler cleans up
            // system observers automatically.
            for await status in connectivity.statusStream {
                isOffline = !status.isConnected
            }
        }
        .task {
            for await state in lifecycle.stateStream {
                appState = state
            }
        }
    }
}
Multiple Streams with async let
When you need to observe multiple streams concurrently in a single function (e.g., in a ViewModel), use async let. Each stream runs in a child task, and all are cancelled when the parent task is cancelled.
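One detail of `async let` is worth keeping in mind: under Swift's structured-concurrency rules, a child task that is never awaited is cancelled when its scope exits, so the enclosing function should await its children for as long as the streams should stay alive. The sketch below shows the shape with two finite streams; `total(of:)`, `totals`, and `finiteStream` are hypothetical helpers, not library API.

```swift
// Sums a stream; returns once the stream has finished.
func total(of stream: AsyncStream<Int>) async -> Int {
    var sum = 0
    for await value in stream {
        sum += value
    }
    return sum
}

// Consumes both streams concurrently, each in its own child task.
func totals(_ first: AsyncStream<Int>, _ second: AsyncStream<Int>) async -> (Int, Int) {
    async let firstTotal = total(of: first)
    async let secondTotal = total(of: second)
    // Awaiting keeps this scope alive until both child tasks complete.
    return await (firstTotal, secondTotal)
}

// Builds a stream that yields the given values and then finishes.
func finiteStream(_ values: [Int]) -> AsyncStream<Int> {
    AsyncStream { continuation in
        for value in values {
            continuation.yield(value)
        }
        continuation.finish()
    }
}
```

With never-ending observer streams the `await` simply suspends until the parent task is cancelled, at which point the cancellation propagates to every child.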
func startObserving() async {
    // async let starts each observer in its own child task,
    // so all three subscribe and process updates concurrently.
    async let connectivityUpdates: () = observeConnectivity()
    async let lifecycleUpdates: () = observeLifecycle()
    async let keyboardUpdates: () = observeKeyboard()
    // Await all three. A child task that is never awaited is cancelled
    // when this scope exits, which would end its stream immediately.
    _ = await (connectivityUpdates, lifecycleUpdates, keyboardUpdates)
}
private func observeConnectivity() async {
    for await status in connectivity.statusStream {
        isOffline = !status.isConnected
    }
}
private func observeLifecycle() async {
    for await state in lifecycle.stateStream {
        if state == .active {
            await refreshData()
        }
    }
}
private func observeKeyboard() async {
    for await state in keyboard.stateStream {
        keyboardHeight = state.height
    }
}
With the .assign(to:on:) extension, this becomes much more concise:
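func startObserving() async {
    async let offlineUpdates: () = connectivity.statusStream
        .map { !$0.isConnected }
        .assign(to: \.isOffline, on: self)
    async let backgroundUpdates: () = lifecycle.stateStream
        .map { $0 == .background }
        .assign(to: \.isInBackground, on: self)
    async let keyboardUpdates: () = keyboard.stateStream
        .map { $0.height }
        .assign(to: \.keyboardHeight, on: self)
    // Await all three so the child tasks are not cancelled at scope exit.
    _ = await (offlineUpdates, backgroundUpdates, keyboardUpdates)
}
Cancellation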
AsyncStream respects Swift's cooperative cancellation model. When a task is cancelled, the for await loop exits and the stream's onTermination handler runs, removing system observers and freeing resources.
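The termination handler also receives the reason the stream ended. As a small sketch of manual cancellation, the code below records that reason; `TerminationLog`, `makeIdleStream`, and `cancelConsumer` are hypothetical helpers written for this example and do not come from ForgeObservers.

```swift
import Foundation

// Lock-protected box for recording how a stream ended (illustrative helper).
final class TerminationLog: @unchecked Sendable {
    private let lock = NSLock()
    private var stored: String?

    var reason: String? {
        lock.lock(); defer { lock.unlock() }
        return stored
    }

    func record(_ reason: String) {
        lock.lock(); defer { lock.unlock() }
        stored = reason
    }
}

// A stream that never emits and never finishes on its own.
func makeIdleStream(log: TerminationLog) -> AsyncStream<Int> {
    AsyncStream { continuation in
        continuation.onTermination = { termination in
            if case .cancelled = termination {
                log.record("cancelled")
            } else {
                log.record("finished")
            }
        }
    }
}

// Starts a consumer, cancels it, and waits for the loop to exit.
func cancelConsumer(log: TerminationLog) async {
    let stream = makeIdleStream(log: log)
    let task = Task {
        for await _ in stream {}
        // Reached once cancellation ends the loop.
    }
    task.cancel()
    await task.value
}
```

In this sketch the handler has already run by the time `task.value` returns, so `log.reason` can be read immediately afterwards.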
There are three common cancellation patterns:
// 1. SwiftUI automatic cancellation
struct MyView: View {
    var body: some View {
        Text("Hello")
            .task {
                // Cancelled automatically when the view disappears.
                for await status in connectivity.statusStream {
                    // ...
                }
            }
    }
}
// 2. Manual task cancellation
let task = Task {
    for await status in connectivity.statusStream {
        // ...
    }
}
// Later:
task.cancel() // The stream terminates, cleanup runs
// 3. Cooperative cancellation check
for await status in connectivity.statusStream {
    // No explicit Task.isCancelled check is needed here: AsyncStream's
    // iterator observes task cancellation and ends the sequence.
    // When cancelled, the loop exits and onTermination fires.
}
Memory safety: All observer streams use [weak self] in their closures and clean up system observers in onTermination. As long as you cancel the consuming task (which SwiftUI's .task does automatically), there are no retain cycles or leaked subscriptions.