This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9677d569a69d6bf5afe1822044416474d6ab6aa0
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Tue May 28 11:13:12 2024 +0800

    [fix] [broker] fix topic partitions was expanded even if disabled topic 
level replication (#22769)
    
    (cherry picked from commit 55ad4b22ba2e94029c2e1c01b67b22cb237e5ecc)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 11 +++-
 .../broker/service/OneWayReplicatorTest.java       | 65 +++++++++++++++++++++-
 .../broker/service/OneWayReplicatorTestBase.java   | 13 ++++-
 .../service/OneWayReplicatorUsingGlobalZKTest.java | 10 ++++
 4 files changed, 93 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9c2746a0293..eec6370cec6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -452,7 +453,14 @@ public class PersistentTopicsBase extends AdminResource {
                                 if (!policies.isPresent()) {
                                     return 
CompletableFuture.completedFuture(null);
                                 }
-                                final Set<String> replicationClusters = 
policies.get().replication_clusters;
+                                // Combine namespace level policies and topic 
level policies.
+                                Set<String> replicationClusters = 
policies.get().replication_clusters;
+                                TopicPolicies topicPolicies =
+                                        
pulsarService.getTopicPoliciesService().getTopicPoliciesIfExists(topicName);
+                                if (topicPolicies != null) {
+                                    replicationClusters = new 
HashSet<>(topicPolicies.getReplicationClusters());
+                                }
+                                // Do check replicated clusters.
                                 if (replicationClusters.size() == 0) {
                                     return 
CompletableFuture.completedFuture(null);
                                 }
@@ -468,6 +476,7 @@ public class PersistentTopicsBase extends AdminResource {
                                     // The replication clusters just has the 
current cluster itself.
                                     return 
CompletableFuture.completedFuture(null);
                                 }
+                                // Do sync operation to other clusters.
                                 List<CompletableFuture<Void>> futures = 
replicationClusters.stream()
                                         .map(replicationCluster -> 
admin.clusters().getClusterAsync(replicationCluster)
                                                 .thenCompose(clusterData -> 
pulsarService.getBrokerService()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index a5f1339e95f..3dcd787a0cd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -495,8 +495,17 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         admin2.topics().createPartitionedTopic(topicName, 2);
         admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
         // Check the partitioned topic has been created at the remote cluster.
-        PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
-        assertEquals(topicMetadata2.partitions, 2);
+        Awaitility.await().untilAsserted(() -> {
+            PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+            assertEquals(topicMetadata2.partitions, 2);
+        });
+
+        // Expand partitions
+        admin2.topics().updatePartitionedTopic(topicName, 3);
+        Awaitility.await().untilAsserted(() -> {
+            PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+            assertEquals(topicMetadata2.partitions, 3);
+        });
         // cleanup.
         admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
         waitReplicatorStopped(partition0);
@@ -748,4 +757,56 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
                     
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
         }
     }
+
+    @Test
+    public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() 
throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        // Verify replicator works.
+        verifyReplicationWorks(topicName);
+
+        // Disable topic level replication.
+        setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, 
pulsar1);
+        setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, 
pulsar2);
+
+        // Expand topic.
+        admin1.topics().updatePartitionedTopic(topicName, 3);
+        
assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 
3);
+
+        // Wait for async tasks that were triggered by expanding topic 
partitions.
+        Thread.sleep(3 * 1000);
+
+
+        // Verify: the topics on the remote cluster did not been expanded.
+        
assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 
2);
+
+        cleanupTopics(() -> {
+            admin1.topics().deletePartitionedTopic(topicName, false);
+            admin2.topics().deletePartitionedTopic(topicName, false);
+        });
+    }
+
+    @Test
+    public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws 
Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        // Verify replicator works.
+        verifyReplicationWorks(topicName);
+
+        // Expand topic.
+        admin1.topics().updatePartitionedTopic(topicName, 3);
+        
assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 
3);
+
+        // Verify: the topics on the remote cluster will be expanded.
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 
3);
+        });
+
+        cleanupTopics(() -> {
+            admin1.topics().deletePartitionedTopic(topicName, false);
+            admin2.topics().deletePartitionedTopic(topicName, false);
+        });
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 6a84432890c..7372b2e4784 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -196,9 +196,16 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
     }
 
     protected void waitChangeEventsInit(String namespace) {
-        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService()
-                .getTopic(namespace + "/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, false)
-                .join().get();
+        CompletableFuture<Optional<Topic>> future = pulsar1.getBrokerService()
+                .getTopic(namespace + "/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, false);
+        if (future == null) {
+            return;
+        }
+        Optional<Topic> optional = future.join();
+        if (!optional.isPresent()) {
+            return;
+        }
+        PersistentTopic topic = (PersistentTopic) optional.get();
         Awaitility.await().atMost(Duration.ofSeconds(180)).untilAsserted(() -> 
{
             TopicStatsImpl topicStats = topic.getStats(true, false, false);
             topicStats.getSubscriptions().entrySet().forEach(entry -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index d827235bc32..b4747a8bd0e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -94,4 +94,14 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
     public void testDeletePartitionedTopic() throws Exception {
         super.testDeletePartitionedTopic();
     }
+
+    @Test(enabled = false)
+    public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() 
throws Exception {
+        super.testNoExpandTopicPartitionsWhenDisableTopicLevelReplication();
+    }
+
+    @Test(enabled = false)
+    public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws 
Exception {
+        super.testExpandTopicPartitionsOnNamespaceLevelReplication();
+    }
 }

Reply via email to