ZMQ 3.2.0-rc1 was built with the latest Sun Studio C/C++ compilers,
with CFLAGS and CXXFLAGS=-fast -library=stlport4.
$ uname -a
SunOS xx 5.11 11.0 i86pc i386 i86pc
$ CC -V
CC: Sun C++ 5.12 SunOS_i386 2011/11/16
$ cc -V
cc: Sun C 5.12 SunOS_i386 2011/11/16
The syncpub/syncsub example from the guide was adapted to 3.2.0 (see
the code below).
The original version of syncsub would simply get stuck in zmq_recv().
I added extra code to detect sequence gaps and break out of the loop
whenever there's a discrepancy; now all syncsubs print "received N
updates" where N is somewhat random. Slowing down the producer fixes
the problem. Zmq reports no errors at any point.
As a side note, whenever there is a sequence gap, the next sequence
number received by syncsub appears to come from the start of the next
send() buffer sent by syncpub (confirmed by 'truss syncpub')
Any suggestions?
Alex Keahan
Run as follows:
$ ./syncpub &
$ ./syncsub A &
$ ./syncsub B &
You should see something similar to this:
$ ./syncpub &
[1] 6611
$ Waiting for subscribers
$ ./syncsub A &
[2] 6612
$ ./syncsub B &
[3] 6613
$ Broadcasting messages
Message mismatch: received 'Msg #1266' expected 'Msg #1001'
A: received 1000 updates
Message mismatch: received 'Msg #1266' expected 'Msg #1001'
B: received 1000 updates
[2]- Done./syncsub A
[3]+ Done./syncsub B
syncpub.c:
#include
#include
#include
#include
// Wait for 2 subscribers
#define SUBSCRIBERS_EXPECTED 2
int
main(int argc, char *argv[])
{
void *context = zmq_ctx_new();
if (context == 0) {
fprintf(stderr, "Error: zmq_ctx_new failed: %s\n", zmq_strerror(errno));
return 1;
}
int rc = zmq_ctx_set(context, ZMQ_IO_THREADS, 1);
if (rc != 0) {
fprintf(stderr, "Error: zmq_ctx_set failed: %s\n", zmq_strerror(errno));
return 2;
}
// Socket to talk to clients
void *publisher = zmq_socket(context, ZMQ_PUB);
if (publisher == 0) {
fprintf(stderr, "Error: zmq_socket(ZMQ_PUB) failed: %s\n",
zmq_strerror(errno));
return 4;
}
rc = zmq_bind (publisher, "tcp://*:5561");
if (rc != 0) {
fprintf(stderr, "Error: zmq_bind failed: %s\n", zmq_strerror(errno));
return 5;
}
// Socket to receive signals
void *syncservice = zmq_socket (context, ZMQ_REP);
if (syncservice == 0) {
fprintf(stderr, "Error: zmq_socket(ZMQ_REP) failed: %s\n",
zmq_strerror(errno));
return 6;
}
rc = zmq_bind (syncservice, "tcp://*:5562");
if (rc != 0) {
fprintf(stderr, "Error: zmq_bind failed: %s\n", zmq_strerror(errno));
return 7;
}
// Get synchronization from subscribers
printf ("Waiting for subscribers\n");
int subscribers = 0;
while (subscribers < SUBSCRIBERS_EXPECTED) {
char tmp[24];
// - wait for synchronization request
int size = zmq_recv(syncservice, tmp, sizeof(tmp), 0);
if (size < 0) {
fprintf(stderr, "Error: zmq_recv failed: %s\n", zmq_strerror(errno));
return 8;
}
// - send synchronization reply
rc = zmq_send(syncservice, "", 0, 0);
if (rc < 0) {
fprintf(stderr, "Error: zmq_send failed: %s\n", zmq_strerror(errno));
return 9;
}
subscribers++;
}
// Now broadcast exactly 100,000 updates followed by END
printf ("Broadcasting messages\n");
int update_nbr;
for (update_nbr = 0; update_nbr < 10; update_nbr++) {
char buf[24];
int len;
sprintf(buf, "Msg #%d", update_nbr+1);
len = strlen(buf);
int size = zmq_send(publisher, buf, len, 0);
if (size != len) {
if (errno == EAGAIN) {
update_nbr--;
continue;
}
fprintf(stderr, "Error: zmq_send failed: %s\n", zmq_strerror(errno));
return 10;
}
}
int size = zmq_send(publisher, "END", 3, 0);
if (size != 3) {
fprintf(stderr, "Error: zmq_send failed: %s\n", zmq_strerror(errno));
return 11;
}
sleep(10);
zmq_close(publisher);
zmq_close(syncservice);
zmq_term (context);
return 0;
}
syncsub.c:
#include
#include
#include
#include
int
main(int argc, char *argv[])
{
const char *prefix = (argc > 1 ? argv[1] : "");
void *context = zmq_ctx_new();
if (context == 0) {
fprintf(stderr, "Error: zmq_ctx_new failed: %s\n", zmq_strerror(errno));
return 1;
}
int rc = zmq_ctx_set(context, ZMQ_IO_THREADS, 1);
if (rc != 0) {
fprintf(stderr, "Error: zmq_ctx_set failed: %s\n", zmq_strerror(errno));
return 2;
}
// First, connect our subscriber socket
void *subscriber = zmq_socket(context, ZMQ_SUB);
if (subscriber == 0) {
fprintf(stderr, "Error: zmq_socket(ZMQ_SUB) failed: %s\n",
zmq_strerror(errno));
return 4;
}
rc =