This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new e8b70bfbf6e [fix][broker] Fix updatePartitionedTopic when replication 
at ns level and topic policy is set (#22971)
e8b70bfbf6e is described below

commit e8b70bfbf6e3ff06a3a8e1c9eababf1cc177a9fe
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Tue Jun 25 09:38:56 2024 +0300

    [fix][broker] Fix updatePartitionedTopic when replication at ns level and 
topic policy is set (#22971)
    
    (cherry picked from commit 1c44fbb8a03e583e94aa9dbef87dfa0a165e1cd8)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  5 ++--
 .../broker/service/OneWayReplicatorTest.java       | 27 ++++++++++++++++++++--
 .../broker/service/OneWayReplicatorTestBase.java   |  9 ++++----
 3 files changed, 32 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index eec6370cec6..ebb92679599 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -35,7 +35,6 @@ import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -457,8 +456,8 @@ public class PersistentTopicsBase extends AdminResource {
                                 Set<String> replicationClusters = 
policies.get().replication_clusters;
                                 TopicPolicies topicPolicies =
                                         
pulsarService.getTopicPoliciesService().getTopicPoliciesIfExists(topicName);
-                                if (topicPolicies != null) {
-                                    replicationClusters = new 
HashSet<>(topicPolicies.getReplicationClusters());
+                                if (topicPolicies != null && 
topicPolicies.getReplicationClusters() != null) {
+                                    replicationClusters = 
topicPolicies.getReplicationClustersSet();
                                 }
                                 // Do check replicated clusters.
                                 if (replicationClusters.size() == 0) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index c9b23c6437a..b751d269d1f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -71,11 +71,11 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
@@ -491,6 +491,29 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         admin2.topics().deletePartitionedTopic(topicName);
     }
 
+    // https://github.com/apache/pulsar/issues/22967
+    @Test
+    public void testPartitionedTopicWithTopicPolicyAndNoReplicationClusters() 
throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
+        admin1.topics().createPartitionedTopic(topicName, 2);
+        try {
+            admin1.topicPolicies().setMessageTTL(topicName, 5);
+            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+                
assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 
2);
+            });
+            admin1.topics().updatePartitionedTopic(topicName, 3, false);
+            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+                
assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 
3);
+            });
+        } finally {
+            // cleanup.
+            admin1.topics().deletePartitionedTopic(topicName, true);
+            if (!usingGlobalZK) {
+                admin2.topics().deletePartitionedTopic(topicName, true);
+            }
+        }
+    }
+
     @Test
     public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws 
Exception {
         final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index ffe6147412e..d66e666e3a0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -266,6 +266,7 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
         config.setEnableReplicatedSubscriptions(true);
         config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
         config.setLoadBalancerSheddingEnabled(false);
+        config.setForceDeleteNamespaceAllowed(true);
     }
 
     @Override
@@ -276,11 +277,11 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
         if (!usingGlobalZK) {
             
admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace, 
Sets.newHashSet(cluster2));
         }
-        admin1.namespaces().deleteNamespace(replicatedNamespace);
-        admin1.namespaces().deleteNamespace(nonReplicatedNamespace);
+        admin1.namespaces().deleteNamespace(replicatedNamespace, true);
+        admin1.namespaces().deleteNamespace(nonReplicatedNamespace, true);
         if (!usingGlobalZK) {
-            admin2.namespaces().deleteNamespace(replicatedNamespace);
-            admin2.namespaces().deleteNamespace(nonReplicatedNamespace);
+            admin2.namespaces().deleteNamespace(replicatedNamespace, true);
+            admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true);
         }
 
         // shutdown.

Reply via email to