Copilot commented on code in PR #2437: URL: https://github.com/apache/groovy/pull/2437#discussion_r3036748295
########## src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java: ########## @@ -0,0 +1,741 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.groovy.runtime.async; + +import groovy.concurrent.AwaitResult; +import groovy.concurrent.Awaitable; +import groovy.lang.Closure; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.SynchronousQueue; +import 
java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Internal runtime support for the {@code async}/{@code await}/{@code defer} language features. + * <p> + * This class contains the actual implementation invoked by compiler-generated code. + * User code should prefer the static methods on {@link groovy.concurrent.Awaitable} + * for combinators and configuration. + * <p> + * <b>Thread pool configuration:</b> + * <ul> + * <li>On JDK 21+ the default executor is a virtual-thread-per-task executor. + * Virtual threads make blocking within {@code await()} essentially free.</li> + * <li>On JDK 17-20 the fallback is a cached daemon thread pool + * whose maximum size is controlled by the system property + * {@code groovy.async.parallelism} (default: {@code 256}).</li> + * <li>The executor can be overridden at any time via {@link #setExecutor}.</li> + * </ul> + * <p> + * <b>Exception handling</b> follows a transparency principle: the + * <em>original</em> exception is rethrown without being wrapped. 
+ * + * @see groovy.concurrent.Awaitable + * @since 6.0.0 + */ +public class AsyncSupport { + + private static final boolean VIRTUAL_THREADS_AVAILABLE; + private static final Executor VIRTUAL_THREAD_EXECUTOR; + private static final int FALLBACK_MAX_THREADS; + private static final Executor FALLBACK_EXECUTOR; + + static { + Executor vtExecutor = null; + boolean vtAvailable = false; + try { + MethodHandle mh = MethodHandles.lookup().findStatic( + Executors.class, "newVirtualThreadPerTaskExecutor", + MethodType.methodType(ExecutorService.class)); + vtExecutor = (Executor) mh.invoke(); + vtAvailable = true; + } catch (Throwable ignored) { + // JDK < 21 — virtual threads not available + } + VIRTUAL_THREAD_EXECUTOR = vtExecutor; + VIRTUAL_THREADS_AVAILABLE = vtAvailable; + + FALLBACK_MAX_THREADS = org.apache.groovy.util.SystemUtil.getIntegerSafe( + "groovy.async.parallelism", 256); + if (!VIRTUAL_THREADS_AVAILABLE) { + FALLBACK_EXECUTOR = new ThreadPoolExecutor( + 0, FALLBACK_MAX_THREADS, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + r -> { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("groovy-async-" + t.getId()); + return t; + }, + new ThreadPoolExecutor.CallerRunsPolicy()); + } else { + FALLBACK_EXECUTOR = null; + } + } + + private static volatile Executor defaultExecutor = + VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR : FALLBACK_EXECUTOR; + + private static final ScheduledExecutorService SCHEDULER = + Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "groovy-async-scheduler"); + t.setDaemon(true); + return t; + }); + + private AsyncSupport() { } + + // ---- executor configuration ----------------------------------------- + + /** Returns {@code true} if running on JDK 21+ with virtual thread support. */ + public static boolean isVirtualThreadsAvailable() { + return VIRTUAL_THREADS_AVAILABLE; + } + + /** Returns the current executor used for async tasks. 
*/ + public static Executor getExecutor() { + return defaultExecutor; + } + + /** Sets the executor used for async tasks. */ + public static void setExecutor(Executor executor) { + defaultExecutor = Objects.requireNonNull(executor, "executor must not be null"); + } + + /** Resets the executor to the default (virtual threads on JDK 21+, cached pool otherwise). */ + public static void resetExecutor() { + defaultExecutor = VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR : FALLBACK_EXECUTOR; + } + + // ---- await overloads ------------------------------------------------ + + /** + * Awaits the result of an {@link Awaitable}. + * Blocks the calling thread until the computation completes. + * The original exception is rethrown transparently. + */ + public static <T> T await(Awaitable<T> awaitable) { + try { + return awaitable.get(); + } catch (ExecutionException e) { + throw rethrowUnwrapped(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + CancellationException ce = new CancellationException("Interrupted while awaiting"); + ce.initCause(e); + throw ce; + } + } + + /** Awaits a {@link CompletableFuture} using non-interruptible {@code join()}. */ + public static <T> T await(CompletableFuture<T> future) { + try { + return future.join(); + } catch (CompletionException e) { + throw rethrowUnwrapped(e); + } catch (CancellationException e) { + throw e; + } + } + + /** Awaits a {@link CompletionStage} by converting to CompletableFuture. */ + public static <T> T await(CompletionStage<T> stage) { + return await(stage.toCompletableFuture()); + } + + /** Awaits a {@link Future}. Delegates to the CF overload if applicable. 
*/ + public static <T> T await(Future<T> future) { + if (future instanceof CompletableFuture<T> cf) { + return await(cf); + } + try { + return future.get(); + } catch (ExecutionException e) { + throw rethrowUnwrapped(e); + } catch (CancellationException e) { + throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + CancellationException ce = new CancellationException("Interrupted while awaiting future"); + ce.initCause(e); + throw ce; + } + } + + /** + * Awaits an arbitrary object by adapting it via {@link Awaitable#from(Object)}. + * This is the fallback overload called by compiler-generated await expressions. + */ + @SuppressWarnings("unchecked") + public static <T> T await(Object source) { + if (source == null) return null; + if (source instanceof Awaitable) return await((Awaitable<T>) source); + if (source instanceof CompletableFuture) return await((CompletableFuture<T>) source); + if (source instanceof CompletionStage) return await((CompletionStage<T>) source); + if (source instanceof Future) return await((Future<T>) source); + if (source instanceof Closure) { + throw new IllegalArgumentException( + "Cannot await a Closure directly. Call the closure first: await myClosure()"); + } + return await(Awaitable.from(source)); + } + + // ---- async execution ------------------------------------------------ + + /** + * Executes the given closure asynchronously on the specified executor, + * returning an {@link Awaitable}. + */ + public static <T> Awaitable<T> executeAsync(Closure<T> closure, Executor executor) { + Objects.requireNonNull(closure, "closure must not be null"); + Executor targetExecutor = executor != null ? executor : defaultExecutor; + return GroovyPromise.of(CompletableFuture.supplyAsync(() -> { + try { + return closure.call(); + } catch (Throwable t) { + throw wrapForFuture(t); + } + }, targetExecutor)); + } + + /** + * Executes the given closure asynchronously using the default executor. 
+ */ + public static <T> Awaitable<T> async(Closure<T> closure) { + return executeAsync(closure, defaultExecutor); + } + + /** + * Lightweight task spawn. Executes the closure asynchronously using the default executor. + */ + public static <T> Awaitable<T> go(Closure<T> closure) { + return executeAsync(closure, defaultExecutor); + } + + /** + * Wraps a closure so that each invocation executes the body asynchronously + * and returns an {@link Awaitable}. This is the runtime entry point for + * the {@code async { ... }} expression syntax. + * <pre> + * def task = async { expensiveWork() } + * def result = await task() // explicit call required + * </pre> + */ + @SuppressWarnings("unchecked") + public static <T> Closure<Awaitable<T>> wrapAsync(Closure<T> closure) { + Objects.requireNonNull(closure, "closure must not be null"); + return new Closure<Awaitable<T>>(closure.getOwner(), closure.getThisObject()) { + @SuppressWarnings("unused") + public Awaitable<T> doCall(Object... args) { + return GroovyPromise.of(CompletableFuture.supplyAsync(() -> { + try { + return closure.call(args); + } catch (Throwable t) { + throw wrapForFuture(t); + } + }, defaultExecutor)); + } + }; + } + + // ---- generators (yield return) ---------------------------------------- + + /** + * Called by compiler-generated code for {@code yield return expr} inside + * an async generator closure. Delegates to the bridge's yield method. + * + * @param bridge the GeneratorBridge instance (injected as synthetic parameter) + * @param value the value to yield + */ + public static void yieldReturn(Object bridge, Object value) { Review Comment: `yieldReturn` blindly casts `bridge` to `GeneratorBridge` and will throw a `ClassCastException`/NPE if `yield return` is (accidentally) used outside an async generator context (or if the synthetic param isn’t present). 
Since the grammar allows `YIELD RETURN` as a statement anywhere, please add a defensive check here (e.g., validate type/non-null and throw an `IllegalStateException` with a clear message) to make misuse fail predictably. ```suggestion public static void yieldReturn(Object bridge, Object value) { if (!(bridge instanceof GeneratorBridge<?>)) { throw new IllegalStateException("yield return may only be used inside an async generator context"); } ``` ########## src/main/java/org/apache/groovy/runtime/async/GeneratorBridge.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.groovy.runtime.async; + +import java.io.Closeable; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.SynchronousQueue; + +/** + * A producer/consumer bridge for async generators ({@code yield return}). + * <p> + * The generator closure runs on a separate thread and calls {@link #yield(Object)} + * to produce values. The consumer iterates using {@link #hasNext()}/{@link #next()}. + * A {@link SynchronousQueue} provides the handoff — each {@code yield} blocks + * until the consumer takes the value, providing natural back-pressure. 
+ * <p> + * With virtual threads (JDK 21+), both the producer and consumer block cheaply. + * On JDK 17-20, the producer runs on a platform thread from the cached pool. + * + * @param <T> the element type + * @since 6.0.0 + */ +public final class GeneratorBridge<T> implements Iterator<T>, Closeable { + + private static final Object DONE = new Object(); + private static final Object ERROR = new Object(); + + private final SynchronousQueue<Object[]> handoff = new SynchronousQueue<>(); + private Object[] pending; + private boolean done; + private volatile boolean closed; + private volatile Thread producerThread; + + /** + * Called by the generator (producer thread) to yield a value. + * Blocks until the consumer calls {@link #next()}. + * + * @param value the value to yield (may be null) + * @throws GeneratorClosedException if the consumer has closed the bridge + */ + public void yield(Object value) { + if (closed) throw new GeneratorClosedException(); + producerThread = Thread.currentThread(); + try { + handoff.put(new Object[]{value}); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new GeneratorClosedException(); + } finally { + producerThread = null; + } + } + + /** + * Called internally when the generator completes normally. + */ + void complete() { + producerThread = Thread.currentThread(); + try { + handoff.put(new Object[]{DONE}); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + producerThread = null; + } + } + + /** + * Called internally when the generator throws an exception. 
+ */ + void completeExceptionally(Throwable error) { + producerThread = Thread.currentThread(); + try { + handoff.put(new Object[]{ERROR, error}); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + producerThread = null; + } + } + + @Override + public boolean hasNext() { + if (done) return false; + if (pending != null) return true; + try { + pending = handoff.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + done = true; Review Comment: If the consumer thread is interrupted while waiting in `hasNext()` (`handoff.take()`), the bridge marks the iterator done and returns false but never closes/interrupts the producer. This can leave the producer blocked in `handoff.put()` and (on JDK 17–20) permanently consume a platform thread from the bounded pool. Consider calling `close()` (or otherwise signalling/interrupting the producer) in the InterruptedException path so interruption can’t leak a blocked producer. ```suggestion close(); Thread.currentThread().interrupt(); ``` ########## src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java: ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package groovy.concurrent; + +import org.apache.groovy.runtime.async.AsyncSupport; +import org.apache.groovy.runtime.async.GroovyPromise; + +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.ServiceLoader; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; + +/** + * Central registry for {@link AwaitableAdapter} instances. + * <p> + * On class-load, adapters are discovered via {@link ServiceLoader} from + * {@code META-INF/services/groovy.concurrent.AwaitableAdapter}. A built-in + * adapter is always present as the lowest-priority fallback, handling: + * <ul> + * <li>{@link CompletableFuture} and {@link CompletionStage}</li> + * <li>{@link Future} (adapted via a blocking wrapper)</li> + * </ul> + * + * @since 6.0.0 + */ +public final class AwaitableAdapterRegistry { + + private static final List<AwaitableAdapter> adapters = new CopyOnWriteArrayList<>(); + + private static volatile ClassValue<AwaitableAdapter> awaitableCache = buildAwaitableCache(); + + static { + // Load SPI adapters + try { + ServiceLoader<AwaitableAdapter> loader = ServiceLoader.load( + AwaitableAdapter.class, AwaitableAdapterRegistry.class.getClassLoader()); + for (Iterator<AwaitableAdapter> it = loader.iterator(); it.hasNext(); ) { + try { + adapters.add(it.next()); + } catch (Throwable ignored) { + // Skip adapters that fail to load (missing dependencies) + } + } + } catch (Throwable ignored) { + // ServiceLoader failure — continue with built-in adapter only + } + // Built-in fallback adapter (lowest priority) + adapters.add(new BuiltInAdapter()); + } + + private AwaitableAdapterRegistry() { } + + /** + * Registers an adapter at the highest priority (before SPI-loaded adapters). 
+ */ + public static void register(AwaitableAdapter adapter) { + Objects.requireNonNull(adapter); + adapters.add(0, adapter); + awaitableCache = buildAwaitableCache(); + } + + /** + * Removes a previously registered adapter. + */ + public static void unregister(AwaitableAdapter adapter) { + adapters.remove(adapter); + awaitableCache = buildAwaitableCache(); + } + + /** + * Converts the given source to an {@link Awaitable}. + */ + @SuppressWarnings("unchecked") + static <T> Awaitable<T> toAwaitable(Object source) { + if (source == null) { + return (Awaitable<T>) Awaitable.of(null); + } + if (source instanceof Awaitable) return (Awaitable<T>) source; + Class<?> type = source.getClass(); + AwaitableAdapter adapter = awaitableCache.get(type); + if (adapter != null) { + return adapter.toAwaitable(source); + } + throw new IllegalArgumentException( + "No Awaitable adapter found for type: " + type.getName() + + ". Register an AwaitableAdapter via ServiceLoader or AwaitableAdapterRegistry.register()."); + } Review Comment: `Awaitable.from(Object)` Javadoc says `source` must not be null and should throw `IllegalArgumentException` when null, but `AwaitableAdapterRegistry.toAwaitable(null)` currently returns a completed awaitable with null. Please align the implementation and documentation (either reject null consistently or update the Javadoc and combinator contracts that mention null is invalid). 
########## src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java: ########## @@ -466,12 +467,20 @@ public Statement visitLoopStmtAlt(final LoopStmtAltContext ctx) { @Override public ForStatement visitForStmtAlt(final ForStmtAltContext ctx) { + boolean isForAwait = asBoolean(ctx.AWAIT()); Function<Statement, ForStatement> maker = this.visitForControl(ctx.forControl()); Statement loopBody = this.unpackStatement((Statement) this.visit(ctx.statement())); ForStatement forStatement = configureAST(maker.apply(loopBody), ctx); visitAnnotationsOpt(ctx.annotationsOpt()).forEach(forStatement::addStatementAnnotation); + + if (isForAwait) { + // Transform collection expression: wrap in AsyncSupport.toBlockingIterable() + Expression original = forStatement.getCollectionExpression(); + forStatement.setCollectionExpression(AsyncTransformHelper.buildToBlockingIterableCall(original)); + } Review Comment: `for await` currently only wraps the collection expression with `AsyncSupport.toBlockingIterable(...)`. There’s no corresponding close/cancellation hook when the loop exits early (break/exception), even though `AsyncSupport.closeIterable(...)` and `AsyncTransformHelper.buildCloseIterableCall(...)` exist and docs mention a compiler-generated finally block. Without closing, generator producers may remain blocked on `yield` and reactive iterables may keep subscriptions/resources longer than necessary. Consider wrapping the generated loop in try/finally to invoke `closeIterable` (or closing the iterator if it is AutoCloseable) when leaving the loop. ########## src/main/java/org/apache/groovy/runtime/async/GeneratorBridge.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.groovy.runtime.async; + +import java.io.Closeable; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.SynchronousQueue; + +/** + * A producer/consumer bridge for async generators ({@code yield return}). + * <p> + * The generator closure runs on a separate thread and calls {@link #yield(Object)} + * to produce values. The consumer iterates using {@link #hasNext()}/{@link #next()}. + * A {@link SynchronousQueue} provides the handoff — each {@code yield} blocks + * until the consumer takes the value, providing natural back-pressure. + * <p> + * With virtual threads (JDK 21+), both the producer and consumer block cheaply. + * On JDK 17-20, the producer runs on a platform thread from the cached pool. + * + * @param <T> the element type + * @since 6.0.0 + */ +public final class GeneratorBridge<T> implements Iterator<T>, Closeable { + + private static final Object DONE = new Object(); + private static final Object ERROR = new Object(); + + private final SynchronousQueue<Object[]> handoff = new SynchronousQueue<>(); + private Object[] pending; + private boolean done; + private volatile boolean closed; + private volatile Thread producerThread; + + /** + * Called by the generator (producer thread) to yield a value. + * Blocks until the consumer calls {@link #next()}. 
+ * + * @param value the value to yield (may be null) + * @throws GeneratorClosedException if the consumer has closed the bridge + */ + public void yield(Object value) { + if (closed) throw new GeneratorClosedException(); + producerThread = Thread.currentThread(); + try { + handoff.put(new Object[]{value}); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new GeneratorClosedException(); + } finally { Review Comment: In GeneratorBridge.yield(), the `closed` check happens before `producerThread` is set. If `close()` is called in the tiny window between the check and the assignment, `close()` won’t be able to interrupt the producer and the subsequent `handoff.put(...)` can block indefinitely. Reorder the code so `producerThread` is set before checking/acting on `closed`, and re-check `closed` immediately before blocking in `put()` to make early-close reliably unblock producers. ########## src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java: ########## @@ -0,0 +1,741 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.groovy.runtime.async; + +import groovy.concurrent.AwaitResult; +import groovy.concurrent.Awaitable; +import groovy.lang.Closure; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Internal runtime support for the {@code async}/{@code await}/{@code defer} language features. + * <p> + * This class contains the actual implementation invoked by compiler-generated code. + * User code should prefer the static methods on {@link groovy.concurrent.Awaitable} + * for combinators and configuration. + * <p> + * <b>Thread pool configuration:</b> + * <ul> + * <li>On JDK 21+ the default executor is a virtual-thread-per-task executor. 
+ * Virtual threads make blocking within {@code await()} essentially free.</li> + * <li>On JDK 17-20 the fallback is a cached daemon thread pool + * whose maximum size is controlled by the system property + * {@code groovy.async.parallelism} (default: {@code 256}).</li> + * <li>The executor can be overridden at any time via {@link #setExecutor}.</li> + * </ul> + * <p> + * <b>Exception handling</b> follows a transparency principle: the + * <em>original</em> exception is rethrown without being wrapped. + * + * @see groovy.concurrent.Awaitable + * @since 6.0.0 + */ +public class AsyncSupport { + + private static final boolean VIRTUAL_THREADS_AVAILABLE; + private static final Executor VIRTUAL_THREAD_EXECUTOR; + private static final int FALLBACK_MAX_THREADS; + private static final Executor FALLBACK_EXECUTOR; + + static { + Executor vtExecutor = null; + boolean vtAvailable = false; + try { + MethodHandle mh = MethodHandles.lookup().findStatic( + Executors.class, "newVirtualThreadPerTaskExecutor", + MethodType.methodType(ExecutorService.class)); + vtExecutor = (Executor) mh.invoke(); + vtAvailable = true; + } catch (Throwable ignored) { + // JDK < 21 — virtual threads not available + } + VIRTUAL_THREAD_EXECUTOR = vtExecutor; + VIRTUAL_THREADS_AVAILABLE = vtAvailable; + + FALLBACK_MAX_THREADS = org.apache.groovy.util.SystemUtil.getIntegerSafe( + "groovy.async.parallelism", 256); + if (!VIRTUAL_THREADS_AVAILABLE) { + FALLBACK_EXECUTOR = new ThreadPoolExecutor( + 0, FALLBACK_MAX_THREADS, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + r -> { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("groovy-async-" + t.getId()); + return t; + }, + new ThreadPoolExecutor.CallerRunsPolicy()); + } else { + FALLBACK_EXECUTOR = null; + } + } + + private static volatile Executor defaultExecutor = + VIRTUAL_THREADS_AVAILABLE ? 
VIRTUAL_THREAD_EXECUTOR : FALLBACK_EXECUTOR; + + private static final ScheduledExecutorService SCHEDULER = + Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "groovy-async-scheduler"); + t.setDaemon(true); + return t; + }); + + private AsyncSupport() { } + + // ---- executor configuration ----------------------------------------- + + /** Returns {@code true} if running on JDK 21+ with virtual thread support. */ + public static boolean isVirtualThreadsAvailable() { + return VIRTUAL_THREADS_AVAILABLE; + } + + /** Returns the current executor used for async tasks. */ + public static Executor getExecutor() { + return defaultExecutor; + } + + /** Sets the executor used for async tasks. */ + public static void setExecutor(Executor executor) { + defaultExecutor = Objects.requireNonNull(executor, "executor must not be null"); + } + + /** Resets the executor to the default (virtual threads on JDK 21+, cached pool otherwise). */ + public static void resetExecutor() { + defaultExecutor = VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR : FALLBACK_EXECUTOR; + } + + // ---- await overloads ------------------------------------------------ + + /** + * Awaits the result of an {@link Awaitable}. + * Blocks the calling thread until the computation completes. + * The original exception is rethrown transparently. + */ + public static <T> T await(Awaitable<T> awaitable) { + try { + return awaitable.get(); + } catch (ExecutionException e) { + throw rethrowUnwrapped(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + CancellationException ce = new CancellationException("Interrupted while awaiting"); + ce.initCause(e); + throw ce; + } + } + + /** Awaits a {@link CompletableFuture} using non-interruptible {@code join()}. 
*/ + public static <T> T await(CompletableFuture<T> future) { + try { + return future.join(); + } catch (CompletionException e) { + throw rethrowUnwrapped(e); + } catch (CancellationException e) { + throw e; + } + } + + /** Awaits a {@link CompletionStage} by converting to CompletableFuture. */ + public static <T> T await(CompletionStage<T> stage) { + return await(stage.toCompletableFuture()); + } + + /** Awaits a {@link Future}. Delegates to the CF overload if applicable. */ + public static <T> T await(Future<T> future) { + if (future instanceof CompletableFuture<T> cf) { + return await(cf); + } + try { + return future.get(); + } catch (ExecutionException e) { + throw rethrowUnwrapped(e); + } catch (CancellationException e) { + throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + CancellationException ce = new CancellationException("Interrupted while awaiting future"); + ce.initCause(e); + throw ce; + } + } + + /** + * Awaits an arbitrary object by adapting it via {@link Awaitable#from(Object)}. + * This is the fallback overload called by compiler-generated await expressions. + */ + @SuppressWarnings("unchecked") + public static <T> T await(Object source) { + if (source == null) return null; + if (source instanceof Awaitable) return await((Awaitable<T>) source); + if (source instanceof CompletableFuture) return await((CompletableFuture<T>) source); + if (source instanceof CompletionStage) return await((CompletionStage<T>) source); + if (source instanceof Future) return await((Future<T>) source); + if (source instanceof Closure) { + throw new IllegalArgumentException( + "Cannot await a Closure directly. Call the closure first: await myClosure()"); + } + return await(Awaitable.from(source)); + } + + // ---- async execution ------------------------------------------------ + + /** + * Executes the given closure asynchronously on the specified executor, + * returning an {@link Awaitable}. 
+ */ + public static <T> Awaitable<T> executeAsync(Closure<T> closure, Executor executor) { + Objects.requireNonNull(closure, "closure must not be null"); + Executor targetExecutor = executor != null ? executor : defaultExecutor; + return GroovyPromise.of(CompletableFuture.supplyAsync(() -> { + try { + return closure.call(); + } catch (Throwable t) { + throw wrapForFuture(t); + } + }, targetExecutor)); + } + + /** + * Executes the given closure asynchronously using the default executor. + */ + public static <T> Awaitable<T> async(Closure<T> closure) { + return executeAsync(closure, defaultExecutor); + } + + /** + * Lightweight task spawn. Executes the closure asynchronously using the default executor. + */ + public static <T> Awaitable<T> go(Closure<T> closure) { + return executeAsync(closure, defaultExecutor); + } + + /** + * Wraps a closure so that each invocation executes the body asynchronously + * and returns an {@link Awaitable}. This is the runtime entry point for + * the {@code async { ... }} expression syntax. + * <pre> + * def task = async { expensiveWork() } + * def result = await task() // explicit call required + * </pre> + */ + @SuppressWarnings("unchecked") + public static <T> Closure<Awaitable<T>> wrapAsync(Closure<T> closure) { + Objects.requireNonNull(closure, "closure must not be null"); + return new Closure<Awaitable<T>>(closure.getOwner(), closure.getThisObject()) { + @SuppressWarnings("unused") + public Awaitable<T> doCall(Object... args) { + return GroovyPromise.of(CompletableFuture.supplyAsync(() -> { + try { + return closure.call(args); + } catch (Throwable t) { + throw wrapForFuture(t); + } + }, defaultExecutor)); + } + }; + } + + // ---- generators (yield return) ---------------------------------------- + + /** + * Called by compiler-generated code for {@code yield return expr} inside + * an async generator closure. Delegates to the bridge's yield method. 
+ * + * @param bridge the GeneratorBridge instance (injected as synthetic parameter) + * @param value the value to yield + */ + public static void yieldReturn(Object bridge, Object value) { + ((GeneratorBridge<?>) bridge).yield(value); + } + + /** + * Wraps a generator closure so that each invocation returns an {@link Iterable} + * backed by a {@link GeneratorBridge}. The generator runs on a background thread + * and yields values via the bridge. + * <p> + * This is the runtime entry point for {@code async { ... yield return ... }} + * expressions that contain {@code yield return}. + * + * @param closure the generator closure; receives a GeneratorBridge as first parameter + * @param <T> the element type + * @return a closure that produces an Iterable when called + */ + @SuppressWarnings("unchecked") + public static <T> Closure<Iterable<T>> wrapAsyncGenerator(Closure<?> closure) { + Objects.requireNonNull(closure, "closure must not be null"); + return new Closure<Iterable<T>>(closure.getOwner(), closure.getThisObject()) { + @SuppressWarnings("unused") + public Iterable<T> doCall(Object... args) { + GeneratorBridge<T> bridge = new GeneratorBridge<>(); + Object[] allArgs = new Object[args.length + 1]; + allArgs[0] = bridge; + System.arraycopy(args, 0, allArgs, 1, args.length); + defaultExecutor.execute(() -> { + try { + closure.call(allArgs); + bridge.complete(); + } catch (GeneratorBridge.GeneratorClosedException ignored) { + // Consumer closed early — normal for break in for-await + } catch (Throwable t) { + bridge.completeExceptionally(t); + } + }); + return () -> bridge; + } + }; + } + + /** + * Starts a generator immediately, returning an {@link Iterable} backed by a + * {@link GeneratorBridge}. This is the runtime entry point for + * {@code async { ... yield return ... }} expressions. 
+ * + * @param closure the generator closure; receives a GeneratorBridge as first parameter + * @param <T> the element type + * @return an Iterable that yields values from the generator + */ + @SuppressWarnings("unchecked") + public static <T> Iterable<T> asyncGenerator(Closure<?> closure) { + Objects.requireNonNull(closure, "closure must not be null"); + GeneratorBridge<T> bridge = new GeneratorBridge<>(); + Object[] args = new Object[]{bridge}; + defaultExecutor.execute(() -> { + try { + closure.call(args); + bridge.complete(); + } catch (GeneratorBridge.GeneratorClosedException ignored) { + // Consumer closed early — normal for break in for-await + } catch (Throwable t) { + bridge.completeExceptionally(t); + } + }); + return () -> bridge; + } + + // ---- for-await (blocking iterable conversion) ----------------------- + + /** + * Converts an arbitrary source to a blocking {@link Iterable} for use + * in {@code for await} loops. Handles arrays, collections, iterables, + * iterators, and adapter-supported types. + * + * @param source the source to convert + * @param <T> the element type + * @return a blocking iterable + */ + @SuppressWarnings("unchecked") + public static <T> Iterable<T> toBlockingIterable(Object source) { + if (source == null) return Collections.emptyList(); + if (source instanceof Iterable) return (Iterable<T>) source; + if (source instanceof Iterator) { + Iterator<T> iter = (Iterator<T>) source; + return () -> iter; + } + if (source instanceof Object[]) return (Iterable<T>) Arrays.asList((Object[]) source); + // Try adapter registry + return groovy.concurrent.AwaitableAdapterRegistry.toBlockingIterable(source); + } + + /** + * Closes a source if it implements {@link java.io.Closeable} or + * {@link AutoCloseable}. Called by compiler-generated finally block + * in {@code for await} loops. 
+ */ + public static void closeIterable(Object source) { + if (source instanceof java.io.Closeable c) { + try { c.close(); } catch (Exception ignored) { } + } else if (source instanceof AutoCloseable c) { + try { c.close(); } catch (Exception ignored) { } + } + } + + // ---- combinators ---------------------------------------------------- + + /** + * Waits for all given sources to complete, returning their results in order. + * Multi-arg {@code await(a, b, c)} desugars to this. + */ + @SuppressWarnings("unchecked") + public static <T> List<T> all(Object... sources) { + CompletableFuture<?>[] futures = Arrays.stream(sources) + .map(s -> Awaitable.from(s).toCompletableFuture()) + .toArray(CompletableFuture[]::new); + CompletableFuture.allOf(futures).join(); + List<T> results = new ArrayList<>(futures.length); + for (CompletableFuture<?> f : futures) { + results.add((T) f.join()); + } + return results; + } + + /** + * Returns the result of the first source to complete (success or failure). + */ + @SuppressWarnings("unchecked") + public static <T> T any(Object... sources) { + CompletableFuture<?>[] futures = Arrays.stream(sources) + .map(s -> Awaitable.from(s).toCompletableFuture()) + .toArray(CompletableFuture[]::new); + return (T) CompletableFuture.anyOf(futures).join(); + } + + /** + * Returns the result of the first source to complete <em>successfully</em>. + * Only fails when all sources fail. + */ + @SuppressWarnings("unchecked") + public static <T> T first(Object... 
sources) { + CompletableFuture<T>[] futures = Arrays.stream(sources) + .map(s -> (CompletableFuture<T>) Awaitable.from(s).toCompletableFuture()) + .toArray(CompletableFuture[]::new); + + CompletableFuture<T> result = new CompletableFuture<>(); + var remaining = new java.util.concurrent.atomic.AtomicInteger(futures.length); + List<Throwable> errors = java.util.Collections.synchronizedList(new ArrayList<>()); + for (CompletableFuture<T> f : futures) { + f.whenComplete((value, error) -> { + if (error == null) { + result.complete(value); + } else { + errors.add(error); + if (remaining.decrementAndGet() == 0) { + CompletionException aggregate = new CompletionException( + "All " + futures.length + " tasks failed", errors.get(0)); + for (int i = 1; i < errors.size(); i++) { + aggregate.addSuppressed(errors.get(i)); + } + result.completeExceptionally(aggregate); + } + } + }); + } + try { + return result.join(); + } catch (CompletionException e) { + throw rethrowUnwrapped(e); + } + } + + /** + * Waits for all sources to settle (succeed or fail), returning a list of + * {@link AwaitResult} without throwing. + */ + @SuppressWarnings("unchecked") + public static List<AwaitResult<Object>> allSettled(Object... 
sources) { + CompletableFuture<?>[] futures = Arrays.stream(sources) + .map(s -> Awaitable.from(s).toCompletableFuture()) + .toArray(CompletableFuture[]::new); + CompletableFuture.allOf( + Arrays.stream(futures) + .map(f -> f.handle((v, e) -> null)) + .toArray(CompletableFuture[]::new) + ).join(); + List<AwaitResult<Object>> results = new ArrayList<>(futures.length); + for (CompletableFuture<?> f : futures) { + try { + results.add(AwaitResult.success(f.join())); + } catch (CompletionException e) { + results.add(AwaitResult.failure(unwrap(e))); + } catch (CancellationException e) { + results.add(AwaitResult.failure(e)); + } + } + return results; + } + + // ---- async combinator variants (return Awaitable, non-blocking) ------ + + /** Non-blocking variant of {@link #all} — returns an Awaitable. */ + @SuppressWarnings("unchecked") + public static Awaitable<List<Object>> allAsync(Object... sources) { + CompletableFuture<?>[] futures = Arrays.stream(sources) + .map(s -> Awaitable.from(s).toCompletableFuture()) + .toArray(CompletableFuture[]::new); + + // Track the temporally-first failure explicitly, since + // CompletableFuture.allOf() doesn't guarantee which exception + // propagates when multiple futures fail. + var firstError = new java.util.concurrent.atomic.AtomicReference<Throwable>(); + for (CompletableFuture<?> f : futures) { + f.whenComplete((v, e) -> { + if (e != null) firstError.compareAndSet(null, e); + }); + } + + CompletableFuture<List<Object>> combined = CompletableFuture.allOf( + Arrays.stream(futures).map(f -> f.handle((v, e) -> null)).toArray(CompletableFuture[]::new) + ).thenApply(v -> { + Throwable err = firstError.get(); + if (err != null) throw err instanceof CompletionException ce ? 
ce : new CompletionException(err); + List<Object> results = new ArrayList<>(futures.length); + for (CompletableFuture<?> f : futures) results.add(f.join()); + return results; + }); Review Comment: `allAsync` currently waits for *all* sources to complete (via `allOf(...handle...)`) and only then throws the first recorded error. This contradicts the `Awaitable.all(...)` Javadoc, which states that it fails as soon as the first source fails, and it can cause `await Awaitable.all(...)` to block indefinitely if one task fails quickly while another never completes. Adjust the combinator to complete exceptionally immediately on the first failure (while leaving other tasks running unless explicitly cancelled). ```suggestion CompletableFuture<List<Object>> combined = new CompletableFuture<>(); if (futures.length == 0) { combined.complete(Collections.emptyList()); return GroovyPromise.of(combined); } Object[] results = new Object[futures.length]; var remaining = new java.util.concurrent.atomic.AtomicInteger(futures.length); for (int i = 0; i < futures.length; i++) { final int index = i; futures[i].whenComplete((value, error) -> { if (error != null) { combined.completeExceptionally( error instanceof CompletionException ce ? ce : new CompletionException(error)); return; } results[index] = value; if (remaining.decrementAndGet() == 0) { List<Object> orderedResults = new ArrayList<>(futures.length); for (Object result : results) orderedResults.add(result); combined.complete(orderedResults); } }); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
