Copilot commented on code in PR #2437:
URL: https://github.com/apache/groovy/pull/2437#discussion_r3036748295


##########
src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java:
##########
@@ -0,0 +1,741 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.groovy.runtime.async;
+
+import groovy.concurrent.AwaitResult;
+import groovy.concurrent.Awaitable;
+import groovy.lang.Closure;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Internal runtime support for the {@code async}/{@code await}/{@code defer} 
language features.
+ * <p>
+ * This class contains the actual implementation invoked by compiler-generated 
code.
+ * User code should prefer the static methods on {@link 
groovy.concurrent.Awaitable}
+ * for combinators and configuration.
+ * <p>
+ * <b>Thread pool configuration:</b>
+ * <ul>
+ *   <li>On JDK 21+ the default executor is a virtual-thread-per-task executor.
+ *       Virtual threads make blocking within {@code await()} essentially 
free.</li>
+ *   <li>On JDK 17-20 the fallback is a cached daemon thread pool
+ *       whose maximum size is controlled by the system property
+ *       {@code groovy.async.parallelism} (default: {@code 256}).</li>
+ *   <li>The executor can be overridden at any time via {@link 
#setExecutor}.</li>
+ * </ul>
+ * <p>
+ * <b>Exception handling</b> follows a transparency principle: the
+ * <em>original</em> exception is rethrown without being wrapped.
+ *
+ * @see groovy.concurrent.Awaitable
+ * @since 6.0.0
+ */
+public class AsyncSupport {
+
+    private static final boolean VIRTUAL_THREADS_AVAILABLE;
+    private static final Executor VIRTUAL_THREAD_EXECUTOR;
+    private static final int FALLBACK_MAX_THREADS;
+    private static final Executor FALLBACK_EXECUTOR;
+
+    static {
+        Executor vtExecutor = null;
+        boolean vtAvailable = false;
+        try {
+            MethodHandle mh = MethodHandles.lookup().findStatic(
+                    Executors.class, "newVirtualThreadPerTaskExecutor",
+                    MethodType.methodType(ExecutorService.class));
+            vtExecutor = (Executor) mh.invoke();
+            vtAvailable = true;
+        } catch (Throwable ignored) {
+            // JDK < 21 — virtual threads not available
+        }
+        VIRTUAL_THREAD_EXECUTOR = vtExecutor;
+        VIRTUAL_THREADS_AVAILABLE = vtAvailable;
+
+        FALLBACK_MAX_THREADS = 
org.apache.groovy.util.SystemUtil.getIntegerSafe(
+                "groovy.async.parallelism", 256);
+        if (!VIRTUAL_THREADS_AVAILABLE) {
+            FALLBACK_EXECUTOR = new ThreadPoolExecutor(
+                    0, FALLBACK_MAX_THREADS,
+                    60L, TimeUnit.SECONDS,
+                    new SynchronousQueue<>(),
+                    r -> {
+                        Thread t = new Thread(r);
+                        t.setDaemon(true);
+                        t.setName("groovy-async-" + t.getId());
+                        return t;
+                    },
+                    new ThreadPoolExecutor.CallerRunsPolicy());
+        } else {
+            FALLBACK_EXECUTOR = null;
+        }
+    }
+
+    private static volatile Executor defaultExecutor =
+            VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR : 
FALLBACK_EXECUTOR;
+
+    private static final ScheduledExecutorService SCHEDULER =
+            Executors.newSingleThreadScheduledExecutor(r -> {
+                Thread t = new Thread(r, "groovy-async-scheduler");
+                t.setDaemon(true);
+                return t;
+            });
+
+    private AsyncSupport() { }
+
+    // ---- executor configuration -----------------------------------------
+
+    /** Returns {@code true} if running on JDK 21+ with virtual thread 
support. */
+    public static boolean isVirtualThreadsAvailable() {
+        return VIRTUAL_THREADS_AVAILABLE;
+    }
+
+    /** Returns the current executor used for async tasks. */
+    public static Executor getExecutor() {
+        return defaultExecutor;
+    }
+
+    /** Sets the executor used for async tasks. */
+    public static void setExecutor(Executor executor) {
+        defaultExecutor = Objects.requireNonNull(executor, "executor must not 
be null");
+    }
+
+    /** Resets the executor to the default (virtual threads on JDK 21+, cached 
pool otherwise). */
+    public static void resetExecutor() {
+        defaultExecutor = VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR 
: FALLBACK_EXECUTOR;
+    }
+
+    // ---- await overloads ------------------------------------------------
+
+    /**
+     * Awaits the result of an {@link Awaitable}.
+     * Blocks the calling thread until the computation completes.
+     * The original exception is rethrown transparently.
+     */
+    public static <T> T await(Awaitable<T> awaitable) {
+        try {
+            return awaitable.get();
+        } catch (ExecutionException e) {
+            throw rethrowUnwrapped(e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            CancellationException ce = new CancellationException("Interrupted 
while awaiting");
+            ce.initCause(e);
+            throw ce;
+        }
+    }
+
+    /** Awaits a {@link CompletableFuture} using non-interruptible {@code 
join()}. */
+    public static <T> T await(CompletableFuture<T> future) {
+        try {
+            return future.join();
+        } catch (CompletionException e) {
+            throw rethrowUnwrapped(e);
+        } catch (CancellationException e) {
+            throw e;
+        }
+    }
+
+    /** Awaits a {@link CompletionStage} by converting to CompletableFuture. */
+    public static <T> T await(CompletionStage<T> stage) {
+        return await(stage.toCompletableFuture());
+    }
+
+    /** Awaits a {@link Future}. Delegates to the CF overload if applicable. */
+    public static <T> T await(Future<T> future) {
+        if (future instanceof CompletableFuture<T> cf) {
+            return await(cf);
+        }
+        try {
+            return future.get();
+        } catch (ExecutionException e) {
+            throw rethrowUnwrapped(e);
+        } catch (CancellationException e) {
+            throw e;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            CancellationException ce = new CancellationException("Interrupted 
while awaiting future");
+            ce.initCause(e);
+            throw ce;
+        }
+    }
+
+    /**
+     * Awaits an arbitrary object by adapting it via {@link 
Awaitable#from(Object)}.
+     * This is the fallback overload called by compiler-generated await 
expressions.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T await(Object source) {
+        if (source == null) return null;
+        if (source instanceof Awaitable) return await((Awaitable<T>) source);
+        if (source instanceof CompletableFuture) return 
await((CompletableFuture<T>) source);
+        if (source instanceof CompletionStage) return 
await((CompletionStage<T>) source);
+        if (source instanceof Future) return await((Future<T>) source);
+        if (source instanceof Closure) {
+            throw new IllegalArgumentException(
+                    "Cannot await a Closure directly. Call the closure first: 
await myClosure()");
+        }
+        return await(Awaitable.from(source));
+    }
+
+    // ---- async execution ------------------------------------------------
+
+    /**
+     * Executes the given closure asynchronously on the specified executor,
+     * returning an {@link Awaitable}.
+     */
+    public static <T> Awaitable<T> executeAsync(Closure<T> closure, Executor 
executor) {
+        Objects.requireNonNull(closure, "closure must not be null");
+        Executor targetExecutor = executor != null ? executor : 
defaultExecutor;
+        return GroovyPromise.of(CompletableFuture.supplyAsync(() -> {
+            try {
+                return closure.call();
+            } catch (Throwable t) {
+                throw wrapForFuture(t);
+            }
+        }, targetExecutor));
+    }
+
+    /**
+     * Executes the given closure asynchronously using the default executor.
+     */
+    public static <T> Awaitable<T> async(Closure<T> closure) {
+        return executeAsync(closure, defaultExecutor);
+    }
+
+    /**
+     * Lightweight task spawn. Executes the closure asynchronously using the 
default executor.
+     */
+    public static <T> Awaitable<T> go(Closure<T> closure) {
+        return executeAsync(closure, defaultExecutor);
+    }
+
+    /**
+     * Wraps a closure so that each invocation executes the body asynchronously
+     * and returns an {@link Awaitable}. This is the runtime entry point for
+     * the {@code async { ... }} expression syntax.
+     * <pre>
+     * def task = async { expensiveWork() }
+     * def result = await task()  // explicit call required
+     * </pre>
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Closure<Awaitable<T>> wrapAsync(Closure<T> closure) {
+        Objects.requireNonNull(closure, "closure must not be null");
+        return new Closure<Awaitable<T>>(closure.getOwner(), 
closure.getThisObject()) {
+            @SuppressWarnings("unused")
+            public Awaitable<T> doCall(Object... args) {
+                return GroovyPromise.of(CompletableFuture.supplyAsync(() -> {
+                    try {
+                        return closure.call(args);
+                    } catch (Throwable t) {
+                        throw wrapForFuture(t);
+                    }
+                }, defaultExecutor));
+            }
+        };
+    }
+
+    // ---- generators (yield return) ----------------------------------------
+
+    /**
+     * Called by compiler-generated code for {@code yield return expr} inside
+     * an async generator closure. Delegates to the bridge's yield method.
+     *
+     * @param bridge the GeneratorBridge instance (injected as synthetic 
parameter)
+     * @param value  the value to yield
+     */
+    public static void yieldReturn(Object bridge, Object value) {

Review Comment:
   `yieldReturn` blindly casts `bridge` to `GeneratorBridge` and will throw a 
`ClassCastException` (or a `NullPointerException` when `bridge` is null) if 
`yield return` is accidentally used outside an async generator context, or if 
the synthetic parameter isn’t present. Since the grammar allows `YIELD RETURN` 
as a statement anywhere, please add a defensive check here (e.g., validate 
type/non-null and throw an `IllegalStateException` with a clear message) to 
make misuse fail predictably. A single `instanceof` test covers both cases, 
because `instanceof` evaluates to false for `null`.
   ```suggestion
       public static void yieldReturn(Object bridge, Object value) {
           if (!(bridge instanceof GeneratorBridge<?>)) {
               throw new IllegalStateException("yield return may only be used inside an async generator context");
           }
   ```



##########
src/main/java/org/apache/groovy/runtime/async/GeneratorBridge.java:
##########
@@ -0,0 +1,157 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.groovy.runtime.async;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.SynchronousQueue;
+
+/**
+ * A producer/consumer bridge for async generators ({@code yield return}).
+ * <p>
+ * The generator closure runs on a separate thread and calls {@link 
#yield(Object)}
+ * to produce values. The consumer iterates using {@link #hasNext()}/{@link 
#next()}.
+ * A {@link SynchronousQueue} provides the handoff — each {@code yield} blocks
+ * until the consumer takes the value, providing natural back-pressure.
+ * <p>
+ * With virtual threads (JDK 21+), both the producer and consumer block 
cheaply.
+ * On JDK 17-20, the producer runs on a platform thread from the cached pool.
+ *
+ * @param <T> the element type
+ * @since 6.0.0
+ */
+public final class GeneratorBridge<T> implements Iterator<T>, Closeable {
+
+    private static final Object DONE = new Object();
+    private static final Object ERROR = new Object();
+
+    private final SynchronousQueue<Object[]> handoff = new 
SynchronousQueue<>();
+    private Object[] pending;
+    private boolean done;
+    private volatile boolean closed;
+    private volatile Thread producerThread;
+
+    /**
+     * Called by the generator (producer thread) to yield a value.
+     * Blocks until the consumer calls {@link #next()}.
+     *
+     * @param value the value to yield (may be null)
+     * @throws GeneratorClosedException if the consumer has closed the bridge
+     */
+    public void yield(Object value) {
+        if (closed) throw new GeneratorClosedException();
+        producerThread = Thread.currentThread();
+        try {
+            handoff.put(new Object[]{value});
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new GeneratorClosedException();
+        } finally {
+            producerThread = null;
+        }
+    }
+
+    /**
+     * Called internally when the generator completes normally.
+     */
+    void complete() {
+        producerThread = Thread.currentThread();
+        try {
+            handoff.put(new Object[]{DONE});
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } finally {
+            producerThread = null;
+        }
+    }
+
+    /**
+     * Called internally when the generator throws an exception.
+     */
+    void completeExceptionally(Throwable error) {
+        producerThread = Thread.currentThread();
+        try {
+            handoff.put(new Object[]{ERROR, error});
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } finally {
+            producerThread = null;
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (done) return false;
+        if (pending != null) return true;
+        try {
+            pending = handoff.take();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            done = true;

Review Comment:
   If the consumer thread is interrupted while waiting in `hasNext()` 
(`handoff.take()`), the bridge marks the iterator done and returns false but 
never closes/interrupts the producer. This can leave the producer blocked in 
`handoff.put()` and (on JDK 17–20) permanently consume a platform thread from 
the bounded pool. Consider calling `close()` (or otherwise 
signalling/interrupting the producer) in the InterruptedException path so 
interruption can’t leak a blocked producer.
   ```suggestion
               close();
               Thread.currentThread().interrupt();
   ```



##########
src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java:
##########
@@ -0,0 +1,180 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package groovy.concurrent;
+
+import org.apache.groovy.runtime.async.AsyncSupport;
+import org.apache.groovy.runtime.async.GroovyPromise;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.ServiceLoader;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+
+/**
+ * Central registry for {@link AwaitableAdapter} instances.
+ * <p>
+ * On class-load, adapters are discovered via {@link ServiceLoader} from
+ * {@code META-INF/services/groovy.concurrent.AwaitableAdapter}. A built-in
+ * adapter is always present as the lowest-priority fallback, handling:
+ * <ul>
+ *   <li>{@link CompletableFuture} and {@link CompletionStage}</li>
+ *   <li>{@link Future} (adapted via a blocking wrapper)</li>
+ * </ul>
+ *
+ * @since 6.0.0
+ */
+public final class AwaitableAdapterRegistry {
+
+    private static final List<AwaitableAdapter> adapters = new 
CopyOnWriteArrayList<>();
+
+    private static volatile ClassValue<AwaitableAdapter> awaitableCache = 
buildAwaitableCache();
+
+    static {
+        // Load SPI adapters
+        try {
+            ServiceLoader<AwaitableAdapter> loader = ServiceLoader.load(
+                    AwaitableAdapter.class, 
AwaitableAdapterRegistry.class.getClassLoader());
+            for (Iterator<AwaitableAdapter> it = loader.iterator(); 
it.hasNext(); ) {
+                try {
+                    adapters.add(it.next());
+                } catch (Throwable ignored) {
+                    // Skip adapters that fail to load (missing dependencies)
+                }
+            }
+        } catch (Throwable ignored) {
+            // ServiceLoader failure — continue with built-in adapter only
+        }
+        // Built-in fallback adapter (lowest priority)
+        adapters.add(new BuiltInAdapter());
+    }
+
+    private AwaitableAdapterRegistry() { }
+
+    /**
+     * Registers an adapter at the highest priority (before SPI-loaded 
adapters).
+     */
+    public static void register(AwaitableAdapter adapter) {
+        Objects.requireNonNull(adapter);
+        adapters.add(0, adapter);
+        awaitableCache = buildAwaitableCache();
+    }
+
+    /**
+     * Removes a previously registered adapter.
+     */
+    public static void unregister(AwaitableAdapter adapter) {
+        adapters.remove(adapter);
+        awaitableCache = buildAwaitableCache();
+    }
+
+    /**
+     * Converts the given source to an {@link Awaitable}.
+     */
+    @SuppressWarnings("unchecked")
+    static <T> Awaitable<T> toAwaitable(Object source) {
+        if (source == null) {
+            return (Awaitable<T>) Awaitable.of(null);
+        }
+        if (source instanceof Awaitable) return (Awaitable<T>) source;
+        Class<?> type = source.getClass();
+        AwaitableAdapter adapter = awaitableCache.get(type);
+        if (adapter != null) {
+            return adapter.toAwaitable(source);
+        }
+        throw new IllegalArgumentException(
+                "No Awaitable adapter found for type: " + type.getName()
+                        + ". Register an AwaitableAdapter via ServiceLoader or 
AwaitableAdapterRegistry.register().");
+    }

Review Comment:
   `Awaitable.from(Object)` Javadoc says `source` must not be null and should 
throw `IllegalArgumentException` when null, but 
`AwaitableAdapterRegistry.toAwaitable(null)` currently returns a completed 
awaitable with null. Please align the implementation and documentation (either 
reject null consistently or update the Javadoc and combinator contracts that 
mention null is invalid).



##########
src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java:
##########
@@ -466,12 +467,20 @@ public Statement visitLoopStmtAlt(final 
LoopStmtAltContext ctx) {
 
     @Override
     public ForStatement visitForStmtAlt(final ForStmtAltContext ctx) {
+        boolean isForAwait = asBoolean(ctx.AWAIT());
         Function<Statement, ForStatement> maker = 
this.visitForControl(ctx.forControl());
 
         Statement loopBody = this.unpackStatement((Statement) 
this.visit(ctx.statement()));
 
         ForStatement forStatement = configureAST(maker.apply(loopBody), ctx);
         
visitAnnotationsOpt(ctx.annotationsOpt()).forEach(forStatement::addStatementAnnotation);
+
+        if (isForAwait) {
+            // Transform collection expression: wrap in 
AsyncSupport.toBlockingIterable()
+            Expression original = forStatement.getCollectionExpression();
+            
forStatement.setCollectionExpression(AsyncTransformHelper.buildToBlockingIterableCall(original));
+        }

Review Comment:
   `for await` currently only wraps the collection expression with 
`AsyncSupport.toBlockingIterable(...)`. There’s no corresponding 
close/cancellation hook when the loop exits early (break/exception), even 
though `AsyncSupport.closeIterable(...)` and 
`AsyncTransformHelper.buildCloseIterableCall(...)` exist and docs mention a 
compiler-generated finally block. Without closing, generator producers may 
remain blocked on `yield` and reactive iterables may keep 
subscriptions/resources longer than necessary. Consider wrapping the generated 
loop in try/finally to invoke `closeIterable` (or closing the iterator if it is 
AutoCloseable) when leaving the loop.



##########
src/main/java/org/apache/groovy/runtime/async/GeneratorBridge.java:
##########
@@ -0,0 +1,157 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.groovy.runtime.async;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.SynchronousQueue;
+
+/**
+ * A producer/consumer bridge for async generators ({@code yield return}).
+ * <p>
+ * The generator closure runs on a separate thread and calls {@link 
#yield(Object)}
+ * to produce values. The consumer iterates using {@link #hasNext()}/{@link 
#next()}.
+ * A {@link SynchronousQueue} provides the handoff — each {@code yield} blocks
+ * until the consumer takes the value, providing natural back-pressure.
+ * <p>
+ * With virtual threads (JDK 21+), both the producer and consumer block 
cheaply.
+ * On JDK 17-20, the producer runs on a platform thread from the cached pool.
+ *
+ * @param <T> the element type
+ * @since 6.0.0
+ */
+public final class GeneratorBridge<T> implements Iterator<T>, Closeable {
+
+    private static final Object DONE = new Object();
+    private static final Object ERROR = new Object();
+
+    private final SynchronousQueue<Object[]> handoff = new 
SynchronousQueue<>();
+    private Object[] pending;
+    private boolean done;
+    private volatile boolean closed;
+    private volatile Thread producerThread;
+
+    /**
+     * Called by the generator (producer thread) to yield a value.
+     * Blocks until the consumer calls {@link #next()}.
+     *
+     * @param value the value to yield (may be null)
+     * @throws GeneratorClosedException if the consumer has closed the bridge
+     */
+    public void yield(Object value) {
+        if (closed) throw new GeneratorClosedException();
+        producerThread = Thread.currentThread();
+        try {
+            handoff.put(new Object[]{value});
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new GeneratorClosedException();
+        } finally {

Review Comment:
   In GeneratorBridge.yield(), the `closed` check happens before 
`producerThread` is set. If `close()` is called in the tiny window between the 
check and the assignment, `close()` won’t be able to interrupt the producer and 
the subsequent `handoff.put(...)` can block indefinitely. Reorder the code so 
`producerThread` is set before checking/acting on `closed`, and re-check 
`closed` immediately before blocking in `put()` to make early-close reliably 
unblock producers.



##########
src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java:
##########
@@ -0,0 +1,741 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.groovy.runtime.async;
+
+import groovy.concurrent.AwaitResult;
+import groovy.concurrent.Awaitable;
+import groovy.lang.Closure;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Internal runtime support for the {@code async}/{@code await}/{@code defer} 
language features.
+ * <p>
+ * This class contains the actual implementation invoked by compiler-generated 
code.
+ * User code should prefer the static methods on {@link 
groovy.concurrent.Awaitable}
+ * for combinators and configuration.
+ * <p>
+ * <b>Thread pool configuration:</b>
+ * <ul>
+ *   <li>On JDK 21+ the default executor is a virtual-thread-per-task executor.
+ *       Virtual threads make blocking within {@code await()} essentially 
free.</li>
+ *   <li>On JDK 17-20 the fallback is a cached daemon thread pool
+ *       whose maximum size is controlled by the system property
+ *       {@code groovy.async.parallelism} (default: {@code 256}).</li>
+ *   <li>The executor can be overridden at any time via {@link 
#setExecutor}.</li>
+ * </ul>
+ * <p>
+ * <b>Exception handling</b> follows a transparency principle: the
+ * <em>original</em> exception is rethrown without being wrapped.
+ *
+ * @see groovy.concurrent.Awaitable
+ * @since 6.0.0
+ */
+public class AsyncSupport {
+
+    /** {@code true} when the virtual-thread executor factory was found and invoked successfully. */
+    private static final boolean VIRTUAL_THREADS_AVAILABLE;
+    /** Virtual-thread-per-task executor, or {@code null} when it could not be created. */
+    private static final Executor VIRTUAL_THREAD_EXECUTOR;
+    /** Upper bound on threads in the fallback pool; always at least 1. */
+    private static final int FALLBACK_MAX_THREADS;
+    /** Daemon thread pool used when virtual threads are unavailable; {@code null} otherwise. */
+    private static final Executor FALLBACK_EXECUTOR;
+
+    static {
+        // Resolve the factory through a MethodHandle so this class still loads
+        // on runtimes where Executors has no such method.
+        Executor vtExecutor = null;
+        boolean vtAvailable = false;
+        try {
+            MethodHandle mh = MethodHandles.lookup().findStatic(
+                    Executors.class, "newVirtualThreadPerTaskExecutor",
+                    MethodType.methodType(ExecutorService.class));
+            vtExecutor = (Executor) mh.invoke();
+            vtAvailable = true;
+        } catch (Throwable ignored) {
+            // JDK < 21 — virtual threads not available
+        }
+        VIRTUAL_THREAD_EXECUTOR = vtExecutor;
+        VIRTUAL_THREADS_AVAILABLE = vtAvailable;
+
+        // ThreadPoolExecutor rejects maximumPoolSize <= 0 with IllegalArgumentException,
+        // which would abort class initialization; clamp a zero or negative setting to 1.
+        FALLBACK_MAX_THREADS = Math.max(1,
+                org.apache.groovy.util.SystemUtil.getIntegerSafe(
+                        "groovy.async.parallelism", 256));
+        if (!VIRTUAL_THREADS_AVAILABLE) {
+            // SynchronousQueue: each task is handed straight to a worker thread.
+            // CallerRunsPolicy: a rejected task runs on the submitting thread.
+            FALLBACK_EXECUTOR = new ThreadPoolExecutor(
+                    0, FALLBACK_MAX_THREADS,
+                    60L, TimeUnit.SECONDS,
+                    new SynchronousQueue<>(),
+                    r -> {
+                        Thread t = new Thread(r);
+                        t.setDaemon(true);
+                        t.setName("groovy-async-" + t.getId());
+                        return t;
+                    },
+                    new ThreadPoolExecutor.CallerRunsPolicy());
+        } else {
+            FALLBACK_EXECUTOR = null;
+        }
+    }
+
+    /**
+     * Executor currently used for async tasks. Starts as the virtual-thread
+     * executor when available, otherwise the fallback pool. It is
+     * {@code volatile} and is reassigned by {@link #setExecutor} and
+     * {@link #resetExecutor}.
+     */
+    private static volatile Executor defaultExecutor =
+            VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR : 
FALLBACK_EXECUTOR;
+
+    /**
+     * Single-threaded scheduled executor whose one thread is a daemon named
+     * {@code groovy-async-scheduler}.
+     */
+    private static final ScheduledExecutorService SCHEDULER =
+            Executors.newSingleThreadScheduledExecutor(r -> {
+                Thread t = new Thread(r, "groovy-async-scheduler");
+                t.setDaemon(true);
+                return t;
+            });
+
+    /** Private constructor: the class exposes only static members. */
+    private AsyncSupport() { }
+
+    // ---- executor configuration -----------------------------------------
+
+    /**
+     * Reports whether the virtual-thread executor could be created when this
+     * class was initialised (i.e. running on JDK 21+).
+     *
+     * @return {@code true} if virtual thread support is available
+     */
+    public static boolean isVirtualThreadsAvailable() {
+        return VIRTUAL_THREADS_AVAILABLE;
+    }
+
+    /**
+     * Gives access to the executor on which async tasks are currently run.
+     *
+     * @return the executor in effect at the time of the call
+     */
+    public static Executor getExecutor() {
+        final Executor current = defaultExecutor;
+        return current;
+    }
+
+    /** Sets the executor used for async tasks. */
+    public static void setExecutor(Executor executor) {
+        defaultExecutor = Objects.requireNonNull(executor, "executor must not 
be null");
+    }
+
+    /**
+     * Restores the built-in executor: the virtual-thread executor on JDK 21+,
+     * the cached fallback pool otherwise.
+     */
+    public static void resetExecutor() {
+        if (VIRTUAL_THREADS_AVAILABLE) {
+            defaultExecutor = VIRTUAL_THREAD_EXECUTOR;
+        } else {
+            defaultExecutor = FALLBACK_EXECUTOR;
+        }
+    }
+
+    // ---- await overloads ------------------------------------------------
+
+    /**
+     * Blocks the calling thread until the given {@link Awaitable} completes
+     * and returns its value.
+     * <p>
+     * An {@link ExecutionException} is handed to {@code rethrowUnwrapped}.
+     * If the wait is interrupted, the interrupt flag is restored and a
+     * {@link CancellationException} carrying the interruption as its cause
+     * is thrown.
+     *
+     * @param awaitable the computation to wait for
+     * @param <T>       the result type
+     * @return the computed value
+     */
+    public static <T> T await(Awaitable<T> awaitable) {
+        T outcome;
+        try {
+            outcome = awaitable.get();
+        } catch (InterruptedException interrupted) {
+            // Preserve the interrupt for code further up the stack.
+            Thread.currentThread().interrupt();
+            CancellationException cancelled = new CancellationException("Interrupted while awaiting");
+            cancelled.initCause(interrupted);
+            throw cancelled;
+        } catch (ExecutionException failure) {
+            throw rethrowUnwrapped(failure);
+        }
+        return outcome;
+    }
+
+    /** Awaits a {@link CompletableFuture} using non-interruptible {@code 
join()}. */
+    public static <T> T await(CompletableFuture<T> future) {
+        try {
+            return future.join();
+        } catch (CompletionException e) {
+            throw rethrowUnwrapped(e);
+        } catch (CancellationException e) {
+            throw e;
+        }
+    }
+
+    /**
+     * Waits for a {@link CompletionStage} by first turning it into a
+     * {@link CompletableFuture} and then delegating to that overload.
+     *
+     * @param stage the stage to wait for
+     * @param <T>   the result type
+     * @return the stage's value
+     */
+    public static <T> T await(CompletionStage<T> stage) {
+        CompletableFuture<T> asFuture = stage.toCompletableFuture();
+        return await(asFuture);
+    }
+
+    /**
+     * Waits for a plain {@link Future}.
+     * <p>
+     * A {@link CompletableFuture} is routed to its dedicated overload. For
+     * any other future, an {@link ExecutionException} is handed to
+     * {@code rethrowUnwrapped}, a {@link CancellationException} propagates
+     * unchanged, and an interruption restores the interrupt flag and is
+     * reported as a {@link CancellationException} with the interruption as
+     * its cause.
+     *
+     * @param future the future to wait for
+     * @param <T>    the result type
+     * @return the future's value
+     */
+    public static <T> T await(Future<T> future) {
+        if (future instanceof CompletableFuture<T> completable) {
+            return await(completable);
+        }
+        T outcome;
+        try {
+            outcome = future.get();
+        } catch (InterruptedException interrupted) {
+            // Preserve the interrupt for code further up the stack.
+            Thread.currentThread().interrupt();
+            CancellationException cancelled = new CancellationException("Interrupted while awaiting future");
+            cancelled.initCause(interrupted);
+            throw cancelled;
+        } catch (ExecutionException failure) {
+            throw rethrowUnwrapped(failure);
+        }
+        return outcome;
+    }
+
+    /**
+     * Fallback overload used by compiler-generated await expressions when
+     * the static type of the operand is unknown.
+     * <p>
+     * {@code null} yields {@code null}. Known asynchronous types are sent to
+     * their specific overloads, checked in the order {@link Awaitable},
+     * {@link CompletableFuture}, {@link CompletionStage}, {@link Future}.
+     * A {@link Closure} is rejected. Anything else is adapted through
+     * {@link Awaitable#from(Object)}.
+     *
+     * @param source the object to await; may be {@code null}
+     * @param <T>    the expected result type
+     * @return the awaited value, or {@code null} for a {@code null} source
+     * @throws IllegalArgumentException if {@code source} is a {@link Closure}
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T await(Object source) {
+        if (source == null) {
+            return null;
+        }
+        if (source instanceof Awaitable) {
+            Awaitable<T> awaitable = (Awaitable<T>) source;
+            return await(awaitable);
+        }
+        if (source instanceof CompletableFuture) {
+            CompletableFuture<T> completable = (CompletableFuture<T>) source;
+            return await(completable);
+        }
+        if (source instanceof CompletionStage) {
+            CompletionStage<T> stage = (CompletionStage<T>) source;
+            return await(stage);
+        }
+        if (source instanceof Future) {
+            Future<T> future = (Future<T>) source;
+            return await(future);
+        }
+        if (source instanceof Closure) {
+            throw new IllegalArgumentException(
+                    "Cannot await a Closure directly. Call the closure first: await myClosure()");
+        }
+        return await(Awaitable.from(source));
+    }
+
+    // ---- async execution ------------------------------------------------
+
+    /**
+     * Runs a closure asynchronously and exposes the outcome as an
+     * {@link Awaitable}.
+     * <p>
+     * Anything thrown by the closure is passed through {@code wrapForFuture}
+     * before being rethrown inside the task.
+     *
+     * @param closure  the work to run
+     * @param executor the executor to run on; {@code null} selects the
+     *                 current default executor
+     * @param <T>      the closure's result type
+     * @return an awaitable for the closure's result
+     * @throws NullPointerException if {@code closure} is {@code null}
+     */
+    public static <T> Awaitable<T> executeAsync(Closure<T> closure, Executor executor) {
+        Objects.requireNonNull(closure, "closure must not be null");
+        Executor runOn;
+        if (executor != null) {
+            runOn = executor;
+        } else {
+            runOn = defaultExecutor;
+        }
+        CompletableFuture<T> pending = CompletableFuture.supplyAsync(() -> {
+            try {
+                return closure.call();
+            } catch (Throwable failure) {
+                throw wrapForFuture(failure);
+            }
+        }, runOn);
+        return GroovyPromise.of(pending);
+    }
+
+    /**
+     * Runs the closure asynchronously on the current default executor.
+     *
+     * @param closure the work to run
+     * @param <T>     the closure's result type
+     * @return an awaitable for the closure's result
+     */
+    public static <T> Awaitable<T> async(Closure<T> closure) {
+        final Executor current = defaultExecutor;
+        return executeAsync(closure, current);
+    }
+
+    /**
+     * Lightweight task spawn: runs the closure asynchronously on the current
+     * default executor.
+     *
+     * @param closure the work to run
+     * @param <T>     the closure's result type
+     * @return an awaitable for the closure's result
+     */
+    public static <T> Awaitable<T> go(Closure<T> closure) {
+        final Executor current = defaultExecutor;
+        return executeAsync(closure, current);
+    }
+
+    /**
+     * Runtime entry point for the {@code async { ... }} expression syntax.
+     * <p>
+     * Produces a closure that shares the owner and {@code this} object of
+     * the given one. Each call of the returned closure starts the original
+     * body on the default executor, forwarding the call arguments, and
+     * returns an {@link Awaitable} for its result.
+     * <pre>
+     * def task = async { expensiveWork() }
+     * def result = await task()  // explicit call required
+     * </pre>
+     *
+     * @param closure the body to run asynchronously
+     * @param <T>     the body's result type
+     * @return a closure yielding a fresh {@link Awaitable} per invocation
+     * @throws NullPointerException if {@code closure} is {@code null}
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Closure<Awaitable<T>> wrapAsync(Closure<T> closure) {
+        Objects.requireNonNull(closure, "closure must not be null");
+        return new Closure<Awaitable<T>>(closure.getOwner(), closure.getThisObject()) {
+            @SuppressWarnings("unused")
+            public Awaitable<T> doCall(Object... args) {
+                CompletableFuture<T> pending = CompletableFuture.supplyAsync(() -> {
+                    try {
+                        return closure.call(args);
+                    } catch (Throwable failure) {
+                        throw wrapForFuture(failure);
+                    }
+                }, defaultExecutor);
+                return GroovyPromise.of(pending);
+            }
+        };
+    }
+
+    // ---- generators (yield return) ----------------------------------------
+
+    /**
+     * Called by compiler-generated code for {@code yield return expr} inside
+     * an async generator closure. Delegates to the bridge's yield method.
+     *
+     * @param bridge the GeneratorBridge instance (injected as synthetic 
parameter)
+     * @param value  the value to yield
+     */
+    public static void yieldReturn(Object bridge, Object value) {
+        ((GeneratorBridge<?>) bridge).yield(value);
+    }
+
+    /**
+     * Wraps a generator closure so that each invocation returns an {@link 
Iterable}
+     * backed by a {@link GeneratorBridge}. The generator runs on a background 
thread
+     * and yields values via the bridge.
+     * <p>
+     * This is the runtime entry point for {@code async { ... yield return ... 
}}
+     * expressions that contain {@code yield return}.
+     *
+     * @param closure the generator closure; receives a GeneratorBridge as 
first parameter
+     * @param <T>     the element type
+     * @return a closure that produces an Iterable when called
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Closure<Iterable<T>> wrapAsyncGenerator(Closure<?> 
closure) {
+        Objects.requireNonNull(closure, "closure must not be null");
+        return new Closure<Iterable<T>>(closure.getOwner(), 
closure.getThisObject()) {
+            @SuppressWarnings("unused")
+            public Iterable<T> doCall(Object... args) {
+                GeneratorBridge<T> bridge = new GeneratorBridge<>();
+                Object[] allArgs = new Object[args.length + 1];
+                allArgs[0] = bridge;
+                System.arraycopy(args, 0, allArgs, 1, args.length);
+                defaultExecutor.execute(() -> {
+                    try {
+                        closure.call(allArgs);
+                        bridge.complete();
+                    } catch (GeneratorBridge.GeneratorClosedException ignored) 
{
+                        // Consumer closed early — normal for break in 
for-await
+                    } catch (Throwable t) {
+                        bridge.completeExceptionally(t);
+                    }
+                });
+                return () -> bridge;
+            }
+        };
+    }
+
+    /**
+     * Starts a generator immediately, returning an {@link Iterable} backed by 
a
+     * {@link GeneratorBridge}. This is the runtime entry point for
+     * {@code async { ... yield return ... }} expressions.
+     *
+     * @param closure the generator closure; receives a GeneratorBridge as 
first parameter
+     * @param <T>     the element type
+     * @return an Iterable that yields values from the generator
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Iterable<T> asyncGenerator(Closure<?> closure) {
+        Objects.requireNonNull(closure, "closure must not be null");
+        GeneratorBridge<T> bridge = new GeneratorBridge<>();
+        Object[] args = new Object[]{bridge};
+        defaultExecutor.execute(() -> {
+            try {
+                closure.call(args);
+                bridge.complete();
+            } catch (GeneratorBridge.GeneratorClosedException ignored) {
+                // Consumer closed early — normal for break in for-await
+            } catch (Throwable t) {
+                bridge.completeExceptionally(t);
+            }
+        });
+        return () -> bridge;
+    }
+
+    // ---- for-await (blocking iterable conversion) -----------------------
+
+    /**
+     * Adapts an arbitrary object to an {@link Iterable} for use in
+     * {@code for await} loops.
+     * <ul>
+     *   <li>{@code null} becomes an empty list;</li>
+     *   <li>an {@link Iterable} is returned as is;</li>
+     *   <li>an {@link Iterator} is wrapped in an iterable that hands back
+     *       that same iterator;</li>
+     *   <li>an object array is viewed as a list;</li>
+     *   <li>anything else is delegated to the adapter registry.</li>
+     * </ul>
+     *
+     * @param source the source to convert
+     * @param <T>    the element type
+     * @return a blocking iterable
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Iterable<T> toBlockingIterable(Object source) {
+        if (source == null) {
+            return Collections.emptyList();
+        }
+        if (source instanceof Iterable<?> iterable) {
+            return (Iterable<T>) iterable;
+        }
+        if (source instanceof Iterator<?> iterator) {
+            Iterator<T> typed = (Iterator<T>) iterator;
+            return () -> typed;
+        }
+        if (source instanceof Object[] elements) {
+            return (Iterable<T>) Arrays.asList(elements);
+        }
+        // Try adapter registry
+        return groovy.concurrent.AwaitableAdapterRegistry.toBlockingIterable(source);
+    }
+
+    /**
+     * Closes a source if it implements {@link java.io.Closeable} or
+     * {@link AutoCloseable}. Called by compiler-generated finally block
+     * in {@code for await} loops.
+     */
+    public static void closeIterable(Object source) {
+        if (source instanceof java.io.Closeable c) {
+            try { c.close(); } catch (Exception ignored) { }
+        } else if (source instanceof AutoCloseable c) {
+            try { c.close(); } catch (Exception ignored) { }
+        }
+    }
+
+    // ---- combinators ----------------------------------------------------
+
+    /**
+     * Waits for all given sources to complete, returning their results in
+     * order. Multi-arg {@code await(a, b, c)} desugars to this.
+     * <p>
+     * Each source is adapted through {@link Awaitable#from(Object)}. If any
+     * source fails, the {@link CompletionException} raised by the join is
+     * handed to {@code rethrowUnwrapped}, so the caller gets the same
+     * unwrapped failure as from single-value {@code await} instead of a
+     * wrapper.
+     *
+     * @param sources the sources to wait for
+     * @param <T>     the common result type
+     * @return the results, positionally matching {@code sources}
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> List<T> all(Object... sources) {
+        CompletableFuture<?>[] futures = Arrays.stream(sources)
+                .map(s -> Awaitable.from(s).toCompletableFuture())
+                .toArray(CompletableFuture[]::new);
+        try {
+            CompletableFuture.allOf(futures).join();
+            List<T> results = new ArrayList<>(futures.length);
+            for (CompletableFuture<?> f : futures) {
+                results.add((T) f.join());
+            }
+            return results;
+        } catch (CompletionException e) {
+            // Same transparency rule as the await overloads and first().
+            throw rethrowUnwrapped(e);
+        }
+    }
+
+    /**
+     * Returns the result of the first source to complete (success or failure).
+     * <p>
+     * Each source is adapted through {@link Awaitable#from(Object)}. If the
+     * first source to complete failed, the {@link CompletionException}
+     * raised by the join is handed to {@code rethrowUnwrapped}.
+     *
+     * @param sources the competing sources; at least one is required
+     * @param <T>     the result type
+     * @return the value of whichever source completed first
+     * @throws NullPointerException     if {@code sources} is {@code null}
+     * @throws IllegalArgumentException if {@code sources} is empty
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T any(Object... sources) {
+        Objects.requireNonNull(sources, "sources must not be null");
+        if (sources.length == 0) {
+            // anyOf() with no futures never completes, so join() would block forever.
+            throw new IllegalArgumentException("any() requires at least one source");
+        }
+        CompletableFuture<?>[] futures = Arrays.stream(sources)
+                .map(s -> Awaitable.from(s).toCompletableFuture())
+                .toArray(CompletableFuture[]::new);
+        try {
+            return (T) CompletableFuture.anyOf(futures).join();
+        } catch (CompletionException e) {
+            // Same transparency rule as the await overloads and first().
+            throw rethrowUnwrapped(e);
+        }
+    }
+
+    /**
+     * Returns the result of the first source to complete 
<em>successfully</em>.
+     * Only fails when all sources fail.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T first(Object... sources) {
+        CompletableFuture<T>[] futures = Arrays.stream(sources)
+                .map(s -> (CompletableFuture<T>) 
Awaitable.from(s).toCompletableFuture())
+                .toArray(CompletableFuture[]::new);
+
+        CompletableFuture<T> result = new CompletableFuture<>();
+        var remaining = new 
java.util.concurrent.atomic.AtomicInteger(futures.length);
+        List<Throwable> errors = java.util.Collections.synchronizedList(new 
ArrayList<>());
+        for (CompletableFuture<T> f : futures) {
+            f.whenComplete((value, error) -> {
+                if (error == null) {
+                    result.complete(value);
+                } else {
+                    errors.add(error);
+                    if (remaining.decrementAndGet() == 0) {
+                        CompletionException aggregate = new 
CompletionException(
+                                "All " + futures.length + " tasks failed", 
errors.get(0));
+                        for (int i = 1; i < errors.size(); i++) {
+                            aggregate.addSuppressed(errors.get(i));
+                        }
+                        result.completeExceptionally(aggregate);
+                    }
+                }
+            });
+        }
+        try {
+            return result.join();
+        } catch (CompletionException e) {
+            throw rethrowUnwrapped(e);
+        }
+    }
+
+    /**
+     * Waits until every source has settled, successfully or not, and reports
+     * each outcome as an {@link AwaitResult} instead of throwing.
+     * <p>
+     * Each source is adapted through {@link Awaitable#from(Object)}. The
+     * returned list matches {@code sources} positionally.
+     *
+     * @param sources the sources to wait for
+     * @return one {@link AwaitResult} per source, in source order
+     */
+    @SuppressWarnings("unchecked")
+    public static List<AwaitResult<Object>> allSettled(Object... sources) {
+        final int count = sources.length;
+        CompletableFuture<?>[] futures = new CompletableFuture<?>[count];
+        for (int i = 0; i < count; i++) {
+            futures[i] = Awaitable.from(sources[i]).toCompletableFuture();
+        }
+
+        // handle() maps every outcome to a normal completion, so the combined
+        // join below waits for all sources without throwing on a failure.
+        CompletableFuture<?>[] settled = new CompletableFuture<?>[count];
+        for (int i = 0; i < count; i++) {
+            settled[i] = futures[i].handle((v, e) -> null);
+        }
+        CompletableFuture.allOf(settled).join();
+
+        List<AwaitResult<Object>> outcomes = new ArrayList<>(count);
+        for (CompletableFuture<?> future : futures) {
+            AwaitResult<Object> outcome;
+            try {
+                outcome = AwaitResult.success(future.join());
+            } catch (CompletionException wrapped) {
+                outcome = AwaitResult.failure(unwrap(wrapped));
+            } catch (CancellationException cancelled) {
+                outcome = AwaitResult.failure(cancelled);
+            }
+            outcomes.add(outcome);
+        }
+        return outcomes;
+    }
+
+    // ---- async combinator variants (return Awaitable, non-blocking) ------
+
+    /** Non-blocking variant of {@link #all} — returns an Awaitable. */
+    @SuppressWarnings("unchecked")
+    public static Awaitable<List<Object>> allAsync(Object... sources) {
+        CompletableFuture<?>[] futures = Arrays.stream(sources)
+                .map(s -> Awaitable.from(s).toCompletableFuture())
+                .toArray(CompletableFuture[]::new);
+
+        // Track the temporally-first failure explicitly, since
+        // CompletableFuture.allOf() doesn't guarantee which exception
+        // propagates when multiple futures fail.
+        var firstError = new 
java.util.concurrent.atomic.AtomicReference<Throwable>();
+        for (CompletableFuture<?> f : futures) {
+            f.whenComplete((v, e) -> {
+                if (e != null) firstError.compareAndSet(null, e);
+            });
+        }
+
+        CompletableFuture<List<Object>> combined = CompletableFuture.allOf(
+                Arrays.stream(futures).map(f -> f.handle((v, e) -> 
null)).toArray(CompletableFuture[]::new)
+        ).thenApply(v -> {
+            Throwable err = firstError.get();
+            if (err != null) throw err instanceof CompletionException ce ? ce 
: new CompletionException(err);
+            List<Object> results = new ArrayList<>(futures.length);
+            for (CompletableFuture<?> f : futures) results.add(f.join());
+            return results;
+        });

Review Comment:
   `allAsync` currently waits for *all* sources to complete (via 
`allOf(...handle...)`) and only then throws the first recorded error. This 
contradicts the `Awaitable.all(...)` Javadoc which states it fails as soon as 
the first source fails, and can cause `await Awaitable.all(...)` to block 
indefinitely if one task fails quickly while another never completes. Adjust 
the combinator to complete exceptionally immediately on the first failure 
(while leaving other tasks running unless explicitly cancelled).
   ```suggestion
           CompletableFuture<List<Object>> combined = new CompletableFuture<>();
           if (futures.length == 0) {
               combined.complete(Collections.emptyList());
               return GroovyPromise.of(combined);
           }
   
           Object[] results = new Object[futures.length];
           var remaining = new 
java.util.concurrent.atomic.AtomicInteger(futures.length);
           for (int i = 0; i < futures.length; i++) {
               final int index = i;
               futures[i].whenComplete((value, error) -> {
                   if (error != null) {
                       combined.completeExceptionally(
                               error instanceof CompletionException ce ? ce : 
new CompletionException(error));
                       return;
                   }
   
                   results[index] = value;
                   if (remaining.decrementAndGet() == 0) {
                       List<Object> orderedResults = new 
ArrayList<>(futures.length);
                       for (Object result : results) orderedResults.add(result);
                       combined.complete(orderedResults);
                   }
               });
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to