This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 6170ebe  Add untilStarted & untilStopped to ReactiveMessagePipeline 
(#214)
6170ebe is described below

commit 6170ebe7f3b6d7ee75071d9b6fb3d6a750a622e7
Author: Lari Hotari <[email protected]>
AuthorDate: Tue May 20 23:57:15 2025 +0300

    Add untilStarted & untilStopped to ReactiveMessagePipeline (#214)
---
 gradle/libs.versions.toml                          |  1 +
 pulsar-client-reactive-adapter/build.gradle        |  1 +
 .../adapter/ReactiveMessagePipelineE2ETests.java   | 35 ++++++++++++
 .../client/adapter/SingletonPulsarContainer.java   |  7 +++
 .../internal/adapter/ReactiveConsumerAdapter.java  | 15 ++++--
 .../client/api/ReactiveMessagePipeline.java        | 59 ++++++++++++++++++--
 .../api/DefaultReactiveMessagePipeline.java        | 63 +++++++++++++++++++++-
 .../api/InternalConsumerListener.java}             | 37 ++++++-------
 .../client/api/ReactiveMessagePipelineTests.java   | 43 ++++++++++++++-
 9 files changed, 228 insertions(+), 33 deletions(-)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 05282e7..2fd4c57 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -52,6 +52,7 @@ log4j-slf4j2-impl = { module = 
"org.apache.logging.log4j:log4j-slf4j2-impl", ver
 mockito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" }
 pulsar-client-api = { module = "org.apache.pulsar:pulsar-client-api", 
version.ref = "pulsar" }
 pulsar-client-shaded = { module = "org.apache.pulsar:pulsar-client", 
version.ref = "pulsar" }
+pulsar-client-all = { module = "org.apache.pulsar:pulsar-client-all", 
version.ref = "pulsar" }
 rat-gradle = { module = "org.nosphere.apache:creadur-rat-gradle", version.ref 
= "rat-gradle" }
 reactor-core = { module = "io.projectreactor:reactor-core", version.ref = 
"reactor" }
 reactor-test = { module = "io.projectreactor:reactor-test", version.ref = 
"reactor" }
diff --git a/pulsar-client-reactive-adapter/build.gradle 
b/pulsar-client-reactive-adapter/build.gradle
index 984775a..37e6774 100644
--- a/pulsar-client-reactive-adapter/build.gradle
+++ b/pulsar-client-reactive-adapter/build.gradle
@@ -35,6 +35,7 @@ dependencies {
        testImplementation libs.bundles.log4j
        testImplementation libs.mockito.core
 
+       intTestImplementation libs.pulsar.client.all
        intTestImplementation 
project(':pulsar-client-reactive-producer-cache-caffeine')
        intTestImplementation project(path: 
':pulsar-client-reactive-producer-cache-caffeine-shaded', configuration: 
'shadow')
        intTestImplementation libs.junit.jupiter
diff --git 
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
 
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
index 6c8dada..a93988d 100644
--- 
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
+++ 
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
@@ -36,8 +36,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.reactive.client.api.MessageSpec;
 import org.apache.pulsar.reactive.client.api.MessageSpecBuilder;
 import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
@@ -94,6 +97,38 @@ class ReactiveMessagePipelineE2ETests {
                }
        }
 
+       @Test
+       void shouldSupportWaitingForConsumingToStartAndStop() throws Exception {
+               try (PulsarClient pulsarClient = 
SingletonPulsarContainer.createPulsarClient();
+                               PulsarAdmin pulsarAdmin = 
SingletonPulsarContainer.createPulsarAdmin()) {
+                       String topicName = "test" + UUID.randomUUID();
+                       ReactivePulsarClient reactivePulsarClient = 
AdaptedReactivePulsarClientFactory.create(pulsarClient);
+                       ReactiveMessagePipeline pipeline = 
reactivePulsarClient.messageConsumer(Schema.STRING)
+                               .subscriptionName("sub")
+                               .topic(topicName)
+                               .build()
+                               .messagePipeline()
+                               .messageHandler((message) -> Mono.empty())
+                               .build()
+                               .start();
+
+                       // wait for consuming to start
+                       pipeline.untilStarted().block(Duration.ofSeconds(5));
+                       // there should be an existing subscription
+                       List<String> subscriptions = 
pulsarAdmin.topics().getSubscriptions(topicName);
+                       assertThat(subscriptions).as("subscription should be 
created").contains("sub");
+
+                       // stop the pipeline
+                       pipeline.stop();
+                       // and wait for it to stop
+                       pipeline.untilStopped().block(Duration.ofSeconds(5));
+                       // there should be no consumers
+                       TopicStats topicStats = 
pulsarAdmin.topics().getStats(topicName);
+                       SubscriptionStats subStats = 
topicStats.getSubscriptions().get("sub");
+                       assertThat(subStats.getConsumers()).isEmpty();
+               }
+       }
+
        @ParameterizedTest
        @EnumSource(MessageOrderScenario.class)
        void shouldRetainMessageOrder(MessageOrderScenario 
messageOrderScenario) throws Exception {
diff --git 
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
 
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
index 431e448..d53e185 100644
--- 
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
+++ 
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.reactive.client.adapter;
 
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.testcontainers.containers.PulsarContainer;
@@ -44,6 +45,12 @@ final class SingletonPulsarContainer {
                        .build();
        }
 
+       static PulsarAdmin createPulsarAdmin() throws PulsarClientException {
+               return PulsarAdmin.builder()
+                       
.serviceHttpUrl(SingletonPulsarContainer.PULSAR_CONTAINER.getHttpServiceUrl())
+                       .build();
+       }
+
        static DockerImageName getPulsarImage() {
                return DockerImageName.parse("apachepulsar/pulsar:4.0.4");
        }
diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
index 8e89a09..e66ba28 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
@@ -25,6 +25,7 @@ import java.util.function.Supplier;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.reactive.client.internal.api.InternalConsumerListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
@@ -45,12 +46,20 @@ class ReactiveConsumerAdapter<T> {
        }
 
        private Mono<Consumer<T>> createConsumerMono() {
-               return AdapterImplementationFactory.adaptPulsarFuture(
-                               () -> 
this.consumerBuilderFactory.apply(this.pulsarClientSupplier.get()).subscribeAsync());
+               return Mono.deferContextual((contextView) -> 
AdapterImplementationFactory
+                       .adaptPulsarFuture(
+                                       () -> 
this.consumerBuilderFactory.apply(this.pulsarClientSupplier.get()).subscribeAsync())
+                       .doOnSuccess((consumer) -> 
contextView.<InternalConsumerListener>getOrEmpty(InternalConsumerListener.class)
+                               .ifPresent((listener) -> 
listener.onConsumerCreated(consumer))));
        }
 
        private Mono<Void> closeConsumer(Consumer<?> consumer) {
-               return Mono.fromFuture(consumer::closeAsync).doOnSuccess((__) 
-> this.LOG.info("Consumer closed {}", consumer));
+               return Mono.deferContextual((contextView) -> 
Mono.fromFuture(consumer::closeAsync).doFinally((signalType) -> {
+                       this.LOG.info("Consumer closed {}", consumer);
+                       
contextView.<InternalConsumerListener>getOrEmpty(InternalConsumerListener.class)
+                               .ifPresent((listener) -> 
listener.onConsumerClosed(consumer));
+               }));
+
        }
 
        <R> Mono<R> usingConsumer(Function<Consumer<T>, Mono<R>> 
usingConsumerAction) {
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
index 47804a5..9613293 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
@@ -19,20 +19,26 @@
 
 package org.apache.pulsar.reactive.client.api;
 
+import reactor.core.publisher.Mono;
+
 /**
  * Reactive message pipeline interface.
  */
 public interface ReactiveMessagePipeline extends AutoCloseable {
 
        /**
-        * Starts the reactive pipeline.
-        * @return the pipeline
+        * Starts the reactive pipeline asynchronously.
+        * @return the pipeline instance
+        * @see #untilStarted() For returning a reactive publisher (Mono) that 
completes after
+        * the pipeline has actually started.
         */
        ReactiveMessagePipeline start();
 
        /**
-        * Stops the reactive pipeline.
-        * @return the reactive pipeline
+        * Stops the reactive pipeline asynchronously.
+        * @return the pipeline instance
+        * @see #untilStopped() For returning a reactive publisher (Mono) that 
completes after
+        * the pipeline has actually stopped.
         */
        ReactiveMessagePipeline stop();
 
@@ -43,11 +49,54 @@ public interface ReactiveMessagePipeline extends 
AutoCloseable {
        boolean isRunning();
 
        /**
-        * Closes the reactive pipeline.
+        * Closes the reactive pipeline asynchronously without waiting for 
shutdown
+        * completion.
         * @throws Exception if an error occurs
         */
        default void close() throws Exception {
                stop();
        }
 
+       /**
+        * <p>
+        * Returns a reactive publisher (Mono) that completes after the 
pipeline has
+        * successfully subscribed to the input topic(s) and started consuming 
messages for
+        * the first time after pipeline creation. This method is not intended 
to be used
+        * after a pipeline restarts following failure. Use this method to wait 
for consumer
+        * and Pulsar subscription creation. This helps avoid race conditions 
when sending
+        * messages immediately after the pipeline starts.
+        * </p>
+        * <p>
+        * The {@link #start()} method must be called before invoking this 
method.
+        * </p>
+        * <p>
+        * To wait for the operation to complete synchronously, it is necessary 
to call
+        * {@link Mono#block()} on the returned Mono.
+        * </p>
+        * @return a Mono that completes after the pipeline has created its 
underlying Pulsar
+        * consumer
+        */
+       default Mono<Void> untilStarted() {
+               return Mono.empty();
+       }
+
+       /**
+        * <p>
+        * Returns a reactive publisher (Mono) that completes after the 
pipeline has closed
+        * the underlying Pulsar consumer and stopped consuming new messages.
+        * </p>
+        * <p>
+        * The {@link #stop()} method must be called before invoking this 
method.
+        * </p>
+        * <p>
+        * To wait for the operation to complete synchronously, it is necessary 
to call
+        * {@link Mono#block()} on the returned Mono.
+        * </p>
+        * @return a Mono that completes when the pipeline has closed the 
underlying Pulsar
+        * consumer
+        */
+       default Mono<Void> untilStopped() {
+               return Mono.empty();
+       }
+
 }
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
index cd94f19..eefdcf2 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.reactive.client.internal.api;
 
 import java.time.Duration;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
@@ -67,6 +68,10 @@ class DefaultReactiveMessagePipeline<T> implements 
ReactiveMessagePipeline {
 
        private final MessageGroupingFunction groupingFunction;
 
+       private final AtomicReference<InternalConsumerListenerImpl> 
consumerListener = new AtomicReference<>();
+
+       private final AtomicReference<CompletableFuture<Void>> 
pipelineStoppedFuture = new AtomicReference<>();
+
        DefaultReactiveMessagePipeline(ReactiveMessageConsumer<T> 
messageConsumer,
                        Function<Message<T>, Publisher<Void>> messageHandler, 
BiConsumer<Message<T>, Throwable> errorLogger,
                        Retry pipelineRetrySpec, Duration handlingTimeout, 
Function<Mono<Void>, Publisher<Void>> transformer,
@@ -83,7 +88,14 @@ class DefaultReactiveMessagePipeline<T> implements 
ReactiveMessagePipeline {
                this.pipeline = 
messageConsumer.consumeMany(this::createMessageConsumer)
                        .then()
                        .transform(transformer)
-                       .transform(this::decoratePipeline);
+                       .transform(this::decoratePipeline)
+                       .doFinally((signalType) -> {
+                               CompletableFuture<Void> f = 
this.pipelineStoppedFuture.get();
+                               if (f != null) {
+                                       f.complete(null);
+                               }
+                       })
+                       .doFirst(() -> this.pipelineStoppedFuture.set(new 
CompletableFuture<>()));
        }
 
        private Mono<Void> decorateMessageHandler(Mono<Void> messageHandler) {
@@ -168,14 +180,26 @@ class DefaultReactiveMessagePipeline<T> implements 
ReactiveMessagePipeline {
                if (this.killSwitch.get() != null) {
                        throw new IllegalStateException("Message handler is 
already running.");
                }
-               Disposable disposable = this.pipeline.subscribe(null, 
this::logError, this::logUnexpectedCompletion);
+               InternalConsumerListenerImpl consumerListener = new 
InternalConsumerListenerImpl();
+               Disposable disposable = 
this.pipeline.contextWrite(Context.of(InternalConsumerListener.class, 
consumerListener))
+                       .subscribe(null, this::logError, 
this::logUnexpectedCompletion);
                if (!this.killSwitch.compareAndSet(null, disposable)) {
                        disposable.dispose();
                        throw new IllegalStateException("Message handler was 
already running.");
                }
+               this.consumerListener.set(consumerListener);
                return this;
        }
 
+       @Override
+       public Mono<Void> untilStarted() {
+               if (!isRunning()) {
+                       throw new IllegalStateException("Pipeline isn't 
running. Call start first.");
+               }
+               InternalConsumerListenerImpl internalConsumerListener = 
this.consumerListener.get();
+               return internalConsumerListener.waitForConsumerCreated();
+       }
+
        private void logError(Throwable throwable) {
                LOG.error("ReactiveMessageHandler was unexpectedly 
terminated.", throwable);
        }
@@ -195,9 +219,44 @@ class DefaultReactiveMessagePipeline<T> implements 
ReactiveMessagePipeline {
                return this;
        }
 
+       @Override
+       public Mono<Void> untilStopped() {
+               if (isRunning()) {
+                       throw new IllegalStateException("Pipeline is running. 
Call stop first.");
+               }
+               CompletableFuture<Void> f = this.pipelineStoppedFuture.get();
+               if (f != null) {
+                       return Mono.fromFuture(f, true);
+               }
+               else {
+                       return Mono.empty();
+               }
+       }
+
        @Override
        public boolean isRunning() {
                return this.killSwitch.get() != null;
        }
 
+       private static final class InternalConsumerListenerImpl implements 
InternalConsumerListener {
+
+               private final CompletableFuture<Void> createdFuture;
+
+               private InternalConsumerListenerImpl() {
+                       this.createdFuture = new CompletableFuture<>();
+               }
+
+               @Override
+               public void onConsumerCreated(Object nativeConsumer) {
+                       if (!this.createdFuture.isDone()) {
+                               this.createdFuture.complete(null);
+                       }
+               }
+
+               Mono<Void> waitForConsumerCreated() {
+                       return Mono.fromFuture(this.createdFuture, true);
+               }
+
+       }
+
 }
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InternalConsumerListener.java
similarity index 51%
copy from 
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
copy to 
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InternalConsumerListener.java
index 47804a5..8ee8f4b 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InternalConsumerListener.java
@@ -17,37 +17,30 @@
  * under the License.
  */
 
-package org.apache.pulsar.reactive.client.api;
+package org.apache.pulsar.reactive.client.internal.api;
 
 /**
- * Reactive message pipeline interface.
+ * Internal interface to signal the creation and closing of a native consumer. 
This is not
+ * to be intended to be used by applications.
  */
-public interface ReactiveMessagePipeline extends AutoCloseable {
+public interface InternalConsumerListener {
 
        /**
-        * Starts the reactive pipeline.
-        * @return the pipeline
+        * Called when a new native consumer is created. This is called each 
time a new
+        * consumer is created initially or as a result of a reactive pipeline 
retry.
+        * @param nativeConsumer the native consumer instance
         */
-       ReactiveMessagePipeline start();
-
-       /**
-        * Stops the reactive pipeline.
-        * @return the reactive pipeline
-        */
-       ReactiveMessagePipeline stop();
-
-       /**
-        * Gets whether the reactive pipeline is running.
-        * @return true if the reactive pipeline is running
-        */
-       boolean isRunning();
+       default void onConsumerCreated(Object nativeConsumer) {
+               // no-op
+       }
 
        /**
-        * Closes the reactive pipeline.
-        * @throws Exception if an error occurs
+        * Called when a native consumer is closed. This is called each time a 
consumer is
+        * closed as a result of a reactive pipeline retry or when the pipeline 
is closed.
+        * @param nativeConsumer the native consumer instance
         */
-       default void close() throws Exception {
-               stop();
+       default void onConsumerClosed(Object nativeConsumer) {
+               // no-op
        }
 
 }
diff --git 
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
 
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
index 3415481..f330a54 100644
--- 
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
+++ 
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -38,6 +39,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.reactive.client.internal.api.InternalConsumerListener;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -49,6 +51,7 @@ import reactor.util.retry.Retry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 import static org.assertj.core.api.Assertions.assertThatNullPointerException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 class ReactiveMessagePipelineTests {
 
@@ -164,6 +167,27 @@ class ReactiveMessagePipelineTests {
 
        }
 
+       @Test
+       void pipelineUntilStartedAndStopped() throws Exception {
+               int numMessages = 10;
+               Duration subscriptionDelay = Duration.ofSeconds(1);
+               TestConsumer testConsumer = new TestConsumer(numMessages, 
subscriptionDelay);
+               CountDownLatch latch = new CountDownLatch(numMessages);
+               Function<Message<String>, Publisher<Void>> messageHandler = (
+                               message) -> Mono.empty().then().doFinally((__) 
-> latch.countDown());
+               ReactiveMessagePipeline pipeline = 
testConsumer.messagePipeline().messageHandler(messageHandler).build();
+               pipeline.start();
+               // timeout should occur since subscription delay is 1 second in 
TestConsumer
+               assertThatThrownBy(() -> 
pipeline.untilStarted().block(Duration.ofMillis(100)))
+                       .isInstanceOf(IllegalStateException.class)
+                       .hasCauseInstanceOf(TimeoutException.class);
+               // now wait for consuming to start
+               pipeline.untilStarted().block(Duration.ofSeconds(2));
+               assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
+               // now wait for consuming to stop
+               pipeline.stop().untilStopped().block(Duration.ofSeconds(1));
+       }
+
        @Test
        void streamingHandler() throws Exception {
                int numMessages = 10;
@@ -480,10 +504,17 @@ class ReactiveMessagePipelineTests {
 
                private final int numMessages;
 
+               private final Duration subscriptionDelay;
+
                private volatile Runnable finishedCallback;
 
                TestConsumer(int numMessages) {
+                       this(numMessages, null);
+               }
+
+               TestConsumer(int numMessages, Duration subscriptionDelay) {
                        this.numMessages = numMessages;
+                       this.subscriptionDelay = subscriptionDelay;
                }
 
                private final List<MessageId> acknowledgedMessages = new 
CopyOnWriteArrayList<>();
@@ -496,7 +527,10 @@ class ReactiveMessagePipelineTests {
 
                @Override
                public <R> Flux<R> consumeMany(Function<Flux<Message<String>>, 
Publisher<MessageResult<R>>> messageHandler) {
-                       return Flux.defer(() -> {
+                       Flux<R> flux = Flux.deferContextual((contextView) -> {
+                               Optional<InternalConsumerListener> 
internalConsumerListener = contextView
+                                       
.getOrEmpty(InternalConsumerListener.class);
+                               internalConsumerListener.ifPresent((listener) 
-> listener.onConsumerCreated(this));
                                Flux<Message<String>> messages = Flux.range(0, 
this.numMessages)
                                        .map(Object::toString)
                                        .map(TestMessage::new);
@@ -511,8 +545,15 @@ class ReactiveMessagePipelineTests {
                                        if (this.finishedCallback != null) {
                                                this.finishedCallback.run();
                                        }
+                                       
internalConsumerListener.ifPresent((listener) -> 
listener.onConsumerClosed(this));
                                });
                        });
+                       if (this.subscriptionDelay != null) {
+                               return 
flux.delaySubscription(this.subscriptionDelay);
+                       }
+                       else {
+                               return flux;
+                       }
                }
 
                List<MessageId> getAcknowledgedMessages() {

Reply via email to