This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 555e4b2 Replace OneByOneMessagePipelineBuilder#concurrency by
OneByOneMessagePipelineBuilder#concurrency(int) (#39)
555e4b2 is described below
commit 555e4b2f1dbf31d3ccc1bc66481d46b5b2cb1180
Author: Christophe Bornet <[email protected]>
AuthorDate: Mon Nov 28 10:50:53 2022 +0100
Replace OneByOneMessagePipelineBuilder#concurrency by
OneByOneMessagePipelineBuilder#concurrency(int) (#39)
---
.../reactive/client/adapter/ReactiveMessagePipelineE2ETest.java | 2 +-
.../pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java | 4 +---
.../client/internal/api/DefaultReactiveMessagePipelineBuilder.java | 5 -----
3 files changed, 2 insertions(+), 9 deletions(-)
diff --git
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
index a654a1a..5ea6d7b 100644
---
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
+++
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java
@@ -131,7 +131,7 @@ public class ReactiveMessagePipelineE2ETest {
return messageHandler;
});
if (messageOrderScenario !=
MessageOrderScenario.NO_PARALLEL) {
-
reactiveMessageHandlerBuilder.concurrent().concurrency(KEYS_COUNT).useKeyOrderedProcessing();
+
reactiveMessageHandlerBuilder.concurrency(KEYS_COUNT).useKeyOrderedProcessing();
}
try (ReactiveMessagePipeline reactiveMessagePipeline =
reactiveMessageHandlerBuilder.build().start()) {
boolean latchCompleted = latch.await(5,
TimeUnit.SECONDS);
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
index 86359a0..5d25555 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java
@@ -45,7 +45,7 @@ public interface ReactiveMessagePipelineBuilder<T> {
OneByOneMessagePipelineBuilder<T>
errorLogger(BiConsumer<Message<T>, Throwable> errorLogger);
- ConcurrentOneByOneMessagePipelineBuilder<T> concurrent();
+ ConcurrentOneByOneMessagePipelineBuilder<T> concurrency(int
concurrency);
}
@@ -55,8 +55,6 @@ public interface ReactiveMessagePipelineBuilder<T> {
ConcurrentOneByOneMessagePipelineBuilder<T>
groupOrderedProcessing(MessageGroupingFunction groupingFunction);
- ConcurrentOneByOneMessagePipelineBuilder<T> concurrency(int
concurrency);
-
ConcurrentOneByOneMessagePipelineBuilder<T> maxInflight(int
maxInflight);
}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
index 59358ec..66b21a4 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipelineBuilder.java
@@ -101,11 +101,6 @@ class DefaultReactiveMessagePipelineBuilder<T>
return this;
}
- @Override
- public ConcurrentOneByOneMessagePipelineBuilder<T> concurrent() {
- return this;
- }
-
@Override
public ConcurrentOneByOneMessagePipelineBuilder<T>
useKeyOrderedProcessing() {
Objects.requireNonNull(KEY_ORDERED_GROUPING_FUNCTION,