Copilot commented on code in PR #2387:
URL: https://github.com/apache/groovy/pull/2387#discussion_r2927677687
##########
src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java:
##########
@@ -465,14 +466,71 @@ public Statement visitLoopStmtAlt(final LoopStmtAltContext ctx) {
}
@Override
- public ForStatement visitForStmtAlt(final ForStmtAltContext ctx) {
+ public Statement visitForStmtAlt(final ForStmtAltContext ctx) {
+ // 'for await' async iteration
+ if (ctx.AWAIT() != null) {
+ return visitForAwait(ctx);
+ }
+
Function<Statement, ForStatement> maker = this.visitForControl(ctx.forControl());
Statement loopBody = this.unpackStatement((Statement) this.visit(ctx.statement()));
return configureAST(maker.apply(loopBody), ctx);
}
+ /**
+ * Transforms {@code for await (item in source) { ... }} into a while-loop
+ * over an {@link groovy.concurrent.AsyncStream}: the source expression is
+ * adapted via {@code AsyncSupport.toAsyncStream()}, then repeatedly polled
+ * with {@code moveNext()} / {@code getCurrent()}.
+ * <p>
+ * Variable modifiers (e.g. {@code final}) from the enhanced-for declaration
+ * are applied to the synthesised loop variable, consistent with the
+ * standard {@code for (... in ...)} handling in
+ * {@link #visitEnhancedForControl}.
+ */
+ private Statement visitForAwait(final ForStmtAltContext ctx) {
+ ForControlContext forCtrl = ctx.forControl();
+ EnhancedForControlContext enhCtrl = forCtrl.enhancedForControl();
+ if (enhCtrl == null) {
+ throw createParsingFailedException("for await requires enhanced for syntax: for await (item in source)", ctx);
+ }
+
+ ClassNode varType = this.visitType(enhCtrl.type());
Review Comment:
`enhCtrl.type()` is likely optional in enhanced-for syntax (e.g. `for (item
in src)`), so calling `visitType(enhCtrl.type())` can throw if the type node is
absent. Align this with the existing enhanced-for handling by defaulting to an
implicit/dynamic type when `enhCtrl.type()` is null (and still applying
modifiers via `variableModifiersOpt`).
```suggestion
ClassNode varType = (enhCtrl.type() != null ? this.visitType(enhCtrl.type()) : ClassHelper.DYNAMIC_TYPE);
```
##########
src/main/java/groovy/transform/Async.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package groovy.transform;
+
+import org.codehaus.groovy.transform.GroovyASTTransformationClass;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Method annotation to make a method execute asynchronously and return an
+ * {@link groovy.concurrent.Awaitable Awaitable}.
+ * <p>
+ * When applied to a method, the {@code @Async} transformation will:
+ * <ul>
+ * <li>Change the method's return type to {@code Awaitable<T>} (where {@code T} is the original return type)</li>
+ * <li>Execute the method body asynchronously via {@link org.apache.groovy.runtime.async.AsyncSupport#executeAsync AsyncSupport.executeAsync}</li>
+ * <li>Transform any {@code await(expr)} calls within the method to use {@link org.apache.groovy.runtime.async.AsyncSupport#await AsyncSupport.await}</li>
+ * <li>For generator methods containing {@code yield return}, return
+ * {@link groovy.concurrent.AsyncStream AsyncStream<T>} instead</li>
+ * <li>If the method contains {@code defer} statements, wrap the body
+ * in try-finally to execute deferred actions in LIFO order on exit</li>
+ * </ul>
+ * <p>
+ * The {@code await(future)} pattern inside an {@code @Async} method blocks the async thread until the
+ * given future completes and returns the unwrapped result. This provides a sequential programming model
+ * over asynchronous operations, similar to JavaScript's {@code async/await} or C#'s {@code async/await}.
+ * <p>
+ * <em>Example usage:</em>
+ * <pre>
+ * import groovy.transform.Async
+ * import groovy.concurrent.Awaitable
+ *
+ * class DataService {
+ *
+ * {@code @}Async
+ * def fetchUser(long id) {
+ * def profile = await(fetchProfile(id))
+ * def orders = await(fetchOrders(id))
+ * return [profile: profile, orders: orders]
+ * }
+ *
+ * {@code @}Async
+ * Awaitable<Map> fetchProfile(long id) {
+ * return [name: "User$id"]
+ * }
+ *
+ * {@code @}Async
+ * Awaitable<List> fetchOrders(long id) {
Review Comment:
The Javadoc example contradicts the transformation’s validation in
`AsyncASTTransformation`, which rejects methods that already return
`Awaitable`. Update the example to use a non-`Awaitable` return type for
`@Async` methods (e.g., `Map` / `List` / `def`) so the documented usage matches
the actual compiler behavior.
```suggestion
* Map fetchProfile(long id) {
* return [name: "User$id"]
* }
*
* {@code @}Async
* List fetchOrders(long id) {
```
##########
src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.codehaus.groovy.transform;
+
+import groovy.concurrent.AsyncStream;
+import groovy.concurrent.Awaitable;
+import groovy.transform.Async;
+import org.codehaus.groovy.ast.ASTNode;
+import org.codehaus.groovy.ast.AnnotatedNode;
+import org.codehaus.groovy.ast.AnnotationNode;
+import org.codehaus.groovy.ast.ClassCodeExpressionTransformer;
+import org.codehaus.groovy.ast.ClassHelper;
+import org.codehaus.groovy.ast.ClassNode;
+import org.codehaus.groovy.ast.FieldNode;
+import org.codehaus.groovy.ast.GenericsType;
+import org.codehaus.groovy.ast.MethodNode;
+import org.codehaus.groovy.ast.Parameter;
+import org.codehaus.groovy.ast.VariableScope;
+import org.codehaus.groovy.ast.expr.ClosureExpression;
+import org.codehaus.groovy.ast.expr.Expression;
+import org.codehaus.groovy.ast.expr.MethodCallExpression;
+import org.codehaus.groovy.ast.stmt.Statement;
+import org.codehaus.groovy.control.CompilePhase;
+import org.codehaus.groovy.control.SourceUnit;
+
+import static org.codehaus.groovy.ast.tools.GeneralUtils.block;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.returnS;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.varX;
+import static org.codehaus.groovy.ast.tools.GenericsUtils.makeClassSafeWithGenerics;
+
+/**
+ * Handles code generation for the {@link Async @Async} annotation.
+ * <p>
+ * Transforms the annotated method so that:
+ * <ol>
+ * <li>{@code await(expr)} calls within the method body are redirected to
+ * the runtime's {@code await()} via {@link AsyncTransformHelper}</li>
+ * <li>The method body is executed asynchronously — or as an async generator
+ * if it contains {@code yield return} — via factory methods on
+ * {@link AsyncTransformHelper}</li>
+ * <li>Methods containing {@code defer} statements are wrapped in a
+ * try-finally block that executes deferred actions in LIFO order</li>
+ * <li>The return type becomes {@code Awaitable<T>}
+ * (or {@code AsyncStream<T>} for generators)</li>
+ * </ol>
+ * <p>
+ * All AST node construction for async runtime calls is delegated to
+ * {@link AsyncTransformHelper}, which encapsulates the internal method
+ * names and class references. This class focuses on method-level
+ * validation and structural transformation.
+ * <p>
+ * This transformation runs during the {@link CompilePhase#CANONICALIZATION}
+ * phase — before type resolution, which allows the modified return types to
+ * participate in normal type checking.
+ *
+ * @see Async
+ * @see AsyncTransformHelper
+ * @since 6.0.0
+ */
+@GroovyASTTransformation(phase = CompilePhase.CANONICALIZATION)
+public class AsyncASTTransformation extends AbstractASTTransformation {
+
+ private static final Class<?> MY_CLASS = Async.class;
+ private static final ClassNode MY_TYPE = ClassHelper.make(MY_CLASS);
+ private static final String MY_TYPE_NAME = "@" + MY_TYPE.getNameWithoutPackage();
+ private static final ClassNode AWAITABLE_TYPE = ClassHelper.make(Awaitable.class);
+ private static final ClassNode ASYNC_STREAM_TYPE = ClassHelper.make(AsyncStream.class);
+
+ @Override
+ public void visit(ASTNode[] nodes, SourceUnit source) {
+ init(nodes, source);
+ AnnotatedNode parent = (AnnotatedNode) nodes[1];
+ AnnotationNode anno = (AnnotationNode) nodes[0];
+ if (!MY_TYPE.equals(anno.getClassNode())) return;
+
+ if (!(parent instanceof MethodNode mNode)) return;
+
+ // Validate
+ if (mNode.isAbstract()) {
+ addError(MY_TYPE_NAME + " cannot be applied to abstract method '" + mNode.getName() + "'", mNode);
+ return;
+ }
+ if ("<init>".equals(mNode.getName()) || "<clinit>".equals(mNode.getName())) {
+ addError(MY_TYPE_NAME + " cannot be applied to constructors", mNode);
+ return;
+ }
+ ClassNode originalReturnType = mNode.getReturnType();
+ if (AWAITABLE_TYPE.getName().equals(originalReturnType.getName())
+ || ASYNC_STREAM_TYPE.getName().equals(originalReturnType.getName())
+ || "java.util.concurrent.CompletableFuture".equals(originalReturnType.getName())) {
Review Comment:
The error message says “already returns an async type”, but the check only
blocks `Awaitable`, `AsyncStream`, and `CompletableFuture`. Since
`CompletionStage`/`Future` are also async types in this PR’s model (and are
handled by `await`/adapters), allowing `@Async` on those return types can yield
confusing nested async types (e.g., `Awaitable<CompletionStage<T>>`). Consider
expanding this validation to include `CompletionStage` and `Future` (or adjust
the error text/documentation to reflect the narrower restriction).
```suggestion
|| "java.util.concurrent.CompletableFuture".equals(originalReturnType.getName())
|| "java.util.concurrent.CompletionStage".equals(originalReturnType.getName())
|| "java.util.concurrent.Future".equals(originalReturnType.getName())) {
```
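
If the list of rejected return types keeps growing, a set lookup may read more clearly than a chain of `||` comparisons. The following is only a sketch: the class, constant, and method names are hypothetical and do not appear in the PR, and like the existing check it matches exact type names only, not subtypes.

```java
import java.util.Set;

// Hypothetical helper, not part of the PR: collects the return-type names
// that the @Async validation would treat as "already async".
final class AsyncReturnTypeCheckSketch {

    private static final Set<String> ASYNC_TYPE_NAMES = Set.of(
            "groovy.concurrent.Awaitable",
            "groovy.concurrent.AsyncStream",
            "java.util.concurrent.CompletableFuture",
            "java.util.concurrent.CompletionStage",
            "java.util.concurrent.Future");

    private AsyncReturnTypeCheckSketch() { }

    /** Returns true if the fully qualified name is one of the listed async types. */
    static boolean isAsyncTypeName(String returnTypeName) {
        return ASYNC_TYPE_NAMES.contains(returnTypeName);
    }
}
```

In the transformation this would be called with `originalReturnType.getName()`.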
##########
src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy:
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.codehaus.groovy.transform
+
+import groovy.concurrent.AsyncStream
+import groovy.concurrent.Awaitable
+import groovy.concurrent.AwaitableAdapter
+import groovy.concurrent.AwaitableAdapterRegistry
+import io.reactivex.rxjava3.core.Flowable
+import io.reactivex.rxjava3.core.Maybe
+import io.reactivex.rxjava3.core.Observable
+import io.reactivex.rxjava3.core.Single
+import org.apache.groovy.runtime.async.GroovyPromise
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+
+import static groovy.test.GroovyAssert.assertScript
+
+/**
+ * Integration tests for Groovy async/await with third-party reactive
+ * frameworks: Reactor (Spring WebFlux foundation), RxJava 3, and Spring-style
+ * async patterns.
+ * <p>
+ * All test logic is compiled via {@link groovy.test.GroovyAssert#assertScript}
+ * to verify that the async/await syntax works correctly from the developer's
+ * perspective. The {@link AwaitableAdapter} SPI adapters are registered in
+ * {@link #registerAdapters()} so that the globally shared
+ * {@link AwaitableAdapterRegistry} makes Reactor and RxJava types transparent
+ * to each script.
+ */
+class AsyncFrameworkIntegrationTest {
+
+ private ReactorAwaitableAdapter reactorAdapter
+ private RxJavaAwaitableAdapter rxJavaAdapter
+
+ /** Shared import preamble for Reactor-based assertScript tests. */
+ private static final String REACTOR_PREAMBLE = '''\
+import groovy.concurrent.*
+import org.apache.groovy.runtime.async.GroovyPromise
+import reactor.core.publisher.Mono
+import reactor.core.publisher.Flux
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.CompletionStage
+'''
+
+ /** Shared import preamble for RxJava-based assertScript tests. */
+ private static final String RXJAVA_PREAMBLE = '''\
+import groovy.concurrent.*
+import io.reactivex.rxjava3.core.Single
+import io.reactivex.rxjava3.core.Maybe
+import io.reactivex.rxjava3.core.Observable
+import io.reactivex.rxjava3.core.Flowable
+'''
+
+ /** Shared import preamble including both Reactor and RxJava. */
+ private static final String ALL_PREAMBLE = '''\
+import groovy.concurrent.*
+import org.apache.groovy.runtime.async.GroovyPromise
+import reactor.core.publisher.Mono
+import reactor.core.publisher.Flux
+import io.reactivex.rxjava3.core.Single
+import io.reactivex.rxjava3.core.Maybe
+import io.reactivex.rxjava3.core.Observable
+import io.reactivex.rxjava3.core.Flowable
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.CompletionStage
+'''
+
+ /** Spring-style service stubs defined inline for assertScript tests. */
+ private static final String SPRING_STUBS = '''
+class SpringStyleService {
+ CompletableFuture<Map> fetchUser(long id) {
+ CompletableFuture.supplyAsync { [id: id, name: "User${id}"] }
+ }
+ CompletionStage<String> processAsync(String input) {
+ CompletableFuture.supplyAsync { input.toUpperCase() }
+ }
+}
+class SpringWebFluxStyleController {
+ Mono<Map> getUser(long id) {
+ Mono.just([id: id, name: "User${id}"])
+ }
+ Flux<Map> listUsers() {
+ Flux.just([id: 1L, name: "User1"], [id: 2L, name: "User2"], [id: 3L, name: "User3"])
+ }
+}
+'''
+
+ @BeforeEach
+ void registerAdapters() {
+ reactorAdapter = new ReactorAwaitableAdapter()
+ rxJavaAdapter = new RxJavaAwaitableAdapter()
+ AwaitableAdapterRegistry.register(reactorAdapter)
+ AwaitableAdapterRegistry.register(rxJavaAdapter)
+ }
+
+ @AfterEach
+ void resetExecutor() {
+ Awaitable.setExecutor(null)
+ if (rxJavaAdapter != null) AwaitableAdapterRegistry.unregister(rxJavaAdapter)
+ if (reactorAdapter != null) AwaitableAdapterRegistry.unregister(reactorAdapter)
+ }
+
+ // =====================================================================
+ // Reactor (Spring WebFlux foundation) tests
+ // =====================================================================
+
+ @Test
+ void testAwaitReactorMono() {
+ assertScript REACTOR_PREAMBLE + '''
+ def mono = Mono.just("reactor-value")
+ def result = await(mono)
+ assert result == "reactor-value"
+ '''
+ }
+
+ @Test
+ void testAwaitReactorMonoWithMap() {
+ assertScript REACTOR_PREAMBLE + '''
+ def mono = Mono.just(10).map { it * 3 }
+ def result = await(mono)
+ assert result == 30
+ '''
+ }
+
+ @Test
+ void testAwaitReactorMonoEmpty() {
+ assertScript REACTOR_PREAMBLE + '''
+ def mono = Mono.empty()
+ def result = await(mono)
+ assert result == null
+ '''
+ }
+
+ @Test
+ void testAwaitReactorMonoError() {
+ assertScript REACTOR_PREAMBLE + '''
+ def mono = Mono.error(new IllegalStateException("reactor error"))
+ try {
+ await(mono)
+ assert false : "should have thrown"
+ } catch (IllegalStateException e) {
+ assert e.message == "reactor error"
+ }
+ '''
+ }
+
+ @Test
+ void testAwaitReactorMonoDeferred() {
+ assertScript REACTOR_PREAMBLE + '''
+ def mono = Mono.fromSupplier { Thread.sleep(50); "deferred" }
Review Comment:
Tests relying on small `Thread.sleep(...)` durations can be flaky on loaded
CI agents (timing variance) while not materially improving coverage of the
feature. Prefer using deterministic synchronization (e.g., latches) or removing
the sleep if the test intent is only to validate await semantics on a deferred
publisher.
```suggestion
def mono = Mono.fromSupplier { "deferred" }
```
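
If the delay is meant to show that `await` really waits for a value that is not ready yet, a latch can gate the supplier deterministically instead of relying on wall-clock time. A minimal sketch in plain Java, using `CompletableFuture` as a stand-in for the Reactor `Mono` (the class and method names are hypothetical and not part of the PR):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper: produces a value only after the test opens the gate,
// so "not ready yet" is a controlled state rather than a timing guess.
final class DeferredLatchSketch {

    private DeferredLatchSketch() { }

    static CompletableFuture<String> deferred(CountDownLatch gate) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Bounded wait so a broken test fails instead of hanging forever.
                if (!gate.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("gate was never opened");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "deferred";
        });
    }
}
```

The test would assert that the future is still pending, call `gate.countDown()`, and only then await the result.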
##########
src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package groovy.concurrent;
+
+import org.apache.groovy.runtime.async.GroovyPromise;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Central registry for {@link AwaitableAdapter} instances.
+ * <p>
+ * On class-load, adapters are discovered via {@link ServiceLoader} from
+ * {@code META-INF/services/groovy.concurrent.AwaitableAdapter}. A built-in
+ * adapter is always present as the lowest-priority fallback, handling:
+ * <ul>
+ * <li>{@link CompletableFuture} and {@link CompletionStage}</li>
+ * <li>{@link Future} (adapted via a blocking wrapper)</li>
+ * <li>JDK {@link Flow.Publisher} — single-value
+ * ({@link #toAwaitable}) and multi-value ({@link #toAsyncStream})
+ * with backpressure support and upstream cancellation when the
+ * resulting {@link AsyncStream} is {@linkplain AsyncStream#close() closed}</li>
+ * </ul>
+ * <p>
+ * Additional adapters can be registered at runtime via {@link #register}.
+ *
+ * @see AwaitableAdapter
+ * @since 6.0.0
+ */
+public class AwaitableAdapterRegistry {
+
+ private static final List<AwaitableAdapter> ADAPTERS = new CopyOnWriteArrayList<>();
+
+ /**
+ * Optional executor supplier for blocking Future adaptation, to avoid
+ * starving the common pool. Defaults to null; when set, the provided
+ * executor is used instead of {@code CompletableFuture.runAsync}'s default.
+ */
+ private static volatile Executor blockingExecutor;
+
+ static {
+ // SPI-discovered adapters
+ for (AwaitableAdapter adapter : ServiceLoader.load(AwaitableAdapter.class)) {
+ ADAPTERS.add(adapter);
+ }
+ // Built-in fallback (lowest priority)
+ ADAPTERS.add(new BuiltInAdapter());
+ }
+
+ private AwaitableAdapterRegistry() { }
+
+ /**
+ * Registers an adapter with higher priority than existing ones.
+ *
+ * @return an {@link AutoCloseable} that, when closed, removes this adapter
+ * from the registry. Useful for test isolation.
+ */
+ public static AutoCloseable register(AwaitableAdapter adapter) {
+ ADAPTERS.add(0, adapter);
+ return () -> ADAPTERS.remove(adapter);
+ }
+
+ /**
+ * Removes the given adapter from the registry.
+ *
+ * @return {@code true} if the adapter was found and removed
+ */
+ public static boolean unregister(AwaitableAdapter adapter) {
+ return ADAPTERS.remove(adapter);
+ }
+
+ /**
+ * Sets the executor used for blocking {@link Future#get()} adaptation.
+ * When non-null, this executor is used instead of
+ * {@link CompletableFuture#runAsync(Runnable)}'s default executor to avoid
+ * pool starvation when many blocking futures are being adapted
+ * simultaneously.
+ *
+ * @param executor the executor to use, or {@code null} to use the default
+ */
+ public static void setBlockingExecutor(Executor executor) {
+ blockingExecutor = executor;
+ }
+
+ /**
+ * Converts the given source to an {@link Awaitable}.
+ * If the source is already an {@code Awaitable}, it is returned as-is.
+ *
+ * @param source the source object; must not be {@code null}
+ * @throws IllegalArgumentException if {@code source} is {@code null}
+ * or no adapter supports the source type
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> Awaitable<T> toAwaitable(Object source) {
+ if (source == null) {
+ throw new IllegalArgumentException("Cannot convert null to Awaitable");
+ }
+ if (source instanceof Awaitable) return (Awaitable<T>) source;
+ Class<?> type = source.getClass();
+ for (AwaitableAdapter adapter : ADAPTERS) {
+ if (adapter.supportsAwaitable(type)) {
+ return adapter.toAwaitable(source);
+ }
+ }
+ throw new IllegalArgumentException(
+ "No AwaitableAdapter found for type: " + type.getName()
+ + ". Register one via AwaitableAdapterRegistry.register() or ServiceLoader.");
+ }
+
+ /**
+ * Converts the given source to an {@link AsyncStream}.
+ * If the source is already an {@code AsyncStream}, it is returned as-is.
+ *
+ * @param source the source object; must not be {@code null}
+ * @throws IllegalArgumentException if {@code source} is {@code null}
+ * or no adapter supports the source type
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> AsyncStream<T> toAsyncStream(Object source) {
+ if (source == null) {
+ throw new IllegalArgumentException("Cannot convert null to AsyncStream");
+ }
+ if (source instanceof AsyncStream) return (AsyncStream<T>) source;
+ Class<?> type = source.getClass();
+ for (AwaitableAdapter adapter : ADAPTERS) {
+ if (adapter.supportsAsyncStream(type)) {
+ return adapter.toAsyncStream(source);
+ }
+ }
+ throw new IllegalArgumentException(
+ "No AsyncStream adapter found for type: " + type.getName()
+ + ". Register one via AwaitableAdapterRegistry.register() or ServiceLoader.");
+ }
+
+ /**
+ * Built-in adapter handling JDK {@link CompletableFuture}, {@link CompletionStage},
+ * {@link Future}, {@link Flow.Publisher},
+ * and {@link Iterable}/{@link Iterator} (for async stream bridging).
+ * <p>
+ * {@link CompletionStage} support enables seamless integration with frameworks
+ * that return {@code CompletionStage} (e.g., Spring's async APIs, Reactor's
+ * {@code Mono.toFuture()}, etc.) without any additional adapter registration.
+ * <p>
+ * {@link Flow.Publisher} support enables seamless
+ * consumption of reactive streams via {@code for await} without any adapter
+ * registration. This covers any reactive library that implements the JDK
+ * standard reactive-streams interface (Reactor, RxJava via adapters, etc.).
+ */
+ private static class BuiltInAdapter implements AwaitableAdapter {
+
+ @Override
+ public boolean supportsAwaitable(Class<?> type) {
+ return CompletionStage.class.isAssignableFrom(type)
+ || Future.class.isAssignableFrom(type)
+ || Flow.Publisher.class.isAssignableFrom(type);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> Awaitable<T> toAwaitable(Object source) {
+ if (source instanceof CompletionStage) {
+ return new GroovyPromise<>(((CompletionStage<T>) source).toCompletableFuture());
+ }
+ if (source instanceof Flow.Publisher<?> pub) {
+ return publisherToAwaitable(pub);
+ }
+ if (source instanceof Future) {
+ Future<T> future = (Future<T>) source;
+ CompletableFuture<T> cf = new CompletableFuture<>();
+ if (future.isDone()) {
+ completeFrom(cf, future);
+ } else {
+ Executor exec = blockingExecutor;
+ if (exec != null) {
+ CompletableFuture.runAsync(() -> completeFrom(cf, future), exec);
+ } else {
+ CompletableFuture.runAsync(() -> completeFrom(cf, future));
+ }
+ }
+ return new GroovyPromise<>(cf);
+ }
+ throw new IllegalArgumentException("Cannot convert to Awaitable: " + source.getClass());
+ }
+
+ @Override
+ public boolean supportsAsyncStream(Class<?> type) {
+ return Iterable.class.isAssignableFrom(type)
+ || Iterator.class.isAssignableFrom(type)
+ || Flow.Publisher.class.isAssignableFrom(type);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> AsyncStream<T> toAsyncStream(Object source) {
+ if (source instanceof Flow.Publisher<?> pub) {
+ return publisherToAsyncStream((Flow.Publisher<T>) pub);
+ }
+ final Iterator<T> iterator;
+ if (source instanceof Iterable) {
+ iterator = ((Iterable<T>) source).iterator();
+ } else if (source instanceof Iterator) {
+ iterator = (Iterator<T>) source;
+ } else {
+ throw new IllegalArgumentException("Cannot convert to AsyncStream: " + source.getClass());
+ }
+ return new AsyncStream<T>() {
+ private T current;
+
+ @Override
+ public Awaitable<Boolean> moveNext() {
+ boolean hasNext = iterator.hasNext();
+ if (hasNext) current = iterator.next();
+ return Awaitable.of(hasNext);
+ }
+
+ @Override
+ public T getCurrent() {
+ return current;
+ }
+ };
+ }
+
+ // Signal wrappers for the publisher-to-async-stream queue, ensuring
+ // that values, errors, and completion are never confused — even when
+ // the element type T extends Throwable.
+ private static final Object COMPLETE_SENTINEL = new Object();
+ private record ValueSignal<T>(T value) { }
+ private record ErrorSignal(Throwable error) { }
+
+ /**
+ * Adapts a {@link Flow.Publisher} to an {@link AsyncStream} using a
+ * bounded blocking queue to bridge the push-based reactive-streams
+ * protocol to the pull-based {@code moveNext}/{@code getCurrent}
+ * pattern. Backpressure is managed by requesting one item at a time:
+ * each {@code moveNext()} call requests the next item from the upstream
+ * subscription only after the previous item has been consumed.
+ * <p>
+ * All queue entries are wrapped in typed signal objects
+ * ({@code ValueSignal}, {@code ErrorSignal}, or a completion sentinel)
+ * so that element types extending {@link Throwable} are never
+ * misidentified as error signals.
+ * <p>
+ * Thread interruption during {@code queue.take()} is converted to a
+ * {@link CancellationException} that is <em>thrown directly</em> rather
+ * than stored in a {@code CompletableFuture}. On JDK 23+,
+ * {@code CompletableFuture.get()} wraps stored
+ * {@code CancellationException}s in a new instance with message
+ * {@code "get"}, discarding the original message and cause chain.
+ * Throwing directly bypasses {@code CF.get()} entirely, ensuring
+ * deterministic behaviour across all JDK versions (17 – 25+).
+ * The interrupt flag is restored per Java convention, and the original
+ * {@code InterruptedException} is preserved as the
+ * {@linkplain Throwable#getCause() cause}. This matches the pattern
+ * used by {@link org.apache.groovy.runtime.async.AsyncStreamGenerator#moveNext()}.
+ */
+ @SuppressWarnings("unchecked")
+ private static <T> AsyncStream<T> publisherToAsyncStream(Flow.Publisher<T> publisher) {
+ BlockingQueue<Object> queue = new LinkedBlockingQueue<>(256);
+ AtomicReference<Flow.Subscription> subRef = new AtomicReference<>();
+ AtomicBoolean closedRef = new AtomicBoolean(false);
+
+ publisher.subscribe(new Flow.Subscriber<T>() {
+ @Override
+ public void onSubscribe(Flow.Subscription s) {
+ if (!closedRef.get()) {
+ subRef.set(s);
+ s.request(1);
+ } else {
+ s.cancel();
+ }
+ }
+
+ @Override
+ public void onNext(T item) {
+ if (!closedRef.get()) {
+ queue.offer(new ValueSignal<>(item));
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (!closedRef.get()) {
+ queue.offer(new ErrorSignal(t));
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ if (!closedRef.get()) {
+ queue.offer(COMPLETE_SENTINEL);
+ }
+ }
+ });
+
+ return new AsyncStream<T>() {
+ private volatile T current;
+ private final AtomicBoolean streamClosed = new AtomicBoolean(false);
+
+ @Override
+ public Awaitable<Boolean> moveNext() {
+ if (streamClosed.get()) {
+ return Awaitable.of(false);
+ }
+ CompletableFuture<Boolean> cf = new CompletableFuture<>();
+ try {
+ Object signal = queue.take();
+ if (signal == COMPLETE_SENTINEL) {
+ streamClosed.set(true);
+ cf.complete(false);
+ } else if (signal instanceof ErrorSignal es) {
+ streamClosed.set(true);
+ cf.completeExceptionally(es.error());
+ } else if (signal instanceof ValueSignal<?> vs) {
+ current = (T) vs.value();
+ cf.complete(true);
+ Flow.Subscription sub = subRef.get();
+ if (sub != null) sub.request(1);
+ }
+ } catch (InterruptedException e) {
+ if (streamClosed.get()) {
+ return Awaitable.of(false);
+ }
+ // Throw directly instead of storing in the CompletableFuture.
+ // On JDK 23+, CF.get() wraps stored CancellationExceptions in a
+ // new CancellationException("get"), discarding our message and
+ // cause chain. Throwing directly avoids CF.get() entirely and
+ // ensures deterministic behaviour across all JDK versions.
+ // This matches the pattern used by AsyncStreamGenerator.moveNext().
+ Thread.currentThread().interrupt();
+ CancellationException ce = new CancellationException("Interrupted while waiting for next item");
+ ce.initCause(e);
+ throw ce;
+ }
+ return new GroovyPromise<>(cf);
+ }
+
+ @Override
+ public T getCurrent() {
+ return current;
+ }
+
+ @Override
+ public void close() {
+ if (!streamClosed.compareAndSet(false, true)) {
+ return;
+ }
+ closedRef.set(true);
+ Flow.Subscription subscription = subRef.getAndSet(null);
+ if (subscription != null) {
+ subscription.cancel();
+ }
+ queue.clear();
+ queue.offer(COMPLETE_SENTINEL);
+ }
+ };
+ }
+
+ /**
+ * Adapts a single-value {@link Flow.Publisher} to
+ * an {@link Awaitable}. Subscribes and takes the first emitted value.
+ */
+ @SuppressWarnings("unchecked")
+ private static <T> Awaitable<T> publisherToAwaitable(Flow.Publisher<?> publisher) {
+ CompletableFuture<T> cf = new CompletableFuture<>();
+ publisher.subscribe(new Flow.Subscriber<Object>() {
+ private Flow.Subscription subscription;
+
+ @Override
+ public void onSubscribe(Flow.Subscription s) {
+ this.subscription = s;
+ s.request(1);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void onNext(Object item) {
+ cf.complete((T) item);
+ subscription.cancel();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ cf.completeExceptionally(t);
+ }
+
+ @Override
+ public void onComplete() {
+ if (!cf.isDone()) cf.complete(null);
+ }
+ });
+ return new GroovyPromise<>(cf);
+ }
+
+ private static <T> void completeFrom(CompletableFuture<T> cf, Future<T> future) {
+ try {
+ cf.complete(future.get());
+ } catch (Exception e) {
+ cf.completeExceptionally(e.getCause() != null ? e.getCause() : e);
+ }
+ }
Review Comment:
This hides interrupt semantics. If `future.get()` throws
`InterruptedException`, the thread's interrupt flag is not restored, and the
adapted awaitable completes exceptionally with the raw `InterruptedException`.
The rest of the async runtime restores the interrupt flag and commonly reports
interruption as a `CancellationException` carrying the original exception as
its cause. Handle `InterruptedException` explicitly (restore the interrupt flag
and complete exceptionally with a consistent cancellation/interrupt exception),
and handle `ExecutionException` separately so that unwrapping its cause is not
conflated with interruption.
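
One possible shape for this, shown as a standalone sketch rather than a drop-in patch: the wrapper class name and the exception message are assumptions, and the separate `CancellationException` branch (for a source future that was itself cancelled) is an addition beyond what the comment asks for.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical holder class; in the PR the method lives in BuiltInAdapter.
final class CompleteFromSketch {

    private CompleteFromSketch() { }

    /** Copies the outcome of a blocking Future into a CompletableFuture. */
    static <T> void completeFrom(CompletableFuture<T> cf, Future<T> future) {
        try {
            cf.complete(future.get());
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            CancellationException ce =
                    new CancellationException("Interrupted while waiting for future");
            ce.initCause(e);
            cf.completeExceptionally(ce);
        } catch (ExecutionException e) {
            // Unwrap only the ExecutionException layer added by Future.get().
            Throwable cause = e.getCause();
            cf.completeExceptionally(cause != null ? cause : e);
        } catch (CancellationException e) {
            // The source future was cancelled; propagate that unchanged.
            cf.completeExceptionally(e);
        }
    }
}
```

With this split, a failing source still surfaces its original exception, while an interrupted wait leaves the adapted future in a cancelled state and the worker thread's interrupt flag set.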