This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6c9ff8f57a2 [fix][broker] Fix can not delete namespace by force
(#18307)
6c9ff8f57a2 is described below
commit 6c9ff8f57a2aa9861b578bdaf18ef6de53ee3fbe
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Nov 21 11:42:43 2022 +0800
[fix][broker] Fix can not delete namespace by force (#18307)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 51 ++++++++++-------
.../broker/service/persistent/PersistentTopic.java | 29 ++++++----
.../apache/pulsar/broker/admin/NamespacesTest.java | 66 ++++++++++++++++++++++
.../broker/transaction/TransactionProduceTest.java | 29 ----------
.../pulsar/broker/transaction/TransactionTest.java | 5 +-
5 files changed, 117 insertions(+), 63 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 04fdc6d5104..1af182c2d59 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -218,24 +218,29 @@ public abstract class NamespacesBase extends
AdminResource {
}))
.thenCompose(topics -> {
List<String> allTopics = topics.get(0);
+ ArrayList<String> allUserCreatedTopics = new ArrayList<>();
List<String> allPartitionedTopics = topics.get(1);
- if (!force) {
- boolean hasNonSystemTopic = false;
- for (String topic : allTopics) {
- if
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
- hasNonSystemTopic = true;
- break;
- }
+ ArrayList<String> allUserCreatedPartitionTopics = new
ArrayList<>();
+ boolean hasNonSystemTopic = false;
+ List<String> allSystemTopics = new ArrayList<>();
+ List<String> allPartitionedSystemTopics = new
ArrayList<>();
+ for (String topic : allTopics) {
+ if
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
+ hasNonSystemTopic = true;
+ allUserCreatedTopics.add(topic);
+ } else {
+ allSystemTopics.add(topic);
}
- if (!hasNonSystemTopic) {
- for (String topic : allPartitionedTopics) {
- if
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
- hasNonSystemTopic = true;
- break;
- }
- }
+ }
+ for (String topic : allPartitionedTopics) {
+ if
(!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
+ hasNonSystemTopic = true;
+ allUserCreatedPartitionTopics.add(topic);
+ } else {
+ allPartitionedSystemTopics.add(topic);
}
-
+ }
+ if (!force) {
if (hasNonSystemTopic) {
throw new RestException(Status.CONFLICT, "Cannot
delete non empty namespace");
}
@@ -243,13 +248,17 @@ public abstract class NamespacesBase extends
AdminResource {
return
namespaceResources().setPoliciesAsync(namespaceName, old -> {
old.deleted = true;
return old;
- }).thenCompose(__ -> {
- return internalDeleteTopicsAsync(allTopics);
- }).thenCompose(__ -> {
- return
internalDeletePartitionedTopicsAsync(allPartitionedTopics);
+ }).thenCompose(ignore -> {
+ return internalDeleteTopicsAsync(allUserCreatedTopics);
+ }).thenCompose(ignore -> {
+ return
internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics);
+ }).thenCompose(ignore -> {
+ return internalDeleteTopicsAsync(allSystemTopics);
+ }).thenCompose(ignore__ -> {
+ return
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics);
});
})
- .thenCompose(__ -> pulsar().getNamespaceService()
+ .thenCompose(ignore -> pulsar().getNamespaceService()
.getNamespaceBundleFactory().getBundlesAsync(namespaceName))
.thenCompose(bundles ->
FutureUtil.waitForAll(bundles.getBundles().stream()
.map(bundle ->
pulsar().getNamespaceService().getOwnerAsync(bundle)
@@ -271,7 +280,7 @@ public abstract class NamespacesBase extends AdminResource {
return
CompletableFuture.completedFuture(null);
})
).collect(Collectors.toList())))
- .thenCompose(__ -> internalClearZkSources());
+ .thenCompose(ignore -> internalClearZkSources());
}
private CompletableFuture<Void>
internalDeletePartitionedTopicsAsync(List<String> topicNames) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 418da33aea0..e4bcb92b58d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1219,17 +1219,24 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
CompletableFuture<Void> deleteTopicAuthenticationFuture =
new CompletableFuture<>();
brokerService.deleteTopicAuthenticationWithRetry(topic,
deleteTopicAuthenticationFuture, 5);
- deleteTopicAuthenticationFuture.thenCompose(ignore ->
deleteSchema())
- .thenCompose(ignore -> deleteTopicPolicies())
- .thenCompose(ignore ->
transactionBufferCleanupAndClose())
- .whenComplete((v, ex) -> {
- if (ex != null) {
- log.error("[{}] Error deleting topic",
topic, ex);
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(ex);
- } else {
- List<CompletableFuture<Void>>
subsDeleteFutures = new ArrayList<>();
- subscriptions.forEach((sub, p) ->
subsDeleteFutures.add(unsubscribe(sub)));
+ deleteTopicAuthenticationFuture.thenCompose(ignore ->
deleteSchema())
+ .thenCompose(ignore -> {
+ if
(!this.getBrokerService().getPulsar().getBrokerService()
+
.isSystemTopic(TopicName.get(topic))) {
+ return deleteTopicPolicies();
+ } else {
+ return
CompletableFuture.completedFuture(null);
+ }
+ })
+ .thenCompose(ignore ->
transactionBufferCleanupAndClose())
+ .whenComplete((v, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Error deleting topic",
topic, ex);
+ unfenceTopicToResume();
+ deleteFuture.completeExceptionally(ex);
+ } else {
+ List<CompletableFuture<Void>>
subsDeleteFutures = new ArrayList<>();
+ subscriptions.forEach((sub, p) ->
subsDeleteFutures.add(unsubscribe(sub)));
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
if (e != null) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 0b640c16776..b4019c317c9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -76,6 +76,7 @@ import
org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
@@ -92,6 +93,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -119,6 +122,7 @@ import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -2018,4 +2022,66 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(forceDeleteNamespaceAllowedOriginalValue);
}
+ @Test
+ public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws
Exception {
+ String namespace = this.testTenant + "/delete-namespace";
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
this.testTenant, "delete-namespace",
+ "testFinallyDeleteSystemTopicWhenDeleteNamespace").toString();
+
+ // 0. enable topic level policies and system topic
+ pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+ pulsar.getConfig().setSystemTopicEnabled(true);
+ pulsar.getConfig().setForceDeleteNamespaceAllowed(true);
+ Field policesService =
pulsar.getClass().getDeclaredField("topicPoliciesService");
+ policesService.setAccessible(true);
+ policesService.set(pulsar, new
SystemTopicBasedTopicPoliciesService(pulsar));
+
+ // 1. create a test namespace.
+ admin.namespaces().createNamespace(namespace);
+ // 2. create a test topic.
+ admin.topics().createNonPartitionedTopic(topic);
+ // 3. change policy of the topic.
+ admin.topicPolicies().setMaxConsumers(topic, 5);
+ // 4. change the order of the topics in this namespace.
+ List<String> topics =
pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(namespace)).get();
+ Assert.assertTrue(topics.size() >= 2);
+ for (int i = 0; i < topics.size(); i++) {
+ if
(topics.get(i).contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)) {
+ String systemTopic = topics.get(i);
+ topics.set(i, topics.get(0));
+ topics.set(0, systemTopic);
+ }
+ }
+ NamespaceService mockNamespaceService =
spy(pulsar.getNamespaceService());
+ Field namespaceServiceField =
pulsar.getClass().getDeclaredField("nsService");
+ namespaceServiceField.setAccessible(true);
+ namespaceServiceField.set(pulsar, mockNamespaceService);
+
doReturn(CompletableFuture.completedFuture(topics)).when(mockNamespaceService).getFullListOfTopics(any());
+ // 5. delete the namespace
+ admin.namespaces().deleteNamespace(namespace, true);
+ // cleanup
+ resetBroker();
+ }
+
+ @Test
+ public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws
Exception {
+ String namespace = this.testTenant + "/delete-systemTopic";
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
this.testTenant, "delete-systemTopic",
+ "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
+
+ // 0. enable topic level policies and system topic
+ pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+ pulsar.getConfig().setSystemTopicEnabled(true);
+ Field policesService =
pulsar.getClass().getDeclaredField("topicPoliciesService");
+ policesService.setAccessible(true);
+ policesService.set(pulsar, new
SystemTopicBasedTopicPoliciesService(pulsar));
+ // 1. create a test namespace.
+ admin.namespaces().createNamespace(namespace);
+ // 2. create a test topic.
+ admin.topics().createNonPartitionedTopic(topic);
+ // 3. change policy of the topic.
+ admin.topicPolicies().setMaxConsumers(topic, 5);
+ // 4. delete the policies topic; deleting it will not clear the topic
policies
+ admin.topics().delete(namespace + "/" +
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 06bb92890b6..cdbb1563280 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -89,35 +89,6 @@ public class TransactionProduceTest extends
TransactionTestBase {
produceTest(true);
}
- @Test
- public void testDeleteNamespaceBeforeCommit() throws Exception {
- final String topic = NAMESPACE1 + "/testDeleteTopicBeforeCommit";
- PulsarClient pulsarClient = this.pulsarClient;
- Transaction tnx = pulsarClient.newTransaction()
- .withTransactionTimeout(60, TimeUnit.SECONDS)
- .build().get();
- long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
- long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
- Assert.assertTrue(txnIdMostBits > -1);
- Assert.assertTrue(txnIdLeastBits > -1);
-
- @Cleanup
- Producer<byte[]> outProducer = pulsarClient
- .newProducer()
- .topic(topic)
- .sendTimeout(0, TimeUnit.SECONDS)
- .enableBatching(false)
- .create();
-
- String content = "Hello Txn";
- outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();
-
- try {
- deleteNamespaceGraceFully(NAMESPACE1, true);
- } catch (Exception ignore) {}
- tnx.commit().get();
- }
-
@Test
public void produceAndAbortTest() throws Exception {
produceTest(false);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 33305a2b8df..1dd6feb4762 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -215,7 +215,8 @@ public class TransactionTest extends TransactionTestBase {
public void testCreateTransactionSystemTopic() throws Exception {
String subName = "test";
String topicName = TopicName.get(NAMESPACE1 + "/" +
"testCreateTransactionSystemTopic").toString();
-
+ admin.namespaces().deleteNamespace(NAMESPACE1, true);
+ admin.namespaces().createNamespace(NAMESPACE1);
try {
// init pending ack
@Cleanup
@@ -231,7 +232,7 @@ public class TransactionTest extends TransactionTestBase {
// getList does not include transaction system topic
List<String> list = admin.topics().getList(NAMESPACE1);
- assertEquals(list.size(), 2);
+ assertFalse(list.isEmpty());
list.forEach(topic ->
assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
try {