Hello Rémi, > allSuccessfulOrThrow() should use List instead > of Stream
This is a good suggestion. No need to pay for instances of Stream that you won't use, especially if what you wanted was a List in the first place. > awaitAllSuccessfulOrThrow() should use <R> > instead of Void I don't see how this would be useful. When would you care about the type of null? I'm not even sure what that means. > Can we get a joiner that returns subtasks > ordered by onComplete(), and not in the order > of onFork()? Maybe Subtask can track onFork() and onComplete() timestamps? Then you could order yourself. On Wed, Sep 24, 2025 at 11:37 AM Remi Forax <[email protected]> wrote: > Hello, > I know that the API will change a bit in 26, but this is a review of the > API of 25. > > I find the new API (joiners + configuration) to be cleaner than the > previous iteration (composition vs inheritance). > > In Joiner, > - onFork() and onComplete() should take a SubTask<T> and not a SubTask<? > extends T>. > This is Okay because SubTask is sealed, so the type parameter is > covariant. > > Another way to see it is that instead of every implementation of > onFork/onComplete doing an unsafe cast, > the caller of those methods should do the unsafe cast. > > - allSuccessfulOrThrow() should return a Joiner<T, List<Subtask<T>>>, so > the result is a List and not a stream. > In terms of implementation, in result(), the code should be > return Collections.unmodifiableList(subtasks); > > - awaitAllSuccessfulOrThrow should not use Void, but be typed like this > <T, R> Joiner<T, R> awaitAllSuccessfulOrThrow() > > which means that the return type of join() can be any type. > This allow users to type the result value null the way they want. > > - allUntil is missing a ? super, it should use a SubTask<T> (like in > onComplete/onFork) and returns a List (like allSuccessfulOrThrow()), so it > should be: > > <T> Joiner<T, List<Subtask<T>>> allUntil(Predicate<? extends > Subtask<T>> isDone) { > > And now two remarks, > - is there a way to remove the limitation that the main thread (the one > that have created the STS) can not access to SubTask.get(), > because there is at least a case where i know that the task is finished > before join() is called (see below). > - is there a way to get a joiner that returns the list of subtask in the > order if their completeness, not in the order of onFork() ? > > > regards, > Rémi > > --- > > The following code works but I do not understand why i have to create a > virtual thread to run the Runnable ? > > final class StreamJoiner<T> implements StructuredTaskScope.Joiner<T, Void> > { > private int counter; > private volatile boolean done; > private final LinkedBlockingDeque<StructuredTaskScope.Subtask<T>> queue > = new LinkedBlockingDeque<>(); > > @Override > public boolean onFork(StructuredTaskScope.Subtask<? extends T> subtask) { > StructuredTaskScope.Joiner.super.onFork(subtask); > counter++; > return false; > } > > @Override > @SuppressWarnings("unchecked") > public boolean onComplete(StructuredTaskScope.Subtask<? extends T> > subtask) { > StructuredTaskScope.Joiner.super.onComplete(subtask); > if (done) { > return true; > } > try { > queue.put((StructuredTaskScope.Subtask<T>) subtask); > } catch (InterruptedException e) { > throw new RuntimeException(e); > } > return false; > } > > @Override > public Void result() { > return null; > } > > public <R> R compute(Function<? super Stream<T>, ? extends R> function) > throws InterruptedException { > var spliterator = new Spliterator<T>() { > private int remaining = counter; > > @Override > public boolean tryAdvance(Consumer<? super T> action) { > if (remaining == 0) { > return false; > } > StructuredTaskScope.Subtask<T> subtask; > try { > subtask = queue.take(); > } catch (InterruptedException e) { > throw new RuntimeException(e); > } > if (subtask.state() == StructuredTaskScope.Subtask.State.SUCCESS) { > action.accept(subtask.get()); > } > remaining--; > return true; > } > > @Override > public Spliterator<T> trySplit() { > return null; > } > > @Override > public long estimateSize() { > return remaining; > } > > @Override > public int characteristics() { > return 0; > } > }; > var stream = StreamSupport.stream(spliterator, false); > var runnable = new Runnable() { > private R result; > > @Override > public void run() { > result = function.apply(stream); > } > }; > Thread.ofVirtual().start(runnable).join(); > done = true; > return runnable.result; > } > } > > Callable<WeatherResponse> task(LatLong latLong) { > return () -> OpenMeteo.getWeatherResponse(latLong); > } > > void main() throws InterruptedException { > var paris = new LatLong(48.864716, 2.349014); // use 30_000 > var nantes = new LatLong(47.2181, -1.5528); > var marseille = new LatLong(43.2964, 5.37); > > var latlongs = List.of(paris, nantes, marseille); > var joiner = new StreamJoiner<WeatherResponse>(); > try(var scope = StructuredTaskScope.open(joiner)) { > var callables = latlongs.stream() > .map(this::task) > .toList(); > callables.forEach(scope::fork); > > var response = joiner.compute(Stream::toList); > //var response = joiner.compute(s -> s.findFirst().orElseThrow()); > scope.join(); > > IO.println(response); > } > } >
