This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0252671d329  [fix][broker] Fix MessageDeduplication replay timeout cause topic loading stuck (#23004)
0252671d329 is described below

commit 0252671d32976bbae9a8f083648bfccbabbec402
Author: ken <1647023...@qq.com>
AuthorDate: Sat Jul 6 06:26:28 2024 +0800

    [fix][broker] Fix MessageDeduplication replay timeout cause topic loading stuck (#23004)

    Co-authored-by: fanjianye <fanjia...@bigo.sg>
    (cherry picked from commit 41ef3f6fb1c0b209307d7b4e14300a377c52c5ab)
---
 .../service/persistent/MessageDeduplication.java  |  14 ++-
 .../service/persistent/TopicDuplicationTest.java  | 103 +++++++++++++++++++++
 2 files changed, 113 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index ab3b799093b..1715e09dc7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -159,11 +159,12 @@ public class MessageDeduplication {
         log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries());
         CompletableFuture<Position> future = new CompletableFuture<>();
         replayCursor(future);
-        return future.thenAccept(lastPosition -> {
+        return future.thenCompose(lastPosition -> {
             if (lastPosition != null && snapshotCounter >= snapshotInterval) {
                 snapshotCounter = 0;
-                takeSnapshot(lastPosition);
+                return takeSnapshot(lastPosition);
             }
+            return CompletableFuture.completedFuture(null);
         });
     }
 
@@ -438,13 +439,15 @@ public class MessageDeduplication {
         }
     }
 
-    private void takeSnapshot(Position position) {
+    private CompletableFuture<Void> takeSnapshot(Position position) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
         }
 
         if (!snapshotTaking.compareAndSet(false, true)) {
-            return;
+            future.complete(null);
+            return future;
         }
 
         Map<String, Long> snapshot = new TreeMap<>();
@@ -462,14 +465,17 @@ public class MessageDeduplication {
                 }
                 lastSnapshotTimestamp = System.currentTimeMillis();
                 snapshotTaking.set(false);
+                future.complete(null);
             }
 
             @Override
             public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                 log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position);
                 snapshotTaking.set(false);
+                future.completeExceptionally(exception);
             }
         }, null);
+        return future;
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index 16721ca1203..7069a843e98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
@@ -25,6 +26,7 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import java.lang.reflect.Field;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -33,12 +35,18 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -529,6 +537,101 @@ public class TopicDuplicationTest extends ProducerConsumerBase {
         persistentTopic.checkDeduplicationSnapshot();
     }
 
+    @Test
+    public void testFinishTakeSnapshotWhenTopicLoading() throws Exception {
+        cleanup();
+        setup();
+
+        // Create a topic and wait until deduplication has started.
+        int brokerDeduplicationEntriesInterval = 1000;
+        pulsar.getConfiguration().setBrokerDeduplicationEnabled(true);
+        pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(brokerDeduplicationEntriesInterval);
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        final PersistentTopic persistentTopic1 =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
+        final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
+        Awaitility.await().untilAsserted(() -> {
+            ManagedCursorImpl cursor1 =
+                    (ManagedCursorImpl) ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
+            assertNotNull(cursor1);
+        });
+        final MessageDeduplication deduplication1 = persistentTopic1.getMessageDeduplication();
+
+
+        // Send 999 messages, which is less than "brokerDeduplicationEntriesInterval",
+        // so takeSnapshot is not triggered.
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic).enableBatching(false).create();
+        for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) {
+            producer.send(i + "");
+        }
+        producer.close();
+        int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter");
+        assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1);
+
+
+        // Unload and reload the topic to simulate a topic load timeout.
+        // Set brokerDeduplicationEntriesInterval to 10, so recoverSequenceIdsMap#takeSnapshot
+        // is triggered and should update the snapshot position.
+        // However, if the topic close and takeSnapshot run concurrently,
+        // takeSnapshot may throw an exception.
+        admin.topics().unload(topic);
+        pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10);
+
+        // Make the message deduplication recovery slower than topicLoadTimeoutSeconds.
+        pulsar.getConfiguration().setTopicLoadTimeoutSeconds(1);
+        String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/"
+                + TopicName.get(topic).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME;
+        mockZooKeeper.delay(2 * 1000, (op, path) -> {
+            if (mlPath.equals(path)) {
+                return true;
+            }
+            return false;
+        });
+
+        Field field2 = BrokerService.class.getDeclaredField("topics");
+        field2.setAccessible(true);
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
+                (ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>)
+                        field2.get(pulsar.getBrokerService());
+
+        try {
+            pulsar.getBrokerService().getTopic(topic, false).join().get();
+            Assert.fail();
+        } catch (Exception e) {
+            // Topic loading should time out.
+        }
+        Awaitility.await().untilAsserted(() -> {
+            // After the topic loading times out, the topic is closed and removed from the topics map.
+            Assert.assertFalse(topics.containsKey(topic));
+        });
+
+
+        // Load the topic again with brokerDeduplicationEntriesInterval set to 10000,
+        // so recoverSequenceIdsMap#takeSnapshot does not trigger takeSnapshot.
+        // It should not replay again in recoverSequenceIdsMap either,
+        // since the previous topic loading should have finished the replay process.
+        pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10000);
+        pulsar.getConfiguration().setTopicLoadTimeoutSeconds(60);
+        PersistentTopic persistentTopic2 =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
+        ManagedLedgerImpl ml2 = (ManagedLedgerImpl) persistentTopic2.getManagedLedger();
+        MessageDeduplication deduplication2 = persistentTopic2.getMessageDeduplication();
+
+        Awaitility.await().untilAsserted(() -> {
+            int snapshotCounter3 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter");
+            Assert.assertEquals(snapshotCounter3, 0);
+            Assert.assertEquals(ml2.getLedgersInfo().size(), 1);
+        });
+
+
+        // Cleanup.
+        admin.topics().delete(topic);
+        cleanup();
+        setup();
+    }
+
     private void waitCacheInit(String topicName) throws Exception {
         pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
         TopicName topic = TopicName.get(topicName);