Good morning!
> >1) SCS works great when tasks are independent, and known upfront; that is,
> >when tasks aren’t dynamically generated based on computations that are part
> >of the scope.
>
> I think I understand what you intend to say, but I think more specifically
> you're referring to when tasks are generated as a result of *other tasks*,
> not the scope body itself.
>
> Case in point:
>
> try(var scope = …) {
>     while(dynamicCondition)  // Dynamic number of tasks
>         scope.fork(…);
>     scope.join();
> }
>
> >someone on Reddit already pointed out that a better implementation using SCS
> >nested scopes exists.
>
> When in doubt, consider if nested scopes could make the design clearer. This
> is analogous to breaking a large, complex, method-body down into multiple
> smaller ones.
>
> >Still, if we add requirements around rate limiting, per-domain connection
> >pooling etc., a solution with a centralised coordinator becomes more viable.
>
> Not sure how that conclusion was derived. Could you explain further?
>
> >Other examples might include implementing an actor-like component, where the
> >actor’s body becomes the scope's body, handles some private (mutable) state,
> >and communicates with child processes (forks created in the scope) using
> >queues.
>
> Inter-task communication channels are not part of Structured Concurrency at
> this point in time. It is however important to note that StructuredTaskScope
> is not the end state of Structured Concurrency.
>
> >If the main scope body includes any blocking logic, it might end up hanging
> >indefinitely, while all the other forks have been cancelled.
>
> That statement is true by definition—any code which is blocking indefinitely
> and is not interrupted, is by definition blocking indefinitely.
>
> >The main scope’s body awaits for data from either of them (on a queue), and
> >when an element is produced, sends it downstream. Now, if we’re not careful
> >with error handling, an exception in one of the substreams will cancel the
> >scope, but the main scope will indefinitely wait on data, not aware of the
> >error.
>
> This sounds, to me, like another issue with an absent feature—Inter-task
> communication channels.
I’ll try to clarify those together.
So first of all, yes, you're right: the problematic scenario isn't dynamic task
creation in general, but a subset, where tasks are generated as results of
other tasks. And yes, if there are equivalent solutions using nested scopes or
a centralised coordinator, then going with the less centralised option and
leveraging nested scopes is definitely the way to go. I won't contend anything
here :). But the scenarios I'm referring to are those where nested scopes
either aren't possible to use (because of the nature of the problem), or would
complicate the overall code.
Continuing with the crawler example: a reasonable requirement is to add a
per-domain limit on how many connections can be opened to a given domain at any
time, plus a global limit on connections. One way to implement this on top of
STS is to use nested scopes, as pointed out on Reddit
(https://www.reddit.com/r/java/comments/1nq25yr/comment/ng3yk7b/). However, to
maintain the limits, you'd then need a ConcurrentHashMap mapping domains to
per-domain Semaphores, plus a global Semaphore to enforce the global limit. And
this becomes tricky to implement correctly: which semaphore do you acquire
first? What if there's a concurrent update to the map? You'll need CAS and the
like to ensure correctness. Doable, but not necessarily the cleanest solution.
I suppose the point here is that when using nested scopes, for problems where
some communication between the processes needs to happen, you have to rely on
shared (mutable) memory, and use synchronization tools such as concurrent maps,
semaphores etc.
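To make the shared-memory variant concrete, here's roughly the kind of bookkeeping I have in mind (a rough sketch; the class name and hard-coded limits are made up, and I've deliberately left out removing stale map entries, which is where the CAS loops would come in):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Sketch only: names and limits are illustrative.
class ConnectionLimits {
    private final Semaphore global = new Semaphore(100);
    private final Map<String, Semaphore> perDomain = new ConcurrentHashMap<>();

    void acquire(String domain) throws InterruptedException {
        // computeIfAbsent gives us atomic map updates; note that entries are
        // never removed here - removing them safely is the tricky part.
        Semaphore domainPermits =
            perDomain.computeIfAbsent(domain, d -> new Semaphore(10));
        // Acquisition order matters: taking the per-domain permit first means
        // a saturated domain doesn't sit on global permits while it waits.
        domainPermits.acquire();
        try {
            global.acquire();
        } catch (InterruptedException e) {
            domainPermits.release(); // don't leak the per-domain permit
            throw e;
        }
    }

    void release(String domain) {
        global.release();
        perDomain.get(domain).release();
    }

    int availableGlobalPermits() { // exposed for monitoring/tests
        return global.availablePermits();
    }
}
```

Workable, but every fork now has to get the acquire/release pairing right, on top of its actual job.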
An alternative (one which I think you’ve exercised to the extreme ;) ) is to
use something more like an actor: a centralised process-coordinator which
manages the whole computation. We can still use STS, but instead of a tree of
scopes, we flatten this to a single scope, which creates crawler-forks in
accordance with the various rate-limiting restrictions. The logic in the
coordinator is definitely non-trivial, but it’s now single-threaded and we
don’t have to worry about atomic updates and the like. But, we still need to
communicate somehow with the child processes, and that’s where queues or
channels come in.
(And using STS might not be just a whim: we might **have to** use it, as we
might need ScopedValue propagation, or want the guarantee that no thread leaks
are possible.)
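To sketch the shape I mean (all names illustrative; assumes JDK 25 with preview features enabled; the rate-limit checks and the forking of newly discovered links are elided):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.StructuredTaskScope;

// Sketch of the "flattened" design: a single scope whose body is the
// coordinator, with forks reporting back over a queue.
class Coordinator {
    static List<String> crawlAll(List<String> urls) throws InterruptedException {
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        List<String> collected = new ArrayList<>();
        try (var scope = StructuredTaskScope.open()) {
            for (String url : urls) {
                scope.fork(() -> {
                    results.put("fetched:" + url); // stand-in for the fetch
                    return null;
                });
            }
            // Single-threaded coordination: only the body touches `collected`
            // and the (elided) rate-limit state, so no atomics are needed.
            // (Blocking in the body like this is exactly the error-handling
            // caveat I get to below.)
            for (int i = 0; i < urls.size(); i++) {
                collected.add(results.take());
            }
            scope.join();
        }
        return collected;
    }
}
```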
Of course, the crawler is just one example, with which I hope to illustrate a
point: that there’s a class of problems where some coordination between forks
is crucial, and while some of these can be solved using shared memory and
java.util.concurrent, it’s not always the best approach. I understand that
inter-task communication is out of scope of the JEP, but since we are designing
the future of Java’s IO-bound concurrency (or at least, that’s my perception of
the goal of the JEP), it would be good to at least know that STS is extensible,
so that it might accommodate these requirements in the future, or integrate
with third-party solutions. I’m quite sure people will end up just using
LinkedBlockingQueues (or something like Jox’s Channels, which are completable &
error-aware) as soon as the JEP becomes final.
So now to the crux of the problem: using blocking operations in forks has
different error handling than using blocking operations in the body of the
scope. The first will be interrupted when there’s an exception in any of the
forks (under the default Joiner). The second will not - it will just hang. I
think it does put a dent in the otherwise "let it crash" philosophy that you
might employ when working with STS. That is, when an exception occurs, you can
be sure that everything will be cleaned up properly, and the exception is
propagated. With a caveat: only when the scope’s main body is blocked on
scope.join(), not some other operation.
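One way to dodge the hang (a sketch; assumes JDK 25 with preview features enabled; names are made up) is to move the body's blocking consumption into a fork of its own, so that the Joiner's cancellation interrupts it together with the producers:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.StructuredTaskScope;

class ConsumeInFork {
    static int consumeAll(int producers) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        try (var scope = StructuredTaskScope.open(
                StructuredTaskScope.Joiner.<Integer>awaitAllSuccessfulOrThrow())) {
            for (int i = 0; i < producers; i++) {
                int item = i;
                scope.fork(() -> {
                    queue.put(item); // stand-in for real work
                    return item;
                });
            }
            // Had this loop stayed in the scope's body, a failing producer
            // would cancel the scope, yet queue.take() would block forever.
            // As a fork, it gets interrupted along with everything else.
            var consumer = scope.fork(() -> {
                int sum = 0;
                for (int i = 0; i < producers; i++) sum += queue.take();
                return sum;
            });
            scope.join();
            return consumer.get();
        }
    }
}
```

But then the "body as coordinator" pattern degenerates: the coordination logic has moved into a fork, and the body is back to just calling join().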
> >Moreover, I allow creating forks-in-forks, so that the main logic can create
> >forks at will.
>
> As this is described, this means that you could have a race condition between
> forking in the same scope and the call to scope.join(), or did I
> misunderstand?
Well, if scope.join() waits until all forks complete, it only returns once
there are no forks left, at which point nobody can create new forks. So I don't
think there's a race here: forks can only be created from live forks, before
they complete.
> >My work-around here is to create a `Joiner` which monitors an `isDone` flag,
> >and submit an empty task after the work is determined to be done:
>
> Since Joiners are one-shot, and are created before the scope is opened, it
> would seem more logical(?) to embed that flag into the joiner and have it be
> set by the scope body by referring to the joiner:
>
> var myJoiner = new MyJoiner(…);
> try(var scope = StructuredTaskScope.open(myJoiner)) {
>     …
>     myJoiner.signalDone();
>     scope.join();
> }
Yes, sure, the design here can be cleaner. But I still think the idea of using
an AtomicBoolean to signal completion smells like a work-around, not a "proper"
way to use Joiners. But maybe I’m too picky?
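For completeness, here's how I understand the joiner-held-flag variant you're suggesting (a sketch; assumes JDK 25 with preview features enabled, and that returning true from Joiner.onComplete cancels the scope, which is what unblocks join()):

```java
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: names are made up.
class DoneSignallingJoiner<T> implements StructuredTaskScope.Joiner<T, Void> {
    private final AtomicBoolean done = new AtomicBoolean();

    void signalDone() {
        done.set(true);
    }

    @Override
    public boolean onComplete(StructuredTaskScope.Subtask<? extends T> subtask) {
        return done.get(); // returning true requests cancellation of the scope
    }

    @Override
    public Void result() {
        return null;
    }

    // Usage, mirroring the snippet you suggested:
    static void demo() throws InterruptedException {
        var joiner = new DoneSignallingJoiner<Object>();
        try (var scope = StructuredTaskScope.open(joiner)) {
            scope.fork(() -> "some work");
            // ... coordination logic ...
            joiner.signalDone();
            scope.fork(() -> null); // empty task, so onComplete observes the flag
            scope.join();
        }
    }
}
```

The empty fork is still needed to trigger onComplete, which is exactly the part that smells like a work-around to me.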
> >Special-casing for this seems odd, as timeout is only one example from a
> >family of "resiliency" methods, others including retries, repeats etc. These
> >as well, could be implemented on top of virtual threads and SCS as methods,
> >without special support from the SCS API itself.
>
> While it is true that the I combinator, in SKI calculus, is not primitive
> because it can be implemented in terms of the S and K combinators, that
> doesn't necessarily mean that we should strive to distill the absolute
> primitives. There also exists a scientific paper on the Turing Completeness
> of the x86 MOV instruction—but its Turing Completeness does not rule out the
> value of having specialized instructions.
>
> Putting a deadline on a concurrent operation is established "good practice"
> to ensure liveness, and the duration for that timeout is most useful if it is
> provided by the caller, so creating a standardized configuration option for
> this common operation was deemed to be worth it, since we do not need to
> either create a scope-within-a-scope by default or filter out the Subtask
> handling the timeout.
Sure, and we might not want to represent numbers in terms of tuples and units
:). As I said, it's more of a nitpick, but to me a timeout(Duration, Runnable)
method seems more general and lightweight. That said, I'm not too worried about
nested scopes, as they seem really lightweight, though maybe the current
solution has potential for further optimization.
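To show the shape of method I mean: here's a sketch built on plain virtual threads, with no special STS support (illustrative only; error handling is deliberately simplistic, and cancellation is best-effort via interruption):

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class Timeouts {
    static <T> T timeout(Duration d, Callable<T> task)
            throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<T> future = new FutureTask<>(task);
        Thread worker = Thread.ofVirtual().start(future);
        try {
            return future.get(d.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException | InterruptedException e) {
            worker.interrupt(); // best-effort cancellation of the task
            throw e;
        }
    }
}
```

Retries, repeats and the like could be similar standalone combinators, which is why special-casing timeouts in the scope configuration feels arbitrary to me.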
> > The Subtask.get() method is confusing
>
> «public static sealed interface StructuredTaskScope.Subtask<T>
> extends Supplier<T>» -
> https://docs.oracle.com/en/java/javase/25/docs/api/java.base/java/util/concurrent/StructuredTaskScope.Subtask.html
Ah, you see, I didn't even notice the Supplier there. I'd still argue, though,
that when people see a Subtask<T>, they'd rather think of it by analogy with a
Future<T> ("a computation whose result will be available in the future") than
as a supplier. Especially since Subtask.get() can only be called under specific
circumstances: after scope.join() has completed. So I'm not sure the contract
of Supplier even fits here? But maybe I'm too biased from working with
Future-like things for a long time.
Thank you for the answers!
--
Adam Warski
https://warski.org