This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6ec15d88e99 [improve] [broker] [break change] Do not create 
partitioned DLQ/Retry topic automatically (#22705)
6ec15d88e99 is described below

commit 6ec15d88e99a640f74777d09a1a1ff2945dff517
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Thu May 16 02:47:26 2024 +0800

    [improve] [broker] [break change] Do not create partitioned DLQ/Retry topic 
automatically (#22705)
    
    (cherry picked from commit f07b3a030179c38f9786b3e26c82aa13e00b34a6)
---
 .../pulsar/broker/service/BrokerService.java       |   6 +
 .../DeadLetterTopicDefaultMultiPartitionsTest.java | 251 +++++++++++++++++++++
 2 files changed, 257 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b74e6ee2c4e..85930f06856 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -23,6 +23,8 @@ import static java.util.Objects.requireNonNull;
 import static 
org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
 import static org.apache.commons.collections4.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static 
org.apache.pulsar.client.util.RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+import static 
org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
@@ -3525,6 +3527,10 @@ public class BrokerService implements Closeable {
     }
 
     public boolean isDefaultTopicTypePartitioned(final TopicName topicName, 
final Optional<Policies> policies) {
+        if 
(topicName.getPartitionedTopicName().endsWith(DLQ_GROUP_TOPIC_SUFFIX)
+                || 
topicName.getPartitionedTopicName().endsWith(RETRY_GROUP_TOPIC_SUFFIX)) {
+            return false;
+        }
         AutoTopicCreationOverride autoTopicCreationOverride = 
getAutoTopicCreationOverride(topicName, policies);
         if (autoTopicCreationOverride != null) {
             return 
TopicType.PARTITIONED.toString().equals(autoTopicCreationOverride.getTopicType());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicDefaultMultiPartitionsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicDefaultMultiPartitionsTest.java
new file mode 100644
index 00000000000..b8bccb79372
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicDefaultMultiPartitionsTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static 
org.apache.pulsar.client.util.RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class DeadLetterTopicDefaultMultiPartitionsTest extends 
ProducerConsumerBase {
+
+    /**
+     * Configures the broker so that auto-created topics are partitioned (two partitions by default)
+     * before delegating to the base-class setup routines.
+     */
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        // Auto topic creation settings under test: partitioned by default, 2 partitions.
+        conf.setAllowAutoTopicCreation(true);
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(2);
+        // Unrelated to topic creation: cap the message size at 5 KiB.
+        conf.setMaxMessageSize(5 * 1024);
+
+        // The configuration must be in place before the base class is initialised.
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    /**
+     * Runs once after all tests of this class (even if some failed) and delegates to the
+     * base-class {@code internalCleanup()}.
+     */
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private void triggerDLQGenerate(String topic, String subscription) throws 
Exception {
+        String DLQ = getDLQName(topic, subscription);
+        String p0OfDLQ = TopicName.get(DLQ).getPartition(0).toString();
+        Consumer consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName(subscription)
+                .ackTimeout(1000, TimeUnit.MILLISECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(10)
+                .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+                
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+                .subscribe();
+        Producer producer = pulsarClient.newProducer().topic(topic).create();
+        producer.newMessage().value(new byte[]{1}).send();
+
+        Message<Integer> message1 = consumer.receive();
+        consumer.negativeAcknowledge(message1);
+        Message<Integer> message2 = consumer.receive();
+        consumer.negativeAcknowledge(message2);
+
+        Awaitility.await().atMost(Duration.ofSeconds(1500)).until(() -> {
+            Message<Integer> message3 = consumer.receive(2, TimeUnit.SECONDS);
+            if (message3 != null) {
+                log.info("===> {}", message3.getRedeliveryCount());
+                consumer.negativeAcknowledge(message3);
+            }
+            List<String> topicList = 
pulsar.getPulsarResources().getTopicResources()
+                    
.listPersistentTopicsAsync(TopicName.get(topic).getNamespaceObject()).join();
+            if (topicList.contains(DLQ) || topicList.contains(p0OfDLQ)) {
+                return true;
+            }
+            int partitions = 
admin.topics().getPartitionedTopicMetadata(topic).partitions;
+            for (int i = 0; i < partitions; i++) {
+                for (int j = -1; j < 
pulsar.getConfig().getDefaultNumPartitions(); j++) {
+                    String p0OfDLQ2;
+                    if (j == -1) {
+                        p0OfDLQ2 = TopicName
+                                
.get(getDLQName(TopicName.get(topic).getPartition(i).toString(), subscription))
+                                .toString();
+                    } else {
+                        p0OfDLQ2 = TopicName
+                                
.get(getDLQName(TopicName.get(topic).getPartition(i).toString(), subscription))
+                                .getPartition(j).toString();
+                    }
+                    if (topicList.contains(p0OfDLQ2)) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        });
+        producer.close();
+        consumer.close();
+        admin.topics().unload(topic);
+    }
+
+    private static String getDLQName(String primaryTopic, String subscription) 
{
+        String domain = TopicName.get(primaryTopic).getDomain().toString();
+        return domain + "://" + TopicName.get(primaryTopic)
+                .toString().substring(( domain + "://").length())
+                + "-" + subscription + DLQ_GROUP_TOPIC_SUFFIX;
+    }
+
+    /**
+     * Supplies the primary topic types for the tests that use the {@code topicCreationTypes} data provider.
+     * Only {@link TopicType#PARTITIONED} is currently enabled.
+     */
+    @DataProvider(name = "topicCreationTypes")
+    public Object[][] topicCreationTypes() {
+        // The {TopicType.NON_PARTITIONED} row is currently disabled.
+        final Object[] partitionedCase = {TopicType.PARTITIONED};
+        return new Object[][]{partitionedCase};
+    }
+
+    @Test(dataProvider = "topicCreationTypes")
+    public void testGenerateNonPartitionedDLQ(TopicType topicType) throws 
Exception {
+        final String topic = BrokerTestUtil.newUniqueName( 
"persistent://public/default/tp");
+        final String subscription = "s1";
+        switch (topicType) {
+            case PARTITIONED: {
+                admin.topics().createPartitionedTopic(topic, 2);
+                break;
+            }
+            case NON_PARTITIONED: {
+                admin.topics().createNonPartitionedTopic(topic);
+            }
+        }
+
+        triggerDLQGenerate(topic, subscription);
+
+        // Verify: no partitioned DLQ.
+        List<String> partitionedTopics = 
pulsar.getPulsarResources().getNamespaceResources()
+                .getPartitionedTopicResources()
+                
.listPartitionedTopicsAsync(TopicName.get(topic).getNamespaceObject(), 
TopicDomain.persistent).join();
+        for (String tp : partitionedTopics) {
+            assertFalse(tp.endsWith("-DLQ"));
+        }
+        // Verify: non-partitioned DLQ exists.
+        List<String> partitions = 
pulsar.getPulsarResources().getTopicResources()
+                
.listPersistentTopicsAsync(TopicName.get(topic).getNamespaceObject()).join();
+        List<String> DLQCreated = new ArrayList<>();
+        for (String tp : partitions) {
+            if (tp.endsWith("-DLQ")) {
+                DLQCreated.add(tp);
+            }
+            assertFalse(tp.endsWith("-partition-0-DLQ"));
+        }
+        assertTrue(!DLQCreated.isEmpty());
+
+        // cleanup.
+        switch (topicType) {
+            case PARTITIONED: {
+                admin.topics().deletePartitionedTopic(topic);
+                break;
+            }
+            case NON_PARTITIONED: {
+                admin.topics().delete(topic, false);
+            }
+        }
+        for (String t : DLQCreated) {
+            try {
+                
admin.topics().delete(TopicName.get(t).getPartitionedTopicName(), false);
+            } catch (Exception ex) {}
+            try {
+                
admin.topics().deletePartitionedTopic(TopicName.get(t).getPartitionedTopicName(),
 false);
+            } catch (Exception ex) {}
+        }
+    }
+
+    /**
+     * Verifies that a DLQ can still be created explicitly as a partitioned topic: it is listed among the
+     * partitioned topics, and only its partitions (not the bare DLQ name) are listed as persistent topics.
+     */
+    @Test
+    public void testManuallyCreatePartitionedDLQ() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subscription = "s1";
+        final String dlq = getDLQName(topic, subscription);
+        final TopicName dlqTopicName = TopicName.get(dlq);
+        final String dlqPartition0 = dlqTopicName.getPartition(0).toString();
+        final String dlqPartition1 = dlqTopicName.getPartition(1).toString();
+        final TopicName topicName = TopicName.get(topic);
+
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createPartitionedTopic(dlq, 2);
+
+        Awaitility.await().untilAsserted(() -> {
+            // The DLQ is registered as a partitioned topic; its partition 0 is not.
+            List<String> partitionedTopics = pulsar.getPulsarResources().getNamespaceResources()
+                    .getPartitionedTopicResources()
+                    .listPartitionedTopicsAsync(topicName.getNamespaceObject(), TopicDomain.persistent).join();
+            assertTrue(partitionedTopics.contains(dlq));
+            assertFalse(partitionedTopics.contains(dlqPartition0));
+
+            // Both partitions are listed as persistent topics; the bare DLQ name is not.
+            List<String> persistentTopics = pulsar.getPulsarResources().getTopicResources()
+                    .listPersistentTopicsAsync(topicName.getNamespaceObject()).join();
+            assertFalse(persistentTopics.contains(dlq));
+            assertTrue(persistentTopics.contains(dlqPartition0));
+            assertTrue(persistentTopics.contains(dlqPartition1));
+        });
+
+        // Remove what this test created.
+        admin.topics().delete(topic, false);
+        admin.topics().deletePartitionedTopic(dlq, false);
+    }
+
+    @Test
+    public void testManuallyCreatePartitionedDLQ2() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName( 
"persistent://public/default/tp");
+        final String subscription = "s1";
+        final String p0OfTopic = 
TopicName.get(topic).getPartition(0).toString();
+        String DLQ = getDLQName(p0OfTopic, subscription);
+        String p0OfDLQ = TopicName.get(DLQ).getPartition(0).toString();
+        admin.topics().createPartitionedTopic(topic, 10);
+        try {
+            admin.topics().createPartitionedTopic(DLQ, 2);
+        } catch (Exception ex) {
+            // Keep multiple versions compatible.
+            if (ex.getMessage().contains("Partitioned Topic Name should not 
contain '-partition-'")){
+                return;
+            } else {
+                fail("Failed to create partitioned DLQ");
+            }
+        }
+
+        Awaitility.await().untilAsserted(() -> {
+            // Verify: partitioned DLQ exists.
+            List<String> partitionedTopics = 
pulsar.getPulsarResources().getNamespaceResources()
+                    .getPartitionedTopicResources()
+                    
.listPartitionedTopicsAsync(TopicName.get(topic).getNamespaceObject(), 
TopicDomain.persistent).join();
+            assertTrue(partitionedTopics.contains(DLQ));
+            assertFalse(partitionedTopics.contains(p0OfDLQ));
+            // Verify: DLQ partitions exists.
+            List<String> partitions = 
pulsar.getPulsarResources().getTopicResources()
+                    
.listPersistentTopicsAsync(TopicName.get(topic).getNamespaceObject()).join();
+            assertFalse(partitions.contains(DLQ));
+        });
+
+        // cleanup.
+        admin.topics().deletePartitionedTopic(topic, false);
+        admin.topics().deletePartitionedTopic(DLQ, false);
+    }
+}

Reply via email to