
rsocket-js

rsocket-js implements the 1.0 version of the RSocket protocol and is designed for use in Node.js and browsers.

Packages

The following packages are published to npm:

Status

The following are currently implemented:

  • RSocketClient / RSocketServer
  • Node.js TCP/WebSocket server/client transport
  • Browser WebSocket client (binary)
  • TCK client for spec compliance testing
  • UTF-8 and Binary encoding at the transport layer
  • Optional JSON (de)serialization at the rsocket layer (send and receive objects instead of strings)
  • Reactive Streams data types

Reactive Streams

rsocket-js includes an implementation of the Reactive Streams API in JavaScript. Note that unlike standard Rx Observables, Reactive Streams are lazy, pull-based, and support back-pressure. Two types are implemented:

  • Flowable: An implementation of the Reactive Streams Publisher type, providing a demand-driven stream of values over time.
  • Single: Like Flowable, but resolves to a single value.

rsocket-js public API methods typically return values of these types.
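
The pull-based contract can be illustrated without the library. The sketch below is a hypothetical, simplified publisher (`rangePublisher` is a name invented here, not part of rsocket-flowable) that emits values only when the subscriber asks for them through `request(n)`. It is meant to show demand-driven delivery in general, not how rsocket-js implements `Flowable`.

```javascript
// Hypothetical, simplified publisher; NOT the rsocket-flowable implementation.
// Emits the integers 1..count, but never more than the subscriber has requested.
function rangePublisher(count) {
  return {
    subscribe(subscriber) {
      let next = 1;
      let pending = 0; // outstanding demand from the subscriber
      let done = false;
      let emitting = false;
      const subscription = {
        request(n) {
          if (done) return;
          pending += n;
          if (emitting) return; // re-entrant call from onNext: the loop below picks it up
          emitting = true;
          while (!done && pending > 0 && next <= count) {
            pending--;
            subscriber.onNext(next++);
          }
          emitting = false;
          if (!done && next > count) {
            done = true;
            subscriber.onComplete();
          }
        },
        cancel() {
          done = true;
        },
      };
      // Nothing is emitted here: the stream stays idle until request() is called.
      subscriber.onSubscribe(subscription);
    },
  };
}

// Usage: subscribe, then pull two values, then ask for more than remain.
const received = [];
let completed = false;
let subscription;
rangePublisher(5).subscribe({
  onSubscribe: (sub) => { subscription = sub; },
  onNext: (value) => received.push(value),
  onComplete: () => { completed = true; },
});
const beforeRequest = received.slice(); // still empty: no demand signalled yet

subscription.request(2);
const afterFirstRequest = received.slice(); // only the two requested values

subscription.request(10); // the remaining three values arrive, then onComplete
console.log(beforeRequest, afterFirstRequest, received, completed);
```

Because the subscriber controls how many values it receives, a slow consumer can simply request fewer items at a time, which is the essence of back-pressure.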

WebSocket Client & Server example

Client Example

The client sends a request/response message to the server every 750 ms and exits once 5 seconds have elapsed.

// rsocket-client.js
const { RSocketClient } = require('rsocket-core');
const RSocketWebsocketClient = require('rsocket-websocket-client').default;
const WebSocket = require('ws');

function now() {
  return Date.now();
}

async function connect() {
  const transportOptions = {
    url: 'ws://127.0.0.1:9898',
    // Create the underlying socket with the `ws` package.
    wsCreator: (url) => {
      return new WebSocket(url);
    },
  };
  const setup = {
    // ms between sending keepalive frames to the server
    keepAlive: 1000000,
    // ms timeout if no keepalive response is received
    lifetime: 100000,
    dataMimeType: 'text/plain',
    metadataMimeType: 'text/plain',
  };
  const transport = new RSocketWebsocketClient(transportOptions);
  const client = new RSocketClient({ setup, transport });
  return await client.connect();
}

async function run() {
  // Connect outside the Promise executor so a failed connection rejects run().
  const rsocket = await connect();
  const start = now();
  return new Promise((resolve) => {
    const interval = setInterval(() => {
      rsocket.requestResponse({ data: 'What is the current time?' }).subscribe({
        onComplete: (response) => {
          console.log(response);
        },
        onError: (error) => {
          console.error(error);
        },
        onSubscribe: (cancel) => {
          /* call cancel() to stop onComplete/onError */
        },
      });

      // Stop sending requests once 5 seconds have elapsed.
      if (now() - start >= 5000) {
        clearInterval(interval);
        resolve();
      }
    }, 750);
  });
}

run().then(
  () => process.exit(0),
  (error) => {
    console.error(error.stack);
    process.exit(1);
  }
);

Server Example

The server responds to request/response messages with the current time.

// rsocket-server.js
const { RSocketServer } = require('rsocket-core');
const RSocketWebSocketServer = require('rsocket-websocket-server');
const { Single } = require('rsocket-flowable');

const WebSocketTransport = RSocketWebSocketServer.default;
const host = '127.0.0.1';
const port = 9898;

const transportOpts = {
  host: host,
  port: port,
};

const transport = new WebSocketTransport(transportOpts);

const statuses = {
  PENDING: 'pending',
  CANCELLED: 'cancelled',
};

const getRequestHandler = (requestingRSocket, setupPayload) => {
  function handleRequestResponse(payload) {
    let status = statuses.PENDING;

    console.log(`requestResponse request`, payload);

    return new Single((subscriber) => {
      function handleCancellation() {
        status = statuses.CANCELLED;
      }

      // Give the subscriber a way to cancel before the response is sent.
      subscriber.onSubscribe(() => handleCancellation());

      /**
       * Leverage `setTimeout` to simulate a delay
       * in responding to the client.
       */
      setTimeout(() => {
        if (status === statuses.CANCELLED) {
          return;
        }

        const msg = `${new Date()}`;
        console.log(`requestResponse response`, msg);
        try {
          subscriber.onComplete({
            data: msg,
            metadata: null, // or Buffer.from(...)
          });
        } catch (e) {
          subscriber.onError(e);
        }
      }, 100);
    });
  }

  return {
    requestResponse: handleRequestResponse,
  };
};

const rSocketServer = new RSocketServer({
  transport,
  getRequestHandler,
});

console.log(`Server starting on port ${port}...`);

rSocketServer.start();
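
The handler above guards its delayed response with a status flag so that a cancelled request never completes. The following standalone sketch isolates that pattern. `delayedValue` and its `schedule` parameter are hypothetical names introduced here so the timing can be controlled, and the returned object only mimics the `subscribe` / `onSubscribe` / `onComplete` shape used above; it does not use rsocket-flowable's `Single`.

```javascript
// Hypothetical helper, not part of rsocket-js: delivers one value after a delay
// unless the subscriber cancels first.
function delayedValue(produce, delayMs, schedule = setTimeout) {
  return {
    subscribe(subscriber) {
      let cancelled = false;
      // Hand the subscriber a cancel function before any work completes.
      subscriber.onSubscribe(() => {
        cancelled = true;
      });
      schedule(() => {
        if (cancelled) {
          return; // the subscriber is no longer interested
        }
        let value;
        try {
          value = produce();
        } catch (e) {
          subscriber.onError(e);
          return;
        }
        subscriber.onComplete(value);
      }, delayMs);
    },
  };
}

// Usage with a manual scheduler so the example runs synchronously.
const tasks = [];
const manualSchedule = (fn) => tasks.push(fn);
const results = [];

// First subscriber waits for the value.
delayedValue(() => 'tick', 100, manualSchedule).subscribe({
  onSubscribe: () => {},
  onComplete: (value) => results.push(value),
  onError: (error) => results.push(error),
});

// Second subscriber cancels before the delay elapses.
delayedValue(() => 'never delivered', 100, manualSchedule).subscribe({
  onSubscribe: (cancel) => cancel(),
  onComplete: (value) => results.push(value),
  onError: (error) => results.push(error),
});

tasks.forEach((task) => task()); // simulate both timers firing
console.log(results);
```

Only the first subscriber receives a value; the second one cancelled, so its scheduled callback returns without calling `onComplete`.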

More Examples

Browse the following repositories for more rsocket-js examples: