Node - ZMQ¶
Minitutorial¶
Para usar el paquete:
var zmq = require('zmq');
Para crear un socket:
var sock = zmq.socket("<SOCKET_TYPE>");
en donde tenemos múltiples tipos de socket:
- ZMQ_REQ
- ZMQ_REP
- ZMQ_DEALER
- ZMQ_ROUTER
- ZMQ_PUSH
- ZMQ_PULL
- ZMQ_PUB
- ZMQ_SUB
- ZMQ_XSUB
- ZMQ_XPUB
Hacemos el binding, connect y close:
Bind:
- sock.bind( “tcp://10.0.0.1:5555”,
function(err) { ... });
Connect:
sock.connet(“tcp://10.0.0.1:5555”);
Close:
sock.close();
BindSync:
sock.bindSync(“tcp://10.0.0.1:5555”); // Se ejecuta una vez el socket es binded, pero se rompe ante un error
Se pueden usar múltiples transportes:
TCP: mediante “tcp://<address>:<port>”, por ejemplo:
sock.bind(“tcp://192.168.1.100:5555”); // IP sock.bind(“tcp://eth0:5555”); // Interface eth0 sock.bind(“tcp://*:5555”); // Todas las interfaces
IPC: mediante “ipc://<address>”:
sock.bind(“ipc:///tmp/mysocket”); // Requiere permisos r/w en el path
In-process Communication (en memoria): “inproc://<address>”
- sock.bind(“inproc://queue”); // bind antes de conectar
// address_name < 256 chars
PGM (multicast): uso “pgm://<address>;<multicas_address>:<port>”:
sock.bind(“pgm://192.168.1.100;239.192.1.1:3055”); // slide 22!
Encapsulated PGM (multicast)
Enviar y recibir:
Enviar:
sock.send("Mi mensaje");
Recibir:
sock.on("message", function (msg) { // msg = buffer object console.log(msg.toString()); });
Nota
los mensajes son atómicos. Un mensaje puede tener una o varias partes (sólo limitado por memoria). Asegura que se recibe el mensaje entero o nada.
Envío de mensajes multiparte:
sock.send("Primera parte", zmq.ZMQ_SNDMORE);
sock.send("parte final");
y la recepción:
sock.on("message", function() {
for (var key in arguments) {
console.log("Part" +key+": "+arguments[key]);
};
});
Patrones¶
Publicamos a varios transportes simultáneamente:
var zmq = require('zmq'),
pub = zmq.socket('pub');
pub.bindSync('tcp://127.0.0.1:5555'); // TCP Publisher
pub.bindSync('ipc:///tmp/zmq.sock'); // IPC Publisher
setInterval( function() {
pub.send("Envia mensajes a ambos publicadores");
), 1000); // cada 1000ms
sub = zmq.socket('sub'); // socket subscriptor
sub.connect('ipc:///tmp/zmq.sock'); // al IPC publisher
sub.subscribe('');
sub.on('message', function(msg) {
console.log("Recibido: " + msg );
});
en otro programa, un subscriptor al transporte TCP:
var zmq = require('zmq'),
sub = zmq.socket('sub');
sub.connect('tcp://127.0.0.1:5555');
sub.subscribe('');
sub.on('message', function(msg) {
console.log("Recibido via TCP: ' + msg );
});
Socket options¶
Dos formas de hacerlo:
Mediante “sock.setsockopt(<option>, <value>);”:
sock.setsockopt(‘identity’, ‘monitor-1’);
Mediante “sock.<option> = <value>;”:
sock.identity = ‘monitor-1’;
Esto se usa por ejemplo para filtrar:
subscriber.subscribe('MUSIC');
ó
subscriber.setsockopt('subcribe', 'MUSIC');
o por ejemplo:
subscriber.unsubscribe('MUSIC');
subscriber.setsockopt('subscribe', 'POP');
Patrones¶
Distribución de tareas síncrona¶
Se usa cuando todo mensaje tiene que ser “contestado”. Gestiona sólo un mensaje cada vez. Ciclo enviar-recibir estricto.
Si se envía y no se recibe respuesta, el socket que hace la request no se recuperará.
Ver ejemplo en slide 36.
El Dealer¶
Reparte juego entre varios.