This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5ca889fd14703a4c8eb832a5a0269db114d22aff Author: Xiangying Meng <[email protected]> AuthorDate: Fri Dec 9 14:56:37 2022 +0800 [fix][broker] Fix delete system topic clean topic policy (#18823) If a user sets a topic policy on a system topic and then deletes that system topic, the topic policy should be deleted as well. Only the change_events topic does not need its topic policies cleared. (cherry picked from commit 93c41de8aac7dd655491d3b231468753d2d0a113) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 109 +++++++++++++-------- .../broker/service/persistent/PersistentTopic.java | 101 ++++++++++--------- .../apache/pulsar/broker/admin/NamespacesTest.java | 38 ++++++- 3 files changed, 156 insertions(+), 92 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 87bbc4527f4..59193447fbc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -65,6 +65,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; +import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; @@ -308,14 +309,24 @@ public abstract class NamespacesBase extends AdminResource { asyncResponse.resume(new RestException(e)); return; } - // remove from owned namespace map and ephemeral node from ZK final List<CompletableFuture<Void>> futures = Lists.newArrayList(); // remove system topics first. 
+ Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>(); + Set<String> partitionedTopicPolicySystemTopic = new HashSet<>(); if (!topics.isEmpty()) { for (String topic : topics) { try { - futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true)); + if (EventsTopicNames.isTopicPoliciesSystemTopic(topic)) { + TopicName topicName = TopicName.get(topic); + if (topicName.isPartitioned()) { + partitionedTopicPolicySystemTopic.add(topic); + } else { + noPartitionedTopicPolicySystemTopic.add(topic); + } + } else { + futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true)); + } } catch (Exception ex) { log.error("[{}] Failed to delete system topic {}", clientAppId(), topic, ex); asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, ex)); @@ -323,44 +334,49 @@ public abstract class NamespacesBase extends AdminResource { } } } - FutureUtil.waitForAll(futures).thenCompose(__ -> { - List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList(); - NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() - .getBundles(namespaceName); - for (NamespaceBundle bundle : bundles.getBundles()) { - // check if the bundle is owned by any broker, if not then we do not need to delete the bundle - deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle).thenCompose(ownership -> { - if (ownership.isPresent()) { - try { - return pulsar().getAdminClient().namespaces() - .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange()); - } catch (PulsarServerException e) { - throw new RestException(e); - } + FutureUtil.waitForAll(futures) + .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic)) + .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic)) + .thenCompose(__ -> { + List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList(); + NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundles(namespaceName); + for (NamespaceBundle bundle : bundles.getBundles()) { + // check if the bundle is owned by any broker, if not then we do not need to delete the bundle + deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle) + .thenCompose(ownership -> { + if (ownership.isPresent()) { + try { + return pulsar().getAdminClient().namespaces() + .deleteNamespaceBundleAsync(namespaceName.toString(), + bundle.getBundleRange()); + } catch (PulsarServerException e) { + throw new RestException(e); + } + } else { + return CompletableFuture.completedFuture(null); + } + })); + } + return FutureUtil.waitForAll(deleteBundleFutures); + }) + .thenCompose(__ -> internalClearZkSources()) + .thenAccept(__ -> { + log.info("[{}] Remove namespace successfully {}", clientAppId(), namespaceName); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.error("[{}] Failed to remove namespace {}", clientAppId(), namespaceName, cause); + if (cause instanceof PulsarAdminException.ConflictException) { + log.info("[{}] There are new topics created during the namespace deletion, " + + "retry to delete the namespace again.", namespaceName); + pulsar().getExecutor().execute(() -> internalDeleteNamespace(asyncResponse, authoritative)); } else { - return CompletableFuture.completedFuture(null); + resumeAsyncResponseExceptionally(asyncResponse, ex); } - })); - } - return FutureUtil.waitForAll(deleteBundleFutures); - }) - .thenCompose(__ -> internalClearZkSources()) - .thenAccept(__ -> { - log.info("[{}] Remove namespace successfully {}", clientAppId(), namespaceName); - asyncResponse.resume(Response.noContent().build()); - }) - .exceptionally(ex -> { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.error("[{}] Failed to remove namespace {}", clientAppId(), namespaceName, cause); - if (cause 
instanceof PulsarAdminException.ConflictException) { - log.info("[{}] There are new topics created during the namespace deletion, " - + "retry to delete the namespace again.", namespaceName); - pulsar().getExecutor().execute(() -> internalDeleteNamespace(asyncResponse, authoritative)); - } else { - resumeAsyncResponseExceptionally(asyncResponse, ex); - } - return null; - }); + return null; + }); } // clear zk-node resources for deleting namespace @@ -477,13 +493,19 @@ public abstract class NamespacesBase extends AdminResource { Set<String> nonPartitionedTopics = new HashSet<>(); Set<String> allSystemTopics = new HashSet<>(); Set<String> allPartitionedSystemTopics = new HashSet<>(); + Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>(); + Set<String> partitionedTopicPolicySystemTopic = new HashSet<>(); for (String topic : topics) { try { TopicName topicName = TopicName.get(topic); if (topicName.isPartitioned()) { if (pulsar().getBrokerService().isSystemTopic(topicName)) { - allPartitionedSystemTopics.add(topicName.getPartitionedTopicName()); + if (EventsTopicNames.isTopicPoliciesSystemTopic(topic)) { + partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName()); + } else { + allPartitionedSystemTopics.add(topicName.getPartitionedTopicName()); + } continue; } String partitionedTopic = topicName.getPartitionedTopicName(); @@ -495,7 +517,11 @@ public abstract class NamespacesBase extends AdminResource { } } else { if (pulsar().getBrokerService().isSystemTopic(topicName)) { - allSystemTopics.add(topic); + if (EventsTopicNames.isTopicPoliciesSystemTopic(topic)) { + noPartitionedTopicPolicySystemTopic.add(topic); + } else { + allSystemTopics.add(topic); + } continue; } topicFutures.add(pulsar().getAdminClient().topics().deleteAsync( @@ -525,6 +551,9 @@ public abstract class NamespacesBase extends AdminResource { .thenCompose((ignore) -> internalDeleteTopicsAsync(allSystemTopics)) .thenCompose((ignore) -> 
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics)) + .thenCompose(ignore -> + internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic)) + .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic)) .handle((result, exception) -> { if (exception != null) { if (exception.getCause() instanceof PulsarAdminException) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 5f12bbf5886..e41d2905301 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1178,28 +1178,27 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return null; }); - closeClientFuture.thenAccept(delete -> { - CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>(); - brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); - deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema ? 
deleteSchema() : - CompletableFuture.completedFuture(null)) - .thenCompose(ignore -> { - if (!this.getBrokerService().getPulsar().getBrokerService() - .isSystemTopic(TopicName.get(topic))) { - return deleteTopicPolicies(); - } else { - return CompletableFuture.completedFuture(null); - } - }) - .thenCompose(__ -> transactionBufferCleanupAndClose()) - .whenComplete((v, ex) -> { - if (ex != null) { - log.error("[{}] Error deleting topic", topic, ex); - unfenceTopicToResume(); - deleteFuture.completeExceptionally(ex); - } else { - List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>(); - subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub))); + closeClientFuture.thenAccept(__ -> { + CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>(); + brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); + deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema()) + .thenCompose(ignore -> { + if (!EventsTopicNames.isTopicPoliciesSystemTopic(topic)) { + return deleteTopicPolicies(); + } else { + return CompletableFuture.completedFuture(null); + } + }) + .thenCompose(ignore -> transactionBufferCleanupAndClose()) + .whenComplete((v, ex) -> { + if (ex != null) { + log.error("[{}] Error deleting topic", topic, ex); + unfenceTopicToResume(); + deleteFuture.completeExceptionally(ex); + } else { + List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>(); + subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub))); + FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { if (e != null) { log.error("[{}] Error deleting topic", topic, e); @@ -1211,36 +1210,36 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal public void deleteLedgerComplete(Object ctx) { brokerService.removeTopicFromCache(PersistentTopic.this); - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - - 
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - - unregisterTopicPolicyListener(); - - log.info("[{}] Topic deleted", topic); - deleteFuture.complete(null); - } - - @Override - public void deleteLedgerFailed(ManagedLedgerException exception, - Object ctx) { - if (exception.getCause() - instanceof MetadataStoreException.NotFoundException) { - log.info("[{}] Topic is already deleted {}", - topic, exception.getMessage()); - deleteLedgerComplete(ctx); - } else { - unfenceTopicToResume(); - log.error("[{}] Error deleting topic", topic, exception); - deleteFuture.completeExceptionally( - new PersistenceException(exception)); - } + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + + subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); + + unregisterTopicPolicyListener(); + + log.info("[{}] Topic deleted", topic); + deleteFuture.complete(null); + } + + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, + Object ctx) { + if (exception.getCause() + instanceof MetadataStoreException.NotFoundException) { + log.info("[{}] Topic is already deleted {}", + topic, exception.getMessage()); + deleteLedgerComplete(ctx); + } else { + unfenceTopicToResume(); + log.error("[{}] Error deleting topic", topic, exception); + deleteFuture.completeExceptionally( + new PersistenceException(exception)); } - }, null); - } - }); - } - }); + } + }, null); + } + }); + } + }); }).exceptionally(ex->{ unfenceTopicToResume(); deleteFuture.completeExceptionally( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index fe4a4ab9ed6..fdc218a0f05 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.admin; +import static 
org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; +import static org.junit.Assert.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -57,11 +59,13 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; +import lombok.Cleanup; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.admin.v1.Namespaces; import org.apache.pulsar.broker.admin.v1.PersistentTopics; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -81,6 +85,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.events.EventsTopicNames; @@ -101,6 +106,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl; import org.apache.pulsar.metadata.impl.AbstractMetadataStore; import org.apache.zookeeper.KeeperException.Code; @@ -1822,7 +1828,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { } @Test - public void 
testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception { + public void testNotClearTopicPolicesWhenDeleteTopicPolicyTopic() throws Exception { String namespace = this.testTenant + "/delete-systemTopic"; String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-systemTopic", "testNotClearTopicPolicesWhenDeleteSystemTopic").toString(); @@ -1842,4 +1848,34 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { // 4. delete the policies topic and the topic wil not to clear topic polices admin.topics().delete(namespace + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true); } + @Test + public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception { + conf.setTopicLevelPoliciesEnabled(true); + conf.setSystemTopicEnabled(true); + Field field = PulsarService.class.getDeclaredField("topicPoliciesService"); + field.setAccessible(true); + field.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar)); + + String systemTopic = SYSTEM_NAMESPACE.toString() + "/" + "testDeleteTopicPolicyWhenDeleteSystemTopic"; + admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(), + new TenantInfoImpl(Sets.newHashSet("role1", "role2"), + Sets.newHashSet("use", "usc", "usw"))); + + admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString()); + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(systemTopic).create(); + admin.topicPolicies().setMaxConsumers(systemTopic, 5); + + int maxConsumerPerTopic = pulsar + .getTopicPoliciesService() + .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get() + .getMaxConsumerPerTopic(); + + assertEquals(maxConsumerPerTopic, 5); + admin.topics().delete(systemTopic, true); + TopicPolicies topicPolicies = pulsar.getTopicPoliciesService() + .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, TimeUnit.SECONDS); + assertNull(topicPolicies); + } }
