This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new e146f16e352 cherry-pick -x "de6948c"
e146f16e352 is described below
commit e146f16e352fea03f9dd1fdadfa6a6f2478b2a70
Author: fengyubiao <[email protected]>
AuthorDate: Fri Aug 19 16:53:56 2022 +0800
cherry-pick -x "de6948c"
---
.../apache/pulsar/broker/admin/AdminApi2Test.java | 51 ++++++++++++++++++++++
1 file changed, 51 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index cf0ab1914f9..72b317a80fa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.admin;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -44,6 +45,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.Response.Status;
import lombok.Cleanup;
@@ -75,6 +77,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
@@ -93,6 +96,7 @@ import
org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
@@ -1339,6 +1343,9 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
admin.topics().createPartitionedTopic(topic, 10);
assertFalse(admin.topics().getList(namespace).isEmpty());
+ // Wait for change event topic and compaction create finish.
+ awaitChangeEventTopicAndCompactionCreateFinish(namespace,
String.format("persistent://%s", topic));
+
try {
admin.namespaces().deleteNamespace(namespace, false);
fail("should have failed due to namespace not empty");
@@ -1363,6 +1370,50 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
}
+    /**
+     * Waits until the namespace's change event topic(s) are loaded and, where
+     * compaction is enabled on them, the compaction subscription exists.
+     *
+     * <p>Returns immediately when system topics are disabled in the broker
+     * configuration. Otherwise a subscribe-rate topic policy is set on
+     * {@code topic} to trigger creation of the change event topic, and the
+     * method polls until every expected change event topic is ready.
+     *
+     * @param ns    namespace used to build the expected change event topic names
+     * @param topic fully qualified topic name the subscribe-rate policy is set on
+     * @throws Exception if setting the topic policy fails or the wait does not
+     *                   complete
+     */
+    private void awaitChangeEventTopicAndCompactionCreateFinish(String ns, String topic)
+            throws Exception {
+        if (!pulsar.getConfiguration().isSystemTopicEnabled()) {
+            return;
+        }
+        // Trigger change event topic create.
+        SubscribeRate subscribeRate = new SubscribeRate(-1, 60);
+        admin.topicPolicies().setSubscribeRate(topic, subscribeRate);
+
+        // Work out which change event topics are expected to exist: a single
+        // topic when auto-creation is "non-partitioned", otherwise one topic
+        // per default partition.
+        String allowAutoTopicCreationType =
+                pulsar.getConfiguration().getAllowAutoTopicCreationType();
+        int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
+        ArrayList<String> expectChangeEventTopics = new ArrayList<>();
+        if ("non-partitioned".equals(allowAutoTopicCreationType)) {
+            expectChangeEventTopics.add(String.format("persistent://%s/%s", ns,
+                    EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME));
+        } else {
+            for (int i = 0; i < defaultNumPartitions; i++) {
+                expectChangeEventTopics.add(String.format("persistent://%s/%s-partition-%s", ns,
+                        EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, i));
+            }
+        }
+
+        // Wait for change event topic and compaction create finish.
+        Awaitility.await().until(() -> {
+            for (String changeEventTopicName : expectChangeEventTopics) {
+                // NOTE(review): the second argument is presumably
+                // "createIfMissing" - confirm against BrokerService.
+                CompletableFuture<Optional<Topic>> completableFuture =
+                        pulsar.getBrokerService().getTopic(changeEventTopicName, false);
+                // Not ready yet: report false so the condition is polled again
+                // instead of dereferencing a null future.
+                if (completableFuture == null) {
+                    return false;
+                }
+                Optional<Topic> optionalTopic = completableFuture.get();
+                // Topic not loaded yet: calling get() on the empty Optional
+                // would throw and abort the wait, so retry on the next poll.
+                if (!optionalTopic.isPresent()) {
+                    return false;
+                }
+                PersistentTopic changeEventTopic = (PersistentTopic) optionalTopic.get();
+                // Without compaction there is no compaction subscription to wait for.
+                if (!changeEventTopic.isCompactionEnabled().get()) {
+                    continue;
+                }
+                if (!changeEventTopic.getSubscriptions().containsKey(COMPACTION_SUBSCRIPTION)) {
+                    return false;
+                }
+            }
+            return true;
+        });
+    }
+
@Test
public void testDeleteNamespaceWithTopicPolicies() throws Exception {
stopBroker();