This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git.
The following commit(s) were added to refs/heads/branch-3.3 by this push: new e8b70bfbf6e [fix][broker] Fix updatePartitionedTopic when replication at ns level and topic policy is set (#22971) e8b70bfbf6e is described below commit e8b70bfbf6e3ff06a3a8e1c9eababf1cc177a9fe Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Tue Jun 25 09:38:56 2024 +0300 [fix][broker] Fix updatePartitionedTopic when replication at ns level and topic policy is set (#22971) (cherry picked from commit 1c44fbb8a03e583e94aa9dbef87dfa0a165e1cd8) --- .../broker/admin/impl/PersistentTopicsBase.java | 5 ++-- .../broker/service/OneWayReplicatorTest.java | 27 ++++++++++++++++++++-- .../broker/service/OneWayReplicatorTestBase.java | 9 ++++---- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index eec6370cec6..ebb92679599 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -35,7 +35,6 @@ import java.util.ArrayList; import java.util.Base64; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -457,8 +456,8 @@ public class PersistentTopicsBase extends AdminResource { Set<String> replicationClusters = policies.get().replication_clusters; TopicPolicies topicPolicies = pulsarService.getTopicPoliciesService().getTopicPoliciesIfExists(topicName); - if (topicPolicies != null) { - replicationClusters = new HashSet<>(topicPolicies.getReplicationClusters()); + if (topicPolicies != null && topicPolicies.getReplicationClusters() != null) { + replicationClusters = topicPolicies.getReplicationClustersSet(); } // Do check replicated clusters. 
if (replicationClusters.size() == 0) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index c9b23c6437a..b751d269d1f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -71,11 +71,11 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; -import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; @@ -491,6 +491,29 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { admin2.topics().deletePartitionedTopic(topicName); } + // https://github.com/apache/pulsar/issues/22967 + @Test + public void testPartitionedTopicWithTopicPolicyAndNoReplicationClusters() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + admin1.topics().createPartitionedTopic(topicName, 2); + try { + admin1.topicPolicies().setMessageTTL(topicName, 5); + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 2); + }); + admin1.topics().updatePartitionedTopic(topicName, 3, false); + 
Awaitility.await().ignoreExceptions().untilAsserted(() -> { + assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 3); + }); + } finally { + // cleanup. + admin1.topics().deletePartitionedTopic(topicName, true); + if (!usingGlobalZK) { + admin2.topics().deletePartitionedTopic(topicName, true); + } + } + } + @Test public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index ffe6147412e..d66e666e3a0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -266,6 +266,7 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { config.setEnableReplicatedSubscriptions(true); config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); config.setLoadBalancerSheddingEnabled(false); + config.setForceDeleteNamespaceAllowed(true); } @Override @@ -276,11 +277,11 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { if (!usingGlobalZK) { admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster2)); } - admin1.namespaces().deleteNamespace(replicatedNamespace); - admin1.namespaces().deleteNamespace(nonReplicatedNamespace); + admin1.namespaces().deleteNamespace(replicatedNamespace, true); + admin1.namespaces().deleteNamespace(nonReplicatedNamespace, true); if (!usingGlobalZK) { - admin2.namespaces().deleteNamespace(replicatedNamespace); - admin2.namespaces().deleteNamespace(nonReplicatedNamespace); + admin2.namespaces().deleteNamespace(replicatedNamespace, true); + 
admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true); } // shutdown.