This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 57e48d14effb18d170a3336fda1419fd0f77e619 Author: Joforde <[email protected]> AuthorDate: Fri Aug 1 11:58:15 2025 +0800 [fix][broker] Fix REST API to produce messages to single-partitioned topics (#24450) (cherry picked from commit fa28d1c1ef1611bf164d036628b3a5a6dc52adfc) --- .../java/org/apache/pulsar/broker/rest/TopicsBase.java | 2 +- .../org/apache/pulsar/broker/admin/TopicsTest.java | 18 ++++++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java index 95c5f2a590b..47067c55005 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java @@ -345,7 +345,7 @@ public class TopicsBase extends PersistentTopicsBase { List<String> redirectAddresses = Collections.synchronizedList(new ArrayList<>()); CompletableFuture<Boolean> future = new CompletableFuture<>(); List<CompletableFuture<Void>> lookupFutures = new ArrayList<>(); - if (!topicName.isPartitioned() && metadata.partitions > 1) { + if (!topicName.isPartitioned() && metadata.partitions > 0) { // Partitioned topic with multiple partitions, need to do look up for each partition. for (int index = 0; index < metadata.partitions; index++) { lookupFutures.add(lookUpBrokerForTopic(topicName.getPartition(index), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java index 5208032f4ab..72426110224 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java @@ -96,6 +96,7 @@ import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class TopicsTest extends MockedPulsarServiceBaseTest { @@ -168,10 +169,19 @@ public class TopicsTest extends MockedPulsarServiceBaseTest { }).readValue(message); } - @Test - public void testProduceToPartitionedTopic() throws Exception { + @DataProvider(name = "partitionNumbers") + public Object[][] partitionNumbers() { + return new Object[][] { + //produce to single-partitioned topic + {1}, + {5}, + }; + } + + @Test(dataProvider = "partitionNumbers") + public void testProduceToPartitionedTopic(int numPartitions) throws Exception { admin.topics().createPartitionedTopic("persistent://" + testTenant + "/" + testNamespace - + "/" + testTopicName + "-p", 5); + + "/" + testTopicName + "-p", numPartitions); AsyncResponse asyncResponse = mock(AsyncResponse.class); Schema<String> schema = StringSchema.utf8(); ProducerMessages producerMessages = new ProducerMessages(); @@ -210,7 +220,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest { } for (int index = 0; index < messagePerPartition.length; index++) { // We publish to each partition in round robin mode so each partition should get at most 2 message. - Assert.assertTrue(messagePerPartition[index] <= 2); + Assert.assertTrue(messagePerPartition[index] <= 10 / numPartitions); } }
