This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ef02b0  Simplify consumeOne API (#18)
5ef02b0 is described below

commit 5ef02b0134f55301bd63aaded68010148981ae14
Author: Christophe Bornet <[email protected]>
AuthorDate: Wed Nov 9 09:36:44 2022 +0100

    Simplify consumeOne API (#18)
---
 .../internal/adapter/AdaptedReactiveMessageConsumer.java    | 13 ++++++++-----
 .../pulsar/reactive/client/api/ReactiveMessageConsumer.java |  2 +-
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
index 5240781..2a41ecd 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
@@ -57,11 +57,14 @@ class AdaptedReactiveMessageConsumer<T> implements ReactiveMessageConsumer<T> {
        }
 
        @Override
-       public <R> Mono<R> consumeOne(Function<Mono<Message<T>>, Publisher<MessageResult<R>>> messageHandler) {
-               return createReactiveConsumerAdapter().usingConsumer((consumer) -> Mono.using(this::pinAcknowledgeScheduler,
-                               (pinnedAcknowledgeScheduler) -> Mono.from(messageHandler.apply(readNextMessage(consumer))).delayUntil(
-                                               (messageResult) -> handleAcknowledgement(consumer, messageResult, pinnedAcknowledgeScheduler))
-                                               .handle(this::handleMessageResult),
+       public <R> Mono<R> consumeOne(Function<Message<T>, Publisher<MessageResult<R>>> messageHandler) {
+               return createReactiveConsumerAdapter().usingConsumer((consumer) -> Mono.using(
+                               this::pinAcknowledgeScheduler, (
+                                               pinnedAcknowledgeScheduler) -> readNextMessage(consumer)
+                                                               .flatMap((message) -> Mono.from(messageHandler.apply(message)))
+                                                               .delayUntil((messageResult) -> handleAcknowledgement(consumer, messageResult,
+                                                                               pinnedAcknowledgeScheduler))
+                                                               .handle(this::handleMessageResult),
                                Scheduler::dispose));
        }
 
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
index 51a966d..6cc70d3 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
@@ -25,7 +25,7 @@ import reactor.core.publisher.Mono;
 
 public interface ReactiveMessageConsumer<T> {
 
-       <R> Mono<R> consumeOne(Function<Mono<Message<T>>, Publisher<MessageResult<R>>> messageHandler);
+       <R> Mono<R> consumeOne(Function<Message<T>, Publisher<MessageResult<R>>> messageHandler);
 
        <R> Flux<R> consumeMany(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> messageHandler);
 

Reply via email to