This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9677d569a69d6bf5afe1822044416474d6ab6aa0 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Tue May 28 11:13:12 2024 +0800 [fix] [broker] fix topic partitions was expanded even if disabled topic level replication (#22769) (cherry picked from commit 55ad4b22ba2e94029c2e1c01b67b22cb237e5ecc) --- .../broker/admin/impl/PersistentTopicsBase.java | 11 +++- .../broker/service/OneWayReplicatorTest.java | 65 +++++++++++++++++++++- .../broker/service/OneWayReplicatorTestBase.java | 13 ++++- .../service/OneWayReplicatorUsingGlobalZKTest.java | 10 ++++ 4 files changed, 93 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9c2746a0293..eec6370cec6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.Base64; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -452,7 +453,14 @@ public class PersistentTopicsBase extends AdminResource { if (!policies.isPresent()) { return CompletableFuture.completedFuture(null); } - final Set<String> replicationClusters = policies.get().replication_clusters; + // Combine namespace level policies and topic level policies. + Set<String> replicationClusters = policies.get().replication_clusters; + TopicPolicies topicPolicies = + pulsarService.getTopicPoliciesService().getTopicPoliciesIfExists(topicName); + if (topicPolicies != null) { + replicationClusters = new HashSet<>(topicPolicies.getReplicationClusters()); + } + // Do check replicated clusters. 
if (replicationClusters.size() == 0) { return CompletableFuture.completedFuture(null); } @@ -468,6 +476,7 @@ public class PersistentTopicsBase extends AdminResource { // The replication clusters just has the current cluster itself. return CompletableFuture.completedFuture(null); } + // Do sync operation to other clusters. List<CompletableFuture<Void>> futures = replicationClusters.stream() .map(replicationCluster -> admin.clusters().getClusterAsync(replicationCluster) .thenCompose(clusterData -> pulsarService.getBrokerService() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index a5f1339e95f..3dcd787a0cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -495,8 +495,17 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { admin2.topics().createPartitionedTopic(topicName, 2); admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); // Check the partitioned topic has been created at the remote cluster. - PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); - assertEquals(topicMetadata2.partitions, 2); + Awaitility.await().untilAsserted(() -> { + PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); + assertEquals(topicMetadata2.partitions, 2); + }); + + // Expand partitions + admin2.topics().updatePartitionedTopic(topicName, 3); + Awaitility.await().untilAsserted(() -> { + PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); + assertEquals(topicMetadata2.partitions, 3); + }); // cleanup. 
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); waitReplicatorStopped(partition0); @@ -748,4 +757,56 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { .persistentTopicExists(TopicName.get(topicName).getPartition(1)).join()); } } + + @Test + public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + admin1.topics().createPartitionedTopic(topicName, 2); + + // Verify replicator works. + verifyReplicationWorks(topicName); + + // Disable topic level replication. + setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1); + setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2); + + // Expand topic. + admin1.topics().updatePartitionedTopic(topicName, 3); + assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 3); + + // Wait for async tasks that were triggered by expanding topic partitions. + Thread.sleep(3 * 1000); + + + // Verify: the topics on the remote cluster were not expanded. + assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 2); + + cleanupTopics(() -> { + admin1.topics().deletePartitionedTopic(topicName, false); + admin2.topics().deletePartitionedTopic(topicName, false); + }); + } + + @Test + public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + admin1.topics().createPartitionedTopic(topicName, 2); + + // Verify replicator works. + verifyReplicationWorks(topicName); + + // Expand topic. + admin1.topics().updatePartitionedTopic(topicName, 3); + assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 3); + + // Verify: the topics on the remote cluster will be expanded. 
+ Awaitility.await().untilAsserted(() -> { + assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 3); + }); + + cleanupTopics(() -> { + admin1.topics().deletePartitionedTopic(topicName, false); + admin2.topics().deletePartitionedTopic(topicName, false); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 6a84432890c..7372b2e4784 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -196,9 +196,16 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { } protected void waitChangeEventsInit(String namespace) { - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService() - .getTopic(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, false) - .join().get(); + CompletableFuture<Optional<Topic>> future = pulsar1.getBrokerService() + .getTopic(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, false); + if (future == null) { + return; + } + Optional<Topic> optional = future.join(); + if (!optional.isPresent()) { + return; + } + PersistentTopic topic = (PersistentTopic) optional.get(); Awaitility.await().atMost(Duration.ofSeconds(180)).untilAsserted(() -> { TopicStatsImpl topicStats = topic.getStats(true, false, false); topicStats.getSubscriptions().entrySet().forEach(entry -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index d827235bc32..b4747a8bd0e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -94,4 +94,14 @@ public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest { public void testDeletePartitionedTopic() throws Exception { super.testDeletePartitionedTopic(); } + + @Test(enabled = false) + public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws Exception { + super.testNoExpandTopicPartitionsWhenDisableTopicLevelReplication(); + } + + @Test(enabled = false) + public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception { + super.testExpandTopicPartitionsOnNamespaceLevelReplication(); + } }