On Wed, 13 Oct 2010 20:55:04 +0400, Andrei Alexandrescu <seewebsiteforem...@erdani.org> wrote:

Why doesn't Sean's concurrency API scale for your needs? Can that be fixed? Would you consider submitting some informed bug reports?


Okay, now that I have a little extra time, I can share what I missed in std.concurrency that made me implement my own message passing API:

1) It is next to impossible to create and use multiple message boxes, which is a big pain when writing library code. std.concurrency operates on Tid, and you can't create new Tids (the ctor is private). Tid is a very simple wrapper around a MessageBox class, and while you can create custom MessageBoxes, none of the public std.concurrency APIs work with them directly. As such, you are stuck with one Tid per thread, and that's a no-go for me. E.g. I have hundreds of concurrent socket connections, and I'd like to have different event handlers for different event sources (i.e. different SocketStreams).

2) Even if you were able to create N Tids, this is what event handling would look like:

for (int i = 0; i < messageBoxes.length; ++i) {
    messageBoxes[i].tid.receiveTimeout(0 /* no timeout, blocking is not allowed */, messageBoxes[i].callback);
}

This doesn't scale well. With hundreds of message boxes, this loop will consume all the CPU time. Besides, the callback must be declared as "void delegate(Variant)", which loses all the type information: it just throws all the events into the same bag. That's not okay for me.

3) I want to bind callbacks to event types in one place, and poll for events in another one. E.g. instead of:

void foo(FooMessage message) { ... }
...

tid.receiveTimeout(0, &foo, &bar, &baz);

I'd like to be able to write:

tid.register(&foo);
tid.register(&bar);
tid.register(&baz);
...
tid.poll(0 /* no wait */);
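
To make the register/poll split concrete, here is a minimal single-threaded sketch of the idea. It is not the std.concurrency API and not the actual mailbox.d code: the Mailbox struct, send(), FooMessage, BarMessage and demo() are hypothetical names made up for illustration, and there is no locking or timeout. Messages without a matching handler are simply dropped.

```d
import std.conv : text;
import std.traits : isCallable, Parameters;
import std.variant : Variant;

/// Hypothetical mailbox: handlers are bound once, polling happens elsewhere.
struct Mailbox
{
    private bool delegate(Variant)[] handlers; // one type-checking wrapper per handler
    private Variant[] queue;                   // pending messages (no locking: sketch only)

    /// Bind a handler to the type of its single parameter.
    void register(F)(F handler) if (isCallable!F && Parameters!F.length == 1)
    {
        alias T = Parameters!F[0];
        handlers ~= (Variant v) {
            if (auto p = v.peek!T) { handler(*p); return true; }
            return false;
        };
    }

    /// Enqueue a message of any type.
    void send(T)(T message) { queue ~= Variant(message); }

    /// Dispatch everything queued so far, without waiting; returns the number handled.
    size_t poll()
    {
        size_t handled;
        auto pending = queue;
        queue = null;
        foreach (msg; pending)
            foreach (h; handlers)
                if (h(msg)) { ++handled; break; }
        return handled;
    }
}

struct FooMessage { int value; }   // hypothetical message types
struct BarMessage { string text; }

string[] demo()
{
    string[] log;
    Mailbox box;
    // Handlers keep their static types; no "void delegate(Variant)" in user code.
    box.register((FooMessage m) { log ~= text("foo:", m.value); });
    box.register((BarMessage m) { log ~= text("bar:", m.text); });

    box.send(FooMessage(1));
    box.send(BarMessage("hi"));
    box.poll(); // polling is done in a different place than registration
    return log;
}

void main()
{
    import std.stdio : writeln;
    writeln(demo());
}
```

The point of the design is that the type switch lives inside register(), so the polling site does not need to know which message types exist.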

4) Event chaining is impossible to achieve. This is mostly because of #3.
Here is a more concrete example: I want to receive an incoming socket message notification, parse that message (extract HTTP headers/contents etc.) and then possibly dispatch a new event. All this needs to be transparent to the user. E.g.:

HttpConnection connection = new HttpConnection("google.com"); // HttpConnection is needed to send multiple http requests over the same socket connection
HttpRequest request = new HttpRequest("/"); // main page

connection.execute(request, tid); // start

HttpConnection is using SocketStream under the hood.
When you call connection.execute(request), it connects to

tid.receiveOnly( (HttpResponse response) { writeln(response.contents); } );

With std.concurrency this is impossible to implement. The problem is that the HttpResponse event is never sent, because the SocketStream message is never received, because it is never polled for. The following could improve the situation:

class HttpConnection {
    void execute(HttpRequest request) { ... tid.register(&onNewMessage); ... }
}

tid.register(&onHttpResponse);
tid.poll(); // now it polls for both messages!

When a new message arrives, control passes to HttpConnection and then to HttpRequest, which parses the socket message and generates an HttpResponse event, which is then received by the user.
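
A rough sketch of that chaining, under heavy assumptions: everything here (Mailbox, RawSocketData, this HttpResponse and HttpConnection, demo()) is hypothetical and single-threaded, the "socket" is faked by enqueueing a canned reply, and parsing is reduced to splitting off the body. It only shows that one poll() can drive both the library's internal handler and the user's handler.

```d
import std.string : indexOf;
import std.traits : isCallable, Parameters;
import std.variant : Variant;

struct RawSocketData { string bytes; }    // hypothetical low-level event
struct HttpResponse  { string contents; } // hypothetical high-level event

final class Mailbox
{
    private bool delegate(Variant)[] handlers;
    private Variant[] queue;

    void register(F)(F handler) if (isCallable!F && Parameters!F.length == 1)
    {
        alias T = Parameters!F[0];
        handlers ~= (Variant v) {
            if (auto p = v.peek!T) { handler(*p); return true; }
            return false;
        };
    }

    void send(T)(T message) { queue ~= Variant(message); }

    /// Keeps dispatching until the queue is empty, so events raised by
    /// handlers during this call are processed by the same call.
    size_t poll()
    {
        size_t handled;
        while (queue.length)
        {
            auto msg = queue[0];
            queue = queue[1 .. $];
            foreach (h; handlers)
                if (h(msg)) { ++handled; break; }
        }
        return handled;
    }
}

final class HttpConnection
{
    private Mailbox box;
    this(Mailbox box) { this.box = box; }

    void execute(string path)
    {
        box.register(&onNewMessage); // the library binds its own handler
        // Stand-in for real socket I/O: pretend the reply has already arrived.
        box.send(RawSocketData("HTTP/1.1 200 OK\r\n\r\nhello"));
    }

    private void onNewMessage(RawSocketData data)
    {
        // Toy parsing: everything after the blank line is the body.
        auto split = data.bytes.indexOf("\r\n\r\n");
        auto content = split < 0 ? "" : data.bytes[split + 4 .. $];
        box.send(HttpResponse(content)); // chain a new, higher-level event
    }
}

string[] demo()
{
    string[] log;
    auto box = new Mailbox;
    auto connection = new HttpConnection(box);
    box.register((HttpResponse r) { log ~= r.contents; }); // the user only sees HttpResponse
    connection.execute("/");
    box.poll(); // handles RawSocketData first, then the HttpResponse it produced
    return log;
}

void main()
{
    import std.stdio : writeln;
    writeln(demo());
}
```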

5) std.concurrency doesn't know about ThreadPools and doesn't allow event processing in threads other than the current one. This prevents code parallelization.

6) Tid can't redirect events to other Tids.

Here is an example:

void onNewEvent(Event e) { writeln(e.toString()); }

Mailbox m1;
m1.register(&onNewEvent);

Mailbox m2 = Mailbox(&m1);  // m1 is now a parent to m2
m2.raiseEvent(new Event()); // redirects to m1

m1.poll(INFINITE); // triggers event handling

This is useful when you have hundreds of mailboxes. Just poll one and all the events will be triggered.
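
For illustration only, the parent redirection could be sketched roughly like this. This is not the real mailbox.d: the struct layout, the Event name field and demo() are assumptions, it is single-threaded, and poll() here never blocks (no INFINITE wait).

```d
/// Hypothetical event type; the name field exists only for the demo output.
class Event
{
    string name;
    this(string name) { this.name = name; }
    override string toString() { return "Event(" ~ name ~ ")"; }
}

/// Hypothetical mailbox that forwards raised events to its parent, if it has one.
struct Mailbox
{
    private Mailbox* parent;
    private void delegate(Event)[] handlers;
    private Event[] queue;

    this(Mailbox* parent) { this.parent = parent; }

    void register(void delegate(Event) handler) { handlers ~= handler; }

    void raiseEvent(Event e)
    {
        if (parent !is null)
            parent.raiseEvent(e); // redirect up the chain
        else
            queue ~= e;           // only the root mailbox stores events
    }

    /// Non-blocking: dispatches whatever is queued; returns the number of events.
    size_t poll()
    {
        auto pending = queue;
        queue = null;
        foreach (e; pending)
            foreach (h; handlers)
                h(e);
        return pending.length;
    }
}

string[] demo()
{
    string[] log;
    Mailbox m1;
    m1.register((Event e) { log ~= e.toString(); });

    auto m2 = Mailbox(&m1);               // m1 is now a parent to m2
    m2.raiseEvent(new Event("connected")); // lands in m1's queue
    assert(log.length == 0);               // nothing is delivered until the root is polled

    m1.poll();                             // one poll serves every child mailbox
    return log;
}

void main()
{
    import std.stdio : writeln;
    writeln(demo());
}
```

With hundreds of children pointing at one root, the polling cost stays at one queue instead of one receiveTimeout call per mailbox.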

That's pretty much what I needed (and what my Mailbox provides) but std.concurrency lacks.

My mailbox implementation is very slim; the full source code is available here:
http://bitbucket.org/korDen/io/src/tip/io/mailbox.d
