Messaging APIs
Publish, subscribe, and message history APIs for real-time communication
Complete guide to real-time messaging with publish/subscribe patterns, advanced subscription options, and message history retrieval.
Real-Time Messaging
subscribe(topic, handler, onAck?, timeoutMs?, options?): Promise<void>
Subscribe to messages on a topic, with optional acknowledgement of the subscription itself.
Parameters:
topic: string - Topic name to subscribe to
handler: (message: MessageBody) => void - Message callback
onAck?: SubscriptionCallback - Optional subscription acknowledgement callback
timeoutMs?: number - Subscription timeout in milliseconds (default: 10000)
options?: SubscribeOptions - Optional configuration
As the examples below show, options can also be passed as the third argument when no acknowledgement callback is needed.
Message Format:
interface MessageBody {
id: string; // Unique message ID
topic: string; // Topic name
senderId: string; // Sender's client ID
seq: string; // Sequence number (ULID)
sentAt: Date; // Server timestamp
payload: string; // Message content
clientMsgId?: string; // Optional client correlation ID
}
SubscribeOptions:
interface SubscribeOptions {
streamOldMessages?: boolean; // Deliver missed messages (default: false)
}
type SubscriptionCallback = (response: {
success: boolean;
error?: { code: string; message: string };
}) => void;
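The seq field is described above as a ULID. Assuming it follows the standard ULID layout (26 Crockford base32 characters, the first 10 encoding a 48-bit millisecond timestamp), the time component can be read back, and sequence values compare in time order as plain strings. The following is a minimal sketch under that assumption; ulidTimestamp and compareSeq are hypothetical helper names, not part of the client API.

```typescript
// Crockford base32 alphabet used by the ULID spec (no I, L, O, U).
const CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ";

// Decode the millisecond timestamp from the first 10 characters of a ULID.
// Assumption: `seq` is a standard 26-character ULID.
function ulidTimestamp(seq: string): number {
  if (seq.length !== 26) {
    throw new Error(`Expected a 26-character ULID, got ${seq.length} characters`);
  }
  const head = seq.slice(0, 10).toUpperCase();
  let ms = 0;
  for (let i = 0; i < head.length; i++) {
    const value = CROCKFORD.indexOf(head[i]);
    if (value === -1) {
      throw new Error(`Invalid ULID character: ${head[i]}`);
    }
    // Multiply instead of bit-shifting: the timestamp exceeds 32 bits.
    ms = ms * 32 + value;
  }
  return ms;
}

// ULIDs sort lexicographically in time order, so a string comparison is enough.
function compareSeq(a: string, b: string): number {
  return a < b ? -1 : a > b ? 1 : 0;
}
```

With these helpers, `new Date(ulidTimestamp(msg.seq))` gives an approximate creation time for a message, and `messages.sort((a, b) => compareSeq(a.seq, b.seq))` orders a list of messages by sequence.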
Examples:
// Simple subscription
await client.subscribe("chat:lobby", (msg) => {
console.log(`${msg.senderId}: ${msg.payload}`);
});
// With options
await client.subscribe(
"chat:lobby",
(msg) => console.log(msg),
{ streamOldMessages: true }
);
// With acknowledgement callback
await client.subscribe(
"chat:lobby",
(msg) => console.log("Message:", msg.payload),
(response) => {
if (response.success) {
console.log("Subscribed successfully");
} else {
console.error("Subscription failed:", response.error?.message);
}
}
);
// Full control: acknowledgement, timeout, and options
await client.subscribe(
"chat:lobby",
(msg) => console.log("Message:", msg.payload),
(response) => {
if (response.success) {
console.log("Subscription confirmed");
}
},
5000, // 5 second timeout
{ streamOldMessages: true }
);
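When calling code would rather await the server's confirmation than handle it in a callback, the acknowledgement form can be wrapped in a promise. The sketch below assumes the five-argument subscribe form shown in the examples above, and assumes the client reports a timeout either through the callback or by rejecting. subscribeConfirmed and SubscribeClient are hypothetical names introduced for illustration.

```typescript
// Shapes copied from the reference above.
interface MessageBody {
  id: string;
  topic: string;
  senderId: string;
  seq: string;
  sentAt: Date;
  payload: string;
  clientMsgId?: string;
}
interface SubscribeOptions {
  streamOldMessages?: boolean;
}
type SubscriptionCallback = (response: {
  success: boolean;
  error?: { code: string; message: string };
}) => void;

// Assumption: the client accepts the five-argument form shown in the examples.
interface SubscribeClient {
  subscribe(
    topic: string,
    handler: (message: MessageBody) => void,
    onAck?: SubscriptionCallback,
    timeoutMs?: number,
    options?: SubscribeOptions
  ): Promise<void>;
}

// Hypothetical helper: resolves once the acknowledgement reports success,
// rejects if it reports failure or if subscribe() itself rejects.
function subscribeConfirmed(
  client: SubscribeClient,
  topic: string,
  handler: (message: MessageBody) => void,
  timeoutMs = 10000,
  options: SubscribeOptions = {}
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    client
      .subscribe(
        topic,
        handler,
        (response) => {
          if (response.success) {
            resolve();
          } else {
            reject(new Error(response.error?.message ?? "Subscription failed"));
          }
        },
        timeoutMs,
        options
      )
      .catch(reject);
  });
}
```

A caller can then write `await subscribeConfirmed(client, "chat:lobby", handler)` inside a try/catch and treat a rejected promise as a failed subscription.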
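unsubscribe(topic, onAck?, timeoutMs?): void
Unsubscribe from a topic.
Parameters:
topic: string - Topic to unsubscribe from
onAck? - Optional acknowledgement callback, as used in the second example below
timeoutMs?: number - Optional acknowledgement timeout in milliseconds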
// Simple unsubscribe
client.unsubscribe("chat:lobby");
// With acknowledgement callback (optional)
client.unsubscribe(
"chat:lobby",
(response) => {
if (response.success) {
console.log("Unsubscribed successfully");
}
},
5000 // timeout in ms
);
publish(topic, payload): Promise<string>
Publish a message without acknowledgement.
Parameters:
topic: string - Topic to publish to
payload: string - Message content
Returns: Promise<string> - Client message ID
const msgId = await client.publish("chat:lobby", "Hello!");
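Because payload is a string, structured data has to be serialized before publishing and parsed again on receipt. The following sketch uses JSON for this. It relies only on the publish(topic, payload) signature documented above; publishJson, parsePayload, and PublishClient are hypothetical names.

```typescript
// Assumption: only the publish(topic, payload) signature documented above.
interface PublishClient {
  publish(topic: string, payload: string): Promise<string>;
}

// Hypothetical helper: serialize a value and publish it as the string payload.
function publishJson<T>(client: PublishClient, topic: string, value: T): Promise<string> {
  return client.publish(topic, JSON.stringify(value));
}

// Hypothetical helper: parse a received payload, returning undefined
// instead of throwing when the payload is not valid JSON.
function parsePayload<T>(payload: string): T | undefined {
  try {
    return JSON.parse(payload) as T;
  } catch {
    return undefined;
  }
}
```

In a subscription handler, `parsePayload<ChatEvent>(msg.payload)` (where ChatEvent is an application-defined type) would recover the object, with an undefined result signalling a payload that should be skipped or logged.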
publishWithAck(topic, payload, onAck, timeout?): Promise<string>
Publish with server acknowledgement.
Parameters:
topic: string - Topic to publish to
payload: string - Message content
onAck: AckCallback - Acknowledgement callback
timeout?: number - Timeout in milliseconds (default: 3000)
AckCallback Type:
type AckCallback = (response: AckResponse) => void;
type AckResponse = {
success: boolean;
ack?: AckPacketType;
seq?: string;
serverMsgId?: string;
topic: string;
error?: {
code: string;
message: string;
};
};
Example:
await client.publishWithAck(
"chat:lobby",
"Hello!",
(ack) => {
if (ack.success) {
console.log("Message delivered! Seq:", ack.seq);
} else {
console.error("Failed:", ack.error?.message);
}
},
5000
);
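The acknowledgement callback can also be turned into a promise so that delivery can be awaited directly. This is a sketch rather than part of the client API: publishConfirmed and AckPublishClient are hypothetical names, and AckPacketType is typed as unknown because it is not defined in this section.

```typescript
// AckResponse as documented above; AckPacketType is not defined in this
// section, so the ack field is typed as unknown here.
type AckResponse = {
  success: boolean;
  ack?: unknown;
  seq?: string;
  serverMsgId?: string;
  topic: string;
  error?: { code: string; message: string };
};
type AckCallback = (response: AckResponse) => void;

interface AckPublishClient {
  publishWithAck(
    topic: string,
    payload: string,
    onAck: AckCallback,
    timeout?: number
  ): Promise<string>;
}

// Hypothetical helper: resolve with the acknowledgement on success,
// reject with the reported error message on failure.
function publishConfirmed(
  client: AckPublishClient,
  topic: string,
  payload: string,
  timeout = 3000
): Promise<AckResponse> {
  return new Promise<AckResponse>((resolve, reject) => {
    client
      .publishWithAck(
        topic,
        payload,
        (ack) => {
          if (ack.success) {
            resolve(ack);
          } else {
            reject(new Error(ack.error?.message ?? "Publish was not acknowledged"));
          }
        },
        timeout
      )
      .catch(reject);
  });
}
```

Usage would look like `const ack = await publishConfirmed(client, "chat:lobby", "Hello!", 5000)`, after which `ack.seq` holds the sequence value reported by the server, if any.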
Message History
Retrieve past messages from any topic with cursor-based pagination. Messages are stored for 3 days with up to 100 messages per topic.
Quick Start
// Fetch latest 50 messages
const history = await client.getHistory("chat:lobby");
history.items.forEach(msg => {
console.log(`[${msg.seq}] ${msg.senderId}: ${msg.payload}`);
});
// Paginate with cursor
if (history.nextCursor) {
const nextPage = await client.getHistory("chat:lobby", {
cursor: history.nextCursor,
limit: 50
});
}
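The cursor step above generalizes to a loop that follows nextCursor until it is absent. The sketch below assumes getHistory returns { items, nextCursor } as in the quick start and that a missing nextCursor means there are no further pages. fetchAllHistory, HistoryClient, and the maxPages safety cap are hypothetical additions.

```typescript
// Minimal shapes inferred from the quick start above (assumptions).
interface HistoryMessage {
  id: string;
  seq: string;
  senderId: string;
  payload: string;
}
interface HistoryPage {
  items: HistoryMessage[];
  nextCursor?: string | null;
}
interface HistoryClient {
  getHistory(
    topic: string,
    options?: { cursor?: string; limit?: number }
  ): Promise<HistoryPage>;
}

// Hypothetical helper: follow nextCursor until it is absent or maxPages is hit.
async function fetchAllHistory(
  client: HistoryClient,
  topic: string,
  pageSize = 50,
  maxPages = 10
): Promise<HistoryMessage[]> {
  const all: HistoryMessage[] = [];
  let cursor: string | undefined;
  for (let page = 0; page < maxPages; page++) {
    // Only send a cursor once the previous page has provided one.
    const options = cursor ? { cursor, limit: pageSize } : { limit: pageSize };
    const result = await client.getHistory(topic, options);
    all.push(...result.items);
    if (!result.nextCursor) {
      break; // No further pages
    }
    cursor = result.nextCursor;
  }
  return all;
}
```

The maxPages cap keeps a misbehaving cursor from looping forever; with the documented limit of 100 stored messages per topic, a small cap should be sufficient.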
Pagination Iterator
For cleaner pagination code, use the history iterator:
const getNext = client.createHistoryIterator("chat:lobby", { limit: 50 });
const page1 = await getNext(); // { items: [...], hasMore: true }
const page2 = await getNext(); // { items: [...], hasMore: false }
const done = await getNext(); // null (exhausted)
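Rather than calling getNext a fixed number of times, the iterator can be drained in a loop that stops on null or when hasMore is false. This sketch assumes only the page shape shown in the comments above; forEachPage is a hypothetical helper that works with any function of that shape.

```typescript
// Page shape taken from the comments in the example above.
interface IteratorPage<T> {
  items: T[];
  hasMore: boolean;
}
type GetNextPage<T> = () => Promise<IteratorPage<T> | null>;

// Hypothetical helper: pass each page to onPage and return the total item count.
async function forEachPage<T>(
  getNext: GetNextPage<T>,
  onPage: (items: T[]) => void | Promise<void>
): Promise<number> {
  let total = 0;
  while (true) {
    const page = await getNext();
    if (page === null) {
      break; // Iterator exhausted
    }
    await onPage(page.items);
    total += page.items.length;
    if (!page.hasMore) {
      break; // Last page: skip the extra call that would return null
    }
  }
  return total;
}
```

For example, `await forEachPage(client.createHistoryIterator("chat:lobby", { limit: 50 }), renderMessages)` would process every page in turn, where renderMessages is an application-defined function.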
For comprehensive history features, see the Message History Guide (listed under Next Steps), which covers:
- Pagination patterns (infinite scroll, load more)
- Catch-up after reconnection
- Search in history
- Time-based queries with ULID
- Performance optimization tips
Advanced Messaging Patterns
Message Acknowledgment Patterns
Fire-and-Forget
// Simple publish without confirmation
await client.publish("notifications", "System update complete");
Reliable Delivery
// Publish with acknowledgment
await client.publishWithAck(
"critical-alerts",
"Server maintenance starting",
(ack) => {
if (ack.success) {
console.log("Alert delivered successfully");
} else {
// Retry logic or fallback notification
console.error("Alert delivery failed:", ack.error?.message);
}
},
5000
);
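The "retry logic" mentioned in the comment above could take the form of a bounded retry with exponential backoff. The following is one possible sketch, not the library's own mechanism: publishWithRetry, publishOnce, the option names, and the "CLIENT_ERROR" code are all hypothetical.

```typescript
type AckResponse = {
  success: boolean;
  seq?: string;
  serverMsgId?: string;
  topic: string;
  error?: { code: string; message: string };
};
type AckCallback = (response: AckResponse) => void;
interface AckPublishClient {
  publishWithAck(
    topic: string,
    payload: string,
    onAck: AckCallback,
    timeout?: number
  ): Promise<string>;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// One attempt, wrapped so that it always resolves with an AckResponse.
function publishOnce(
  client: AckPublishClient,
  topic: string,
  payload: string,
  timeout: number
): Promise<AckResponse> {
  return new Promise<AckResponse>((resolve) => {
    client.publishWithAck(topic, payload, resolve, timeout).catch((err: unknown) =>
      resolve({
        success: false,
        topic,
        // "CLIENT_ERROR" is a made-up code for errors raised before any ack.
        error: {
          code: "CLIENT_ERROR",
          message: err instanceof Error ? err.message : String(err),
        },
      })
    );
  });
}

// Retry failed publishes, doubling the delay after each failed attempt.
async function publishWithRetry(
  client: AckPublishClient,
  topic: string,
  payload: string,
  options: { attempts?: number; baseDelayMs?: number; timeout?: number } = {}
): Promise<AckResponse> {
  const { attempts = 3, baseDelayMs = 200, timeout = 5000 } = options;
  let last: AckResponse = { success: false, topic };
  for (let attempt = 0; attempt < attempts; attempt++) {
    last = await publishOnce(client, topic, payload, timeout);
    if (last.success) {
      return last;
    }
    if (attempt < attempts - 1) {
      await sleep(baseDelayMs * 2 ** attempt);
    }
  }
  return last; // Final failure: the caller decides on a fallback
}
```

A retried publish may be delivered twice if the message arrived but its acknowledgement was lost, so subscribers that cannot tolerate duplicates may need to deduplicate on their side.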
Batch Publishing
async function publishBatch(topic: string, messages: string[]) {
const promises = messages.map(msg =>
client.publishWithAck(topic, msg, (ack) => {
console.log(`Message ${msg}: ${ack.success ? "✅" : "❌"}`);
})
);
await Promise.all(promises);
console.log("All messages sent");
}
Subscription Patterns
Topic Subscription with Cleanup
const subscriptions = new Set<string>();
async function subscribeWithCleanup(topic: string, handler: (msg: MessageBody) => void) {
await client.subscribe(topic, handler);
subscriptions.add(topic);
// Return unsubscribe function
return () => {
client.unsubscribe(topic);
subscriptions.delete(topic);
};
}
// Cleanup all subscriptions
function unsubscribeAll() {
for (const topic of subscriptions) {
client.unsubscribe(topic);
}
subscriptions.clear();
}
Pattern-Based Subscriptions
// Subscribe to multiple related topics
const roomTopics = ["room-123", "room-456", "room-789"];
for (const topic of roomTopics) {
await client.subscribe(topic, (msg) => {
handleRoomMessage(msg.topic, msg.payload);
});
}
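The loop above awaits each subscription before starting the next. If the client tolerates concurrent subscribe calls (an assumption, not something this page states), the subscriptions can be started together and failures collected per topic instead of aborting on the first one. subscribeMany and TopicClient are hypothetical names.

```typescript
// Only the fields this sketch needs from MessageBody.
interface RoomMessage {
  topic: string;
  payload: string;
}
interface TopicClient {
  subscribe(topic: string, handler: (message: RoomMessage) => void): Promise<void>;
}

// Hypothetical helper: subscribe to all topics concurrently and report
// which subscriptions succeeded and which failed.
async function subscribeMany(
  client: TopicClient,
  topics: string[],
  handler: (message: RoomMessage) => void
): Promise<{ subscribed: string[]; failed: string[] }> {
  const outcomes = await Promise.all(
    topics.map((topic) =>
      client.subscribe(topic, handler).then(
        () => ({ topic, ok: true }),
        () => ({ topic, ok: false }) // Record the failure instead of rejecting
      )
    )
  );
  return {
    subscribed: outcomes.filter((o) => o.ok).map((o) => o.topic),
    failed: outcomes.filter((o) => !o.ok).map((o) => o.topic),
  };
}
```

The failed list can then drive a retry or a user-facing warning, while the topics in subscribed are already receiving messages.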
Subscription with Buffering
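class MessageBuffer {
  private buffer: MessageBody[] = [];
  private batchSize = 10;
  private flushInterval = 1000; // 1 second
  private timer: ReturnType<typeof setInterval>;
  constructor(private topic: string, private processBatch: (messages: MessageBody[]) => void) {
    // subscribe() returns a promise, so report failures instead of dropping them
    client
      .subscribe(topic, (msg) => this.addMessage(msg))
      .catch((err) => console.error(`Subscription to ${topic} failed:`, err));
    // Flush on a timer so a partial batch never waits indefinitely
    this.timer = setInterval(() => this.flush(), this.flushInterval);
  }
  private addMessage(msg: MessageBody) {
    this.buffer.push(msg);
    // Flush early once the batch is full
    if (this.buffer.length >= this.batchSize) {
      this.flush();
    }
  }
  private flush() {
    if (this.buffer.length > 0) {
      // splice(0) empties the buffer and returns its contents as a new array
      this.processBatch(this.buffer.splice(0));
    }
  }
  // Stop the timer, deliver what is left, and unsubscribe
  dispose() {
    clearInterval(this.timer);
    this.flush();
    client.unsubscribe(this.topic);
  }
}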
Error Handling
Publishing Errors
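try {
  await client.publish("topic", "message");
} catch (error) {
  // In strict TypeScript the caught value is `unknown`, so narrow it before reading .message
  const message = error instanceof Error ? error.message : String(error);
  if (message.includes("Not connected")) {
    console.error("Connection lost - will reconnect automatically");
  } else if (message.includes("Invalid topic")) {
    console.error("Topic name must be a non-empty string");
  } else {
    console.error("Publish failed:", message);
  }
}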
Subscription Errors
// With callback for error handling
await client.subscribe(
"topic",
(msg) => console.log(msg),
(response) => {
if (!response.success) {
console.error("Subscription failed:", response.error?.message);
// Implement retry logic
}
}
);
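// Or use try-catch
try {
  await client.subscribe("topic", (msg) => console.log(msg));
} catch (error) {
  // Narrow the caught value before reading .message
  const message = error instanceof Error ? error.message : String(error);
  console.error("Subscription failed:", message);
}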
Performance Optimization
Message Batching
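Queue outgoing messages and flush them together, either when 10 are waiting or after 100 ms. Each message is still published individually; the batcher only groups the sends:
class MessageBatcher {
  private queue: Array<{ topic: string; payload: string }> = [];
  // ReturnType<typeof setTimeout> works in both Node.js and browsers
  private batchTimeout?: ReturnType<typeof setTimeout>;
  publish(topic: string, payload: string) {
    this.queue.push({ topic, payload });
    if (this.queue.length >= 10) {
      this.flushSafely();
    } else if (!this.batchTimeout) {
      this.batchTimeout = setTimeout(() => this.flushSafely(), 100);
    }
  }
  // flush() is async, so catch rejections here rather than leaving them unhandled
  private flushSafely() {
    this.flush().catch((err) => console.error("Batch publish failed:", err));
  }
  private async flush() {
    if (this.batchTimeout) {
      clearTimeout(this.batchTimeout);
      this.batchTimeout = undefined;
    }
    // Take everything queued so far and reset the queue
    const batch = this.queue.splice(0);
    // Send all messages in parallel
    await Promise.all(
      batch.map(({ topic, payload }) =>
        client.publish(topic, payload)
      )
    );
  }
}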
Next Steps
- Message History Guide - Comprehensive guide to history API, pagination patterns, and advanced use cases
- Presence Tracking - Monitor user activity and awareness
- Connection Management - Connection lifecycle and error handling
- Types Reference - Complete TypeScript definitions