This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 06954d131c6 [fix][broker] Make `deleteTopicPolicies` serialized is executed when close topic. (#15811)
06954d131c6 is described below

commit 06954d131c63d9136ab7d895ed83ad67d857463e
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Aug 22 16:44:31 2022 +0800

    [fix][broker] Make `deleteTopicPolicies` serialized is executed when close topic. (#15811)
    
    (cherry picked from commit e8ee996dd0c7a3a742117aee399b31e89e6e2d9d)
---
 .../org/apache/pulsar/broker/service/BrokerService.java   | 15 ++++++++++++---
 .../pulsar/broker/service/persistent/PersistentTopic.java |  2 +-
 .../client/api/AuthenticatedProducerConsumerTest.java     |  1 +
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1fbc19657f5..aed023ee5f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2773,11 +2773,20 @@ public class BrokerService implements Closeable {
     }
 
     public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
-        if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+        final PulsarService pulsarService = pulsar();
+        if (!pulsarService.getConfig().isTopicLevelPoliciesEnabled()) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
-        return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
+        return pulsarService.getPulsarResources().getNamespaceResources()
+                .getPoliciesAsync(topicName.getNamespaceObject())
+                .thenComposeAsync(optPolicies -> {
+                    if (optPolicies.isPresent() && optPolicies.get().deleted) {
+                        // We can return the completed future directly if the namespace is already deleted.
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
+                    return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
+                });
     }
 
     private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 99f12d1b1a9..b3d61115032 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1163,7 +1163,7 @@ public class PersistentTopic extends AbstractTopic
 
                     deleteTopicAuthenticationFuture.thenCompose(
                                     __ -> deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null))
-                            .thenAccept(__ -> deleteTopicPolicies())
+                            .thenCompose(__ -> deleteTopicPolicies())
                             .thenCompose(__ -> transactionBufferCleanupAndClose())
                             .whenComplete((v, ex) -> {
                         if (ex != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 046b26846e2..e0cc980991e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -80,6 +80,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         conf.setTlsAllowInsecureConnection(true);
+        conf.setTopicLevelPoliciesEnabled(false);
 
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add("localhost");

Reply via email to