Node - ZMQ

Introducción

¿Por qué?

Minitutorial

Para usar el paquete:

var zmq = require('zmq');

Para crear un socket:

var sock = zmq.socket("<SOCKET_TYPE>");

en donde tenemos múltiples tipos de socket:

  • ZMQ_REQ
  • ZMQ_REP
  • ZMQ_DEALER
  • ZMQ_ROUTER
  • ZMQ_PUSH
  • ZMQ_PULL
  • ZMQ_PUB
  • ZMQ_SUB
  • ZMQ_XSUB
  • ZMQ_XPUB

Hacemos el binding, connect y close:

Se pueden usar múltiples transportes:

  • TCP: mediante “tcp://<address>:<port>”, por ejemplo:

    sock.bind(“tcp://192.168.1.100:5555”); // IP sock.bind(“tcp://eth0:5555”); // Interface eth0 sock.bind(“tcp://*:5555”); // Todas las interfaces

  • IPC: mediante “ipc://<address>”:

    sock.bind(“ipc:///tmp/mysocket”); // Requiere permisos r/w en el path

  • In-process Communication (en memoria): “inproc://<address>”

    sock.bind(“inproc://queue”); // bind antes de conectar

    // address_name < 256 chars

  • PGM (multicast): uso “pgm://<address>;<multicas_address>:<port>”:

    sock.bind(“pgm://192.168.1.100;239.192.1.1:3055”); // slide 22!

  • Encapsulated PGM (multicast)

Enviar y recibir:

  • Enviar:

    sock.send("Mi mensaje");
    
  • Recibir:

    sock.on("message", function (msg) {   // msg = buffer object
       console.log(msg.toString());
    });
    

Nota

los mensajes son atómicos. Un mensaje puede tener una o varias partes (sólo limitado por memoria). Asegura que se recibe el mensaje entero o nada.

Envío de mensajes multiparte:

sock.send("Primera parte", zmq.ZMQ_SNDMORE);
sock.send("parte final");

y la recepción:

sock.on("message", function() {
   for (var key in arguments) {
      console.log("Part" +key+": "+arguments[key]);
   };
});

Patrones

Publicamos a varios transportes simultáneamente:

var zmq = require('zmq'),
    pub = zmq.socket('pub');

pub.bindSync('tcp://127.0.0.1:5555'); // TCP Publisher
pub.bindSync('ipc:///tmp/zmq.sock');  // IPC Publisher

setInterval( function() {
   pub.send("Envia mensajes a ambos publicadores");
), 1000); // cada 1000ms

sub = zmq.socket('sub');  // socket subscriptor
sub.connect('ipc:///tmp/zmq.sock');  // al IPC publisher
sub.subscribe('');
sub.on('message', function(msg) {
   console.log("Recibido: " + msg );
});

en otro programa, un subscriptor al transporte TCP:

var zmq = require('zmq'),
    sub = zmq.socket('sub');

sub.connect('tcp://127.0.0.1:5555');
sub.subscribe('');

sub.on('message', function(msg) {
   console.log("Recibido via TCP: ' + msg );
});

Socket options

Dos formas de hacerlo:

  • Mediante “sock.setsockopt(<option>, <value>);”:

    sock.setsockopt(‘identity’, ‘monitor-1’);

  • Mediante “sock.<option> = <value>;”:

    sock.identity = ‘monitor-1’;

Esto se usa por ejemplo para filtrar:

subscriber.subscribe('MUSIC');
ó
subscriber.setsockopt('subcribe', 'MUSIC');

o por ejemplo:

subscriber.unsubscribe('MUSIC');
subscriber.setsockopt('subscribe', 'POP');

Patrones

Distribución de tareas síncrona

Se usa cuando todo mensaje tiene que ser “contestado”. Gestiona sólo un mensaje cada vez. Ciclo enviar-recibir estricto.

Si se envía y no se recibe respuesta, el socket que hace la request no se recuperará.

Ver ejemplo en slide 36.

El Dealer

Reparte juego entre varios.