This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 3831ea6 Rename methods to use naming convention of [action]One and
[action]Many (#14)
3831ea6 is described below
commit 3831ea64281050c42d7dd9da9b630bf7968b395d
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Oct 27 19:40:24 2022 +0300
Rename methods to use naming convention of [action]One and [action]Many
(#14)
* Rename consume* and read* methods to consumeOne/consumeMany and
readOne/readMany
* Rename send methods to sendOne and sendMany
---
.../reactive/client/adapter/ReactiveMessageConsumerE2ETest.java | 6 +++---
.../reactive/client/adapter/ReactiveMessagePipelineE2ETest.java | 4 ++--
.../reactive/client/adapter/ReactiveMessageReaderE2ETest.java | 4 ++--
.../reactive/client/adapter/ReactiveMessageSenderE2ETest.java | 4 ++--
.../client/internal/adapter/AdaptedReactiveMessageConsumer.java | 6 +++---
.../client/internal/adapter/AdaptedReactiveMessageReader.java | 4 ++--
.../client/internal/adapter/AdaptedReactiveMessageSender.java | 4 ++--
.../apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java | 4 ++--
.../apache/pulsar/reactive/client/api/ReactiveMessageReader.java | 4 ++--
.../apache/pulsar/reactive/client/api/ReactiveMessageSender.java | 4 ++--
.../client/internal/api/DefaultReactiveMessagePipeline.java | 2 +-
11 files changed, 23 insertions(+), 23 deletions(-)
diff --git
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
index 833ce88..ed617de 100644
---
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
+++
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETest.java
@@ -48,12 +48,12 @@ public class ReactiveMessageConsumerE2ETest {
ReactiveMessageSender<String> messageSender =
reactivePulsarClient.messageSender(Schema.STRING)
.cache(producerCache).topic(topicName).build();
- messageSender.send(Flux.range(1,
100).map(Object::toString).map(MessageSpec::of)).blockLast();
+ messageSender.sendMany(Flux.range(1,
100).map(Object::toString).map(MessageSpec::of)).blockLast();
ReactiveMessageConsumer<String> messageConsumer =
reactivePulsarClient.messageConsumer(Schema.STRING)
.topic(topicName).subscriptionName("sub").build();
List<String> messages = messageConsumer
- .consumeMessages((messageFlux) ->
messageFlux
+ .consumeMany((messageFlux) ->
messageFlux
.map((message) ->
MessageResult.acknowledge(message.getMessageId(), message.getValue())))
.take(Duration.ofSeconds(2)).collectList().block();
@@ -61,7 +61,7 @@ public class ReactiveMessageConsumerE2ETest {
// should have acknowledged all messages
List<Message<String>> remainingMessages =
messageConsumer
- .consumeMessages((messageFlux) ->
messageFlux.map(MessageResult::acknowledgeAndReturn))
+ .consumeMany((messageFlux) ->
messageFlux.map(MessageResult::acknowledgeAndReturn))
.take(Duration.ofSeconds(2)).collectList().block();
assertThat(remainingMessages).isEmpty();
}
diff --git
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index 5b69a51..04c8c8d 100644
---
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -68,7 +68,7 @@ public class ReactiveMessagePipelineE2ETest {
ReactiveMessageSender<String> messageSender =
reactivePulsarClient.messageSender(Schema.STRING)
.topic(topicName).build();
- messageSender.send(Flux.range(1,
100).map(Object::toString).map(MessageSpec::of)).blockLast();
+ messageSender.sendMany(Flux.range(1,
100).map(Object::toString).map(MessageSpec::of)).blockLast();
List<String> messages =
Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(100);
@@ -102,7 +102,7 @@ public class ReactiveMessagePipelineE2ETest {
List<MessageSpec<Integer>> messageSpecs =
generateRandomOrderedMessagesWhereSingleKeyIsOrdered(
messageOrderScenario);
-
messageSender.send(Flux.fromIterable(messageSpecs)).blockLast();
+
messageSender.sendMany(Flux.fromIterable(messageSpecs)).blockLast();
ConcurrentMap<Integer, List<Integer>> messages = new
ConcurrentHashMap<>();
CountDownLatch latch = new
CountDownLatch(messageSpecs.size());
diff --git
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
index a82fd3b..f6dafe7 100644
---
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
+++
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageReaderE2ETest.java
@@ -46,11 +46,11 @@ public class ReactiveMessageReaderE2ETest {
ReactiveMessageSender<String> messageSender =
reactivePulsarClient.messageSender(Schema.STRING)
.cache(producerCache).topic(topicName).build();
- messageSender.send(Flux.range(1,
100).map(Object::toString).map(MessageSpec::of)).blockLast();
+ messageSender.sendMany(Flux.range(1,
100).map(Object::toString).map(MessageSpec::of)).blockLast();
ReactiveMessageReader<String> messageReader =
reactivePulsarClient.messageReader(Schema.STRING)
.topic(topicName).build();
- List<String> messages =
messageReader.readMessages().map(Message::getValue).collectList().block();
+ List<String> messages =
messageReader.readMany().map(Message::getValue).collectList().block();
assertThat(messages).isEqualTo(Flux.range(1,
100).map(Object::toString).collectList().block());
}
diff --git
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
index 078a0d6..c55379a 100644
---
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
+++
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java
@@ -63,7 +63,7 @@ public class ReactiveMessageSenderE2ETest {
ReactiveMessageSender<String> messageSender =
reactivePulsarClient.messageSender(Schema.STRING)
.topic(topicName).maxInflight(1).build();
- MessageId messageId =
messageSender.send(MessageSpec.of("Hello world!")).block();
+ MessageId messageId =
messageSender.sendOne(MessageSpec.of("Hello world!")).block();
assertThat(messageId).isNotNull();
Message<String> message = consumer.receive(1,
TimeUnit.SECONDS);
@@ -86,7 +86,7 @@ public class ReactiveMessageSenderE2ETest {
ReactiveMessageSender<String> messageSender =
reactivePulsarClient.messageSender(Schema.STRING)
.cache(producerCache).maxInflight(1).topic(topicName).build();
- MessageId messageId =
messageSender.send(MessageSpec.of("Hello world!")).block();
+ MessageId messageId =
messageSender.sendOne(MessageSpec.of("Hello world!")).block();
assertThat(messageId).isNotNull();
Message<String> message = consumer.receive(1,
TimeUnit.SECONDS);
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
index cdd44e7..5240781 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
@@ -57,9 +57,9 @@ class AdaptedReactiveMessageConsumer<T> implements
ReactiveMessageConsumer<T> {
}
@Override
- public <R> Mono<R> consumeMessage(Function<Mono<Message<T>>,
Mono<MessageResult<R>>> messageHandler) {
+ public <R> Mono<R> consumeOne(Function<Mono<Message<T>>,
Publisher<MessageResult<R>>> messageHandler) {
return createReactiveConsumerAdapter().usingConsumer((consumer)
-> Mono.using(this::pinAcknowledgeScheduler,
- (pinnedAcknowledgeScheduler) ->
messageHandler.apply(readNextMessage(consumer)).delayUntil(
+ (pinnedAcknowledgeScheduler) ->
Mono.from(messageHandler.apply(readNextMessage(consumer))).delayUntil(
(messageResult) ->
handleAcknowledgement(consumer, messageResult, pinnedAcknowledgeScheduler))
.handle(this::handleMessageResult),
Scheduler::dispose));
@@ -206,7 +206,7 @@ class AdaptedReactiveMessageConsumer<T> implements
ReactiveMessageConsumer<T> {
}
@Override
- public <R> Flux<R> consumeMessages(Function<Flux<Message<T>>,
Publisher<MessageResult<R>>> messageHandler) {
+ public <R> Flux<R> consumeMany(Function<Flux<Message<T>>,
Publisher<MessageResult<R>>> messageHandler) {
return
createReactiveConsumerAdapter().usingConsumerMany((consumer) -> Flux.using(
this::pinAcknowledgeScheduler, (
pinnedAcknowledgeScheduler) ->
Flux
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java
index 76abb2e..e06a84e 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java
@@ -127,13 +127,13 @@ class AdaptedReactiveMessageReader<T> implements
ReactiveMessageReader<T> {
}
@Override
- public Mono<Message<T>> readMessage() {
+ public Mono<Message<T>> readOne() {
return createReactiveReaderAdapter(this.startAtSpec)
.usingReader((reader) ->
readNextMessage(reader, this.endOfStreamAction));
}
@Override
- public Flux<Message<T>> readMessages() {
+ public Flux<Message<T>> readMany() {
return
createReactiveReaderAdapter(this.startAtSpec).usingReaderMany((reader) -> {
Mono<Message<T>> messageMono = readNextMessage(reader,
this.endOfStreamAction);
if (this.endOfStreamAction ==
EndOfStreamAction.COMPLETE) {
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
index 038b885..0471b7c 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.java
@@ -179,7 +179,7 @@ class AdaptedReactiveMessageSender<T> implements
ReactiveMessageSender<T> {
}
@Override
- public Mono<MessageId> send(MessageSpec<T> messageSpec) {
+ public Mono<MessageId> sendOne(MessageSpec<T> messageSpec) {
return createReactiveProducerAdapter().usingProducer((producer)
-> createMessageMono(messageSpec, producer));
}
@@ -192,7 +192,7 @@ class AdaptedReactiveMessageSender<T> implements
ReactiveMessageSender<T> {
}
@Override
- public Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs) {
+ public Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs)
{
return
createReactiveProducerAdapter().usingProducerMany((producer) ->
// TODO: ensure that inner publishers are subscribed in order
so that message
// order is retained
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
index 312fa48..51a966d 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java
@@ -25,9 +25,9 @@ import reactor.core.publisher.Mono;
public interface ReactiveMessageConsumer<T> {
- <R> Mono<R> consumeMessage(Function<Mono<Message<T>>,
Mono<MessageResult<R>>> messageHandler);
+ <R> Mono<R> consumeOne(Function<Mono<Message<T>>,
Publisher<MessageResult<R>>> messageHandler);
- <R> Flux<R> consumeMessages(Function<Flux<Message<T>>,
Publisher<MessageResult<R>>> messageHandler);
+ <R> Flux<R> consumeMany(Function<Flux<Message<T>>,
Publisher<MessageResult<R>>> messageHandler);
/**
* Creates the Pulsar Consumer and immediately closes it. This is
useful for creating
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java
index 9392b26..6b76334 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageReader.java
@@ -22,8 +22,8 @@ import reactor.core.publisher.Mono;
public interface ReactiveMessageReader<T> {
- Mono<Message<T>> readMessage();
+ Mono<Message<T>> readOne();
- Flux<Message<T>> readMessages();
+ Flux<Message<T>> readMany();
}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
index 50640dc..2d5fb83 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSender.java
@@ -28,7 +28,7 @@ public interface ReactiveMessageSender<T> {
* @param messageSpec the spec of the message to send
* @return a publisher that will emit one message id and complete
*/
- Mono<MessageId> send(MessageSpec<T> messageSpec);
+ Mono<MessageId> sendOne(MessageSpec<T> messageSpec);
/**
* Send multiple messages and get the associated message ids in the
same order as the
@@ -37,6 +37,6 @@ public interface ReactiveMessageSender<T> {
* @return a publisher that will emit a message id per message
successfully sent in
* the order that they have been sent
*/
- Flux<MessageId> send(Publisher<MessageSpec<T>> messageSpecs);
+ Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs);
}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
index bf7d265..c094628 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
@@ -76,7 +76,7 @@ class DefaultReactiveMessagePipeline<T> implements
ReactiveMessagePipeline {
this.groupingFunction = groupingFunction;
this.concurrency = concurrency;
this.maxInflight = maxInflight;
- this.pipeline =
messageConsumer.consumeMessages(this::createMessageConsumer).then().transform(transformer)
+ this.pipeline =
messageConsumer.consumeMany(this::createMessageConsumer).then().transform(transformer)
.transform(this::decoratePipeline);
}