Messaging APIs

Publish, subscribe, and message history APIs for real-time communication

Messaging APIs

Complete guide to real-time messaging with publish/subscribe patterns, advanced subscription options, and message history retrieval.

Real-Time Messaging

subscribe(topic, handler, options?): Promise<void>

Subscribe to messages on a topic with full acknowledgement support.

Parameters:

  • topic: string - Topic name to subscribe to
  • handler: (message: MessageBody) => void - Message callback
  • onAck?: SubscriptionCallback - Optional subscription acknowledgement callback
  • timeoutMs?: number - Subscription timeout in milliseconds (default: 10000)
  • options?: SubscribeOptions - Optional configuration

Message Format:

interface MessageBody {
  id: string;           // Unique message ID
  topic: string;        // Topic name
  senderId: string;     // Sender's client ID
  seq: string;          // Sequence number (ULID)
  sentAt: Date;         // Server timestamp
  payload: string;      // Message content
  clientMsgId?: string; // Optional client correlation ID
}

SubscribeOptions:

interface SubscribeOptions {
  streamOldMessages?: boolean; // Deliver missed messages (default: false)
}

type SubscriptionCallback = (response: {
  success: boolean;
  error?: { code: string; message: string };
}) => void;

Examples:

// Simple subscription
await client.subscribe("chat:lobby", (msg) => {
  console.log(`${msg.senderId}: ${msg.payload}`);
});

// With options
await client.subscribe(
  "chat:lobby", 
  (msg) => console.log(msg),
  { streamOldMessages: true }
);

// With acknowledgement callback
await client.subscribe(
  "chat:lobby",
  (msg) => console.log("Message:", msg.payload),
  (response) => {
    if (response.success) {
      console.log("Subscribed successfully");
    } else {
      console.error("Subscription failed:", response.error?.message);
    }
  }
);

// Full control: acknowledgement, timeout, and options
await client.subscribe(
  "chat:lobby",
  (msg) => console.log("Message:", msg.payload),
  (response) => {
    if (response.success) {
      console.log("Subscription confirmed");
    }
  },
  5000, // 5 second timeout
  { streamOldMessages: true }
);

unsubscribe(topic: string): void

Unsubscribe from a topic.

Parameters:

  • topic: string - Topic to unsubscribe from
// Simple unsubscribe
client.unsubscribe("chat:lobby");

// With acknowledgement callback (optional)
client.unsubscribe(
  "chat:lobby",
  (response) => {
    if (response.success) {
      console.log("Unsubscribed successfully");
    }
  },
  5000 // timeout in ms
);

publish(topic, payload): Promise<string>

Publish a message without acknowledgement.

Parameters:

  • topic: string - Topic to publish to
  • payload: string - Message content

Returns: Promise<string> - Client message ID

const msgId = await client.publish("chat:lobby", "Hello!");

publishWithAck(topic, payload, onAck, timeout?): Promise<string>

Publish with server acknowledgement.

Parameters:

  • topic: string - Topic to publish to
  • payload: string - Message content
  • onAck: AckCallback - Acknowledgement callback
  • timeout?: number - Timeout in milliseconds (default: 3000)

AckCallback Type:

type AckCallback = (response: AckResponse) => void;

type AckResponse = {
  success: boolean;
  ack?: AckPacketType;
  seq?: string;
  serverMsgId?: string;
  topic: string;
  error?: {
    code: string;
    message: string;
  };
};

Example:

await client.publishWithAck(
  "chat:lobby",
  "Hello!",
  (ack) => {
    if (ack.success) {
      console.log("Message delivered! Seq:", ack.seq);
    } else {
      console.error("Failed:", ack.error?.message);
    }
  },
  5000
);

Message History

Retrieve past messages from any topic with cursor-based pagination. Messages are stored for 3 days with up to 100 messages per topic.

Quick Start

// Fetch latest 50 messages
const history = await client.getHistory("chat:lobby");

history.items.forEach(msg => {
  console.log(`[${msg.seq}] ${msg.senderId}: ${msg.payload}`);
});

// Paginate with cursor
if (history.nextCursor) {
  const nextPage = await client.getHistory("chat:lobby", {
    cursor: history.nextCursor,
    limit: 50
  });
}

Pagination Iterator

For cleaner pagination code, use the history iterator:

const getNext = client.createHistoryIterator("chat:lobby", { limit: 50 });

