This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8db6a3b9b0d [fix][broker] Fix issue that topic policies was deleted 
after a sub topic deleted, even if the partitioned topic still exists (#24350)
8db6a3b9b0d is described below

commit 8db6a3b9b0d03bbaf10d825c82f3fa8f9bd6a5cb
Author: fengyubiao <[email protected]>
AuthorDate: Fri May 30 23:46:48 2025 +0800

    [fix][broker] Fix issue that topic policies was deleted after a sub topic 
deleted, even if the partitioned topic still exists (#24350)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  6 ++-
 .../pulsar/broker/service/AbstractTopic.java       |  3 ++
 .../pulsar/broker/service/BrokerService.java       |  8 +++-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 54 ++++++++++++++++++++++
 .../apache/pulsar/broker/service/TopicGCTest.java  | 20 +++++---
 5 files changed, 82 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c666a19b61d..978b72697cc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -787,8 +787,10 @@ public class PersistentTopicsBase extends AdminResource {
                         })
                 // Only tries to delete the znode for partitioned topic when 
all its partitions are successfully deleted
                 ).thenCompose(ignore ->
-                        
pulsar().getBrokerService().deleteSchema(topicName).exceptionally(ex -> null))
-                .thenCompose(__ -> 
getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                        
pulsar().getBrokerService().deleteSchema(topicName).exceptionally(ex -> null)
+                ).thenCompose(ignore ->
+                        
pulsar().getTopicPoliciesService().deleteTopicPoliciesAsync(topicName).exceptionally(ex
 -> null)
+                ).thenCompose(__ -> 
getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
                         .runWithMarkDeleteAsync(topicName, () -> 
namespaceResources()
                                 
.getPartitionedTopicResources().deletePartitionedTopicAsync(topicName)))
                 .thenAccept(__ -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index ab9913dea48..47e58d848a6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -1286,6 +1286,9 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
     }
 
     public CompletableFuture<Void> deleteTopicPolicies() {
+        if (TopicName.get(getName()).isPartitioned()) {
+            return CompletableFuture.completedFuture(null);
+        }
         return 
brokerService.pulsar().getTopicPoliciesService().deleteTopicPoliciesAsync(TopicName.get(topic));
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ab0bef93847..45540428afc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1282,7 +1282,13 @@ public class BrokerService implements Closeable {
         deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
         deleteTopicAuthenticationFuture
         .thenCompose(__ -> deleteSchema(tn))
-        .thenCompose(__ -> 
pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(tn)).whenComplete((v, 
ex) -> {
+        .thenCompose(__ -> {
+            if (tn.isPartitioned()) {
+                return CompletableFuture.completedFuture(null);
+            }
+            return 
pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(tn);
+        })
+        .whenComplete((v, ex) -> {
             if (ex != null) {
                 future.completeExceptionally(ex);
                 return;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index bf2c3e5b2aa..7a0605b6a2d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -96,6 +96,7 @@ import 
org.apache.pulsar.common.policies.data.SubscriptionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
@@ -3525,6 +3526,59 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         admin.topics().delete(tpName, false);
     }
 
+    @DataProvider
+    public Object[][] partitionedTypes() {
+        return new Object[][]{
+            {TopicType.NON_PARTITIONED},
+            {TopicType.PARTITIONED}
+        };
+    }
+
+    @Test(dataProvider = "partitionedTypes")
+    public void testCleanupPoliciesAfterDeletedTopic(TopicType topicType) 
throws Exception {
+        final String tpName = BrokerTestUtil.newUniqueName("persistent://" + 
myNamespace + "/tp");
+        final TopicName tpNameP0 = TopicName.get(tpName).getPartition(0);
+        final String subscriptionName = "s1";
+        final int rateMsgGlobal = 1000;
+        final int rateMsgLocal = 1000;
+        if (TopicType.PARTITIONED.equals(topicType)) {
+            admin.topics().createPartitionedTopic(tpName, 2);
+        } else {
+            admin.topics().createNonPartitionedTopic(tpName);
+        }
+
+        admin.topics().createSubscription(tpName, subscriptionName, 
MessageId.earliest);
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
+            .getTopicIfExists(TopicType.PARTITIONED.equals(topicType) ? 
tpNameP0.toString(): tpName).get().get();
+
+        // Set global policy.
+        // Verify: policies was affected.
+        DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal, 
1, false, 1);
+        admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal);
+        DispatchRate dispatchRateLocal = new DispatchRateImpl(rateMsgLocal, 2, 
false, 2);
+        admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateLocal);
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(persistentTopic.getHierarchyTopicPolicies().getDispatchRate().get(),
 dispatchRateLocal);
+        });
+
+        // cleanup.
+        if (TopicType.PARTITIONED.equals(topicType)) {
+            admin.topics().deletePartitionedTopic(tpName, false);
+        } else {
+            admin.topics().delete(tpName, false);
+        }
+
+        // Verify: the topic-level policies will be removed after the topic is 
deleted.
+        Awaitility.await().untilAsserted(() -> {
+            Optional<TopicPolicies> topicPoliciesOptional = 
pulsar.getTopicPoliciesService()
+                    .getTopicPoliciesAsync(TopicName.get(tpName), 
LOCAL_ONLY).join();
+            Optional<TopicPolicies> topicPoliciesOptionalGlobal = 
pulsar.getTopicPoliciesService()
+                    .getTopicPoliciesAsync(TopicName.get(tpName), 
GLOBAL_ONLY).join();
+            assertTrue(topicPoliciesOptional.isEmpty());
+            assertTrue(topicPoliciesOptionalGlobal.isEmpty());
+        });
+    }
+
     @Test
     public void testGlobalTopicPolicies() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
index 172bd3702e1..5fe072f6bf6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.PublishRate;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.annotations.AfterClass;
@@ -162,6 +163,8 @@ public class TopicGCTest extends ProducerConsumerBase {
         final String subscription = "s1";
         admin.topics().createPartitionedTopic(topic, 2);
         admin.topics().createSubscription(topic, subscription, 
MessageId.earliest);
+        final PublishRate publishRate = new PublishRate(100, 1000);
+        admin.topicPolicies().setPublishRate(topic, publishRate);
 
         // create consumers and producers.
         Producer<String> producer0 = 
pulsarClient.newProducer(Schema.STRING).topic(partition0)
@@ -185,12 +188,17 @@ public class TopicGCTest extends ProducerConsumerBase {
         // Wait for topic GC.
         // Partition 0 will be deleted about 20s later, left 2min to avoid 
flaky.
         producer0.close();
-        Awaitility.await().atMost(2, TimeUnit.MINUTES).untilAsserted(() -> {
-            CompletableFuture<Optional<Topic>> tp1 = 
pulsar.getBrokerService().getTopic(partition0, false);
-            CompletableFuture<Optional<Topic>> tp2 = 
pulsar.getBrokerService().getTopic(partition1, false);
-            assertTrue(tp1 == null || !tp1.get().isPresent());
-            assertTrue(tp2 != null && tp2.get().isPresent());
-        });
+        if 
(!subscribeTopicType.equals(SubscribeTopicType.MULTI_PARTITIONED_TOPIC)) {
+            Awaitility.await().atMost(2, TimeUnit.MINUTES).untilAsserted(() -> 
{
+                CompletableFuture<Optional<Topic>> tp1 = 
pulsar.getBrokerService().getTopic(partition0, false);
+                CompletableFuture<Optional<Topic>> tp2 = 
pulsar.getBrokerService().getTopic(partition1, false);
+                assertTrue(tp1 == null || !tp1.get().isPresent());
+                assertTrue(tp2 != null && tp2.get().isPresent());
+                // Verify: topic policies will not be removed after a 
sub-topic GC.
+                PublishRate publishRateGot = 
admin.topicPolicies().getPublishRate(topic);
+                assertEquals(publishRateGot, publishRate);
+            });
+        }
 
         // Verify that the messages under "partition-1" still can be ack.
         for (int i = 0; i < 2; i++) {

Reply via email to