Message Bus
The message bus is the backbone of PocketPaw’s architecture. All communication between channels, the agent loop, and the web dashboard flows through it.
Overview
The message bus implements a simple publish/subscribe pattern. Publishers emit events, and subscribers receive them asynchronously. This decouples components so they don’t need to know about each other.
from pocketclaw.bus.message_bus import MessageBusfrom pocketclaw.bus.events import InboundMessage, OutboundMessage
bus = MessageBus()
# Subscribe to eventsbus.subscribe(InboundMessage, handler_function)
# Publish eventsawait bus.publish(InboundMessage( content="Hello!", channel="telegram", session_id="user_123", metadata={"chat_id": 123456}))Event Types
InboundMessage
Represents user input from any channel:
| Field | Type | Description |
|---|---|---|
content | str | The message text |
channel | str | Source channel (telegram, discord, slack, etc.) |
session_id | str | Unique session identifier |
metadata | dict | Channel-specific metadata (chat_id, thread_ts, etc.) |
OutboundMessage
Agent responses sent back to channels:
| Field | Type | Description |
|---|---|---|
content | str | Response text |
channel | str | Target channel |
session_id | str | Session identifier |
is_stream_chunk | bool | Whether this is a streaming chunk |
is_stream_end | bool | Whether this is the final chunk |
metadata | dict | Channel-specific metadata |
SystemEvent
Internal events consumed by the web dashboard:
| Field | Type | Description |
|---|---|---|
event_type | str | Event type (tool_start, tool_result, thinking, error, inbox_update) |
data | dict | Event-specific data |
session_id | str | Session identifier |
Streaming Protocol
PocketPaw supports real-time streaming of agent responses:
- The agent backend yields response chunks
- Each chunk is published as an
OutboundMessagewithis_stream_chunk=True - The final message includes
is_stream_end=True - Channel adapters handle streaming differently per platform:
- WebSocket — Sends each chunk immediately
- Discord — Buffers chunks and edits messages (1.5s rate limit)
- Slack — Buffers and updates thread messages
- WhatsApp/Signal — Accumulates all chunks, sends on stream end
- Telegram — Edit-in-place streaming
Tool Events
When the agent uses a tool, SystemEvent events are emitted:
SystemEvent(event_type="tool_start", data={"tool": "web_search", "input": {...}})SystemEvent(event_type="tool_result", data={"tool": "web_search", "result": "..."})These events power the Activity panel in the web dashboard, giving users visibility into what tools are being used.