Copilot commented on code in PR #2386: URL: https://github.com/apache/groovy/pull/2386#discussion_r2868613864
########## src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.groovy.runtime.async; + +import groovy.concurrent.AsyncStream; +import groovy.concurrent.Awaitable; + +import java.util.concurrent.SynchronousQueue; + +/** + * A producer/consumer implementation of {@link AsyncStream} used by + * {@code async} methods that contain {@code yield return} statements. + * <p> + * The producer (method body) runs on a separate thread and calls + * {@link #yield(Object)} for each emitted element. The consumer + * calls {@link #moveNext()}/{@link #getCurrent()} — typically via + * a {@code for await} loop. + * <p> + * Uses a {@link SynchronousQueue} to provide natural back-pressure: + * the producer thread blocks at each {@code yield return} until the + * consumer has consumed the previous element (mirroring C#'s async + * iterator suspension semantics). + * <p> + * This class is an internal implementation detail and should not be referenced + * directly by user code. 
+ * + * @param <T> the element type + * @since 6.0.0 + */ +public class AsyncStreamGenerator<T> implements AsyncStream<T> { + + private static final Object DONE = new Object(); + + private final SynchronousQueue<Object> queue = new SynchronousQueue<>(); + private T current; + + /** + * Produces the next element. Called from the generator body when + * a {@code yield return expr} statement is executed. Blocks until + * the consumer is ready. + */ + public void yield(Object value) { + try { + queue.put(new Item(value)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new java.util.concurrent.CancellationException("Interrupted during yield"); + }

Review Comment: `SynchronousQueue.put(...)` in `yield()` blocks until a consumer calls `moveNext()`. If a consumer abandons iteration early (e.g., breaks out of a `for await` loop), the producer thread can block forever in `yield()`/`complete()`, leaking an executor thread. Consider adding a cancellation/close mechanism and ensuring compiler-generated `for await` loops signal early termination.

########## src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy: ########## @@ -0,0 +1,1138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.codehaus.groovy.transform + +import org.junit.jupiter.api.Test + +import static groovy.test.GroovyAssert.assertScript + +/** + * Tests for virtual thread integration, executor configuration, exception + * handling consistency across async methods/closures/lambdas, and coverage + * improvements for the async/await feature. + * + * @since 6.0.0 + */ +final class AsyncVirtualThreadTest { + + // ---- Virtual thread detection and usage ---- + + @Test + void testVirtualThreadsAvailable() { + assertScript ''' + import static groovy.concurrent.AsyncUtils.* + // JDK 21+ should have virtual threads + def jdkVersion = Runtime.version().feature() + if (jdkVersion >= 21) { + assert isVirtualThreadsAvailable() + } + ''' + } + + @Test + void testAsyncMethodRunsOnVirtualThread() { + assertScript ''' + import groovy.transform.Async + import static groovy.concurrent.AsyncUtils.* + + class VTService { + async checkThread() { + return Thread.currentThread().isVirtual() + } + } + + if (isVirtualThreadsAvailable()) { + def service = new VTService() + def result = service.checkThread().get() + assert result == true + } + ''' + } + + @Test + void testAsyncClosureRunsOnVirtualThread() { + assertScript ''' + import static groovy.concurrent.AsyncUtils.* + + if (isVirtualThreadsAvailable()) { + def asyncVirtual = async { Thread.currentThread().isVirtual() } + def awaitable = asyncVirtual() + assert await(awaitable) == true + } + ''' + } + + @Test + void testAsyncLambdaRunsOnVirtualThread() { + assertScript ''' + import static groovy.concurrent.AsyncUtils.* + + if (isVirtualThreadsAvailable()) { + def asyncFn = async { x -> Thread.currentThread().isVirtual() } + def result = await(asyncFn(42)) + assert result == true + } + ''' + } + + @Test + void testForAwaitRunsOnVirtualThread() { + assertScript ''' + import groovy.transform.Async + import static 
groovy.concurrent.AsyncUtils.* + import groovy.concurrent.AsyncStream + + class VTGenerator { + async generate() { + yield return Thread.currentThread().isVirtual() + } + } + + if (isVirtualThreadsAvailable()) { + def gen = new VTGenerator() + def stream = gen.generate() + def results = [] + for await (item in stream) { + results << item + } + assert results == [true] + } + ''' + } + + @Test + void testHighConcurrencyWithVirtualThreads() { + assertScript ''' + import groovy.transform.Async + import static groovy.concurrent.AsyncUtils.* + + class HighConcurrency { + async compute(int n) { + Thread.sleep(10) + return n * 2 + } + } + + if (isVirtualThreadsAvailable()) { + def svc = new HighConcurrency() + // Launch 1000 concurrent tasks — trivial with virtual threads + def awaitables = (1..1000).collect { svc.compute(it) } + def results = awaitAll(awaitables as Object[]) + assert results.size() == 1000 + assert results[0] == 2 + assert results[999] == 2000 + } + ''' + } + + // ---- Executor configuration ---- + + @Test + void testCustomExecutorOverride() { + assertScript ''' + import static groovy.concurrent.AsyncUtils.* + import java.util.concurrent.Executors + import java.util.concurrent.atomic.AtomicReference + + def savedExecutor = getExecutor() + try { + def customPool = Executors.newFixedThreadPool(2, { r -> + def t = new Thread(r) + t.setName("custom-async-" + t.getId()) + t + })

Review Comment: The `customPool` ExecutorService created here is never shut down and the thread factory creates non-daemon threads. This can leak threads and keep the test JVM alive. Please shut the pool down in the finally block (and consider setting threads daemon as a safety net).

########## src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java: ########## @@ -0,0 +1,687 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.groovy.runtime.async; + +import groovy.concurrent.AsyncStream; +import groovy.concurrent.AwaitResult; +import groovy.concurrent.Awaitable; +import groovy.concurrent.AwaitableAdapterRegistry; +import groovy.lang.Closure; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Internal runtime support for the {@code async}/{@code await} language feature + * and the {@link groovy.transform.Async @Async} annotation. + * <p> + * This class contains the actual implementation invoked by compiler-generated + * code. 
User code should use the public-facing + * {@link groovy.concurrent.AsyncUtils} facade instead. + * <p> + * All overloads of {@code await()} go through the + * {@link AwaitableAdapterRegistry} so that third-party async types (RxJava + * {@code Single}, Reactor {@code Mono}, etc.) are supported transparently + * once an adapter is registered. + * <p> + * <b>Thread pool configuration</b> + * <ul> + * <li>On JDK 21+ the default executor is a virtual-thread-per-task executor + * obtained via {@code Executors.newVirtualThreadPerTaskExecutor()}.</li> + * <li>On earlier JDKs the fallback is a dedicated fixed thread pool whose + * size is controlled by the system property {@code groovy.async.parallelism} + * (default: {@code availableProcessors + 1}). All threads in this pool + * are daemon threads named {@code groovy-async-<id>}.</li> + * <li>The executor can be overridden at any time via {@link #setExecutor}.</li> + * </ul> + * <p> + * <b>Exception handling</b> follows the same transparency principle as C# and + * JavaScript: the <em>original</em> exception is rethrown without being wrapped + * in an {@link java.util.concurrent.ExecutionException ExecutionException} or + * {@link java.util.concurrent.CompletionException CompletionException}. + * + * @see groovy.concurrent.AsyncUtils + * @see groovy.transform.Async + * @see Awaitable + * @since 6.0.0 + */ +public class AsyncSupport { + + /** + * {@code true} if the running JVM supports virtual threads (JDK 21+). + * Detected once at class-load time via {@link MethodHandle} reflection. + */ + private static final boolean VIRTUAL_THREADS_AVAILABLE; + + private static final Executor VIRTUAL_THREAD_EXECUTOR; + + /** + * Fallback thread pool size when virtual threads are unavailable. + * Configurable via the system property {@code groovy.async.parallelism}. + * Defaults to {@code Runtime.getRuntime().availableProcessors() + 1}. 
+ */ + private static final int FALLBACK_PARALLELISM; + + private static final Executor FALLBACK_EXECUTOR; + + static { + Executor vtExecutor = null; + boolean vtAvailable = false; + try { + MethodHandle mh = MethodHandles.lookup().findStatic( + Executors.class, "newVirtualThreadPerTaskExecutor", + MethodType.methodType(ExecutorService.class)); + vtExecutor = (Executor) mh.invoke(); + vtAvailable = true; + } catch (Throwable ignored) { + // JDK < 21 — virtual threads not available + } + VIRTUAL_THREAD_EXECUTOR = vtExecutor; + VIRTUAL_THREADS_AVAILABLE = vtAvailable; + + FALLBACK_PARALLELISM = org.apache.groovy.util.SystemUtil.getIntegerSafe( + "groovy.async.parallelism", + Runtime.getRuntime().availableProcessors() + 1); + if (!VIRTUAL_THREADS_AVAILABLE) { + FALLBACK_EXECUTOR = Executors.newFixedThreadPool(FALLBACK_PARALLELISM, r -> { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("groovy-async-" + t.getId()); + return t; + }); + } else { + FALLBACK_EXECUTOR = null; + } + } + + private static volatile Executor defaultExecutor = + VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR : FALLBACK_EXECUTOR; + + private AsyncSupport() { } + + // ---- await overloads ------------------------------------------------ + + /** + * Awaits the result of an {@link Awaitable}. + * <p> + * Blocks the calling thread until the computation completes and returns + * its result. If the computation failed, the original exception is + * rethrown transparently (even if it is a checked exception). If the + * waiting thread is interrupted, an {@link java.util.concurrent.CancellationException} + * is thrown and the interrupt flag is restored. 
+ * + * @param awaitable the computation to await + * @param <T> the result type + * @return the computed result + */ + public static <T> T await(Awaitable<T> awaitable) { + try { + return awaitable.get(); + } catch (ExecutionException e) { + throw rethrowUnwrapped(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + CancellationException ce = new CancellationException("Interrupted while awaiting"); + ce.initCause(e); + throw ce; + } + } + + /** + * Awaits the result of a {@link CompletableFuture}. + * Uses {@link CompletableFuture#join()} for non-interruptible waiting. + * + * @param future the future to await + * @param <T> the result type + * @return the computed result + */ + public static <T> T await(CompletableFuture<T> future) { + try { + return future.join(); + } catch (CompletionException e) { + throw rethrowUnwrapped(e); + } catch (CancellationException e) { + throw e; + } + } + + /** + * Awaits the result of a {@link CompletionStage} by converting it to a + * {@link CompletableFuture} first. + * + * @param stage the completion stage to await + * @param <T> the result type + * @return the computed result + */ + public static <T> T await(CompletionStage<T> stage) { + return await(stage.toCompletableFuture()); + } + + /** + * Awaits the result of a {@link Future}. If the future is a + * {@link CompletableFuture}, delegates to the more efficient + * {@link #await(CompletableFuture)} overload. 
+ * + * @param future the future to await + * @param <T> the result type + * @return the computed result + */ + public static <T> T await(Future<T> future) { + if (future instanceof CompletableFuture<T> cf) { + return await(cf); + } + try { + return future.get(); + } catch (ExecutionException e) { + throw rethrowUnwrapped(e); + } catch (CancellationException e) { + throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + CancellationException ce = new CancellationException("Interrupted while awaiting future"); + ce.initCause(e); + throw ce; + } + } + + /** + * Awaits an arbitrary object by adapting it to {@link Awaitable} via the + * {@link AwaitableAdapterRegistry}. This is the fallback overload called + * by the {@code await} expression when the compile-time type is not one + * of the other supported types. Returns {@code null} for a {@code null} + * argument. + * + * @param source the object to await + * @param <T> the result type + * @return the computed result, or {@code null} if {@code source} is {@code null} + */ + @SuppressWarnings("unchecked") + public static <T> T await(Object source) { + if (source == null) return null; + if (source instanceof Awaitable) return await((Awaitable<T>) source); + if (source instanceof CompletableFuture) return await((CompletableFuture<T>) source); + if (source instanceof CompletionStage) return await((CompletionStage<T>) source); + if (source instanceof Future) return await((Future<T>) source); + if (source instanceof Closure) return awaitClosure((Closure<?>) source); + return await(AwaitableAdapterRegistry.<T>toAwaitable(source)); + } + + /** + * Handles the case where a {@link Closure} is passed to {@code await}. + * <p> + * An {@code async} closure or lambda must be explicitly called before + * awaiting. For example, {@code await(myClosure())} is correct, while + * {@code await(myClosure)} is not. 
This method always throws an + * {@link IllegalArgumentException} with guidance on the correct usage. + * + * @param closure the closure that was incorrectly passed to await + * @param <T> the result type (never actually returned) + * @return never returns + * @throws IllegalArgumentException always + */ + @SuppressWarnings("unchecked") + private static <T> T awaitClosure(Closure<?> closure) { + throw new IllegalArgumentException( + "Cannot await a Closure directly. " + + "Call the closure first, then await the result: " + + "await(myClosure()) or await(myClosure(args))"); + } + + // ---- async ---------------------------------------------------------- + + /** + * Executes the given closure asynchronously on the specified executor, + * returning an {@link Awaitable} that completes with the closure's return + * value. Used by {@link groovy.transform.Async @Async} methods. + * + * @param closure the closure to execute + * @param executor the executor on which to run the closure + * @param <T> the result type + * @return an awaitable representing the pending computation + */ + @SuppressWarnings("unchecked") + public static <T> Awaitable<T> executeAsync(Closure<T> closure, Executor executor) { + return GroovyPromise.of(CompletableFuture.supplyAsync(() -> { + try { + return closure.call(); + } catch (Throwable t) { + throw wrapForFuture(t); + } + }, executor)); + } + + /** + * Void variant of {@link #executeAsync(Closure, Executor)} for + * {@link groovy.transform.Async @Async} methods whose return type is + * {@code void}. 
+ * + * @param closure the closure to execute + * @param executor the executor on which to run the closure + * @return an awaitable that completes when the closure finishes + */ + public static Awaitable<Void> executeAsyncVoid(Closure<?> closure, Executor executor) { + return GroovyPromise.of(CompletableFuture.runAsync(() -> { + try { + closure.call(); + } catch (Throwable t) { + throw wrapForFuture(t); + } + }, executor)); + } + + /** + * Executes the given closure asynchronously using the default executor, + * returning an {@link Awaitable}. This method is used internally by the + * {@link groovy.transform.Async @Async} annotation transformation and + * the {@code async} method modifier. For the {@code async { ... }} + * closure expression syntax, see {@link #wrapAsync(Closure)}. + * + * @param closure the closure to execute + * @param <T> the result type + * @return an awaitable representing the pending computation + */ + @SuppressWarnings("unchecked") + public static <T> Awaitable<T> async(Closure<T> closure) { + return executeAsync(closure, defaultExecutor); + } + + /** + * Wraps a closure so that each invocation executes the body asynchronously + * and returns an {@link Awaitable}. This is the runtime entry point for + * the {@code async { ... }} expression syntax. The returned closure must + * be explicitly called to start the async computation: + * <pre> + * def task = async { expensiveWork() } + * def result = await(task()) // explicit call required + * </pre> + * + * @param closure the closure to wrap + * @param <T> the result type + * @return a new closure whose calls produce awaitables + */ + @SuppressWarnings("unchecked") + public static <T> Closure<Awaitable<T>> wrapAsync(Closure<T> closure) { + return new Closure<Awaitable<T>>(closure.getOwner(), closure.getThisObject()) { + @SuppressWarnings("unused") + public Awaitable<T> doCall(Object... 
args) { + return GroovyPromise.of(CompletableFuture.supplyAsync(() -> { + try { + return closure.call(args); + } catch (Throwable t) { + throw wrapForFuture(t); + } + }, defaultExecutor)); + } + }; + } + + /** + * Wraps a generator closure (one containing {@code yield return} statements) + * so that each invocation returns an {@link AsyncStream} producing the + * yielded elements. This is the runtime entry point for all async + * generator closures ({@code async { yield return ... }}). The returned + * closure must be explicitly called to start the generation: + * <pre> + * def gen = async { yield return 1; yield return 2 } + * for await (item in gen()) { println item } + * </pre> + * + * @param closure the generator closure to wrap + * @param <T> the element type + * @return a new closure whose calls produce async streams + */ + @SuppressWarnings("unchecked") + public static <T> Closure<AsyncStream<T>> wrapAsyncGenerator(Closure<?> closure) { + return new Closure<AsyncStream<T>>(closure.getOwner(), closure.getThisObject()) {

Review Comment: `wrapAsyncGenerator` wraps the original closure without copying delegation settings (`delegate`/`resolveStrategy`). This can lead to different behavior between `async { ... }` and direct closure execution when delegation is used. Copy delegation-related fields from the original closure to the wrapper closure instance.

```suggestion
        return new Closure<AsyncStream<T>>(closure.getOwner(), closure.getThisObject()) {
            {
                setDelegate(closure.getDelegate());
                setResolveStrategy(closure.getResolveStrategy());
            }
```

########## src/main/java/groovy/transform/Async.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.transform; + +import org.codehaus.groovy.transform.GroovyASTTransformationClass; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Method annotation to make a method execute asynchronously and return an + * {@link groovy.concurrent.Awaitable Awaitable}. + * <p> + * When applied to a method, the {@code @Async} transformation will: + * <ul> + * <li>Change the method's return type to {@code Awaitable<T>} (where {@code T} is the original return type)</li> + * <li>Execute the method body asynchronously via {@link org.apache.groovy.runtime.async.AsyncSupport#executeAsync AsyncSupport.executeAsync}</li> + * <li>Transform any {@code await(future)} calls within the method to use {@link groovy.concurrent.AsyncUtils#await AsyncUtils.await}</li> + * <li>For generator methods containing {@code yield return}, return + * {@link groovy.concurrent.AsyncStream AsyncStream<T>} instead</li> + * </ul> + * <p> + * The {@code await(future)} pattern inside an {@code @Async} method blocks the async thread until the + * given future completes and returns the unwrapped result. This provides a sequential programming model + * over asynchronous operations, similar to JavaScript's {@code async/await} or C#'s {@code async/await}. 
+ * <p> + * <em>Example usage:</em> + * <pre> + * import groovy.transform.Async + * import groovy.concurrent.Awaitable + * + * class DataService { + * + * {@code @}Async + * def fetchUser(long id) { + * def profile = await(fetchProfile(id)) + * def orders = await(fetchOrders(id)) + * return [profile: profile, orders: orders] + * } + * + * {@code @}Async + * Awaitable<Map> fetchProfile(long id) { + * return [name: "User$id"] + * } + * + * {@code @}Async + * Awaitable<List> fetchOrders(long id) { + * return ["order1", "order2"] + * } + * } + * + * def service = new DataService() + * def awaitable = service.fetchUser(1) // returns Awaitable + * def result = awaitable.get() // blocks until complete + * assert result.profile.name == 'User1' + * assert result.orders.size() == 2 + * </pre> + * <p> + * For ad-hoc async execution without annotating a method, use the {@code async} keyword: + * <pre> + * def awaitable = async { + * def data = await fetchFromRemote() + * return process(data) + * }

Review Comment: The example for “use the `async` keyword” shows `async { ... }` producing an Awaitable directly, but the implemented `async { ... }` syntax returns a Closure that must be invoked to start work (e.g., `def task = async { ... }; def awaitable = task()`). Please adjust the example to match the actual semantics.

```suggestion
 * def task = async {
 *     def data = await fetchFromRemote()
 *     return process(data)
 * }
 * def awaitable = task()
```

########## src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java: ########## @@ -0,0 +1,687 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.groovy.runtime.async; + +import groovy.concurrent.AsyncStream; +import groovy.concurrent.AwaitResult; +import groovy.concurrent.Awaitable; +import groovy.concurrent.AwaitableAdapterRegistry; +import groovy.lang.Closure; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Internal runtime support for the {@code async}/{@code await} language feature + * and the {@link groovy.transform.Async @Async} annotation. + * <p> + * This class contains the actual implementation invoked by compiler-generated + * code. User code should use the public-facing + * {@link groovy.concurrent.AsyncUtils} facade instead. + * <p> + * All overloads of {@code await()} go through the + * {@link AwaitableAdapterRegistry} so that third-party async types (RxJava + * {@code Single}, Reactor {@code Mono}, etc.) 
are supported transparently + * once an adapter is registered. + * <p> + * <b>Thread pool configuration</b> + * <ul> + * <li>On JDK 21+ the default executor is a virtual-thread-per-task executor + * obtained via {@code Executors.newVirtualThreadPerTaskExecutor()}.</li> + * <li>On earlier JDKs the fallback is a dedicated fixed thread pool whose + * size is controlled by the system property {@code groovy.async.parallelism} + * (default: {@code availableProcessors + 1}). All threads in this pool + * are daemon threads named {@code groovy-async-<id>}.</li> + * <li>The executor can be overridden at any time via {@link #setExecutor}.</li> + * </ul> + * <p> + * <b>Exception handling</b> follows the same transparency principle as C# and + * JavaScript: the <em>original</em> exception is rethrown without being wrapped + * in an {@link java.util.concurrent.ExecutionException ExecutionException} or + * {@link java.util.concurrent.CompletionException CompletionException}. + * + * @see groovy.concurrent.AsyncUtils + * @see groovy.transform.Async + * @see Awaitable + * @since 6.0.0 + */ +public class AsyncSupport { + + /** + * {@code true} if the running JVM supports virtual threads (JDK 21+). + * Detected once at class-load time via {@link MethodHandle} reflection. + */ + private static final boolean VIRTUAL_THREADS_AVAILABLE; + + private static final Executor VIRTUAL_THREAD_EXECUTOR; + + /** + * Fallback thread pool size when virtual threads are unavailable. + * Configurable via the system property {@code groovy.async.parallelism}. + * Defaults to {@code Runtime.getRuntime().availableProcessors() + 1}. 
+ */ + private static final int FALLBACK_PARALLELISM; + + private static final Executor FALLBACK_EXECUTOR; + + static { + Executor vtExecutor = null; + boolean vtAvailable = false; + try { + MethodHandle mh = MethodHandles.lookup().findStatic( + Executors.class, "newVirtualThreadPerTaskExecutor", + MethodType.methodType(ExecutorService.class)); + vtExecutor = (Executor) mh.invoke(); + vtAvailable = true; + } catch (Throwable ignored) { + // JDK < 21 — virtual threads not available + } + VIRTUAL_THREAD_EXECUTOR = vtExecutor; + VIRTUAL_THREADS_AVAILABLE = vtAvailable; + + FALLBACK_PARALLELISM = org.apache.groovy.util.SystemUtil.getIntegerSafe( + "groovy.async.parallelism", + Runtime.getRuntime().availableProcessors() + 1); + if (!VIRTUAL_THREADS_AVAILABLE) { + FALLBACK_EXECUTOR = Executors.newFixedThreadPool(FALLBACK_PARALLELISM, r -> { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("groovy-async-" + t.getId()); + return t; + }); + } else { + FALLBACK_EXECUTOR = null; + } + } + + private static volatile Executor defaultExecutor = + VIRTUAL_THREADS_AVAILABLE ? VIRTUAL_THREAD_EXECUTOR : FALLBACK_EXECUTOR; + + private AsyncSupport() { } + + // ---- await overloads ------------------------------------------------ + + /** + * Awaits the result of an {@link Awaitable}. + * <p> + * Blocks the calling thread until the computation completes and returns + * its result. If the computation failed, the original exception is + * rethrown transparently (even if it is a checked exception). If the + * waiting thread is interrupted, an {@link java.util.concurrent.CancellationException} + * is thrown and the interrupt flag is restored. 
+ * + * @param awaitable the computation to await + * @param <T> the result type + * @return the computed result + */ + public static <T> T await(Awaitable<T> awaitable) { + try { + return awaitable.get(); + } catch (ExecutionException e) { + throw rethrowUnwrapped(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + CancellationException ce = new CancellationException("Interrupted while awaiting"); + ce.initCause(e); + throw ce; + } + } + + /** + * Awaits the result of a {@link CompletableFuture}. + * Uses {@link CompletableFuture#join()} for non-interruptible waiting. + * + * @param future the future to await + * @param <T> the result type + * @return the computed result + */ + public static <T> T await(CompletableFuture<T> future) { + try { + return future.join(); + } catch (CompletionException e) { + throw rethrowUnwrapped(e); + } catch (CancellationException e) { + throw e; + } + } + + /** + * Awaits the result of a {@link CompletionStage} by converting it to a + * {@link CompletableFuture} first. + * + * @param stage the completion stage to await + * @param <T> the result type + * @return the computed result + */ + public static <T> T await(CompletionStage<T> stage) { + return await(stage.toCompletableFuture()); + } + + /** + * Awaits the result of a {@link Future}. If the future is a + * {@link CompletableFuture}, delegates to the more efficient + * {@link #await(CompletableFuture)} overload. 
+ * + * @param future the future to await + * @param <T> the result type + * @return the computed result + */ + public static <T> T await(Future<T> future) { + if (future instanceof CompletableFuture<T> cf) { + return await(cf); + } + try { + return future.get(); + } catch (ExecutionException e) { + throw rethrowUnwrapped(e); + } catch (CancellationException e) { + throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + CancellationException ce = new CancellationException("Interrupted while awaiting future"); + ce.initCause(e); + throw ce; + } + } + + /** + * Awaits an arbitrary object by adapting it to {@link Awaitable} via the + * {@link AwaitableAdapterRegistry}. This is the fallback overload called + * by the {@code await} expression when the compile-time type is not one + * of the other supported types. Returns {@code null} for a {@code null} + * argument. + * + * @param source the object to await + * @param <T> the result type + * @return the computed result, or {@code null} if {@code source} is {@code null} + */ + @SuppressWarnings("unchecked") + public static <T> T await(Object source) { + if (source == null) return null; + if (source instanceof Awaitable) return await((Awaitable<T>) source); + if (source instanceof CompletableFuture) return await((CompletableFuture<T>) source); + if (source instanceof CompletionStage) return await((CompletionStage<T>) source); + if (source instanceof Future) return await((Future<T>) source); + if (source instanceof Closure) return awaitClosure((Closure<?>) source); + return await(AwaitableAdapterRegistry.<T>toAwaitable(source)); + } + + /** + * Handles the case where a {@link Closure} is passed to {@code await}. + * <p> + * An {@code async} closure or lambda must be explicitly called before + * awaiting. For example, {@code await(myClosure())} is correct, while + * {@code await(myClosure)} is not. 
This method always throws an + * {@link IllegalArgumentException} with guidance on the correct usage. + * + * @param closure the closure that was incorrectly passed to await + * @param <T> the result type (never actually returned) + * @return never returns + * @throws IllegalArgumentException always + */ + @SuppressWarnings("unchecked") + private static <T> T awaitClosure(Closure<?> closure) { + throw new IllegalArgumentException( + "Cannot await a Closure directly. " + + "Call the closure first, then await the result: " + + "await(myClosure()) or await(myClosure(args))"); + } + + // ---- async ---------------------------------------------------------- + + /** + * Executes the given closure asynchronously on the specified executor, + * returning an {@link Awaitable} that completes with the closure's return + * value. Used by {@link groovy.transform.Async @Async} methods. + * + * @param closure the closure to execute + * @param executor the executor on which to run the closure + * @param <T> the result type + * @return an awaitable representing the pending computation + */ + @SuppressWarnings("unchecked") + public static <T> Awaitable<T> executeAsync(Closure<T> closure, Executor executor) { + return GroovyPromise.of(CompletableFuture.supplyAsync(() -> { + try { + return closure.call(); + } catch (Throwable t) { + throw wrapForFuture(t); + } + }, executor)); + } + + /** + * Void variant of {@link #executeAsync(Closure, Executor)} for + * {@link groovy.transform.Async @Async} methods whose return type is + * {@code void}. 
+ * + * @param closure the closure to execute + * @param executor the executor on which to run the closure + * @return an awaitable that completes when the closure finishes + */ + public static Awaitable<Void> executeAsyncVoid(Closure<?> closure, Executor executor) { + return GroovyPromise.of(CompletableFuture.runAsync(() -> { + try { + closure.call(); + } catch (Throwable t) { + throw wrapForFuture(t); + } + }, executor)); + } + + /** + * Executes the given closure asynchronously using the default executor, + * returning an {@link Awaitable}. This method is used internally by the + * {@link groovy.transform.Async @Async} annotation transformation and + * the {@code async} method modifier. For the {@code async { ... }} + * closure expression syntax, see {@link #wrapAsync(Closure)}. + * + * @param closure the closure to execute + * @param <T> the result type + * @return an awaitable representing the pending computation + */ + @SuppressWarnings("unchecked") + public static <T> Awaitable<T> async(Closure<T> closure) { + return executeAsync(closure, defaultExecutor); + } + + /** + * Wraps a closure so that each invocation executes the body asynchronously + * and returns an {@link Awaitable}. This is the runtime entry point for + * the {@code async { ... }} expression syntax. The returned closure must + * be explicitly called to start the async computation: + * <pre> + * def task = async { expensiveWork() } + * def result = await(task()) // explicit call required + * </pre> + * + * @param closure the closure to wrap + * @param <T> the result type + * @return a new closure whose calls produce awaitables + */ + @SuppressWarnings("unchecked") + public static <T> Closure<Awaitable<T>> wrapAsync(Closure<T> closure) { + return new Closure<Awaitable<T>>(closure.getOwner(), closure.getThisObject()) { + @SuppressWarnings("unused") + public Awaitable<T> doCall(Object... 
args) {
+                return GroovyPromise.of(CompletableFuture.supplyAsync(() -> {

Review Comment:
`wrapAsync` creates a new Closure wrapper but does not preserve the original closure's `delegate`/`resolveStrategy` (and potentially `directive`). This can change property/method resolution semantics for code that customizes delegation (common in DSLs). Consider copying delegation-related settings from the original closure onto the wrapper closure instance.

##########
src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy:
##########
@@ -0,0 +1,1138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.codehaus.groovy.transform
+
+import org.junit.jupiter.api.Test
+
+import static groovy.test.GroovyAssert.assertScript
+
+/**
+ * Tests for virtual thread integration, executor configuration, exception
+ * handling consistency across async methods/closures/lambdas, and coverage
+ * improvements for the async/await feature.
+ * + * @since 6.0.0 + */ +final class AsyncVirtualThreadTest { + + // ---- Virtual thread detection and usage ---- + + @Test + void testVirtualThreadsAvailable() { + assertScript ''' + import static groovy.concurrent.AsyncUtils.* + // JDK 21+ should have virtual threads + def jdkVersion = Runtime.version().feature() + if (jdkVersion >= 21) { + assert isVirtualThreadsAvailable() + } + ''' + } + + @Test + void testAsyncMethodRunsOnVirtualThread() { + assertScript ''' + import groovy.transform.Async + import static groovy.concurrent.AsyncUtils.* + + class VTService { + async checkThread() { + return Thread.currentThread().isVirtual() + } + } + + if (isVirtualThreadsAvailable()) { + def service = new VTService() + def result = service.checkThread().get() + assert result == true + } + ''' + } + + @Test + void testAsyncClosureRunsOnVirtualThread() { + assertScript ''' + import static groovy.concurrent.AsyncUtils.* + + if (isVirtualThreadsAvailable()) { + def asyncVirtual = async { Thread.currentThread().isVirtual() } + def awaitable = asyncVirtual() + assert await(awaitable) == true + } + ''' + } + + @Test + void testAsyncLambdaRunsOnVirtualThread() { + assertScript ''' + import static groovy.concurrent.AsyncUtils.* + + if (isVirtualThreadsAvailable()) { + def asyncFn = async { x -> Thread.currentThread().isVirtual() } + def result = await(asyncFn(42)) + assert result == true + } + ''' + } + + @Test + void testForAwaitRunsOnVirtualThread() { + assertScript ''' + import groovy.transform.Async + import static groovy.concurrent.AsyncUtils.* + import groovy.concurrent.AsyncStream + + class VTGenerator { + async generate() { + yield return Thread.currentThread().isVirtual() + } + } + + if (isVirtualThreadsAvailable()) { + def gen = new VTGenerator() + def stream = gen.generate() + def results = [] + for await (item in stream) { + results << item + } + assert results == [true] + } + ''' + } + + @Test + void testHighConcurrencyWithVirtualThreads() { + assertScript ''' + import 
groovy.transform.Async + import static groovy.concurrent.AsyncUtils.* + + class HighConcurrency { + async compute(int n) { + Thread.sleep(10) + return n * 2 + } + } + + if (isVirtualThreadsAvailable()) { + def svc = new HighConcurrency() + // Launch 1000 concurrent tasks — trivial with virtual threads + def awaitables = (1..1000).collect { svc.compute(it) } + def results = awaitAll(awaitables as Object[]) + assert results.size() == 1000 + assert results[0] == 2 + assert results[999] == 2000 + } + ''' + } + + // ---- Executor configuration ---- + + @Test + void testCustomExecutorOverride() { + assertScript ''' + import static groovy.concurrent.AsyncUtils.* + import java.util.concurrent.Executors + import java.util.concurrent.atomic.AtomicReference + + def savedExecutor = getExecutor() + try { + def customPool = Executors.newFixedThreadPool(2, { r -> + def t = new Thread(r) + t.setName("custom-async-" + t.getId()) + t + }) + setExecutor(customPool) + + def asyncName = async { + Thread.currentThread().getName() + } + def awaitable = asyncName() + def threadName = await(awaitable) + assert threadName.startsWith("custom-async-") + } finally { + setExecutor(savedExecutor) + } + ''' + } + + @Test + void testSetExecutorNullResetsToDefault() { + assertScript ''' + import static groovy.concurrent.AsyncUtils.* + import java.util.concurrent.Executors + + def originalExecutor = getExecutor() + // Set a custom executor + setExecutor(Executors.newSingleThreadExecutor()) + assert getExecutor() != originalExecutor + // Reset to null — should restore default + setExecutor(null) + def restored = getExecutor() + assert restored != null + // Verify it works + def awaitable = groovy.concurrent.AsyncUtils.async { 42 } + assert groovy.concurrent.AsyncUtils.await(awaitable) == 42 + // Restore original + setExecutor(originalExecutor) + ''' + } + + @Test + void testAsyncMethodWithCustomExecutorField() { + assertScript ''' + import groovy.transform.Async + import static 
groovy.concurrent.AsyncUtils.*
+            import java.util.concurrent.Executor
+            import java.util.concurrent.Executors
+
+            class CustomExecutorService {
+                static Executor myPool = Executors.newFixedThreadPool(1, { r ->
+                    def t = new Thread(r)
+                    t.setName("my-pool-thread")

Review Comment:
`myPool` is a fixed thread pool whose threads are non-daemon (default) and it is never shut down. Because it is stored in a static field, it can outlive the script and keep the test JVM running. Consider using a daemon thread factory and/or an ExecutorService that is shut down at the end of the script/test.

```suggestion
                    t.setName("my-pool-thread")
                    t.setDaemon(true)
```

##########
src/main/java/groovy/transform/Async.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package groovy.transform;
+
+import org.codehaus.groovy.transform.GroovyASTTransformationClass;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Method annotation to make a method execute asynchronously and return an
+ * {@link groovy.concurrent.Awaitable Awaitable}.
+ * <p>
+ * When applied to a method, the {@code @Async} transformation will:
+ * <ul>
+ * <li>Change the method's return type to {@code Awaitable<T>} (where {@code T} is the original return type)</li>
+ * <li>Execute the method body asynchronously via {@link org.apache.groovy.runtime.async.AsyncSupport#executeAsync AsyncSupport.executeAsync}</li>
+ * <li>Transform any {@code await(future)} calls within the method to use {@link groovy.concurrent.AsyncUtils#await AsyncUtils.await}</li>

Review Comment:
Javadoc mismatch: the implementation rewrites await calls to the internal `org.apache.groovy.runtime.async.AsyncSupport.await(...)` (see AsyncASTTransformation), not `groovy.concurrent.AsyncUtils.await(...)`. Please update this bullet to reflect the actual generated call target, or rephrase it in terms of the public API if that's the intent.

```suggestion
 * <li>Transform any {@code await(future)} calls within the method to use {@link org.apache.groovy.runtime.async.AsyncSupport#await AsyncSupport.await}</li>
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
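For reference, the daemon-thread and shutdown advice in the review comment on `myPool` could look roughly like the following plain-JDK sketch. It is not code from the PR: the class name `DaemonPoolSketch` and the helpers `daemonFactory` and `describeWorker` are hypothetical, chosen only for illustration. It assumes the pool can be held as an `ExecutorService`, so that it can be shut down when the work is finished.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class DaemonPoolSketch {

    // Hypothetical factory: names the worker and marks it as a daemon, so a
    // pool that is never shut down cannot keep the JVM alive on its own.
    static ThreadFactory daemonFactory(String name) {
        return r -> {
            Thread t = new Thread(r);
            t.setName(name);
            t.setDaemon(true);
            return t;
        };
    }

    // Runs one task on a single-thread pool and reports
    // "<thread name>:<daemon flag>" for the worker that executed it.
    static String describeWorker() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(1, daemonFactory("my-pool-thread"));
        try {
            return pool.submit(() -> {
                Thread current = Thread.currentThread();
                return current.getName() + ":" + current.isDaemon();
            }).get();
        } finally {
            // Shut the pool down explicitly rather than relying on the daemon flag alone.
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describeWorker()); // prints "my-pool-thread:true"
    }
}
```

The daemon flag and the `finally` shutdown are complementary: the flag guards against a forgotten shutdown, while the explicit `shutdown()` releases the worker as soon as the test no longer needs it.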
