This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3fce3097c76 [improve][client] Make replicateSubscriptionState nullable
(#23757)
3fce3097c76 is described below
commit 3fce3097c76a9c8cb64cf3d8d87f6e050e6cb3a5
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Dec 20 20:58:03 2024 +0800
[improve][client] Make replicateSubscriptionState nullable (#23757)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../apache/pulsar/broker/service/ServerCnx.java | 4 +-
.../pulsar/broker/service/SubscriptionOption.java | 2 +-
.../service/nonpersistent/NonPersistentTopic.java | 4 +-
.../service/persistent/PersistentSubscription.java | 19 +++--
.../broker/service/persistent/PersistentTopic.java | 19 ++---
.../client/api/ReplicateSubscriptionTest.java | 96 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 2 +-
.../impl/conf/ConsumerConfigurationData.java | 14 +++-
.../client/impl/ConsumerBuilderImplTest.java | 36 +++++++-
.../apache/pulsar/common/protocol/Commands.java | 8 +-
10 files changed, 176 insertions(+), 28 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index f9e593345d8..2415930a99a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1241,8 +1241,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
? subscribe.getStartMessageRollbackDurationSec()
: -1;
final SchemaData schema = subscribe.hasSchema() ?
getSchema(subscribe.getSchema()) : null;
- final boolean isReplicated = subscribe.hasReplicateSubscriptionState()
- && subscribe.isReplicateSubscriptionState();
+ final Boolean isReplicated =
+ subscribe.hasReplicateSubscriptionState() ?
subscribe.isReplicateSubscriptionState() : null;
final boolean forceTopicCreation = subscribe.isForceTopicCreation();
final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
index af56d023616..328e7618f8c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
@@ -46,7 +46,7 @@ public class SubscriptionOption {
private boolean readCompacted;
private CommandSubscribe.InitialPosition initialPosition;
private long startMessageRollbackDurationSec;
- private boolean replicatedSubscriptionStateArg;
+ private Boolean replicatedSubscriptionStateArg;
private KeySharedMeta keySharedMeta;
private Optional<Map<String, String>> subscriptionProperties;
private long consumerEpoch;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 34c2678f847..7cdc8cc11a4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -256,7 +256,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
return internalSubscribe(option.getCnx(),
option.getSubscriptionName(), option.getConsumerId(),
option.getSubType(), option.getPriorityLevel(),
option.getConsumerName(),
option.getStartMessageId(), option.getMetadata(),
option.isReadCompacted(),
- option.getStartMessageRollbackDurationSec(),
option.isReplicatedSubscriptionStateArg(),
+ option.getStartMessageRollbackDurationSec(),
option.getReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(),
option.getSubscriptionProperties().orElse(null),
option.getSchemaType());
}
@@ -279,7 +279,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
String consumerName,
MessageId startMessageId,
Map<String, String>
metadata, boolean readCompacted,
long
resetStartMessageBackInSec,
- boolean
replicateSubscriptionState,
+ Boolean
replicateSubscriptionState,
KeySharedMeta
keySharedMeta,
Map<String, String>
subscriptionProperties,
SchemaType
schemaType) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 0096f398ada..8cebbd52695 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -133,9 +133,11 @@ public class PersistentSubscription extends
AbstractSubscription {
private volatile Map<String, String> subscriptionProperties;
private volatile CompletableFuture<Void> fenceFuture;
private volatile CompletableFuture<Void> inProgressResetCursorFuture;
+ private volatile Boolean replicatedControlled;
- static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
- return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES :
NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
+ static Map<String, Long> getBaseCursorProperties(Boolean isReplicated) {
+ return isReplicated != null && isReplicated ?
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES :
+ NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
}
static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
@@ -143,19 +145,21 @@ public class PersistentSubscription extends
AbstractSubscription {
}
public PersistentSubscription(PersistentTopic topic, String
subscriptionName, ManagedCursor cursor,
- boolean replicated) {
+ Boolean replicated) {
this(topic, subscriptionName, cursor, replicated,
Collections.emptyMap());
}
public PersistentSubscription(PersistentTopic topic, String
subscriptionName, ManagedCursor cursor,
- boolean replicated, Map<String, String>
subscriptionProperties) {
+ Boolean replicated, Map<String, String>
subscriptionProperties) {
this.topic = topic;
this.cursor = cursor;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic",
topicName).add("name", subName).toString();
this.expiryMonitor = new PersistentMessageExpiryMonitor(topic,
subscriptionName, cursor, this);
- this.setReplicated(replicated);
+ if (replicated != null) {
+ this.setReplicated(replicated);
+ }
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() :
Collections.unmodifiableMap(subscriptionProperties);
if
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
@@ -194,6 +198,7 @@ public class PersistentSubscription extends
AbstractSubscription {
}
public boolean setReplicated(boolean replicated) {
+ replicatedControlled = replicated;
ServiceConfiguration config =
topic.getBrokerService().getPulsar().getConfig();
if (!replicated || !config.isEnableReplicatedSubscriptions()) {
@@ -1557,4 +1562,8 @@ public class PersistentSubscription extends
AbstractSubscription {
private static final Logger log =
LoggerFactory.getLogger(PersistentSubscription.class);
+ @VisibleForTesting
+ public Boolean getReplicatedControlled() {
+ return replicatedControlled;
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 056fad2a005..11220d1c955 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -513,7 +513,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
} else {
final String subscriptionName =
Codec.decode(cursor.getName());
subscriptions.put(subscriptionName,
createPersistentSubscription(subscriptionName, cursor,
-
PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
+
PersistentSubscription.isCursorFromReplicatedSubscription(cursor) ? true : null,
cursor.getCursorProperties()));
// subscription-cursor gets activated by default:
deactivate as there is no active subscription
// right now
@@ -584,7 +584,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
private PersistentSubscription createPersistentSubscription(String
subscriptionName, ManagedCursor cursor,
- boolean replicated, Map<String, String> subscriptionProperties) {
+ Boolean replicated, Map<String, String> subscriptionProperties) {
requireNonNull(topicCompactionService);
if (isCompactionSubscription(subscriptionName)
&& topicCompactionService instanceof
PulsarTopicCompactionService pulsarTopicCompactionService) {
@@ -888,7 +888,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
option.getSubType(), option.getPriorityLevel(),
option.getConsumerName(), option.isDurable(),
option.getStartMessageId(), option.getMetadata(),
option.isReadCompacted(),
option.getInitialPosition(),
option.getStartMessageRollbackDurationSec(),
- option.isReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(),
+ option.getReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(),
option.getSubscriptionProperties().orElse(Collections.emptyMap()),
option.getConsumerEpoch(), option.getSchemaType());
}
@@ -900,7 +900,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
Map<String, String>
metadata, boolean readCompacted,
InitialPosition
initialPosition,
long
startMessageRollbackDurationSec,
- boolean
replicatedSubscriptionStateArg,
+ Boolean
replicatedSubscriptionStateArg,
KeySharedMeta
keySharedMeta,
Map<String, String>
subscriptionProperties,
long consumerEpoch,
@@ -911,12 +911,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
return brokerService.checkTopicNsOwnership(getName()).thenCompose(__
-> {
- boolean replicatedSubscriptionState =
replicatedSubscriptionStateArg;
-
- if (replicatedSubscriptionState
+ if (replicatedSubscriptionStateArg != null &&
replicatedSubscriptionStateArg
&&
!brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
log.warn("[{}] Replicated Subscription is disabled by
broker.", getName());
- replicatedSubscriptionState = false;
}
if (subType == SubType.Key_Shared
@@ -985,7 +982,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
CompletableFuture<? extends Subscription> subscriptionFuture =
isDurable
? getDurableSubscription(subscriptionName,
initialPosition, startMessageRollbackDurationSec,
- replicatedSubscriptionState,
subscriptionProperties)
+ replicatedSubscriptionStateArg, subscriptionProperties)
: getNonDurableSubscription(subscriptionName,
startMessageId, initialPosition,
startMessageRollbackDurationSec, readCompacted,
subscriptionProperties);
@@ -1082,7 +1079,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
private CompletableFuture<Subscription> getDurableSubscription(String
subscriptionName,
InitialPosition initialPosition,
long
startMessageRollbackDurationSec,
- boolean
replicated,
+ Boolean
replicated,
Map<String,
String> subscriptionProperties) {
CompletableFuture<Subscription> subscriptionFuture = new
CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
@@ -1113,7 +1110,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return;
}
}
- if (replicated && !subscription.isReplicated()) {
+ if (replicated != null && replicated &&
!subscription.isReplicated()) {
// Flip the subscription state
subscription.setReplicated(replicated);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java
new file mode 100644
index 00000000000..327081bf1b9
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class ReplicateSubscriptionTest extends ProducerConsumerBase {
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ }
+
+ @DataProvider
+ public Object[] replicateSubscriptionState() {
+ return new Object[]{
+ Boolean.TRUE,
+ Boolean.FALSE,
+ null
+ };
+ }
+
+ @Test(dataProvider = "replicateSubscriptionState")
+ public void testReplicateSubscriptionState(Boolean
replicateSubscriptionState)
+ throws Exception {
+ String topic = "persistent://my-property/my-ns/" + System.nanoTime();
+ String subName = "sub-" + System.nanoTime();
+ ConsumerBuilder<String> consumerBuilder =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName(subName);
+ if (replicateSubscriptionState != null) {
+
consumerBuilder.replicateSubscriptionState(replicateSubscriptionState);
+ }
+ ConsumerBuilderImpl consumerBuilderImpl = (ConsumerBuilderImpl)
consumerBuilder;
+
assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState(),
replicateSubscriptionState);
+ @Cleanup
+ Consumer<String> ignored = consumerBuilder.subscribe();
+ CompletableFuture<Optional<Topic>> topicIfExists =
pulsar.getBrokerService().getTopicIfExists(topic);
+ assertThat(topicIfExists)
+ .succeedsWithin(3, TimeUnit.SECONDS)
+ .matches(optionalTopic -> {
+ assertTrue(optionalTopic.isPresent());
+ Topic topicRef = optionalTopic.get();
+ Subscription subscription =
topicRef.getSubscription(subName);
+ assertNotNull(subscription);
+ assertTrue(subscription instanceof PersistentSubscription);
+ PersistentSubscription persistentSubscription =
(PersistentSubscription) subscription;
+
assertEquals(persistentSubscription.getReplicatedControlled(),
replicateSubscriptionState);
+ return true;
+ });
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 4d1b51e34db..16dc70f736e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -902,7 +902,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
synchronized (this) {
ByteBuf request = Commands.newSubscribe(topic, subscription,
consumerId, requestId, getSubType(),
priorityLevel, consumerName, isDurable,
startMessageIdData, metadata, readCompacted,
- conf.isReplicateSubscriptionState(),
+ conf.getReplicateSubscriptionState(),
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
startMessageRollbackDuration, si,
createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
// Use the current epoch to subscribe.
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index f9ff5913f62..6e884ba2791 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.conf;
import static com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
@@ -381,7 +382,8 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
value = "If `replicateSubscriptionState` is enabled, a
subscription state is replicated to geo-replicated"
+ " clusters."
)
- private boolean replicateSubscriptionState = false;
+ @JsonProperty(access = JsonProperty.Access.READ_WRITE)
+ private Boolean replicateSubscriptionState;
private boolean resetIncludeHead = false;
@@ -437,4 +439,14 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
throw new RuntimeException("Failed to clone
ConsumerConfigurationData");
}
}
+
+ /**
+ * Backward compatibility with the old `replicateSubscriptionState` field.
+ * @deprecated Use {@link #getReplicateSubscriptionState()} instead.
+ */
+ @JsonIgnore
+ @Deprecated
+ public boolean isReplicateSubscriptionState() {
+ return replicateSubscriptionState != null &&
replicateSubscriptionState;
+ }
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index e4b7b4d1ec8..c103712d400 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -504,7 +504,7 @@ public class ConsumerBuilderImplTest {
assertTrue(configurationData.isRetryEnable());
assertFalse(configurationData.isAutoUpdatePartitions());
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2);
- assertTrue(configurationData.isReplicateSubscriptionState());
+ assertEquals(configurationData.getReplicateSubscriptionState(),
Boolean.TRUE);
assertTrue(configurationData.isResetIncludeHead());
assertTrue(configurationData.isBatchIndexAckEnabled());
assertTrue(configurationData.isAckReceiptEnabled());
@@ -564,7 +564,7 @@ public class ConsumerBuilderImplTest {
assertFalse(configurationData.isRetryEnable());
assertTrue(configurationData.isAutoUpdatePartitions());
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 60);
- assertFalse(configurationData.isReplicateSubscriptionState());
+ assertNull(configurationData.getReplicateSubscriptionState());
assertFalse(configurationData.isResetIncludeHead());
assertFalse(configurationData.isBatchIndexAckEnabled());
assertFalse(configurationData.isAckReceiptEnabled());
@@ -584,6 +584,38 @@ public class ConsumerBuilderImplTest {
assertNull(configurationData.getPayloadProcessor());
}
+ @Test
+ public void testReplicateSubscriptionState() {
+ ConsumerBuilderImpl<byte[]> consumerBuilder = createConsumerBuilder();
+ assertNull(consumerBuilder.getConf().getReplicateSubscriptionState());
+
+ consumerBuilder.replicateSubscriptionState(true);
+
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(),
Boolean.TRUE);
+
+ consumerBuilder.replicateSubscriptionState(false);
+
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(),
Boolean.FALSE);
+
+ Map<String, Object> conf = new HashMap<>();
+ consumerBuilder = createConsumerBuilder();
+ consumerBuilder.loadConf(conf);
+ assertNull(consumerBuilder.getConf().getReplicateSubscriptionState());
+
+ conf.put("replicateSubscriptionState", true);
+ consumerBuilder = createConsumerBuilder();
+ consumerBuilder.loadConf(conf);
+
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(),
Boolean.TRUE);
+
+ conf.put("replicateSubscriptionState", false);
+ consumerBuilder = createConsumerBuilder();
+ consumerBuilder.loadConf(conf);
+
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(),
Boolean.FALSE);
+
+ conf.put("replicateSubscriptionState", null);
+ consumerBuilder = createConsumerBuilder();
+ consumerBuilder.loadConf(conf);
+ assertNull(consumerBuilder.getConf().getReplicateSubscriptionState());
+ }
+
private ConsumerBuilderImpl<byte[]> createConsumerBuilder() {
ConsumerBuilderImpl<byte[]> consumerBuilder = new
ConsumerBuilderImpl<>(null, Schema.BYTES);
Map<String, String> properties = new HashMap<>();
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 19aa9907549..4f390cc99e6 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -583,7 +583,7 @@ public class Commands {
public static ByteBuf newSubscribe(String topic, String subscription, long
consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName, boolean
isDurable, MessageIdData startMessageId,
- Map<String, String> metadata, boolean readCompacted, boolean
isReplicated,
+ Map<String, String> metadata, boolean readCompacted, Boolean
isReplicated,
InitialPosition subscriptionInitialPosition, long
startMessageRollbackDurationInSec, SchemaInfo schemaInfo,
boolean createTopicIfDoesNotExist) {
return newSubscribe(topic, subscription, consumerId, requestId,
subType, priorityLevel, consumerName,
@@ -594,7 +594,7 @@ public class Commands {
public static ByteBuf newSubscribe(String topic, String subscription, long
consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName,
boolean isDurable, MessageIdData startMessageId,
- Map<String, String> metadata, boolean readCompacted, boolean
isReplicated,
+ Map<String, String> metadata, boolean readCompacted, Boolean
isReplicated,
InitialPosition subscriptionInitialPosition, long
startMessageRollbackDurationInSec,
SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist,
KeySharedPolicy keySharedPolicy,
Map<String, String> subscriptionProperties, long consumerEpoch)
{
@@ -610,9 +610,11 @@ public class Commands {
.setDurable(isDurable)
.setReadCompacted(readCompacted)
.setInitialPosition(subscriptionInitialPosition)
- .setReplicateSubscriptionState(isReplicated)
.setForceTopicCreation(createTopicIfDoesNotExist)
.setConsumerEpoch(consumerEpoch);
+ if (isReplicated != null) {
+ subscribe.setReplicateSubscriptionState(isReplicated);
+ }
if (subscriptionProperties != null &&
!subscriptionProperties.isEmpty()) {
List<KeyValue> keyValues = new ArrayList<>();