shibd commented on code in PR #23004: URL: https://github.com/apache/pulsar/pull/23004#discussion_r1666137904
########## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java: ########## @@ -529,6 +538,88 @@ public void testDisableNamespacePolicyTakeSnapshotShouldNotThrowException() thro persistentTopic.checkDeduplicationSnapshot(); } + @Test + public void testFinishTakeSnapshotWhenTopicLoading() throws Exception { + cleanup(); + setup(); + + // Create a topic and wait deduplication is started. + int brokerDeduplicationEntriesInterval = 1000; + pulsar.getConfiguration().setBrokerDeduplicationEnabled(true); + pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(brokerDeduplicationEntriesInterval); + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(topic); + final PersistentTopic persistentTopic1 = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger(); + Awaitility.await().untilAsserted(() -> { + ManagedCursorImpl cursor1 = + (ManagedCursorImpl) ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); + assertNotNull(cursor1); + }); + final MessageDeduplication deduplication1 = persistentTopic1.getMessageDeduplication(); + + + // Send 999 messages, it is less than "brokerDeduplicationEntriesInterval". + // So it would not trigger takeSnapshot + final Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic).enableBatching(false).create(); + for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) { + producer.send(i + ""); + } + producer.close(); + int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter"); + assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1); + + + // Unload and load topic, simulate topic load is timeout. + // SetBrokerDeduplicationEntriesInterval to 10, therefore recoverSequenceIdsMap#takeSnapshot + // would trigger and should update the snapshot position. + // However, if topic close and takeSnapshot are concurrent, + // it would result in takeSnapshot throw exception + admin.topics().unload(topic); + pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10); + + Field field2 = BrokerService.class.getDeclaredField("topics"); + field2.setAccessible(true); + ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = + (ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>) + field2.get(pulsar.getBrokerService()); + + pulsar.getBrokerService().getTopic(topic, false); + Assert.assertTrue(topics.containsKey(topic)); + CompletableFuture<Optional<Topic>> future = topics.get(topic); + future.completeExceptionally(FutureUtil.createTimeoutException("Failed to load topic within timeout", Review Comment: https://github.com/apache/pulsar/blob/a91a172b4ee6d8b974a3fa905e435975557fcc57/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java#L127-L137 Maybe we can refer to these code to mock topic load timeout and deduplication take snapshot timeout. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org