On 2013-11-20 07:34:36 +0000, Chris Williams said:

On Wednesday, 20 November 2013 at 04:24:14 UTC, Daniel Murphy wrote:
This is the correct forum to post phobos proposals on.

Well then, here's what I had written:

A few applications I've considered implementing seem like they would be easier if there were a channel-based messaging system in std.concurrency. I'm happy to do this implementation, but I thought I would try to get some sort of sign-off before doing so. Below, I lay out my argument for the addition, and then the API that I am considering.

---

One fairly common task is thread-pooling. With the standard send/receive model currently implemented, you have to choose a specific thread to target when you send a task. While it's true that you can simply iterate through your list of threads over and over to spread the load evenly across them, that presumes that all tasks take the same amount of processing time. It makes more sense to be able to push data into a shared channel (secretly a work queue), so that the first thread to finish its previous task can immediately pull the next one. This also removes the need to pass around references to your threads so that they can be looped over.
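To make the shared work queue idea concrete, here is a minimal sketch of one, assuming a hand-rolled `WorkQueue` class built on `core.sync` and plain `core.thread` threads. Neither `WorkQueue` nor `sumWithPool` exists in Phobos; both names are hypothetical, and this is not the proposed Channel implementation. The point is only that whichever worker becomes idle first takes the next task, so the producer never has to pick a target thread.

```d
import core.atomic : atomicLoad, atomicOp;
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;

/// Hypothetical shared work queue (not part of Phobos).
final class WorkQueue(T)
{
    private T[] items;
    private bool closed;
    private Mutex mutex;
    private Condition nonEmpty;

    this()
    {
        mutex = new Mutex;
        nonEmpty = new Condition(mutex);
    }

    /// Adds a task and wakes one idle worker.
    void put(T item)
    {
        mutex.lock();
        scope (exit) mutex.unlock();
        items ~= item;
        nonEmpty.notify();
    }

    /// Signals that no more tasks will arrive.
    void close()
    {
        mutex.lock();
        scope (exit) mutex.unlock();
        closed = true;
        nonEmpty.notifyAll();
    }

    /// Blocks until a task is available. Returns false once the queue
    /// is closed and fully drained.
    bool take(out T item)
    {
        mutex.lock();
        scope (exit) mutex.unlock();
        while (items.length == 0 && !closed)
            nonEmpty.wait();
        if (items.length == 0)
            return false;
        item = items[0];
        items = items[1 .. $];
        return true;
    }
}

/// Runs `tasks` through `workerCount` threads and returns their sum.
int sumWithPool(int[] tasks, size_t workerCount)
{
    auto queue = new WorkQueue!int;
    shared int total;

    Thread[] workers;
    foreach (i; 0 .. workerCount)
    {
        // Each worker pulls from the same queue until it is closed.
        workers ~= new Thread({
            int task;
            while (queue.take(task))
                atomicOp!"+="(total, task);
        }).start();
    }

    foreach (t; tasks)
        queue.put(t);
    queue.close();

    foreach (w; workers)
        w.join();
    return atomicLoad(total);
}

void main()
{
    assert(sumWithPool([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 4) == 55);
}
```

Note that the producer only ever touches the queue; it holds no list of worker threads to iterate over.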

I haven't tested it, but it looks like this sort of thing might be quasi-possible using the register/unregister/locate methods. As each thread starts, it can register itself with a named group (i.e. channel), and then anyone who wants to send an item to an arbitrary thread in that group can call locate() to retrieve one thread and call send() against the Tid. The target thread would then need to unregister itself while it is doing work, then re-register itself. My complaint against this is the need to unregister and re-register. If the thread issuing commands sends a large number of tasks all at once, they will all go to the same thread (if coded poorly) or the caller will need to use yield() or sleep() to allow the target thread to receive the task and unregister, so that locate() can find a different thread. That's not terribly efficient. I am also concerned that there's the chance that all threads will be unregistered when we call locate(), whereas a channeling system would be able to expand the mailbox during the times that all threads are busy.
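For reference, the register/locate approach described above might look roughly like the following sketch. It uses only the existing `register`, `unregister`, `locate`, `send` and `receive` functions from std.concurrency; the name `poolName` and the helper `dispatch` are hypothetical. As documented, `register` returns false when the name is already in use and `locate` returns `Tid.init` when nothing is registered under that name, so the sketch uses a single worker.

```d
import std.concurrency;

enum poolName = "workers"; // hypothetical name for the worker group

void worker()
{
    // register() returns false if the name is already in use.
    immutable registered = register(poolName, thisTid);
    ownerTid.send(registered);

    receive((int task) {
        // Step out of the registry while busy, as described above.
        unregister(poolName);
        ownerTid.send(task * 2);
        // A long-lived worker would call register() again here.
    });
}

/// Sends one task to whichever thread is registered and returns its reply.
int dispatch(int task)
{
    spawn(&worker);
    immutable ready = receiveOnly!bool(); // wait until the worker has registered
    assert(ready);

    Tid target = locate(poolName); // Tid.init if nobody is registered
    assert(target != Tid.init);
    target.send(task);
    return receiveOnly!int();
}

void main()
{
    assert(dispatch(21) == 42);
}
```

The handshake before `locate()` is what the caller would otherwise need `yield()` or `sleep()` to approximate.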

The actual implementation within concurrency.d also concerns me as (if I read it correctly) the most recent thread to register() will be the one which locate() finds, rather than the thread which has been registered the longest. While I suppose it's probably not too large of an issue if the same two threads keep taking all the tasks (that just means that your load doesn't exceed two threads' worth of processing power), it still seems like a FIFO system would be better. The registry is also based on an array rather than a set, which can make removal an O(n) operation if the contents of the registry have to be shifted left to fill an empty spot.

Overall, I think that adding a shared message box system would be a straightforward way to improve the handling of thread pooling via the actor model.

---

A less common use case: I was also considering some world simulators (e.g. for studying economics or building a game map), and here the ability to broadcast messages to a large set of other actors, based on location, interest, etc., seems useful. In this case, messages would need to be copied out to each subscriber in the channel rather than existing as a point-to-point connection. For a networked game, you would most likely want to break each channel into two: locally, all senders on a channel push to a single listener that pipes the messages over the network, and remotely, the messages are broadcast to many listeners again. That's a reasonably straightforward task for someone to implement on top of the channel functionality, so I don't think the networking part is needed in Phobos itself. Mostly, the presence of the broadcasting functionality in the standard library would allow users to apply the easy and safe actor model to more creative uses than a straight one-to-one pipe.
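As a point of comparison, broadcasting can be approximated today by keeping an explicit list of subscriber Tids and sending a copy to each one. The sketch below does that with the existing std.concurrency functions; `listener` and `broadcast` are hypothetical names, and the sender has to track every subscriber itself, which is the bookkeeping a duplicating channel would presumably take over.

```d
import std.concurrency;

void listener()
{
    // Each subscriber gets its own copy and reports the length back.
    receive((string msg) { ownerTid.send(msg.length); });
}

/// Sends `msg` to `listenerCount` fresh listeners and sums their replies.
size_t broadcast(string msg, size_t listenerCount)
{
    Tid[] subscribers;
    foreach (i; 0 .. listenerCount)
        subscribers ~= spawn(&listener);

    // Manual fan-out: one send() per subscriber.
    foreach (tid; subscribers)
        tid.send(msg);

    size_t total;
    foreach (i; 0 .. listenerCount)
        total += receiveOnly!size_t();
    return total;
}

void main()
{
    assert(broadcast("hello", 3) == 15);
}
```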

---

Overall, my hope would be to develop something that is conceptually no more difficult to deal with than the current send()/receive() model, but also able to be used in a wide variety of ways. The API that I would propose to develop is:

interface Channel {
        void send(T...)(T vals);
        void prioritySend(T...)(T vals);
        void receive(T...)(out Tid sender, T ops);
        receiveOnlyRet!(T) receiveOnly(T...)();
        bool receiveTimeout(T...)(Duration d, T ops);

        void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis);
        void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) doThisFunc);
}

class SingleChannel : Channel {} // Send inserts a message into a shared message box. Receive removes message

class DuplicateChannel(bool echo = true) : Channel {} // Send inserts the message into a message box per-recipient. Receive removes message in the calling thread's channel message box. If echo is false, messages will not be sent back to the sender, even if they are a registered listener

void registerSend(Channel c, Tid tid = thisTid); // used by function sendAll(). Channel can be of either type
void unregisterSend(Channel c, Tid tid = thisTid);
void registerReceive(Channel c, Tid tid = thisTid); // used by function receiveAll(). Channel can be of either type
void unregisterReceive(Channel c, Tid tid = thisTid);

void sendAll(T...)(T ops); // Sends a copy of message to all channels this thread has registered for.
void receiveAll(T...)(out Channel c, out Tid sender, T ops); // Receives a message of type T from any channel that we are registered for. Returns channel and sender

I believe that the look and feel stays fairly consistent with the current set of functions in std.concurrency. I've added the ability for the recipient to infer information about the sender since, in the duplication model, I believe there are quite a few cases where this would be important information. And of course, I've added the option to register/unregister threads other than ourselves to allow a greater range of code layouts, though it's possible that the lack of this sort of thing in the original code is due to some sort of safety concern?

The most straightforward way to implement the DuplicateChannel would be to use the individual threads' message boxes, but this would mean that data put into a channel could be pulled out via the traditional receive() method. Currently, my intention would be to partition these two systems (the direct send()/receive() model and the channel model), unless anyone has any reason to think they should be merged into a single whole?

Those are my thoughts, anyways. Comments? Complaints?

How does one receive from multiple channels out-of-order? I would rather the channel sent each message to the subscribed Tid via send(), rather than having an additional queue. It could possibly send a ChannelMessage which holds a reference to the sending channel and the message. I understand this is a different model than what Go and whatnot use, but I think it's more practical in some circumstances. Maybe both ways would be good? I personally use this method in my vibe-d server.
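A rough sketch of that idea, assuming a hypothetical `ChannelMessage` struct that carries a channel name (standing in for a reference to the channel) alongside the payload. Everything is delivered through the ordinary send()/receive() mailbox, so a single receive() call handles messages from any number of channels in arrival order.

```d
import std.concurrency;

/// Hypothetical envelope; `channel` stands in for a reference to the
/// sending channel.
struct ChannelMessage(T)
{
    string channel;
    T payload;
}

void subscriber()
{
    // One mailbox serves every channel; the envelope says which one.
    foreach (i; 0 .. 2)
    {
        receive((ChannelMessage!int m) {
            ownerTid.send(m.channel, m.payload);
        });
    }
}

/// Returns the channel names in the order the subscriber saw them.
string[] deliveryOrder()
{
    auto tid = spawn(&subscriber);
    tid.send(ChannelMessage!int("prices", 7));
    tid.send(ChannelMessage!int("orders", 9));

    string[] seen;
    foreach (i; 0 .. 2)
    {
        auto reply = receiveOnly!(string, int)();
        seen ~= reply[0];
    }
    return seen;
}

void main()
{
    assert(deliveryOrder() == ["prices", "orders"]);
}
```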
