This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 868458a8c60 [improve][test] Add subscribing regex topic test for
`delete_when_subscriptions_caught_up`. (#18368)
868458a8c60 is described below
commit 868458a8c60ea0b3b66716895d0965da177e7c90
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Nov 15 21:30:24 2022 +0800
[improve][test] Add subscribing regex topic test for
`delete_when_subscriptions_caught_up`. (#18368)
---
.../broker/service/InactiveTopicDeleteTest.java | 39 ++++++++++++++++++----
.../policies/data/InactiveTopicDeleteMode.java | 2 +-
2 files changed, 34 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index f1dccce96c8..ebd53d65a74 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -33,7 +33,10 @@ import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
@@ -314,30 +317,54 @@ public class InactiveTopicDeleteTest extends
BrokerTestBase {
super.baseSetup();
final String topic =
"persistent://prop/ns-abc/testDeleteWhenNoBacklogs";
-
+ final String topic2 =
"persistent://prop/ns-abc/testDeleteWhenNoBacklogsB";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
+ Producer<byte[]> producer2 = pulsarClient.newProducer()
+ .topic(topic2)
+ .create();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub")
.subscribe();
- for (int i = 0; i < 10; i++) {
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ .topicsPattern("persistent://prop/ns-abc/test.*")
+ .subscriptionName("sub2")
+ .subscribe();
+
+ int producedCount = 10;
+ for (int i = 0; i < producedCount; i++) {
producer.send("Pulsar".getBytes());
+ producer2.send("Pulsar".getBytes());
}
producer.close();
+ producer2.close();
+ int receivedCount = 0;
+ Message<byte[]> msg;
+ while((msg = consumer2.receive(1, TimeUnit.SECONDS)) != null) {
+ consumer2.acknowledge(msg);
+ receivedCount ++;
+ }
+ assertEquals(producedCount * 2, receivedCount);
Thread.sleep(2000);
- Assert.assertTrue(admin.topics().getList("prop/ns-abc")
- .contains(topic));
+
Assert.assertTrue(admin.topics().getList("prop/ns-abc").contains(topic));
admin.topics().skipAllMessages(topic, "sub");
- Awaitility.await()
- .untilAsserted(() ->
Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic)));
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertFalse(consumer.isConnected());
+ final List<ConsumerImpl> consumers = ((MultiTopicsConsumerImpl)
consumer2).getConsumers();
+ consumers.forEach(c -> Assert.assertFalse(c.isConnected()));
+ Assert.assertFalse(consumer2.isConnected());
+
Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic));
+
Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic2));
+ });
consumer.close();
+ consumer2.close();
}
@Test
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicDeleteMode.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicDeleteMode.java
index e1fecb39417..6c00004d596 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicDeleteMode.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicDeleteMode.java
@@ -29,7 +29,7 @@ public enum InactiveTopicDeleteMode {
delete_when_no_subscriptions,
/**
- * The topic can be deleted when all subscriptions catchup and no active
producers/consumers.
+ * The topic can be deleted when all subscriptions catchup and no active
producers.
*/
delete_when_subscriptions_caught_up
}