This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 57e48d14effb18d170a3336fda1419fd0f77e619
Author: Joforde <[email protected]>
AuthorDate: Fri Aug 1 11:58:15 2025 +0800

    [fix][broker] Fix REST API to produce messages to single-partitioned topics (#24450)
    
    (cherry picked from commit fa28d1c1ef1611bf164d036628b3a5a6dc52adfc)
---
 .../java/org/apache/pulsar/broker/rest/TopicsBase.java |  2 +-
 .../org/apache/pulsar/broker/admin/TopicsTest.java     | 18 ++++++++++++++----
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
index 95c5f2a590b..47067c55005 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -345,7 +345,7 @@ public class TopicsBase extends PersistentTopicsBase {
         List<String> redirectAddresses = Collections.synchronizedList(new ArrayList<>());
         CompletableFuture<Boolean> future = new CompletableFuture<>();
         List<CompletableFuture<Void>> lookupFutures = new ArrayList<>();
-        if (!topicName.isPartitioned() && metadata.partitions > 1) {
+        if (!topicName.isPartitioned() && metadata.partitions > 0) {
             // Partitioned topic with multiple partitions, need to do look up for each partition.
             for (int index = 0; index < metadata.partitions; index++) {
                 lookupFutures.add(lookUpBrokerForTopic(topicName.getPartition(index),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index 5208032f4ab..72426110224 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -96,6 +96,7 @@ import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class TopicsTest extends MockedPulsarServiceBaseTest {
@@ -168,10 +169,19 @@ public class TopicsTest extends MockedPulsarServiceBaseTest {
                 }).readValue(message);
     }
 
-    @Test
-    public void testProduceToPartitionedTopic() throws Exception {
+    @DataProvider(name = "partitionNumbers")
+    public Object[][] partitionNumbers() {
+        return new Object[][] {
+            //produce to single-partitioned topic
+            {1},
+            {5},
+        };
+    }
+
+    @Test(dataProvider = "partitionNumbers")
+    public void testProduceToPartitionedTopic(int numPartitions) throws Exception {
         admin.topics().createPartitionedTopic("persistent://" + testTenant + "/" + testNamespace
-                + "/" + testTopicName + "-p", 5);
+                + "/" + testTopicName + "-p", numPartitions);
         AsyncResponse asyncResponse = mock(AsyncResponse.class);
         Schema<String> schema = StringSchema.utf8();
         ProducerMessages producerMessages = new ProducerMessages();
@@ -210,7 +220,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest {
         }
         for (int index = 0; index < messagePerPartition.length; index++) {
             // We publish to each partition in round robin mode so each partition should get at most 2 message.
-            Assert.assertTrue(messagePerPartition[index] <= 2);
+            Assert.assertTrue(messagePerPartition[index] <= 10 / numPartitions);
         }
     }
 

Reply via email to