const page1 = await getNext(); // { items: [...], hasMore: true }
const page2 = await getNext(); // { items: [...], hasMore: false }
const done = await getNext();  // null (exhausted)
ℹ️

For comprehensive history features, see the full guide:

  • Pagination patterns (infinite scroll, load more)
  • Catch-up after reconnection
  • Search in history
  • Time-based queries with ULID
  • Performance optimization tips

→ Read the Message History Guide


Advanced Messaging Patterns

Message Acknowledgment Patterns

Fire-and-Forget

// Simple publish without confirmation
await client.publish("notifications", "System update complete");

Reliable Delivery

// Publish with acknowledgment
await client.publishWithAck(
  "critical-alerts",
  "Server maintenance starting",
  (ack) => {
    if (ack.success) {
      console.log("Alert delivered successfully");
    } else {
      // Retry logic or fallback notification
      console.error("Alert delivery failed:", ack.error?.message);
    }
  },
  5000
);

Batch Publishing

async function publishBatch(topic: string, messages: string[]) {
  const promises = messages.map(msg => 
    client.publishWithAck(topic, msg, (ack) => {
      console.log(`Message ${msg}: ${ack.success ? "✅" : "❌"}`);
    })
  );
  
  await Promise.all(promises);
  console.log("All messages sent");
}

Subscription Patterns

Topic Subscription with Cleanup

const subscriptions = new Set<string>();

async function subscribeWithCleanup(topic: string, handler: (msg: MessageBody) => void) {
  await client.subscribe(topic, handler);
  subscriptions.add(topic);
  
  // Return unsubscribe function
  return () => {
    client.unsubscribe(topic);
    subscriptions.delete(topic);
  };
}

// Cleanup all subscriptions
function unsubscribeAll() {
  for (const topic of subscriptions) {
    client.unsubscribe(topic);
  }
  subscriptions.clear();
}

Pattern-Based Subscriptions

// Subscribe to multiple related topics
const roomTopics = ["room-123", "room-456", "room-789"];

for (const topic of roomTopics) {
  await client.subscribe(topic, (msg) => {
    handleRoomMessage(msg.topic, msg.payload);
  });
}

Subscription with Buffering

class MessageBuffer {
  private buffer: MessageBody[] = [];
  private batchSize = 10;
  private flushInterval = 1000; // 1 second
  
  constructor(topic: string, private processBatch: (messages: MessageBody[]) => void) {
    client.subscribe(topic, (msg) => this.addMessage(msg));
    setInterval(() => this.flush(), this.flushInterval);
  }
  
  private addMessage(msg: MessageBody) {
    this.buffer.push(msg);
    if (this.buffer.length >= this.batchSize) {
      this.flush();
    }
  }
  
  private flush() {
    if (this.buffer.length > 0) {
      this.processBatch([...this.buffer]);
      this.buffer.length = 0;
    }
  }
}

Error Handling

Publishing Errors

try {
  await client.publish("topic", "message");
} catch (error) {
  if (error.message.includes("Not connected")) {
    console.error("Connection lost - will reconnect automatically");
  } else if (error.message.includes("Invalid topic")) {
    console.error("Topic name must be non-empty string");
  }
}

Subscription Errors

// With callback for error handling
await client.subscribe(
  "topic",
  (msg) => console.log(msg),
  (response) => {
    if (!response.success) {
      console.error("Subscription failed:", response.error?.message);
      // Implement retry logic
    }
  }
);

// Or use try-catch
try {
  await client.subscribe("topic", (msg) => console.log(msg));
} catch (error) {
  console.error("Subscription failed:", error.message);
}

Performance Optimization

Message Batching

Batch multiple publishes to reduce overhead:

class MessageBatcher {
  private queue: Array<{ topic: string; payload: string }> = [];
  private batchTimeout?: NodeJS.Timeout;
  
  publish(topic: string, payload: string) {
    this.queue.push({ topic, payload });
    
    if (this.queue.length >= 10) {
      this.flush();
    } else if (!this.batchTimeout) {
      this.batchTimeout = setTimeout(() => this.flush(), 100);
    }
  }
  
  private async flush() {
    if (this.batchTimeout) {
      clearTimeout(this.batchTimeout);
      this.batchTimeout = undefined;
    }
    
    const batch = [...this.queue];
    this.queue.length = 0;
    
    // Send all messages in parallel
    await Promise.all(
      batch.map(({ topic, payload }) => 
        client.publish(topic, payload)
      )
    );
  }
}

Next Steps