Re: [zeromq-dev] syncpub/syncsub example losing messages (ZMQ 3.2.0-rc1, Solaris 11)

2012-07-18 Thread Alex Keahan
Alex Keahan <akeahan at gbtradingllc.com> writes:

 $ ./syncpub &
 [1] 6611
 $ Waiting for subscribers
 
 $ ./syncsub A &
 [2] 6612
 $ ./syncsub B &
 [3] 6613
 $ Broadcasting messages
 Message mismatch: received 'Msg #1266' expected 'Msg #1001'
 A: received 1000 updates
 Message mismatch: received 'Msg #1266' expected 'Msg #1001'
 B: received 1000 updates
 


Whoops. The default high water mark for outbound messages was changed from 0
(no limit) in 2.2 to 1000 in 3.2, so the publisher now drops messages once
1000 of them are queued for a subscriber.

Not a bug, please disregard.


___
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
http://lists.zeromq.org/mailman/listinfo/zeromq-dev


[zeromq-dev] syncpub/syncsub example losing messages (ZMQ 3.2.0-rc1, Solaris 11)

2012-07-17 Thread Alex Keahan
ZMQ 3.2.0-rc1 was built with the latest Sun Studio C/C++ compilers,
with CFLAGS and CXXFLAGS=-fast -library=stlport4.

$ uname -a
SunOS xx 5.11 11.0 i86pc i386 i86pc
$ CC -V
CC: Sun C++ 5.12 SunOS_i386 2011/11/16
$ cc -V
cc: Sun C 5.12 SunOS_i386 2011/11/16


The syncpub/syncsub example from the guide was adapted to 3.2.0 (see
the code below).

The original version of syncsub would simply get stuck in zmq_recv().
I added extra code to detect sequence gaps and break out of the loop
whenever there's a discrepancy; now every syncsub prints "received N
updates", where N is somewhat random. Slowing down the producer fixes
the problem. ZeroMQ reports no errors at any point.

As a side note, whenever there is a sequence gap, the next sequence
number received by syncsub appears to come from the start of the next
send() buffer sent by syncpub (confirmed by 'truss syncpub')

Any suggestions?

Alex Keahan


Run as follows:

$ ./syncpub &
$ ./syncsub A &
$ ./syncsub B &

You should see something similar to this:

$ ./syncpub &
[1] 6611
$ Waiting for subscribers

$ ./syncsub A &
[2] 6612
$ ./syncsub B &
[3] 6613
$ Broadcasting messages
Message mismatch: received 'Msg #1266' expected 'Msg #1001'
A: received 1000 updates
Message mismatch: received 'Msg #1266' expected 'Msg #1001'
B: received 1000 updates

[2]-  Done                    ./syncsub A
[3]+  Done                    ./syncsub B


syncpub.c:

#include <stdio.h>
#include <strings.h>
#include <unistd.h>
#include <zmq.h>

// Wait for 2 subscribers
#define SUBSCRIBERS_EXPECTED 2

int
main(int argc, char *argv[])
{
  void *context = zmq_ctx_new();
  if (context == 0) {
fprintf(stderr, Error: zmq_ctx_new failed: %s\n, zmq_strerror(errno));
return 1;
  }
  int rc = zmq_ctx_set(context, ZMQ_IO_THREADS, 1);
  if (rc != 0) {
fprintf(stderr, Error: zmq_ctx_set failed: %s\n, zmq_strerror(errno));
return 2;
  }

  // Socket to talk to clients
  void *publisher = zmq_socket(context, ZMQ_PUB);
  if (publisher == 0) {
fprintf(stderr, Error: zmq_socket(ZMQ_PUB) failed: %s\n,
zmq_strerror(errno));
return 4;
  }

  rc = zmq_bind (publisher, tcp://*:5561);
  if (rc != 0) {
fprintf(stderr, Error: zmq_bind failed: %s\n, zmq_strerror(errno));
return 5;
  }

  // Socket to receive signals
  void *syncservice = zmq_socket (context, ZMQ_REP);
  if (syncservice == 0) {
fprintf(stderr, Error: zmq_socket(ZMQ_REP) failed: %s\n,
zmq_strerror(errno));
return 6;
  }

  rc = zmq_bind (syncservice, tcp://*:5562);
  if (rc != 0) {
fprintf(stderr, Error: zmq_bind failed: %s\n, zmq_strerror(errno));
return 7;
  }

  // Get synchronization from subscribers
  printf (Waiting for subscribers\n);
  int subscribers = 0;
  while (subscribers  SUBSCRIBERS_EXPECTED) {
char tmp[24];

// - wait for synchronization request
int size = zmq_recv(syncservice, tmp, sizeof(tmp), 0);
if (size  0) {
  fprintf(stderr, Error: zmq_recv failed: %s\n, zmq_strerror(errno));
  return 8;
}

// - send synchronization reply
rc = zmq_send(syncservice, , 0, 0);
if (rc  0) {
  fprintf(stderr, Error: zmq_send failed: %s\n, zmq_strerror(errno));
  return 9;
}

subscribers++;
  }
  // Now broadcast exactly 100,000 updates followed by END
  printf (Broadcasting messages\n);
  int update_nbr;
  for (update_nbr = 0; update_nbr  10; update_nbr++) {
char buf[24];
int len;
sprintf(buf, Msg #%d, update_nbr+1);
len = strlen(buf);

int size = zmq_send(publisher, buf, len, 0);
if (size != len) {
  if (errno == EAGAIN) {
update_nbr--;
continue;
  }
  fprintf(stderr, Error: zmq_send failed: %s\n, zmq_strerror(errno));
  return 10;
}
  }

  int size = zmq_send(publisher, END, 3, 0);
  if (size != 3) {
fprintf(stderr, Error: zmq_send failed: %s\n, zmq_strerror(errno));
return 11;
  }

  sleep(10);

  zmq_close(publisher);
  zmq_close(syncservice);
  zmq_term (context);

  return 0;
}


syncsub.c:


#include <stdio.h>
#include <strings.h>
#include <unistd.h>
#include <zmq.h>

int
main(int argc, char *argv[])
{
  const char *prefix = (argc  1 ? argv[1] : );

  void *context = zmq_ctx_new();
  if (context == 0) {
fprintf(stderr, Error: zmq_ctx_new failed: %s\n, zmq_strerror(errno));
return 1;
  }
  int rc = zmq_ctx_set(context, ZMQ_IO_THREADS, 1);
  if (rc != 0) {
fprintf(stderr, Error: zmq_ctx_set failed: %s\n, zmq_strerror(errno));
return 2;
  }

  // First, connect our subscriber socket
  void *subscriber = zmq_socket(context, ZMQ_SUB);
  if (subscriber == 0) {
fprintf(stderr, Error: zmq_socket(ZMQ_SUB) failed: %s\n,
zmq_strerror(errno));
return 4;
  }

  rc = zmq_connect(subscriber, tcp://localhost:5561);
  if (rc != 0) {
fprintf(stderr, Error: zmq_connect failed: %s\n, zmq_strerror(errno));
return 5;
  }

  rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, , 0);
  if (rc != 0) {
fprintf(stderr, Error: zmq_setsockopt failed: %s\n, zmq_strerror(errno));