This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7a2d2db1bdd2b9e85316ff27ec9f22763159953b Author: Joforde <[email protected]> AuthorDate: Fri Aug 1 11:58:15 2025 +0800 [fix][broker] Fix REST API to produce messages to single-partitioned topics (#24450) (cherry picked from commit fa28d1c1ef1611bf164d036628b3a5a6dc52adfc) --- .../java/org/apache/pulsar/broker/rest/TopicsBase.java | 2 +- .../org/apache/pulsar/broker/admin/TopicsTest.java | 18 ++++++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java index 3205eed6993..d4a92a3ea21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java @@ -343,7 +343,7 @@ public class TopicsBase extends PersistentTopicsBase { List<String> redirectAddresses = Collections.synchronizedList(new ArrayList<>()); CompletableFuture<Boolean> future = new CompletableFuture<>(); List<CompletableFuture<Void>> lookupFutures = new ArrayList<>(); - if (!topicName.isPartitioned() && metadata.partitions > 1) { + if (!topicName.isPartitioned() && metadata.partitions > 0) { // Partitioned topic with multiple partitions, need to do look up for each partition. for (int index = 0; index < metadata.partitions; index++) { lookupFutures.add(lookUpBrokerForTopic(topicName.getPartition(index), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java index 19eaf49541c..0ea865bc927 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java @@ -96,6 +96,7 @@ import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class TopicsTest extends MockedPulsarServiceBaseTest { @@ -166,10 +167,19 @@ public class TopicsTest extends MockedPulsarServiceBaseTest { }).readValue(message); } - @Test - public void testProduceToPartitionedTopic() throws Exception { + @DataProvider(name = "partitionNumbers") + public Object[][] partitionNumbers() { + return new Object[][] { + //produce to single-partitioned topic + {1}, + {5}, + }; + } + + @Test(dataProvider = "partitionNumbers") + public void testProduceToPartitionedTopic(int numPartitions) throws Exception { admin.topics().createPartitionedTopic("persistent://" + testTenant + "/" + testNamespace - + "/" + testTopicName + "-p", 5); + + "/" + testTopicName + "-p", numPartitions); AsyncResponse asyncResponse = mock(AsyncResponse.class); Schema<String> schema = StringSchema.utf8(); ProducerMessages producerMessages = new ProducerMessages(); @@ -207,7 +217,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest { } for (int index = 0; index < messagePerPartition.length; index++) { // We publish to each partition in round robin mode so each partition should get at most 2 message. - Assert.assertTrue(messagePerPartition[index] <= 2); + Assert.assertTrue(messagePerPartition[index] <= 10 / numPartitions); } }
