Hi Winston,

I think you really should install git and make a pull request with your patch, which looks great. We use a pretty simple process; it's explained in some detail in Chapter 6 of the Guide. It would be great to see you join the contributors to 0MQ.
If you need any help at all installing or using git for this, just ask!

-Pieter

On Fri, Mar 22, 2013 at 5:05 AM, Winston Huang <[email protected]> wrote:
> there was a typo in the code below. it should be
> pipes.swap(pipes.index(pipe_), eligible - 1);
>
> the entire snippet should be like this:
>
> if (pipes.index (pipe_) < matching) {
>     pipes.swap (pipes.index (pipe_), matching - 1);
>     matching--;
> }
> if (pipes.index (pipe_) < active) {
>     pipes.swap (pipes.index (pipe_), active - 1);
>     active--;
> }
> if (pipes.index (pipe_) < eligible) {
>     pipes.swap(pipes.index(pipe_), eligible - 1);
>     eligible--;
> }
> pipes.erase (pipe_);
>
> ----- Original Message -----
> From: Winston Huang
> Sent: 03/21/13 09:13 PM
> To: ZeroMQ development list
> Subject: Re: [zeromq-dev] subscriber stopped receiving messages from XPUB socket
>
> that's a good point. I am actually not familiar with the inner workings of zmq at all. this is my first time trying to dig into the code to fix the problem. that's why I suspected my fix was not complete anyway.
>
> now I have the new version as you suggested:
>
> if (pipes.index (pipe_) < matching) {
>     pipes.swap (pipes.index (pipe_), matching - 1);
>     matching--;
> }
> if (pipes.index (pipe_) < active) {
>     pipes.swap (pipes.index (pipe_), active - 1);
>     active--;
> }
> if (pipes.index (pipe_) < eligible) {
>     pipes.swap(pipes.index(pipe_), eligible);
>     eligible--;
> }
> pipes.erase (pipe_);
>
> ----- Original Message -----
> From: Martin Hurton
> Sent: 03/21/13 07:19 PM
> To: ZeroMQ development list
> Subject: Re: [zeromq-dev] subscriber stopped receiving messages from XPUB socket
>
> Hi Winston,
>
> Thanks for the patch. I think there are still some cases that are not handled, though.
>
> For example, if there are 8 pipes handled by the dist: 2 are matching, 4 are active, 6 are eligible and the last two are congested. What happens when the first active pipe terminates?
>
> I would suggest fixing the terminated method so that the terminating pipe is moved beyond the eligible pointer the same way as in the write method. Then it can be safely erased. What do you think?
>
> - Martin
>
>> void zmq::dist_t::terminated (pipe_t *pipe_)
>> {
>>     // Remove the pipe from the list; adjust number of matching, active and/or
>>     // eligible pipes accordingly.
>>     if (pipes.index (pipe_) < matching)
>>         matching--;
>>     if (pipes.index (pipe_) < active)
>>         active--;
>>     bool swapEligible = false;
>>     if (pipes.index (pipe_) < eligible) {
>>         eligible--;
>>         swapEligible = true;
>>     }
>>     if (swapEligible) {
>>         pipes.swap(pipes.index(pipe_), eligible);
>>         pipes.erase(eligible);
>>     } else {
>>         pipes.erase (pipe_);
>>     }
>> }
>>
>> ----- Original Message -----
>> From: Pieter Hintjens
>> Sent: 03/21/13 10:06 AM
>> To: ZeroMQ development list
>> Subject: Re: [zeromq-dev] subscriber stopped receiving messages from XPUB socket
>>
>> Hi Winston,
>>
>> Great analysis of the problem! Would you like to send a pull request with a patch?
>>
>> -Pieter
>>
>> On Thu, Mar 21, 2013 at 2:43 PM, Winston Huang <[email protected]> wrote:
>>> Pieter,
>>>
>>> Thanks for your reply. I think I might have found the problem. I have an xpub socket that has about 10 subscribers. The following events happened:
>>>
>>> 1) one subscriber's hwm is reached and it's moved to the end of the pipes (in zmq::dist_t::write).
>>>
>>> 2) another subscriber was terminated (zmq::xpub_t::xterminated), causing the pipe to be removed from the dist. however, in zmq::dist_t::terminated, the terminated pipe was removed by moving the last pipe into the to-be-removed pipe's spot. therefore the deactivated pipe from step 1 is moved to the front of the pipes. in the meantime, the values of eligible and active are decremented.
>>> therefore the last eligible pipe (which was in front of the deactivated pipe before this event) now becomes ineligible, and it will not receive any messages after this.
>>>
>>> let me know if this makes any sense. it's hard for me to write a standalone test case like this. I hope my explanation is clear. And if you can suggest any fix, let me know. I can see one fix is to swap with the last eligible pipe and then delete that position.
>>>
>>> Thanks,
>>> Winston
>>>
>>> ----- Original Message -----
>>> From: Pieter Hintjens
>>> Sent: 03/15/13 09:15 AM
>>> To: ZeroMQ development list
>>> Subject: Re: [zeromq-dev] subscriber stopped receiving messages from XPUB socket
>>>
>>> It sounds like a problem in the subscription forwarding, yet it's not clear how a subscriber could be affected by the publisher restarting, with the proxy in between.
>>>
>>> Do you need the proxy at all? First thing would be to connect subscribers directly to the publisher. If the problem then still happens we can try to make a reproducible test case.
>>>
>>> -Pieter
>>>
>>> On Fri, Mar 15, 2013 at 4:59 AM, Winston Huang <[email protected]> wrote:
>>>> hi there,
>>>>
>>>> I have multiple (5-10) subscribers subscribing to the same topic published by one publisher. They are connected via an XSUB-XPUB proxy. All the subscribers are always up and the publisher may come and go at times. I have noticed that at times, after the publisher is restarted, one of the subscribers might stop receiving any messages at all. It's not a slow-joiner kind of issue because the publisher continues to publish a message every second, and that subscriber may not get any messages at all, forever, whereas other subscribers are getting messages at the same time. I also verified that the subscriber is waiting for messages (it's calling the receive function).
>>>> And if I restart the subscriber, it will get messages again.
>>>>
>>>> Could someone enlighten me as to what I may be doing wrong? Is there anything I should be looking into?
>>>>
>>>> Thanks in advance.
>>>> Winston
>>>> _______________________________________________
>>>> zeromq-dev mailing list
>>>> [email protected]
>>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
