This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 3831ea6  Rename methods to use naming convention of [action]One and [action]Many (#14)
3831ea6 is described below

commit 3831ea64281050c42d7dd9da9b630bf7968b395d
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Oct 27 19:40:24 2022 +0300

    Rename methods to use naming convention of [action]One and [action]Many (#14)
    
    * Rename consume* and read* methods
    
    * Rename send methods to be sendOne and sendMany
---
 .../reactive/client/adapter/ReactiveMessageConsumerE2ETest.java     | 6 +++---
 .../reactive/client/adapter/ReactiveMessagePipelineE2ETest.java     | 4 ++--
 .../reactive/client/adapter/ReactiveMessageReaderE2ETest.java       | 4 ++--
 .../reactive/client/adapter/ReactiveMessageSenderE2ETest.java       | 4 ++--
 .../client/internal/adapter/AdaptedReactiveMessageConsumer.java     | 6 +++---
 .../client/internal/adapter/AdaptedReactiveMessageReader.java       | 4 ++--
 .../client/internal/adapter/AdaptedReactiveMessageSender.java       | 4 ++--
 .../apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java  | 4 ++--
 .../apache/pulsar/reactive/client/api/ReactiveMessageReader.java    | 4 ++--
 .../apache/pulsar/reactive/client/api/ReactiveMessageSender.java    | 4 ++--
 .../client/internal/api/DefaultReactiveMessagePipeline.java         | 2 +-
 11 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
index 833ce88..ed617de 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
@@ -48,12 +48,12 @@ public class ReactiveMessageConsumerE2ETest {
 
                        ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
                                        
.cache(producerCache).topic(topicName).build();
-                       messageSender.send(Flux.range(1, 
100).map(Object::toString).map(MessageSpec::of)).blockLast();
+                       messageSender.sendMany(Flux.range(1, 
100).map(Object::toString).map(MessageSpec::of)).blockLast();
 
                        ReactiveMessageConsumer<String> messageConsumer = 
reactivePulsarClient.messageConsumer(Schema.STRING)
                                        
.topic(topicName).subscriptionName("sub").build();
                        List<String> messages = messageConsumer
-                                       .consumeMessages((messageFlux) -> 
messageFlux
+                                       .consumeMany((messageFlux) -> 
messageFlux
                                                        .map((message) -> 
MessageResult.acknowledge(message.getMessageId(), message.getValue())))
                                        
.take(Duration.ofSeconds(2)).collectList().block();
 
@@ -61,7 +61,7 @@ public class ReactiveMessageConsumerE2ETest {
 
                        // should have acknowledged all messages
                        List<Message<String>> remainingMessages = 
messageConsumer
-                                       .consumeMessages((messageFlux) -> 
messageFlux.map(MessageResult::acknowledgeAndReturn))
+                                       .consumeMany((messageFlux) -> 
messageFlux.map(MessageResult::acknowledgeAndReturn))
                                        
.take(Duration.ofSeconds(2)).collectList().block();
                        assertThat(remainingMessages).isEmpty();
                }
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index 5b69a51..04c8c8d 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -68,7 +68,7 @@ public class ReactiveMessagePipelineE2ETest {
 
                        ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
                                        .topic(topicName).build();
-                       messageSender.send(Flux.range(1, 
100).map(Object::toString).map(MessageSpec::of)).blockLast();
+                       messageSender.sendMany(Flux.range(1, 
100).map(Object::toString).map(MessageSpec::of)).blockLast();
 
                        List<String> messages = 
Collections.synchronizedList(new ArrayList<>());
                        CountDownLatch latch = new CountDownLatch(100);
@@ -102,7 +102,7 @@ public class ReactiveMessagePipelineE2ETest {
                        List<MessageSpec<Integer>> messageSpecs = 
generateRandomOrderedMessagesWhereSingleKeyIsOrdered(
                                        messageOrderScenario);
 
-                       
messageSender.send(Flux.fromIterable(messageSpecs)).blockLast();
+                       
messageSender.sendMany(Flux.fromIterable(messageSpecs)).blockLast();
 
                        ConcurrentMap<Integer, List<Integer>> messages = new 
ConcurrentHashMap<>();
                        CountDownLatch latch = new 
CountDownLatch(messageSpecs.size());
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
index a82fd3b..f6dafe7 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
@@ -46,11 +46,11 @@ public class ReactiveMessageReaderE2ETest {
 
                        ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
                                        
.cache(producerCache).topic(topicName).build();
-                       messageSender.send(Flux.range(1, 
100).map(Object::toString).map(MessageSpec::of)).blockLast();
+                       messageSender.sendMany(Flux.range(1, 
100).map(Object::toString).map(MessageSpec::of)).blockLast();
 
                        ReactiveMessageReader<String> messageReader = 
reactivePulsarClient.messageReader(Schema.STRING)
                                        .topic(topicName).build();
-                       List<String> messages = 
messageReader.readMessages().map(Message::getValue).collectList().block();
+                       List<String> messages = 
messageReader.readMany().map(Message::getValue).collectList().block();
 
                        assertThat(messages).isEqualTo(Flux.range(1, 
100).map(Object::toString).collectList().block());
                }
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index 078a0d6..c55379a 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -63,7 +63,7 @@ public class ReactiveMessageSenderE2ETest {
 
                        ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
                                        
.topic(topicName).maxInflight(1).build();
-                       MessageId messageId = 
messageSender.send(MessageSpec.of("Hello world!")).block();
+                       MessageId messageId = 
messageSender.sendOne(MessageSpec.of("Hello world!")).block();
                        assertThat(messageId).isNotNull();
 
                        Message<String> message = consumer.receive(1, 
TimeUnit.SECONDS);
@@ -86,7 +86,7 @@ public class ReactiveMessageSenderE2ETest {
 
                        ReactiveMessageSender<String> messageSender = 
reactivePulsarClient.messageSender(Schema.STRING)
                                        
.cache(producerCache).maxInflight(1).topic(topicName).build();
-                       MessageId messageId = 
messageSender.send(MessageSpec.of("Hello world!")).block();
+                       MessageId messageId = 
messageSender.sendOne(MessageSpec.of("Hello world!")).block();
                        assertThat(messageId).isNotNull();
 
                        Message<String> message = consumer.receive(1, 
TimeUnit.SECONDS);
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
index cdd44e7..5240781 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
@@ -57,9 +57,9 @@ class AdaptedReactiveMessageConsumer<T> implements ReactiveMessageConsumer<T> {
        }
 
        @Override
-       public <R> Mono<R> consumeMessage(Function<Mono<Message<T>>, 
Mono<MessageResult<R>>> messageHandler) {
+       public <R> Mono<R> consumeOne(Function<Mono<Message<T>>, 
Publisher<MessageResult<R>>> messageHandler) {
                return createReactiveConsumerAdapter().usingConsumer((consumer) 
-> Mono.using(this::pinAcknowledgeScheduler,
-                               (pinnedAcknowledgeScheduler) -> 
messageHandler.apply(readNextMessage(consumer)).delayUntil(
+                               (pinnedAcknowledgeScheduler) -> 
Mono.from(messageHandler.apply(readNextMessage(consumer))).delayUntil(
                                                (messageResult) -> 
handleAcknowledgement(consumer, messageResult, pinnedAcknowledgeScheduler))
                                                
.handle(this::handleMessageResult),
                                Scheduler::dispose));
@@ -206,7 +206,7 @@ class AdaptedReactiveMessageConsumer<T> implements ReactiveMessageConsumer<T> {
        }
 
        @Override
-       public <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, 
Publisher<MessageResult<R>>> messageHandler) {
+       public <R> Flux<R> consumeMany(Function<Flux<Message<T>>, 
Publisher<MessageResult<R>>> messageHandler) {
                return 
createReactiveConsumerAdapter().usingConsumerMany((consumer) -> Flux.using(
                                this::pinAcknowledgeScheduler, (
                                                pinnedAcknowledgeScheduler) -> 
Flux
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java
index 76abb2e..e06a84e 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java
@@ -127,13 +127,13 @@ class AdaptedReactiveMessageReader<T> implements ReactiveMessageReader<T> {
        }
 
        @Override
-       public Mono<Message<T>> readMessage() {
+       public Mono<Message<T>> readOne() {
                return createReactiveReaderAdapter(this.startAtSpec)
                                .usingReader((reader) -> 
readNextMessage(reader, this.endOfStreamAction));
        }
 
        @Override
-       public Flux<Message<T>> readMessages() {
+       public Flux<Message<T>> readMany() {
                return 
createReactiveReaderAdapter(this.startAtSpec).usingReaderMany((reader) -> {
                        Mono<Message<T>> messageMono = readNextMessage(reader, 
this.endOfStreamAction);
                        if (this.endOfStreamAction == 
EndOfStreamAction.COMPLETE) {
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
index 038b885..0471b7c 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
@@ -179,7 +179,7 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
        }
 
        @Override
-       public Mono<MessageId> send(MessageSpec<T> messageSpec) {
+       public Mono<MessageId> sendOne(MessageSpec<T> messageSpec) {
                return createReactiveProducerAdapter().usingProducer((producer) 
-> createMessageMono(messageSpec, producer));
        }
 
@@ -192,7 +192,7 @@ class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
        }
 
        @Override
-       public Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs) {
+       public Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs) 
{
                return 
createReactiveProducerAdapter().usingProducerMany((producer) ->
                // TODO: ensure that inner publishers are subscribed in order 
so that message
                // order is retained
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
index 312fa48..51a966d 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
@@ -25,9 +25,9 @@ import reactor.core.publisher.Mono;
 
 public interface ReactiveMessageConsumer<T> {
 
-       <R> Mono<R> consumeMessage(Function<Mono<Message<T>>, 
Mono<MessageResult<R>>> messageHandler);
+       <R> Mono<R> consumeOne(Function<Mono<Message<T>>, 
Publisher<MessageResult<R>>> messageHandler);
 
-       <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, 
Publisher<MessageResult<R>>> messageHandler);
+       <R> Flux<R> consumeMany(Function<Flux<Message<T>>, 
Publisher<MessageResult<R>>> messageHandler);
 
        /**
         * Creates the Pulsar Consumer and immediately closes it. This is 
useful for creating
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java
index 9392b26..6b76334 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java
@@ -22,8 +22,8 @@ import reactor.core.publisher.Mono;
 
 public interface ReactiveMessageReader<T> {
 
-       Mono<Message<T>> readMessage();
+       Mono<Message<T>> readOne();
 
-       Flux<Message<T>> readMessages();
+       Flux<Message<T>> readMany();
 
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
index 50640dc..2d5fb83 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
@@ -28,7 +28,7 @@ public interface ReactiveMessageSender<T> {
         * @param messageSpec the spec of the message to send
         * @return a publisher that will emit one message id and complete
         */
-       Mono<MessageId> send(MessageSpec<T> messageSpec);
+       Mono<MessageId> sendOne(MessageSpec<T> messageSpec);
 
        /**
         * Send multiple messages and get the associated message ids in the 
same order as the
@@ -37,6 +37,6 @@ public interface ReactiveMessageSender<T> {
         * @return a publisher that will emit a message id per message 
successfully sent in
         * the order that they have been sent
         */
-       Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs);
+       Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs);
 
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
index bf7d265..c094628 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
@@ -76,7 +76,7 @@ class DefaultReactiveMessagePipeline<T> implements ReactiveMessagePipeline {
                this.groupingFunction = groupingFunction;
                this.concurrency = concurrency;
                this.maxInflight = maxInflight;
-               this.pipeline = 
messageConsumer.consumeMessages(this::createMessageConsumer).then().transform(transformer)
+               this.pipeline = 
messageConsumer.consumeMany(this::createMessageConsumer).then().transform(transformer)
                                .transform(this::decoratePipeline);
        }
 

Reply via email to