This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5ca889fd14703a4c8eb832a5a0269db114d22aff
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Dec 9 14:56:37 2022 +0800

    [fix][broker] Fix delete system topic clean topic policy (#18823)
    
    If a user sets a topic policy on a system topic and then deletes that system topic,
the topic policy should be deleted as well.
    
    Only the change_events topic (the topic-policies system topic) does not need to have its topic policies cleared.
    
    (cherry picked from commit 93c41de8aac7dd655491d3b231468753d2d0a113)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 109 +++++++++++++--------
 .../broker/service/persistent/PersistentTopic.java | 101 ++++++++++---------
 .../apache/pulsar/broker/admin/NamespacesTest.java |  38 ++++++-
 3 files changed, 156 insertions(+), 92 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 87bbc4527f4..59193447fbc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -65,6 +65,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -308,14 +309,24 @@ public abstract class NamespacesBase extends 
AdminResource {
             asyncResponse.resume(new RestException(e));
             return;
         }
-
         // remove from owned namespace map and ephemeral node from ZK
         final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         // remove system topics first.
+        Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>();
+        Set<String> partitionedTopicPolicySystemTopic = new HashSet<>();
         if (!topics.isEmpty()) {
             for (String topic : topics) {
                 try {
-                    
futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+                    if (EventsTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                        TopicName topicName = TopicName.get(topic);
+                        if (topicName.isPartitioned()) {
+                            partitionedTopicPolicySystemTopic.add(topic);
+                        } else {
+                            noPartitionedTopicPolicySystemTopic.add(topic);
+                        }
+                    } else {
+                        
futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+                    }
                 } catch (Exception ex) {
                     log.error("[{}] Failed to delete system topic {}", 
clientAppId(), topic, ex);
                     asyncResponse.resume(new 
RestException(Status.INTERNAL_SERVER_ERROR, ex));
@@ -323,44 +334,49 @@ public abstract class NamespacesBase extends 
AdminResource {
                 }
             }
         }
-        FutureUtil.waitForAll(futures).thenCompose(__ -> {
-            List<CompletableFuture<Void>> deleteBundleFutures = 
Lists.newArrayList();
-            NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
-                            .getBundles(namespaceName);
-            for (NamespaceBundle bundle : bundles.getBundles()) {
-                        // check if the bundle is owned by any broker, if not 
then we do not need to delete the bundle
-                
deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle).thenCompose(ownership
 -> {
-                    if (ownership.isPresent()) {
-                        try {
-                            return pulsar().getAdminClient().namespaces()
-                                    
.deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange());
-                        } catch (PulsarServerException e) {
-                            throw new RestException(e);
-                        }
+        FutureUtil.waitForAll(futures)
+                .thenCompose(ignore -> 
internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
+                .thenCompose(ignore -> 
internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
+                .thenCompose(__ -> {
+                    List<CompletableFuture<Void>> deleteBundleFutures = 
Lists.newArrayList();
+                    NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
+                                    .getBundles(namespaceName);
+                    for (NamespaceBundle bundle : bundles.getBundles()) {
+                                // check if the bundle is owned by any broker, 
if not then we do not need to delete the bundle
+                        
deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle)
+                                .thenCompose(ownership -> {
+                                    if (ownership.isPresent()) {
+                                        try {
+                                            return 
pulsar().getAdminClient().namespaces()
+                                                    
.deleteNamespaceBundleAsync(namespaceName.toString(),
+                                                            
bundle.getBundleRange());
+                                        } catch (PulsarServerException e) {
+                                            throw new RestException(e);
+                                        }
+                                    } else {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }
+                                }));
+                    }
+                    return FutureUtil.waitForAll(deleteBundleFutures);
+                })
+                .thenCompose(__ -> internalClearZkSources())
+                .thenAccept(__ -> {
+                    log.info("[{}] Remove namespace successfully {}", 
clientAppId(), namespaceName);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    log.error("[{}] Failed to remove namespace {}", 
clientAppId(), namespaceName, cause);
+                    if (cause instanceof 
PulsarAdminException.ConflictException) {
+                        log.info("[{}] There are new topics created during the 
namespace deletion, "
+                                + "retry to delete the namespace again.", 
namespaceName);
+                        pulsar().getExecutor().execute(() -> 
internalDeleteNamespace(asyncResponse, authoritative));
                     } else {
-                        return CompletableFuture.completedFuture(null);
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
                     }
-                }));
-            }
-            return FutureUtil.waitForAll(deleteBundleFutures);
-        })
-        .thenCompose(__ -> internalClearZkSources())
-        .thenAccept(__ -> {
-            log.info("[{}] Remove namespace successfully {}", clientAppId(), 
namespaceName);
-            asyncResponse.resume(Response.noContent().build());
-        })
-        .exceptionally(ex -> {
-            Throwable cause = FutureUtil.unwrapCompletionException(ex);
-            log.error("[{}] Failed to remove namespace {}", clientAppId(), 
namespaceName, cause);
-            if (cause instanceof PulsarAdminException.ConflictException) {
-                log.info("[{}] There are new topics created during the 
namespace deletion, "
-                        + "retry to delete the namespace again.", 
namespaceName);
-                pulsar().getExecutor().execute(() -> 
internalDeleteNamespace(asyncResponse, authoritative));
-            } else {
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-            }
-            return null;
-        });
+                    return null;
+                });
     }
 
     // clear zk-node resources for deleting namespace
