This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8db6a3b9b0d [fix][broker] Fix issue that topic policies was deleted
after a sub topic deleted, even if the partitioned topic still exists (#24350)
8db6a3b9b0d is described below
commit 8db6a3b9b0d03bbaf10d825c82f3fa8f9bd6a5cb
Author: fengyubiao <[email protected]>
AuthorDate: Fri May 30 23:46:48 2025 +0800
[fix][broker] Fix issue that topic policies was deleted after a sub topic
deleted, even if the partitioned topic still exists (#24350)
---
.../broker/admin/impl/PersistentTopicsBase.java | 6 ++-
.../pulsar/broker/service/AbstractTopic.java | 3 ++
.../pulsar/broker/service/BrokerService.java | 8 +++-
.../pulsar/broker/admin/TopicPoliciesTest.java | 54 ++++++++++++++++++++++
.../apache/pulsar/broker/service/TopicGCTest.java | 20 +++++---
5 files changed, 82 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c666a19b61d..978b72697cc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -787,8 +787,10 @@ public class PersistentTopicsBase extends AdminResource {
})
// Only tries to delete the znode for partitioned topic when
all its partitions are successfully deleted
).thenCompose(ignore ->
-
pulsar().getBrokerService().deleteSchema(topicName).exceptionally(ex -> null))
- .thenCompose(__ ->
getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+
pulsar().getBrokerService().deleteSchema(topicName).exceptionally(ex -> null)
+ ).thenCompose(ignore ->
+
pulsar().getTopicPoliciesService().deleteTopicPoliciesAsync(topicName).exceptionally(ex
-> null)
+ ).thenCompose(__ ->
getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.runWithMarkDeleteAsync(topicName, () ->
namespaceResources()
.getPartitionedTopicResources().deletePartitionedTopicAsync(topicName)))
.thenAccept(__ -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index ab9913dea48..47e58d848a6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -1286,6 +1286,9 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
}
public CompletableFuture<Void> deleteTopicPolicies() {
+ if (TopicName.get(getName()).isPartitioned()) {
+ return CompletableFuture.completedFuture(null);
+ }
return
brokerService.pulsar().getTopicPoliciesService().deleteTopicPoliciesAsync(TopicName.get(topic));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ab0bef93847..45540428afc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1282,7 +1282,13 @@ public class BrokerService implements Closeable {
deleteTopicAuthenticationWithRetry(topic,
deleteTopicAuthenticationFuture, 5);
deleteTopicAuthenticationFuture
.thenCompose(__ -> deleteSchema(tn))
- .thenCompose(__ ->
pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(tn)).whenComplete((v,
ex) -> {
+ .thenCompose(__ -> {
+ if (tn.isPartitioned()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return
pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(tn);
+ })
+ .whenComplete((v, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
return;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index bf2c3e5b2aa..7a0605b6a2d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -96,6 +96,7 @@ import
org.apache.pulsar.common.policies.data.SubscriptionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
@@ -3525,6 +3526,59 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
admin.topics().delete(tpName, false);
}
+ @DataProvider
+ public Object[][] partitionedTypes() {
+ return new Object[][]{
+ {TopicType.NON_PARTITIONED},
+ {TopicType.PARTITIONED}
+ };
+ }
+
+ @Test(dataProvider = "partitionedTypes")
+ public void testCleanupPoliciesAfterDeletedTopic(TopicType topicType)
throws Exception {
+ final String tpName = BrokerTestUtil.newUniqueName("persistent://" +
myNamespace + "/tp");
+ final TopicName tpNameP0 = TopicName.get(tpName).getPartition(0);
+ final String subscriptionName = "s1";
+ final int rateMsgGlobal = 1000;
+ final int rateMsgLocal = 1000;
+ if (TopicType.PARTITIONED.equals(topicType)) {
+ admin.topics().createPartitionedTopic(tpName, 2);
+ } else {
+ admin.topics().createNonPartitionedTopic(tpName);
+ }
+
+ admin.topics().createSubscription(tpName, subscriptionName,
MessageId.earliest);
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService()
+ .getTopicIfExists(TopicType.PARTITIONED.equals(topicType) ?
tpNameP0.toString(): tpName).get().get();
+
+ // Set global policy.
+ // Verify: policies was affected.
+ DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal,
1, false, 1);
+ admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal);
+ DispatchRate dispatchRateLocal = new DispatchRateImpl(rateMsgLocal, 2,
false, 2);
+ admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateLocal);
+ Awaitility.await().untilAsserted(() -> {
+
assertEquals(persistentTopic.getHierarchyTopicPolicies().getDispatchRate().get(),
dispatchRateLocal);
+ });
+
+ // cleanup.
+ if (TopicType.PARTITIONED.equals(topicType)) {
+ admin.topics().deletePartitionedTopic(tpName, false);
+ } else {
+ admin.topics().delete(tpName, false);
+ }
+
+ // Verify: the topic-level policies will be removed after the topic is
deleted.
+ Awaitility.await().untilAsserted(() -> {
+ Optional<TopicPolicies> topicPoliciesOptional =
pulsar.getTopicPoliciesService()
+ .getTopicPoliciesAsync(TopicName.get(tpName),
LOCAL_ONLY).join();
+ Optional<TopicPolicies> topicPoliciesOptionalGlobal =
pulsar.getTopicPoliciesService()
+ .getTopicPoliciesAsync(TopicName.get(tpName),
GLOBAL_ONLY).join();
+ assertTrue(topicPoliciesOptional.isEmpty());
+ assertTrue(topicPoliciesOptionalGlobal.isEmpty());
+ });
+ }
+
@Test
public void testGlobalTopicPolicies() throws Exception {
final String topic = testTopic + UUID.randomUUID();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
index 172bd3702e1..5fe072f6bf6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.PublishRate;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.annotations.AfterClass;
@@ -162,6 +163,8 @@ public class TopicGCTest extends ProducerConsumerBase {
final String subscription = "s1";
admin.topics().createPartitionedTopic(topic, 2);
admin.topics().createSubscription(topic, subscription,
MessageId.earliest);
+ final PublishRate publishRate = new PublishRate(100, 1000);
+ admin.topicPolicies().setPublishRate(topic, publishRate);
// create consumers and producers.
Producer<String> producer0 =
pulsarClient.newProducer(Schema.STRING).topic(partition0)
@@ -185,12 +188,17 @@ public class TopicGCTest extends ProducerConsumerBase {
// Wait for topic GC.
// Partition 0 will be deleted about 20s later, left 2min to avoid
flaky.
producer0.close();
- Awaitility.await().atMost(2, TimeUnit.MINUTES).untilAsserted(() -> {
- CompletableFuture<Optional<Topic>> tp1 =
pulsar.getBrokerService().getTopic(partition0, false);
- CompletableFuture<Optional<Topic>> tp2 =
pulsar.getBrokerService().getTopic(partition1, false);
- assertTrue(tp1 == null || !tp1.get().isPresent());
- assertTrue(tp2 != null && tp2.get().isPresent());
- });
+ if
(!subscribeTopicType.equals(SubscribeTopicType.MULTI_PARTITIONED_TOPIC)) {
+ Awaitility.await().atMost(2, TimeUnit.MINUTES).untilAsserted(() ->
{
+ CompletableFuture<Optional<Topic>> tp1 =
pulsar.getBrokerService().getTopic(partition0, false);
+ CompletableFuture<Optional<Topic>> tp2 =
pulsar.getBrokerService().getTopic(partition1, false);
+ assertTrue(tp1 == null || !tp1.get().isPresent());
+ assertTrue(tp2 != null && tp2.get().isPresent());
+ // Verify: topic policies will not be removed after a
sub-topic GC.
+ PublishRate publishRateGot =
admin.topicPolicies().getPublishRate(topic);
+ assertEquals(publishRateGot, publishRate);
+ });
+ }
// Verify that the messages under "partition-1" still can be ack.
for (int i = 0; i < 2; i++) {