This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 5ef02b0 Simplify consumeOne API (#18)
5ef02b0 is described below
commit 5ef02b0134f55301bd63aaded68010148981ae14
Author: Christophe Bornet <[email protected]>
AuthorDate: Wed Nov 9 09:36:44 2022 +0100
Simplify consumeOne API (#18)
---
.../internal/adapter/AdaptedReactiveMessageConsumer.java | 13 ++++++++-----
.../pulsar/reactive/client/api/ReactiveMessageConsumer.java | 2 +-
2 files changed, 9 insertions(+), 6 deletions(-)
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
index 5240781..2a41ecd 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
@@ -57,11 +57,14 @@ class AdaptedReactiveMessageConsumer<T> implements
ReactiveMessageConsumer<T> {
}
@Override
- public <R> Mono<R> consumeOne(Function<Mono<Message<T>>,
Publisher<MessageResult<R>>> messageHandler) {
- return createReactiveConsumerAdapter().usingConsumer((consumer)
-> Mono.using(this::pinAcknowledgeScheduler,
- (pinnedAcknowledgeScheduler) ->
Mono.from(messageHandler.apply(readNextMessage(consumer))).delayUntil(
- (messageResult) ->
handleAcknowledgement(consumer, messageResult, pinnedAcknowledgeScheduler))
-
.handle(this::handleMessageResult),
+ public <R> Mono<R> consumeOne(Function<Message<T>,
Publisher<MessageResult<R>>> messageHandler) {
+ return createReactiveConsumerAdapter().usingConsumer((consumer)
-> Mono.using(
+ this::pinAcknowledgeScheduler, (
+ pinnedAcknowledgeScheduler) ->
readNextMessage(consumer)
+
.flatMap((message) -> Mono.from(messageHandler.apply(message)))
+
.delayUntil((messageResult) -> handleAcknowledgement(consumer, messageResult,
+
pinnedAcknowledgeScheduler))
+
.handle(this::handleMessageResult),
Scheduler::dispose));
}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
index 51a966d..6cc70d3 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
@@ -25,7 +25,7 @@ import reactor.core.publisher.Mono;
public interface ReactiveMessageConsumer<T> {
- <R> Mono<R> consumeOne(Function<Mono<Message<T>>,
Publisher<MessageResult<R>>> messageHandler);
+ <R> Mono<R> consumeOne(Function<Message<T>,
Publisher<MessageResult<R>>> messageHandler);
<R> Flux<R> consumeMany(Function<Flux<Message<T>>,
Publisher<MessageResult<R>>> messageHandler);