Pub/Sub SDK Signatures

We want to ensure as consistent an experience as possible between our SDKs, while still complying with best practices for each language.

The entry point for each pub/sub connection will be the connect() method. This will establish a connection to the server, authenticated using one or more project keys, and supply a handle by which the user can subscribe/unsubscribe to/from channels and publish to channels.

All operations on each SDK must be asynchronous. All methods must either accept a callback/listener or return a future/promise which will be used to notify of an operation's success or failure.

connect

Core connect function

def connect(projectKeys: List[String], options: PubSubOptions = PubSubOptions.default): Future[ConnectionHandle]


This function will authenticate using the supplied project keys, and the connection will permit the read (subscribe), write (unsubscribe) and admin operations based on the supplied keys. It returns a Future which will either be completed with a ConnectionHandle or failed with a Throwable if the initial connection attempt failed.

PubSubOptions

The various handler options set their respective handlers documented under event handlers.

url

String | default: 'wss://api.cogswell.io/pubsub'

This is used in order to override the location of the pub/sub server. This will be essential for using SDKs against QA and development environments. Remember, this should not contain any portion of the patch specific to Cogswell (i.e., the /pubsub path).

autoReconnect

Boolean | default: true

This indicates whether the connection should automatically be re-established if it is dropped. The implementation MUST apply a back-off reconnect.

connectTimeout

Duration | default: Duration(30L, TimeUnit.SECONDS)

This indicates the duration after which a connection attempt should timeout.

sessionUuid

Uuid | optional

If provided, this indicates the UUID of the session that should be restored if possible.

onRawRecordHandler

(RawRecord) => Unit | optional

onMessageHandler

(MessageRecord) => Unit | optional

onErrorHandler

(Throwable) => Unit | optional

onErrorResponseHandler

(PubSubErrorResponse) => Unit | optional

onCloseHandler

(Option[Throwable]) => Unit | optional

onNewSessionHandler

(UUID) => Unit | optional

ConnectionHandle

Pub/Sub

getSessionUuid

def getSessionUuid(): Future[UUID]

Fetch the session UUID from the server, which will trigger caching to be enabled if it is enabled on the project. The result, if successful, is the UUID associated with this connection's session.

subscribe

case class MessageRecord(
  channel: String,
  message: String,
  timestamp: DateTime,
  id: UUID
)

type MessageHandler = (MessageRecord) => Unit

def subscribe(channel: String, messageHandler: MessageHandler): Future[List[String]]

Subscribe to a channel, supplying a handler which will be called with each message received from this channel. The result, if successful, contains a list of the channels to which this connection is subscribed.

unsubscribe

def unsubscribe(channel: String): Future[List[String]]

Unsubscribe from a channel. The result, if successful, contains a list of the channels to which this connection is still subscribed.

unsubscribeAll

def unsubscribeAll(): Future[List[String]]

Unsubscribe from all channels. The result, if successful, contains a list of the channels to which this connection used to be subscribed.

listSubscriptions

def listSubscriptions(): Future[List[String]]

The result, if successful, contains a list of the channels to which this connection is still subscribed.

publish

case class PubSubErrorResponse(
  code: Integer,
  details: Option[String],
  action: Option[String],
  message: Option[String]
  sequence: Option[Long]
)

type ErrorResponseHandler = (PubSubErrorResponse) => Unit

def publish(channel: String, message: String, errorHandler: Option[ErrorResponseHandler] = None): Future[Long]

Publishes a message to a channel. The result, if successful, contains the sequence number of the publish directive.

The message string is limited to 64KiB. Messages that exceed this limit will result in the termination of the websocket connection.

NOTE: errorHandler is only called if the server reports an error within 30 seconds.  The SDK will delete the errorHandler after 30 seconds.  If the server did not send any errors, errorHandler will not be called.

publishWithAck

def publishWithAck(channel: String, message: String): Future[UUID]

Publishes a message to a channel. The result is a Future which, if successful, contains the UUID of the message published.

close

def close(): Future[Unit]

Closes the pub/sub client handle.

dropConnection

case class DropConnectionOptions(
  autoReconnectDelay: Option[Duration]
)


def dropConnection(dropConnectionOptions: DropConnectionOptions): Unit

Closes the pub/sub client (simulates an unexpected connection drop). To be used for internal testing only. The autoReconnectDelay option, if present, overrides the default initial auto-reconnect delay. 

Event Handlers

onRawRecord

type RawRecord = String
type RawRecordHandler = (RawRecord) => Unit

def onRawRecord(rawRecordHandler: RawRecordHandler): Unit

Register a handler for any raw record received from the server, whether a response to a request or a message. This is mostly useful for debugging issues with server communication.

onMessage

case class MessageRecord(
  channel: String,
  message: String,
  timestamp: DateTime,
  id: UUID
)

type MessageHandler = (MessageRecord) => Unit

def onMessage(messageHandler: MessageHandler): Unit

Register a handler for messages from any channel.

onError

type ErrorHandler = (Throwable) => Unit

def onError(errorHandler: ErrorHandler): Unit

Register a handler for any and all exceptions thrown on the client side.

onErrorResponse

case class PubSubErrorResponse(
  code: Integer,
  details: Option[String],
  action: Option[String],
  message: Option[String]
  sequence: Option[Long]
)

type ErrorResponseHandler = (PubSubErrorResponse) => Unit

def onErrorResponse(errorResponseHandler: ErrorResponseHandler): Unit

represents the handler for any and all error responses sent back from the server, i.e. responses that do not have code 200

onReconnect

def onReconnect(reconnectHandler: => Unit): Unit

Register a handler for reconnect events.

onClose

def onClose(closeHandler: (Option[Throwable]) => Unit): Unit

Register a handler for close events.

onNewSession

def onNewSession(newSessionHandler: (UUID) => Unit): Unit

Register a handler for new session events. This indicates that the session associated with this connection is not a resumed session, therefore there are no subscriptions associated with this session. If there had been a previous session and the connection was replaced by an auto-reconnect, the previous session was not restored resulting in all subscriptions being lost.

Example from the Node.js SDK

const cogs = require('cogs-sdk');
const keys = [
  'R-2cf87f44ca4b21a1fdd38e7553022075-b351e21fc356b9af2897886210fb6b72cfdfa8e013a9c38c7ff238dc8b804f71',
  'W-2cf87f44ca4b21a1fdd38e7553022075-c2fe40dc8c31cde3f04eb29337661d5969edb5c9b70f4107c04ea7ff9eaf2a3c',
];
const testChannel = 'test-channel';

// Function to run a test using the pub/sub client handle.
function simpleTest(handle) {
  console.log("Successfully connected to pub/sub server.");
  
  handle.subscribe(testChannel, (channel, message) => {
    console.log(`Received message from channel '${channel}': ${message}`);
    handle.close();
  })
  .then(() => {
    console.log(`Successfully subscribed to '${testChannel}'`);
    handle.publish(testChannel, 'Test message.');
  })
  .catch((error) => {
    console.error(`Error subscribing to '${testChannel}`);
  });
}

// Connect to the pub/sub server
cogs.pubsub.connect(keys)
.then(handle => simpleTest(handle))
.catch(error => console.error("Error connecting to pub/sub server:", error));