This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2acefde47e61efd84bcd381efdfb5f4b95c76265 Author: Nicklee007 <84127069+nicklee...@users.noreply.github.com> AuthorDate: Wed Aug 25 14:02:48 2021 +0800 fix the bug, can not update topic when the update topicName is contained by an existed topic as a part (#11686) Fixes #11685 validatePartitionTopicUpdate use contain to check if there has a exist topic will cause conflict, which will cause a failed when exist a topic which contain the new topic's prefix and we want to update the new topic partition; we have a those topic "persistent://public/default/113p-partition-0" "persistent://public/default/113p-partition-1" "persistent://public/default/113p-partition-2" "persistent://public/default/3p-partition-0" when we want to update topic 3p to more partitions ,we failed because "persistent://public/default/113p-partition-0" contain "3p-partition" Modifications use the startwith to check if exist the same topic. * fix the bug, the old topic contain a same strSub cause couldn't add new partitions * add update the partitioned topic which a part is coontained in old topic test Co-authored-by: nicklixinyang <nicklixiny...@didiglobal.com> (cherry picked from commit 241de4b8550237458e96bf4227185243d5c8a550) --- .../broker/admin/impl/PersistentTopicsBase.java | 6 ++--- .../org/apache/pulsar/broker/admin/AdminTest.java | 31 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9bbf018..f63d318 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -3721,9 +3721,9 @@ public class PersistentTopicsBase extends AdminResource { TopicName partitionTopicName = TopicName.get(domain(), namespaceName, topicName); PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false); int oldPartition = metadata.partitions; - String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX; + String prefix = partitionTopicName.getPartitionedTopicName() + TopicName.PARTITIONED_TOPIC_SUFFIX; for (String exsitingTopicName : existingTopicList) { - if (exsitingTopicName.contains(prefix)) { + if (exsitingTopicName.startsWith(prefix)) { try { long suffix = Long.parseLong(exsitingTopicName.substring( exsitingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX) @@ -3738,7 +3738,7 @@ public class PersistentTopicsBase extends AdminResource { clientAppId(), exsitingTopicName, topicName); throw new RestException(Status.PRECONDITION_FAILED, - "Already have non partition topic" + exsitingTopicName + "Already have non partition topic " + exsitingTopicName + " which contains partition suffix '-partition-' " + "and end with numeric value and end with numeric value smaller than the new " + "number of partition. Update of partitioned topic " diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 81ec0d5..8afb61b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -103,6 +103,7 @@ import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.ZooDefs; import org.awaitility.Awaitility; import org.mockito.ArgumentCaptor; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -821,6 +822,36 @@ public class AdminTest extends MockedPulsarServiceBaseTest { } + + @Test + public void testUpdatePartitionedTopicCoontainedInOldTopic() throws Exception { + + final String property = "prop-xyz"; + final String cluster = "use"; + final String namespace = "ns"; + final String partitionedTopicName = "old-special-topic"; + final String partitionedTopicName2 = "special-topic"; + + ZkUtils.createFullPathOptimistic(mockZooKeeperGlobal, PulsarWebResource.path(POLICIES, property, cluster, namespace), + ObjectMapperFactory.getThreadLocal().writeValueAsBytes(new Policies()), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + AsyncResponse response1 = mock(AsyncResponse.class); + ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic(response1, property, cluster, namespace, partitionedTopicName, 5, false); + verify(response1, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + AsyncResponse response2 = mock(AsyncResponse.class); + responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic(response2, property, cluster, namespace, partitionedTopicName2, 2, false); + verify(response2, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + persistentTopics.updatePartitionedTopic(property, cluster, namespace, partitionedTopicName2, false, false, 10); + } + + static class TestAsyncResponse implements AsyncResponse { Object response;