On Wed, 15 Feb 2017 08:35:09 -0800, [email protected] wrote:
> Hope this helps, and I'll keep the issue under consideration.
So the time came to tackle getting supply/react/whenever syntax capable of
playing nice with non-blocking await, and I decided as part of those changes to
look at both this problem and also the more general problem of lack of good
back-pressure and, related to that, lack of fairness when using the
supply/react syntax.
To recap, until very recently, a `supply` or `react` block instance had its own
processing queue. If it was empty, the emitting thread would enter and run the
required code. If any messages were emitted to it in the meantime, they would
be queued asynchronously. When the sender of the currently-being-processed
message was done, it would check if there was anything added to the queue in
the meantime, and if so it would process those too. This mechanism also handled
recursive messages by queuing them up (this occurs when some code running in a
`supply` block instance results in another emit being sent to the same `supply`
block instance). The asynchronous queuing, however, meant that the cost of
processing a message didn't push back on senders as it should have.
I've just finished (I hope :-)) re-working the Supply internals to instead use
asynchronous locking throughout. An asynchronous lock returns a Promise that
will already be Kept if the lock is already available, or will be Kept once it
is available. Multiple contenders queue for the lock. This, in combination with
non-blocking `await` of the Promise, forms the new supply concurrency control
model, used consistently in `supply` blocks and elsewhere in the supplies
implementation (previously, the code elsewhere used a real mutex, which gave
its own set of issues).
On its own this cannot replace the previous mechanism, however, because the
queuing was used in preventing various forms of deadlock, especially recursion.
It also would cause problems for any `whenever` tapping a `supply` that emitted
values synchronously after being tapped (as in your case). The former is
resolved by detecting lock recursion and, in that case falling back to queuing
the work to run later, using the thread pool. The latter is resolved with a
custom Awaiter implementation: if anything during the processing of setting up
a `whenever` block does an `await`, a continuation is taken, and then - after
the setup work of the `supply` block is completed - the continuations are
invoked.
This latter case is relevant to the original subject to this ticket, because
with the supply concurrency control mechanism now being asynchronous locking,
the outcome of the `emit` that previously queued endlessly is now an `await`
instead. Thus the setup of the consumer is allowed to complete, before the
producer is resumed. Any further awaits are also collected and handled in the
same way, until we run out of them. The effect is that if we rewrite the
original code (to use the CLOSE phaser, not a hack with a role):
sub make-supply() {
supply {
until my $done {
say "Emitting ...";
emit(++$);
}
CLOSE $done = True;
}
}
my $s2 = make-supply;
react {
whenever $s2 -> $n {
say "Received $n";
done if $n >= 5;
}
}
Then it will produce:
Emitting ...
Received 1
Emitting ...
Received 2
Emitting ...
Received 3
Emitting ...
Received 4
Emitting ...
Received 5
Furthermore, if it is written as just:
sub make-supply() {
supply {
loop {
say "Emitting ...";
emit(++$);
}
}
}
my $s2 = make-supply;
react {
whenever $s2 -> $n {
say "Received $n";
done if $n >= 5;
}
}
The output is similar:
Emitting ...
Received 1
Emitting ...
Received 2
Emitting ...
Received 3
Emitting ...
Received 4
Emitting ...
Received 5
Emitting ...
Note the one extra "Emitting ...". The `emit` operation will now check if the
supply block is still active; in this case, it was closed by its consumer, so
it won't bother emitting and won't bother resuming either (emit is a control
exception, which is why we can unwind the stack and thus exit the loop).
Finally, this model means that:
sub make-supply() {
supply {
my $i = 0;
loop {
say "Emitting 2000 messages" if $i %% 2000;
emit(++$i);
}
}
}
my $s2 = make-supply;
react {
my $received = 0;
whenever $s2 -> $n {
$received++;
}
whenever Promise.in(1) {
say "Received $received messages";
done;
}
}
Works - as in, the second `whenever` block gets its fair chance to have a
message processed too, so the output is something like:
Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Emitting 2000 messages
Received 17291 messages
I've added some tests to S17-supply/syntax.t. Thanks for the original ticket;
hopefully this solution will make things better for those writing "source"
supplies.