Copilot commented on code in PR #2387: URL: https://github.com/apache/groovy/pull/2387#discussion_r2969497079
########## src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy: ########## @@ -0,0 +1,756 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.codehaus.groovy.transform + +import groovy.concurrent.AsyncStream +import groovy.concurrent.Awaitable +import groovy.concurrent.AwaitableAdapter +import groovy.concurrent.AwaitableAdapterRegistry +import io.reactivex.rxjava3.core.Flowable +import io.reactivex.rxjava3.core.Maybe +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Single +import org.apache.groovy.runtime.async.GroovyPromise +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +import static groovy.test.GroovyAssert.assertScript + +/** + * Integration tests for Groovy async/await with third-party reactive + * frameworks: Reactor (Spring WebFlux foundation), RxJava 3, and Spring-style + * async patterns. + * <p> + * All test logic is compiled via {@link groovy.test.GroovyAssert#assertScript} + * to verify that the async/await syntax works correctly from the developer's + * perspective. 
The {@link AwaitableAdapter} SPI adapters are registered in + * {@link #registerAdapters()} so that the globally shared + * {@link AwaitableAdapterRegistry} makes Reactor and RxJava types transparent + * to each script. + */ +class AsyncFrameworkIntegrationTest { + + private ReactorAwaitableAdapter reactorAdapter + private RxJavaAwaitableAdapter rxJavaAdapter + + /** Shared import preamble for Reactor-based assertScript tests. */ + private static final String REACTOR_PREAMBLE = '''\ +import groovy.concurrent.* +import org.apache.groovy.runtime.async.GroovyPromise +import reactor.core.publisher.Mono +import reactor.core.publisher.Flux +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +''' + + /** Shared import preamble for RxJava-based assertScript tests. */ + private static final String RXJAVA_PREAMBLE = '''\ +import groovy.concurrent.* +import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.core.Maybe +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Flowable +''' + + /** Shared import preamble including both Reactor and RxJava. */ + private static final String ALL_PREAMBLE = '''\ +import groovy.concurrent.* +import org.apache.groovy.runtime.async.GroovyPromise +import reactor.core.publisher.Mono +import reactor.core.publisher.Flux +import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.core.Maybe +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Flowable +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +''' + + /** Spring-style service stubs defined inline for assertScript tests. 
*/ + private static final String SPRING_STUBS = ''' +class SpringStyleService { + CompletableFuture<Map> fetchUser(long id) { + CompletableFuture.supplyAsync { [id: id, name: "User${id}"] } + } + CompletionStage<String> processAsync(String input) { + CompletableFuture.supplyAsync { input.toUpperCase() } + } +} +class SpringWebFluxStyleController { + Mono<Map> getUser(long id) { + Mono.just([id: id, name: "User${id}"]) + } + Flux<Map> listUsers() { + Flux.just([id: 1L, name: "User1"], [id: 2L, name: "User2"], [id: 3L, name: "User3"]) + } +} +''' + + @BeforeEach + void registerAdapters() { + reactorAdapter = new ReactorAwaitableAdapter() + rxJavaAdapter = new RxJavaAwaitableAdapter() + AwaitableAdapterRegistry.register(reactorAdapter) + AwaitableAdapterRegistry.register(rxJavaAdapter) + } + + @AfterEach + void resetExecutor() { + Awaitable.setExecutor(null) + if (rxJavaAdapter != null) AwaitableAdapterRegistry.unregister(rxJavaAdapter) + if (reactorAdapter != null) AwaitableAdapterRegistry.unregister(reactorAdapter) + } + + // ===================================================================== + // Reactor (Spring WebFlux foundation) tests + // ===================================================================== + + @Test + void testAwaitReactorMono() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just("reactor-value") + def result = await(mono) + assert result == "reactor-value" + ''' + } + + @Test + void testAwaitReactorMonoWithMap() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just(10).map { it * 3 } + def result = await(mono) + assert result == 30 + ''' + } + + @Test + void testAwaitReactorMonoEmpty() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.empty() + def result = await(mono) + assert result == null + ''' + } + + @Test + void testAwaitReactorMonoError() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.error(new IllegalStateException("reactor error")) + try { + await(mono) + assert false : "should have thrown" + 
} catch (IllegalStateException e) { + assert e.message == "reactor error" + } + ''' + } + + @Test + void testAwaitReactorMonoDeferred() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.fromSupplier { "deferred" } + def result = await(mono) + assert result == "deferred" + ''' + } + + @Test + void testAwaitReactorMonoChain() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just("hello") + .map { it.toUpperCase() } + .flatMap { s -> Mono.just("${s}!") } + def result = await(mono) + assert result == "HELLO!" + ''' + } + + @Test + void testForAwaitReactorFlux() { + assertScript REACTOR_PREAMBLE + ''' + def flux = Flux.just(1, 2, 3, 4, 5) + def results = [] + for await (item in flux) { + results << item + } + assert results == [1, 2, 3, 4, 5] + ''' + } + + @Test + void testForAwaitReactorFluxWithOperators() { + assertScript REACTOR_PREAMBLE + ''' + def flux = Flux.range(1, 10).filter { it % 2 == 0 }.map { it * 10 } + def results = [] + for await (item in flux) { + results << item + } + assert results == [20, 40, 60, 80, 100] + ''' + } + + @Test + void testForAwaitReactorFluxEmpty() { + assertScript REACTOR_PREAMBLE + ''' + def flux = Flux.empty() + def results = [] + for await (item in flux) { + results << item + } + assert results == [] + ''' + } + + @Test + void testReactorMonoToAwaitableApi() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just(42) + Awaitable<Integer> awaitable = Awaitable.from(mono) + assert awaitable.get() == 42 + assert awaitable.isDone() + ''' + } + + @Test + void testReactorMonoAwaitableThen() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just(5) + Awaitable<Integer> a = Awaitable.from(mono) + Awaitable<Integer> doubled = a.then { it * 2 } + assert doubled.get() == 10 + ''' + } + + // ===================================================================== + // RxJava tests + // ===================================================================== + + @Test + void testAwaitRxJavaSingle() { + assertScript 
RXJAVA_PREAMBLE + ''' + def single = Single.just("rxjava-value") + def result = await(single) + assert result == "rxjava-value" + ''' + } + + @Test + void testAwaitRxJavaSingleWithMap() { + assertScript RXJAVA_PREAMBLE + ''' + def single = Single.just(7).map { it * 6 } + def result = await(single) + assert result == 42 + ''' + } + + @Test + void testAwaitRxJavaSingleError() { + assertScript RXJAVA_PREAMBLE + ''' + def single = Single.error(new RuntimeException("rx error")) + try { + await(single) + assert false : "should have thrown" + } catch (RuntimeException e) { + assert e.message == "rx error" + } + ''' + } + + @Test + void testAwaitRxJavaMaybe() { + assertScript RXJAVA_PREAMBLE + ''' + def maybe = Maybe.just("maybe-value") + def result = await(maybe) + assert result == "maybe-value" + ''' + } + + @Test + void testAwaitRxJavaMaybeEmpty() { + assertScript RXJAVA_PREAMBLE + ''' + def maybe = Maybe.empty() + def result = await(maybe) + assert result == null + ''' + } + + @Test + void testForAwaitRxJavaObservable() { + assertScript RXJAVA_PREAMBLE + ''' + def observable = Observable.just("a", "b", "c") + def results = [] + for await (item in observable) { + results << item + } + assert results == ["a", "b", "c"] + ''' + } + + @Test + void testForAwaitRxJavaObservableWithOperators() { + assertScript RXJAVA_PREAMBLE + ''' + def observable = Observable.range(1, 5).filter { it > 2 }.map { "item-${it}" } + def results = [] + for await (item in observable) { + results << item + } + assert results == ["item-3", "item-4", "item-5"] + ''' + } + + @Test + void testForAwaitRxJavaFlowable() { + assertScript RXJAVA_PREAMBLE + ''' + def flowable = Flowable.just(10, 20, 30) + def results = [] + for await (item in flowable) { + results << item + } + assert results == [10, 20, 30] + ''' + } + + @Test + void testRxJavaSingleToAwaitableApi() { + assertScript RXJAVA_PREAMBLE + ''' + def single = Single.just("adapted") + Awaitable<String> awaitable = Awaitable.from(single) + assert 
awaitable.get() == "adapted" + assert awaitable.isDone() + ''' + } + + // ===================================================================== + // Spring-style async pattern tests + // + // Spring @Async methods return CompletableFuture (Spring 6+) or + // CompletionStage. Spring WebFlux returns Reactor Mono/Flux. + // These tests demonstrate seamless interoperability. + // ===================================================================== + + @Test + void testSpringStyleCompletableFutureService() { + assertScript REACTOR_PREAMBLE + SPRING_STUBS + ''' + def springService = new SpringStyleService() + def result = await(springService.fetchUser(1L)) + assert result == [id: 1L, name: "User1"] + ''' + } + + @Test + void testSpringStyleCompletionStageService() { + assertScript REACTOR_PREAMBLE + SPRING_STUBS + ''' + def springService = new SpringStyleService() + CompletionStage<String> stage = springService.processAsync("hello") + def result = await(stage) + assert result == "HELLO" + ''' + } + + @Test + void testSpringWebFluxMonoEndpoint() { + assertScript REACTOR_PREAMBLE + SPRING_STUBS + ''' + def controller = new SpringWebFluxStyleController() + def mono = controller.getUser(42L) + def result = await(mono) + assert result == [id: 42L, name: "User42"] + ''' + } + + @Test + void testSpringWebFluxFluxEndpoint() { + assertScript REACTOR_PREAMBLE + SPRING_STUBS + ''' + def controller = new SpringWebFluxStyleController() + def flux = controller.listUsers() + def results = [] + for await (item in flux) { + results << item + } + assert results.size() == 3 + assert results[0].name == "User1" + assert results[2].name == "User3" + ''' + } + + @Test + void testSpringStyleAsyncChainWithReactorMono() { + assertScript REACTOR_PREAMBLE + SPRING_STUBS + ''' + def userService = new SpringWebFluxStyleController() + def enriched = userService.getUser(1L).map { user -> user + [role: "admin"] } + def result = await(enriched) + assert result.name == "User1" + assert result.role == 
"admin" + ''' + } + + @Test + void testSpringStyleMultipleAsyncCalls() { + assertScript REACTOR_PREAMBLE + SPRING_STUBS + ''' + def service = new SpringStyleService() + def f1 = service.fetchUser(1L) + def f2 = service.fetchUser(2L) + def f3 = service.fetchUser(3L) + def results = [await(f1), await(f2), await(f3)] + assert results.collect { it.name } == ["User1", "User2", "User3"] + ''' + } + + @Test + void testSpringStyleCompletionStageAdapter() { + assertScript REACTOR_PREAMBLE + ''' + CompletionStage<String> stage = CompletableFuture.supplyAsync { "stage-value" } + Awaitable<String> awaitable = Awaitable.from(stage) + assert awaitable.get() == "stage-value" + ''' + } + + // ===================================================================== + // Cross-framework interoperability + // ===================================================================== + + @Test + void testMixedFrameworkAwait() { + assertScript ALL_PREAMBLE + ''' + def mono = Mono.just(10) + def single = Single.just(20) + def cf = CompletableFuture.supplyAsync { 30 } + def r1 = await(mono) + def r2 = await(single) + def r3 = await(cf) + assert r1 + r2 + r3 == 60 + ''' + } + + @Test + void testReactorToRxJavaInterop() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just("from-reactor") + Awaitable<String> awaitable = Awaitable.from(mono) + CompletableFuture<String> cf = awaitable.toCompletableFuture() + assert cf.get() == "from-reactor" + ''' + } + + @Test + void testAwaitableExceptionallyWithReactor() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.error(new RuntimeException("fail")) + Awaitable<String> awaitable = Awaitable.from(mono) + Awaitable<String> recovered = awaitable.exceptionally { "recovered" } + assert recovered.get() == "recovered" + ''' + } + + @Test + void testAwaitableThenComposeAcrossFrameworks() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just(5) + Awaitable<Integer> a = Awaitable.from(mono) + Awaitable<Integer> composed = a.thenCompose { val 
-> + GroovyPromise.of(CompletableFuture.supplyAsync { val * 10 }) + } + assert composed.get() == 50 + ''' + } + + // ===================================================================== + // Reactive Programming patterns (Jochen's examples from GROOVY-9381) + // ===================================================================== + + @Test + void testForAwaitConsumingReactorFlux() { + assertScript REACTOR_PREAMBLE + ''' + def flux = Flux.generate( + { 0 }, + { state, sink -> + sink.next("3 x ${state} = ${3 * state}") + if (state == 4) sink.complete() + return state + 1 + } + ) + def results = [] + for await (item in flux) { + results << item + } + assert results == [ + '3 x 0 = 0', '3 x 1 = 3', '3 x 2 = 6', + '3 x 3 = 9', '3 x 4 = 12' + ] + ''' + } + + @Test + void testForAwaitConsumingRxJavaFlowable() { + assertScript RXJAVA_PREAMBLE + ''' + def flowable = Flowable.range(1, 5).map { "item-${it}" } + def results = [] + for await (item in flowable) { + results << item + } + assert results == ['item-1', 'item-2', 'item-3', 'item-4', 'item-5'] + ''' + } + + @Test + void testForAwaitConsumingRxJavaObservable() { + assertScript RXJAVA_PREAMBLE + ''' + def observable = Observable.just('alpha', 'beta', 'gamma') + def results = [] + for await (item in observable) { + results << item + } + assert results == ['alpha', 'beta', 'gamma'] + ''' + } + + @Test + void testForAwaitReactorFluxWithAsyncProcessing() { + assertScript REACTOR_PREAMBLE + ''' + def flux = Flux.just(1, 2, 3) + def results = [] + for await (val in flux) { + def doubled = await CompletableFuture.supplyAsync { val * 2 } + results << doubled + } + assert results == [2, 4, 6] + ''' + } + + @Test + void testForAwaitReactorFluxWithDeferCleanup() { + assertScript REACTOR_PREAMBLE + ''' + import java.util.concurrent.SubmissionPublisher + + class RPPatternTest { + static log = [] + + async processStreamWithCleanup() { + defer { log << 'cleanup-done' } + def publisher = new SubmissionPublisher<Integer>() + 
Thread.start { + Thread.sleep(50) + (1..3).each { publisher.submit(it) } + publisher.close() + } + def sum = 0 + for await (item in publisher) { + sum += item + } + log << "sum=$sum" + return sum + } + } + + def result = await new RPPatternTest().processStreamWithCleanup() + assert result == 6 + assert RPPatternTest.log == ['sum=6', 'cleanup-done'] + ''' + } + + // ===================================================================== + // Adapter implementation classes (used by @BeforeEach/@AfterEach) + // ===================================================================== + + /** + * Reactor adapter: supports {@link Mono} as {@link Awaitable} and + * {@link Flux} as {@link AsyncStream}. This is the pattern Spring + * WebFlux users would register. + */ + static class ReactorAwaitableAdapter implements AwaitableAdapter { + + @Override + boolean supportsAwaitable(Class<?> type) { + return Mono.isAssignableFrom(type) + } + + @Override + Awaitable toAwaitable(Object source) { + return new GroovyPromise<>(((Mono) source).toFuture()) + } + + @Override + boolean supportsAsyncStream(Class<?> type) { + return Flux.isAssignableFrom(type) + } + + @Override + AsyncStream toAsyncStream(Object source) { + def iter = ((Flux) source).toIterable().iterator() + return new AsyncStream() { + private Object current + + @Override + Awaitable<Boolean> moveNext() { + boolean hasNext = iter.hasNext() + if (hasNext) current = iter.next() + return Awaitable.of(hasNext) + } + + @Override + Object getCurrent() { return current } + } + } Review Comment: The test Reactor adapter converts `Flux` to a blocking `Iterator` and returns an `AsyncStream` whose `close()` is a no-op (default). This means `for await` early-exit (break/return/error) will not cancel the underlying Flux subscription, which can leak work/resources and may keep producing in the background. 
Consider overriding `close()` to cancel upstream (or use a Reactor primitive that provides explicit cancellation) so the adapter pattern demonstrated by the integration test matches intended runtime semantics. ########## src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy: ########## @@ -0,0 +1,756 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.codehaus.groovy.transform + +import groovy.concurrent.AsyncStream +import groovy.concurrent.Awaitable +import groovy.concurrent.AwaitableAdapter +import groovy.concurrent.AwaitableAdapterRegistry +import io.reactivex.rxjava3.core.Flowable +import io.reactivex.rxjava3.core.Maybe +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Single +import org.apache.groovy.runtime.async.GroovyPromise +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +import static groovy.test.GroovyAssert.assertScript + +/** + * Integration tests for Groovy async/await with third-party reactive + * frameworks: Reactor (Spring WebFlux foundation), RxJava 3, and Spring-style + * async patterns. + * <p> + * All test logic is compiled via {@link groovy.test.GroovyAssert#assertScript} + * to verify that the async/await syntax works correctly from the developer's + * perspective. The {@link AwaitableAdapter} SPI adapters are registered in + * {@link #registerAdapters()} so that the globally shared + * {@link AwaitableAdapterRegistry} makes Reactor and RxJava types transparent + * to each script. + */ +class AsyncFrameworkIntegrationTest { + + private ReactorAwaitableAdapter reactorAdapter + private RxJavaAwaitableAdapter rxJavaAdapter + + /** Shared import preamble for Reactor-based assertScript tests. */ + private static final String REACTOR_PREAMBLE = '''\ +import groovy.concurrent.* +import org.apache.groovy.runtime.async.GroovyPromise +import reactor.core.publisher.Mono +import reactor.core.publisher.Flux +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +''' + + /** Shared import preamble for RxJava-based assertScript tests. 
*/ + private static final String RXJAVA_PREAMBLE = '''\ +import groovy.concurrent.* +import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.core.Maybe +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Flowable +''' + + /** Shared import preamble including both Reactor and RxJava. */ + private static final String ALL_PREAMBLE = '''\ +import groovy.concurrent.* +import org.apache.groovy.runtime.async.GroovyPromise +import reactor.core.publisher.Mono +import reactor.core.publisher.Flux +import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.core.Maybe +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Flowable +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +''' + + /** Spring-style service stubs defined inline for assertScript tests. */ + private static final String SPRING_STUBS = ''' +class SpringStyleService { + CompletableFuture<Map> fetchUser(long id) { + CompletableFuture.supplyAsync { [id: id, name: "User${id}"] } + } + CompletionStage<String> processAsync(String input) { + CompletableFuture.supplyAsync { input.toUpperCase() } + } +} +class SpringWebFluxStyleController { + Mono<Map> getUser(long id) { + Mono.just([id: id, name: "User${id}"]) + } + Flux<Map> listUsers() { + Flux.just([id: 1L, name: "User1"], [id: 2L, name: "User2"], [id: 3L, name: "User3"]) + } +} +''' + + @BeforeEach + void registerAdapters() { + reactorAdapter = new ReactorAwaitableAdapter() + rxJavaAdapter = new RxJavaAwaitableAdapter() + AwaitableAdapterRegistry.register(reactorAdapter) + AwaitableAdapterRegistry.register(rxJavaAdapter) + } + + @AfterEach + void resetExecutor() { + Awaitable.setExecutor(null) + if (rxJavaAdapter != null) AwaitableAdapterRegistry.unregister(rxJavaAdapter) + if (reactorAdapter != null) AwaitableAdapterRegistry.unregister(reactorAdapter) + } + + // ===================================================================== + // Reactor 
(Spring WebFlux foundation) tests + // ===================================================================== + + @Test + void testAwaitReactorMono() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just("reactor-value") + def result = await(mono) + assert result == "reactor-value" + ''' + } + + @Test + void testAwaitReactorMonoWithMap() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just(10).map { it * 3 } + def result = await(mono) + assert result == 30 + ''' + } + + @Test + void testAwaitReactorMonoEmpty() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.empty() + def result = await(mono) + assert result == null + ''' + } + + @Test + void testAwaitReactorMonoError() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.error(new IllegalStateException("reactor error")) + try { + await(mono) + assert false : "should have thrown" + } catch (IllegalStateException e) { + assert e.message == "reactor error" + } + ''' + } + + @Test + void testAwaitReactorMonoDeferred() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.fromSupplier { "deferred" } + def result = await(mono) + assert result == "deferred" + ''' + } + + @Test + void testAwaitReactorMonoChain() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just("hello") + .map { it.toUpperCase() } + .flatMap { s -> Mono.just("${s}!") } + def result = await(mono) + assert result == "HELLO!" 
+ ''' + } + + @Test + void testForAwaitReactorFlux() { + assertScript REACTOR_PREAMBLE + ''' + def flux = Flux.just(1, 2, 3, 4, 5) + def results = [] + for await (item in flux) { + results << item + } + assert results == [1, 2, 3, 4, 5] + ''' + } + + @Test + void testForAwaitReactorFluxWithOperators() { + assertScript REACTOR_PREAMBLE + ''' + def flux = Flux.range(1, 10).filter { it % 2 == 0 }.map { it * 10 } + def results = [] + for await (item in flux) { + results << item + } + assert results == [20, 40, 60, 80, 100] + ''' + } + + @Test + void testForAwaitReactorFluxEmpty() { + assertScript REACTOR_PREAMBLE + ''' + def flux = Flux.empty() + def results = [] + for await (item in flux) { + results << item + } + assert results == [] + ''' + } + + @Test + void testReactorMonoToAwaitableApi() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just(42) + Awaitable<Integer> awaitable = Awaitable.from(mono) + assert awaitable.get() == 42 + assert awaitable.isDone() + ''' + } + + @Test + void testReactorMonoAwaitableThen() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just(5) + Awaitable<Integer> a = Awaitable.from(mono) + Awaitable<Integer> doubled = a.then { it * 2 } + assert doubled.get() == 10 + ''' + } + + // ===================================================================== + // RxJava tests + // ===================================================================== + + @Test + void testAwaitRxJavaSingle() { + assertScript RXJAVA_PREAMBLE + ''' + def single = Single.just("rxjava-value") + def result = await(single) + assert result == "rxjava-value" + ''' + } + + @Test + void testAwaitRxJavaSingleWithMap() { + assertScript RXJAVA_PREAMBLE + ''' + def single = Single.just(7).map { it * 6 } + def result = await(single) + assert result == 42 + ''' + } + + @Test + void testAwaitRxJavaSingleError() { + assertScript RXJAVA_PREAMBLE + ''' + def single = Single.error(new RuntimeException("rx error")) + try { + await(single) + assert false : "should have 
thrown" + } catch (RuntimeException e) { + assert e.message == "rx error" + } + ''' + } + + @Test + void testAwaitRxJavaMaybe() { + assertScript RXJAVA_PREAMBLE + ''' + def maybe = Maybe.just("maybe-value") + def result = await(maybe) + assert result == "maybe-value" + ''' + } + + @Test + void testAwaitRxJavaMaybeEmpty() { + assertScript RXJAVA_PREAMBLE + ''' + def maybe = Maybe.empty() + def result = await(maybe) + assert result == null + ''' + } + + @Test + void testForAwaitRxJavaObservable() { + assertScript RXJAVA_PREAMBLE + ''' + def observable = Observable.just("a", "b", "c") + def results = [] + for await (item in observable) { + results << item + } + assert results == ["a", "b", "c"] + ''' + } + + @Test + void testForAwaitRxJavaObservableWithOperators() { + assertScript RXJAVA_PREAMBLE + ''' + def observable = Observable.range(1, 5).filter { it > 2 }.map { "item-${it}" } + def results = [] + for await (item in observable) { + results << item + } + assert results == ["item-3", "item-4", "item-5"] + ''' + } + + @Test + void testForAwaitRxJavaFlowable() { + assertScript RXJAVA_PREAMBLE + ''' + def flowable = Flowable.just(10, 20, 30) + def results = [] + for await (item in flowable) { + results << item + } + assert results == [10, 20, 30] + ''' + } + + @Test + void testRxJavaSingleToAwaitableApi() { + assertScript RXJAVA_PREAMBLE + ''' + def single = Single.just("adapted") + Awaitable<String> awaitable = Awaitable.from(single) + assert awaitable.get() == "adapted" + assert awaitable.isDone() + ''' + } + + // ===================================================================== + // Spring-style async pattern tests + // + // Spring @Async methods return CompletableFuture (Spring 6+) or + // CompletionStage. Spring WebFlux returns Reactor Mono/Flux. + // These tests demonstrate seamless interoperability. 
+ // ===================================================================== + + @Test + void testSpringStyleCompletableFutureService() { + assertScript REACTOR_PREAMBLE + SPRING_STUBS + ''' + def springService = new SpringStyleService() + def result = await(springService.fetchUser(1L)) + assert result == [id: 1L, name: "User1"] + ''' + } + + @Test + void testSpringStyleCompletionStageService() { + assertScript REACTOR_PREAMBLE + SPRING_STUBS + ''' + def springService = new SpringStyleService() + CompletionStage<String> stage = springService.processAsync("hello") + def result = await(stage) + assert result == "HELLO" + ''' + } + + @Test + void testSpringWebFluxMonoEndpoint() { + assertScript REACTOR_PREAMBLE + SPRING_STUBS + ''' + def controller = new SpringWebFluxStyleController() + def mono = controller.getUser(42L) + def result = await(mono) + assert result == [id: 42L, name: "User42"] + ''' + } + + @Test + void testSpringWebFluxFluxEndpoint() { + assertScript REACTOR_PREAMBLE + SPRING_STUBS + ''' + def controller = new SpringWebFluxStyleController() + def flux = controller.listUsers() + def results = [] + for await (item in flux) { + results << item + } + assert results.size() == 3 + assert results[0].name == "User1" + assert results[2].name == "User3" + ''' + } + + @Test + void testSpringStyleAsyncChainWithReactorMono() { + assertScript REACTOR_PREAMBLE + SPRING_STUBS + ''' + def userService = new SpringWebFluxStyleController() + def enriched = userService.getUser(1L).map { user -> user + [role: "admin"] } + def result = await(enriched) + assert result.name == "User1" + assert result.role == "admin" + ''' + } + + @Test + void testSpringStyleMultipleAsyncCalls() { + assertScript REACTOR_PREAMBLE + SPRING_STUBS + ''' + def service = new SpringStyleService() + def f1 = service.fetchUser(1L) + def f2 = service.fetchUser(2L) + def f3 = service.fetchUser(3L) + def results = [await(f1), await(f2), await(f3)] + assert results.collect { it.name } == ["User1", "User2", 
"User3"] + ''' + } + + @Test + void testSpringStyleCompletionStageAdapter() { + assertScript REACTOR_PREAMBLE + ''' + CompletionStage<String> stage = CompletableFuture.supplyAsync { "stage-value" } + Awaitable<String> awaitable = Awaitable.from(stage) + assert awaitable.get() == "stage-value" + ''' + } + + // ===================================================================== + // Cross-framework interoperability + // ===================================================================== + + @Test + void testMixedFrameworkAwait() { + assertScript ALL_PREAMBLE + ''' + def mono = Mono.just(10) + def single = Single.just(20) + def cf = CompletableFuture.supplyAsync { 30 } + def r1 = await(mono) + def r2 = await(single) + def r3 = await(cf) + assert r1 + r2 + r3 == 60 + ''' + } + + @Test + void testReactorToRxJavaInterop() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just("from-reactor") + Awaitable<String> awaitable = Awaitable.from(mono) + CompletableFuture<String> cf = awaitable.toCompletableFuture() + assert cf.get() == "from-reactor" + ''' + } + + @Test + void testAwaitableExceptionallyWithReactor() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.error(new RuntimeException("fail")) + Awaitable<String> awaitable = Awaitable.from(mono) + Awaitable<String> recovered = awaitable.exceptionally { "recovered" } + assert recovered.get() == "recovered" + ''' + } + + @Test + void testAwaitableThenComposeAcrossFrameworks() { + assertScript REACTOR_PREAMBLE + ''' + def mono = Mono.just(5) + Awaitable<Integer> a = Awaitable.from(mono) + Awaitable<Integer> composed = a.thenCompose { val -> + GroovyPromise.of(CompletableFuture.supplyAsync { val * 10 }) + } + assert composed.get() == 50 + ''' + } + + // ===================================================================== + // Reactive Programming patterns (Jochen's examples from GROOVY-9381) + // ===================================================================== + + @Test + void 
testForAwaitConsumingReactorFlux() { + assertScript REACTOR_PREAMBLE + ''' + def flux = Flux.generate( + { 0 }, + { state, sink -> + sink.next("3 x ${state} = ${3 * state}") + if (state == 4) sink.complete() + return state + 1 + } + ) + def results = [] + for await (item in flux) { + results << item + } + assert results == [ + '3 x 0 = 0', '3 x 1 = 3', '3 x 2 = 6', + '3 x 3 = 9', '3 x 4 = 12' + ] + ''' + } + + @Test + void testForAwaitConsumingRxJavaFlowable() { + assertScript RXJAVA_PREAMBLE + ''' + def flowable = Flowable.range(1, 5).map { "item-${it}" } + def results = [] + for await (item in flowable) { + results << item + } + assert results == ['item-1', 'item-2', 'item-3', 'item-4', 'item-5'] + ''' + } + + @Test + void testForAwaitConsumingRxJavaObservable() { + assertScript RXJAVA_PREAMBLE + ''' + def observable = Observable.just('alpha', 'beta', 'gamma') + def results = [] + for await (item in observable) { + results << item + } + assert results == ['alpha', 'beta', 'gamma'] + ''' + } + + @Test + void testForAwaitReactorFluxWithAsyncProcessing() { + assertScript REACTOR_PREAMBLE + ''' + def flux = Flux.just(1, 2, 3) + def results = [] + for await (val in flux) { + def doubled = await CompletableFuture.supplyAsync { val * 2 } + results << doubled + } + assert results == [2, 4, 6] + ''' + } + + @Test + void testForAwaitReactorFluxWithDeferCleanup() { + assertScript REACTOR_PREAMBLE + ''' + import java.util.concurrent.SubmissionPublisher + + class RPPatternTest { + static log = [] + + async processStreamWithCleanup() { + defer { log << 'cleanup-done' } + def publisher = new SubmissionPublisher<Integer>() + Thread.start { + Thread.sleep(50) + (1..3).each { publisher.submit(it) } + publisher.close() + } + def sum = 0 + for await (item in publisher) { + sum += item + } + log << "sum=$sum" + return sum + } + } + + def result = await new RPPatternTest().processStreamWithCleanup() + assert result == 6 + assert RPPatternTest.log == ['sum=6', 'cleanup-done'] + ''' + } 
+ + // ===================================================================== + // Adapter implementation classes (used by @BeforeEach/@AfterEach) + // ===================================================================== + + /** + * Reactor adapter: supports {@link Mono} as {@link Awaitable} and + * {@link Flux} as {@link AsyncStream}. This is the pattern Spring + * WebFlux users would register. + */ + static class ReactorAwaitableAdapter implements AwaitableAdapter { + + @Override + boolean supportsAwaitable(Class<?> type) { + return Mono.isAssignableFrom(type) + } + + @Override + Awaitable toAwaitable(Object source) { + return new GroovyPromise<>(((Mono) source).toFuture()) + } + + @Override + boolean supportsAsyncStream(Class<?> type) { + return Flux.isAssignableFrom(type) + } + + @Override + AsyncStream toAsyncStream(Object source) { + def iter = ((Flux) source).toIterable().iterator() + return new AsyncStream() { + private Object current + + @Override + Awaitable<Boolean> moveNext() { + boolean hasNext = iter.hasNext() + if (hasNext) current = iter.next() + return Awaitable.of(hasNext) + } + + @Override + Object getCurrent() { return current } + } + } + } + + /** + * RxJava 3 adapter: supports {@link Single}/{@link Maybe} as + * {@link Awaitable} and {@link Observable}/{@link Flowable} as + * {@link AsyncStream}. 
+ */ + static class RxJavaAwaitableAdapter implements AwaitableAdapter { + + @Override + boolean supportsAwaitable(Class<?> type) { + return Single.isAssignableFrom(type) || Maybe.isAssignableFrom(type) + } + + @Override + Awaitable toAwaitable(Object source) { + if (source instanceof Single) { + return new GroovyPromise<>(((Single) source).toCompletionStage().toCompletableFuture()) + } + if (source instanceof Maybe) { + return new GroovyPromise<>(((Maybe) source).toCompletionStage(null).toCompletableFuture()) + } + throw new IllegalArgumentException("Unsupported RxJava type: ${source.class}") + } + + @Override + boolean supportsAsyncStream(Class<?> type) { + return Observable.isAssignableFrom(type) || Flowable.isAssignableFrom(type) + } + + @Override + AsyncStream toAsyncStream(Object source) { + Iterator iter + if (source instanceof Observable) { + iter = ((Observable) source).blockingIterable().iterator() + } else if (source instanceof Flowable) { + iter = ((Flowable) source).blockingIterable().iterator() + } else { + throw new IllegalArgumentException("Unsupported RxJava type: ${source.class}") + } + return new AsyncStream() { + private Object current + + @Override + Awaitable<Boolean> moveNext() { + boolean hasNext = iter.hasNext() + if (hasNext) current = iter.next() + return Awaitable.of(hasNext) + } + + @Override + Object getCurrent() { return current } + } + } Review Comment: Same concern as the Reactor adapter: `blockingIterable()` + default no-op `close()` means early exit from `for await` won’t dispose/cancel the RxJava subscription. Update the test adapter to dispose upstream on `close()` to prevent background production and to reflect a robust adapter implementation pattern. ########## src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java: ########## @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.groovy.runtime.async; + +import groovy.concurrent.AsyncStream; +import groovy.concurrent.Awaitable; +import groovy.concurrent.AwaitableAdapter; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Bridges JDK {@link Flow.Publisher} instances into Groovy's + * {@link Awaitable}/{@link AsyncStream} world. + * + * <h2>Registration</h2> + * This adapter is auto-discovered by the + * {@link groovy.concurrent.AwaitableAdapterRegistry} via the + * {@code META-INF/services/groovy.concurrent.AwaitableAdapter} file. + * It handles any object that implements {@link Flow.Publisher}. 
+ * + * <h2>Adaptation modes</h2> + * <ul> + * <li><b>Single-value</b> ({@code await publisher}): + * subscribes, takes the first {@code onNext} item, cancels the + * upstream subscription, and completes the returned {@link Awaitable}.</li> + * <li><b>Multi-value</b> ({@code for await (item in publisher)}): + * wraps the publisher into an {@link AsyncStream} backed by a + * bounded {@link LinkedBlockingQueue}, providing natural + * back-pressure by requesting one item at a time.</li> + * </ul> + * + * <h2>Thread safety</h2> + * <p>All subscriber callbacks ({@code onSubscribe}, {@code onNext}, + * {@code onError}, {@code onComplete}) are safe for invocation from + * any thread. Subscription references use {@link AtomicReference} + * for safe publication and race-free cancellation. The close path + * uses a CAS on an {@link AtomicBoolean} to guarantee exactly-once + * cleanup semantics.</p> + * + * <h2>Reactive Streams compliance</h2> + * <p>This adapter follows the Reactive Streams specification + * (JDK {@link Flow} variant) rules:</p> + * <ul> + * <li>§2.5 — duplicate {@code onSubscribe} cancels the second subscription</li> + * <li>§2.13 — {@code null} items in {@code onNext} are rejected immediately</li> + * <li>All signals ({@code onNext}, {@code onError}, {@code onComplete}) use + * blocking {@code put()} with a non-blocking {@code offer()} fallback + * when the publisher thread is interrupted — this prevents both silent + * item loss and consumer deadlock even under unexpected interrupts</li> + * <li>Terminal callbacks atomically close the stream via a single + * {@link AtomicBoolean} flag and clear/cancel the stored subscription. 
+ * This makes post-terminal {@code onNext} calls from non-compliant + * publishers harmless and releases resources promptly.</li> + * <li>Back-pressure is enforced by requesting exactly one item after + * each consumed element; demand is signalled <em>before</em> + * {@code moveNext()} returns, preventing livelock when producer and + * consumer share the same thread pool</li> + * </ul> + * + * @see groovy.concurrent.AwaitableAdapterRegistry + * @see AsyncStream + * @since 6.0.0 + */ +public class FlowPublisherAdapter implements AwaitableAdapter { + + /** + * Returns {@code true} if the given type is assignable to + * {@link Flow.Publisher}, enabling single-value {@code await}. + * + * @param type the source type to check + * @return {@code true} if this adapter can handle the type + */ + @Override + public boolean supportsAwaitable(Class<?> type) { + return Flow.Publisher.class.isAssignableFrom(type); + } + + /** + * Returns {@code true} if the given type is assignable to + * {@link Flow.Publisher}, enabling multi-value {@code for await}. + * + * @param type the source type to check + * @return {@code true} if this adapter can produce an async stream + */ + @Override + public boolean supportsAsyncStream(Class<?> type) { + return Flow.Publisher.class.isAssignableFrom(type); + } + + /** + * Converts a {@link Flow.Publisher} to a single-value {@link Awaitable} + * by subscribing and taking the first emitted item. + * + * @param source the publisher instance + * @param <T> the element type + * @return an awaitable that resolves to the first emitted value + */ + @Override + @SuppressWarnings("unchecked") + public <T> Awaitable<T> toAwaitable(Object source) { + return publisherToAwaitable((Flow.Publisher<T>) source); + } + + /** + * Converts a {@link Flow.Publisher} to a multi-value {@link AsyncStream} + * for use with {@code for await} loops. 
+ * + * @param source the publisher instance + * @param <T> the element type + * @return an async stream that yields publisher items + */ + @Override + @SuppressWarnings("unchecked") + public <T> AsyncStream<T> toAsyncStream(Object source) { + return publisherToAsyncStream((Flow.Publisher<T>) source); + } + + // ---- Single-value adaptation (await publisher) ---- + + /** + * Subscribes to the publisher, takes the <em>first</em> emitted item, + * cancels the upstream subscription, and returns a completed + * {@link Awaitable}. + * + * <p>If the publisher completes without emitting any item, + * the returned awaitable resolves to {@code null}.</p> + * + * @param publisher the upstream publisher + * @param <T> the element type + * @return an awaitable that completes with the first emitted value + */ + private <T> Awaitable<T> publisherToAwaitable(Flow.Publisher<T> publisher) { + CompletableFuture<T> cf = new CompletableFuture<>(); + // AtomicReference ensures safe publication of the subscription + // across the onSubscribe thread and callback threads (§1.3). 
+ AtomicReference<Flow.Subscription> subRef = new AtomicReference<>(); + // Guard against non-compliant publishers that send multiple signals + AtomicBoolean done = new AtomicBoolean(false); + + publisher.subscribe(new Flow.Subscriber<T>() { + @Override + public void onSubscribe(Flow.Subscription s) { + // §2.5: reject duplicate subscriptions + if (!subRef.compareAndSet(null, s)) { + s.cancel(); + return; + } + s.request(1); + } + + @Override + public void onNext(T item) { + // §2.13: null items are spec violations + if (item == null) { + onError(new NullPointerException( + "Flow.Publisher onNext received null (Reactive Streams §2.13)")); + return; + } + if (done.compareAndSet(false, true)) { + cf.complete(item); + Flow.Subscription sub = subRef.getAndSet(null); + if (sub != null) sub.cancel(); + } + } + + @Override + public void onError(Throwable t) { + if (done.compareAndSet(false, true)) { + // §2.13 requires non-null, but defend against non-compliant publishers + cf.completeExceptionally(t != null ? t + : new NullPointerException("onError called with null (Reactive Streams §2.13 violation)")); + // Cancel subscription to release resources (idempotent per §3.7) + Flow.Subscription sub = subRef.getAndSet(null); + if (sub != null) sub.cancel(); + } + } + + @Override + public void onComplete() { + // Publisher completed before emitting — resolve to null + if (done.compareAndSet(false, true)) { + cf.complete(null); + // Mirror onNext/onError cleanup for prompt resource release. + Flow.Subscription sub = subRef.getAndSet(null); + if (sub != null) sub.cancel(); + } + } + }); + + return new GroovyPromise<>(cf); + } + + // ---- Multi-value adaptation (for await publisher) ---- + + /** + * Wraps a {@link Flow.Publisher} into an {@link AsyncStream}, + * providing a pull-based iteration interface over a push-based source. + * + * <p>Back-pressure is enforced by requesting exactly one item after + * each consumed element. 
Demand is signalled <em>before</em> the + * consumer's {@code moveNext()} awaitable completes, so the publisher + * can begin producing the next value while the consumer processes the + * current one — this prevents livelock when producer and consumer + * share the same thread pool.</p> + * + * <p><b>Resource management:</b> When the consumer calls + * {@link AsyncStream#close()} (e.g. via {@code break} in a + * {@code for await} loop), the upstream subscription is cancelled + * and a completion sentinel is injected to unblock any pending + * {@code moveNext()} call.</p> + * + * @param publisher the upstream publisher + * @param <T> the element type + * @return an async stream that yields publisher items + */ + private <T> AsyncStream<T> publisherToAsyncStream(Flow.Publisher<T> publisher) { + FlowAsyncStream<T> stream = new FlowAsyncStream<>(); + publisher.subscribe(stream.newSubscriber()); + return stream; + } + + /** + * Named implementation of {@link AsyncStream} that extends + * {@link AbstractAsyncStream} to bridge a push-based + * {@link Flow.Publisher} into a pull-based async stream. + * + * <p>Inherits the common {@link AbstractAsyncStream#moveNext() moveNext()}/ + * {@link AbstractAsyncStream#getCurrent() getCurrent()}/ + * {@link AbstractAsyncStream#close() close()} template and overrides:</p> + * <ul> + * <li>{@link #beforeTake()} — drains remaining signals before returning + * false (checks {@code closed && queue.isEmpty()})</li> + * <li>{@link #afterValueConsumed()} — signals back-pressure demand via + * {@code Subscription.request(1)}</li> + * <li>{@link #onClose()} — cancels the upstream subscription, drains + * the queue, and injects a {@link #COMPLETE} sentinel to unblock + * any pending {@code moveNext()}</li> + * </ul> + * + * <p>The internal bounded queue (capacity {@value QUEUE_CAPACITY}) + * absorbs minor timing jitter. 
Signals use blocking {@code put()} + * for normal delivery with a non-blocking {@code offer()} fallback + * when the publisher thread is interrupted — ensuring no items or + * terminal events are silently dropped.</p> + * + * @param <T> the element type + */ + private static final class FlowAsyncStream<T> extends AbstractAsyncStream<T> { + + /** + * Queue capacity for the push→pull bridge. With one-at-a-time + * demand ({@code request(1)} per consumed element), a well-behaved + * publisher will never enqueue more than one value at a time. + * A capacity of 2 accommodates the value + a racing terminal + * signal without blocking, while keeping memory minimal. + */ + private static final int QUEUE_CAPACITY = 2; + + private final AtomicReference<Flow.Subscription> subRef = new AtomicReference<>(); + + FlowAsyncStream() { + super(new LinkedBlockingQueue<>(QUEUE_CAPACITY)); + } + + /** + * Creates a new {@link Flow.Subscriber} wired to this stream's + * internal queue and lifecycle state. + */ + Flow.Subscriber<T> newSubscriber() { + return new Flow.Subscriber<>() { + @Override + public void onSubscribe(Flow.Subscription s) { + // §2.5: reject duplicate subscriptions + if (!subRef.compareAndSet(null, s)) { + s.cancel(); + return; + } + // Double-check: if close() raced before subscription was set, + // cancel immediately to avoid a dangling upstream. 
+ if (closed.get()) { + cancelSubscription(); + return; + } + s.request(1); + } + + @Override + public void onNext(T item) { + // §2.13: null items are spec violations + if (item == null) { + onError(new NullPointerException( + "Flow.Publisher onNext received null (Reactive Streams §2.13)")); + return; + } + if (closed.get()) return; + try { + queue.put(new ValueSignal(item)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + if (!queue.offer(new ValueSignal(item))) { + cancelSubscription(); + closed.set(true); + // Clear the queue to guarantee room for the terminal signal; + // without this, a full queue could leave the consumer blocked + // in take() with no signal to unblock it. + queue.clear(); + queue.offer(COMPLETE); + } + } + } + + @Override + public void onError(Throwable t) { + // §2.13 requires non-null, but defend against non-compliant publishers + Throwable cause = t != null ? t + : new NullPointerException("onError called with null (Reactive Streams §2.13 violation)"); + putTerminalSignal(new ErrorSignal(cause)); + } + + @Override + public void onComplete() { + putTerminalSignal(COMPLETE); + } + + /** + * Shared logic for {@code onError()} and {@code onComplete()}: + * atomically marks the upstream as closed, cancels the subscription, + * and delivers the terminal signal to the consumer queue. + */ + private void putTerminalSignal(Object signal) { + if (!closed.compareAndSet(false, true)) return; + cancelSubscription(); + try { + queue.put(signal); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + queue.offer(signal); Review Comment: In the interrupt fallback path for terminal signals, `queue.offer(signal)` can fail if the bounded queue is full. If that happens, the terminal signal is dropped, and a consumer can block indefinitely in `moveNext()` waiting for completion/error. 
Fix by ensuring the terminal signal is always delivered (e.g., if `offer` fails, clear/drain the queue and then offer the terminal signal, or inject a completion sentinel as a guaranteed unblock). ```suggestion // Fallback path for terminal signals must guarantee delivery. // If the queue is full, clear it to make room and re-offer. if (!queue.offer(signal)) { queue.clear(); queue.offer(signal); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscribe@groovy.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
