This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0252671d329 [fix][broker] Fix MessageDeduplication replay timeout cause topic loading stuck (#23004)
0252671d329 is described below

commit 0252671d32976bbae9a8f083648bfccbabbec402
Author: ken <1647023...@qq.com>
AuthorDate: Sat Jul 6 06:26:28 2024 +0800

    [fix][broker] Fix MessageDeduplication replay timeout cause topic loading stuck (#23004)
    
    Co-authored-by: fanjianye <fanjia...@bigo.sg>
    (cherry picked from commit 41ef3f6fb1c0b209307d7b4e14300a377c52c5ab)
---
 .../service/persistent/MessageDeduplication.java   |  14 ++-
 .../service/persistent/TopicDuplicationTest.java   | 103 +++++++++++++++++++++
 2 files changed, 113 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index ab3b799093b..1715e09dc7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -159,11 +159,12 @@ public class MessageDeduplication {
         log.info("[{}] Replaying {} entries for deduplication", 
topic.getName(), managedCursor.getNumberOfEntries());
         CompletableFuture<Position> future = new CompletableFuture<>();
         replayCursor(future);
-        return future.thenAccept(lastPosition -> {
+        return future.thenCompose(lastPosition -> {
             if (lastPosition != null && snapshotCounter >= snapshotInterval) {
                 snapshotCounter = 0;
-                takeSnapshot(lastPosition);
+                return takeSnapshot(lastPosition);
             }
+            return CompletableFuture.completedFuture(null);
         });
     }
 
@@ -438,13 +439,15 @@ public class MessageDeduplication {
         }
     }
 
-    private void takeSnapshot(Position position) {
+    private CompletableFuture<Void> takeSnapshot(Position position) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Taking snapshot of sequence ids map", 
topic.getName());
         }
 
         if (!snapshotTaking.compareAndSet(false, true)) {
-            return;
+            future.complete(null);
+            return future;
         }
 
         Map<String, Long> snapshot = new TreeMap<>();
@@ -462,14 +465,17 @@ public class MessageDeduplication {
                 }
                 lastSnapshotTimestamp = System.currentTimeMillis();
                 snapshotTaking.set(false);
+                future.complete(null);
             }
 
             @Override
             public void markDeleteFailed(ManagedLedgerException exception, 
Object ctx) {
                 log.warn("[{}] Failed to store new deduplication snapshot at 
{}", topic.getName(), position);
                 snapshotTaking.set(false);
+                future.completeExceptionally(exception);
             }
         }, null);
+        return future;
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index 16721ca1203..7069a843e98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
@@ -25,6 +26,7 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import java.lang.reflect.Field;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -33,12 +35,18 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -529,6 +537,101 @@ public class TopicDuplicationTest extends ProducerConsumerBase {
         persistentTopic.checkDeduplicationSnapshot();
     }
 
+    @Test
+    public void testFinishTakeSnapshotWhenTopicLoading() throws Exception {
+        cleanup();
+        setup();
+
+        // Create a topic and wait deduplication is started.
+        int brokerDeduplicationEntriesInterval = 1000;
+        pulsar.getConfiguration().setBrokerDeduplicationEnabled(true);
+        
pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(brokerDeduplicationEntriesInterval);
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        final PersistentTopic persistentTopic1 =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+        final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) 
persistentTopic1.getManagedLedger();
+        Awaitility.await().untilAsserted(() -> {
+            ManagedCursorImpl cursor1 =
+                    (ManagedCursorImpl) 
ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
+            assertNotNull(cursor1);
+        });
+        final MessageDeduplication deduplication1 = 
persistentTopic1.getMessageDeduplication();
+
+
+        // Send 999 messages, it is less than 
"brokerDeduplicationEntriesInterval".
+        // So it would not trigger takeSnapshot
+        final Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
+                .topic(topic).enableBatching(false).create();
+        for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) {
+            producer.send(i + "");
+        }
+        producer.close();
+        int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, 
"snapshotCounter");
+        assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1);
+
+
+        // Unload and load topic, simulate topic load is timeout.
+        // SetBrokerDeduplicationEntriesInterval to 10, therefore 
recoverSequenceIdsMap#takeSnapshot
+        // would trigger and should update the snapshot position.
+        // However, if topic close and takeSnapshot are concurrent,
+        // it would result in takeSnapshot throw exception
+        admin.topics().unload(topic);
+        pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10);
+
+        // Mock message deduplication recovery speed topicLoadTimeoutSeconds
+        pulsar.getConfiguration().setTopicLoadTimeoutSeconds(1);
+        String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" +
+                TopicName.get(topic).getPersistenceNamingEncoding() + "/" + 
DEDUPLICATION_CURSOR_NAME;
+        mockZooKeeper.delay(2 * 1000, (op, path) -> {
+            if (mlPath.equals(path)) {
+                return true;
+            }
+            return false;
+        });
+
+        Field field2 = BrokerService.class.getDeclaredField("topics");
+        field2.setAccessible(true);
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
topics =
+                (ConcurrentOpenHashMap<String, 
CompletableFuture<Optional<Topic>>>)
+                        field2.get(pulsar.getBrokerService());
+
+        try {
+            pulsar.getBrokerService().getTopic(topic, false).join().get();
+            Assert.fail();
+        } catch (Exception e) {
+            // topic loading should timeout.
+        }
+        Awaitility.await().untilAsserted(() -> {
+            // topic loading timeout then close topic and remove from topicsMap
+            Assert.assertFalse(topics.containsKey(topic));
+        });
+
+
+        // Load topic again, setBrokerDeduplicationEntriesInterval to 10000,
+        // make recoverSequenceIdsMap#takeSnapshot not trigger takeSnapshot.
+        // But actually it should not replay again in recoverSequenceIdsMap,
+        // since previous topic loading should finish the replay process.
+        pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10000);
+        pulsar.getConfiguration().setTopicLoadTimeoutSeconds(60);
+        PersistentTopic persistentTopic2 =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+        ManagedLedgerImpl ml2 = (ManagedLedgerImpl) 
persistentTopic2.getManagedLedger();
+        MessageDeduplication deduplication2 = 
persistentTopic2.getMessageDeduplication();
+
+        Awaitility.await().untilAsserted(() -> {
+            int snapshotCounter3 = 
WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter");
+            Assert.assertEquals(snapshotCounter3, 0);
+            Assert.assertEquals(ml2.getLedgersInfo().size(), 1);
+        });
+
+
+        // cleanup.
+        admin.topics().delete(topic);
+        cleanup();
+        setup();
+    }
+
     private void waitCacheInit(String topicName) throws Exception {
         
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
         TopicName topic = TopicName.get(topicName);

Reply via email to