This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 462f70ee463 [fix][client] Fix exception when calling loadConf on a
ConsumerBuilder that has a KeySharedPolicy (#18345)
462f70ee463 is described below
commit 462f70ee463e8478a7c045fa77d55dc0349510c8
Author: Christophe Bornet <[email protected]>
AuthorDate: Wed Nov 16 01:45:24 2022 +0100
[fix][client] Fix exception when calling loadConf on a ConsumerBuilder that
has a KeySharedPolicy (#18345)
(cherry picked from commit 9c2ec5e218b4743430d654c84f7e26f663b126f1)
---
.../impl/conf/ConsumerConfigurationData.java | 1 +
.../client/impl/ConsumerBuilderImplTest.java | 201 +++++++++++++++++++++
2 files changed, 202 insertions(+)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 107694637b3..b7d76809f37 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -142,6 +142,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private boolean resetIncludeHead = false;
+ @JsonIgnore
private transient KeySharedPolicy keySharedPolicy;
private boolean batchIndexAckEnabled = false;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index d648ee75af7..4d33d00582c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -18,12 +18,18 @@
*/
package org.apache.pulsar.client.impl;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -31,14 +37,27 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
/**
* Unit tests of {@link ConsumerBuilderImpl}.
@@ -317,4 +336,186 @@ public class ConsumerBuilderImplTest {
consumerBuilderImpl.subscriptionMode(SubscriptionMode.NonDurable)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
+ @Test
+ public void testLoadConf() throws Exception {
+ ConsumerBuilderImpl<byte[]> consumerBuilder = createConsumerBuilder();
+
+ String jsonConf = ("{\n"
+ + " 'topicNames' : [ 'new-topic' ],\n"
+ + " 'topicsPattern' : 'new-topics-pattern',\n"
+ + " 'subscriptionName' : 'new-subscription',\n"
+ + " 'subscriptionType' : 'Key_Shared',\n"
+ + " 'subscriptionMode' : 'NonDurable',\n"
+ + " 'receiverQueueSize' : 2,\n"
+ + " 'acknowledgementsGroupTimeMicros' : 2,\n"
+ + " 'negativeAckRedeliveryDelayMicros' : 2,\n"
+ + " 'maxTotalReceiverQueueSizeAcrossPartitions' : 2,\n"
+ + " 'consumerName' : 'new-consumer',\n"
+ + " 'ackTimeoutMillis' : 2,\n"
+ + " 'tickDurationMillis' : 2,\n"
+ + " 'priorityLevel' : 2,\n"
+ + " 'maxPendingChunkedMessage' : 2,\n"
+ + " 'autoAckOldestChunkedMessageOnQueueFull' : true,\n"
+ + " 'expireTimeOfIncompleteChunkedMessageMillis' : 2,\n"
+ + " 'cryptoFailureAction' : 'DISCARD',\n"
+ + " 'properties' : {\n"
+ + " 'new-prop' : 'new-prop-value'\n"
+ + " },\n"
+ + " 'readCompacted' : true,\n"
+ + " 'subscriptionInitialPosition' : 'Earliest',\n"
+ + " 'patternAutoDiscoveryPeriod' : 2,\n"
+ + " 'regexSubscriptionMode' : 'AllTopics',\n"
+ + " 'deadLetterPolicy' : {\n"
+ + " 'retryLetterTopic' : 'new-retry',\n"
+ + " 'deadLetterTopic' : 'new-dlq',\n"
+ + " 'maxRedeliverCount' : 2\n"
+ + " },\n"
+ + " 'retryEnable' : true,\n"
+ + " 'autoUpdatePartitions' : false,\n"
+ + " 'autoUpdatePartitionsIntervalSeconds' : 2,\n"
+ + " 'replicateSubscriptionState' : true,\n"
+ + " 'resetIncludeHead' : true,\n"
+ + " 'batchIndexAckEnabled' : true,\n"
+ + " 'ackReceiptEnabled' : true,\n"
+ + " 'poolMessages' : true\n"
+ + " }").replace("'", "\"");
+
+ Map<String, Object> conf = new ObjectMapper().readValue(jsonConf, new TypeReference<HashMap<String,Object>>() {});
+
+ MessageListener<byte[]> messageListener = (consumer, message) -> {};
+ conf.put("messageListener", messageListener);
+ ConsumerEventListener consumerEventListener = mock(ConsumerEventListener.class);
+ conf.put("consumerEventListener", consumerEventListener);
+ CryptoKeyReader cryptoKeyReader = DefaultCryptoKeyReader.builder().build();
+ conf.put("cryptoKeyReader", cryptoKeyReader);
+ MessageCrypto messageCrypto = new MessageCryptoBc("ctx2", true);
+ conf.put("messageCrypto", messageCrypto);
+ BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(2).build();
+ conf.put("batchReceivePolicy", batchReceivePolicy);
+ KeySharedPolicy keySharedPolicy = KeySharedPolicy.stickyHashRange();
+ conf.put("keySharedPolicy", keySharedPolicy);
+ MessagePayloadProcessor payloadProcessor = mock(MessagePayloadProcessor.class);
+ conf.put("payloadProcessor", payloadProcessor);
+
+ consumerBuilder.loadConf(conf);
+
+ ConsumerConfigurationData<byte[]> configurationData = consumerBuilder.getConf();
+ assertEquals(configurationData.getTopicNames(), new HashSet<>(Collections.singletonList("new-topic")));
+ assertEquals(configurationData.getTopicsPattern().pattern(), "new-topics-pattern");
+ assertEquals(configurationData.getSubscriptionName(), "new-subscription");
+ assertEquals(configurationData.getSubscriptionType(), SubscriptionType.Key_Shared);
+ assertEquals(configurationData.getSubscriptionMode(), SubscriptionMode.NonDurable);
+ assertEquals(configurationData.getReceiverQueueSize(), 2);
+ assertEquals(configurationData.getAcknowledgementsGroupTimeMicros(), 2);
+ assertEquals(configurationData.getNegativeAckRedeliveryDelayMicros(), 2);
+ assertEquals(configurationData.getMaxTotalReceiverQueueSizeAcrossPartitions(), 2);
+ assertEquals(configurationData.getConsumerName(), "new-consumer");
+ assertEquals(configurationData.getAckTimeoutMillis(), 2);
+ assertEquals(configurationData.getTickDurationMillis(), 2);
+ assertEquals(configurationData.getPriorityLevel(), 2);
+ assertEquals(configurationData.getMaxPendingChunkedMessage(), 2);
+ assertTrue(configurationData.isAutoAckOldestChunkedMessageOnQueueFull());
+ assertEquals(configurationData.getExpireTimeOfIncompleteChunkedMessageMillis(), 2);
+ assertEquals(configurationData.getCryptoFailureAction(), ConsumerCryptoFailureAction.DISCARD);
+ assertThat(configurationData.getProperties()).hasSize(1)
+ .hasFieldOrPropertyWithValue("new-prop", "new-prop-value");
+ assertTrue(configurationData.isReadCompacted());
+ assertEquals(configurationData.getSubscriptionInitialPosition(), SubscriptionInitialPosition.Earliest);
+ assertEquals(configurationData.getPatternAutoDiscoveryPeriod(), 2);
+ assertEquals(configurationData.getRegexSubscriptionMode(), RegexSubscriptionMode.AllTopics);
+ assertEquals(configurationData.getDeadLetterPolicy().getDeadLetterTopic(), "new-dlq");
+ assertEquals(configurationData.getDeadLetterPolicy().getRetryLetterTopic(), "new-retry");
+ assertEquals(configurationData.getDeadLetterPolicy().getMaxRedeliverCount(), 2);
+ assertTrue(configurationData.isRetryEnable());
+ assertFalse(configurationData.isAutoUpdatePartitions());
+ assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2);
+ assertTrue(configurationData.isReplicateSubscriptionState());
+ assertTrue(configurationData.isResetIncludeHead());
+ assertTrue(configurationData.isBatchIndexAckEnabled());
+ assertTrue(configurationData.isAckReceiptEnabled());
+ assertTrue(configurationData.isPoolMessages());
+
+ assertNull(configurationData.getMessageListener());
+ assertNull(configurationData.getConsumerEventListener());
+ assertNull(configurationData.getMessageListener());
+ assertNull(configurationData.getMessageCrypto());
+ assertNull(configurationData.getCryptoKeyReader());
+ assertNull(configurationData.getBatchReceivePolicy());
+ assertNull(configurationData.getKeySharedPolicy());
+ assertNull(configurationData.getPayloadProcessor());
+ }
+
+ @Test
+ public void testLoadConfNotModified() {
+ ConsumerBuilderImpl<byte[]> consumerBuilder = createConsumerBuilder();
+
+ consumerBuilder.loadConf(new HashMap<>());
+
+ ConsumerConfigurationData<byte[]> configurationData = consumerBuilder.getConf();
+ assertEquals(configurationData.getTopicNames(), new HashSet<>(Collections.singletonList("topic")));
+ assertEquals(configurationData.getTopicsPattern().pattern(), "topics-pattern");
+ assertEquals(configurationData.getSubscriptionName(), "subscription");
+ assertEquals(configurationData.getSubscriptionType(), SubscriptionType.Exclusive);
+ assertEquals(configurationData.getSubscriptionMode(), SubscriptionMode.Durable);
+ assertEquals(configurationData.getReceiverQueueSize(), 1000);
+ assertEquals(configurationData.getAcknowledgementsGroupTimeMicros(), TimeUnit.MILLISECONDS.toMicros(100));
+ assertEquals(configurationData.getNegativeAckRedeliveryDelayMicros(), TimeUnit.MINUTES.toMicros(1));
+ assertEquals(configurationData.getMaxTotalReceiverQueueSizeAcrossPartitions(), 50000);
+ assertEquals(configurationData.getConsumerName(), "consumer");
+ assertEquals(configurationData.getAckTimeoutMillis(), 30000);
+ assertEquals(configurationData.getTickDurationMillis(), 1000);
+ assertEquals(configurationData.getPriorityLevel(), 0);
+ assertEquals(configurationData.getMaxPendingChunkedMessage(), 10);
+ assertFalse(configurationData.isAutoAckOldestChunkedMessageOnQueueFull());
+ assertEquals(configurationData.getExpireTimeOfIncompleteChunkedMessageMillis(), TimeUnit.MINUTES.toMillis(1));
+ assertEquals(configurationData.getCryptoFailureAction(), ConsumerCryptoFailureAction.FAIL);
+ assertThat(configurationData.getProperties()).hasSize(1)
+ .hasFieldOrPropertyWithValue("prop", "prop-value");
+ assertFalse(configurationData.isReadCompacted());
+ assertEquals(configurationData.getSubscriptionInitialPosition(), SubscriptionInitialPosition.Latest);
+ assertEquals(configurationData.getPatternAutoDiscoveryPeriod(), 60);
+ assertEquals(configurationData.getRegexSubscriptionMode(), RegexSubscriptionMode.PersistentOnly);
+ assertEquals(configurationData.getDeadLetterPolicy().getDeadLetterTopic(), "dlq");
+ assertEquals(configurationData.getDeadLetterPolicy().getRetryLetterTopic(), "retry");
+ assertEquals(configurationData.getDeadLetterPolicy().getMaxRedeliverCount(), 1);
+ assertFalse(configurationData.isRetryEnable());
+ assertTrue(configurationData.isAutoUpdatePartitions());
+ assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 60);
+ assertFalse(configurationData.isReplicateSubscriptionState());
+ assertFalse(configurationData.isResetIncludeHead());
+ assertFalse(configurationData.isBatchIndexAckEnabled());
+ assertFalse(configurationData.isAckReceiptEnabled());
+ assertFalse(configurationData.isPoolMessages());
+
+ assertNull(configurationData.getMessageListener());
+ assertNull(configurationData.getConsumerEventListener());
+ assertNull(configurationData.getMessageListener());
+ assertNull(configurationData.getMessageCrypto());
+ assertNull(configurationData.getCryptoKeyReader());
+ assertNull(configurationData.getBatchReceivePolicy());
+ assertNull(configurationData.getKeySharedPolicy());
+ assertNull(configurationData.getPayloadProcessor());
+ }
+
+ private ConsumerBuilderImpl<byte[]> createConsumerBuilder() {
+ ConsumerBuilderImpl<byte[]> consumerBuilder = new ConsumerBuilderImpl<>(null, Schema.BYTES);
+ Map<String, String> properties = new HashMap<>();
+ properties.put("prop", "prop-value");
+ consumerBuilder
+ .topic("topic")
+ .topicsPattern("topics-pattern")
+ .subscriptionName("subscription")
+ .messageListener((consumer, message) -> {})
+ .consumerEventListener(mock(ConsumerEventListener.class))
+ .consumerName("consumer")
+ .cryptoKeyReader(DefaultCryptoKeyReader.builder().build())
+ .messageCrypto(new MessageCryptoBc("ctx1", true))
+ .properties(properties)
+ .deadLetterPolicy(DeadLetterPolicy.builder().deadLetterTopic("dlq").retryLetterTopic("retry").maxRedeliverCount(1).build())
+ .batchReceivePolicy(BatchReceivePolicy.builder().maxNumBytes(1).build())
+ .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
+ .messagePayloadProcessor(mock(MessagePayloadProcessor.class));
+ return consumerBuilder;
+ }
+
}