This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f91b519f56622602dd2bd18ec3fa55748debd27f Author: fengyubiao <[email protected]> AuthorDate: Mon Apr 15 12:07:48 2024 +0800 Revert "[fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)" This reverts commit 7cec82f99c2c729e0e9ac42f7176eadf4039ae1a. --- .../pulsar/broker/service/BrokerService.java | 4 +- .../service/persistent/MessageDeduplication.java | 18 +-- .../broker/service/persistent/PersistentTopic.java | 2 +- .../DeduplicationDisabledBrokerLevelTest.java | 161 --------------------- 4 files changed, 8 insertions(+), 177 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f19b3436f7b..d798d9e672e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -576,10 +576,8 @@ public class BrokerService implements Closeable { } protected void startDeduplicationSnapshotMonitor() { - // We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this - // scheduled task runs. int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds(); - if (interval > 0) { + if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) { this.deduplicationSnapshotMonitor = OrderedScheduler.newSchedulerBuilder() .name("deduplication-snapshot-monitor") .numThreads(1) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index e508661364d..802dd917961 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -157,14 +157,9 @@ public class MessageDeduplication { // Replay all the entries and apply all the sequence ids updates log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries()); - CompletableFuture<Position> future = new CompletableFuture<>(); + CompletableFuture<Void> future = new CompletableFuture<>(); replayCursor(future); - return future.thenAccept(lastPosition -> { - if (lastPosition != null && snapshotCounter >= snapshotInterval) { - snapshotCounter = 0; - takeSnapshot(lastPosition); - } - }); + return future; } /** @@ -173,11 +168,11 @@ public class MessageDeduplication { * * @param future future to trigger when the replay is complete */ - private void replayCursor(CompletableFuture<Position> future) { + private void replayCursor(CompletableFuture<Void> future) { managedCursor.asyncReadEntries(100, new ReadEntriesCallback() { @Override public void readEntriesComplete(List<Entry> entries, Object ctx) { - Position lastPosition = null; + for (Entry entry : entries) { ByteBuf messageMetadataAndPayload = entry.getDataBuffer(); MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload); @@ -187,8 +182,7 @@ public class MessageDeduplication { highestSequencedPushed.put(producerName, sequenceId); highestSequencedPersisted.put(producerName, sequenceId); producerRemoved(producerName); - snapshotCounter++; - lastPosition = entry.getPosition(); + entry.release(); } @@ -197,7 +191,7 @@ public class MessageDeduplication { pulsar.getExecutor().execute(() -> replayCursor(future)); } else { // Done replaying - future.complete(lastPosition); + future.complete(null); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 29df6e78cd1..be1b873cc21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -204,7 +204,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private volatile List<String> shadowTopics; private final TopicName shadowSourceTopic; - public static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup"; + static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup"; public static boolean isDedupCursorName(String name) { return DEDUPLICATION_CURSOR_NAME.equals(name); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java deleted file mode 100644 index 2ce4ea9b00b..00000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.service; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; -import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; -import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.pulsar.broker.BrokerTestUtil; -import org.apache.pulsar.broker.service.persistent.MessageDeduplication; -import org.apache.pulsar.broker.service.persistent.PersistentTopic; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConsumerBase; -import org.apache.pulsar.client.api.Schema; -import org.awaitility.Awaitility; -import org.awaitility.reflect.WhiteboxImpl; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -public class DeduplicationDisabledBrokerLevelTest extends ProducerConsumerBase { - - private int deduplicationSnapshotFrequency = 5; - private int brokerDeduplicationEntriesInterval = 1000; - - @BeforeClass - @Override - protected void setup() throws Exception { - super.internalSetup(); - producerBaseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } - - protected void doInitConf() throws Exception { - this.conf.setBrokerDeduplicationEnabled(false); - this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(deduplicationSnapshotFrequency); - this.conf.setBrokerDeduplicationEntriesInterval(brokerDeduplicationEntriesInterval); - } - - @Test - public void testNoBacklogOnDeduplication() throws Exception { - final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - admin.topics().createNonPartitionedTopic(topic); - final PersistentTopic persistentTopic = - (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); - final ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); - // deduplication enabled: - // broker level: "false" - // topic level: "true". - // So it is enabled. - admin.topicPolicies().setDeduplicationStatus(topic, true); - Awaitility.await().untilAsserted(() -> { - ManagedCursorImpl cursor = - (ManagedCursorImpl) ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); - assertNotNull(cursor); - }); - - // Verify: regarding deduplication cursor, messages will be acknowledged automatically. - Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); - producer.send("1"); - producer.send("2"); - producer.send("3"); - producer.close(); - ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); - Awaitility.await().atMost(Duration.ofSeconds(deduplicationSnapshotFrequency * 3)).untilAsserted(() -> { - PositionImpl LAC = (PositionImpl) ml.getLastConfirmedEntry(); - PositionImpl cursorMD = (PositionImpl) cursor.getMarkDeletedPosition(); - assertTrue(LAC.compareTo(cursorMD) <= 0); - }); - - // cleanup. - admin.topics().delete(topic); - } - - @Test - public void testSnapshotCounterAfterUnload() throws Exception { - final int originalDeduplicationSnapshotFrequency = deduplicationSnapshotFrequency; - deduplicationSnapshotFrequency = 3600; - cleanup(); - setup(); - - // Create a topic and wait deduplication is started. - final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - admin.topics().createNonPartitionedTopic(topic); - final PersistentTopic persistentTopic1 = - (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); - final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger(); - admin.topicPolicies().setDeduplicationStatus(topic, true); - Awaitility.await().untilAsserted(() -> { - ManagedCursorImpl cursor1 = - (ManagedCursorImpl) ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); - assertNotNull(cursor1); - }); - final MessageDeduplication deduplication1 = persistentTopic1.getMessageDeduplication(); - - // 1. Send 999 messages, it is less than "brokerDeduplicationEntriesIntervaddl". - // 2. Unload topic. - // 3. Send 1 messages, there are 1099 messages have not been snapshot now. - // 4. Verify the snapshot has been taken. - // step 1. - final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); - for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) { - producer.send(i + ""); - } - int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter"); - assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1); - admin.topics().unload(topic); - PersistentTopic persistentTopic2 = - (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); - ManagedLedgerImpl ml2 = (ManagedLedgerImpl) persistentTopic2.getManagedLedger(); - MessageDeduplication deduplication2 = persistentTopic2.getMessageDeduplication(); - admin.topicPolicies().setDeduplicationStatus(topic, true); - Awaitility.await().untilAsserted(() -> { - ManagedCursorImpl cursor = - (ManagedCursorImpl) ml2.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME); - assertNotNull(cursor); - }); - // step 3. - producer.send("last message"); - ml2.trimConsumedLedgersInBackground(new CompletableFuture<>()); - // step 4. - Awaitility.await().untilAsserted(() -> { - int snapshotCounter3 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter"); - assertTrue(snapshotCounter3 < brokerDeduplicationEntriesInterval); - // Verify: the previous ledger will be removed because all messages have been acked. - assertEquals(ml2.getLedgersInfo().size(), 1); - }); - - // cleanup. - producer.close(); - admin.topics().delete(topic); - deduplicationSnapshotFrequency = originalDeduplicationSnapshotFrequency; - cleanup(); - setup(); - } -}
