What is the bus?
The worker bus is the communication backbone of every multi-worker system. All workers connect to the same bus and exchange typed messages for frame routing, lifecycle events, and job coordination.
Think of it as an internal event bus — workers publish messages and other workers receive them through their subscriptions. The bus handles priority queuing so that urgent messages (like cancellations) are delivered before queued data.
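To make the priority behavior concrete, here is a minimal sketch built on asyncio.PriorityQueue. It is not Pipecat's implementation: the names QueuedMessage, PrioritySketchQueue, SYSTEM_PRIORITY, and DATA_PRIORITY are hypothetical, and the two-level priority scheme is an assumption made for illustration.

```python
import asyncio
import itertools
from dataclasses import dataclass, field

# Hypothetical priority levels; lower numbers are delivered first.
SYSTEM_PRIORITY = 0
DATA_PRIORITY = 1


@dataclass(order=True)
class QueuedMessage:
    priority: int
    sequence: int  # tie-breaker that preserves FIFO order within a priority
    payload: str = field(compare=False)


class PrioritySketchQueue:
    """Illustrative queue in which urgent messages overtake queued data."""

    def __init__(self) -> None:
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self._counter = itertools.count()

    async def put(self, payload: str, priority: int) -> None:
        await self._queue.put(QueuedMessage(priority, next(self._counter), payload))

    async def get(self) -> str:
        return (await self._queue.get()).payload


async def drain_demo() -> list[str]:
    queue = PrioritySketchQueue()
    await queue.put("audio-frame-1", DATA_PRIORITY)
    await queue.put("audio-frame-2", DATA_PRIORITY)
    await queue.put("cancel", SYSTEM_PRIORITY)  # enqueued last, delivered first
    return [await queue.get() for _ in range(3)]
```

The sequence counter matters: without it, two messages of equal priority would have no defined order, so data frames could be reordered.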
Bus implementations
Pipecat provides three bus implementations:
AsyncQueueBus (local)
The default bus, created automatically by WorkerRunner when you don’t provide one. It uses asyncio queues for in-process communication with no serialization overhead.
runner = WorkerRunner() # Creates AsyncQueueBus automatically
This is all you need for single-process applications where all workers run together.
RedisBus (distributed)
For distributed setups where workers run in separate processes or on different machines, use RedisBus. It uses Redis pub/sub to relay messages across process boundaries.
from redis.asyncio import Redis
from pipecat.bus.network.redis import RedisBus
redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="pipecat:my-app")
runner = WorkerRunner(bus=bus)
All processes that share the same Redis channel can exchange messages. The programming model stays the same — your agent code doesn’t change between local and distributed setups.
RedisBus requires the redis extra: uv add "pipecat-ai[redis]"
PgmqBus (distributed)
An alternative distributed bus backed by PGMQ (PostgreSQL Message Queue). Each instance creates its own queue and broadcasts to peer queues.
from pgmq.async_queue import PGMQueue
from pipecat.bus.network.pgmq import PgmqBus
pgmq = PGMQueue(
    host="localhost",
    port="5432",
    database="postgres",
    username="postgres",
    password="...",
    pool_size=4,
)
await pgmq.init()
bus = PgmqBus(pgmq=pgmq, channel="pipecat:my-app")
runner = WorkerRunner(bus=bus)
PgmqBus requires the pgmq extra: uv add "pipecat-ai[pgmq]"
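The "one queue per instance" model that PgmqBus is described as using can be pictured with a small in-memory stand-in. This sketch does not touch PostgreSQL or the pgmq library. PeerQueueSketch and its methods are hypothetical, and the choice to skip the sender's own queue is an assumption drawn from the phrase "peer queues".

```python
from collections import deque


class PeerQueueSketch:
    """In-memory stand-in for the one-queue-per-instance fan-out pattern."""

    def __init__(self) -> None:
        self._queues: dict[str, deque[str]] = {}

    def join(self, instance_id: str) -> None:
        # Each instance creates its own queue when it joins the channel.
        self._queues[instance_id] = deque()

    def broadcast(self, sender_id: str, payload: str) -> None:
        # The sender writes a copy into every peer queue, skipping its own
        # (assumed behavior).
        for instance_id, queue in self._queues.items():
            if instance_id != sender_id:
                queue.append(payload)

    def read(self, instance_id: str) -> list[str]:
        # Each instance drains only its own queue.
        queue = self._queues[instance_id]
        items = list(queue)
        queue.clear()
        return items


channel = PeerQueueSketch()
for instance in ("runner-a", "runner-b", "runner-c"):
    channel.join(instance)
channel.broadcast("runner-a", "job-request")
```

Here runner-b and runner-c each find one copy of the message in their own queue, while runner-a's queue stays empty.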
Message types
Messages on the bus fall into four categories:
Data messages
Normal-priority messages that carry data between agents. The most important one is BusFrameMessage, which wraps a Pipecat frame (audio, text, etc.) for transport across the bus.
System messages
High-priority messages for lifecycle events: activation, deactivation, shutdown, and worker readiness. These are delivered before data messages in the queue.
Job messages
Messages for coordinating work between agents: job requests, responses, progress updates, streaming, and cancellation.
Local messages
Some messages are local-only and never cross process boundaries. For example, child agent errors stay local to the parent. This keeps internal state from leaking across distributed runners.
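One way to picture the local-only rule is a check applied before a message is handed to a network bus. How Pipecat actually marks local messages is not stated here, so the local_only flag, SketchMessage, and should_forward_to_network below are all hypothetical.

```python
from dataclasses import dataclass


@dataclass
class SketchMessage:
    """Hypothetical message type; not Pipecat's actual class."""

    source: str
    local_only: bool = False  # assumed marker for messages that must stay in-process


def should_forward_to_network(message: SketchMessage) -> bool:
    """Return True only for messages that may leave the current process."""
    return not message.local_only


outgoing = [
    SketchMessage(source="main"),
    SketchMessage(source="child", local_only=True),  # e.g. a child error
]
forwarded = [m for m in outgoing if should_forward_to_network(m)]
```

Only the message from "main" ends up in forwarded; the child error is still available to local subscribers but is never serialized.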
Message routing
Messages have source and target fields:
- Targeted messages (with a specific target) are delivered only to the named agent
- Broadcast messages (with no target) are delivered to all subscribers
The bus handles this routing automatically. When you call activate_worker("greeter"), it sends a BusActivateWorkerMessage targeted at "greeter" — only that agent receives it.
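The routing rule can be sketched as a single function over the source and target fields. RoutedMessage and recipients are hypothetical names used for illustration, not Pipecat classes, and the sketch assumes a broadcast also reaches the sender if it is subscribed.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RoutedMessage:
    """Hypothetical message carrying the source and target fields."""

    source: str
    target: Optional[str] = None  # None means broadcast
    body: str = ""


def recipients(message: RoutedMessage, subscribers: list[str]) -> list[str]:
    """Return the subscriber names that should receive the message."""
    if message.target is None:
        return list(subscribers)  # broadcast: every subscriber
    return [name for name in subscribers if name == message.target]


subscribers = ["main", "greeter", "support"]
targeted = RoutedMessage(source="main", target="greeter", body="activate")
broadcast = RoutedMessage(source="main", body="shutdown")
```

With these inputs, recipients(targeted, subscribers) selects only "greeter", while recipients(broadcast, subscribers) selects all three names.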
The agent registry
The runner maintains a WorkerRegistry that tracks which agents are available. To get notified when an agent is ready, use the @worker_ready decorator (or call watch_workers() explicitly):
from pipecat.pipeline.base_worker import BaseWorker
from pipecat.pipeline.worker_ready_decorator import worker_ready
from pipecat.registry.types import WorkerReadyData
class MainAgent(BaseWorker):
    @worker_ready(name="greeter")
    async def on_greeter_ready(self, data: WorkerReadyData) -> None:
        await self.activate_worker("greeter")
The framework automatically calls watch_workers() for each @worker_ready handler when the agent starts. If the watched agent is already registered, the handler fires immediately, so you don’t need to worry about race conditions.
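The "fires immediately if already registered" guarantee is what removes the race between watching and registering. The sketch below shows one way such a registry could behave. SketchRegistry, watch, and register are hypothetical and do not reflect WorkerRegistry's real interface.

```python
import asyncio
from typing import Awaitable, Callable

ReadyHandler = Callable[[str], Awaitable[None]]


class SketchRegistry:
    """Illustrative registry; not Pipecat's WorkerRegistry."""

    def __init__(self) -> None:
        self._ready: set[str] = set()
        self._watchers: dict[str, list[ReadyHandler]] = {}

    async def watch(self, name: str, handler: ReadyHandler) -> None:
        if name in self._ready:
            # Already registered: fire right away so late watchers miss nothing.
            await handler(name)
        else:
            self._watchers.setdefault(name, []).append(handler)

    async def register(self, name: str) -> None:
        self._ready.add(name)
        # Notify everyone who started watching before registration.
        for handler in self._watchers.pop(name, []):
            await handler(name)


async def registry_demo() -> list[str]:
    events: list[str] = []

    async def on_ready(name: str) -> None:
        events.append(name)

    registry = SketchRegistry()
    await registry.watch("greeter", on_ready)  # watch first, register later
    await registry.register("greeter")
    await registry.register("support")
    await registry.watch("support", on_ready)  # register first, watch later
    return events
```

Both orderings end with the handler being called exactly once, which is the property that lets handler code ignore startup order.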