Messaging APIs

Publish, subscribe, and message history APIs for real-time communication

Complete guide to real-time messaging with publish/subscribe patterns, advanced subscription options, and message history retrieval.

Real-Time Messaging

subscribe(topic, handler, onAck?, timeoutMs?, options?): Promise<void>

Subscribe to messages on a topic with full acknowledgement support.

Parameters:

  • topic: string - Topic name to subscribe to
  • handler: (message: MessageBody) => void - Message callback
  • onAck?: SubscriptionCallback - Optional subscription acknowledgement callback
  • timeoutMs?: number - Subscription timeout in milliseconds (default: 10000)
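  • options?: SubscribeOptions - Optional configuration (see SubscribeOptions below); the examples also pass it as the third argument when no acknowledgement callback is needed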

Message Format:

interface MessageBody {
  id: string; // Unique message ID
  topic: string; // Topic name
  senderId: string; // Sender's client ID
  seq: string; // Sequence number (ULID)
  sentAt: Date; // Server timestamp
  payload: string; // Message content
  clientMsgId?: string; // Optional client correlation ID
}
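
Because seq is a ULID, and ULIDs are designed to sort lexicographically by creation time, messages that arrive out of order can be reordered with a plain string comparison. The following is a minimal sketch under two assumptions: the server emits canonical uppercase ULIDs, and it generates them monotonically within a single millisecond. `Sequenced` and `sortBySeq` are hypothetical names, not part of the SDK.

```typescript
// Minimal shape needed for ordering; mirrors the `seq` field of MessageBody.
interface Sequenced {
  seq: string;
}

// Hypothetical helper: returns a new array ordered by `seq`, with duplicates dropped.
function sortBySeq<T extends Sequenced>(messages: T[]): T[] {
  const seen = new Set<string>();
  return [...messages]
    // Plain string comparison is enough because ULIDs sort lexicographically.
    .sort((a, b) => (a.seq < b.seq ? -1 : a.seq > b.seq ? 1 : 0))
    // Keep only the first message seen for each sequence value.
    .filter((msg) => {
      if (seen.has(msg.seq)) return false;
      seen.add(msg.seq);
      return true;
    });
}
```

This can be useful when live messages are merged with messages replayed from history, since the same message may then be delivered twice.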

SubscribeOptions:

interface SubscribeOptions {
  streamOldMessages?: boolean; // Deliver missed messages (default: false)
}

type SubscriptionCallback = (response: {
  success: boolean;
  error?: { code: string; message: string };
}) => void;

Examples:

// Simple subscription
await client.subscribe("chat:lobby", (msg) => {
  console.log(`${msg.senderId}: ${msg.payload}`);
});

// With options
await client.subscribe("chat:lobby", (msg) => console.log(msg), {
  streamOldMessages: true,
});

// With acknowledgement callback
await client.subscribe(
  "chat:lobby",
  (msg) => console.log("Message:", msg.payload),
  (response) => {
    if (response.success) {
      console.log("Subscribed successfully");
    } else {
      console.error("Subscription failed:", response.error?.message);
    }
  },
);

// Full control: acknowledgement, timeout, and options
await client.subscribe(
  "chat:lobby",
  (msg) => console.log("Message:", msg.payload),
  (response) => {
    if (response.success) {
      console.log("Subscription confirmed");
    }
  },
  5000, // 5 second timeout
  { streamOldMessages: true },
);

unsubscribe(topic: string): void

Unsubscribe from a topic.

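Parameters:

  • topic: string - Topic to unsubscribe from

The second example below also passes two optional arguments: an acknowledgement callback and a timeout in milliseconds.

Examples:

// Simple unsubscribe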
client.unsubscribe("chat:lobby");

// With acknowledgement callback (optional)
client.unsubscribe(
  "chat:lobby",
  (response) => {
    if (response.success) {
      console.log("Unsubscribed successfully");
    }
  },
  5000, // timeout in ms
);

publish(topic, payload): Promise<string>

Publish a message without acknowledgement.

Parameters:

  • topic: string - Topic to publish to
  • payload: string - Message content

Returns: Promise<string> - Client message ID

const msgId = await client.publish("chat:lobby", "Hello!");

publishWithAck(topic, payload, onAck, timeout?): Promise<string>

Publish with server acknowledgement.

Parameters:

  • topic: string - Topic to publish to
  • payload: string - Message content
  • onAck: AckCallback - Acknowledgement callback
  • timeout?: number - Timeout in milliseconds (default: 3000)

AckCallback Type:

type AckCallback = (response: AckResponse) => void;

type AckResponse = {
  success: boolean;
  ack?: AckPacketType;
  seq?: string;
  serverMsgId?: string;
  topic: string;
  error?: {
    code: string;
    message: string;
  };
};

Example:

await client.publishWithAck(
  "chat:lobby",
  "Hello!",
  (ack) => {
    if (ack.success) {
      console.log("Message delivered! Seq:", ack.seq);
    } else {
      console.error("Failed:", ack.error?.message);
    }
  },
  5000,
);
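
When the caller prefers async/await over a callback, the acknowledgement can be wrapped in a promise. This is a sketch rather than an SDK feature: `AckPublisher` and `publishAndWait` are hypothetical names, the `AckResponse` type is trimmed to the fields used here, and the code assumes the callback is also invoked with success: false when the timeout expires.

```typescript
// Trimmed copy of the AckResponse shape documented above (the `ack` field is omitted).
type AckResponse = {
  success: boolean;
  seq?: string;
  serverMsgId?: string;
  topic: string;
  error?: { code: string; message: string };
};

// Hypothetical structural type: anything exposing publishWithAck as documented.
interface AckPublisher {
  publishWithAck(
    topic: string,
    payload: string,
    onAck: (response: AckResponse) => void,
    timeout?: number,
  ): Promise<string>;
}

// Resolves with the ack on success; rejects on a failed ack or a send error.
function publishAndWait(
  client: AckPublisher,
  topic: string,
  payload: string,
  timeoutMs = 3000,
): Promise<AckResponse> {
  return new Promise((resolve, reject) => {
    client
      .publishWithAck(
        topic,
        payload,
        (ack) => {
          if (ack.success) {
            resolve(ack);
          } else {
            reject(new Error(ack.error?.message ?? "Publish was not acknowledged"));
          }
        },
        timeoutMs,
      )
      // A rejected send (for example, while disconnected) also rejects the wrapper.
      .catch(reject);
  });
}
```

With this wrapper, a call site can write `const ack = await publishAndWait(client, "chat:lobby", "Hello!")` and handle failures in a single try/catch.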

Message History

Retrieve past messages from any topic using cursor-based pagination. Messages are retained for 3 days, up to a maximum of 100 messages per topic.

Quick Start

// Fetch latest 50 messages
const history = await client.getHistory("chat:lobby");

history.items.forEach((msg) => {
  console.log(`[${msg.seq}] ${msg.senderId}: ${msg.payload}`);
});

// Paginate with cursor
if (history.nextCursor) {
  const nextPage = await client.getHistory("chat:lobby", {
    cursor: history.nextCursor,
    limit: 50,
  });
}
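
The cursor check above extends naturally to a loop that follows nextCursor until it is absent. The sketch below assumes only what the Quick Start shows: getHistory accepts an options object with cursor and limit, and returns items plus an optional nextCursor. `CursorPage`, `HistoryReader`, and `fetchAllHistory` are hypothetical names, and the page cap is a safety measure added for this example.

```typescript
// Hypothetical shapes inferred from the Quick Start example.
interface CursorPage<T> {
  items: T[];
  nextCursor?: string;
}

interface HistoryReader<T> {
  getHistory(
    topic: string,
    options?: { cursor?: string; limit?: number },
  ): Promise<CursorPage<T>>;
}

// Follows nextCursor until the history is exhausted or maxPages is reached.
async function fetchAllHistory<T>(
  client: HistoryReader<T>,
  topic: string,
  pageSize = 50,
  maxPages = 10,
): Promise<T[]> {
  const all: T[] = [];
  let cursor: string | undefined;

  for (let page = 0; page < maxPages; page++) {
    // Only send a cursor once the first page has supplied one.
    const options = cursor ? { cursor, limit: pageSize } : { limit: pageSize };
    const result = await client.getHistory(topic, options);
    all.push(...result.items);

    if (!result.nextCursor) break; // No further pages
    cursor = result.nextCursor;
  }

  return all;
}
```

Given the stated retention limit of 100 messages per topic, a page size of 50 should need only a few requests.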

Pagination Iterator

For cleaner pagination code, use the history iterator:

const getNext = client.createHistoryIterator("chat:lobby", { limit: 50 });

const page1 = await getNext(); // { items: [...], hasMore: true }
const page2 = await getNext(); // { items: [...], hasMore: false }
const done = await getNext(); // null (exhausted)
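
To read every page without tracking state by hand, the iterator can be drained in a loop. This is a sketch based only on the return values shown in the comments above (a page with items and hasMore, then null once exhausted); `IteratorPage`, `NextPage`, and `collectHistory` are hypothetical names.

```typescript
// Hypothetical shapes matching the comments in the iterator example.
type IteratorPage<T> = { items: T[]; hasMore: boolean };
type NextPage<T> = () => Promise<IteratorPage<T> | null>;

// Calls getNext until it reports no more pages, collecting every item.
async function collectHistory<T>(getNext: NextPage<T>): Promise<T[]> {
  const all: T[] = [];
  let page = await getNext();

  while (page !== null) {
    all.push(...page.items);
    if (!page.hasMore) break; // Last page: skip the extra call that would return null
    page = await getNext();
  }

  return all;
}
```

A call such as `collectHistory(client.createHistoryIterator("chat:lobby", { limit: 50 }))` would then return the messages from all pages as a single array.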

For comprehensive history features, see the full guide:

  • Pagination patterns (infinite scroll, load more)
  • Catch-up after reconnection
  • Search in history
  • Time-based queries with ULID
  • Performance optimization tips

→ Read the Message History Guide


Advanced Messaging Patterns

Message Acknowledgement Patterns

Fire-and-Forget

// Simple publish without confirmation
await client.publish("notifications", "System update complete");

Reliable Delivery

// Publish with acknowledgement
await client.publishWithAck(
  "critical-alerts",
  "Server maintenance starting",
  (ack) => {
    if (ack.success) {
      console.log("Alert delivered successfully");
    } else {
      // Retry logic or fallback notification
      console.error("Alert delivery failed:", ack.error?.message);
    }
  },
  5000,
);
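
The retry logic mentioned in the comment above could take the form of exponential backoff. The helper below is a generic sketch, not an SDK feature: `retryWithBackoff` and `sleep` are hypothetical names, and `attempt` is assumed to be a function that returns a promise which rejects when delivery fails (for example, a promise wrapper around publishWithAck that rejects when ack.success is false).

```typescript
// Resolves after the given number of milliseconds.
const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Runs `attempt` up to maxAttempts times, doubling the delay after each failure.
async function retryWithBackoff<T>(
  attempt: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 200,
): Promise<T> {
  let lastError: unknown;

  for (let i = 0; i < maxAttempts; i++) {
    try {
      return await attempt();
    } catch (error) {
      lastError = error;
      // Wait baseDelayMs, then 2x, then 4x, and so on; no wait after the final attempt.
      if (i < maxAttempts - 1) {
        await sleep(baseDelayMs * 2 ** i);
      }
    }
  }

  // Every attempt failed: surface the most recent error to the caller.
  throw lastError;
}
```

Retrying a publish can deliver the same message more than once if the first attempt succeeded but its acknowledgement was lost, so subscribers may need to deduplicate, for instance by clientMsgId.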

Batch Publishing

async function publishBatch(topic: string, messages: string[]) {
  const promises = messages.map((msg) =>
    client.publishWithAck(topic, msg, (ack) => {
      console.log(`Message ${msg}: ${ack.success ? "✅" : "❌"}`);
    }),
  );

  await Promise.all(promises);
  console.log("All messages sent");
}
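
Note that Promise.all rejects as soon as any one publish rejects, so the remaining outcomes are not reported. Where a per-message summary is needed, each publish can be given its own error handling. This is a sketch: `publishEach` is a hypothetical helper, and `publishOne` stands in for any function that sends a single message, such as `(msg) => client.publish(topic, msg)`.

```typescript
// Publishes every message and reports which ones succeeded and which failed.
async function publishEach(
  messages: string[],
  publishOne: (message: string) => Promise<string>,
): Promise<{ sent: string[]; failed: string[] }> {
  // Each promise handles its own failure, so Promise.all never rejects here.
  const outcomes = await Promise.all(
    messages.map(async (message) => {
      try {
        await publishOne(message);
        return { message, ok: true };
      } catch {
        return { message, ok: false };
      }
    }),
  );

  return {
    sent: outcomes.filter((o) => o.ok).map((o) => o.message),
    failed: outcomes.filter((o) => !o.ok).map((o) => o.message),
  };
}
```

The failed list can then be retried or logged without resending messages that already went through.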

Subscription Patterns

Topic Subscription with Cleanup

const subscriptions = new Set<string>();

async function subscribeWithCleanup(
  topic: string,
  handler: (msg: MessageBody) => void,
) {
  await client.subscribe(topic, handler);
  subscriptions.add(topic);

  // Return unsubscribe function
  return () => {
    client.unsubscribe(topic);
    subscriptions.delete(topic);
  };
}

// Cleanup all subscriptions
function unsubscribeAll() {
  for (const topic of subscriptions) {
    client.unsubscribe(topic);
  }
  subscriptions.clear();
}

Pattern-Based Subscriptions

// Subscribe to multiple related topics
const roomTopics = ["room-123", "room-456", "room-789"];

for (const topic of roomTopics) {
  await client.subscribe(topic, (msg) => {
    handleRoomMessage(msg.topic, msg.payload);
  });
}

Subscription with Buffering

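class MessageBuffer {
  private buffer: MessageBody[] = [];
  private batchSize = 10;
  private flushInterval = 1000; // 1 second
  private timer: ReturnType<typeof setInterval>;

  constructor(
    private topic: string,
    private processBatch: (messages: MessageBody[]) => void,
  ) {
    // subscribe() is async; report failures instead of leaving the promise unhandled
    client
      .subscribe(topic, (msg) => this.addMessage(msg))
      .catch((error) => console.error("Subscription failed:", error));
    // Flush on a timer so a partial batch is not held indefinitely
    this.timer = setInterval(() => this.flush(), this.flushInterval);
  }

  // Stop the timer, unsubscribe, and deliver whatever is still buffered
  stop() {
    clearInterval(this.timer);
    client.unsubscribe(this.topic);
    this.flush();
  }

  private addMessage(msg: MessageBody) {
    this.buffer.push(msg);
    if (this.buffer.length >= this.batchSize) {
      this.flush();
    }
  }

  private flush() {
    if (this.buffer.length > 0) {
      // splice(0) empties the buffer and returns its contents in one step
      this.processBatch(this.buffer.splice(0));
    }
  }
}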

Error Handling

Publishing Errors

try {
  await client.publish("topic", "message");
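} catch (error) {
  // Catch variables are `unknown` in strict TypeScript, so narrow before reading `message`
  const message = error instanceof Error ? error.message : String(error);
  if (message.includes("Not connected")) {
    console.error("Connection lost - will reconnect automatically");
  } else if (message.includes("Invalid topic")) {
    console.error("Topic name must be a non-empty string");
  } else {
    throw error; // Unrecognized failure: let the caller handle it
  }
}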

Subscription Errors

// With callback for error handling
await client.subscribe(
  "topic",
  (msg) => console.log(msg),
  (response) => {
    if (!response.success) {
      console.error("Subscription failed:", response.error?.message);
      // Implement retry logic
    }
  },
);

// Or use try-catch
try {
  await client.subscribe("topic", (msg) => console.log(msg));
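} catch (error) {
  // Narrow the unknown catch variable before reading `message`
  const message = error instanceof Error ? error.message : String(error);
  console.error("Subscription failed:", message);
}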

Performance Optimization

Message Batching

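Queue outgoing messages and flush them together, either when 10 messages are waiting or after 100 ms, whichever comes first. Each message is still sent with its own publish call: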

class MessageBatcher {
  private queue: Array<{ topic: string; payload: string }> = [];
  // ReturnType keeps this portable between Node.js and browsers
  private batchTimeout?: ReturnType<typeof setTimeout>;

  publish(topic: string, payload: string) {
    this.queue.push({ topic, payload });

    if (this.queue.length >= 10) {
      void this.flush();
    } else if (!this.batchTimeout) {
      this.batchTimeout = setTimeout(() => void this.flush(), 100);
    }
  }

  private async flush() {
    if (this.batchTimeout) {
      clearTimeout(this.batchTimeout);
      this.batchTimeout = undefined;
    }

    // Take everything queued so far and reset the queue
    const batch = this.queue.splice(0);

    try {
      // Send all messages in parallel
      await Promise.all(
        batch.map(({ topic, payload }) => client.publish(topic, payload)),
      );
    } catch (error) {
      // flush() is fire-and-forget, so report failures here instead of rejecting
      console.error("Batch publish failed:", error);
    }
  }
}

Next Steps