This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f91b519f56622602dd2bd18ec3fa55748debd27f
Author: fengyubiao <[email protected]>
AuthorDate: Mon Apr 15 12:07:48 2024 +0800

    Revert "[fix] [broker] Prevent long deduplication cursor backlog so that 
topic loading wouldn't timeout (#22479)"
    
    This reverts commit 7cec82f99c2c729e0e9ac42f7176eadf4039ae1a.
---
 .../pulsar/broker/service/BrokerService.java       |   4 +-
 .../service/persistent/MessageDeduplication.java   |  18 +--
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../DeduplicationDisabledBrokerLevelTest.java      | 161 ---------------------
 4 files changed, 8 insertions(+), 177 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f19b3436f7b..d798d9e672e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -576,10 +576,8 @@ public class BrokerService implements Closeable {
     }
 
     protected void startDeduplicationSnapshotMonitor() {
-        // We do not know whether users will enable deduplication on namespace 
level/topic level or not, so keep this
-        // scheduled task runs.
         int interval = 
pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-        if (interval > 0) {
+        if (interval > 0 && 
pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
             this.deduplicationSnapshotMonitor = 
OrderedScheduler.newSchedulerBuilder()
                     .name("deduplication-snapshot-monitor")
                     .numThreads(1)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index e508661364d..802dd917961 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,14 +157,9 @@ public class MessageDeduplication {
 
         // Replay all the entries and apply all the sequence ids updates
         log.info("[{}] Replaying {} entries for deduplication", 
topic.getName(), managedCursor.getNumberOfEntries());
-        CompletableFuture<Position> future = new CompletableFuture<>();
+        CompletableFuture<Void> future = new CompletableFuture<>();
         replayCursor(future);
-        return future.thenAccept(lastPosition -> {
-            if (lastPosition != null && snapshotCounter >= snapshotInterval) {
-                snapshotCounter = 0;
-                takeSnapshot(lastPosition);
-            }
-        });
+        return future;
     }
 
     /**
@@ -173,11 +168,11 @@ public class MessageDeduplication {
      *
      * @param future future to trigger when the replay is complete
      */
-    private void replayCursor(CompletableFuture<Position> future) {
+    private void replayCursor(CompletableFuture<Void> future) {
         managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
             @Override
             public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                Position lastPosition = null;
+
                 for (Entry entry : entries) {
                     ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
                     MessageMetadata md = 
Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -187,8 +182,7 @@ public class MessageDeduplication {
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
                     producerRemoved(producerName);
-                    snapshotCounter++;
-                    lastPosition = entry.getPosition();
+
                     entry.release();
                 }
 
@@ -197,7 +191,7 @@ public class MessageDeduplication {
                     pulsar.getExecutor().execute(() -> replayCursor(future));
                 } else {
                     // Done replaying
-                    future.complete(lastPosition);
+                    future.complete(null);
                 }
             }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 29df6e78cd1..be1b873cc21 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -204,7 +204,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     private volatile List<String> shadowTopics;
     private final TopicName shadowSourceTopic;
 
-    public static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
+    static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
 
     public static boolean isDedupCursorName(String name) {
         return DEDUPLICATION_CURSOR_NAME.equals(name);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java
deleted file mode 100644
index 2ce4ea9b00b..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.BrokerTestUtil;
-import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.Schema;
-import org.awaitility.Awaitility;
-import org.awaitility.reflect.WhiteboxImpl;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class DeduplicationDisabledBrokerLevelTest extends ProducerConsumerBase 
{
-
-    private int deduplicationSnapshotFrequency = 5;
-    private int brokerDeduplicationEntriesInterval = 1000;
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
-    protected void doInitConf() throws Exception {
-        this.conf.setBrokerDeduplicationEnabled(false);
-        
this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(deduplicationSnapshotFrequency);
-        
this.conf.setBrokerDeduplicationEntriesInterval(brokerDeduplicationEntriesInterval);
-    }
-
-    @Test
-    public void testNoBacklogOnDeduplication() throws Exception {
-        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
-        admin.topics().createNonPartitionedTopic(topic);
-        final PersistentTopic persistentTopic =
-                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
-        final ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
-        // deduplication enabled:
-        //   broker level: "false"
-        //   topic level: "true".
-        // So it is enabled.
-        admin.topicPolicies().setDeduplicationStatus(topic, true);
-        Awaitility.await().untilAsserted(() -> {
-            ManagedCursorImpl cursor =
-                    (ManagedCursorImpl) 
ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
-            assertNotNull(cursor);
-        });
-
-        // Verify: regarding deduplication cursor, messages will be 
acknowledged automatically.
-        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
-        producer.send("1");
-        producer.send("2");
-        producer.send("3");
-        producer.close();
-        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
-        
Awaitility.await().atMost(Duration.ofSeconds(deduplicationSnapshotFrequency * 
3)).untilAsserted(() -> {
-            PositionImpl LAC = (PositionImpl) ml.getLastConfirmedEntry();
-            PositionImpl cursorMD = (PositionImpl) 
cursor.getMarkDeletedPosition();
-            assertTrue(LAC.compareTo(cursorMD) <= 0);
-        });
-
-        // cleanup.
-        admin.topics().delete(topic);
-    }
-
-    @Test
-    public void testSnapshotCounterAfterUnload() throws Exception {
-        final int originalDeduplicationSnapshotFrequency = 
deduplicationSnapshotFrequency;
-        deduplicationSnapshotFrequency = 3600;
-        cleanup();
-        setup();
-
-        // Create a topic and wait deduplication is started.
-        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
-        admin.topics().createNonPartitionedTopic(topic);
-        final PersistentTopic persistentTopic1 =
-                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
-        final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) 
persistentTopic1.getManagedLedger();
-        admin.topicPolicies().setDeduplicationStatus(topic, true);
-        Awaitility.await().untilAsserted(() -> {
-            ManagedCursorImpl cursor1 =
-                    (ManagedCursorImpl) 
ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
-            assertNotNull(cursor1);
-        });
-        final MessageDeduplication deduplication1 = 
persistentTopic1.getMessageDeduplication();
-
-        // 1. Send 999 messages, it is less than 
"brokerDeduplicationEntriesIntervaddl".
-        // 2. Unload topic.
-        // 3. Send 1 messages, there are 1099 messages have not been snapshot 
now.
-        // 4. Verify the snapshot has been taken.
-        // step 1.
-        final Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
-        for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) {
-            producer.send(i + "");
-        }
-        int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, 
"snapshotCounter");
-        assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1);
-        admin.topics().unload(topic);
-        PersistentTopic persistentTopic2 =
-                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
-        ManagedLedgerImpl ml2 = (ManagedLedgerImpl) 
persistentTopic2.getManagedLedger();
-        MessageDeduplication deduplication2 = 
persistentTopic2.getMessageDeduplication();
-        admin.topicPolicies().setDeduplicationStatus(topic, true);
-        Awaitility.await().untilAsserted(() -> {
-            ManagedCursorImpl cursor =
-                    (ManagedCursorImpl) 
ml2.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
-            assertNotNull(cursor);
-        });
-        // step 3.
-        producer.send("last message");
-        ml2.trimConsumedLedgersInBackground(new CompletableFuture<>());
-        // step 4.
-        Awaitility.await().untilAsserted(() -> {
-            int snapshotCounter3 = 
WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter");
-            assertTrue(snapshotCounter3 < brokerDeduplicationEntriesInterval);
-            // Verify: the previous ledger will be removed because all 
messages have been acked.
-            assertEquals(ml2.getLedgersInfo().size(), 1);
-        });
-
-        // cleanup.
-        producer.close();
-        admin.topics().delete(topic);
-        deduplicationSnapshotFrequency = 
originalDeduplicationSnapshotFrequency;
-        cleanup();
-        setup();
-    }
-}

Reply via email to