This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 7edb245  Add clone method to consumer and sender builders (#34)
7edb245 is described below

commit 7edb245627d28a9ddaf15c63bf9f5a28800a12a9
Author: Christophe Bornet <[email protected]>
AuthorDate: Mon Nov 28 10:48:27 2022 +0100

    Add clone method to consumer and sender builders (#34)
    
    Co-authored-by: Lari Hotari <[email protected]>
---
 .../AdaptedReactiveMessageConsumerBuilder.java       | 15 ++++++++++++++-
 .../adapter/AdaptedReactiveMessageReaderBuilder.java | 11 ++++++++---
 .../adapter/AdaptedReactiveMessageSenderBuilder.java | 20 +++++++++++++++++++-
 .../client/api/ReactiveMessageConsumerBuilder.java   |  6 ++++++
 .../client/api/ReactiveMessageSenderBuilder.java     |  6 ++++++
 5 files changed, 53 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
index ed59a3c..7df510f 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerBuilder.java
@@ -27,12 +27,19 @@ class AdaptedReactiveMessageConsumerBuilder<T> implements 
ReactiveMessageConsume
 
        private final ReactiveConsumerAdapterFactory 
reactiveConsumerAdapterFactory;
 
-       private final MutableReactiveMessageConsumerSpec consumerSpec = new 
MutableReactiveMessageConsumerSpec();
+       private final MutableReactiveMessageConsumerSpec consumerSpec;
 
        AdaptedReactiveMessageConsumerBuilder(Schema<T> schema,
                        ReactiveConsumerAdapterFactory 
reactiveConsumerAdapterFactory) {
+               this(schema, reactiveConsumerAdapterFactory, new 
MutableReactiveMessageConsumerSpec());
+       }
+
+       private AdaptedReactiveMessageConsumerBuilder(Schema<T> schema,
+                       ReactiveConsumerAdapterFactory 
reactiveConsumerAdapterFactory,
+                       MutableReactiveMessageConsumerSpec consumerSpec) {
                this.schema = schema;
                this.reactiveConsumerAdapterFactory = 
reactiveConsumerAdapterFactory;
+               this.consumerSpec = consumerSpec;
        }
 
        @Override
@@ -40,6 +47,12 @@ class AdaptedReactiveMessageConsumerBuilder<T> implements 
ReactiveMessageConsume
                return this.consumerSpec;
        }
 
+       @Override
+       public ReactiveMessageConsumerBuilder<T> clone() {
+               return new AdaptedReactiveMessageConsumerBuilder<>(this.schema, 
this.reactiveConsumerAdapterFactory,
+                               new 
MutableReactiveMessageConsumerSpec(this.consumerSpec));
+       }
+
        @Override
        public ReactiveMessageConsumer<T> build() {
                return new 
AdaptedReactiveMessageConsumer<T>(this.reactiveConsumerAdapterFactory, 
this.schema,
diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderBuilder.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderBuilder.java
index 252caae..fc568a3 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderBuilder.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderBuilder.java
@@ -29,15 +29,21 @@ class AdaptedReactiveMessageReaderBuilder<T> implements 
ReactiveMessageReaderBui
 
        private final Schema<T> schema;
 
-       private MutableReactiveMessageReaderSpec readerSpec = new 
MutableReactiveMessageReaderSpec();
+       private final MutableReactiveMessageReaderSpec readerSpec;
 
        private StartAtSpec startAtSpec = StartAtSpec.ofEarliest();
 
        private EndOfStreamAction endOfStreamAction = 
EndOfStreamAction.COMPLETE;
 
        AdaptedReactiveMessageReaderBuilder(Schema<T> schema, 
ReactiveReaderAdapterFactory reactiveReaderAdapterFactory) {
+               this(schema, reactiveReaderAdapterFactory, new 
MutableReactiveMessageReaderSpec());
+       }
+
+       private AdaptedReactiveMessageReaderBuilder(Schema<T> schema,
+                       ReactiveReaderAdapterFactory 
reactiveReaderAdapterFactory, MutableReactiveMessageReaderSpec readerSpec) {
                this.reactiveReaderAdapterFactory = 
reactiveReaderAdapterFactory;
                this.schema = schema;
+               this.readerSpec = readerSpec;
        }
 
        @Override
@@ -60,8 +66,7 @@ class AdaptedReactiveMessageReaderBuilder<T> implements 
ReactiveMessageReaderBui
        @Override
        public ReactiveMessageReaderBuilder<T> clone() {
                AdaptedReactiveMessageReaderBuilder<T> cloned = new 
AdaptedReactiveMessageReaderBuilder<>(this.schema,
-                               this.reactiveReaderAdapterFactory);
-               cloned.readerSpec = new 
MutableReactiveMessageReaderSpec(this.readerSpec);
+                               this.reactiveReaderAdapterFactory, new 
MutableReactiveMessageReaderSpec(this.readerSpec));
                cloned.startAtSpec = this.startAtSpec;
                cloned.endOfStreamAction = this.endOfStreamAction;
                return this;
diff --git 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
index c060cae..29c41af 100644
--- 
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
+++ 
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java
@@ -37,7 +37,7 @@ class AdaptedReactiveMessageSenderBuilder<T> implements 
ReactiveMessageSenderBui
 
        private final ReactiveProducerAdapterFactory 
reactiveProducerAdapterFactory;
 
-       private final MutableReactiveMessageSenderSpec senderSpec = new 
MutableReactiveMessageSenderSpec();
+       private final MutableReactiveMessageSenderSpec senderSpec;
 
        private ReactiveMessageSenderCache producerCache;
 
@@ -49,8 +49,15 @@ class AdaptedReactiveMessageSenderBuilder<T> implements 
ReactiveMessageSenderBui
 
        AdaptedReactiveMessageSenderBuilder(Schema<T> schema,
                        ReactiveProducerAdapterFactory 
reactiveProducerAdapterFactory) {
+               this(schema, reactiveProducerAdapterFactory, new 
MutableReactiveMessageSenderSpec());
+       }
+
+       private AdaptedReactiveMessageSenderBuilder(Schema<T> schema,
+                       ReactiveProducerAdapterFactory 
reactiveProducerAdapterFactory,
+                       MutableReactiveMessageSenderSpec senderSpec) {
                this.schema = schema;
                this.reactiveProducerAdapterFactory = 
reactiveProducerAdapterFactory;
+               this.senderSpec = senderSpec;
        }
 
        @Override
@@ -76,6 +83,17 @@ class AdaptedReactiveMessageSenderBuilder<T> implements 
ReactiveMessageSenderBui
                return this;
        }
 
+       @Override
+       public ReactiveMessageSenderBuilder<T> clone() {
+               AdaptedReactiveMessageSenderBuilder<T> cloned = new 
AdaptedReactiveMessageSenderBuilder<>(this.schema,
+                               this.reactiveProducerAdapterFactory, new 
MutableReactiveMessageSenderSpec(this.senderSpec));
+               cloned.producerCache = this.producerCache;
+               cloned.maxInflight = this.maxInflight;
+               cloned.maxConcurrentSenderSubscriptions = 
this.maxConcurrentSenderSubscriptions;
+               cloned.producerActionTransformer = 
this.producerActionTransformer;
+               return cloned;
+       }
+
        @Override
        public ReactiveMessageSender<T> build() {
                if (this.maxInflight > 0) {
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
index aa13a57..cbc0d26 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
@@ -68,6 +68,12 @@ public interface ReactiveMessageConsumerBuilder<T> {
         */
        MutableReactiveMessageConsumerSpec getMutableSpec();
 
+       /**
+        * Creates and returns a copy of this reactive consumer builder.
+        * @return the cloned reactive reader builder
+        */
+       ReactiveMessageConsumerBuilder<T> clone();
+
        /**
         * Adds a topic this consumer will subscribe on.
         * @param topicName a topic that the consumer will subscribe on
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderBuilder.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderBuilder.java
index 6f2ac5a..c620a53 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderBuilder.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderBuilder.java
@@ -67,6 +67,12 @@ public interface ReactiveMessageSenderBuilder<T> {
         */
        ReactiveMessageSenderBuilder<T> maxConcurrentSenderSubscriptions(int 
maxConcurrentSenderSubscriptions);
 
+       /**
+        * Creates and returns a copy of this reactive sender builder.
+        * @return the cloned reactive reader builder
+        */
+       ReactiveMessageSenderBuilder<T> clone();
+
        /**
         * Applies a sender spec to configure the sender.
         * @param senderSpec the sender spec to apply

Reply via email to