Pub/Sub SDK Signatures
We want to ensure as consistent an experience as possible between our SDKs, while still complying with best practices for each language.
The entry point for each pub/sub connection will be the connect() method. This will establish a connection to the server, authenticated using one or more project keys, and supply a handle by which the user can subscribe to channels, unsubscribe from channels, and publish to channels.
All operations on each SDK must be asynchronous. Every method must either accept a callback/listener or return a future/promise that reports the operation's success or failure.
connect
Core connect function
def connect(projectKeys: List[String], options: PubSubOptions = PubSubOptions.default): Future[ConnectionHandle]
This function will authenticate using the supplied project keys, and the connection will permit the read (subscribe), write (publish) and admin operations based on the supplied keys. It returns a Future which will either be completed with a ConnectionHandle or failed with a Throwable if the initial connection attempt fails.
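As an illustration of how the supplied keys could determine the permitted operations, here is a minimal JavaScript sketch. It assumes the permission is encoded in the first segment of each key, as the 'R-' and 'W-' prefixes of the sample keys in the Node.js example suggest. The 'A-' prefix for admin keys and the names PERMISSION_BY_PREFIX and permissionsFor are hypothetical, and in practice the server, not the client, decides what a key permits.

```javascript
// Assumed mapping from key prefix to permission. 'R' and 'W' match the sample
// keys in this document; 'A' for admin is an assumption.
const PERMISSION_BY_PREFIX = new Map([
  ['R', 'read'],
  ['W', 'write'],
  ['A', 'admin'],
]);

// Returns the set of permissions implied by a list of project keys.
function permissionsFor(projectKeys) {
  const permissions = new Set();
  for (const key of projectKeys) {
    const prefix = key.split('-', 1)[0];
    const permission = PERMISSION_BY_PREFIX.get(prefix);
    if (permission === undefined) {
      throw new Error(`Unrecognized key prefix: '${prefix}'`);
    }
    permissions.add(permission);
  }
  return permissions;
}
```

A client could use such a check to fail fast, for example by rejecting a publish call locally when no write key was supplied.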
PubSubOptions
Each handler option registers the corresponding handler documented under Event Handlers.
url
String | default: 'wss://api.cogswell.io/pubsub'
This overrides the location of the pub/sub server, which is essential for using the SDKs against QA and development environments. Remember, this should not contain any portion of the path specific to Cogswell (i.e., the /pubsub path).
autoReconnect
Boolean | default: true
This indicates whether the connection should automatically be re-established if it is dropped. The implementation MUST apply a back-off reconnect.
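One common way to satisfy the back-off requirement is an exponential delay with an upper bound. The sketch below is only an illustration: the specification does not prescribe the curve, and the function name reconnectDelayMs, the 1 second initial delay, the doubling factor and the 60 second cap are all assumptions.

```javascript
// Computes the delay before reconnect attempt number `attempt` (0-based).
// All default values here are assumptions, not part of the specification.
function reconnectDelayMs(attempt, options = {}) {
  const { initialDelayMs = 1000, maxDelayMs = 60000, factor = 2 } = options;
  const delay = initialDelayMs * Math.pow(factor, attempt);
  // Cap the delay so a long outage does not push retries arbitrarily far apart.
  return Math.min(delay, maxDelayMs);
}
```

With these defaults the first attempts wait 1000, 2000 and 4000 ms. Implementations often add random jitter to the computed delay so that many clients do not reconnect at the same instant.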
connectTimeout
Duration | default: Duration(30L, TimeUnit.SECONDS)
This indicates the duration after which a connection attempt should time out.
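In a promise-based SDK, a connect timeout can be enforced by racing the connection attempt against a timer. The following is a minimal sketch under that assumption; withTimeout is a hypothetical helper name, and a real SDK would also need to abort the underlying socket when the timer wins.

```javascript
// Rejects if `promise` has not settled within `timeoutMs` milliseconds.
function withTimeout(promise, timeoutMs) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Connection attempt timed out after ${timeoutMs} ms`)),
      timeoutMs
    );
  });
  // Whichever settles first wins; the timer is always cleared afterwards.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}
```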
sessionUuid
UUID | optional
If provided, this indicates the UUID of the session that should be restored if possible.
onRawRecordHandler
(RawRecord) => Unit | optional
onMessageHandler
(MessageRecord) => Unit | optional
onErrorHandler
(Throwable) => Unit | optional
onErrorResponseHandler
(PubSubErrorResponse) => Unit | optional
onCloseHandler
(Option[Throwable]) => Unit | optional
onNewSessionHandler
(UUID) => Unit | optional
ConnectionHandle
Pub/Sub
getSessionUuid
def getSessionUuid(): Future[UUID]
Fetches the session UUID from the server. Fetching it also enables caching for the session, provided caching is enabled on the project. The result, if successful, is the UUID associated with this connection's session.
subscribe
case class MessageRecord(
  channel: String,
  message: String,
  timestamp: DateTime,
  id: UUID
)

type MessageHandler = (MessageRecord) => Unit

def subscribe(channel: String, messageHandler: MessageHandler): Future[List[String]]
Subscribe to a channel, supplying a handler which will be called with each message received from this channel. The result, if successful, contains a list of the channels to which this connection is subscribed.
unsubscribe
def unsubscribe(channel: String): Future[List[String]]
Unsubscribe from a channel. The result, if successful, contains a list of the channels to which this connection is still subscribed.
unsubscribeAll
def unsubscribeAll(): Future[List[String]]
Unsubscribe from all channels. The result, if successful, contains a list of the channels to which this connection was subscribed before the call.
listSubscriptions
def listSubscriptions(): Future[List[String]]
List this connection's subscriptions. The result, if successful, contains a list of the channels to which this connection is currently subscribed.
publish
case class PubSubErrorResponse(
  code: Integer,
  details: Option[String],
  action: Option[String],
  message: Option[String],
  sequence: Option[Long]
)

type ErrorResponseHandler = (PubSubErrorResponse) => Unit

def publish(channel: String, message: String, errorHandler: Option[ErrorResponseHandler] = None): Future[Long]
Publishes a message to a channel. The result, if successful, contains the sequence number of the publish directive.
The message string is limited to 64KiB. Messages that exceed this limit will result in the termination of the websocket connection.
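Because an oversized message terminates the connection, a client may want to check the size before publishing. The sketch below assumes the limit applies to the UTF-8 encoded byte length of the message string alone; the specification does not say whether protocol framing counts toward the 64 KiB. The names MAX_MESSAGE_BYTES and assertPublishable are hypothetical.

```javascript
const MAX_MESSAGE_BYTES = 64 * 1024; // 64 KiB, the limit stated above

// Returns the encoded size of `message`, or throws if it exceeds the limit.
// Assumption: the limit is measured in UTF-8 bytes, not in characters.
function assertPublishable(message) {
  const size = Buffer.byteLength(message, 'utf8');
  if (size > MAX_MESSAGE_BYTES) {
    throw new RangeError(
      `Message is ${size} bytes; the limit is ${MAX_MESSAGE_BYTES} bytes`
    );
  }
  return size;
}
```

Measuring bytes rather than string length matters for non-ASCII text: a string of 40,000 two-byte characters is under 65,536 characters long but well over 64 KiB once encoded.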
NOTE: errorHandler is only called if the server reports an error within 30 seconds. The SDK will delete the errorHandler after 30 seconds. If the server does not send any errors, errorHandler will not be called.
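The 30 second window could be implemented as a registry of handlers keyed by the publish sequence number. This is a sketch under stated assumptions, not the SDK's actual mechanism: the class name PublishErrorHandlers is hypothetical, expired entries are pruned lazily on dispatch rather than by a timer, and the clock is injectable only to keep the example deterministic.

```javascript
class PublishErrorHandlers {
  constructor(ttlMs = 30000, now = Date.now) {
    this.ttlMs = ttlMs;
    this.now = now;
    this.entries = new Map(); // sequence -> { handler, expiresAt }
  }

  // Remember the handler for a publish identified by its sequence number.
  register(sequence, handler) {
    this.entries.set(sequence, { handler, expiresAt: this.now() + this.ttlMs });
  }

  // Drop every handler whose window has passed.
  prune() {
    const currentTime = this.now();
    for (const [sequence, entry] of this.entries) {
      if (entry.expiresAt <= currentTime) this.entries.delete(sequence);
    }
  }

  // Route a server error response to its handler, if one is still registered.
  // Returns true when a handler was called.
  dispatch(errorResponse) {
    this.prune();
    const entry = this.entries.get(errorResponse.sequence);
    if (entry === undefined) return false;
    this.entries.delete(errorResponse.sequence);
    entry.handler(errorResponse);
    return true;
  }
}
```

An error response that arrives after the window finds no handler and would fall through to whatever general error-response handler is registered.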
publishWithAck
def publishWithAck(channel: String, message: String): Future[UUID]
Publishes a message to a channel. The result is a Future which, if successful, contains the UUID of the message published.
close
def close(): Future[Unit]
Closes the pub/sub client handle.
dropConnection
case class DropConnectionOptions(
  autoReconnectDelay: Option[Duration]
)

def dropConnection(dropConnectionOptions: DropConnectionOptions): Unit
Closes the pub/sub client (simulates an unexpected connection drop). To be used for internal testing only. The autoReconnectDelay option, if present, overrides the default initial auto-reconnect delay.
Event Handlers
onRawRecord
type RawRecord = String
type RawRecordHandler = (RawRecord) => Unit

def onRawRecord(rawRecordHandler: RawRecordHandler): Unit
Register a handler for any raw record received from the server, whether a response to a request or a message. This is mostly useful for debugging issues with server communication.
onMessage
case class MessageRecord(
  channel: String,
  message: String,
  timestamp: DateTime,
  id: UUID
)

type MessageHandler = (MessageRecord) => Unit

def onMessage(messageHandler: MessageHandler): Unit
Register a handler for messages from any channel.
onError
type ErrorHandler = (Throwable) => Unit

def onError(errorHandler: ErrorHandler): Unit
Register a handler for any and all exceptions thrown on the client side.
onErrorResponse
case class PubSubErrorResponse(
  code: Integer,
  details: Option[String],
  action: Option[String],
  message: Option[String],
  sequence: Option[Long]
)

type ErrorResponseHandler = (PubSubErrorResponse) => Unit

def onErrorResponse(errorResponseHandler: ErrorResponseHandler): Unit
Register a handler for any and all error responses sent back from the server, i.e. responses that do not have code 200.
onReconnect
def onReconnect(reconnectHandler: => Unit): Unit
Register a handler for reconnect events.
onClose
def onClose(closeHandler: (Option[Throwable]) => Unit): Unit
Register a handler for close events.
onNewSession
def onNewSession(newSessionHandler: (UUID) => Unit): Unit
Register a handler for new session events. A new session event indicates that the session associated with this connection is not a resumed session, so no subscriptions are associated with it. If there was a previous session and the connection was replaced by an auto-reconnect, the previous session was not restored and all of its subscriptions were lost.
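Since a new session starts with no subscriptions, an application that wants to survive a failed session restore has to subscribe again. The sketch below shows one way to do that. It assumes a handle object exposing onNewSession and subscribe as named in this document; an actual SDK may expose these events differently, and the function name resubscribeOnNewSession is hypothetical.

```javascript
// Re-subscribes to every known channel whenever a new session is reported.
// `handlersByChannel` is a Map from channel name to message handler.
function resubscribeOnNewSession(handle, handlersByChannel) {
  handle.onNewSession((sessionUuid) => {
    console.log(`New session ${sessionUuid}; restoring subscriptions.`);
    for (const [channel, messageHandler] of handlersByChannel) {
      handle.subscribe(channel, messageHandler).catch((error) => {
        console.error(`Failed to re-subscribe to '${channel}':`, error);
      });
    }
  });
}
```

The application remains responsible for keeping handlersByChannel up to date as it subscribes and unsubscribes.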
Example from the Node.js SDK
const cogs = require('cogs-sdk');

const keys = [
  'R-2cf87f44ca4b21a1fdd38e7553022075-b351e21fc356b9af2897886210fb6b72cfdfa8e013a9c38c7ff238dc8b804f71',
  'W-2cf87f44ca4b21a1fdd38e7553022075-c2fe40dc8c31cde3f04eb29337661d5969edb5c9b70f4107c04ea7ff9eaf2a3c',
];
const testChannel = 'test-channel';

// Function to run a test using the pub/sub client handle.
function simpleTest(handle) {
  console.log("Successfully connected to pub/sub server.");

  // Subscribe, then close the handle once the first message arrives.
  handle.subscribe(testChannel, (channel, message) => {
    console.log(`Received message from channel '${channel}': ${message}`);
    handle.close();
  })
  .then(() => {
    console.log(`Successfully subscribed to '${testChannel}'`);
    handle.publish(testChannel, 'Test message.');
  })
  .catch((error) => {
    console.error(`Error subscribing to '${testChannel}':`, error);
  });
}

// Connect to the pub/sub server
cogs.pubsub.connect(keys)
  .then(handle => simpleTest(handle))
  .catch(error => console.error("Error connecting to pub/sub server:", error));