This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new e1d8c7d2b0c [fix][broker] Closed topics won't be removed from the 
cache (#23884)
e1d8c7d2b0c is described below

commit e1d8c7d2b0c01c27c87821f21662d4f19c8fe902
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Feb 6 10:14:28 2025 +0800

    [fix][broker] Closed topics won't be removed from the cache (#23884)
    
    (cherry picked from commit 8a40b30cf47a91ec02d931e6371d02409ba5951e)
---
 .../pulsar/broker/service/AbstractTopic.java       |   8 ++
 .../pulsar/broker/service/BrokerService.java       |  51 ++-----
 .../buffer/impl/TopicTransactionBuffer.java        |  27 +++-
 .../impl/TransactionPersistentTopicTest.java       | 148 +++++++++++++++++++++
 4 files changed, 188 insertions(+), 46 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 69a38bc50de..9a115e6d1ca 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -48,6 +48,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.ToLongFunction;
 import javax.annotation.Nonnull;
 import lombok.Getter;
+import lombok.Setter;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
@@ -96,6 +97,13 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
 
     protected final String topic;
 
+    // Reference to the CompletableFuture returned when creating this topic in 
BrokerService.
+    // Used to safely remove the topic from BrokerService's cache by ensuring 
we remove the exact
+    // topic instance that was created.
+    @Getter
+    @Setter
+    protected volatile CompletableFuture<Optional<Topic>> createFuture;
+
     // Producers currently connected to this topic
     protected final ConcurrentHashMap<String, Producer> producers;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5869d8822bc..7998c1954f0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1326,6 +1326,7 @@ public class BrokerService implements Closeable {
         NonPersistentTopic nonPersistentTopic;
         try {
             nonPersistentTopic = newTopic(topic, null, this, 
NonPersistentTopic.class);
+            nonPersistentTopic.setCreateFuture(topicFuture);
         } catch (Throwable e) {
             log.warn("Failed to create topic {}", topic, e);
             topicFuture.completeExceptionally(e);
@@ -1800,6 +1801,7 @@ public class BrokerService implements Closeable {
                                 PersistentTopic persistentTopic = 
isSystemTopic(topic)
                                         ? new SystemTopic(topic, ledger, 
BrokerService.this)
                                         : newTopic(topic, ledger, 
BrokerService.this, PersistentTopic.class);
+                                persistentTopic.setCreateFuture(topicFuture);
                                 persistentTopic
                                         .initialize()
                                         .thenCompose(__ -> 
persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
@@ -2409,47 +2411,18 @@ public class BrokerService implements Closeable {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
-        Optional<CompletableFuture<Optional<Topic>>> createTopicFuture = 
findTopicFutureInCache(topic);
-        if (createTopicFuture.isEmpty()){
-            return CompletableFuture.completedFuture(null);
-        }
-        return removeTopicFutureFromCache(topic.getName(), 
createTopicFuture.get());
-    }
-
-    private Optional<CompletableFuture<Optional<Topic>>> 
findTopicFutureInCache(Topic topic){
-        if (topic == null){
-            return Optional.empty();
-        }
-        final CompletableFuture<Optional<Topic>> createTopicFuture = 
topics.get(topic.getName());
-        // If not exists in cache, do nothing.
-        if (createTopicFuture == null){
-            return Optional.empty();
-        }
-        // If the future in cache is not yet complete, the topic instance in 
the cache is not the same with the topic.
-        if (!createTopicFuture.isDone()){
-            return Optional.empty();
-        }
-        // If the future in cache has exception complete,
-        // the topic instance in the cache is not the same with the topic.
-        if (createTopicFuture.isCompletedExceptionally()){
-            return Optional.empty();
-        }
-        Optional<Topic> optionalTopic = createTopicFuture.join();
-        Topic topicInCache = optionalTopic.orElse(null);
-        if (topicInCache == null || topicInCache != topic){
-            return Optional.empty();
-        } else {
-            return Optional.of(createTopicFuture);
-        }
-    }
-
-    private CompletableFuture<Void> removeTopicFutureFromCache(String topic,
-                                                        
CompletableFuture<Optional<Topic>> createTopicFuture) {
-        TopicName topicName = TopicName.get(topic);
+    /**
+     * Removes the topic from the cache only if the topicName and associated 
createFuture match exactly.
+     * The TopicEvent.UNLOAD event will be triggered before and after removal.
+     *
+     * @param topic The topic to be removed.
+     * @return A CompletableFuture that completes when the operation is done.
+     */
+    public CompletableFuture<Void> removeTopicFromCache(AbstractTopic topic) {
+        TopicName topicName = TopicName.get(topic.getName());
         return pulsar.getNamespaceService().getBundleAsync(topicName)
                 .thenAccept(namespaceBundle -> {
-                    removeTopicFromCache(topic, namespaceBundle, 
createTopicFuture);
+                    removeTopicFromCache(topic.getName(), namespaceBundle, 
topic.getCreateFuture());
                 });
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 41977e6b61d..c43f0ed7fb9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -109,7 +109,25 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
     private final AbortedTxnProcessor.SnapshotType snapshotType;
     private final MaxReadPositionCallBack maxReadPositionCallBack;
 
+    private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic 
topic) {
+        return 
topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()
+                ? new SnapshotSegmentAbortedTxnProcessorImpl(topic)
+                : new SingleSnapshotAbortedTxnProcessorImpl(topic);
+    }
+
+    private static AbortedTxnProcessor.SnapshotType 
determineSnapshotType(PersistentTopic topic) {
+        return 
topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()
+                ? AbortedTxnProcessor.SnapshotType.Segment
+                : AbortedTxnProcessor.SnapshotType.Single;
+    }
+
     public TopicTransactionBuffer(PersistentTopic topic) {
+        this(topic, createSnapshotProcessor(topic), 
determineSnapshotType(topic));
+    }
+
+    @VisibleForTesting
+    TopicTransactionBuffer(PersistentTopic topic, AbortedTxnProcessor 
snapshotAbortedTxnProcessor,
+                           AbortedTxnProcessor.SnapshotType snapshotType) {
         super(State.None);
         this.topic = topic;
         this.timer = 
topic.getBrokerService().getPulsar().getTransactionTimer();
@@ -118,13 +136,8 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
                 
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
         this.maxReadPosition = 
topic.getManagedLedger().getLastConfirmedEntry();
-        if 
(topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled())
 {
-            snapshotAbortedTxnProcessor = new 
SnapshotSegmentAbortedTxnProcessorImpl(topic);
-            snapshotType = AbortedTxnProcessor.SnapshotType.Segment;
-        } else {
-            snapshotAbortedTxnProcessor = new 
SingleSnapshotAbortedTxnProcessorImpl(topic);
-            snapshotType = AbortedTxnProcessor.SnapshotType.Single;
-        }
+        this.snapshotAbortedTxnProcessor = snapshotAbortedTxnProcessor;
+        this.snapshotType = snapshotType;
         this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
         this.recover();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java
new file mode 100644
index 00000000000..508423adce4
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.TopicFactory;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class TransactionPersistentTopicTest extends ProducerConsumerBase {
+
+    private static CountDownLatch topicInitSuccessSignal = new 
CountDownLatch(1);
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        // Intercept when the `topicFuture` is about to complete and wait 
until the topic close operation finishes.
+        conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
+        conf.setTransactionCoordinatorEnabled(true);
+        conf.setBrokerDeduplicationEnabled(false);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testNoOrphanClosedTopicIfTxnInternalFailed() {
+        String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp2");
+
+        BrokerService brokerService = pulsar.getBrokerService();
+
+        // 1. Mock close topic when create transactionBuffer
+        TransactionBufferProvider mockTransactionBufferProvider = originTopic 
-> {
+            AbortedTxnProcessor abortedTxnProcessor = 
mock(AbortedTxnProcessor.class);
+            doAnswer(invocation -> {
+                topicInitSuccessSignal.await();
+                return CompletableFuture.failedFuture(new 
RuntimeException("Mock recovery failed"));
+            }).when(abortedTxnProcessor).recoverFromSnapshot();
+            
when(abortedTxnProcessor.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+            return new TopicTransactionBuffer(
+                    (PersistentTopic) originTopic, abortedTxnProcessor, 
AbortedTxnProcessor.SnapshotType.Single);
+        };
+        TransactionBufferProvider originalTransactionBufferProvider = 
pulsar.getTransactionBufferProvider();
+        pulsar.setTransactionBufferProvider(mockTransactionBufferProvider);
+
+        // 2. Trigger create topic and assert topic load success.
+        CompletableFuture<Optional<Topic>> firstLoad = 
brokerService.getTopic(tpName, true);
+        Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS)
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    assertTrue(firstLoad.isDone());
+                    assertFalse(firstLoad.isCompletedExceptionally());
+                });
+
+        // 3. Assert topic removed from cache
+        Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    assertFalse(brokerService.getTopics().containsKey(tpName));
+                });
+
+        // 4. Set txn provider to back
+        pulsar.setTransactionBufferProvider(originalTransactionBufferProvider);
+    }
+
+    public static class MyTopicFactory implements TopicFactory {
+        @Override
+        public <T extends Topic> T create(String topic, ManagedLedger ledger, 
BrokerService brokerService,
+                                          Class<T> topicClazz) {
+            try {
+                if (topicClazz == NonPersistentTopic.class) {
+                    return (T) new NonPersistentTopic(topic, brokerService);
+                } else {
+                    return (T) new MyPersistentTopic(topic, ledger, 
brokerService);
+                }
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            // No-op
+        }
+    }
+
+    public static class MyPersistentTopic extends PersistentTopic {
+
+        public MyPersistentTopic(String topic, ManagedLedger ledger, 
BrokerService brokerService) {
+            super(topic, ledger, brokerService);
+        }
+
+        @SneakyThrows
+        @Override
+        public CompletableFuture<Void> checkDeduplicationStatus() {
+            topicInitSuccessSignal.countDown();
+            // Sleep 1s pending txn buffer recover failed and close topic
+            Thread.sleep(1000);
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+}

Reply via email to