Hi, I've been following the discussions about needing to call zsocket_bind() before zsocket_connect(). In one discussion, it was suggested that in an asynchronous system where you're not able to guarantee order, you could retry the connect until it succeeds:
(See http://grokbase.com/p/zeromq/zeromq-dev/12aj8bn29c/inproc-need-to-bind-to-an-address-before-connect )

However, I've not been able to get this to work in my use case; a simplified version is attached below. Briefly, the idea is a logging thread that coordinates the logging of messages sent from other threads (one of which may be the logging thread's parent... is this an issue?). The only data shared between threads is the czmq context.

In the example below, the parent first spawns the logger thread, then a worker thread. Both the worker thread and the parent then try to connect to the logger thread to send a number of messages. If I introduce a small pause (e.g. 1 millisecond or more) *after* starting the logger thread but before connecting, everything seems to work. If there is no pause, the initial connect never works. Even after trying hundreds of times over a number of seconds, the connection still fails (CONNECTION REFUSED).

I'd prefer to use this retry-the-connection approach if possible. Should it work, or am I doing something else dumb?

The small program below demonstrates the problem reliably on my 4-core CentOS 6 VM (running on Windows 7 via VMware). It takes one argument: the number of milliseconds to wait after creating the logger thread. Set this value to 0 to see the problem, or to a value of 1 or more to see it working. When it works, messages from the parent and worker threads should be atomically echoed to stdout. When it doesn't, neither thread manages to connect.

Thanks!
Andy

//START OF bindwait.c ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// This test creates a logger thread which listens via an inproc 'PULL' socket for
// messages to output, and then creates a worker thread. The worker thread and
// the parent then both attempt to send messages to the logger thread using
// a 'PUSH' connection.
// // COMPILING: // (assumes zmq and czmq have been installed) // gcc -o bindwait -I/usr/local/include/ -lzmq -lczmq bindwait.c // // RUN TO SHOW PROBLEM: // // bindwait 0 // // RUN WITHOUT PROBLEM - by specifying a period in milliseconds to wait // after starting the logger thread: // // bindwait 200 #include "czmq.h" #include <stdio.h> // Open a connection to the logger thread and send it a message a number of times static void send_log_messages( zctx_t* ctx, const char* message, int num_sends, int ms_between_sends) { int num_tries = 500; int sleep_ms = 10; int rc,tries; void* src_socket = zsocket_new(ctx, ZMQ_PUSH); assert(src_socket); // zsocket_connect() will fail if the corresponding zsocket_bind() in the // logger thread hasn't completed, so try a number of times until it succeeds: for(tries = 1; tries <= num_tries; tries++) { rc = zsocket_connect(src_socket, "inproc://PushPull"); if (rc == 0) break; else zclock_sleep(sleep_ms); } if (rc != 0) { printf("Child failed to connect after %d tries, error = %d - %s\n", tries, zmq_errno(), zmq_strerror(zmq_errno()) ); } else { // Send some messages to the logger thread! 
if (src_socket) { int m; for (m = 0; m < num_sends; m++) { int rc = zstr_send (src_socket, message); assert(rc == 0); zclock_sleep(ms_between_sends); } } } zsocket_destroy (ctx, src_socket); } // writes a log message which has arrived static int output_log_message(zloop_t* reactor, zmq_pollitem_t* log_poller, void* dst_socket) { assert(dst_socket); char* message = zstr_recv_nowait(dst_socket); printf("Message received: %s \n", message); free(message); return 0; } // listens for log messages arriving static void* logger_thread(void* arg) { zctx_t* ctx = ( zctx_t* )arg; void* dst_socket = zsocket_new(ctx, ZMQ_PULL); assert(dst_socket); int rc = zsocket_bind(dst_socket, "inproc://PushPull"); assert(rc == 0); zloop_t* reactor = zloop_new(); assert(reactor); zmq_pollitem_t log_poller = { dst_socket, 0, ZMQ_POLLIN }; rc = zloop_poller(reactor, &log_poller, output_log_message, dst_socket); assert(rc == 0); zloop_start(reactor); printf("Logger Thread Exiting...\n"); return 0; } // A thread which generates log messages static void* worker_thread(void* arg) { zctx_t* ctx = (zctx_t*)arg; send_log_messages( ctx, "Worker!", 20, 10); printf("Worker Thread Exiting...\n"); return 0; } int main(int argc, char* argv[]) { int thread_wait_ms; if ( argc != 2 ) { printf("Usage: %s <milliseconds to sleep after starting logger thread>\n", argv[0]); exit(-1); } else { sscanf( argv[1], "%d", &thread_wait_ms ); } // Create a context which will be shared by all threads zctx_t* ctx = zctx_new(); assert(ctx); // Create a thread which will listen for log messages to output int rc = zthread_new(logger_thread, (void*)ctx); assert(rc == 0); if (thread_wait_ms > 0) { printf("Waiting for %d milliseconds after logger thread creation...\n", thread_wait_ms); zclock_sleep(thread_wait_ms); } // Create a thread which will generate some log messages to be output rc = zthread_new(worker_thread, (void*)ctx); assert(rc == 0); // We're going to send log messages from the parent too: send_log_messages( ctx, 
"Parent!", 20, 10); // Wait a bit before exiting: zclock_sleep(1000); zctx_destroy (&ctx); } //END OF bindwait.c //////////////////////////////////////////////////////////////////////////////////////////////////////////////// -- Andy Ballingall Senior Software Engineer The Foundry 6th Floor, The Communications Building, 48, Leicester Square, London, WC2H 7LT, UK Tel: +44 (0)20 7968 6828 - Fax: +44 (0)20 7930 8906 Web: http://www.thefoundry.co.uk/ The Foundry Visionmongers Ltd. Registered in England and Wales No: 4642027 _______________________________________________ zeromq-dev mailing list [email protected] http://lists.zeromq.org/mailman/listinfo/zeromq-dev
