This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c9ff8f57a2 [fix][broker] Fix can not delete namespace by force 
(#18307)
6c9ff8f57a2 is described below

commit 6c9ff8f57a2aa9861b578bdaf18ef6de53ee3fbe
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Nov 21 11:42:43 2022 +0800

    [fix][broker] Fix can not delete namespace by force (#18307)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 51 ++++++++++-------
 .../broker/service/persistent/PersistentTopic.java | 29 ++++++----
 .../apache/pulsar/broker/admin/NamespacesTest.java | 66 ++++++++++++++++++++++
 .../broker/transaction/TransactionProduceTest.java | 29 ----------
 .../pulsar/broker/transaction/TransactionTest.java |  5 +-
 5 files changed, 117 insertions(+), 63 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 04fdc6d5104..1af182c2d59 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -218,24 +218,29 @@ public abstract class NamespacesBase extends 
AdminResource {
                          }))
                 .thenCompose(topics -> {
                     List<String> allTopics = topics.get(0);
+                    ArrayList<String> allUserCreatedTopics = new ArrayList<>();
                     List<String> allPartitionedTopics = topics.get(1);
-                    if (!force) {
-                        boolean hasNonSystemTopic = false;
-                        for (String topic : allTopics) {
-                            if 
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
-                                hasNonSystemTopic = true;
-                                break;
-                            }
+                    ArrayList<String> allUserCreatedPartitionTopics = new 
ArrayList<>();
+                    boolean hasNonSystemTopic = false;
+                    List<String> allSystemTopics = new ArrayList<>();
+                    List<String> allPartitionedSystemTopics = new 
ArrayList<>();
+                    for (String topic : allTopics) {
+                        if 
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
+                            hasNonSystemTopic = true;
+                            allUserCreatedTopics.add(topic);
+                        } else {
+                            allSystemTopics.add(topic);
                         }
-                        if (!hasNonSystemTopic) {
-                            for (String topic : allPartitionedTopics) {
-                                if 
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
-                                    hasNonSystemTopic = true;
-                                    break;
-                                }
-                            }
+                    }
+                    for (String topic : allPartitionedTopics) {
+                        if 
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
+                            hasNonSystemTopic = true;
+                            allUserCreatedPartitionTopics.add(topic);
+                        } else {
+                            allPartitionedSystemTopics.add(topic);
                         }
-
+                    }
+                    if (!force) {
                         if (hasNonSystemTopic) {
                             throw new RestException(Status.CONFLICT, "Cannot 
delete non empty namespace");
                         }
@@ -243,13 +248,17 @@ public abstract class NamespacesBase extends 
AdminResource {
                     return 
namespaceResources().setPoliciesAsync(namespaceName, old -> {
                         old.deleted = true;
                         return  old;
-                    }).thenCompose(__ -> {
-                        return internalDeleteTopicsAsync(allTopics);
-                    }).thenCompose(__ -> {
-                        return 
internalDeletePartitionedTopicsAsync(allPartitionedTopics);
+                    }).thenCompose(ignore -> {
+                        return internalDeleteTopicsAsync(allUserCreatedTopics);
+                    }).thenCompose(ignore -> {
+                        return 
internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics);
+                    }).thenCompose(ignore -> {
+                        return internalDeleteTopicsAsync(allSystemTopics);
+                    }).thenCompose(ignore__ -> {
+                        return 
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics);
                     });
                 })
-                .thenCompose(__ -> pulsar().getNamespaceService()
+                .thenCompose(ignore -> pulsar().getNamespaceService()
                         
.getNamespaceBundleFactory().getBundlesAsync(namespaceName))
                 .thenCompose(bundles -> 
FutureUtil.waitForAll(bundles.getBundles().stream()
                         .map(bundle -> 
pulsar().getNamespaceService().getOwnerAsync(bundle)
@@ -271,7 +280,7 @@ public abstract class NamespacesBase extends AdminResource {
                                     return 
CompletableFuture.completedFuture(null);
                                 })
                         ).collect(Collectors.toList())))
-                .thenCompose(__ -> internalClearZkSources());
+                .thenCompose(ignore -> internalClearZkSources());
     }
 
     private CompletableFuture<Void> 
internalDeletePartitionedTopicsAsync(List<String> topicNames) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 418da33aea0..e4bcb92b58d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1219,17 +1219,24 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     CompletableFuture<Void> deleteTopicAuthenticationFuture = 
new CompletableFuture<>();
                     brokerService.deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
 