@@ -477,13 +493,19 @@ public abstract class NamespacesBase extends 
AdminResource {
                 Set<String> nonPartitionedTopics = new HashSet<>();
                 Set<String> allSystemTopics = new HashSet<>();
                 Set<String> allPartitionedSystemTopics = new HashSet<>();
+                Set<String> noPartitionedTopicPolicySystemTopic = new 
HashSet<>();
+                Set<String> partitionedTopicPolicySystemTopic = new 
HashSet<>();
 
                 for (String topic : topics) {
                     try {
                         TopicName topicName = TopicName.get(topic);
                         if (topicName.isPartitioned()) {
                             if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
-                                
allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+                                if 
(EventsTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                                    
partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName());
+                                } else {
+                                    
allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+                                }
                                 continue;
                             }
                             String partitionedTopic = 
topicName.getPartitionedTopicName();
@@ -495,7 +517,11 @@ public abstract class NamespacesBase extends AdminResource 
{
                             }
                         } else {
                             if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
-                                allSystemTopics.add(topic);
+                                if 
(EventsTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                                    
noPartitionedTopicPolicySystemTopic.add(topic);
+                                } else {
+                                    allSystemTopics.add(topic);
+                                }
                                 continue;
                             }
                             
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
@@ -525,6 +551,9 @@ public abstract class NamespacesBase extends AdminResource {
                                 .thenCompose((ignore) -> 
internalDeleteTopicsAsync(allSystemTopics))
                                 .thenCompose((ignore) ->
                                         
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+                                .thenCompose(ignore ->
+                                        
internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
+                                .thenCompose(ignore -> 
internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
                                 .handle((result, exception) -> {
                                     if (exception != null) {
                                         if (exception.getCause() instanceof 
PulsarAdminException) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5f12bbf5886..e41d2905301 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1178,28 +1178,27 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 return null;
             });
 
-            closeClientFuture.thenAccept(delete -> {
-                CompletableFuture<Void> deleteTopicAuthenticationFuture = new 
CompletableFuture<>();
-                brokerService.deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
-                deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema 
? deleteSchema() :
-                                CompletableFuture.completedFuture(null))
-                        .thenCompose(ignore -> {
-                            if 
(!this.getBrokerService().getPulsar().getBrokerService()
-                                    .isSystemTopic(TopicName.get(topic))) {
-                                return deleteTopicPolicies();
-                            } else {
-                                return CompletableFuture.completedFuture(null);
-                            }
-                        })
-                        .thenCompose(__ -> transactionBufferCleanupAndClose())
-                        .whenComplete((v, ex) -> {
-                            if (ex != null) {
-                                log.error("[{}] Error deleting topic", topic, 
ex);
-                                unfenceTopicToResume();
-                                deleteFuture.completeExceptionally(ex);
-                            } else {
-                                List<CompletableFuture<Void>> 
subsDeleteFutures = new ArrayList<>();
-                                subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
+                closeClientFuture.thenAccept(__ -> {
+                    CompletableFuture<Void> deleteTopicAuthenticationFuture = 
new CompletableFuture<>();
+                    brokerService.deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
+                    deleteTopicAuthenticationFuture.thenCompose(ignore -> 
deleteSchema())
+                            .thenCompose(ignore -> {
+                                if 
(!EventsTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                                    return deleteTopicPolicies();
+                                } else {
+                                    return 
CompletableFuture.completedFuture(null);
+                                }
+                            })
+                            .thenCompose(ignore -> 
transactionBufferCleanupAndClose())
+                            .whenComplete((v, ex) -> {
+                                if (ex != null) {
+                                    log.error("[{}] Error deleting topic", 
topic, ex);
+                                    unfenceTopicToResume();
+                                    deleteFuture.completeExceptionally(ex);
+                                } else {
+                                    List<CompletableFuture<Void>> 
subsDeleteFutures = new ArrayList<>();
+                                    subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
+
                                 
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
                                     if (e != null) {
                                         log.error("[{}] Error deleting topic", 
topic, e);
@@ -1211,36 +1210,36 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                             public void 
deleteLedgerComplete(Object ctx) {
                                                 
brokerService.removeTopicFromCache(PersistentTopic.this);
 
-                                                
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
-                                                
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
-                                                
unregisterTopicPolicyListener();
-
-                                                log.info("[{}] Topic deleted", 
topic);
-                                                deleteFuture.complete(null);
-                                            }
-
-                                            @Override
-                                            public void 
deleteLedgerFailed(ManagedLedgerException exception,
-                                                                           
Object ctx) {
-                                                if (exception.getCause()
-                                                        instanceof 
MetadataStoreException.NotFoundException) {
-                                                    log.info("[{}] Topic is 
already deleted {}",
-                                                            topic, 
exception.getMessage());
-                                                    deleteLedgerComplete(ctx);
-                                                } else {
-                                                    unfenceTopicToResume();
-                                                    log.error("[{}] Error 
deleting topic", topic, exception);
-                                                    
deleteFuture.completeExceptionally(
-                                                            new 
PersistenceException(exception));
-                                                }
+                                            
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+                                            
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+                                            unregisterTopicPolicyListener();
+
+                                            log.info("[{}] Topic deleted", 
topic);
+                                            deleteFuture.complete(null);
+                                        }
+
+                                        @Override
+                                        public void 
deleteLedgerFailed(ManagedLedgerException exception,
+                                                                       Object 
ctx) {
+                                            if (exception.getCause()
+                                                    instanceof 
MetadataStoreException.NotFoundException) {
+                                                log.info("[{}] Topic is 
already deleted {}",
+                                                        topic, 
exception.getMessage());
+                                                deleteLedgerComplete(ctx);
+                                            } else {
+                                                unfenceTopicToResume();
+                                                log.error("[{}] Error deleting 
topic", topic, exception);
+                                                
deleteFuture.completeExceptionally(
+                                                        new 
PersistenceException(exception));
                                             }
-                                        }, null);
-                                    }
-                                });
-                            }
-                        });
+                                        }
+                                    }, null);
+                                }
+                            });
+                        }
+                    });
             }).exceptionally(ex->{
                 unfenceTopicToResume();
                 deleteFuture.completeExceptionally(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index fe4a4ab9ed6..fdc218a0f05 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
+import static org.junit.Assert.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -57,11 +59,13 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.v1.Namespaces;
 import org.apache.pulsar.broker.admin.v1.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -81,6 +85,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.events.EventsTopicNames;
@@ -101,6 +106,7 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 import org.apache.zookeeper.KeeperException.Code;
@@ -1822,7 +1828,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws 
Exception {
+    public void testNotClearTopicPolicesWhenDeleteTopicPolicyTopic() throws 
Exception {
         String namespace = this.testTenant + "/delete-systemTopic";
         String topic = TopicName.get(TopicDomain.persistent.toString(), 
this.testTenant, "delete-systemTopic",
                 "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
@@ -1842,4 +1848,34 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         // 4. delete the policies topic and the topic wil not to clear topic 
polices
         admin.topics().delete(namespace + "/" + 
EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
     }
+    @Test
+    public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        Field field = 
PulsarService.class.getDeclaredField("topicPoliciesService");
+        field.setAccessible(true);
+        field.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+
+        String systemTopic = SYSTEM_NAMESPACE.toString() + "/" + 
"testDeleteTopicPolicyWhenDeleteSystemTopic";
+        admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+                        Sets.newHashSet("use", "usc", "usw")));
+
+        admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString());
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(systemTopic).create();
+        admin.topicPolicies().setMaxConsumers(systemTopic, 5);
+
+        int maxConsumerPerTopic = pulsar
+                .getTopicPoliciesService()
+                
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get()
+                .getMaxConsumerPerTopic();
+
+        assertEquals(maxConsumerPerTopic, 5);
+        admin.topics().delete(systemTopic, true);
+        TopicPolicies topicPolicies = pulsar.getTopicPoliciesService()
+                
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, 
TimeUnit.SECONDS);
+        assertNull(topicPolicies);
+    }
 }

Reply via email to