I have followed the 0MQ mailing list for about a year, experimented with 0MQ and contributed to the 0MQ adaptor for Plack. I like many of the features of 0MQ, including asynchronous I/O, multi-language support, fan-out/fan-in connections and end-point connection syntax. But there are a number of things that I find frustrating and that hinder my use of 0MQ for more applications, including:

- the "slow joiner" syndrome, which limits the effectiveness of dynamically adding new workers when it is observed that the current set of workers needs help handling the load (The Guide: Getting the Message Out)
- PUSH sockets that block when there are no connections, preventing a ventilator from doing useful work while waiting for connections
- devices (e.g. routers) that ruin the fair-queuing properties, so that workers connected to a device may sit idle while workers directly connected to the original source are over-burdened (i.e. have messages sitting in their queues)
- High Water Mark (HWM) configuration that offers little protection from memory exhaustion when the number of anticipated connections is unknown (The Guide: (Semi-)Durable Subscribers and High-Water Marks)
- message loss (due to network I/O buffers) even when using durable sockets (The Guide: Transient vs. Durable Sockets)

It seems to me that this non-determinism arises from 0MQ using a "push" model at its core, where 0MQ tries to push messages out as quickly as possible.
I wonder how things would change if 0MQ used a "pull" model instead? Here is what I mean by a pull model:

- Every socket has a queue (SEND, RECV, or both) of configurable length.
- A producer application continues to generate messages as long as its SEND socket queue is not full: the producer blocks when the queue is full and does not unblock until a message is delivered to a consumer.
- When a RECV socket starts up, it asks its connected producers for exactly as many messages as would fill up its RECV queue.

I don't know enough about 0MQ to determine whether this idea could be incorporated into 0MQ, or whether it describes something else entirely. Please read on to see how this change could alleviate a lot of frustrations, both for me and for others.

The PUB/SUB scenario is probably the easiest one to describe:

- A publisher application sets up a PUB socket with a queue long enough to cover subscriber start-up times and acceptable subscriber outages. The publisher continuously writes messages to the PUB socket: when the queue gets full, the socket removes the oldest message to make room for the new one. The publisher is never blocked and never faces memory exhaustion (unless too large a queue was specified).
- A subscriber application connects to the publisher (passing in its filters) and the SUB socket asks the publisher for enough messages to fill its queue. Later, when space becomes available on its queue, the SUB socket sends the message number of the most recent message on its queue (to the PUB socket) and asks for the next (filtered) message with a higher number.
- If the message number sent by the SUB socket refers to a message that is no longer in the PUB socket's queue, then the SUB socket can be notified that messages (may) have been missed: the PUB queue may need to be lengthened if this happens too frequently.
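To make the PUB side concrete, here is a minimal sketch of the bounded publisher queue described above: numbered messages, oldest dropped when full, and a fetch that can signal a possible gap. All names (`PubQueue`, `fetch_after`) are illustrative, not part of any 0MQ API.

```python
from collections import deque

class PubQueue:
    """Sketch of the proposed bounded PUB-side queue: when full, the oldest
    message is dropped so the publisher never blocks."""
    def __init__(self, maxlen):
        self.q = deque(maxlen=maxlen)   # deque silently drops from the left
        self.next_num = 0

    def publish(self, payload):
        self.q.append((self.next_num, payload))
        self.next_num += 1

    def fetch_after(self, last_seen):
        """A subscriber asks for the next message numbered above last_seen.
        Returns (gap, message): gap is True if messages may have been missed."""
        if not self.q:
            return (False, None)
        oldest_num = self.q[0][0]
        gap = last_seen + 1 < oldest_num
        for num, payload in self.q:
            if num > last_seen:
                return (gap, (num, payload))
        return (False, None)

pub = PubQueue(maxlen=3)
for i in range(5):
    pub.publish("msg-%d" % i)
# the queue now holds messages 2..4; a subscriber that last saw
# message 0 is told it has a gap, as the proposal requires
gap, msg = pub.fetch_after(0)
```

Note how the gap notification falls out of the numbering for free: the subscriber only needs to compare its last-seen number against the oldest number still queued.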
A proxy (a SUB/PUB device) is very simple: it has a SUB socket with a very short queue (possibly of length 0) connected to a PUB socket with a queue of whatever length is desired (probably the same length used for the publisher's PUB socket). From the subscriber's point of view, it cannot tell the difference between the proxy and the publisher (other than latency). In fact, the subscriber's SUB socket could be connected to both the publisher and the proxy, and alternate getting messages from one and the other (provided both the publisher and the proxy have the same session ID).

If a subscriber dies or is restarted, there are two possibilities:

- If the subscriber "remembered" the number of the last message it processed (e.g. if it was written to disk), then it can pass this number to the SUB socket at creation time, which would use it as a starting point for building its queue. If the subscriber is restarted within the time period covered by the PUB socket's queue, then no messages would have been missed.
- If the subscriber did not remember the message number, then the SUB socket would build its queue starting from the beginning. This means that the subscriber could end up processing the same message twice.

If the publisher dies or is restarted, then the PUB socket queue would be lost (or not - see the next point) and there would again be two possibilities:

- If the publisher "remembered" the number of the last message it pushed onto the PUB socket, then it could continue publishing from where it left off (by passing the session ID and this number to the PUB socket at creation time).
- If it did not remember the number of the last message, then the PUB socket would generate a new session ID and restart message numbering from 0. When a SUB socket receives a message with a new session ID, a configuration setting would specify whether it should keep the messages associated with the previous session ID or discard them.
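The subscriber-restart rule above can be sketched as a single decision function. This is only an illustration of the proposal's semantics (the function name and status strings are made up): given the remembered number (or None) and the oldest number still in the PUB queue, it yields the resume point and whether loss or duplication is possible.

```python
def restart_subscriber(remembered_num, pub_oldest_num):
    """Sketch of the proposed subscriber-restart rule.
    Returns (start_num, status)."""
    if remembered_num is None:
        # No memory: rebuild from the start of the publisher's queue,
        # so already-handled messages may be processed twice.
        return pub_oldest_num, "possible-duplicates"
    if remembered_num + 1 >= pub_oldest_num:
        # Restarted within the window covered by the PUB queue: no loss.
        return remembered_num + 1, "no-loss"
    # Fell outside the window: some messages were dropped from the PUB queue.
    return pub_oldest_num, "possible-loss"
```

The "no-loss" branch is exactly the case where the subscriber came back within the time period covered by the publisher's queue.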
It is very simple to preserve a publisher's PUB socket queue across publisher restarts: simply put a proxy in front of the publisher. If the architecture must be resilient to a proxy device dying or being restarted, then put a second proxy in front of the publisher and configure subscribers to connect to both proxies. I don't know how these ideas would work over (E)PGM.

The PUSH/PULL scenario is a little more complicated than the PUB/SUB scenario:

- A ventilator application sets up a PUSH socket with an application-specific queue size. The ventilator pushes messages onto the socket until the queue is full, at which point it blocks; it unblocks once a message is sent to a worker (or, if messages have shelf lives, once a message expires).
- A worker application sets up a PULL socket with a very short queue, typically of length 1 or 0. There is no waiting to coordinate with other workers, and because there is at most one message in the worker's queue, other workers can pick up other messages when they come online, effectively preventing the "slow joiner" syndrome.
- A worker application typically also sets up a PUSH socket with an application-specific queue length. If it is important to keep all the downstream workers busy (across all pipelines), then the PUSH queue should be kept short so that the worker application blocks and forces the ventilator to distribute more work to other pipelines. If this does not matter, then the queue could be considerably longer.
- A sink application sets up a PULL socket with an application-specific queue size: a short queue will regulate the message flow through the system (similar to the effect of setting a short queue on a worker's PUSH socket), whereas a long queue lets the upstream workers operate as quickly as possible.
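The heart of this PUSH/PULL scheme is credit: each PULL peer requests exactly as many messages as it has free queue slots, and the PUSH side only sends against that outstanding credit. A minimal sketch of that behaviour, with illustrative names (`CreditPush`, `request`) that are not any existing 0MQ API:

```python
from collections import deque

class CreditPush:
    """Sketch of the proposed credit-based PUSH socket: messages are only
    sent to peers that have announced free queue slots."""
    def __init__(self, maxlen):
        self.queue = deque()      # the PUSH socket's own bounded queue
        self.maxlen = maxlen
        self.credit = {}          # peer -> outstanding requests
        self.delivered = []       # (peer, msg) pairs, for illustration

    def request(self, peer, n):
        """A PULL socket asks for n more messages."""
        self.credit[peer] = self.credit.get(peer, 0) + n
        self._deliver()

    def push(self, msg):
        """Returns False when the queue is full: the ventilator would block."""
        if len(self.queue) >= self.maxlen:
            return False
        self.queue.append(msg)
        self._deliver()
        return True

    def _deliver(self):
        # Hand messages to the peer with the most outstanding requests.
        while self.queue and any(self.credit.values()):
            peer = max(self.credit, key=self.credit.get)
            self.credit[peer] -= 1
            self.delivered.append((peer, self.queue.popleft()))

p = CreditPush(maxlen=2)
p.request("w1", 1)
p.request("w2", 1)
p.push("a"); p.push("b")   # delivered immediately against credit
p.push("c"); p.push("d")   # queued: no credit left
blocked = p.push("e")      # queue full: the ventilator blocks
```

Because delivery is driven entirely by worker requests, a late-joining worker simply issues its first request and receives the next queued message, which is why the "slow joiner" syndrome disappears under this model.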
A streamer (a PUSH/PULL device) is interesting because it has a PUSH socket with a queue of length 0 and a PULL socket with a queue that varies in length depending on the number of outstanding upstream requests, which is the number of messages it requests from the ventilator (or upstream streamer). Now the ventilator's PUSH socket has the information necessary (the number of requests from each connection) to make intelligent decisions about how to distribute its messages, resulting in all workers getting their fair share of work (even when they are connected to the ventilator through a streamer or two). In other words, a streamer device does not perturb the end-to-end fair-queuing policy.

Preserving message queues when components die or are restarted in this scenario is more complicated than it is for PUB/SUB. It can be done by having sockets "shadow" the queues of the sockets at the other end of their connections. For example, instead of discarding a message after it has been delivered downstream, a PUSH socket could move the message to a "pending delivery" queue (a shadow queue), which would hold the message until there is confirmation that it was removed from the PULL queue of a worker application (streamer devices are not considered workers). If the worker (or streamer) dies or is restarted, then the ventilator can resend the messages in the pending delivery shadow queue; if the worker is not restarted quickly enough (configurable by the application), then the PUSH socket moves the message out of the shadow queue and back onto the PUSH queue for delivery to another worker.
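The "pending delivery" shadow queue amounts to confirm-or-requeue with a deadline. A small sketch of just that mechanism (class and method names are illustrative):

```python
class PendingDelivery:
    """Sketch of the proposed shadow queue: a delivered message sits here
    until the PULL side confirms removal; on timeout it is requeued."""
    def __init__(self, timeout):
        self.timeout = timeout
        self.pending = {}   # msg_id -> (worker, confirmation deadline)

    def delivered(self, msg_id, worker, now):
        """Record that msg_id was sent to a worker, with a deadline."""
        self.pending[msg_id] = (worker, now + self.timeout)

    def confirm(self, msg_id):
        """The worker's PULL queue confirmed removal: forget the message."""
        self.pending.pop(msg_id, None)

    def expired(self, now):
        """Messages whose worker did not confirm in time; the PUSH socket
        would move these back onto its primary queue for another worker."""
        out = [m for m, (_, deadline) in self.pending.items() if deadline <= now]
        for m in out:
            del self.pending[m]
        return out

pd = PendingDelivery(timeout=5)
pd.delivered(1, "w1", now=0)
pd.delivered(2, "w2", now=0)
pd.confirm(1)               # w1 confirmed; w2 never does
requeue = pd.expired(now=10)
```

The same structure works one hop at a time through streamers, since each PUSH side only shadows the queue of its immediate downstream PULL sockets.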
On the flip side, if there is no worker (or streamer) ready to receive a message on the ventilator's PUSH queue, then the PUSH socket selects the PULL socket most likely to request that message and sends it to that socket to be put on its "future delivery" shadow queue, while leaving it on its own PUSH queue (which could cause the ventilator to block). If that PULL socket does not request a message before another PULL socket is ready, then the PUSH socket could send the message to the other PULL socket and tell the first PULL socket to remove it from its future delivery queue; otherwise, if it is the first PULL socket to request another message, then the PUSH socket could tell it to take the message from its future delivery queue.

Now if a ventilator is restarted, and it has remembered the last message it sent to its PUSH socket, then the PUSH socket could recreate its queues from information sent back by the PULL sockets that re-establish connections: the messages from a connected PULL socket's queue would go onto the pending delivery queue, and the messages from the PULL socket's future delivery queue would go onto the PUSH socket's (primary) queue. If a streamer device is restarted, then it would recreate its state from re-established connections on both sides.

Even with the shadow queues, it is possible to lose the message being worked on by a worker when it dies or is restarted. If this situation is not tolerable, then messages could be kept on the ventilator's (and intermediary streamer devices') pending delivery queues until the worker has explicitly confirmed that it has processed the message (for example, after it sends something on to the sink application). This implies that the worker's PULL socket also has a "pending confirmation" queue (of length 1) to keep track of where to send the confirmation. Note that even though a ventilator might have a small queue, its shadow queue grows with the number of workers.
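The ventilator-restart recovery described above can be sketched as one reconstruction step. This is purely illustrative (the function and the report format are made up): each re-connecting PULL socket reports its in-queue messages and its future-delivery messages, and the PUSH socket rebuilds both of its queues from those reports.

```python
def recover_push_state(peer_reports):
    """Sketch of the proposed ventilator-restart recovery.
    Each report is (peer, in_queue_msgs, future_delivery_msgs)."""
    pending = []   # rebuilt "pending delivery" shadow queue
    primary = []   # rebuilt primary PUSH queue, awaiting (re)delivery
    for peer, in_q, future_q in peer_reports:
        # Messages already sitting in a PULL queue are awaiting confirmation.
        pending.extend((peer, m) for m in in_q)
        # Messages parked for "future delivery" were never handed over.
        primary.extend(future_q)
    return pending, primary

reports = [("w1", ["m1"], ["m3"]), ("w2", ["m2"], [])]
pending, primary = recover_push_state(reports)
```

A restarted streamer would run the same reconstruction twice, once per side, which is why it needs no durable state of its own.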
That growth is linear, however, and should easily be handled by the ventilator. If not, then the "pending delivery" queue could be configured with a maximum size, in which case the PUSH socket would block when the queue reaches the maximum length. Also note that this approach makes it easy for a PUSH socket to reassign a "pending delivery" message to another PULL socket if there is another PULL socket waiting for messages and the original PULL socket has not delivered the message to a worker within a specified period of time (as is done when a worker dies).

Fundamentally, I am suggesting that all the PUSH/PULL sockets between the ventilator and the workers cooperatively maintain a single distributed queue that distributes work evenly across all workers and is resilient to component failures. Similarly, there would be another distributed queue between the workers and the sink (and between workers and workers in a pipeline).

The REQ/REP scenario is similar to the PUSH/PULL scenario, but more complicated:

- A client application sets up a REQ socket that has an output queue of length 0 and an input queue (called the "pending delivery" queue in the PUSH/PULL scenario) of length 1. This implies that the client blocks until there is a server ready to take the request message.
- A server application sets up a REP socket that has an input queue of medium length (application-configurable) and a "future delivery" queue of length 0. If the client is connected to two or more servers, then the client will be informed of how many requests each server is prepared to handle, enabling the client to direct request messages to the server with the most available capacity. It also means that request messages could be lost if the client dies, because there are no "future delivery" queues holding copies of request messages.

The difference between this scenario and the PUSH/PULL scenario is that the server attaches its response to the confirmation message sent back to the client's input queue.
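The client-side routing rule in the REQ/REP scenario is a small variation on the credit idea: pick the server with the most announced capacity, or block when none have any. A sketch (the function name and the capacity map are illustrative):

```python
def pick_server(capacity):
    """Sketch of the proposed client-side routing: given each server's
    announced free request slots, send to the server with the most room.
    Returns None (i.e. the client blocks) when no server has capacity."""
    best = max(capacity, key=capacity.get, default=None)
    if best is None or capacity[best] == 0:
        return None
    capacity[best] -= 1   # one slot is now consumed by this request
    return best

caps = {"s1": 2, "s2": 5}
chosen = pick_server(caps)   # the least-loaded server gets the request
```

This is what lets the pull model keep end-to-end fair queuing through routers: the decision is driven by downstream demand rather than by round-robin over connections.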
Attaching the response in this way implies that the "pending confirmation" queues must be set up more like a PUSH queue (complete with pending-confirmation shadow queues) to ensure that the response message does not get lost on its journey back to the client application. A router (a REQ/REP device) behaves just like a streamer device that is configured to pass confirmation messages back upstream (because a confirmation message contains the response). Also like the streamer, a router does not perturb the end-to-end fair-queuing policy. Notice that there is no need for separate XREQ and XREP sockets, because their functionality is already included in these REQ and REP sockets. This suggests that the "single outstanding request" limitation for client applications could be removed (by increasing the size of the REQ socket's output queue).

As discussed for the PUSH/PULL scenario, I am suggesting that all the REQ/REP sockets between the client and the server cooperatively maintain a single distributed queue that distributes work evenly across all servers and is resilient to component failures.

I have not analyzed the PAIR pattern because it is "a low-level pattern for specific, advanced use-cases" (The Guide: Core Messaging Patterns).

I realize that moving to a "pull" model is a major change to the internal architecture of 0MQ, and the suggestions here mean a change to the interface. But I think that 0MQ would be easier to explain and more attractive to a wider audience with these changes.

My question is: Is there any possibility of these suggestions making it into 0MQ?

Regards,
Henry

--
Henry Baragar
Instantiated Software
_______________________________________________
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
http://lists.zeromq.org/mailman/listinfo/zeromq-dev