Skip to main content

kafka-mcp-channel

Proof of concept — Bridge Kafka topics into Claude Code sessions via MCP channels.

Claude Code channels are a new primitive that lets external systems push real-time messages into an active Claude session. Instead of Claude pulling for updates, channels deliver events as they happen — like a notification feed wired directly into the conversation.

kafka-mcp-channel connects Apache Kafka to this system. It runs as an MCP server that consumes from Kafka topics and forwards each message into the Claude Code session as a <channel> notification. Claude sees the message inline, with metadata (topic, partition, offset, key) as attributes, and can react to it immediately — triaging errors, summarising deployments, or flagging anomalies without being asked.

<channel source="kafka" topic="deployments" partition="0" offset="42">
{
"event": "deploy",
"service": "api",
"status": "deployed",
"version": "3.1.0"
}
</channel>

This is useful when your team already publishes events to Kafka — deployment notifications, error logs, CI results, alerts — and you want Claude to be aware of them in real time while you work.

Proof of Concept

This project is a proof of concept exploring MCP channels with Kafka. Channels are currently in research preview in Claude Code. The API and behavior may change. See the official channels documentation for the latest on the feature.

How it works

This is a one-way channel: Kafka messages are pushed into your Claude Code session, but Claude doesn't write back to Kafka.

  1. The server declares capabilities.experimental['claude/channel'] so Claude Code registers a notification listener
  2. It connects to Kafka as a consumer and subscribes to configured topics
  3. On each message, it emits notifications/claude/channel with the message content and metadata (topic, partition, offset, key) as <channel> tag attributes

Claude can then manage subscriptions, set filters, and check status using the built-in MCP tools.

Installation

Prerequisites

  • Node.js >= 18
  • A running Kafka broker
  • Claude Code v2.1.80+ with a claude.ai login

Install from source

git clone https://github.com/pipie-io/kafka-mcp-channel.git
cd kafka-mcp-channel
npm install
npm run build

Configure Claude Code

Add to your .mcp.json (project-level) or ~/.claude.json (global):

{
"mcpServers": {
"kafka": {
"command": "node",
"args": ["/path/to/kafka-mcp-channel/dist/index.js"],
"env": {
"KAFKA_BROKERS": "localhost:9092",
"KAFKA_TOPICS": "deployments,alerts,errors"
}
}
}
}

Launch with channels enabled

During the research preview, custom channels need the development flag:

claude --dangerously-load-development-channels server:kafka

Without this flag, the MCP tools still work but push notifications won't arrive.

Configuration

All configuration is via environment variables:

VariableRequiredDefaultDescription
KAFKA_BROKERSYesComma-separated broker list
KAFKA_TOPICSYesComma-separated topics to subscribe to
KAFKA_GROUP_IDNoclaude-mcp-channelConsumer group ID
KAFKA_SASL_USERNAMENoSASL username
KAFKA_SASL_PASSWORDNoSASL password
KAFKA_SASL_MECHANISMNoplainplain, scram-sha-256, scram-sha-512
KAFKA_SSLNofalseEnable SSL
KAFKA_FILTERNoInitial regex filter on message value
KAFKA_FROM_BEGINNINGNofalseStart from beginning of topic

MCP Tools

Claude can manage the channel at runtime:

subscribe_topic

Start listening to a new Kafka topic.

subscribe_topic({ topic: "new-events", from_beginning: true })

unsubscribe_topic

Stop receiving messages from a topic.

unsubscribe_topic({ topic: "noisy-topic" })

list_subscriptions

See which topics are currently subscribed.

set_filter

Set a filter to control which messages get forwarded. All conditions are AND'd together.

ParamDescription
patternRegex on message value (or the field specified by field)
key_patternRegex on the message key
fieldJSON field path to match against instead of full value (e.g. .level)
topicScope filter to one topic only

Examples:

# Only errors from structured logs
set_filter({ pattern: "error|fatal", field: ".level" })

# Only production keys
set_filter({ key_pattern: "prod-.*" })

# Only payments errors in production
set_filter({ pattern: "error", field: ".level", key_pattern: "prod-payments.*" })

# Clear filter
set_filter({})

get_status

Check connection state, subscribed topics, message counts, and active filters.

Quick test

Start a local Kafka and send a test event:

# Start Kafka
docker compose -f docker-compose.test.yml up -d

# Wait for startup, then send a message
echo '{"event":"deploy","service":"api","status":"deployed"}' | \
docker exec -i $(docker ps -q --filter ancestor=apache/kafka:3.9.0) \
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic deployments

Or use the included test script:

./test.sh

Development

npm install
npm run build
npm test # 60 unit tests
npm run test:integration # integration tests (requires Docker)