TakaHiR07 commented on code in PR #23004:
URL: https://github.com/apache/pulsar/pull/23004#discussion_r1666175402


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java:
##########
@@ -529,6 +538,88 @@ public void 
testDisableNamespacePolicyTakeSnapshotShouldNotThrowException() thro
         persistentTopic.checkDeduplicationSnapshot();
     }
 
+    @Test
+    public void testFinishTakeSnapshotWhenTopicLoading() throws Exception {
+        cleanup();
+        setup();
+
+        // Create a topic and wait deduplication is started.
+        int brokerDeduplicationEntriesInterval = 1000;
+        pulsar.getConfiguration().setBrokerDeduplicationEnabled(true);
+        
pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(brokerDeduplicationEntriesInterval);
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        final PersistentTopic persistentTopic1 =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+        final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) 
persistentTopic1.getManagedLedger();
+        Awaitility.await().untilAsserted(() -> {
+            ManagedCursorImpl cursor1 =
+                    (ManagedCursorImpl) 
ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
+            assertNotNull(cursor1);
+        });
+        final MessageDeduplication deduplication1 = 
persistentTopic1.getMessageDeduplication();
+
+
+        // Send 999 messages, it is less than 
"brokerDeduplicationEntriesInterval".
+        // So it would not trigger takeSnapshot
+        final Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
+                .topic(topic).enableBatching(false).create();
+        for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) {
+            producer.send(i + "");
+        }
+        producer.close();
+        int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, 
"snapshotCounter");
+        assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1);
+
+
+        // Unload and load topic, simulate topic load is timeout.
+        // SetBrokerDeduplicationEntriesInterval to 10, therefore 
recoverSequenceIdsMap#takeSnapshot
+        // would trigger and should update the snapshot position.
+        // However, if topic close and takeSnapshot are concurrent,
+        // it would result in takeSnapshot throw exception
+        admin.topics().unload(topic);
+        pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10);
+
+        Field field2 = BrokerService.class.getDeclaredField("topics");
+        field2.setAccessible(true);
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
topics =
+                (ConcurrentOpenHashMap<String, 
CompletableFuture<Optional<Topic>>>)
+                        field2.get(pulsar.getBrokerService());
+
+        pulsar.getBrokerService().getTopic(topic, false);
+        Assert.assertTrue(topics.containsKey(topic));
+        CompletableFuture<Optional<Topic>> future = topics.get(topic);
+        future.completeExceptionally(FutureUtil.createTimeoutException("Failed 
to load topic within timeout",

Review Comment:
   That's helpful. Thx. Have updated the test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to