Good morning,

with the release of Java 25, I’ve attempted to migrate my 
virtual-thread-native, reactive-streaming-like library from Java 21 to Java 25 
scopes. So far I’ve been using my own wrapper on StructuredTaskScope (the 
structured concurrency scope, SCS below), but with that migration I wanted to 
eliminate it and just use SCS directly. However, I encountered some problems 
with SCS’s design; I've summarised them in a blog post 
(https://softwaremill.com/critique-of-jep-505-structured-concurrency-fifth-preview),
and then, prompted by the discussion on Reddit that followed 
(https://www.reddit.com/r/java/comments/1nq25yr/critique_of_jep_505_structured_concurrency_fifth/),
I’m writing here.

Here’s a summary of the issues:

1) SCS works great when tasks are independent and known upfront; that is, when 
tasks aren’t dynamically generated based on computations that are part of the 
scope. When they are generated dynamically, some communication between the 
forks & the main scope body is needed - typically using queues. The example I 
give in the article is of a web crawler; however, it might not be the best one, 
as someone on Reddit already pointed out that a better implementation using 
nested SCS scopes exists. Still, 
if we add requirements around rate limiting, per-domain connection pooling 
etc., a solution with a centralised coordinator becomes more viable. Other 
examples might include implementing an actor-like component, where the actor’s 
body becomes the scope's body, handles some private (mutable) state, and 
communicates with child processes (forks created in the scope) using queues.

Such a centralised coordinator (implemented as the scope’s body) does not, 
however, participate in the error handling contract of the forks. If there’s an 
exception in one of the forks, the scope will be cancelled (when using the 
default Joiner), and all other forks will be interrupted - and that’s of course 
correct. However, the main scope body won’t be interrupted (and it can’t be, as 
the interruption could escape the scope). If the main scope body includes any 
blocking logic, it might end up hanging indefinitely, while all the forks have 
been cancelled.

To make the problem a little bit more concrete, I encountered the above (and 
problem number 2 as well) when implementing Jox 
(https://github.com/softwaremill/jox) `Flow` stages. For example, when merging 
two streams, the merged substreams need to be run in the background, 
concurrently, within a concurrency scope. The main scope’s body waits for data 
from either of them (on a queue), and when an element is produced, sends it 
downstream. Now, if we’re not careful with error handling, an exception in one 
of the substreams will cancel the scope, but the main scope body will wait for 
data indefinitely, unaware of the error.
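
As a minimal sketch of what being "careful" could look like, the substreams 
below report their own completion and failure on the same queue the body reads 
from, so a failure wakes the body up instead of leaving it blocked. This is not 
the Jox implementation: `MergeSketch`, `Signal`, `Sink` and `Source` are 
hypothetical names, and plain threads stand in for forks so that the sketch 
compiles without preview flags.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical names throughout; a sketch, not the Jox implementation.
final class MergeSketch {
    // Everything a substream can put on the shared queue.
    interface Signal<T> {}
    record Element<T>(T value) implements Signal<T> {}
    record Done<T>() implements Signal<T> {}
    record Failed<T>(Throwable cause) implements Signal<T> {}

    interface Sink<T> { void emit(T value) throws InterruptedException; }
    interface Source<T> { void runWith(Sink<T> sink) throws Exception; }

    // Plays the role of the scope body: reads the queue until every substream
    // is done, or rethrows as soon as one of them reports a failure.
    static <T> List<T> merge(List<Source<T>> sources) throws InterruptedException {
        BlockingQueue<Signal<T>> queue = new LinkedBlockingQueue<>();
        List<Thread> workers = new ArrayList<>();
        for (Source<T> source : sources) {
            // Stand-in for scope.fork(...); assumption: plain daemon threads.
            Thread worker = new Thread(() -> {
                try {
                    source.runWith(value -> queue.put(new Element<>(value)));
                    queue.put(new Done<>());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // cancelled, nothing to report
                } catch (Throwable t) {
                    queue.add(new Failed<>(t)); // unbounded queue, add() cannot block
                }
            });
            worker.setDaemon(true);
            worker.start();
            workers.add(worker);
        }
        List<T> downstream = new ArrayList<>();
        int running = sources.size();
        try {
            while (running > 0) {
                Signal<T> signal = queue.take();
                if (signal instanceof Element<T> element) {
                    downstream.add(element.value());
                } else if (signal instanceof Done<?>) {
                    running--;
                } else if (signal instanceof Failed<?> failed) {
                    throw new IllegalStateException("substream failed", failed.cause());
                }
            }
            return downstream;
        } finally {
            workers.forEach(Thread::interrupt); // cancel whatever is still running
        }
    }
}
```

With this shape, merging a failing source with one that blocks forever ends 
with an exception in the body rather than a hang. The cost is that every stage 
has to repeat this protocol by hand, which is the part I'd like the scope to 
take care of.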

I’m currently solving this by still having my custom wrapper on top of scopes. 
The wrapper essentially provides custom concurrency scopes, which run the main 
scope logic in a fork, making it participate in the error handling/cancellation 
properties of all other forks. Moreover, I allow creating forks-in-forks, so 
that the main logic can create forks at will.

2) If the scope’s body does include some non-trivial coordination logic, then 
it should also be able to decide that a scope is "done". My example here is 
that there might be two kinds of forks: some implementing the actual logic, and 
some serving "helper" functions. Following the crawling example, the main-logic 
forks would be the crawlers; helper forks could e.g. implement monitoring. Now, 
when the coordinator (main scope body) decides that the computation is done, 
the scope should be completed, cancelling any helper forks. Currently this can 
be implemented through a `Joiner`. However, if the data needed to decide 
whether a scope should complete is present in the coordinator (scope body), 
it’s problematic to pass that information to the `Joiner`. My work-around here 
is to create a `Joiner` which monitors an `isDone` flag, and to submit an empty 
task after the work is determined to be done:

void main() throws ExecutionException, InterruptedException {
    var isDone = new AtomicBoolean(false);
    try (var scope = StructuredTaskScope.open(
        new CancellableJoiner<>(isDone))) {
        // some logic

        // signal completion; the empty fork exists only to trigger onFork
        isDone.set(true);
        scope.fork(() -> {});

        scope.join();
    }
}

class CancellableJoiner<T> 
    implements StructuredTaskScope.Joiner<T, Void> {

    private final AtomicBoolean isDone;
    CancellableJoiner(AtomicBoolean isDone) { this.isDone = isDone; }

    // returning true from onFork cancels the scope
    public boolean onFork(
        StructuredTaskScope.Subtask<? extends T> subtask) {
        return isDone.get();
    }

    // ...
}

But it’s more of an ugly trick than a proper solution. I think the essence of 
the problem is that the logic of the scope is divided between the scope body 
and the `Joiner` implementation, and it’s hard to keep them in sync.

3) The final two problems are more of nitpicks. First, a `timeout` method can 
easily be implemented using the machinery of the SCS, without additional 
configuration parameters. Special-casing it seems odd, as timeout is only one 
example from a family of "resiliency" methods; others include retries, repeats 
etc. These, too, could be implemented as methods on top of virtual threads and 
SCS, without special support from the SCS API itself.
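
To illustrate what I mean by "as methods", here is a minimal sketch of a retry 
helper. `Resilience.retry` and its parameters are hypothetical names for this 
email; it is a plain blocking method, which is reasonable when it runs on a 
virtual thread (e.g. inside a fork), where a blocking sleep is cheap.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Hypothetical helper; a sketch of a "resiliency" method written as plain code.
final class Resilience {
    // Calls the task up to maxAttempts times, sleeping between failed attempts.
    // Interruption is treated as cancellation and is never retried.
    static <T> T retry(int maxAttempts, Duration delay, Callable<T> task) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay.toMillis());
                }
            }
        }
        throw last; // every attempt failed: rethrow the most recent failure
    }
}
```

A fork could then wrap its work as `Resilience.retry(3, Duration.ofMillis(100), 
() -> fetch(url))`, where `fetch` is whatever the fork actually does; no 
scope-level configuration is involved.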

4) The Subtask.get() method is confusing, as it has the semantics of 
Future.resultNow(), but the name of Future.get(). Since Future.get() is quite 
well established, I think it’s reasonable to assume, without prior knowledge of 
the SCS API, that Subtask.get() is blocking as well. However, it works rather 
differently. I understand that .get() is a fitting name; however, given what 
already exists in the JDK, I would consider changing the name to something 
without naming or semantic clashes.
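
For reference, the difference between the two `Future` methods can be shown 
without any scopes at all. The sketch below uses only `CompletableFuture` and 
`Future.resultNow()` (available since Java 19); `GetVersusResultNow` and `peek` 
are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical demo class: contrasts resultNow() with the blocking get().
final class GetVersusResultNow {
    // Never blocks: resultNow() throws IllegalStateException when there is no
    // successful result to return (not completed, failed, or cancelled).
    static String peek(Future<String> future) {
        try {
            return "result: " + future.resultNow();
        } catch (IllegalStateException e) {
            return "no result available";
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        System.out.println(peek(future)); // prints "no result available"
        // Calling future.get() at this point would block the calling thread.
        future.complete("done");
        System.out.println(peek(future)); // prints "result: done"
        System.out.println(future.get()); // prints "done", returns immediately now
    }
}
```

A reader who knows `Future` would expect a method called `get()` to behave like 
the last call, not like `peek`.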

----

One might say that my requirements go beyond the regular use-cases for SCS. 
However, I view SCS as the new base building block for IO-bound concurrency in 
Java, and I think that any future concurrency libraries or projects should use 
it. This is supported by the fact that `ScopedValue`s are only inherited within 
SCSs - and I think any concurrency library or feature should provide such 
propagation. Hence, it should build on top of SCS. That’s why I’m looking for 
ways to make SCS more flexible, so that it also accommodates the more unusual 
or advanced use-cases.

Thank you for your work on SCS and Virtual Threads! I think what’s currently 
there is really great and powerful, even if not (yet) perfect :).

Regards,
Adam Warski

-- 
Adam Warski

https://warski.org
https://twitter.com/adamwarski
