This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 736265cb29a [fix][broker] fix MessageDeduplication throw NPE when enable broker dedup and set namespace disable deduplication. (#20905)
736265cb29a is described below

commit 736265cb29ad2251286e04bee762eec8442368ea
Author: lifepuzzlefun <wjl_is_...@163.com>
AuthorDate: Tue Aug 1 13:16:25 2023 +0800

    [fix][broker] fix MessageDeduplication throw NPE when enable broker dedup and set namespace disable deduplication. (#20905)
---
 .../service/persistent/MessageDeduplication.java   |  4 +++
 .../service/persistent/TopicDuplicationTest.java   | 37 ++++++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index ed4e70bfd29..490be4a8876 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -482,6 +482,10 @@ public class MessageDeduplication {
     }
 
     public void takeSnapshot() {
+        if (!isEnabled()) {
+            return;
+        }
+
         Integer interval = topic.getHierarchyTopicPolicies().getDeduplicationSnapshotIntervalSeconds().get();
         long currentTimeStamp = System.currentTimeMillis();
         if (interval == null || interval <= 0
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index e57092d02dd..16721ca1203 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -492,6 +492,43 @@ public class TopicDuplicationTest extends ProducerConsumerBase {
 
     }
 
+    @Test(timeOut = 30000)
+    public void testDisableNamespacePolicyTakeSnapshotShouldNotThrowException() throws Exception {
+        cleanup();
+        conf.setBrokerDeduplicationEnabled(true);
+        conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
+        conf.setBrokerDeduplicationSnapshotIntervalSeconds(1);
+        conf.setBrokerDeduplicationEntriesInterval(20000);
+        setup();
+
+        final String topicName = testTopic + UUID.randomUUID().toString();
+        final String producerName = "my-producer";
+        @Cleanup
+        Producer<String> producer = pulsarClient
+                .newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create();
+
+        // disable deduplication
+        admin.namespaces().setDeduplicationStatus(myNamespace, false);
+
+        int msgNum = 50;
+        CountDownLatch countDownLatch = new CountDownLatch(msgNum);
+        for (int i = 0; i < msgNum; i++) {
+            producer.newMessage().value("msg" + i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
+        }
+        countDownLatch.await();
+        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopicIfExists(topicName).get().get();
+        ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
+
+        // when disable topic deduplication the cursor should be deleted.
+        assertNull(managedCursor);
+
+        // this method will be called at brokerService forEachTopic.
+        // if topic level disable deduplication.
+        // this method should be skipped without throw exception.
+        persistentTopic.checkDeduplicationSnapshot();
+    }
+
     private void waitCacheInit(String topicName) throws Exception {
         pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
         TopicName topic = TopicName.get(topicName);

Reply via email to