-                    deleteTopicAuthenticationFuture.thenCompose(ignore -> 
deleteSchema())
-                            .thenCompose(ignore -> deleteTopicPolicies())
-                            .thenCompose(ignore -> 
transactionBufferCleanupAndClose())
-                            .whenComplete((v, ex) -> {
-                                if (ex != null) {
-                                    log.error("[{}] Error deleting topic", 
topic, ex);
-                                    unfenceTopicToResume();
-                                    deleteFuture.completeExceptionally(ex);
-                                } else {
-                                    List<CompletableFuture<Void>> 
subsDeleteFutures = new ArrayList<>();
-                                    subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
+                        deleteTopicAuthenticationFuture.thenCompose(ignore -> 
deleteSchema())
+                                .thenCompose(ignore -> {
+                                    if 
(!this.getBrokerService().getPulsar().getBrokerService()
+                                            
.isSystemTopic(TopicName.get(topic))) {
+                                        return deleteTopicPolicies();
+                                    } else {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }
+                                })
+                                .thenCompose(ignore -> 
transactionBufferCleanupAndClose())
+                                .whenComplete((v, ex) -> {
+                                    if (ex != null) {
+                                        log.error("[{}] Error deleting topic", 
topic, ex);
+                                        unfenceTopicToResume();
+                                        deleteFuture.completeExceptionally(ex);
+                                    } else {
+                                        List<CompletableFuture<Void>> 
subsDeleteFutures = new ArrayList<>();
+                                        subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
 
                                     
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
                                         if (e != null) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 0b640c16776..b4019c317c9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -76,6 +76,7 @@ import 
org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
 import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
@@ -92,6 +93,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -119,6 +122,7 @@ import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -2018,4 +2022,66 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(forceDeleteNamespaceAllowedOriginalValue);
     }
 
+    @Test
+    public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws 
Exception {
+        String namespace = this.testTenant + "/delete-namespace";
+        String topic = TopicName.get(TopicDomain.persistent.toString(), 
this.testTenant, "delete-namespace",
+                "testFinallyDeleteSystemTopicWhenDeleteNamespace").toString();
+
+        // 0. enable topic level polices and system topic
+        pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfig().setSystemTopicEnabled(true);
+        pulsar.getConfig().setForceDeleteNamespaceAllowed(true);
+        Field policesService = 
pulsar.getClass().getDeclaredField("topicPoliciesService");
+        policesService.setAccessible(true);
+        policesService.set(pulsar, new 
SystemTopicBasedTopicPoliciesService(pulsar));
+
+        // 1. create a test namespace.
+        admin.namespaces().createNamespace(namespace);
+        // 2. create a test topic.
+        admin.topics().createNonPartitionedTopic(topic);
+        // 3. change policy of the topic.
+        admin.topicPolicies().setMaxConsumers(topic, 5);
+        // 4. change the order of the topics in this namespace.
+        List<String> topics = 
pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(namespace)).get();
+        Assert.assertTrue(topics.size() >= 2);
+        for (int i = 0; i < topics.size(); i++) {
+            if 
(topics.get(i).contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)) {
+                String systemTopic = topics.get(i);
+                topics.set(i, topics.get(0));
+                topics.set(0, systemTopic);
+            }
+        }
+        NamespaceService mockNamespaceService = 
spy(pulsar.getNamespaceService());
+        Field namespaceServiceField = 
pulsar.getClass().getDeclaredField("nsService");
+        namespaceServiceField.setAccessible(true);
+        namespaceServiceField.set(pulsar, mockNamespaceService);
+        
doReturn(CompletableFuture.completedFuture(topics)).when(mockNamespaceService).getFullListOfTopics(any());
+        // 5. delete the namespace
+        admin.namespaces().deleteNamespace(namespace, true);
+        // cleanup
+        resetBroker();
+    }
+
+    @Test
+    public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws 
Exception {
+        String namespace = this.testTenant + "/delete-systemTopic";
+        String topic = TopicName.get(TopicDomain.persistent.toString(), 
this.testTenant, "delete-systemTopic",
+                "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
+
+        // 0. enable topic level polices and system topic
+        pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfig().setSystemTopicEnabled(true);
+        Field policesService = 
pulsar.getClass().getDeclaredField("topicPoliciesService");
+        policesService.setAccessible(true);
+        policesService.set(pulsar, new 
SystemTopicBasedTopicPoliciesService(pulsar));
+        // 1. create a test namespace.
+        admin.namespaces().createNamespace(namespace);
+        // 2. create a test topic.
+        admin.topics().createNonPartitionedTopic(topic);
+        // 3. change policy of the topic.
+        admin.topicPolicies().setMaxConsumers(topic, 5);
+        // 4. delete the policies topic and the topic wil not to clear topic 
polices
+        admin.topics().delete(namespace + "/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 06bb92890b6..cdbb1563280 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -89,35 +89,6 @@ public class TransactionProduceTest extends 
TransactionTestBase {
         produceTest(true);
     }
 
-    @Test
-    public void testDeleteNamespaceBeforeCommit() throws Exception {
-        final String topic = NAMESPACE1 + "/testDeleteTopicBeforeCommit";
-        PulsarClient pulsarClient = this.pulsarClient;
-        Transaction tnx = pulsarClient.newTransaction()
-                .withTransactionTimeout(60, TimeUnit.SECONDS)
-                .build().get();
-        long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
-        long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
-        Assert.assertTrue(txnIdMostBits > -1);
-        Assert.assertTrue(txnIdLeastBits > -1);
-
-        @Cleanup
-        Producer<byte[]> outProducer = pulsarClient
-                .newProducer()
-                .topic(topic)
-                .sendTimeout(0, TimeUnit.SECONDS)
-                .enableBatching(false)
-                .create();
-
-        String content = "Hello Txn";
-        outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();
-
-        try {
-            deleteNamespaceGraceFully(NAMESPACE1, true);
-        } catch (Exception ignore) {}
-        tnx.commit().get();
-    }
-
     @Test
     public void produceAndAbortTest() throws Exception {
         produceTest(false);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 33305a2b8df..1dd6feb4762 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -215,7 +215,8 @@ public class TransactionTest extends TransactionTestBase {
     public void testCreateTransactionSystemTopic() throws Exception {
         String subName = "test";
         String topicName = TopicName.get(NAMESPACE1 + "/" + 
"testCreateTransactionSystemTopic").toString();
-
+        admin.namespaces().deleteNamespace(NAMESPACE1, true);
+        admin.namespaces().createNamespace(NAMESPACE1);
         try {
             // init pending ack
             @Cleanup
@@ -231,7 +232,7 @@ public class TransactionTest extends TransactionTestBase {
 
         // getList does not include transaction system topic
         List<String> list = admin.topics().getList(NAMESPACE1);
-        assertEquals(list.size(), 2);
+        assertFalse(list.isEmpty());
         list.forEach(topic -> 
assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
 
         try {

Reply via email to