Re: [zeromq-dev] External Event Loop
Hello Martin,

I even tried with Edge-Triggered semantics. Still nothing happens. I have
written test code to reproduce the problem. Kindly take a look. The
FileService class in the test code is used to watch descriptors using
libevent. It is tested to be working fine separately. I am also attaching
the FileService test code to this mail.

If it works out, I shall contribute a general-purpose proactor pattern
implementation, like boost::asio, for zmq sockets.

Thank you,
Praveen

On Fri, Dec 17, 2010 at 4:14 AM, Praveen Baratam
<praveen.baratam+...@gmail.com> wrote:
> Thank you Martin for the feedback.
>
> I will now try integrating with libevent which supports Edge-Triggered
> semantics.
>
> Will contribute a reactor to zmq if it works out.
>
> Praveen
>
> On Thu, Dec 16, 2010 at 6:05 PM, Martin Sustrik wrote:
>> On 12/15/2010 09:43 PM, Praveen Baratam wrote:
>>> Libev is a strictly Level-Triggered event loop wrapper. The author
>>> points out that it is required for portability.
>>>
>>> So the problem you mentioned does not arise as we are using a
>>> Level-Triggered event reactor.
>>
>> ZMQ_FD is edge-triggered. It cannot be changed to level-triggered
>> without serious impact on performance (decreasing the throughput 10x
>> or so).
>>
>> Martin

//
// Name        : ZMQTest.cpp
// Author      : Praveen Baratam
// Version     :
// Copyright   : All Rights Reserved.
// Description : Test ZMQ external event loop integration
//

// NOTE: the include names and some template arguments were stripped by
// the list archive; the ones below are plausible reconstructions.
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/ref.hpp>
#include <event2/event.h>
#include <zmq.hpp>

using namespace std;

#define LOCAL_PUBLISHER "inproc://local-publisher"

/**
 * FileService is a class implementing the Reactor pattern to watch files
 * for ready events. It uses boost::asio::io_service and boost::thread for
 * scheduling and executing the event loop and handlers. Internally it
 * uses the libevent library to watch the supplied FDs.
 *
 * FDs can be attached and detached at any time from any thread, and the
 * supplied callbacks will be invoked by the internal thread. The internal
 * thread also runs the loop internally when there are FDs being watched,
 * without the need to explicitly run the libevent loop.
 */

typedef boost::function<void (int fd, int)> FileEvent;
typedef boost::function<void (void *)> AttachHandler;
typedef boost::function<void ()> DetachHandler;

#define FS_READ EV_READ
#define FS_WRITE EV_WRITE
#define FS_ERROR EV_ERROR
#define FS_ET EV_ET
#define FS_PERSIST EV_PERSIST

class FileService
{
public:

    FileService (int poll_interval /* seconds */) :
        _io_service (),
        _localThread (NULL),
        _poll_interval (poll_interval),
        _base (NULL),
        _watchCounter (0)
    {
        _localThread = new boost::thread (boost::ref (*this));
    }

    ~FileService ()
    {
        _localThread->interrupt ();
        delete _localThread;
    }

    void operator() ()
    {
        cout << "Started File Service" << endl;
        startLoop ();
        while (1) {
            try {
                boost::asio::io_service::work work (_io_service);
                // Executes all the pending tasks and returns
                _io_service.run ();
                // Check for interruption to break out of the loop
                boost::this_thread::interruption_point ();
            }
            catch (boost::thread_interrupted &e) {
                break;
            }
            catch (...) {
                _io_service.reset ();
                cout << "Error in FileService" << endl;
            }
        }
        cout << "Stopped File Service" << endl;
        stopLoop ();
    }

    void join ()
    {
        _localThread->join ();
    }

    void post (boost::function<void ()> function)
    {
        _io_service.post (function);
    }

    void dispatch (boost::function<void ()> function)
    {
        _io_service.dispatch (function);
    }

    void attach (int fd, int events, FileEvent *eventHandler,
        AttachHandler *attachHandler)
    {
        _io_service.dispatch (boost::bind (&FileService::doAttach, this,
            fd, events, eventHandler, attachHandler));
    }

    void doAttach (int fd, int events, FileEvent *eventHandler,
        AttachHandler *attachHandler)
    {
        struct event *watcher = event_new (_base, fd,
            events | FS_ET | FS_PERSIST,
            &FileService::watcherCallback, eventHandler);
        event_add (watcher, NULL);
        AttachHandler callback = *attachHandler;
        callback ((void*) watcher);
        _watchCounter++;
        // Run the loop after attach to watch the fd
        _io_service.post (boost::bind (&FileService::runLoop, this));
    }

    static void watcherCallback (int fd, short r_events, void *arg)
    {
        FileEvent callback = *(FileEvent*) arg;
        callback (fd, r_events);
    }

    void detach (void *watcher, DetachHandler* detachHandler)
    {
        _io_service.dispatch (boost::bind (&FileService::doDetach, this,
            (struct event*) watcher, detachHandler));
    }

    void doDetach (struct event *watcher, DetachHandler *detachHandler)
    {
        event_del (watcher);
        event_free (watcher);
        DetachHandler callback = *detachHandler;
        callback ();
        _
        // [the attachment is truncated here in the archive]
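Martin's point about ZMQ_FD being edge-triggered is the usual culprit in
integrations like this: the descriptor signals state *transitions*, not
readability, so after it fires you must drain the socket completely by
re-checking ZMQ_EVENTS; a message left queued may never re-trigger the
descriptor. A minimal sketch of the pattern, assuming pyzmq (socket names
and endpoint are made up):

```python
import select
import zmq

ctx = zmq.Context()
a = ctx.socket(zmq.PAIR)
a.bind("inproc://fd-demo")
b = ctx.socket(zmq.PAIR)
b.connect("inproc://fd-demo")

b.send(b"one")
b.send(b"two")

# Wait for the edge on the notification descriptor. ZMQ_FD only says
# "something changed", not "one message is readable".
select.select([a.fd], [], [], 1.0)

# After the edge, drain everything: keep reading while ZMQ_EVENTS
# reports POLLIN. Stopping after one recv() can leave a message
# stranded with no further edge to wake the event loop.
msgs = []
while a.get(zmq.EVENTS) & zmq.POLLIN:
    msgs.append(a.recv())

print(msgs)

a.close()
b.close()
ctx.term()
```

The same drain loop is what a libevent watcher callback registered on
ZMQ_FD would have to do before returning to the event loop.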
Re: [zeromq-dev] Q; running multiple REP listeners on an IPC socket, only the last one connected gets any messages
Thanks all,

I later re-re-read the documentation and realised I was misunderstanding
ZMQ: you can't bind multiple listening sockets to the same physical
address with any protocol (by design), and there is Another Way To Do It.
This re-binding failure is masked with IPC sockets (only the last bind
works) as you point out; but hey, user error.

Thanks!
DrTune.

On Fri, Dec 17, 2010 at 9:16 AM, Dhammika Pathirana wrote:
> Martin,
>
> On Fri, Dec 17, 2010 at 3:31 AM, Martin Sustrik wrote:
>> Hi,
>>
>>> 1) Using one zmq context, I bind four separate zmq sockets (in java) to
>>> the same ipc port; e.g. "ipc://tmp/test"
>>
>> Hm, you should be able to bind at most one socket to a particular
>> endpoint. Are you sure all the four binds succeed?
>
> All four binds will succeed, as we unlink() before we call bind().
> This is a bug and the comments here are misleading.
>
> 217 else if (strcmp (protocol_, "ipc") == 0) {
> 218
> 219     // Get rid of the file associated with the UNIX domain socket that
> 220     // may have been left behind by the previous run of the application.
> 221     ::unlink (addr_);
> 222
>
> Dhammika

___
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
http://lists.zeromq.org/mailman/listinfo/zeromq-dev
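For the record, the "Another Way" is a device: one socket binds the public
endpoint, and the REP workers *connect* to an internal backend that
load-balances requests among them. A rough sketch under assumed names,
using pyzmq's ROUTER/DEALER and zmq.proxy (the equivalent of a ZMQ_QUEUE
device on 2.x); the ipc path and worker naming are made up:

```python
import os
import threading
import zmq

try:
    os.unlink("/tmp/lb-demo")   # clear a stale socket file from a crash
except OSError:
    pass

ctx = zmq.Context()

# Only the frontend binds the public endpoint; workers connect, never bind.
frontend = ctx.socket(zmq.ROUTER)
frontend.bind("ipc:///tmp/lb-demo")        # hypothetical endpoint
backend = ctx.socket(zmq.DEALER)
backend.bind("inproc://workers")

def worker(n):
    s = ctx.socket(zmq.REP)
    s.connect("inproc://workers")          # connect, don't bind
    try:
        while True:
            s.recv()
            s.send(b"worker-%d" % n)
    except zmq.ContextTerminated:
        s.close()

threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()

def run_proxy():
    try:
        # Shuttles requests to the workers round-robin until the
        # context is terminated.
        zmq.proxy(frontend, backend)
    except zmq.ContextTerminated:
        frontend.close()
        backend.close()

proxy = threading.Thread(target=run_proxy)
proxy.start()

req = ctx.socket(zmq.REQ)
req.connect("ipc:///tmp/lb-demo")
replies = []
for _ in range(4):
    req.send(b"hello")
    replies.append(req.recv())
print(len(replies), "replies")

req.close()
ctx.term()
proxy.join()
for t in threads:
    t.join()
```

Dozens of REQ clients can then connect to the single bound endpoint, and
the device spreads their requests over however many worker threads are
attached.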
Re: [zeromq-dev] Q; running multiple REP listeners on an IPC socket, only the last one connected gets any messages
Martin,

On Fri, Dec 17, 2010 at 3:31 AM, Martin Sustrik wrote:
> Hi,
>
>> 1) Using one zmq context, I bind four separate zmq sockets (in java) to
>> the same ipc port; e.g. "ipc://tmp/test"
>
> Hm, you should be able to bind at most one socket to a particular
> endpoint. Are you sure all the four binds succeed?

All four binds will succeed, as we unlink() before we call bind().
This is a bug and the comments here are misleading.

217 else if (strcmp (protocol_, "ipc") == 0) {
218
219     // Get rid of the file associated with the UNIX domain socket that
220     // may have been left behind by the previous run of the application.
221     ::unlink (addr_);
222

Dhammika
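The unlink-before-bind behaviour Dhammika quotes is easy to observe from a
few lines of pyzmq. Note this is version-dependent: on 0MQ 2.x the second
bind "succeeds" because zmq_bind() unlink()s the existing file first;
later releases changed this, so it may fail with EADDRINUSE instead. The
endpoint path below is made up:

```python
import os
import zmq

endpoint = "ipc:///tmp/dup-bind-demo"      # hypothetical path
try:
    os.unlink("/tmp/dup-bind-demo")        # start from a clean slate
except OSError:
    pass

ctx = zmq.Context()

s1 = ctx.socket(zmq.REP)
s1.bind(endpoint)                          # creates the socket file
first_bind_ok = True

s2 = ctx.socket(zmq.REP)
try:
    # On 2.x this silently steals the endpoint from s1, which is why
    # only the last-bound REP socket ever sees any requests.
    s2.bind(endpoint)
    second_bind_ok = True
except zmq.ZMQError as e:
    second_bind_ok = False
    print("second bind failed:", e)

print("first bind ok:", first_bind_ok, "second bind ok:", second_bind_ok)

s1.close()
s2.close()
ctx.term()
```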
Re: [zeromq-dev] zguide example implementation of msgqueue.py, mtserver.py and mtrelay.py
Hi,

I have also added psenvpub.py and psenvsub.py in the github fork
https://github.com/gaubert/zguide. The code is licensed under the MIT/X11
license. More should come.

Cheers
Guillaume

On Thu, Dec 16, 2010 at 8:56 AM, Guillaume Aubert wrote:
> Morning Martin,
>
> I forked the zguide project and sent you a pull request.
> Just in case, the examples are in this forked project:
> https://github.com/gaubert/zguide
>
> I might add more examples in the next 2 days as I continue to
> experiment with zmq.
>
> Let me know if you need some help.
>
> Cheers
> Guillaume
>
> On Wed, Dec 15, 2010 at 4:44 PM, Martin Sustrik wrote:
>> Guillaume,
>>
>>> Thanks for answering me.
>>> I have a github account, so if you give me access to the zguide
>>> repository, I could create a branch and put the examples there.
>>> Pieter will have a look at the examples in the branch and merge the
>>> "example" branch into the main one when he is back, if the
>>> translation is acceptable to him.
>>
>> Just fork the original zguide project and apply the changes there.
>> You need no special rights for that.
>>
>> As I said, if Pieter is not back by the end of the year, we'll have to
>> create an alternative process. Either I take the responsibility for
>> uploading the patches or someone else may volunteer to do the work.
>>
>> Martin
Re: [zeromq-dev] Mac OS X: test_shutdown_stress sometimes fails
Hi Dhammika,

> Thanks for pointing this out.
> I've fixed the asymmetric calls in the following patch.
> Both read and write call finalize_initialization(), and flush()
> dispatches the engine if it's already finalized.

I've peer-reviewed the patch thoroughly and applied it to master.

Thanks and sorry for the delay.
Martin
Re: [zeromq-dev] poll does not return on a SUB socket
I'll try again, but the system is already rather complex and it would be
much effort to strip it down (maybe impossible). So I'd have to build the
test case from scratch. The current scenario is the following:

normal devices:
- a modified queue device, implemented as an LRU queue (not exactly as
  described in your docs, but the same algorithm)
- polls on:
  - a connected REP socket (service port)
  - a connected REP socket (worker end of the queue device)
  - a connected SUB socket (subscription to the dispatcher device's
    outer PUB end)

normal workers:
- running behind their queue device
- polling on a REQ socket
- polling on a SUB socket (connected to the inner PUB end of their
  queue device)

dispatcher device:
- the same LRU-like queue device, but
- the PUB/SUB direction goes the other way: "dispatcher" workers publish
  things that are SUB'ed by "normal" workers
- so it has an inner SUB socket to the workers, shuffling messages to
  the outer PUB end

dispatcher workers:
- also running behind the queue, but
- polling on a REQ socket and their own device's PUB end

The problem occurs in the dispatcher device:
- one dispatcher gets a message, which should get published
- the message gets recoded and is definitely and error-free sent to the
  queue's inner SUB end
- the poll(..., -1)ing dispatcher queue >>>should<<< receive it and
  shuffle it to its outer PUB socket, but poll does not return ;o(
- when I send another message, I get a return code "2" from poll!!!
- when I use a timeout, I get return code 0, and the next loop polls
  again and immediately returns with "1" for the waiting message.

I expect that this only occurs when using poll() on different socket
types (2x REQ and 1x SUB), and it also seems to be a timing problem:
according to the logs, the "delayed" message could arrive exactly at the
time when the queue calls poll(). Sometimes it works fine, but not often.

A good hint for the bug hunting could be that poll(..., timeout) also
returns "0", but the next call to it immediately returns "1".

Nonetheless, I'll try to friggle some test case over Xmas, hoping that I
have a chance to easily show the buggy behaviour.

^5
Sven

- E = mc² ± 2dBA - everything is relative -

-----Original Message-----
Date: Fri, 17 Dec 2010 12:07:21 +0100
Subject: Re: [zeromq-dev] poll does not return on a SUB socket
From: Martin Sustrik
To: ZeroMQ development list

Hi Sven,

> the main issue is, that poll(..., -1) does not at all return despite
> the fact that there is a message available!
> The usage of a timeout value is a current workaround, because the
> second call to poll (the first returned zero) successfully returns a
> number greater than one.

Can you provide a minimal test case?

Martin
Re: [zeromq-dev] zeromq and hdfs as persistent storage
Andreas,

> hi there, in my cluster i want to have my queuing system based on zeromq
> however i want to use as persistent storage of the messages the hdfs. do
> you have any ideas how to start with?

0MQ is a messaging fabric, not a storage. Thus you should build your
persistence layer on top of 0MQ. I.e. get a message from the database,
send it to a 0MQ socket. And: recv a message from a 0MQ socket, store it
in the database.

Martin
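Martin's recv-then-store advice can be sketched in a few lines. Here
sqlite3 stands in for HDFS purely as a placeholder store, and the socket
types and endpoint are made-up choices for the example, assuming pyzmq:

```python
import sqlite3
import zmq

# Persistence sits *on top of* 0MQ: receive a message, then store it.
db = sqlite3.connect(":memory:")           # placeholder for HDFS
db.execute("CREATE TABLE messages (id INTEGER PRIMARY KEY, body BLOB)")

ctx = zmq.Context()
pull = ctx.socket(zmq.PULL)
pull.bind("inproc://ingest")               # hypothetical endpoint
push = ctx.socket(zmq.PUSH)
push.connect("inproc://ingest")

# A producer sends messages over the 0MQ socket...
for body in (b"alpha", b"beta"):
    push.send(body)

# ...and the persistence layer recv()s each one and writes it down.
for _ in range(2):
    msg = pull.recv()
    db.execute("INSERT INTO messages (body) VALUES (?)", (msg,))
db.commit()

rows = [r[0] for r in db.execute("SELECT body FROM messages ORDER BY id")]
print(rows)

push.close()
pull.close()
ctx.term()
```

Replaying messages out of storage is the mirror image: read a row from
the store, then send it to a 0MQ socket.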
Re: [zeromq-dev] Q; running multiple REP listeners on an IPC socket, only the last one connected gets any messages
Hi,

> 1) Using one zmq context, I bind four separate zmq sockets (in java) to
> the same ipc port; e.g. "ipc://tmp/test"

Hm, you should be able to bind at most one socket to a particular
endpoint. Are you sure all the four binds succeed?

Martin
Re: [zeromq-dev] "Who said that?"
On 12/16/2010 10:39 PM, Oliver Smith wrote:
> I'm not actually looking for the sender's address so much as who *I*
> got the message from, i.e. the previous hop. While the address and port
> would be nice for diagnosing sources of bad data within a fabric (I
> spent 2 days this week tracking down what turned out to be a bad stick
> of RAM in a machine corrupting outgoing data), it could be any
> thread-unique value that discretely identifies a given TCP-socket-pair
> connection to this zmq context.

By bad data you mean malformed 0MQ frames? With 2.1 those are discarded
silently and the connection is closed. We should also report the problem
via sys://log.

Martin
Re: [zeromq-dev] poll does not return on a SUB socket
Hi Sven,

> the main issue is, that poll(..., -1) does not at all return despite
> the fact that there is a message available!
> The usage of a timeout value is a current workaround, because the
> second call to poll (the first returned zero) successfully returns a
> number greater than one.

Can you provide a minimal test case?

Martin
[zeromq-dev] Q; running multiple REP listeners on an IPC socket, only the last one connected gets any messages
Hi all,

Feeling my way around this most excellent (and lean!) middleware. I'm
using both the Java and C zmq libraries in different parts of my system.
I may be misunderstanding or have a code bug, but my assumption is that:

1) Using one zmq context, I bind four separate zmq sockets (in Java) to
the same ipc port; e.g. "ipc://tmp/test"

2) I have multiple client processes (on the same machine, of course)
which connect dozens of REQ sockets to that same IPC address.

Expected result: the four REP listener sockets would be roughly
load-balanced with the requests; i.e. I could start 4 threads, each with
a REP listening to the same IPC port, and they'd be approximately load
balanced with messages.

Actual result: I am finding that only the last REP socket connected
actually gets any messages; the others get nothing.

Is it a bug in my code or am I misunderstanding things? If IPC REP
sockets cannot have multiple (balanced) bind-ers, must I build a REP
server with only one message listener thread? This doesn't match my
understanding of zmq design goals, so I suspect it's my bug.

Many thanks,
DrTune.
[zeromq-dev] zeromq and hdfs as persistent storage
Hi there,

In my cluster I want to have my queuing system based on zeromq; however,
I want to use HDFS as the persistent storage for the messages. Do you
have any ideas how to start with this?

Thanks in advance