Message Bus

The message bus is the backbone of PocketPaw’s architecture. All communication between channels, the agent loop, and the web dashboard flows through it.

Overview

The message bus implements a simple publish/subscribe pattern. Publishers emit events, and subscribers receive them asynchronously. This decouples components so they don’t need to know about each other.

from pocketclaw.bus.message_bus import MessageBus
from pocketclaw.bus.events import InboundMessage, OutboundMessage
bus = MessageBus()
# Subscribe to events
bus.subscribe(InboundMessage, handler_function)
# Publish events
await bus.publish(InboundMessage(
content="Hello!",
channel="telegram",
session_id="user_123",
metadata={"chat_id": 123456}
))

Event Types

InboundMessage

Represents user input from any channel:

FieldTypeDescription
contentstrThe message text
channelstrSource channel (telegram, discord, slack, etc.)
session_idstrUnique session identifier
metadatadictChannel-specific metadata (chat_id, thread_ts, etc.)

OutboundMessage

Agent responses sent back to channels:

FieldTypeDescription
contentstrResponse text
channelstrTarget channel
session_idstrSession identifier
is_stream_chunkboolWhether this is a streaming chunk
is_stream_endboolWhether this is the final chunk
metadatadictChannel-specific metadata

SystemEvent

Internal events consumed by the web dashboard:

FieldTypeDescription
event_typestrEvent type (tool_start, tool_result, thinking, error, inbox_update)
datadictEvent-specific data
session_idstrSession identifier

Streaming Protocol

PocketPaw supports real-time streaming of agent responses:

  1. The agent backend yields response chunks
  2. Each chunk is published as an OutboundMessage with is_stream_chunk=True
  3. The final message includes is_stream_end=True
  4. Channel adapters handle streaming differently per platform:
    • WebSocket — Sends each chunk immediately
    • Discord — Buffers chunks and edits messages (1.5s rate limit)
    • Slack — Buffers and updates thread messages
    • WhatsApp/Signal — Accumulates all chunks, sends on stream end
    • Telegram — Edit-in-place streaming

Tool Events

When the agent uses a tool, SystemEvent events are emitted:

SystemEvent(event_type="tool_start", data={"tool": "web_search", "input": {...}})
SystemEvent(event_type="tool_result", data={"tool": "web_search", "result": "..."})

These events power the Activity panel in the web dashboard, giving users visibility into what tools are being used.