This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 7edb245 Add clone method to consumer and sender builders (#34)
7edb245 is described below
commit 7edb245627d28a9ddaf15c63bf9f5a28800a12a9
Author: Christophe Bornet <[email protected]>
AuthorDate: Mon Nov 28 10:48:27 2022 +0100
Add clone method to consumer and sender builders (#34)
Co-authored-by: Lari Hotari <[email protected]>
---
.../AdaptedReactiveMessageConsumerBuilder.java | 15 ++++++++++++++-
.../adapter/AdaptedReactiveMessageReaderBuilder.java | 11 ++++++++---
.../adapter/AdaptedReactiveMessageSenderBuilder.java | 20 +++++++++++++++++++-
.../client/api/ReactiveMessageConsumerBuilder.java | 6 ++++++
.../client/api/ReactiveMessageSenderBuilder.java | 6 ++++++
5 files changed, 53 insertions(+), 5 deletions(-)
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
index ed59a3c..7df510f 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
@@ -27,12 +27,19 @@ class AdaptedReactiveMessageConsumerBuilder<T> implements
ReactiveMessageConsume
private final ReactiveConsumerAdapterFactory
reactiveConsumerAdapterFactory;
- private final MutableReactiveMessageConsumerSpec consumerSpec = new
MutableReactiveMessageConsumerSpec();
+ private final MutableReactiveMessageConsumerSpec consumerSpec;
AdaptedReactiveMessageConsumerBuilder(Schema<T> schema,
ReactiveConsumerAdapterFactory
reactiveConsumerAdapterFactory) {
+ this(schema, reactiveConsumerAdapterFactory, new
MutableReactiveMessageConsumerSpec());
+ }
+
+ private AdaptedReactiveMessageConsumerBuilder(Schema<T> schema,
+ ReactiveConsumerAdapterFactory
reactiveConsumerAdapterFactory,
+ MutableReactiveMessageConsumerSpec consumerSpec) {
this.schema = schema;
this.reactiveConsumerAdapterFactory =
reactiveConsumerAdapterFactory;
+ this.consumerSpec = consumerSpec;
}
@Override
@@ -40,6 +47,12 @@ class AdaptedReactiveMessageConsumerBuilder<T> implements
ReactiveMessageConsume
return this.consumerSpec;
}
+ @Override
+ public ReactiveMessageConsumerBuilder<T> clone() {
+ return new AdaptedReactiveMessageConsumerBuilder<>(this.schema,
this.reactiveConsumerAdapterFactory,
+ new
MutableReactiveMessageConsumerSpec(this.consumerSpec));
+ }
+
@Override
public ReactiveMessageConsumer<T> build() {
return new
AdaptedReactiveMessageConsumer<T>(this.reactiveConsumerAdapterFactory,
this.schema,
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderBuilder.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderBuilder.java
index 252caae..fc568a3 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderBuilder.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderBuilder.java
@@ -29,15 +29,21 @@ class AdaptedReactiveMessageReaderBuilder<T> implements
ReactiveMessageReaderBui
private final Schema<T> schema;
- private MutableReactiveMessageReaderSpec readerSpec = new
MutableReactiveMessageReaderSpec();
+ private final MutableReactiveMessageReaderSpec readerSpec;
private StartAtSpec startAtSpec = StartAtSpec.ofEarliest();
private EndOfStreamAction endOfStreamAction =
EndOfStreamAction.COMPLETE;
AdaptedReactiveMessageReaderBuilder(Schema<T> schema,
ReactiveReaderAdapterFactory reactiveReaderAdapterFactory) {
+ this(schema, reactiveReaderAdapterFactory, new
MutableReactiveMessageReaderSpec());
+ }
+
+ private AdaptedReactiveMessageReaderBuilder(Schema<T> schema,
+ ReactiveReaderAdapterFactory
reactiveReaderAdapterFactory, MutableReactiveMessageReaderSpec readerSpec) {
this.reactiveReaderAdapterFactory =
reactiveReaderAdapterFactory;
this.schema = schema;
+ this.readerSpec = readerSpec;
}
@Override
@@ -60,8 +66,7 @@ class AdaptedReactiveMessageReaderBuilder<T> implements
ReactiveMessageReaderBui
@Override
public ReactiveMessageReaderBuilder<T> clone() {
AdaptedReactiveMessageReaderBuilder<T> cloned = new
AdaptedReactiveMessageReaderBuilder<>(this.schema,
- this.reactiveReaderAdapterFactory);
- cloned.readerSpec = new
MutableReactiveMessageReaderSpec(this.readerSpec);
+ this.reactiveReaderAdapterFactory, new
MutableReactiveMessageReaderSpec(this.readerSpec));
cloned.startAtSpec = this.startAtSpec;
cloned.endOfStreamAction = this.endOfStreamAction;
return this;
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
index c060cae..29c41af 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
@@ -37,7 +37,7 @@ class AdaptedReactiveMessageSenderBuilder<T> implements
ReactiveMessageSenderBui
private final ReactiveProducerAdapterFactory
reactiveProducerAdapterFactory;
- private final MutableReactiveMessageSenderSpec senderSpec = new
MutableReactiveMessageSenderSpec();
+ private final MutableReactiveMessageSenderSpec senderSpec;
private ReactiveMessageSenderCache producerCache;
@@ -49,8 +49,15 @@ class AdaptedReactiveMessageSenderBuilder<T> implements
ReactiveMessageSenderBui
AdaptedReactiveMessageSenderBuilder(Schema<T> schema,
ReactiveProducerAdapterFactory
reactiveProducerAdapterFactory) {
+ this(schema, reactiveProducerAdapterFactory, new
MutableReactiveMessageSenderSpec());
+ }
+
+ private AdaptedReactiveMessageSenderBuilder(Schema<T> schema,
+ ReactiveProducerAdapterFactory
reactiveProducerAdapterFactory,
+ MutableReactiveMessageSenderSpec senderSpec) {
this.schema = schema;
this.reactiveProducerAdapterFactory =
reactiveProducerAdapterFactory;
+ this.senderSpec = senderSpec;
}
@Override
@@ -76,6 +83,17 @@ class AdaptedReactiveMessageSenderBuilder<T> implements
ReactiveMessageSenderBui
return this;
}
+ @Override
+ public ReactiveMessageSenderBuilder<T> clone() {
+ AdaptedReactiveMessageSenderBuilder<T> cloned = new
AdaptedReactiveMessageSenderBuilder<>(this.schema,
+ this.reactiveProducerAdapterFactory, new
MutableReactiveMessageSenderSpec(this.senderSpec));
+ cloned.producerCache = this.producerCache;
+ cloned.maxInflight = this.maxInflight;
+ cloned.maxConcurrentSenderSubscriptions =
this.maxConcurrentSenderSubscriptions;
+ cloned.producerActionTransformer =
this.producerActionTransformer;
+ return cloned;
+ }
+
@Override
public ReactiveMessageSender<T> build() {
if (this.maxInflight > 0) {
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
index aa13a57..cbc0d26 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
@@ -68,6 +68,12 @@ public interface ReactiveMessageConsumerBuilder<T> {
*/
MutableReactiveMessageConsumerSpec getMutableSpec();
+ /**
+ * Creates and returns a copy of this reactive consumer builder.
+ * @return the cloned reactive reader builder
+ */
+ ReactiveMessageConsumerBuilder<T> clone();
+
/**
* Adds a topic this consumer will subscribe on.
* @param topicName a topic that the consumer will subscribe on
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderBuilder.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderBuilder.java
index 6f2ac5a..c620a53 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderBuilder.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderBuilder.java
@@ -67,6 +67,12 @@ public interface ReactiveMessageSenderBuilder<T> {
*/
ReactiveMessageSenderBuilder<T> maxConcurrentSenderSubscriptions(int
maxConcurrentSenderSubscriptions);
+ /**
+ * Creates and returns a copy of this reactive sender builder.
+ * @return the cloned reactive reader builder
+ */
+ ReactiveMessageSenderBuilder<T> clone();
+
/**
* Applies a sender spec to configure the sender.
* @param senderSpec the sender spec to apply