[
https://issues.apache.org/jira/browse/GROOVY-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18061964#comment-18061964
]
ASF GitHub Bot commented on GROOVY-9381:
----------------------------------------
Copilot commented on code in PR #2386:
URL: https://github.com/apache/groovy/pull/2386#discussion_r2869503062
##########
src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package groovy.concurrent;
+
+import org.apache.groovy.runtime.async.GroovyPromise;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+
+/**
+ * Central registry for {@link AwaitableAdapter} instances.
+ * <p>
+ * On class-load, adapters are discovered via {@link ServiceLoader} from
+ * {@code META-INF/services/groovy.concurrent.AwaitableAdapter}. A built-in
+ * adapter is always present as the lowest-priority fallback, handling:
+ * <ul>
+ * <li>{@link CompletableFuture} and {@link CompletionStage}</li>
+ * <li>{@link Future} (adapted via a blocking wrapper)</li>
+ * <li>JDK {@link java.util.concurrent.Flow.Publisher} — single-value
+ * ({@link #toAwaitable}) and multi-value ({@link #toAsyncStream})
+ * with backpressure support</li>
+ * </ul>
+ * <p>
+ * Additional adapters can be registered at runtime via {@link #register}.
+ *
+ * @see AwaitableAdapter
+ * @since 6.0.0
+ */
+public class AwaitableAdapterRegistry {
+
+ private static final List<AwaitableAdapter> ADAPTERS = new
CopyOnWriteArrayList<>();
+
+ /**
+ * Optional executor supplier for blocking Future adaptation, to avoid
+ * starving the common pool. Defaults to null; when set, the provided
+ * executor is used instead of {@code CompletableFuture.runAsync}'s
default.
+ */
+ private static volatile Executor blockingExecutor;
+
+ static {
+ // SPI-discovered adapters
+ for (AwaitableAdapter adapter :
ServiceLoader.load(AwaitableAdapter.class)) {
+ ADAPTERS.add(adapter);
+ }
+ // Built-in fallback (lowest priority)
+ ADAPTERS.add(new BuiltInAdapter());
+ }
+
+ private AwaitableAdapterRegistry() { }
+
+ /**
+ * Registers an adapter with higher priority than existing ones.
+ *
+ * @return an {@link AutoCloseable} that, when closed, removes this adapter
+ * from the registry. Useful for test isolation.
+ */
+ public static AutoCloseable register(AwaitableAdapter adapter) {
+ ADAPTERS.add(0, adapter);
+ return () -> ADAPTERS.remove(adapter);
+ }
+
+ /**
+ * Removes the given adapter from the registry.
+ *
+ * @return {@code true} if the adapter was found and removed
+ */
+ public static boolean unregister(AwaitableAdapter adapter) {
+ return ADAPTERS.remove(adapter);
+ }
+
+ /**
+ * Sets the executor used for blocking {@link Future#get()} adaptation.
+ * When non-null, this executor is used instead of
+ * {@link CompletableFuture#runAsync(Runnable)}'s default executor to avoid
+ * pool starvation when many blocking futures are being adapted
+ * simultaneously.
+ *
+ * @param executor the executor to use, or {@code null} to use the default
+ */
+ public static void setBlockingExecutor(Executor executor) {
+ blockingExecutor = executor;
+ }
+
+ /**
+ * Converts the given source to an {@link Awaitable}.
+ * If the source is already an {@code Awaitable}, it is returned as-is.
+ *
+ * @throws IllegalArgumentException if no adapter supports the source type
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> Awaitable<T> toAwaitable(Object source) {
+ if (source instanceof Awaitable) return (Awaitable<T>) source;
+ Class<?> type = source.getClass();
+ for (AwaitableAdapter adapter : ADAPTERS) {
+ if (adapter.supportsAwaitable(type)) {
+ return adapter.toAwaitable(source);
+ }
+ }
+ throw new IllegalArgumentException(
+ "No AwaitableAdapter found for type: " + type.getName()
+ + ". Register one via
AwaitableAdapterRegistry.register() or ServiceLoader.");
+ }
+
+ /**
+ * Converts the given source to an {@link AsyncStream}.
+ * If the source is already an {@code AsyncStream}, it is returned as-is.
+ *
+ * @throws IllegalArgumentException if no adapter supports the source type
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> AsyncStream<T> toAsyncStream(Object source) {
+ if (source instanceof AsyncStream) return (AsyncStream<T>) source;
+ Class<?> type = source.getClass();
+ for (AwaitableAdapter adapter : ADAPTERS) {
+ if (adapter.supportsAsyncStream(type)) {
+ return adapter.toAsyncStream(source);
+ }
+ }
+ throw new IllegalArgumentException(
+ "No AsyncStream adapter found for type: " + type.getName()
+ + ". Register one via
AwaitableAdapterRegistry.register() or ServiceLoader.");
+ }
+
+ /**
+ * Built-in adapter handling JDK {@link CompletableFuture}, {@link
CompletionStage},
+ * {@link Future}, {@link java.util.concurrent.Flow.Publisher},
+ * and {@link Iterable}/{@link Iterator} (for async stream bridging).
+ * <p>
+ * {@link CompletionStage} support enables seamless integration with
frameworks
+ * that return {@code CompletionStage} (e.g., Spring's async APIs,
Reactor's
+ * {@code Mono.toFuture()}, etc.) without any additional adapter
registration.
+ * <p>
+ * {@link java.util.concurrent.Flow.Publisher} support enables seamless
+ * consumption of reactive streams via {@code for await} without any
adapter
+ * registration. This covers any reactive library that implements the JDK
+ * standard reactive-streams interface (Reactor, RxJava via adapters,
etc.).
+ */
+ private static class BuiltInAdapter implements AwaitableAdapter {
+
+ @Override
+ public boolean supportsAwaitable(Class<?> type) {
+ return CompletionStage.class.isAssignableFrom(type)
+ || Future.class.isAssignableFrom(type)
+ ||
java.util.concurrent.Flow.Publisher.class.isAssignableFrom(type);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> Awaitable<T> toAwaitable(Object source) {
+ if (source instanceof CompletionStage) {
+ return new GroovyPromise<>(((CompletionStage<T>)
source).toCompletableFuture());
+ }
+ if (source instanceof java.util.concurrent.Flow.Publisher<?> pub) {
+ return publisherToAwaitable(pub);
+ }
+ if (source instanceof Future) {
+ Future<T> future = (Future<T>) source;
+ CompletableFuture<T> cf = new CompletableFuture<>();
+ if (future.isDone()) {
+ completeFrom(cf, future);
+ } else {
+ Executor exec = blockingExecutor;
+ if (exec != null) {
+ CompletableFuture.runAsync(() -> completeFrom(cf,
future), exec);
+ } else {
+ CompletableFuture.runAsync(() -> completeFrom(cf,
future));
Review Comment:
When adapting a non-completed `Future`, this calls
`CompletableFuture.runAsync(...)` without an executor whenever `blockingExecutor`
is null. That call defaults to the ForkJoin common pool, which can be starved
because `Future#get()` is blocking. Consider defaulting `blockingExecutor` to a
dedicated daemon executor (or wiring it to Groovy's async executor) so that
`awaitAll`/`awaitAny` on plain `Future`s does not block the common pool by default.
```suggestion
Thread t = new Thread(() -> completeFrom(cf,
future), "AwaitableAdapterRegistry-blocking");
t.setDaemon(true);
t.start();
```
> Support async/await like ES7
> ----------------------------
>
> Key: GROOVY-9381
> URL: https://issues.apache.org/jira/browse/GROOVY-9381
> Project: Groovy
> Issue Type: New Feature
> Reporter: Daniel Sun
> Priority: Major
>
> Here is an example showing the proposed syntax and backend API (Java's
> {{CompletableFuture}} or GPars's {{Promise}}), but I think it's better
> for Groovy to have its own {{Promise}} to decouple from the Java API, because
> async/await as a language feature should be as stable as possible.
> {{async}} will generate an {{Awaitable}} instance, such as a Groovy {{Promise}}
> implementing the {{Awaitable}} interface, and {{await}} can wait for any
> {{Awaitable}} instance to complete and unwrap its result.
> {code:java}
> /**
> * 1. An async function that simulates a network API call.
> * The 'async' keyword implies it runs asynchronously without blocking.
> */
> async fetchUserData(userId) {
> println "Starting to fetch data for user ${userId}..."
>
> // Simulate a 1-second network delay.
> Thread.sleep(1000)
>
> println "Fetch successful!"
> // The 'async' function implicitly returns a "CompletableFuture" or
> // "Promise" containing this value.
> return [userId: userId, name: 'Daniel']
> }
> /**
> * 2. An async function that uses 'await' to consume the result.
> */
> async processUserData() {
> println "Process started, preparing to fetch user data..."
>
> try {
> // 'await' pauses this function until fetchUserData completes
> // and returns the final result directly.
> def user = await fetchUserData(1)
>
> println "Data received: ${user}"
> return "Processing complete for ${user.name}."
>
> } catch (Exception e) {
> return "An error occurred: ${e.message}"
> }
> }
> // --- Execution ---
> println "Script starting..."
> // Kick off the entire asynchronous process.
> def future = processUserData()
> // This line executes immediately, proving the process is non-blocking.
> println "Script continues to run while user data is being fetched in the background..."
> def result = future.get()
> println "Script finished: ${result}"
> {code}
> Use async/await with closure or lambda expression:
> {code}
> // use closure
> def c = async {
> println "Process started, preparing to fetch user data..."
>
> try {
> // 'await' pauses this function until fetchUserData completes
> // and returns the final result directly.
> def user = await fetchUserData(1)
>
> println "Data received: ${user}"
> return "Processing complete for ${user.name}."
>
> } catch (Exception e) {
> return "An error occurred: ${e.message}"
> }
> }
> def future = c()
> {code}
> {code}
> // use lambda expression
> def c = async () -> {
> println "Process started, preparing to fetch user data..."
>
> try {
> // 'await' pauses this function until fetchUserData completes
> // and returns the final result directly.
> def user = await fetchUserData(1)
>
> println "Data received: ${user}"
> return "Processing complete for ${user.name}."
>
> } catch (Exception e) {
> return "An error occurred: ${e.message}"
> }
> }
> def future = c()
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)