2010/6/9 Martin Sustrik <[email protected]>
> Marcin,
>
> > The typical architecture would look like this:
> >
> > C1 \              / W1
> > C2 ---- queue ---- W2
> > ..                 ..
> > CN /              \ WM
> >
> > Queue handles workers registration, load-balancing and retrying. What is
> > the best way to implement it using 0MQ?
>
> Have a look here:
>
> http://www.zeromq.org/blog:multithreaded-server
>
> You want basically the same thing, the only difference being that the
> queue should be a standalone application (zmq_queue) rather than an
> in-process component.
>
Martin, thanks for the quick answer. I have tried it and I am seeing
deterministic crashes. I don't know whether the problem is in ZeroMQ 2.0.7,
which I am using, or in my code.
I am launching several client threads in one process:
    void client_thread (zmq::context_t *ctx)
    {
        // This client is a requester.
        zmq::socket_t s (*ctx, ZMQ_REQ);
        // Connect to the server.
        s.connect ("ipc://queue");
        while (!stop_clients) {
            // Send the request. No point in filling the content in as server
            // is a dummy and won't use it anyway.
            zmq::message_t request (req_size);
            memset (request.data (), 0, request.size ());
            s.send (request);
            // Get the reply.
            zmq::message_t reply;
            s.recv (&reply);
        }
    }
In another process I am launching the queue:
    zmq::context_t ctx (thread_count);
    // Create an endpoint for worker threads to connect to.
    // We are using XREQ socket so that processing of one request
    // won't block other requests.
    zmq::socket_t workers (ctx, ZMQ_XREQ);
    string bind_addr =
        string("tcp://")+interface+string(":")+boost::lexical_cast<string>(port);
    workers.bind (bind_addr.c_str());
    // Create an endpoint for client applications to connect to.
    // We are using XREP socket so that processing of one request
    // won't block other requests.
    zmq::socket_t clients (ctx, ZMQ_XREP);
    clients.bind ("ipc://queue");
    // Use queue device as a dispatcher of messages from clients to worker
    // threads.
    zmq::device (ZMQ_QUEUE, clients, workers);
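The bind address above is assembled with boost::lexical_cast. A minimal sketch of the same endpoint construction using only the standard library, assuming a C++11 compiler is available; make_tcp_endpoint is a hypothetical helper name and not part of 0MQ:

```cpp
#include <cstdint>
#include <string>

// Hypothetical helper (not part of 0MQ): builds "tcp://<host>:<port>",
// the same string the queue code passes to bind () or connect ().
std::string make_tcp_endpoint (const std::string &host, std::uint16_t port)
{
    return "tcp://" + host + ":" + std::to_string (port);
}
```

The result can be passed on as before, for example workers.bind (make_tcp_endpoint (interface, port).c_str ()).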
And in another process I am launching the workers:
    void worker_thread (zmq::context_t *ctx)
    {
        zmq::socket_t s (*ctx, ZMQ_REP);
        s.connect ("inproc://workers");
        pt::time_duration td = pt::microsec(1000000/reqs_sec);
        while (!stop_workers) {
            // Get a request from the dispatcher.
            zmq::message_t request;
            s.recv (&request);
            // Our server does no real processing. So let's sleep for a while
            // to simulate actual processing.
            this_thread::sleep (td);
            // Send the reply. No point in filling the data in as the client
            // is a dummy and won't check it anyway.
            zmq::message_t reply (resp_size);
            memset (reply.data (), 0, reply.size ());
            s.send (reply);
        }
    }
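The worker paces itself by sleeping 1000000/reqs_sec microseconds per request. A minimal sketch of that calculation with std::chrono instead of Boost, assuming reqs_sec is an unsigned integer; delay_per_request is a hypothetical helper name. Note that integer division truncates, so rates above 1000000 requests per second yield a zero delay, and a rate of 0 has to be rejected explicitly:

```cpp
#include <chrono>
#include <stdexcept>

// Hypothetical helper: per-request delay for a target request rate.
// Uses integer division, so the result is truncated to whole microseconds.
std::chrono::microseconds delay_per_request (unsigned reqs_per_sec)
{
    if (reqs_per_sec == 0)
        throw std::invalid_argument ("reqs_per_sec must be positive");
    return std::chrono::microseconds (1000000 / reqs_per_sec);
}
```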
together with a queue that forwards from the TCP socket to the inproc workers:
    zmq::context_t ctx (thread_count);
    // Create an endpoint for worker threads to connect to.
    // We are using XREQ socket so that processing of one request
    // won't block other requests.
    zmq::socket_t workers (ctx, ZMQ_XREQ);
    workers.bind ("inproc://workers");
    // Connect to queue
    // We are using XREP socket so that processing of one request
    // won't block other requests.
    zmq::socket_t clients (ctx, ZMQ_XREP);
    string connect_addr =
        string("tcp://")+host+string(":")+boost::lexical_cast<string>(port);
    clients.connect (connect_addr.c_str());
    // Use queue device as a dispatcher of messages from clients to worker
    // threads.
    zmq::device (ZMQ_QUEUE, clients, workers);
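To make the intended behaviour concrete: the setup relies on the XREQ side spreading requests over the connected workers. A toy model of that dispatching, assuming plain round-robin distribution; RoundRobinDispatcher is a hypothetical class written for illustration and contains no 0MQ code:

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Toy model, not 0MQ code: hands out registered workers in round-robin order.
class RoundRobinDispatcher {
public:
    // Adds a worker to the end of the rotation.
    void register_worker (const std::string &name) { workers_.push_back (name); }

    // Returns the worker that should receive the next request.
    const std::string &next_worker ()
    {
        if (workers_.empty ())
            throw std::runtime_error ("no workers registered");
        const std::string &w = workers_[next_];
        next_ = (next_ + 1) % workers_.size ();
        return w;
    }

private:
    std::vector<std::string> workers_;
    std::size_t next_ = 0;
};
```

With two workers W1 and W2 registered, successive requests go to W1, W2, W1 and so on.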
After starting the queue, the workers and the clients, I get a SIGSEGV in the
client:
Program received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffff5c49710 (LWP 9812)]
zmq::zmq_engine_t::in_event (this=0x688e70) at zmq_engine.cpp:123
123 inout->flush ();
(gdb) bt
#0 zmq::zmq_engine_t::in_event (this=0x688e70) at zmq_engine.cpp:123
#1 0x00007ffff7bba195 in zmq::epoll_t::loop (this=0x66a4e0) at epoll.cpp:197
#2 0x00007ffff7bc9a67 in zmq::thread_t::thread_routine (arg_=0x66a520) at thread.cpp:99
#3 0x00007ffff79879ca in start_thread (arg=<value optimized out>) at pthread_create.c:300
#4 0x00007ffff6f376cd in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:112
#5 0x0000000000000000 in ?? ()
The sources are available at http://gozdal.com/zeromq.tar.bz2 (together with
a CMakeLists.txt for building them; Boost is required).
--
Marcin Gozdalik