This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 736265cb29a [fix][broker] fix MessageDeduplication throw NPE when enable broker dedup and set namespace disable deduplication. (#20905) 736265cb29a is described below commit 736265cb29ad2251286e04bee762eec8442368ea Author: lifepuzzlefun <wjl_is_...@163.com> AuthorDate: Tue Aug 1 13:16:25 2023 +0800 [fix][broker] fix MessageDeduplication throw NPE when enable broker dedup and set namespace disable deduplication. (#20905) --- .../service/persistent/MessageDeduplication.java | 4 +++ .../service/persistent/TopicDuplicationTest.java | 37 ++++++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index ed4e70bfd29..490be4a8876 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -482,6 +482,10 @@ public class MessageDeduplication { } public void takeSnapshot() { + if (!isEnabled()) { + return; + } + Integer interval = topic.getHierarchyTopicPolicies().getDeduplicationSnapshotIntervalSeconds().get(); long currentTimeStamp = System.currentTimeMillis(); if (interval == null || interval <= 0 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java index e57092d02dd..16721ca1203 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java @@ -492,6 +492,43 @@ public class TopicDuplicationTest extends ProducerConsumerBase { } + @Test(timeOut = 30000) + public void testDisableNamespacePolicyTakeSnapshotShouldNotThrowException() throws Exception { + cleanup(); + conf.setBrokerDeduplicationEnabled(true); + conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1); + conf.setBrokerDeduplicationSnapshotIntervalSeconds(1); + conf.setBrokerDeduplicationEntriesInterval(20000); + setup(); + + final String topicName = testTopic + UUID.randomUUID().toString(); + final String producerName = "my-producer"; + @Cleanup + Producer<String> producer = pulsarClient + .newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create(); + + // disable deduplication + admin.namespaces().setDeduplicationStatus(myNamespace, false); + + int msgNum = 50; + CountDownLatch countDownLatch = new CountDownLatch(msgNum); + for (int i = 0; i < msgNum; i++) { + producer.newMessage().value("msg" + i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown()); + } + countDownLatch.await(); + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopicIfExists(topicName).get().get(); + ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor(); + + // when disable topic deduplication the cursor should be deleted. + assertNull(managedCursor); + + // this method will be called at brokerService forEachTopic. + // if topic level disable deduplication. + // this method should be skipped without throw exception. + persistentTopic.checkDeduplicationSnapshot(); + } + private void waitCacheInit(String topicName) throws Exception { pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close(); TopicName topic = TopicName.get(topicName);