
NATS JetStream

WA-RS integrates with NATS JetStream for durable event streaming and queue-based outbound messaging. NATS is optional: if NATS_URL is not set, the API runs in webhooks-only mode, exactly as it does without this integration.

Overview

NATS JetStream provides two capabilities:

  1. Incoming Events — All WhatsApp events are published to a JetStream stream, allowing external consumers to subscribe and process events with delivery guarantees
  2. Outbound Messages — External systems can publish message commands to a queue, and WA-RS will consume and send them via WhatsApp
┌──────────────┐     wa.events.{session}.{type}     ┌──────────────────┐
│    WA-RS     │ ─────────────────────────────────► │  Your Consumer   │
│   Gateway    │                                    │   (subscribe)    │
│              │ ◄───────────────────────────────── │                  │
└──────────────┘         wa.send.{session}          │  Your Publisher  │
                                                    └──────────────────┘

Configuration

Environment Variables

Variable                  Default    Description
NATS_URL                  (none)     NATS server URL. Required to enable NATS
NATS_EVENTS_STREAM        WA_EVENTS  Stream name for incoming events
NATS_SEND_STREAM          WA_SEND    Stream name for outbound commands
NATS_EVENTS_MAX_AGE_DAYS  7          Max age for events (days)
NATS_SEND_MAX_AGE_DAYS    1          Max age for outbound commands (days)
NATS_TOKEN                (none)     Optional authentication token
NATS_CREDS_FILE           (none)     Optional NATS credentials file path
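
A client-side script that needs the same settings could mirror the table as follows. This is a minimal sketch, not WA-RS's own configuration code: the `NatsSettings` class and `load_nats_settings` function are hypothetical names, and treating an empty string as "not set" is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class NatsSettings:
    """Hypothetical container mirroring the documented variables."""
    url: Optional[str]
    events_stream: str
    send_stream: str
    events_max_age_days: int
    send_max_age_days: int
    token: Optional[str]
    creds_file: Optional[str]

    @property
    def enabled(self) -> bool:
        # NATS is only enabled when NATS_URL is set.
        return bool(self.url)


def load_nats_settings(env: Mapping[str, str] = os.environ) -> NatsSettings:
    """Read the documented variables, falling back to the documented defaults."""
    return NatsSettings(
        url=env.get("NATS_URL") or None,  # assumption: empty string means unset
        events_stream=env.get("NATS_EVENTS_STREAM", "WA_EVENTS"),
        send_stream=env.get("NATS_SEND_STREAM", "WA_SEND"),
        events_max_age_days=int(env.get("NATS_EVENTS_MAX_AGE_DAYS", "7")),
        send_max_age_days=int(env.get("NATS_SEND_MAX_AGE_DAYS", "1")),
        token=env.get("NATS_TOKEN") or None,
        creds_file=env.get("NATS_CREDS_FILE") or None,
    )
```

Calling `load_nats_settings({})` yields a settings object whose `enabled` property is false, which corresponds to webhooks-only mode.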

Docker Compose

The default docker-compose.yml includes a NATS server with JetStream enabled:

nats:
  image: nats:2.10-alpine
  container_name: wagateway-nats
  command: ["--jetstream", "--store_dir=/data"]
  volumes:
    - nats_data:/data
  ports:
    - "4222:4222" # Client connections
    - "8222:8222" # HTTP monitoring
  restart: unless-stopped

The API service connects automatically:

api:
  environment:
    NATS_URL: nats://nats:4222

Disabling NATS

To run without NATS, simply remove the NATS_URL environment variable. The API will function in webhooks-only mode.


Subject Hierarchy

wa.events.{session_id}.{event_type}   ← incoming WhatsApp events
wa.send.{session_id}                  ← outbound message commands

Examples

Subject                           Description
wa.events.my-session.message      New message on session "my-session"
wa.events.my-session.connected    Session connected
wa.events.my-session.send_result  Result of an outbound command
wa.events.*.connected             All sessions' connected events
wa.events.>                       Subscribe to all events
wa.send.my-session                Send a message via "my-session"
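
The subject layout and the wildcard examples above can be checked offline with a few lines of Python. This is a sketch: the helper names are hypothetical, and it assumes session IDs contain no dots, since a dot would add an extra subject token. The matcher follows the standard NATS rules, where `*` matches exactly one token and `>` matches one or more trailing tokens.

```python
def event_subject(session_id: str, event_type: str) -> str:
    """Build the subject for an incoming event."""
    return f"wa.events.{session_id}.{event_type}"


def send_subject(session_id: str) -> str:
    """Build the subject for an outbound command."""
    return f"wa.send.{session_id}"


def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` matches the NATS wildcard `pattern`."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last token and needs at least one token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    # Without '>', pattern and subject must have the same number of tokens.
    return len(p_tokens) == len(s_tokens)
```

For example, `subject_matches("wa.events.*.connected", event_subject("my-session", "connected"))` is true, while the same pattern does not match a `message` event.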

JetStream Streams

Stream     Subjects     Retention  Max Age  Max Size  Purpose
WA_EVENTS  wa.events.>  Limits     7 days   1 GB      Incoming events
WA_SEND    wa.send.>    WorkQueue  1 day    512 MB    Outbound commands

The WA_SEND stream uses WorkQueue retention — messages are removed after acknowledgment.


Incoming Events

All WhatsApp events that are sent to webhooks are also published to NATS. The payload format is identical to webhook payloads:

{
  "session_id": "my-session",
  "event": "message",
  "timestamp": 1700000000,
  "data": {
    "from": "628123456789@s.whatsapp.net",
    "chat": "628123456789@s.whatsapp.net",
    "message_id": "3EB0ABC123..."
  }
}
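
A consumer will usually decode this envelope before acting on it. The following is a minimal sketch of such a decoder; the `WaEvent` class and `parse_event` function are hypothetical names, and the contents of `data` are left untyped because they vary by event type.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class WaEvent:
    """Hypothetical typed view of the event envelope."""
    session_id: str
    event: str
    timestamp: int
    data: Dict[str, Any]


def parse_event(raw: bytes) -> WaEvent:
    """Decode one event payload as received from NATS or a webhook."""
    obj = json.loads(raw)
    return WaEvent(
        session_id=obj["session_id"],
        event=obj["event"],
        timestamp=int(obj["timestamp"]),
        data=obj.get("data") or {},  # tolerate a missing or null data field
    )
```

Because the webhook and NATS payloads are identical, the same decoder can serve both delivery paths.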

Subscribing (NATS CLI)

# Subscribe to all events
nats sub "wa.events.>"

# Subscribe to messages only
nats sub "wa.events.*.message"

# Subscribe to a specific session
nats sub "wa.events.my-session.>"

Subscribing (Node.js)

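import { connect, consumerOpts, createInbox, JSONCodec } from 'nats';

const nc = await connect({ servers: 'nats://localhost:4222' });
const js = nc.jetstream();
const jc = JSONCodec();

// Durable push consumer with explicit, manual acknowledgments
const opts = consumerOpts();
opts.durable('my-consumer');
opts.ackExplicit();
opts.manualAck();
opts.deliverTo(createInbox());

const sub = await js.subscribe('wa.events.>', opts);

for await (const msg of sub) {
  const event = jc.decode(msg.data);
  console.log(`[${event.event}] ${event.session_id}:`, event.data);
  msg.ack(); // acknowledge only after the event has been handled
}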

Subscribing (Python)

import asyncio
import nats

async def main():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Durable push subscription on all event subjects
    sub = await js.subscribe("wa.events.>", durable="my-consumer")
    async for msg in sub.messages:
        print(f"Received: {msg.data.decode()}")
        await msg.ack()

asyncio.run(main())

Outbound Messages

Publish message commands to wa.send.{session_id} and WA-RS will consume and send them via WhatsApp.

Consumer Details

Property       Value
Consumer name  wa-send-worker
Ack policy     Explicit
Ack wait       30 seconds
Max retries    3
Retry delay    5 seconds (NAK)

Command Format

All commands are JSON objects with a type field:

{
  "type": "text",
  "to": "628123456789",
  "text": "Hello from NATS!",
  "request_id": "optional-tracking-uuid"
}

The request_id field is optional. If provided, it will be included in the send_result event.
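
A publisher can assemble such a command with a small helper. The sketch below covers only the text type shown above; the function names are hypothetical, and generating the `request_id` as a UUID is one reasonable choice rather than a requirement.

```python
import json
import uuid
from typing import Any, Dict, Optional


def new_request_id() -> str:
    """Generate a tracking ID; any unique string should serve the same purpose."""
    return str(uuid.uuid4())


def build_text_command(to: str, text: str,
                       request_id: Optional[str] = None) -> Dict[str, Any]:
    """Build a text command; request_id is only included when provided."""
    command: Dict[str, Any] = {"type": "text", "to": to, "text": text}
    if request_id is not None:
        command["request_id"] = request_id
    return command


def encode_command(command: Dict[str, Any]) -> bytes:
    """Serialize a command to the JSON bytes published on wa.send.{session_id}."""
    return json.dumps(command, ensure_ascii=False).encode("utf-8")
```

Passing `request_id=new_request_id()` makes it possible to match the command with its later send_result event.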

Supported Message Types

Text

{
  "type": "text",
  "to": "628123456789",
  "text": "Hello!",
  "request_id": "uuid"
}

Image

{
  "type": "image",
  "to": "628123456789",
  "image": { "url": "https://example.com/photo.jpg" },
  "caption": "Check this out",
  "request_id": "uuid"
}

Video

{
  "type": "video",
  "to": "628123456789",
  "video": { "url": "https://example.com/video.mp4" },
  "caption": "Watch this",
  "request_id": "uuid"
}

Audio

{
  "type": "audio",
  "to": "628123456789",
  "audio": { "url": "https://example.com/audio.ogg" },
  "ptt": true,
  "request_id": "uuid"
}

Document

{
  "type": "document",
  "to": "628123456789",
  "document": { "url": "https://example.com/file.pdf" },
  "filename": "report.pdf",
  "caption": "Monthly report",
  "request_id": "uuid"
}

Sticker

{
  "type": "sticker",
  "to": "628123456789",
  "sticker": { "url": "https://example.com/sticker.webp" },
  "request_id": "uuid"
}

Location

{
  "type": "location",
  "to": "628123456789",
  "latitude": -6.2088,
  "longitude": 106.8456,
  "name": "Jakarta",
  "address": "DKI Jakarta, Indonesia",
  "request_id": "uuid"
}

Contact

{
  "type": "contact",
  "to": "628123456789",
  "contact": {
    "display_name": "John Doe",
    "phones": [
      { "number": "+628111222333", "phone_type": "CELL" }
    ]
  },
  "request_id": "uuid"
}

Reaction

{
  "type": "reaction",
  "to": "628123456789",
  "message_id": "3EB0ABC123...",
  "emoji": "👍",
  "request_id": "uuid"
}

Poll

{
  "type": "poll",
  "to": "628123456789",
  "name": "Favorite color?",
  "options": ["Red", "Green", "Blue"],
  "selectable_count": 1,
  "request_id": "uuid"
}

Buttons

{
  "type": "buttons",
  "to": "628123456789",
  "content_text": "Choose an option:",
  "footer": "Tap a button",
  "buttons": [
    { "button_id": "yes", "display_text": "Yes" },
    { "button_id": "no", "display_text": "No" }
  ],
  "header_text": "Confirmation",
  "request_id": "uuid"
}

List

{
  "type": "list",
  "to": "628123456789",
  "title": "Menu",
  "description": "Choose from the menu:",
  "button_text": "View Menu",
  "sections": [
    {
      "title": "Food",
      "rows": [
        { "row_id": "pizza", "title": "Pizza", "description": "$10" },
        { "row_id": "burger", "title": "Burger", "description": "$8" }
      ]
    }
  ],
  "footer": "Prices include tax",
  "request_id": "uuid"
}

Interactive

{
  "type": "interactive",
  "to": "628123456789",
  "body_text": "Click a button",
  "footer_text": "Powered by WA-RS",
  "buttons": [
    {
      "name": "quick_reply",
      "button_params_json": "{\"display_text\":\"Help\",\"id\":\"help\"}"
    }
  ],
  "request_id": "uuid"
}

Revoke

{
  "type": "revoke",
  "to": "628123456789",
  "message_id": "3EB0ABC123...",
  "original_sender": "628987654321",
  "request_id": "uuid"
}

Edit

{
  "type": "edit",
  "to": "628123456789",
  "message_id": "3EB0ABC123...",
  "text": "Updated text",
  "request_id": "uuid"
}

Read

{
  "type": "read",
  "chat_jid": "628123456789@s.whatsapp.net",
  "sender": "628123456789@s.whatsapp.net",
  "message_ids": ["3EB0ABC123..."],
  "request_id": "uuid"
}

Media Data Formats

For media fields (image, video, audio, document, sticker), you can use either URL or base64:

// URL
{ "url": "https://example.com/image.jpg" }

// Base64
{ "data": "iVBORw0KGgo...", "mimetype": "image/png" }
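
The two media forms can be produced with helpers like the ones below. This is a sketch with hypothetical function names; it assumes the `data` field carries standard (not URL-safe) base64 without a data-URI prefix, which is consistent with the PNG sample above but not stated explicitly.

```python
import base64
from typing import Any, Dict, Optional


def media_from_url(url: str) -> Dict[str, str]:
    """Reference media by URL."""
    return {"url": url}


def media_from_bytes(content: bytes, mimetype: str) -> Dict[str, str]:
    """Embed media inline as base64 (assumed: standard alphabet, with padding)."""
    return {
        "data": base64.b64encode(content).decode("ascii"),
        "mimetype": mimetype,
    }


def image_command(to: str, media: Dict[str, str],
                  caption: Optional[str] = None) -> Dict[str, Any]:
    """Build an image command from either media form."""
    command: Dict[str, Any] = {"type": "image", "to": to, "image": media}
    if caption is not None:
        command["caption"] = caption
    return command
```

Inline base64 grows the payload by roughly a third compared to the raw file, so the URL form is likely the better fit for large media when the file is reachable over HTTP.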

Publishing Commands (NATS CLI)

# Send a text message
nats pub "wa.send.my-session" '{"type":"text","to":"628123456789","text":"Hello from NATS!"}'

# Send an image
nats pub "wa.send.my-session" '{"type":"image","to":"628123456789","image":{"url":"https://example.com/photo.jpg"},"caption":"NATS image"}'
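
The same publish can be done from Python. The sketch below assumes the third-party nats-py package (`pip install nats-py`) and a server at `nats://localhost:4222`; `build_publish_args` and `publish_command` are hypothetical helper names. It uses a JetStream publish, which waits for the stream to confirm that the command was stored.

```python
import json
from typing import Any, Dict, Tuple


def build_publish_args(session_id: str,
                       command: Dict[str, Any]) -> Tuple[str, bytes]:
    """Return the subject and JSON payload for one outbound command."""
    subject = f"wa.send.{session_id}"
    payload = json.dumps(command, ensure_ascii=False).encode("utf-8")
    return subject, payload


async def publish_command(session_id: str, command: Dict[str, Any],
                          servers: str = "nats://localhost:4222") -> int:
    """Publish one command and return the stream sequence it was stored at."""
    import nats  # nats-py, imported lazily so the helper above has no dependency

    subject, payload = build_publish_args(session_id, command)
    nc = await nats.connect(servers)
    try:
        js = nc.jetstream()
        ack = await js.publish(subject, payload)  # waits for the stream's ack
        return ack.seq
    finally:
        await nc.close()
```

A one-off send would then be `asyncio.run(publish_command("my-session", {"type": "text", "to": "628123456789", "text": "Hello from NATS!"}))`. A long-running publisher would normally keep one connection open instead of reconnecting per command.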

Send Results

After processing an outbound command, WA-RS publishes a result to wa.events.{session_id}.send_result:

{
  "request_id": "uuid",
  "success": true,
  "message_id": "3EB0DEF456...",
  "error": null,
  "timestamp": 1700000000
}

On failure:

{
  "request_id": "uuid",
  "success": false,
  "message_id": null,
  "error": "Session error: Client not connected",
  "timestamp": 1700000000
}
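
Publishers that set `request_id` can correlate results with the commands they sent. The tracker below is a sketch with hypothetical names; it assumes the result payload has exactly the shape shown above and treats the first result for a given `request_id` as final, which may not hold if a failed command is retried and later succeeds.

```python
import json
from typing import Any, Dict, Optional, Tuple


class SendTracker:
    """Hypothetical in-memory map from request_id to the command that used it."""

    def __init__(self) -> None:
        self._pending: Dict[str, Dict[str, Any]] = {}

    def track(self, request_id: str, command: Dict[str, Any]) -> None:
        """Remember a command until its result arrives."""
        self._pending[request_id] = command

    def pending_count(self) -> int:
        return len(self._pending)

    def handle_result(
        self, raw: bytes
    ) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]:
        """Match a send_result payload to its command.

        Returns (command, result), or None if the request_id is unknown,
        for example because another publisher sent the command.
        """
        result = json.loads(raw)
        command = self._pending.pop(result.get("request_id"), None)
        if command is None:
            return None
        return command, result
```

The caller can then branch on `result["success"]` and read `message_id` or `error` accordingly.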

REST API Endpoints

Get NATS Status

GET /api/v1/nats/status

Returns connection status and stream information.

Response

{
  "enabled": true,
  "connected": true,
  "url": "nats://localhost:4222",
  "events_stream": {
    "name": "WA_EVENTS",
    "messages": 1234,
    "bytes": 567890,
    "consumer_count": 2,
    "first_seq": 1,
    "last_seq": 1234
  },
  "send_stream": {
    "name": "WA_SEND",
    "messages": 0,
    "bytes": 0,
    "consumer_count": 1,
    "first_seq": 1,
    "last_seq": 56
  }
}

When NATS is disabled:

{
  "enabled": false,
  "connected": false,
  "url": null,
  "events_stream": null,
  "send_stream": null
}
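
A health check can poll this endpoint and read the outbound backlog from it. The sketch below uses only the Python standard library; the function names are hypothetical, the base URL must be supplied by the caller, and any authentication your deployment requires is omitted.

```python
import json
import urllib.request
from typing import Any, Dict, Optional


def fetch_nats_status(base_url: str, timeout: float = 5.0) -> Dict[str, Any]:
    """GET /api/v1/nats/status and return the decoded JSON body."""
    url = f"{base_url.rstrip('/')}/api/v1/nats/status"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def outbound_backlog(status: Dict[str, Any]) -> Optional[int]:
    """Number of messages currently held in the send stream.

    Returns None when NATS is disabled or disconnected, matching the
    null fields in the disabled response.
    """
    if not status.get("enabled") or not status.get("connected"):
        return None
    stream = status.get("send_stream")
    return None if stream is None else int(stream["messages"])
```

Because the send stream uses WorkQueue retention, a `messages` count that keeps growing suggests commands are being published faster than they are acknowledged.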

Purge Stream

POST /api/v1/nats/streams/{stream_name}/purge

Remove all messages from a stream.

Parameter    Type  Description
stream_name  path  Stream name (WA_EVENTS or WA_SEND)

Response

{
  "success": true,
  "message": "Stream 'WA_EVENTS' purged"
}

List Consumers

GET /api/v1/nats/streams/{stream_name}/consumers

Get consumer count for a stream.

Parameter    Type  Description
stream_name  path  Stream name (WA_EVENTS or WA_SEND)

Response

{
  "success": true,
  "stream": "WA_EVENTS",
  "consumer_count": 2
}
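
Both stream endpoints share the same path prefix, so a thin client can build them from one helper. This is a sketch using the Python standard library; the function names are hypothetical, the base URL is supplied by the caller, and authentication is omitted. Stream names are not validated against WA_EVENTS and WA_SEND because they can be changed through the environment variables.

```python
import json
import urllib.request
from urllib.parse import quote


def stream_url(base_url: str, stream_name: str, action: str) -> str:
    """Build the URL for a stream endpoint; action is 'purge' or 'consumers'."""
    if action not in ("purge", "consumers"):
        raise ValueError(f"unsupported action: {action}")
    name = quote(stream_name, safe="")
    return f"{base_url.rstrip('/')}/api/v1/nats/streams/{name}/{action}"


def purge_stream(base_url: str, stream_name: str, timeout: float = 5.0) -> bool:
    """POST to the purge endpoint; returns the 'success' flag of the response."""
    req = urllib.request.Request(
        stream_url(base_url, stream_name, "purge"), data=b"", method="POST"
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return bool(json.load(resp).get("success"))


def consumer_count(base_url: str, stream_name: str, timeout: float = 5.0) -> int:
    """GET the consumers endpoint and return its consumer_count field."""
    url = stream_url(base_url, stream_name, "consumers")
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return int(json.load(resp)["consumer_count"])
```

Purging is destructive: on the send stream it discards commands that have not been delivered yet, so it is best reserved for maintenance or test environments.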

Error Handling

Scenario                          Behavior
NATS_URL not set                  NATS disabled entirely, webhooks-only mode
NATS unreachable at startup       Log warning, continue without NATS
NATS publish fails (incoming)     Log warning, does not block webhooks
Outbound command parse fails      NAK, retry up to 3x, then dropped
Session not connected (outbound)  NAK with 5s delay for retry
WhatsApp send fails (outbound)    NAK with 5s delay for retry

Monitoring

NATS HTTP Monitoring

If port 8222 is exposed, you can monitor NATS at:

http://localhost:8222/varz      # Server info
http://localhost:8222/jsz       # JetStream info
http://localhost:8222/connz     # Connections

NATS CLI

# Check server status
nats server info

# List streams
nats stream ls

# Stream details
nats stream info WA_EVENTS
nats stream info WA_SEND

# Consumer details
nats consumer info WA_SEND wa-send-worker

# Watch events in real-time
nats sub "wa.events.>" --last=10