agentmesh

Agent-native event bus.

Connect agents, humans, and systems via typed events. Every event carries tenant, trace, causality, and context — not just bytes.

Installation

pip install agentmesh-bus                    # zero-dep core
pip install "agentmesh-bus[redis]"           # Redis Streams transport
pip install "agentmesh-bus[nats]"            # NATS JetStream transport
pip install "agentmesh-bus[kafka]"           # Kafka transport (aiokafka)
pip install "agentmesh-bus[sqlite]"          # SQLite event store
pip install "agentmesh-bus[otel]"            # OpenTelemetry metrics + traces
pip install "agentmesh-bus[all]"             # everything

Zero deps to start: the in-process transport and JSONL store require no external infrastructure. Start locally, then move to Redis or Kafka when you need scale.

Quickstart

import asyncio
from agentmesh import AgentMesh, AgentEvent

async def main():
    mesh = AgentMesh()
    await mesh.start()

    # Subscribe with a wildcard — catches order.created, order.updated, ...
    @mesh.subscribe("order.*")
    async def handle(e: AgentEvent) -> None:
        print(f"[{e.tenant_id}] {e.event_type}: {e.data}")

    # Publish — tenant-scoped, trace-correlated, causally-linked
    await mesh.publish("order.created",
        data={"order_id": "ORD-001", "amount": 299.99},
        publisher_id="billing-agent",
        session_id="sess-001", run_id="run-001",
        tenant_id="acme",
    )

    await asyncio.sleep(0.1)
    print(mesh.stats())
    await mesh.close()

asyncio.run(main())

Why AgentMesh

Kafka and Redis move bytes. AgentMesh moves meaning. Use Kafka as a transport under AgentMesh when you need production scale — same API either way.

| Feature | Kafka / Redis | AgentMesh |
| --- | --- | --- |
| Tenant isolation | Manual bolt-on | ✓ Built-in on every event |
| Trace propagation | Manual | ✓ trace_id propagates automatically |
| Causality chain | None | ✓ caused_by_event_id |
| Human publishers | No concept | ✓ publisher_type="human" |
| Policy enforcement | External middleware | ✓ agentplane integration |
| Zero-dep start | Needs JVM / server | ✓ pip install, run |
| Typed event taxonomy | Raw bytes / strings | ✓ 14 categories, 100+ types |
| Always persistent | Config required | ✓ JSONL by default |

Event Envelope

Every event — regardless of type — carries this standard envelope:

@dataclass
class AgentEvent:
    # Identity
    event_id:           str       # UUID — also the idempotency key
    event_type:         str       # "order.created"
    topic:              str       # "acme:order.created"
    schema_version:     str       # "1.0"
    timestamp:          float     # unix epoch

    # Execution tree
    session_id:         str       # top-level agent session
    run_id:             str       # current operation
    parent_run_id:      str | None  # nesting
    caused_by_event_id: str | None  # causality chain

    # OTel propagation
    trace_id:           str | None
    span_id:            str | None

    # Source
    publisher_id:       str       # agent_id or user_id
    publisher_type:     str       # "agent" | "human" | "system"
    tenant_id:          str | None
    agent_id:           str | None
    agent_name:         str | None

    # Routing
    delivery_mode:      str       # "broadcast" | "exclusive"
    ttl_s:              float | None

    # Payload
    data:               dict[str, Any]
    tags:               list[str]
    metadata:           dict[str, Any]
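
The execution-tree fields are what make a "why did this happen?" query possible. As a rough sketch (not AgentMesh's own code), assuming events are available as plain records keyed by event_id, the causality chain can be rebuilt by following caused_by_event_id back to the root. MiniEvent and causal_chain are hypothetical names used only for this illustration, as is the inventory.reserved event type.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class MiniEvent:
    # Hypothetical, trimmed-down stand-in for the envelope above.
    event_id: str
    event_type: str
    caused_by_event_id: str | None = None


def causal_chain(events: dict[str, MiniEvent], event_id: str) -> list[str]:
    """Return event types from the root cause down to the given event."""
    chain: list[str] = []
    seen: set[str] = set()  # guards against accidental cycles
    current = events.get(event_id)
    while current is not None and current.event_id not in seen:
        seen.add(current.event_id)
        chain.append(current.event_type)
        parent_id = current.caused_by_event_id
        current = events.get(parent_id) if parent_id else None
    return list(reversed(chain))


events = {
    e.event_id: e
    for e in [
        MiniEvent("e1", "order.created"),
        MiniEvent("e2", "inventory.reserved", caused_by_event_id="e1"),
        MiniEvent("e3", "payment.charged", caused_by_event_id="e2"),
    ]
}
print(causal_chain(events, "e3"))
# ['order.created', 'inventory.reserved', 'payment.charged']
```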

Topics & Wildcards

Topics follow category.entity.verb naming. NATS-style wildcards are supported:

| Pattern | Matches | Does not match |
| --- | --- | --- |
| order.created | Exact match | order.updated |
| order.* | order.created, order.updated | order.item.created |
| order.> | order.created, order.item.created, order.item.v.updated | payment.created |
| *.created | order.created, payment.created | order.updated |
| > | Everything | |
| acme:order.* | acme:order.created (ACME only) | siemens:order.created |
| acme:> | All ACME topics | siemens:* |

Tenant namespacing: prefix topics with tenant_id: for hard tenant isolation. The mesh enforces that publishers cannot publish to another tenant's namespace.
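
To make the matching rules in the table concrete, here is a minimal sketch of a NATS-style matcher. It is an illustration of the semantics, not AgentMesh's implementation. It assumes that * matches exactly one dot-separated token, that > matches one or more trailing tokens, and that a pattern without a tenant prefix ignores the topic's tenant. The function names are hypothetical.

```python
from __future__ import annotations


def split_tenant(name: str) -> tuple[str | None, str]:
    """Split 'acme:order.created' into ('acme', 'order.created')."""
    tenant, sep, rest = name.partition(":")
    return (tenant, rest) if sep else (None, name)


def topic_matches(pattern: str, topic: str) -> bool:
    p_tenant, p_rest = split_tenant(pattern)
    t_tenant, t_rest = split_tenant(topic)
    # A tenant-scoped pattern only matches topics in the same namespace.
    if p_tenant is not None and p_tenant != t_tenant:
        return False

    p_tokens = p_rest.split(".")
    t_tokens = t_rest.split(".")
    for i, token in enumerate(p_tokens):
        if token == ">":
            # '>' must be the last token and consume at least one topic token.
            return i == len(p_tokens) - 1 and len(t_tokens) > i
        if i >= len(t_tokens):
            return False
        if token != "*" and token != t_tokens[i]:
            return False
    return len(p_tokens) == len(t_tokens)


print(topic_matches("order.*", "order.created"))              # True
print(topic_matches("order.*", "order.item.created"))         # False
print(topic_matches("order.>", "order.item.v.updated"))       # True
print(topic_matches("acme:order.*", "siemens:order.created")) # False
```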

Publisher Types

Three types of publishers — all first-class:

🤖 Agent: publisher_type="agent". Automated, high-frequency. Most events in production.

🧑 Human: publisher_type="human". Deliberate, awaited. Human decisions are audited and governed like any other event.

⚙️ System: publisher_type="system". Infrastructure. The mesh itself, agentplane, and agentguard publish here.

Consumer Groups

Consumer groups enable competing consumers — only one subscriber in the group processes each event:

# Both workers subscribe to the same topic + group
@mesh.subscribe("payment.initiated", group="billing-workers")
async def worker_1(e: AgentEvent) -> None:
    await charge_card(e.data)

@mesh.subscribe("payment.initiated", group="billing-workers")
async def worker_2(e: AgentEvent) -> None:
    await charge_card(e.data)

# Each payment event is processed by exactly ONE worker
# Round-robin load balancing across group members

Broadcast vs Exclusive: default delivery is broadcast, so all subscribers get every event. Add group= for exclusive delivery within a group.
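
The difference between broadcast and exclusive delivery can be sketched in a few lines. This is a simplified, synchronous model rather than the library's dispatcher. GroupDispatcher is a hypothetical name, and plain round-robin is assumed as the selection strategy because the comment above mentions it.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[Any], None]


class GroupDispatcher:
    """Broadcast to ungrouped handlers; pick one handler per group."""

    def __init__(self) -> None:
        self._broadcast: list[Handler] = []
        self._groups: dict[str, list[Handler]] = defaultdict(list)
        self._cursor: dict[str, int] = defaultdict(int)

    def subscribe(self, handler: Handler, group: str | None = None) -> None:
        if group is None:
            self._broadcast.append(handler)
        else:
            self._groups[group].append(handler)

    def dispatch(self, event: Any) -> None:
        # Every ungrouped subscriber sees every event.
        for handler in self._broadcast:
            handler(event)
        # Each group receives the event once, rotating through its members.
        for group, members in self._groups.items():
            index = self._cursor[group] % len(members)
            self._cursor[group] = index + 1
            members[index](event)


seen: dict[str, list[int]] = {"worker_1": [], "worker_2": [], "audit": []}
dispatcher = GroupDispatcher()
dispatcher.subscribe(seen["worker_1"].append, group="billing-workers")
dispatcher.subscribe(seen["worker_2"].append, group="billing-workers")
dispatcher.subscribe(seen["audit"].append)  # no group: broadcast

for payment_number in range(4):
    dispatcher.dispatch(payment_number)

print(seen)
# {'worker_1': [0, 2], 'worker_2': [1, 3], 'audit': [0, 1, 2, 3]}
```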

Replay

Every event is persisted before delivery. Replay any topic from any point in time:

# Replay all events
async for event in mesh.replay("order.created"):
    print(event.data)

# Replay last 24 hours
import time
async for event in mesh.replay("order.created", since=time.time() - 86400):
    print(event.data)

# Replay a specific window
async for event in mesh.replay("payment.charged",
                               since=start_ts, until=end_ts):
    await reconcile(event)
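
Conceptually, replay over a JSONL store is a filtered scan of the log. The sketch below is an assumption-laden illustration, not the library's store. It assumes one JSON object per line with event_type and timestamp keys, and it treats until as inclusive. replay_jsonl is a hypothetical helper.

```python
from __future__ import annotations

import json
import os
import tempfile
from typing import Iterator


def replay_jsonl(
    path: str,
    event_type: str,
    since: float = 0.0,
    until: float | None = None,
) -> Iterator[dict]:
    """Yield stored events of one type whose timestamp is in [since, until]."""
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            event = json.loads(line)
            if event["event_type"] != event_type:
                continue
            if event["timestamp"] < since:
                continue
            if until is not None and event["timestamp"] > until:
                continue
            yield event


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "events.jsonl")
    with open(path, "w", encoding="utf-8") as fh:
        for n, ts in enumerate([100.0, 200.0, 300.0]):
            record = {"event_type": "order.created", "timestamp": ts, "data": {"n": n}}
            fh.write(json.dumps(record) + "\n")
        other = {"event_type": "payment.charged", "timestamp": 250.0, "data": {"n": 99}}
        fh.write(json.dumps(other) + "\n")

    everything = [e["data"]["n"] for e in replay_jsonl(path, "order.created")]
    window = [
        e["data"]["n"]
        for e in replay_jsonl(path, "order.created", since=150.0, until=250.0)
    ]

print(everything)  # [0, 1, 2]
print(window)      # [1]
```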

Dead Letter Queue

# Configure per topic
mesh.configure_topic("payment.initiated",
    dlq=True,
    max_retries=3,
    retry_backoff_ms=500,
)

# Fire-and-forget topics don't need DLQ
mesh.configure_topic("system.heartbeat", dlq=False)

# Inspect + retry
async for dead in mesh.dlq("payment.initiated"):
    print(f"Failed {dead.attempts}x: {dead.error}")
    await mesh.retry(dead)
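
The retry-then-dead-letter flow behind those settings might look roughly like this. It is a sketch under stated assumptions, not AgentMesh's delivery loop: it assumes max_retries counts retries after the first attempt (so up to max_retries + 1 attempts in total) and that the backoff is a fixed delay. DeadLetter and deliver_with_retry are hypothetical names.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class DeadLetter:
    event: dict
    attempts: int
    error: str


async def deliver_with_retry(
    handler: Callable[[dict], Awaitable[None]],
    event: dict,
    dead_letters: list[DeadLetter],
    max_retries: int = 3,
    retry_backoff_ms: int = 500,
) -> bool:
    """Try the handler; retry on failure, then park the event in the DLQ."""
    attempts = 0
    while True:
        attempts += 1
        try:
            await handler(event)
            return True
        except Exception as exc:
            if attempts > max_retries:
                dead_letters.append(DeadLetter(event, attempts, repr(exc)))
                return False
            await asyncio.sleep(retry_backoff_ms / 1000)


async def demo() -> tuple[bool, bool, list[DeadLetter]]:
    dead: list[DeadLetter] = []
    calls = {"n": 0}

    async def flaky(event: dict) -> None:
        calls["n"] += 1
        if calls["n"] < 3:  # fails twice, then succeeds
            raise RuntimeError("card processor unavailable")

    async def broken(event: dict) -> None:
        raise ValueError("bad payload")

    ok = await deliver_with_retry(flaky, {"id": 1}, dead, retry_backoff_ms=1)
    failed = await deliver_with_retry(broken, {"id": 2}, dead, retry_backoff_ms=1)
    return ok, failed, dead


ok, failed, dead = asyncio.run(demo())
print(ok, failed, dead[0].attempts)  # True False 4
```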

Server-side Filters

Evaluated before delivery — only matching events reach your handler:

# Only high-value orders
@mesh.subscribe("order.created",
    filter={"data.amount": {"$gt": 1000}, "tenant_id": "acme"})
async def handle_big_orders(e: AgentEvent) -> None:
    await notify_account_manager(e)

# Supported operators: $gt, $lt, $gte, $lte, $in, $ne
# Dot-notation for nested fields: "data.customer.tier"
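
For intuition, a filter like the one above can be evaluated with a small recursive lookup plus an operator table. This is only a sketch of the semantics, not the library's evaluator. It assumes that a bare value means equality, that all conditions must hold, and that a missing field never matches. event_matches and the event shape are hypothetical.

```python
from __future__ import annotations

import operator
from typing import Any

_OPERATORS = {
    "$gt": operator.gt,
    "$lt": operator.lt,
    "$gte": operator.ge,
    "$lte": operator.le,
    "$ne": operator.ne,
    "$in": lambda value, choices: value in choices,
}
_MISSING = object()


def get_path(event: dict, path: str) -> Any:
    """Resolve dot-notation such as 'data.customer.tier'."""
    current: Any = event
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return _MISSING
        current = current[key]
    return current


def event_matches(event: dict, flt: dict) -> bool:
    for path, condition in flt.items():
        value = get_path(event, path)
        if value is _MISSING:
            return False
        if isinstance(condition, dict):
            # Operator form: {"$gt": 1000, "$lte": 5000}
            for op, operand in condition.items():
                if not _OPERATORS[op](value, operand):
                    return False
        elif value != condition:
            # Bare value: plain equality
            return False
    return True


event = {
    "tenant_id": "acme",
    "data": {"amount": 1500, "customer": {"tier": "gold"}},
}
print(event_matches(event, {"data.amount": {"$gt": 1000}, "tenant_id": "acme"}))  # True
print(event_matches(event, {"data.customer.tier": {"$in": ["silver"]}}))          # False
```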

Idempotency

Set a deterministic event_id to make publish idempotent — safe to retry on network failure:

import uuid

# Deterministic ID — same order always produces same event_id
order_event_id = str(uuid.uuid5(uuid.NAMESPACE_URL, f"order.created:{order_id}"))

await mesh.publish("order.created",
    data={"order_id": order_id},
    publisher_id="shop-agent",
    session_id="s1", run_id="r1",
    event_id=order_event_id,   # ← publish 10x, delivered 1x
)
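
The "publish 10x, delivered 1x" behaviour rests on two pieces: a deterministic ID and a window of recently seen IDs. The sketch below illustrates both with the standard library only. DedupWindow is a hypothetical class, not the library's implementation, and its default window mirrors the dedup_window_s default of 24 hours listed in the API reference.

```python
from __future__ import annotations

import uuid


def order_event_id(order_id: str) -> str:
    """Same order ID in, same event ID out (UUIDv5 is a deterministic hash)."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"order.created:{order_id}"))


class DedupWindow:
    """Remember event IDs for window_s seconds and reject repeats."""

    def __init__(self, window_s: float = 86400.0) -> None:
        self.window_s = window_s
        self._seen: dict[str, float] = {}

    def first_time(self, event_id: str, now: float) -> bool:
        # Forget IDs that have aged out of the window.
        expired = [k for k, ts in self._seen.items() if now - ts >= self.window_s]
        for key in expired:
            del self._seen[key]
        if event_id in self._seen:
            return False
        self._seen[event_id] = now
        return True


window = DedupWindow()
event_id = order_event_id("ORD-001")
delivered = [window.first_time(event_id, now=float(t)) for t in range(10)]
print(delivered.count(True))  # 1
```

Note the trade-off: once the window has elapsed, the same ID would be accepted again, so the window should be longer than the longest retry horizon you expect.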

Pause / Resume

# Pause during deployment — queue events, don't deliver
await mesh.pause("payment.initiated")

# Deploy new billing logic here...

# Resume — flush queued events to subscribers
await mesh.resume("payment.initiated")
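
One way to picture pause and resume is a per-topic buffer that holds events while paused and drains in arrival order on resume. The following is a simplified synchronous sketch, not the library's mechanism. PausableTopic is a hypothetical name.

```python
from __future__ import annotations

from collections import deque
from typing import Any, Callable


class PausableTopic:
    """Deliver immediately when running; queue while paused."""

    def __init__(self, handler: Callable[[Any], None]) -> None:
        self._handler = handler
        self._paused = False
        self._buffer: deque = deque()

    def pause(self) -> None:
        self._paused = True

    def publish(self, event: Any) -> None:
        if self._paused:
            self._buffer.append(event)
        else:
            self._handler(event)

    def resume(self) -> None:
        self._paused = False
        # Flush queued events in the order they arrived.
        while self._buffer:
            self._handler(self._buffer.popleft())


delivered: list[str] = []
topic = PausableTopic(delivered.append)

topic.publish("p1")
topic.pause()
topic.publish("p2")
topic.publish("p3")
during_pause = list(delivered)
topic.resume()

print(during_pause)  # ['p1']
print(delivered)     # ['p1', 'p2', 'p3']
```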

Human-in-the-Loop

# Human reviewer subscribes and responds
@mesh.subscribe("human.approval.requested")
async def reviewer(e: AgentEvent) -> None:
    action = e.data["action"]
    amount = e.data["amount"]
    request_id = e.data["_request_id"]

    print(f"⚠  Review required: {action} ${amount:,.2f}")
    # In production: send to Slack/email, wait for click

    await mesh.publish(f"_reply.{request_id}",
        data={"approved": True, "approver": "alice@acme.com"},
        publisher_id="alice", publisher_type="human",
        session_id=e.session_id, run_id=e.run_id,
    )

# Agent awaits human decision
response = await mesh.request(
    "human.approval.requested",
    data={"action": "wire_transfer", "amount": 50_000},
    publisher_id="billing-agent",
    session_id="sess-001", run_id="run-001",
    timeout_s=300.0,   # 5 minutes
    fallback=None,     # None = timeout → denied
)
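
The request/reply pattern above can be modelled with one asyncio future per pending request. This sketch is illustrative only and does not reflect how AgentMesh routes _reply.* topics internally. ReplyRouter and its methods are hypothetical; the _request_id key and the timeout-returns-fallback behaviour follow the example above.

```python
from __future__ import annotations

import asyncio
import uuid
from typing import Any, Awaitable, Callable


class ReplyRouter:
    """Correlate replies to requests via a generated request ID."""

    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future] = {}

    async def request(
        self,
        send: Callable[[dict], Awaitable[None]],
        data: dict,
        timeout_s: float = 30.0,
        fallback: Any = None,
    ) -> Any:
        request_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        try:
            await send({**data, "_request_id": request_id})
            return await asyncio.wait_for(future, timeout=timeout_s)
        except asyncio.TimeoutError:
            return fallback  # nobody answered in time
        finally:
            self._pending.pop(request_id, None)

    def reply(self, request_id: str, data: Any) -> None:
        future = self._pending.get(request_id)
        if future is not None and not future.done():
            future.set_result(data)


async def demo() -> tuple[Any, Any]:
    router = ReplyRouter()

    async def human_approves(request: dict) -> None:
        # Simulate a reviewer answering a moment later.
        asyncio.get_running_loop().call_later(
            0.01, router.reply, request["_request_id"], {"approved": True}
        )

    async def nobody_answers(request: dict) -> None:
        return None

    approved = await router.request(human_approves, {"action": "wire_transfer"}, timeout_s=1.0)
    timed_out = await router.request(nobody_answers, {"action": "wire_transfer"}, timeout_s=0.05)
    return approved, timed_out


approved, timed_out = asyncio.run(demo())
print(approved)   # {'approved': True}
print(timed_out)  # None
```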

Transport: In-Process (default)

Zero external dependencies. Uses asyncio queues. Works within a single process. Perfect for development and single-service deployments.

mesh = AgentMesh()   # InProcessTransport by default
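
To show what "uses asyncio queues" can mean in practice, here is a toy single-process bus built on asyncio.Queue. It is a teaching sketch and makes no claim about how InProcessTransport is written. TinyInProcessBus is a hypothetical class with exact-match topics only.

```python
from __future__ import annotations

import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


class TinyInProcessBus:
    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)
        self._queue: asyncio.Queue | None = None
        self._worker: asyncio.Task | None = None

    async def start(self) -> None:
        # Create the queue inside the running loop.
        self._queue = asyncio.Queue()
        self._worker = asyncio.create_task(self._run())

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    async def publish(self, topic: str, data: Any) -> None:
        await self._queue.put((topic, data))

    async def _run(self) -> None:
        while True:
            topic, data = await self._queue.get()
            try:
                for handler in self._handlers[topic]:
                    await handler(data)
            except Exception as exc:
                # A real bus would retry or dead-letter; the sketch just logs.
                print(f"handler failed on {topic}: {exc!r}")
            finally:
                self._queue.task_done()

    async def close(self) -> None:
        await self._queue.join()  # drain everything already published
        self._worker.cancel()
        try:
            await self._worker
        except asyncio.CancelledError:
            pass


async def demo() -> list[dict]:
    received: list[dict] = []

    async def handle(data: dict) -> None:
        received.append(data)

    bus = TinyInProcessBus()
    await bus.start()
    bus.subscribe("order.created", handle)
    await bus.publish("order.created", {"order_id": "ORD-001"})
    await bus.publish("order.updated", {"order_id": "ORD-001"})  # no subscriber
    await bus.close()
    return received


received = asyncio.run(demo())
print(received)  # [{'order_id': 'ORD-001'}]
```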

Transport: Redis

pip install "agentmesh-bus[redis]"

from agentmesh.transport.redis import RedisTransport

mesh = AgentMesh(transport=RedisTransport("redis://localhost:6379"))

Transport: Kafka

pip install "agentmesh-bus[kafka]"

from agentmesh.transport.kafka import KafkaTransport

mesh = AgentMesh(transport=KafkaTransport(
    brokers=["kafka:9092"],
))

Same API, every transport: switch from in-process to Kafka with one line. All features (wildcards, consumer groups, replay, DLQ) work identically.

OpenTelemetry — Grafana, Prometheus, Datadog, CloudWatch

AgentMesh emits standard OTel spans and metrics. Wire once to your OTel Collector — all backends receive the same data.

pip install "agentmesh-bus[otel]" opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc opentelemetry-exporter-prometheus

Grafana / Prometheus

from opentelemetry import metrics, trace
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from prometheus_client import start_http_server

# Metrics → Prometheus → Grafana
# PrometheusMetricReader only registers with prometheus_client;
# the HTTP endpoint has to be started separately.
start_http_server(9464)             # scrape at :9464/metrics
reader = PrometheusMetricReader()
provider = MeterProvider(metric_readers=[reader])
metrics.set_meter_provider(provider)

# Traces → Tempo → Grafana
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
)
trace.set_tracer_provider(tracer_provider)

# agentmesh picks it up automatically
from agentmesh import AgentMesh
mesh = AgentMesh(otel_enabled=True)

Datadog

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

tracer_provider = TracerProvider()
tracer_provider.add_span_processor(BatchSpanProcessor(
    OTLPSpanExporter(
        endpoint="https://trace.agent.datadoghq.com",
        headers={"DD-API-KEY": "your-api-key"},
    )
))
trace.set_tracer_provider(tracer_provider)

AWS CloudWatch

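from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Point at AWS Distro for OpenTelemetry Collector (ADOT)
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(BatchSpanProcessor(
    OTLPSpanExporter(endpoint="http://localhost:4317")  # ADOT collector sidecar
))
trace.set_tracer_provider(tracer_provider)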

Zero vendor lock-in: AgentMesh uses the OTel API, never a vendor SDK. Switch backends by changing the exporter, not the application code.

Metrics emitted

agentmesh.events.published    # Counter   {topic, tenant_id, publisher_type}
agentmesh.events.delivered    # Counter   {topic, tenant_id, subscriber_id}
agentmesh.events.failed       # Counter   {topic, tenant_id, error}
agentmesh.delivery.latency_ms # Histogram {topic}

Integration: agentplane

from agentmesh import AgentMesh
from agentplane import PolicyEngine, Policy, Selector, AllowlistRule

engine = PolicyEngine()
engine.add_policy(Policy(
    id="mesh.policy",
    selector=Selector(tenants=["acme"]),
    blocking=[AllowlistRule(tools=["order.*", "payment.*"])],
))

# Policy checked before every publish and delivery
mesh = AgentMesh(policy_engine=engine)

Integration: agenthooks

from agentmesh import AgentMesh
from agenthooks import HookRegistry
from agentmesh.integrations.agenthooks import register_mesh_hooks

registry = HookRegistry()
mesh = AgentMesh()
register_mesh_hooks(mesh, registry)

# Hook points: agentmesh.before_publish, agentmesh.after_publish,
#              agentmesh.before_deliver, agentmesh.after_deliver

@registry.implement("agentmesh.before_publish")
async def enrich(ctx):
    return ctx.enrich("region", "eu-west-1")

14 Event Categories

Researched across LangGraph, CrewAI, AutoGen, Anthropic, OpenAI, MCP, A2A protocol, OTel GenAI conventions, and Semantic Kernel.

| Category | Description | Frameworks |
| --- | --- | --- |
| session.* | Agent session lifecycle | All |
| llm.* | LLM inference + streaming | Anthropic, OpenAI, LangGraph |
| tool.* | Tool/function execution | All |
| memory.* | Memory read/write | CrewAI, OTel |
| retrieval.* | RAG / knowledge search | LangGraph, CrewAI, OTel |
| human.* | Human-in-the-loop | CrewAI, AutoGen, MCP, A2A |
| agent.* | Reasoning + planning | CrewAI, Anthropic thinking |
| multiagent.* | Agent-to-agent coordination | A2A, AutoGen, LangGraph |
| connection.* | External service connections | MCP, A2A |
| resource.* | MCP resources | MCP |
| guardrail.* | Safety + policy enforcement | CrewAI, SK, AgentGuard |
| flow.* | Orchestration + workflow | CrewAI, LangGraph |
| system.* | Infrastructure + ops | All |
| push.* | Async webhook delivery | A2A |

API Reference

AgentMesh

class AgentMesh:
    def __init__(self,
        transport: Transport | None = None,   # default: InProcessTransport
        store: EventStore | None = None,      # default: JsonlStore
        store_path: str = "~/.agentmesh/events.jsonl",
        dedup_window_s: float = 86400.0,      # 24h dedup window
        policy_engine: Any | None = None,
        hook_registry: Any | None = None,
        otel_enabled: bool = True,
    ) -> None: ...

    async def start(self) -> None: ...
    async def close(self) -> None: ...

    async def publish(self, topic: str, data: dict,
        publisher_id: str, session_id: str, run_id: str,
        publisher_type: str = "agent",
        event_id: str | None = None,
        tenant_id: str | None = None,
        caused_by_event_id: str | None = None,
        trace_id: str | None = None,
        tags: list[str] | None = None,
        ttl_s: float | None = None,
    ) -> AgentEvent: ...

    def subscribe(self, topic: str,
        group: str | None = None,
        filter: dict | None = None,
    ) -> Callable: ...          # use as decorator

    def unsubscribe(self, topic: str, handler: Callable) -> None: ...

    async def request(self, topic: str, data: dict,
        publisher_id: str, session_id: str, run_id: str,
        timeout_s: float = 30.0,
        fallback: Any = None,
    ) -> Any: ...

    async def replay(self, topic: str,
        since: float = 0.0,
        until: float | None = None,
    ) -> AsyncIterator[AgentEvent]: ...

    def configure_topic(self, topic: str,
        dlq: bool = True, max_retries: int = 3,
        retry_backoff_ms: int = 500,
        ttl_s: float | None = None,
    ) -> None: ...

    async def pause(self, topic: str) -> None: ...
    async def resume(self, topic: str) -> None: ...
    def stats(self) -> dict: ...
    async def dlq(self, topic: str) -> AsyncIterator[DeadEvent]: ...
    async def retry(self, dead: DeadEvent) -> None: ...

Examples

👋 Hello Mesh (examples/01_hello_mesh.py): Simplest possible example. Publish one event, subscribe, print.

👥 Consumer Groups (examples/02_consumer_groups.py): 4 payment events, 2 workers; each event handled by exactly one.

🧑 Human-in-the-Loop (examples/03_human_in_the_loop.py): Agent requests approval, mock human reviews, mesh routes response.

⏮️ Replay (examples/04_replay.py): Publish 5 events, restart mesh, replay full history from JSONL.

🔗 Multi-Agent Workflow (examples/05_multi_agent_workflow.py): order → inventory → billing → notification using causality chains.

More examples in agent-examples.

pip install agentmesh-bus
python examples/01_hello_mesh.py

The Stack

agentmesh is the connective tissue. Every other layer emits into it or reacts to it.

[Diagram: agents, humans, and systems publish into agentmesh (pub/sub, consumer groups, replay, DLQ, wildcards, OTel, dedup, filters). agentplane (policy), agenthooks (hooks), AgentGuard (safety), agentregistry (discovery), and agenteval (testing) connect to it. The transport layer underneath is in-process, Redis, NATS, or Kafka.]

agentmesh       # event bus       connects agents, humans, systems
agentplane      # control plane   runtime policy, versioning, escalation
agenthooks      # extensibility   hookpoints, customer hooks
AgentGuard      # safety          injection, PII, jailbreak
agentregistry   # discovery       publish, version, deploy agents
agenteval       # quality         golden, adversarial, policy tests
agentobserve    # observability   unified dashboard