[ 
https://issues.apache.org/jira/browse/GROOVY-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18061964#comment-18061964
 ] 

ASF GitHub Bot commented on GROOVY-9381:
----------------------------------------

Copilot commented on code in PR #2386:
URL: https://github.com/apache/groovy/pull/2386#discussion_r2869503062


##########
src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java:
##########
@@ -0,0 +1,349 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package groovy.concurrent;
+
+import org.apache.groovy.runtime.async.GroovyPromise;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+
+/**
+ * Central registry for {@link AwaitableAdapter} instances.
+ * <p>
+ * On class-load, adapters are discovered via {@link ServiceLoader} from
+ * {@code META-INF/services/groovy.concurrent.AwaitableAdapter}. A built-in
+ * adapter is always present as the lowest-priority fallback, handling:
+ * <ul>
+ *   <li>{@link CompletableFuture} and {@link CompletionStage}</li>
+ *   <li>{@link Future} (adapted via a blocking wrapper)</li>
+ *   <li>JDK {@link java.util.concurrent.Flow.Publisher} — single-value
+ *       ({@link #toAwaitable}) and multi-value ({@link #toAsyncStream})
+ *       with backpressure support</li>
+ * </ul>
+ * <p>
+ * Additional adapters can be registered at runtime via {@link #register}.
+ *
+ * @see AwaitableAdapter
+ * @since 6.0.0
+ */
+public class AwaitableAdapterRegistry {
+
+    private static final List<AwaitableAdapter> ADAPTERS = new 
CopyOnWriteArrayList<>();
+
+    /**
+     * Optional executor supplier for blocking Future adaptation, to avoid
+     * starving the common pool. Defaults to null; when set, the provided
+     * executor is used instead of {@code CompletableFuture.runAsync}'s 
default.
+     */
+    private static volatile Executor blockingExecutor;
+
+    static {
+        // SPI-discovered adapters
+        for (AwaitableAdapter adapter : 
ServiceLoader.load(AwaitableAdapter.class)) {
+            ADAPTERS.add(adapter);
+        }
+        // Built-in fallback (lowest priority)
+        ADAPTERS.add(new BuiltInAdapter());
+    }
+
+    private AwaitableAdapterRegistry() { }
+
+    /**
+     * Registers an adapter with higher priority than existing ones.
+     *
+     * @return an {@link AutoCloseable} that, when closed, removes this adapter
+     *         from the registry. Useful for test isolation.
+     */
+    public static AutoCloseable register(AwaitableAdapter adapter) {
+        ADAPTERS.add(0, adapter);
+        return () -> ADAPTERS.remove(adapter);
+    }
+
+    /**
+     * Removes the given adapter from the registry.
+     *
+     * @return {@code true} if the adapter was found and removed
+     */
+    public static boolean unregister(AwaitableAdapter adapter) {
+        return ADAPTERS.remove(adapter);
+    }
+
+    /**
+     * Sets the executor used for blocking {@link Future#get()} adaptation.
+     * When non-null, this executor is used instead of
+     * {@link CompletableFuture#runAsync(Runnable)}'s default executor to avoid
+     * pool starvation when many blocking futures are being adapted
+     * simultaneously.
+     *
+     * @param executor the executor to use, or {@code null} to use the default
+     */
+    public static void setBlockingExecutor(Executor executor) {
+        blockingExecutor = executor;
+    }
+
+    /**
+     * Converts the given source to an {@link Awaitable}.
+     * If the source is already an {@code Awaitable}, it is returned as-is.
+     *
+     * @throws IllegalArgumentException if no adapter supports the source type
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Awaitable<T> toAwaitable(Object source) {
+        if (source instanceof Awaitable) return (Awaitable<T>) source;
+        Class<?> type = source.getClass();
+        for (AwaitableAdapter adapter : ADAPTERS) {
+            if (adapter.supportsAwaitable(type)) {
+                return adapter.toAwaitable(source);
+            }
+        }
+        throw new IllegalArgumentException(
+                "No AwaitableAdapter found for type: " + type.getName()
+                        + ". Register one via 
AwaitableAdapterRegistry.register() or ServiceLoader.");
+    }
+
+    /**
+     * Converts the given source to an {@link AsyncStream}.
+     * If the source is already an {@code AsyncStream}, it is returned as-is.
+     *
+     * @throws IllegalArgumentException if no adapter supports the source type
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> AsyncStream<T> toAsyncStream(Object source) {
+        if (source instanceof AsyncStream) return (AsyncStream<T>) source;
+        Class<?> type = source.getClass();
+        for (AwaitableAdapter adapter : ADAPTERS) {
+            if (adapter.supportsAsyncStream(type)) {
+                return adapter.toAsyncStream(source);
+            }
+        }
+        throw new IllegalArgumentException(
+                "No AsyncStream adapter found for type: " + type.getName()
+                        + ". Register one via 
AwaitableAdapterRegistry.register() or ServiceLoader.");
+    }
+
+    /**
+     * Built-in adapter handling JDK {@link CompletableFuture}, {@link 
CompletionStage},
+     * {@link Future}, {@link java.util.concurrent.Flow.Publisher},
+     * and {@link Iterable}/{@link Iterator} (for async stream bridging).
+     * <p>
+     * {@link CompletionStage} support enables seamless integration with 
frameworks
+     * that return {@code CompletionStage} (e.g., Spring's async APIs, 
Reactor's
+     * {@code Mono.toFuture()}, etc.) without any additional adapter 
registration.
+     * <p>
+     * {@link java.util.concurrent.Flow.Publisher} support enables seamless
+     * consumption of reactive streams via {@code for await} without any 
adapter
+     * registration.  This covers any reactive library that implements the JDK
+     * standard reactive-streams interface (Reactor, RxJava via adapters, 
etc.).
+     */
+    private static class BuiltInAdapter implements AwaitableAdapter {
+
+        @Override
+        public boolean supportsAwaitable(Class<?> type) {
+            return CompletionStage.class.isAssignableFrom(type)
+                    || Future.class.isAssignableFrom(type)
+                    || 
java.util.concurrent.Flow.Publisher.class.isAssignableFrom(type);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T> Awaitable<T> toAwaitable(Object source) {
+            if (source instanceof CompletionStage) {
+                return new GroovyPromise<>(((CompletionStage<T>) 
source).toCompletableFuture());
+            }
+            if (source instanceof java.util.concurrent.Flow.Publisher<?> pub) {
+                return publisherToAwaitable(pub);
+            }
+            if (source instanceof Future) {
+                Future<T> future = (Future<T>) source;
+                CompletableFuture<T> cf = new CompletableFuture<>();
+                if (future.isDone()) {
+                    completeFrom(cf, future);
+                } else {
+                    Executor exec = blockingExecutor;
+                    if (exec != null) {
+                        CompletableFuture.runAsync(() -> completeFrom(cf, 
future), exec);
+                    } else {
+                        CompletableFuture.runAsync(() -> completeFrom(cf, 
future));

Review Comment:
   When adapting a `Future` that has not yet completed and `blockingExecutor` 
is null, this calls `CompletableFuture.runAsync(...)` without an executor. 
That overload runs the task on the ForkJoin common pool, and because 
`Future#get()` blocks, many pending futures can starve that pool. Consider 
defaulting `blockingExecutor` to a dedicated daemon executor (or wiring it to 
Groovy's async executor) so that `awaitAll`/`awaitAny` on plain `Future`s do 
not block the common pool by default.
   ```suggestion
                           Thread t = new Thread(() -> completeFrom(cf, 
future), "AwaitableAdapterRegistry-blocking");
                           t.setDaemon(true);
                           t.start();
   ```





> Support async/await like ES7
> ----------------------------
>
>                 Key: GROOVY-9381
>                 URL: https://issues.apache.org/jira/browse/GROOVY-9381
>             Project: Groovy
>          Issue Type: New Feature
>            Reporter: Daniel Sun
>            Priority: Major
>
> Here is an example showing the proposed syntax and backend API (Java's 
> {{CompletableFuture}} or GPars's {{Promise}}), but I think it's better 
> for Groovy to have its own {{Promise}} to decouple from the Java API, because 
> async/await as a language feature should be as stable as possible.
> {{async}} will generate an {{Awaitable}} instance, such as a Groovy {{Promise}} 
> implementing the {{Awaitable}} interface, and {{await}} can wait for any 
> {{Awaitable}} instance to complete and unwrap its result. 
> {code:java}
> /**
>  * 1. An async function that simulates a network API call.
>  * The 'async' keyword implies it runs asynchronously without blocking.
>  */
> async fetchUserData(userId) {
>     println "Starting to fetch data for user ${userId}..."
>     
>     // Simulate a 1-second network delay.
>     Thread.sleep(1000) 
>     
>     println "Fetch successful!"
>     // The 'async' function implicitly returns a "CompletableFuture" or
>     // "Promise" containing this value.
>     return [userId: userId, name: 'Daniel']
> }
> /**
>  * 2. An async function that uses 'await' to consume the result.
>  */
> async processUserData() {
>     println "Process started, preparing to fetch user data..."
>     
>     try {
>         // 'await' pauses this function until fetchUserData completes
>         // and returns the final result directly.
>         def user = await fetchUserData(1)
>         
>         println "Data received: ${user}"
>         return "Processing complete for ${user.name}."
>         
>     } catch (Exception e) {
>         return "An error occurred: ${e.message}"
>     }
> }
> // --- Execution ---
> println "Script starting..."
> // Kick off the entire asynchronous process.
> def future = processUserData()
> // This line executes immediately, proving the process is non-blocking.
> println "Script continues to run while user data is being fetched in the background..."
> def result = future.get()
> println "Script finished: ${result}"
> {code}
> Use async/await with a closure or a lambda expression:
> {code}
> // use closure
> def c = async {
>     println "Process started, preparing to fetch user data..."
>     
>     try {
>         // 'await' pauses this function until fetchUserData completes
>         // and returns the final result directly.
>         def user = await fetchUserData(1)
>         
>         println "Data received: ${user}"
>         return "Processing complete for ${user.name}."
>         
>     } catch (Exception e) {
>         return "An error occurred: ${e.message}"
>     }
> }
> def future = c()
> {code}
> {code}
> // use lambda expression
> def c = async () -> {
>     println "Process started, preparing to fetch user data..."
>     
>     try {
>         // 'await' pauses this function until fetchUserData completes
>         // and returns the final result directly.
>         def user = await fetchUserData(1)
>         
>         println "Data received: ${user}"
>         return "Processing complete for ${user.name}."
>         
>     } catch (Exception e) {
>         return "An error occurred: ${e.message}"
>     }
> }
> def future = c()
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to