This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5e59d0e164d [fix][client] Make auto partitions update work for old
brokers without PIP-344 (#24822)
5e59d0e164d is described below
commit 5e59d0e164d5c255021302394d9ad70583fa565a
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Oct 8 17:56:40 2025 +0800
[fix][client] Make auto partitions update work for old brokers without
PIP-344 (#24822)
---
pip/pip-344.md | 7 ++-
.../apache/pulsar/client/impl/ClientCnxTest.java | 2 +-
.../pulsar/client/impl/PulsarClientImpl.java | 2 +-
.../backwardscompatibility/ClientTest25.java | 54 ++++++++++++++++++++++
4 files changed, 61 insertions(+), 4 deletions(-)
diff --git a/pip/pip-344.md b/pip/pip-344.md
index 5eafc6fd5c2..120a8e1ef0d 100644
--- a/pip/pip-344.md
+++ b/pip/pip-344.md
@@ -122,6 +122,9 @@ message FeatureFlags {
# Backward & Forward Compatibility
-- Old version client and New version Broker: The client will call the old API.
+Old version (`< 3.0.6`) client and New version (`>= 3.0.6`) Broker: The client
will call the old API.
-- New version client and Old version Broker: The feature flag
`supports_binary_api_get_partitioned_meta_with_param_created_false` will be
`false`. The client will get a not-support error if the param
`createIfAutoCreationEnabled` is false.
+New version client and Old version Broker: The feature flag
`supports_binary_api_get_partitioned_meta_with_param_created_false` will be
`false`. The client will get a not-support error if the param
`createIfAutoCreationEnabled` is false in the following cases:
+- The topic is a DLQ topic
+- The topic is non-persistent
+- The topic is in geo-replication that the local cluster is new version and
the remote cluster is old version
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index 3b30f0011e5..6e9bc4595a0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -260,7 +260,7 @@ public class ClientCnxTest extends
MockedPulsarServiceBaseTest {
field.set(clientCnxFuture.get(), false);
}
try {
- clientWitBinaryLookup.getPartitionsForTopic(topic, false).join();
+ clientWitBinaryLookup.getPartitionedTopicMetadata(topic, false,
false).join();
Assert.fail("Expected an error that the broker version is too
old.");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains("without auto-creation
is not supported by the broker"));
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 6ffdfa55a9b..5f5239131a8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1291,7 +1291,7 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public CompletableFuture<List<String>> getPartitionsForTopic(String topic,
boolean metadataAutoCreationEnabled) {
- return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled,
false).thenApply(metadata -> {
+ return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled,
true).thenApply(metadata -> {
if (metadata.partitions > 0) {
TopicName topicName = TopicName.get(topic);
List<String> partitions = new ArrayList<>(metadata.partitions);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java
index 2e153b4ec7f..9fcbf667659 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java
@@ -19,8 +19,19 @@
package org.apache.pulsar.tests.integration.backwardscompatibility;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.tests.integration.topologies.ClientTestBase;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class ClientTest25 extends PulsarStandaloneTestSuite25 {
@@ -34,4 +45,47 @@ public class ClientTest25 extends
PulsarStandaloneTestSuite25 {
clientTestBase.resetCursorCompatibility(serviceUrl.get(),
httpServiceUrl.get(), topicName);
}
+ @Test(timeOut = 20000)
+ public void testAutoPartitionsUpdate() throws Exception {
+ @Cleanup final var pulsarClient = PulsarClient.builder()
+ .serviceUrl(getContainer().getPlainTextServiceUrl())
+ .build();
+ final var topic = "test-auto-part-update";
+ final var topic2 = "dummy-topic";
+ @Cleanup final var admin =
PulsarAdmin.builder().serviceHttpUrl(getContainer().getHttpServiceUrl()).build();
+ // Use 2 as the initial partition number because old version broker
cannot update partitions on a topic that
+ // has only 1 partition.
+ admin.topics().createPartitionedTopic(topic, 2);
+ admin.topics().createPartitionedTopic(topic2, 2);
+ @Cleanup final var producer =
pulsarClient.newProducer().autoUpdatePartitions(true)
+ .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+ .messageRoutingMode(MessageRoutingMode.CustomPartition)
+ .messageRouter(new MessageRouter() {
+ @Override
+ public int choosePartition(Message<?> msg, TopicMetadata
metadata) {
+ return metadata.numPartitions() - 1;
+ }
+ })
+ .topic(topic)
+ .create();
+ @Cleanup final var consumer =
pulsarClient.newConsumer().autoUpdatePartitions(true)
+ .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+ .topic(topic).subscriptionName("sub")
+ .subscribe();
+ @Cleanup final var multiTopicsConsumer =
pulsarClient.newConsumer().autoUpdatePartitions(true)
+ .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+ .topics(List.of(topic,
topic2)).subscriptionName("sub-2").subscribe();
+
+ admin.topics().updatePartitionedTopic(topic, 3);
+ Thread.sleep(1500);
+ final var msgId = (MessageIdAdv) producer.send("msg".getBytes());
+ Assert.assertEquals(msgId.getPartitionIndex(), 2);
+
+ final var msg = consumer.receive(3, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg);
+ Assert.assertEquals(((MessageIdAdv)
msg.getMessageId()).getPartitionIndex(), 2);
+ final var msg2 = multiTopicsConsumer.receive(3, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg2);
+ Assert.assertEquals(msg2.getMessageId(), msg.getMessageId());
+ }
}