> From: "David Alayachew" <[email protected]>
> To: "Remi Forax" <[email protected]>
> Cc: "loom-dev" <[email protected]>, "Alan Bateman" <[email protected]>
> Sent: Wednesday, September 24, 2025 9:04:44 PM
> Subject: Re: Remark on the StructuredTaskScope API of Java 25

> Hello Rémi,

> > allSuccessfulOrThrow() should use List instead
> > of Stream

> This is a good suggestion. No need to pay for instances of Stream that you won't
> use, especially if what you wanted was a List in the first place.

> > awaitAllSuccessfulOrThrow() should use <R>
> > instead of Void

> I don't see how this would be useful. When would you care about the type of
> null? I'm not even sure what that means.

My use case is a list of STSs, one using a joiner that returns null and one using a joiner that returns a List of subtasks. When I call join() on all the STSs by iterating over the list, I want the same return type.

> > Can we get a joiner that returns subtasks
> > ordered by onComplete(), and not in the order
> > of onFork()?

> Maybe Subtask can track onFork() and onComplete() timestamps? Then you could
> order yourself.

Yes, I can. I was wondering if it would be a good idea to have this kind of behavior in a joiner provided by the JDK.

regards,
Rémi

> On Wed, Sep 24, 2025 at 11:37 AM Remi Forax <[email protected]> wrote:

>> Hello,
>> I know that the API will change a bit in 26, but this is a review of the API of 25.

>> I find the new API (joiners + configuration) to be cleaner than the previous
>> iteration (composition vs inheritance).

>> In Joiner,
>> - onFork() and onComplete() should take a Subtask<T> and not a Subtask<? extends T>.
>> This is okay because Subtask is sealed, so the type parameter is covariant.
>> Another way to see it is that instead of every implementation of
>> onFork/onComplete doing an unsafe cast,
>> the caller of those methods should do the unsafe cast.
>> - allSuccessfulOrThrow() should return a Joiner<T, List<Subtask<T>>>, so the
>> result is a List and not a stream.
>> In terms of implementation, in result(), the code should be
>>   return Collections.unmodifiableList(subtasks);
>> - awaitAllSuccessfulOrThrow() should not use Void, but be typed like this
>>   <T, R> Joiner<T, R> awaitAllSuccessfulOrThrow()
>> which means that the return type of join() can be any type.
>> This allows users to type the result value null the way they want.
>> - allUntil() is missing a ? super, it should use a Subtask<T> (like in
>> onComplete/onFork) and return a List (like allSuccessfulOrThrow()), so it
>> should be:
>>   <T> Joiner<T, List<Subtask<T>>> allUntil(Predicate<? super Subtask<T>> isDone)

>> And now two remarks,
>> - is there a way to remove the limitation that the main thread (the one that
>> has created the STS) cannot access Subtask.get(),
>> because there is at least one case where I know that the task is finished before
>> join() is called (see below).
>> - is there a way to get a joiner that returns the list of subtasks in the order
>> of their completion, not in the order of onFork()?

>> regards,
>> Rémi

>> ---
>> The following code works but I do not understand why I have to create a virtual
>> thread to run the Runnable?

>> final class StreamJoiner<T> implements StructuredTaskScope.Joiner<T, Void> {
>>   private int counter;
>>   private volatile boolean done;
>>   private final LinkedBlockingDeque<StructuredTaskScope.Subtask<T>> queue =
>>       new LinkedBlockingDeque<>();
>>
>>   @Override
>>   public boolean onFork(StructuredTaskScope.Subtask<? extends T> subtask) {
>>     StructuredTaskScope.Joiner.super.onFork(subtask);
>>     counter++;
>>     return false;
>>   }
>>
>>   @Override
>>   @SuppressWarnings("unchecked")
>>   public boolean onComplete(StructuredTaskScope.Subtask<? extends T> subtask) {
>>     StructuredTaskScope.Joiner.super.onComplete(subtask);
>>     if (done) {
>>       return true;
>>     }
>>     try {
>>       queue.put((StructuredTaskScope.Subtask<T>) subtask);
>>     } catch (InterruptedException e) {
>>       throw new RuntimeException(e);
>>     }
>>     return false;
>>   }
>>
>>   @Override
>>   public Void result() {
>>     return null;
>>   }
>>
>>   public <R> R compute(Function<? super Stream<T>, ? extends R> function)
>>       throws InterruptedException {
>>     var spliterator = new Spliterator<T>() {
>>       private int remaining = counter;
>>
>>       @Override
>>       public boolean tryAdvance(Consumer<? super T> action) {
>>         if (remaining == 0) {
>>           return false;
>>         }
>>         StructuredTaskScope.Subtask<T> subtask;
>>         try {
>>           subtask = queue.take();
>>         } catch (InterruptedException e) {
>>           throw new RuntimeException(e);
>>         }
>>         if (subtask.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
>>           action.accept(subtask.get());
>>         }
>>         remaining--;
>>         return true;
>>       }
>>
>>       @Override
>>       public Spliterator<T> trySplit() {
>>         return null;
>>       }
>>
>>       @Override
>>       public long estimateSize() {
>>         return remaining;
>>       }
>>
>>       @Override
>>       public int characteristics() {
>>         return 0;
>>       }
>>     };
>>     var stream = StreamSupport.stream(spliterator, false);
>>     var runnable = new Runnable() {
>>       private R result;
>>
>>       @Override
>>       public void run() {
>>         result = function.apply(stream);
>>       }
>>     };
>>     Thread.ofVirtual().start(runnable).join();
>>     done = true;
>>     return runnable.result;
>>   }
>> }
>>
>> Callable<WeatherResponse> task(LatLong latLong) {
>>   return () -> OpenMeteo.getWeatherResponse(latLong);
>> }
>>
>> void main() throws InterruptedException {
>>   var paris = new LatLong(48.864716, 2.349014); // use 30_000
>>   var nantes = new LatLong(47.2181, -1.5528);
>>   var marseille = new LatLong(43.2964, 5.37);
>>   var latlongs = List.of(paris, nantes, marseille);
>>   var joiner = new StreamJoiner<WeatherResponse>();
>>   try (var scope = StructuredTaskScope.open(joiner)) {
>>     var callables = latlongs.stream()
>>         .map(this::task)
>>         .toList();
>>     callables.forEach(scope::fork);
>>     var response = joiner.compute(Stream::toList);
>>     //var response = joiner.compute(s -> s.findFirst().orElseThrow());
>>     scope.join();
>>     IO.println(response);
>>   }
>> }
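
For comparison, the "results in completion order, not fork order" behavior asked about above can be sketched with plain java.util.concurrent. This is only an illustration under stated assumptions: it swaps in ExecutorCompletionService instead of a StructuredTaskScope.Joiner, so it says nothing about how a JDK-provided joiner would or should be written. The names CompletionOrderDemo, inCompletionOrder and delayed are hypothetical, and a cached thread pool stands in for virtual threads so the sketch does not depend on a particular Java release.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {

    // Hypothetical helper: runs all tasks concurrently and returns their
    // results in the order the tasks finished, not the order they were submitted.
    static <T> List<T> inCompletionOrder(List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        // Assumption: a cached thread pool stands in for virtual threads.
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            ExecutorCompletionService<T> completionService =
                new ExecutorCompletionService<>(executor);
            for (Callable<T> task : tasks) {
                completionService.submit(task);
            }
            List<T> results = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                // take() blocks until the next task completes, whichever it is;
                // get() rethrows a task failure as ExecutionException.
                results.add(completionService.take().get());
            }
            return results;
        } finally {
            executor.shutdownNow();
        }
    }

    // Hypothetical task: sleeps for the given delay, then returns its name.
    static Callable<String> delayed(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = List.of(
            delayed("slow", 400),
            delayed("fast", 0),
            delayed("medium", 200));
        // Results are listed by completion time rather than submission order.
        System.out.println(inCompletionOrder(tasks));
    }
}
```

The internal queue of the completion service plays the same role as the LinkedBlockingDeque in StreamJoiner: finished tasks are enqueued as they complete and the consumer drains them in that order.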
