This is an automated email from the ASF dual-hosted git repository. jianghaiting pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 603ce8650b7eacec7d01c2d5ebbc37a1f24c6265 Author: fengyubiao <[email protected]> AuthorDate: Fri Aug 19 16:53:56 2022 +0800 [fix][flaky-test]AdminApi2Test.testDeleteNamespace (#17157) (cherry picked from commit de6948ca7def90921ce7e9325a001bca2b21a1ad) --- .../apache/pulsar/broker/admin/AdminApi2Test.java | 49 ++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 6fc65fb45de..af518a36b31 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.admin; import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.pulsar.common.events.EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -44,6 +46,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import javax.ws.rs.NotAcceptableException; import javax.ws.rs.core.Response.Status; @@ -99,6 +102,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; @@ -1404,6 
+1408,9 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
         admin.topics().createPartitionedTopic(topic, 10);
         assertFalse(admin.topics().getList(namespace).isEmpty());
 
+        // Wait for change event topic and compaction create finish.
+        awaitChangeEventTopicAndCompactionCreateFinish(namespace, String.format("persistent://%s", topic));
+
         try {
             admin.namespaces().deleteNamespace(namespace, false);
             fail("should have failed due to namespace not empty");
@@ -1427,7 +1434,49 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
 
         final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
         assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
+    }
 
+    /**
+     * Sets a topic-level subscribe rate on {@code topic} to trigger creation of the change event topic of
+     * {@code ns}, then polls until every expected change event topic is loaded and, if compaction is enabled
+     * on it, owns the compaction subscription. Returns immediately when system topics are disabled. */
+    private void awaitChangeEventTopicAndCompactionCreateFinish(String ns, String topic) throws Exception {
+        if (!pulsar.getConfiguration().isSystemTopicEnabled()) {
+            return;
+        }
+        // Trigger change event topic create: writing any topic-level policy is enough, the rate itself is irrelevant.
+        SubscribeRate subscribeRate = new SubscribeRate(-1, 60);
+        admin.topicPolicies().setSubscribeRate(topic, subscribeRate);
+        // Expected names: the bare topic when auto-creation is "non-partitioned", otherwise one per default partition.
+        String allowAutoTopicCreationType = pulsar.getConfiguration().getAllowAutoTopicCreationType();
+        int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
+        ArrayList<String> expectChangeEventTopics = new ArrayList<>();
+        if ("non-partitioned".equals(allowAutoTopicCreationType)) {
+            String t = String.format("persistent://%s/%s", ns, NAMESPACE_EVENTS_LOCAL_NAME);
+            expectChangeEventTopics.add(t);
+        } else {
+            for (int i = 0; i < defaultNumPartitions; i++) {
+                String t = String.format("persistent://%s/%s-partition-%s", ns, NAMESPACE_EVENTS_LOCAL_NAME, i);
+                expectChangeEventTopics.add(t);
+            }
+        }
+        Awaitility.await().until(() -> {
+            for (String changeEventTopicName : expectChangeEventTopics) {
+                CompletableFuture<Optional<Topic>> completableFuture = pulsar.getBrokerService().getTopic(changeEventTopicName, false);
+                if (completableFuture == null) {
+                    return false; // nothing to wait on yet: retry on the next poll instead of dereferencing null
+                }
+                Optional<Topic> optionalTopic = completableFuture.get();
+                if (!optionalTopic.isPresent()) {
+                    return false; // topic not loaded yet: calling get() on the empty Optional would abort the wait
+                }
+                PersistentTopic changeEventTopic = (PersistentTopic) optionalTopic.get();
+                if (changeEventTopic.isCompactionEnabled() && !changeEventTopic.getSubscriptions().containsKey(COMPACTION_SUBSCRIPTION)) {
+                    return false; // compaction is enabled but its subscription has not been created yet
+                }
+            }
+            return true;
+        });
     }
 
     @Test
