This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 462f70ee463 [fix][client] Fix exception when calling loadConf on a ConsumerBuilder that has a KeySharedPolicy (#18345)
462f70ee463 is described below

commit 462f70ee463e8478a7c045fa77d55dc0349510c8
Author: Christophe Bornet <[email protected]>
AuthorDate: Wed Nov 16 01:45:24 2022 +0100

    [fix][client] Fix exception when calling loadConf on a ConsumerBuilder that has a KeySharedPolicy (#18345)
    
    (cherry picked from commit 9c2ec5e218b4743430d654c84f7e26f663b126f1)
---
 .../impl/conf/ConsumerConfigurationData.java       |   1 +
 .../client/impl/ConsumerBuilderImplTest.java       | 201 +++++++++++++++++++++
 2 files changed, 202 insertions(+)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 107694637b3..b7d76809f37 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -142,6 +142,7 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
 
     private boolean resetIncludeHead = false;
 
+    @JsonIgnore
     private transient KeySharedPolicy keySharedPolicy;
 
     private boolean batchIndexAckEnabled = false;
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index d648ee75af7..4d33d00582c 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -18,12 +18,18 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -31,14 +37,27 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
 /**
  * Unit tests of {@link ConsumerBuilderImpl}.
@@ -317,4 +336,186 @@ public class ConsumerBuilderImplTest {
         consumerBuilderImpl.subscriptionMode(SubscriptionMode.NonDurable)
             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
     }
+    @Test // Checks loadConf() on a builder that already has a KeySharedPolicy: JSON-provided values are applied.
+    public void testLoadConf() throws Exception {
+        ConsumerBuilderImpl<byte[]> consumerBuilder = createConsumerBuilder();
+
+        String jsonConf = ("{\n" // Written with single quotes for readability; swapped to double quotes by replace() below.
+                + "    'topicNames' : [ 'new-topic' ],\n"
+                + "    'topicsPattern' : 'new-topics-pattern',\n"
+                + "    'subscriptionName' : 'new-subscription',\n"
+                + "    'subscriptionType' : 'Key_Shared',\n"
+                + "    'subscriptionMode' : 'NonDurable',\n"
+                + "    'receiverQueueSize' : 2,\n"
+                + "    'acknowledgementsGroupTimeMicros' : 2,\n"
+                + "    'negativeAckRedeliveryDelayMicros' : 2,\n"
+                + "    'maxTotalReceiverQueueSizeAcrossPartitions' : 2,\n"
+                + "    'consumerName' : 'new-consumer',\n"
+                + "    'ackTimeoutMillis' : 2,\n"
+                + "    'tickDurationMillis' : 2,\n"
+                + "    'priorityLevel' : 2,\n"
+                + "    'maxPendingChunkedMessage' : 2,\n"
+                + "    'autoAckOldestChunkedMessageOnQueueFull' : true,\n"
+                + "    'expireTimeOfIncompleteChunkedMessageMillis' : 2,\n"
+                + "    'cryptoFailureAction' : 'DISCARD',\n"
+                + "    'properties' : {\n"
+                + "      'new-prop' : 'new-prop-value'\n"
+                + "    },\n"
+                + "    'readCompacted' : true,\n"
+                + "    'subscriptionInitialPosition' : 'Earliest',\n"
+                + "    'patternAutoDiscoveryPeriod' : 2,\n"
+                + "    'regexSubscriptionMode' : 'AllTopics',\n"
+                + "    'deadLetterPolicy' : {\n"
+                + "      'retryLetterTopic' : 'new-retry',\n"
+                + "      'deadLetterTopic' : 'new-dlq',\n"
+                + "      'maxRedeliverCount' : 2\n"
+                + "    },\n"
+                + "    'retryEnable' : true,\n"
+                + "    'autoUpdatePartitions' : false,\n"
+                + "    'autoUpdatePartitionsIntervalSeconds' : 2,\n"
+                + "    'replicateSubscriptionState' : true,\n"
+                + "    'resetIncludeHead' : true,\n"
+                + "    'batchIndexAckEnabled' : true,\n"
+                + "    'ackReceiptEnabled' : true,\n"
+                + "    'poolMessages' : true\n"
+                + "  }").replace("'", "\"");
+
+        Map<String, Object> conf = new ObjectMapper().readValue(jsonConf, new 
TypeReference<HashMap<String,Object>>() {});
+
+        MessageListener<byte[]> messageListener = (consumer, message) -> {}; // From here on, object-valued entries are put into the map directly.
+        conf.put("messageListener", messageListener);
+        ConsumerEventListener consumerEventListener = 
mock(ConsumerEventListener.class);
+        conf.put("consumerEventListener", consumerEventListener);
+        CryptoKeyReader cryptoKeyReader = 
DefaultCryptoKeyReader.builder().build();
+        conf.put("cryptoKeyReader", cryptoKeyReader);
+        MessageCrypto messageCrypto = new MessageCryptoBc("ctx2", true);
+        conf.put("messageCrypto", messageCrypto);
+        BatchReceivePolicy batchReceivePolicy = 
BatchReceivePolicy.builder().maxNumBytes(2).build();
+        conf.put("batchReceivePolicy", batchReceivePolicy);
+        KeySharedPolicy keySharedPolicy = KeySharedPolicy.stickyHashRange();
+        conf.put("keySharedPolicy", keySharedPolicy);
+        MessagePayloadProcessor payloadProcessor = 
mock(MessagePayloadProcessor.class);
+        conf.put("payloadProcessor", payloadProcessor);
+
+        consumerBuilder.loadConf(conf); // Call under test; an exception here fails the test.
+
+        ConsumerConfigurationData<byte[]> configurationData = 
consumerBuilder.getConf();
+        assertEquals(configurationData.getTopicNames(), new 
HashSet<>(Collections.singletonList("new-topic")));
+        assertEquals(configurationData.getTopicsPattern().pattern(), 
"new-topics-pattern");
+        assertEquals(configurationData.getSubscriptionName(), 
"new-subscription");
+        assertEquals(configurationData.getSubscriptionType(), 
SubscriptionType.Key_Shared);
+        assertEquals(configurationData.getSubscriptionMode(), 
SubscriptionMode.NonDurable);
+        assertEquals(configurationData.getReceiverQueueSize(), 2);
+        assertEquals(configurationData.getAcknowledgementsGroupTimeMicros(), 
2);
+        assertEquals(configurationData.getNegativeAckRedeliveryDelayMicros(), 
2);
+        
assertEquals(configurationData.getMaxTotalReceiverQueueSizeAcrossPartitions(), 
2);
+        assertEquals(configurationData.getConsumerName(), "new-consumer");
+        assertEquals(configurationData.getAckTimeoutMillis(), 2);
+        assertEquals(configurationData.getTickDurationMillis(), 2);
+        assertEquals(configurationData.getPriorityLevel(), 2);
+        assertEquals(configurationData.getMaxPendingChunkedMessage(), 2);
+        
assertTrue(configurationData.isAutoAckOldestChunkedMessageOnQueueFull());
+        
assertEquals(configurationData.getExpireTimeOfIncompleteChunkedMessageMillis(), 
2);
+        assertEquals(configurationData.getCryptoFailureAction(), 
ConsumerCryptoFailureAction.DISCARD);
+        assertThat(configurationData.getProperties()).hasSize(1)
+                .hasFieldOrPropertyWithValue("new-prop", "new-prop-value");
+        assertTrue(configurationData.isReadCompacted());
+        assertEquals(configurationData.getSubscriptionInitialPosition(), 
SubscriptionInitialPosition.Earliest);
+        assertEquals(configurationData.getPatternAutoDiscoveryPeriod(), 2);
+        assertEquals(configurationData.getRegexSubscriptionMode(), 
RegexSubscriptionMode.AllTopics);
+        
assertEquals(configurationData.getDeadLetterPolicy().getDeadLetterTopic(), 
"new-dlq");
+        
assertEquals(configurationData.getDeadLetterPolicy().getRetryLetterTopic(), 
"new-retry");
+        
assertEquals(configurationData.getDeadLetterPolicy().getMaxRedeliverCount(), 2);
+        assertTrue(configurationData.isRetryEnable());
+        assertFalse(configurationData.isAutoUpdatePartitions());
+        
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2);
+        assertTrue(configurationData.isReplicateSubscriptionState());
+        assertTrue(configurationData.isResetIncludeHead());
+        assertTrue(configurationData.isBatchIndexAckEnabled());
+        assertTrue(configurationData.isAckReceiptEnabled());
+        assertTrue(configurationData.isPoolMessages());
+
+        assertNull(configurationData.getMessageListener()); // Object-valued settings are expected to be null after loadConf, although both the builder and the map supplied them.
+        assertNull(configurationData.getConsumerEventListener());
+        assertNull(configurationData.getMessageListener()); // NOTE(review): repeats the getMessageListener() assertion two lines up.
+        assertNull(configurationData.getMessageCrypto());
+        assertNull(configurationData.getCryptoKeyReader());
+        assertNull(configurationData.getBatchReceivePolicy());
+        assertNull(configurationData.getKeySharedPolicy());
+        assertNull(configurationData.getPayloadProcessor());
+    }
+
+    @Test // Checks that loadConf() with an empty map keeps the values asserted below (set by createConsumerBuilder() or, presumably, defaults).
+    public void testLoadConfNotModified() {
+        ConsumerBuilderImpl<byte[]> consumerBuilder = createConsumerBuilder();
+
+        consumerBuilder.loadConf(new HashMap<>()); // Empty map: no overrides are supplied.
+
+        ConsumerConfigurationData<byte[]> configurationData = 
consumerBuilder.getConf();
+        assertEquals(configurationData.getTopicNames(), new 
HashSet<>(Collections.singletonList("topic")));
+        assertEquals(configurationData.getTopicsPattern().pattern(), 
"topics-pattern");
+        assertEquals(configurationData.getSubscriptionName(), "subscription");
+        assertEquals(configurationData.getSubscriptionType(), 
SubscriptionType.Exclusive);
+        assertEquals(configurationData.getSubscriptionMode(), 
SubscriptionMode.Durable);
+        assertEquals(configurationData.getReceiverQueueSize(), 1000);
+        assertEquals(configurationData.getAcknowledgementsGroupTimeMicros(), 
TimeUnit.MILLISECONDS.toMicros(100));
+        assertEquals(configurationData.getNegativeAckRedeliveryDelayMicros(), 
TimeUnit.MINUTES.toMicros(1));
+        
assertEquals(configurationData.getMaxTotalReceiverQueueSizeAcrossPartitions(), 
50000);
+        assertEquals(configurationData.getConsumerName(), "consumer");
+        assertEquals(configurationData.getAckTimeoutMillis(), 30000);
+        assertEquals(configurationData.getTickDurationMillis(), 1000);
+        assertEquals(configurationData.getPriorityLevel(), 0);
+        assertEquals(configurationData.getMaxPendingChunkedMessage(), 10);
+        
assertFalse(configurationData.isAutoAckOldestChunkedMessageOnQueueFull());
+        
assertEquals(configurationData.getExpireTimeOfIncompleteChunkedMessageMillis(), 
TimeUnit.MINUTES.toMillis(1));
+        assertEquals(configurationData.getCryptoFailureAction(), 
ConsumerCryptoFailureAction.FAIL);
+        assertThat(configurationData.getProperties()).hasSize(1)
+                .hasFieldOrPropertyWithValue("prop", "prop-value");
+        assertFalse(configurationData.isReadCompacted());
+        assertEquals(configurationData.getSubscriptionInitialPosition(), 
SubscriptionInitialPosition.Latest);
+        assertEquals(configurationData.getPatternAutoDiscoveryPeriod(), 60);
+        assertEquals(configurationData.getRegexSubscriptionMode(), 
RegexSubscriptionMode.PersistentOnly);
+        
assertEquals(configurationData.getDeadLetterPolicy().getDeadLetterTopic(), 
"dlq");
+        
assertEquals(configurationData.getDeadLetterPolicy().getRetryLetterTopic(), 
"retry");
+        
assertEquals(configurationData.getDeadLetterPolicy().getMaxRedeliverCount(), 1);
+        assertFalse(configurationData.isRetryEnable());
+        assertTrue(configurationData.isAutoUpdatePartitions());
+        
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 60);
+        assertFalse(configurationData.isReplicateSubscriptionState());
+        assertFalse(configurationData.isResetIncludeHead());
+        assertFalse(configurationData.isBatchIndexAckEnabled());
+        assertFalse(configurationData.isAckReceiptEnabled());
+        assertFalse(configurationData.isPoolMessages());
+
+        assertNull(configurationData.getMessageListener()); // Object-valued settings made on the builder are expected to be null after loadConf.
+        assertNull(configurationData.getConsumerEventListener());
+        assertNull(configurationData.getMessageListener()); // NOTE(review): repeats the getMessageListener() assertion two lines up.
+        assertNull(configurationData.getMessageCrypto());
+        assertNull(configurationData.getCryptoKeyReader());
+        assertNull(configurationData.getBatchReceivePolicy());
+        assertNull(configurationData.getKeySharedPolicy());
+        assertNull(configurationData.getPayloadProcessor());
+    }
+
+    private ConsumerBuilderImpl<byte[]> createConsumerBuilder() { // Fixture used by testLoadConf and testLoadConfNotModified.
+        ConsumerBuilderImpl<byte[]> consumerBuilder = new 
ConsumerBuilderImpl<>(null, Schema.BYTES); // First argument is null (presumably the client; TODO confirm).
+        Map<String, String> properties = new HashMap<>();
+        properties.put("prop", "prop-value");
+        consumerBuilder
+                .topic("topic")
+                .topicsPattern("topics-pattern")
+                .subscriptionName("subscription")
+                .messageListener((consumer, message) -> {})
+                .consumerEventListener(mock(ConsumerEventListener.class))
+                .consumerName("consumer")
+                .cryptoKeyReader(DefaultCryptoKeyReader.builder().build())
+                .messageCrypto(new MessageCryptoBc("ctx1", true))
+                .properties(properties)
+                
.deadLetterPolicy(DeadLetterPolicy.builder().deadLetterTopic("dlq").retryLetterTopic("retry").maxRedeliverCount(1).build())
+                
.batchReceivePolicy(BatchReceivePolicy.builder().maxNumBytes(1).build())
+                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()) // Ensures the tests call loadConf with a KeySharedPolicy already present.
+                .messagePayloadProcessor(mock(MessagePayloadProcessor.class));
+        return consumerBuilder;
+    }
+
 }

Reply via email to