This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 569d3d2  Add ReactiveMessageConsumerSpec::getSubscriptionInitialPosition (#19)
569d3d2 is described below

commit 569d3d236aec18a5973e25a4c8ba6a02dd54be45
Author: Christophe Bornet <[email protected]>
AuthorDate: Wed Nov 9 10:50:06 2022 +0100

    Add ReactiveMessageConsumerSpec::getSubscriptionInitialPosition (#19)
---
 .../adapter/AdaptedReactiveMessageConsumer.java    |  3 ++
 .../api/ImmutableReactiveMessageConsumerSpec.java  | 55 +++++++++++++++++++---
 .../api/MutableReactiveMessageConsumerSpec.java    | 17 +++++++
 .../client/api/ReactiveMessageConsumerBuilder.java |  7 +++
 .../client/api/ReactiveMessageConsumerSpec.java    |  3 ++
 .../ImmutableReactiveMessageConsumerSpecMixin.java |  2 +
 .../jackson/PulsarReactiveClientModuleTest.java    |  4 ++
 7 files changed, 85 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
index 2a41ecd..587ed69 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
@@ -129,6 +129,9 @@ class AdaptedReactiveMessageConsumer<T> implements ReactiveMessageConsumer<T> {
                if (this.consumerSpec.getSubscriptionType() != null) {
                        consumerBuilder.subscriptionType(this.consumerSpec.getSubscriptionType());
                }
+               if (this.consumerSpec.getSubscriptionInitialPosition() != null) {
+                       consumerBuilder.subscriptionInitialPosition(this.consumerSpec.getSubscriptionInitialPosition());
+               }
                if (this.consumerSpec.getKeySharedPolicy() != null) {
                        consumerBuilder.keySharedPolicy(this.consumerSpec.getKeySharedPolicy());
                }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
index 4f4878e..bdf681f 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import reactor.core.scheduler.Scheduler;
@@ -49,6 +50,8 @@ public class ImmutableReactiveMessageConsumerSpec implements ReactiveMessageCons
 
        private final SubscriptionType subscriptionType;
 
+       private final SubscriptionInitialPosition subscriptionInitialPosition;
+
        private final KeySharedPolicy keySharedPolicy;
 
        private final Boolean replicateSubscriptionState;
@@ -115,6 +118,8 @@ public class ImmutableReactiveMessageConsumerSpec implements ReactiveMessageCons
 
                this.subscriptionType = consumerSpec.getSubscriptionType();
 
+               this.subscriptionInitialPosition = consumerSpec.getSubscriptionInitialPosition();
+
                this.keySharedPolicy = consumerSpec.getKeySharedPolicy();
 
                this.replicateSubscriptionState = consumerSpec.getReplicateSubscriptionState();
@@ -171,12 +176,12 @@ public class ImmutableReactiveMessageConsumerSpec implements ReactiveMessageCons
        public ImmutableReactiveMessageConsumerSpec(List<String> topicNames, Pattern topicsPattern,
                        RegexSubscriptionMode topicsPatternSubscriptionMode, Duration topicsPatternAutoDiscoveryPeriod,
                        String subscriptionName, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType,
-                       KeySharedPolicy keySharedPolicy, Boolean replicateSubscriptionState,
-                       Map<String, String> subscriptionProperties, String consumerName, Map<String, String> properties,
-                       Integer priorityLevel, Boolean readCompacted, Boolean batchIndexAckEnabled, Duration ackTimeout,
-                       Duration ackTimeoutTickTime, Duration acknowledgementsGroupTime, Boolean acknowledgeAsynchronously,
-                       Scheduler acknowledgeScheduler, Duration negativeAckRedeliveryDelay, DeadLetterPolicy deadLetterPolicy,
-                       Boolean retryLetterTopicEnable, Integer receiverQueueSize,
+                       SubscriptionInitialPosition subscriptionInitialPosition, KeySharedPolicy keySharedPolicy,
+                       Boolean replicateSubscriptionState, Map<String, String> subscriptionProperties, String consumerName,
+                       Map<String, String> properties, Integer priorityLevel, Boolean readCompacted, Boolean batchIndexAckEnabled,
+                       Duration ackTimeout, Duration ackTimeoutTickTime, Duration acknowledgementsGroupTime,
+                       Boolean acknowledgeAsynchronously, Scheduler acknowledgeScheduler, Duration negativeAckRedeliveryDelay,
+                       DeadLetterPolicy deadLetterPolicy, Boolean retryLetterTopicEnable, Integer receiverQueueSize,
                        Integer maxTotalReceiverQueueSizeAcrossPartitions, Boolean autoUpdatePartitions,
                        Duration autoUpdatePartitionsInterval, CryptoKeyReader cryptoKeyReader,
                        ConsumerCryptoFailureAction cryptoFailureAction, Integer maxPendingChunkedMessage,
@@ -188,6 +193,7 @@ public class ImmutableReactiveMessageConsumerSpec implements ReactiveMessageCons
                this.subscriptionName = subscriptionName;
                this.subscriptionMode = subscriptionMode;
                this.subscriptionType = subscriptionType;
+               this.subscriptionInitialPosition = subscriptionInitialPosition;
                this.keySharedPolicy = keySharedPolicy;
                this.replicateSubscriptionState = replicateSubscriptionState;
                this.subscriptionProperties = subscriptionProperties;
@@ -215,130 +221,167 @@ public class ImmutableReactiveMessageConsumerSpec implements ReactiveMessageCons
                this.expireTimeOfIncompleteChunkedMessage = expireTimeOfIncompleteChunkedMessage;
        }
 
+       @Override
        public List<String> getTopicNames() {
                return this.topicNames;
        }
 
+       @Override
        public Pattern getTopicsPattern() {
                return this.topicsPattern;
        }
 
+       @Override
        public RegexSubscriptionMode getTopicsPatternSubscriptionMode() {
                return this.topicsPatternSubscriptionMode;
        }
 
+       @Override
        public Duration getTopicsPatternAutoDiscoveryPeriod() {
                return this.topicsPatternAutoDiscoveryPeriod;
        }
 
+       @Override
        public String getSubscriptionName() {
                return this.subscriptionName;
        }
 
+       @Override
        public SubscriptionMode getSubscriptionMode() {
                return this.subscriptionMode;
        }
 
+       @Override
        public SubscriptionType getSubscriptionType() {
                return this.subscriptionType;
        }
 
+       @Override
+       public SubscriptionInitialPosition getSubscriptionInitialPosition() {
+               return this.subscriptionInitialPosition;
+       }
+
+       @Override
        public KeySharedPolicy getKeySharedPolicy() {
                return this.keySharedPolicy;
        }
 
+       @Override
        public Boolean getReplicateSubscriptionState() {
                return this.replicateSubscriptionState;
        }
 
+       @Override
        public Map<String, String> getSubscriptionProperties() {
                return this.subscriptionProperties;
        }
 
+       @Override
        public String getConsumerName() {
                return this.consumerName;
        }
 
+       @Override
        public Map<String, String> getProperties() {
                return this.properties;
        }
 
+       @Override
        public Integer getPriorityLevel() {
                return this.priorityLevel;
        }
 
+       @Override
        public Boolean getReadCompacted() {
                return this.readCompacted;
        }
 
+       @Override
        public Boolean getBatchIndexAckEnabled() {
                return this.batchIndexAckEnabled;
        }
 
+       @Override
        public Duration getAckTimeout() {
                return this.ackTimeout;
        }
 
+       @Override
        public Duration getAckTimeoutTickTime() {
                return this.ackTimeoutTickTime;
        }
 
+       @Override
        public Duration getAcknowledgementsGroupTime() {
                return this.acknowledgementsGroupTime;
        }
 
+       @Override
        public Boolean getAcknowledgeAsynchronously() {
                return this.acknowledgeAsynchronously;
        }
 
+       @Override
        public Scheduler getAcknowledgeScheduler() {
                return this.acknowledgeScheduler;
        }
 
+       @Override
        public Duration getNegativeAckRedeliveryDelay() {
                return this.negativeAckRedeliveryDelay;
        }
 
+       @Override
        public DeadLetterPolicy getDeadLetterPolicy() {
                return this.deadLetterPolicy;
        }
 
+       @Override
        public Boolean getRetryLetterTopicEnable() {
                return this.retryLetterTopicEnable;
        }
 
+       @Override
        public Integer getReceiverQueueSize() {
                return this.receiverQueueSize;
        }
 
+       @Override
        public Integer getMaxTotalReceiverQueueSizeAcrossPartitions() {
                return this.maxTotalReceiverQueueSizeAcrossPartitions;
        }
 
+       @Override
        public Boolean getAutoUpdatePartitions() {
                return this.autoUpdatePartitions;
        }
 
+       @Override
        public Duration getAutoUpdatePartitionsInterval() {
                return this.autoUpdatePartitionsInterval;
        }
 
+       @Override
        public CryptoKeyReader getCryptoKeyReader() {
                return this.cryptoKeyReader;
        }
 
+       @Override
        public ConsumerCryptoFailureAction getCryptoFailureAction() {
                return this.cryptoFailureAction;
        }
 
+       @Override
        public Integer getMaxPendingChunkedMessage() {
                return this.maxPendingChunkedMessage;
        }
 
+       @Override
        public Boolean getAutoAckOldestChunkedMessageOnQueueFull() {
                return this.autoAckOldestChunkedMessageOnQueueFull;
        }
 
+       @Override
        public Duration getExpireTimeOfIncompleteChunkedMessage() {
                return this.expireTimeOfIncompleteChunkedMessage;
        }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
index c96d371..2b28eb1 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import reactor.core.scheduler.Scheduler;
@@ -48,6 +49,8 @@ public class MutableReactiveMessageConsumerSpec implements ReactiveMessageConsum
 
        private SubscriptionType subscriptionType;
 
+       private SubscriptionInitialPosition subscriptionInitialPosition;
+
        private KeySharedPolicy keySharedPolicy;
 
        private Boolean replicateSubscriptionState;
@@ -118,6 +121,8 @@ public class MutableReactiveMessageConsumerSpec implements ReactiveMessageConsum
 
                this.subscriptionType = consumerSpec.getSubscriptionType();
 
+               this.subscriptionInitialPosition = consumerSpec.getSubscriptionInitialPosition();
+
                this.keySharedPolicy = consumerSpec.getKeySharedPolicy();
 
                this.replicateSubscriptionState = consumerSpec.getReplicateSubscriptionState();
@@ -233,6 +238,15 @@ public class MutableReactiveMessageConsumerSpec implements ReactiveMessageConsum
                this.subscriptionType = subscriptionType;
        }
 
+       @Override
+       public SubscriptionInitialPosition getSubscriptionInitialPosition() {
+               return this.subscriptionInitialPosition;
+       }
+
+       public void setSubscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
+               this.subscriptionInitialPosition = subscriptionInitialPosition;
+       }
+
        @Override
        public KeySharedPolicy getKeySharedPolicy() {
                return this.keySharedPolicy;
@@ -480,6 +494,9 @@ public class MutableReactiveMessageConsumerSpec implements ReactiveMessageConsum
                if (consumerSpec.getSubscriptionType() != null) {
                        setSubscriptionType(consumerSpec.getSubscriptionType());
                }
+               if (consumerSpec.getSubscriptionInitialPosition() != null) {
+                       setSubscriptionInitialPosition(consumerSpec.getSubscriptionInitialPosition());
+               }
                if (consumerSpec.getKeySharedPolicy() != null) {
                        setKeySharedPolicy(consumerSpec.getKeySharedPolicy());
                }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
index b3cb76b..e09179c 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import reactor.core.scheduler.Scheduler;
@@ -91,6 +92,12 @@ public interface ReactiveMessageConsumerBuilder<T> {
                return this;
        }
 
+       default ReactiveMessageConsumerBuilder<T> subscriptionInitialPosition(
+                       SubscriptionInitialPosition subscriptionInitialPosition) {
+               getMutableSpec().setSubscriptionInitialPosition(subscriptionInitialPosition);
+               return this;
+       }
+
        default ReactiveMessageConsumerBuilder<T> keySharedPolicy(KeySharedPolicy keySharedPolicy) {
                getMutableSpec().setKeySharedPolicy(keySharedPolicy);
                return this;
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java
index 46a212b..ebc09b1 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import reactor.core.scheduler.Scheduler;
@@ -46,6 +47,8 @@ public interface ReactiveMessageConsumerSpec {
 
        SubscriptionType getSubscriptionType();
 
+       SubscriptionInitialPosition getSubscriptionInitialPosition();
+
        KeySharedPolicy getKeySharedPolicy();
 
        Boolean getReplicateSubscriptionState();
diff --git a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java
index 878cca7..3ab1aee 100644
--- a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java
+++ b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
@@ -46,6 +47,7 @@ abstract class ImmutableReactiveMessageConsumerSpecMixin {
                        @JsonProperty("subscriptionName") String subscriptionName,
                        @JsonProperty("subscriptionMode") SubscriptionMode subscriptionMode,
                        @JsonProperty("subscriptionType") SubscriptionType subscriptionType,
+                       @JsonProperty("subscriptionInitialPosition") SubscriptionInitialPosition subscriptionInitialPosition,
                        @JsonProperty("keySharedPolicy") KeySharedPolicy keySharedPolicy,
                        @JsonProperty("replicateSubscriptionState") Boolean replicateSubscriptionState,
                        @JsonProperty("subscriptionProperties") Map<String, String> subscriptionProperties,
diff --git a/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java b/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
index d36bea8..aa1f648 100644
--- a/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
+++ b/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.KeyBasedBatcherBuilder;
@@ -72,6 +73,7 @@ class PulsarReactiveClientModuleTest {
                                + "'subscriptionName': 'my-sub',"
                                + "'subscriptionMode': 'Durable',"
                                + "'subscriptionType': 'Exclusive',"
+                               + "'subscriptionInitialPosition': 'Latest',"
                                + "'keySharedPolicy': 'STICKY',"
                                + "'replicateSubscriptionState': true,"
                                + "'subscriptionProperties': {'my-key': 'my-value'},"
@@ -117,6 +119,7 @@ class PulsarReactiveClientModuleTest {
                assertThat(spec.getSubscriptionName()).isEqualTo("my-sub");
                assertThat(spec.getSubscriptionMode()).isEqualTo(SubscriptionMode.Durable);
                assertThat(spec.getSubscriptionType()).isEqualTo(SubscriptionType.Exclusive);
+               assertThat(spec.getSubscriptionInitialPosition()).isEqualTo(SubscriptionInitialPosition.Latest);
                assertThat(spec.getKeySharedPolicy()).isInstanceOf(KeySharedPolicy.KeySharedPolicySticky.class);
                assertThat(spec.getReplicateSubscriptionState()).isTrue();
                assertThat(spec.getSubscriptionProperties()).containsOnlyKeys("my-key");
@@ -162,6 +165,7 @@ class PulsarReactiveClientModuleTest {
                                + "  'subscriptionName' : 'my-sub',\n"
                                + "  'subscriptionMode' : 'Durable',\n"
                                + "  'subscriptionType' : 'Exclusive',\n"
+                               + "  'subscriptionInitialPosition' : 'Latest',\n"
                                + "  'keySharedPolicy' : 'STICKY',\n"
                                + "  'replicateSubscriptionState' : true,\n"
                                + "  'subscriptionProperties' : {\n"

Reply via email to