This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new e146f16e352 cherry-pick -x "de6948c"
e146f16e352 is described below

commit e146f16e352fea03f9dd1fdadfa6a6f2478b2a70
Author: fengyubiao <[email protected]>
AuthorDate: Fri Aug 19 16:53:56 2022 +0800

    cherry-pick -x "de6948c"
---
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 51 ++++++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index cf0ab1914f9..72b317a80fa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.admin;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -44,6 +45,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import javax.ws.rs.core.Response.Status;
 import lombok.Cleanup;
@@ -75,6 +77,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -93,6 +96,7 @@ import 
org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
@@ -1339,6 +1343,9 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         admin.topics().createPartitionedTopic(topic, 10);
         assertFalse(admin.topics().getList(namespace).isEmpty());
 
+        // Wait for change event topic and compaction create finish.
+        awaitChangeEventTopicAndCompactionCreateFinish(namespace, 
String.format("persistent://%s", topic));
+
         try {
             admin.namespaces().deleteNamespace(namespace, false);
             fail("should have failed due to namespace not empty");
@@ -1363,6 +1370,50 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         
assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
     }
 
+    /**
+     * Waits until every expected change event topic of the namespace is loaded in the broker and, for those
+     * with compaction enabled, the compaction subscription exists. Returns immediately when system topics are
+     * disabled in the broker configuration.
+     *
+     * @param ns    namespace whose change event topic(s) are awaited
+     * @param topic topic on which a subscribe-rate policy is set in order to trigger change event topic creation
+     * @throws Exception if setting the topic policy fails
+     */
+    private void awaitChangeEventTopicAndCompactionCreateFinish(String ns, String topic) throws Exception {
+        if (!pulsar.getConfiguration().isSystemTopicEnabled()) {
+            return;
+        }
+        // Trigger change event topic create.
+        SubscribeRate subscribeRate = new SubscribeRate(-1, 60);
+        admin.topicPolicies().setSubscribeRate(topic, subscribeRate);
+        // Work out which change event topic names to expect: a single topic when auto-created topics are
+        // non-partitioned, otherwise one topic per default partition.
+        String allowAutoTopicCreationType = pulsar.getConfiguration().getAllowAutoTopicCreationType();
+        int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
+        ArrayList<String> expectChangeEventTopics = new ArrayList<>();
+        if ("non-partitioned".equals(allowAutoTopicCreationType)) {
+            String t = String.format("persistent://%s/%s", ns, EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+            expectChangeEventTopics.add(t);
+        } else {
+            for (int i = 0; i < defaultNumPartitions; i++) {
+                String t = String.format("persistent://%s/%s-partition-%s", ns,
+                        EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, i);
+                expectChangeEventTopics.add(t);
+            }
+        }
+        // Wait for change event topic and compaction create finish.
+        Awaitility.await().until(() -> {
+            for (String changeEventTopicName : expectChangeEventTopics) {
+                CompletableFuture<Optional<Topic>> completableFuture =
+                        pulsar.getBrokerService().getTopic(changeEventTopicName, false);
+                // "Not there yet" must report false so the condition is polled again. Dereferencing a
+                // missing future or an empty Optional would throw, and Awaitility does not ignore
+                // exceptions by default, so the wait would abort instead of retrying.
+                if (completableFuture == null) {
+                    return false;
+                }
+                Optional<Topic> optionalTopic = completableFuture.get();
+                if (!optionalTopic.isPresent()) {
+                    return false;
+                }
+                PersistentTopic changeEventTopic = (PersistentTopic) optionalTopic.get();
+                // Without compaction there is no compaction subscription to wait for on this topic.
+                if (!changeEventTopic.isCompactionEnabled().get()) {
+                    continue;
+                }
+                if (!changeEventTopic.getSubscriptions().containsKey(COMPACTION_SUBSCRIPTION)) {
+                    return false;
+                }
+            }
+            return true;
+        });
+    }
+
     @Test
     public void testDeleteNamespaceWithTopicPolicies() throws Exception {
         stopBroker();

Reply via email to