Hi Gerard,

> As I'm not too familiar with the code yet, I started to investigate an
> approach into simple subscription forwarding.
> The concerns are:
>
> 1. Existing code must not break because of changes ( the default
> initialization reflects how people use it today ). This means default it
> does not use filtering at all.

There's filtering there even today. Have you seen the ZMQ_SUBSCRIBE socket
option?

> 2. Try to minimize performance impact and code complexity for
> maintaining subscriptions (bloom filter used here).

Ack.

> 3. Minimize the overall impact on the current codebase, so least
> intrusive (as little changes as possible).

Ack.

> The archive can be found here ( too large for mailing list / pastebin ):
>
> http://www.radialmind.org/files/pubforward.tar.gz
>
> My intention is to see if I'm heading in the right direction and whether
> this code, once cleaned up, is making any chance at all for getting
> committed.
>
> Explanation about the approach:
> ========================
>
> - A subscription forwarding publisher is a device with two zmq sockets.
> One is the socket over which data is published (the pub socket), another
> is a REQ socket, which is used to update subscriptions to clients that
> have connected to the pub socket. It is required in the implementation that
> clients set an identity on the SUB socket (which has implications, of
> course, for the semantics of message queueing and when subs go offline).
>
> - A subscriber / client, after it connected to the pub socket, then uses
> a REQ socket to request being added to some topic.
> The protocol here is very simple: it sends a multipart message of
> identity and topic on the REQ socket and the pubforwarder (how i called
> it for now), manipulates the filter on a particular pipe (writer_t pipe
> for the client socket, found through the session id through the
> 'identity' ) to include the subscription.
>
> - Every message that the publisher intends to send is then filtered on
> the filter for a specific client connection. It is then either actually
> sent or ignored.
>
> - The client receives the message when subscribed, or else nothing.
>
> See server.c and client.c for an example how it looks from the API side
> of things.
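
For readers without the archive at hand, the per-connection filter check
described in the quoted text can be pictured roughly as follows. This is a
minimal sketch and not the code from pubforward.tar.gz: the names (bloom_t,
bloom_add, bloom_maybe_contains), the 1024-bit size and the choice of two
hash functions are all assumptions made for illustration.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical per-connection topic filter: a fixed-size Bloom filter.
   Names and sizes are illustrative, not taken from the archive. */
#define BLOOM_BITS 1024u

typedef struct {
    uint8_t bits[BLOOM_BITS / 8];
} bloom_t;

/* FNV-1a, 32-bit. */
static uint32_t hash_fnv1a(const char *s)
{
    uint32_t h = 2166136261u;
    for (; *s; ++s) {
        h ^= (uint8_t)*s;
        h *= 16777619u;
    }
    return h;
}

/* djb2 (Bernstein), used here as a second hash function. */
static uint32_t hash_djb2(const char *s)
{
    uint32_t h = 5381u;
    for (; *s; ++s)
        h = h * 33u + (uint8_t)*s;
    return h;
}

static void bloom_init(bloom_t *b)
{
    memset(b->bits, 0, sizeof b->bits);
}

static void bloom_set(bloom_t *b, uint32_t h)
{
    uint32_t i = h % BLOOM_BITS;
    b->bits[i / 8] |= (uint8_t)(1u << (i % 8));
}

static int bloom_get(const bloom_t *b, uint32_t h)
{
    uint32_t i = h % BLOOM_BITS;
    return (b->bits[i / 8] >> (i % 8)) & 1;
}

/* Subscribe: set both bit positions derived from the topic. */
static void bloom_add(bloom_t *b, const char *topic)
{
    bloom_set(b, hash_fnv1a(topic));
    bloom_set(b, hash_djb2(topic));
}

/* Returns 0 if the topic was definitely never added, 1 if it may have been
   (false positives are possible, false negatives are not). */
static int bloom_maybe_contains(const bloom_t *b, const char *topic)
{
    return bloom_get(b, hash_fnv1a(topic)) && bloom_get(b, hash_djb2(topic));
}
```

A publisher following the quoted design would keep one such filter per
client pipe and skip the send when bloom_maybe_contains returns 0. Note that
a Bloom filter only answers exact-match questions and cannot clear a bit
safely, whereas ZMQ_SUBSCRIBE matches on a message prefix.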
>
>
> issues:
> =====
>
> - I'm using "find_session" to find the session based on some identity.
> It looks like this is not a simple "find" function, because it's increasing
> a number used in a different context. Is it safe to use this
> function any number of times?
>
> - The bloom filter only allows additions, but subscription removals are
> more difficult. There are "counting bloom filters", where it stores a
> number per bit location, but then the same subscription applied multiple
> times require an equal number of unsubscribes.
>
> - Another way to do this implementation is to create a new socket "sub"
> type, derived from pub_t and then reimplement the required methods to
> include the filter. The only advantage there is to leave the pub_t clean
> as is and it saves a couple of "ifs" when sending messages.
>
> - The bloom filter here was taken from google code and hasn't been
> formally tested or verified for correct functioning and such. It also
> has a CPL license, which may not be compatible with the 0MQ license?
> So it's possible that this implementation needs to be redone entirely.
>
> - Is there any need for a "filter" interface of some sort, so that new /
> different implementations of "a" filter can be done in the future and
> where for example sockopts are used to choose the filter in use and
> configure it accordingly?
>
> Ok, so this is the draft proposal. Let me know what you think and how to
> proceed.

I think you've started too high in the stack. Note that there's a
"subscription" feature in 0MQ even today. We don't really have to change
semantics or add any new concepts to the design.

All that's needed is to forward the subscriptions from the subscriber all
the way up through the distribution hierarchy. That consists of 3 more or
less self-contained parts:

1. Forwarding of subscription from the application thread (socket) to the
I/O thread (session).

2. Forwarding of subscription from the I/O thread to the peer application.
In other words, the subscription has to be passed on the wire upstream.

3. Forwarding a subscription through a forwarder device.

While accomplishing these three steps there are several problems to solve:

1. The filtering algorithm has to be able to maintain several sets of
filters (one per downstream connection). Jon Dyte has done some work on
this. He may share the code.

2. Unsubscriptions have to be propagated up the tree in the same way as
subscriptions.

3. When the connection between two nodes breaks, the related
unsubscriptions should be generated automatically.

4. When the connection between two nodes is (re)established, the subscriber
should forward all its subscriptions upstream.

Martin
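
P.S. To make problems 1 to 3 above a bit more concrete, here is a rough
sketch of the bookkeeping a node might do. It is not code from 0MQ and not
Jon's work: every name (sub_table_t, table_subscribe and so on) and the
fixed array limits are hypothetical. The idea is to count subscriptions per
prefix and per downstream connection, and to send something upstream only
when the first subscriber for a prefix appears or the last one goes away.
Problem 4 (replaying subscriptions after a reconnect) is not covered.

```c
#include <assert.h>
#include <string.h>

#define MAX_CONNS  8    /* downstream connections (illustrative limit) */
#define MAX_TOPICS 16   /* distinct prefixes (illustrative limit) */
#define TOPIC_LEN  32

typedef struct {
    int  used;
    char prefix[TOPIC_LEN];
    int  count[MAX_CONNS];  /* subscriptions held by each connection */
} topic_row_t;

typedef struct { topic_row_t rows[MAX_TOPICS]; } sub_table_t;

static void table_init(sub_table_t *t) { memset(t, 0, sizeof *t); }

static int valid_conn(int conn) { return conn >= 0 && conn < MAX_CONNS; }

static int row_total(const topic_row_t *r)
{
    int c, sum = 0;
    for (c = 0; c < MAX_CONNS; ++c)
        sum += r->count[c];
    return sum;
}

/* Find the row for a prefix; with create != 0, claim a free row for it. */
static topic_row_t *find_row(sub_table_t *t, const char *prefix, int create)
{
    topic_row_t *free_row = NULL;
    int i;
    for (i = 0; i < MAX_TOPICS; ++i) {
        topic_row_t *r = &t->rows[i];
        if (r->used && strcmp(r->prefix, prefix) == 0)
            return r;
        if (!r->used && !free_row)
            free_row = r;
    }
    if (!create || !free_row)
        return NULL;
    memset(free_row, 0, sizeof *free_row);
    free_row->used = 1;
    strcpy(free_row->prefix, prefix);  /* caller has checked the length */
    return free_row;
}

/* 1: forward the subscription upstream (first subscriber for the prefix),
   0: nothing to forward, -1: bad input or table full. */
static int table_subscribe(sub_table_t *t, int conn, const char *prefix)
{
    topic_row_t *r;
    int first;
    if (!valid_conn(conn) || strlen(prefix) >= TOPIC_LEN)
        return -1;
    r = find_row(t, prefix, 1);
    if (!r)
        return -1;
    first = (row_total(r) == 0);
    r->count[conn]++;
    return first;
}

/* 1: forward the unsubscription upstream (last subscriber gone),
   0: nothing to forward, -1: conn held no such subscription. */
static int table_unsubscribe(sub_table_t *t, int conn, const char *prefix)
{
    topic_row_t *r = valid_conn(conn) ? find_row(t, prefix, 0) : NULL;
    if (!r || r->count[conn] == 0)
        return -1;
    r->count[conn]--;
    if (row_total(r) > 0)
        return 0;
    r->used = 0;
    return 1;
}

/* Connection lost: drop all it held; returns how many prefixes now need
   an unsubscription sent upstream. */
static int table_disconnect(sub_table_t *t, int conn)
{
    int i, upstream = 0;
    for (i = 0; valid_conn(conn) && i < MAX_TOPICS; ++i) {
        topic_row_t *r = &t->rows[i];
        if (!r->used || r->count[conn] == 0)
            continue;
        r->count[conn] = 0;
        if (row_total(r) == 0) {
            r->used = 0;
            ++upstream;
        }
    }
    return upstream;
}
```

Because the counts are kept per connection, a subscription applied twice by
one peer needs two unsubscriptions from that peer, but a broken connection
can still be cleaned up in one pass with table_disconnect.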
