This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 86311b191da [feat][client] PIP-475: V5 SDK consumes synthetic layouts for regular topics (#25850)
86311b191da is described below
commit 86311b191da1df90e0d84d9f1b067bf52f04a133
Author: Matteo Merli <[email protected]>
AuthorDate: Thu May 21 16:29:18 2026 +0100
[feat][client] PIP-475: V5 SDK consumes synthetic layouts for regular topics (#25850)
---
.../client/api/v5/V5RegularTopicInteropTest.java | 197 +++++++++++++++++++++
.../impl/v5/CheckpointConsumerBuilderV5.java | 2 +-
.../pulsar/client/impl/v5/ClientSegmentLayout.java | 6 +-
.../pulsar/client/impl/v5/DagWatchClient.java | 28 ++-
.../client/impl/v5/MultiTopicQueueConsumer.java | 2 +-
.../client/impl/v5/MultiTopicStreamConsumer.java | 2 +-
.../pulsar/client/impl/v5/ProducerBuilderV5.java | 2 +-
.../client/impl/v5/QueueConsumerBuilderV5.java | 2 +-
.../client/impl/v5/ScalableCheckpointConsumer.java | 4 +-
.../client/impl/v5/ScalableConsumerClient.java | 3 +-
.../client/impl/v5/ScalableQueueConsumer.java | 4 +-
.../client/impl/v5/ScalableStreamConsumer.java | 4 +-
.../client/impl/v5/ScalableTopicProducer.java | 12 +-
.../pulsar/client/impl/v5/SegmentRouter.java | 60 ++++++-
.../client/impl/v5/StreamConsumerBuilderV5.java | 2 +-
.../org/apache/pulsar/client/impl/v5/V5Utils.java | 33 ++--
.../pulsar/client/impl/v5/SegmentRouterTest.java | 63 ++++++-
.../apache/pulsar/client/impl/v5/V5UtilsTest.java | 55 +++---
.../org/apache/pulsar/client/impl/ClientCnx.java | 3 +-
.../apache/pulsar/client/impl/DagWatchSession.java | 7 +-
20 files changed, 430 insertions(+), 61 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5RegularTopicInteropTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5RegularTopicInteropTest.java
new file mode 100644
index 00000000000..c58f5e7d4c2
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5RegularTopicInteropTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end interop tests for V5 clients against regular (non-scalable) topics.
+ *
+ * <p>PIP-475: when a V5 client looks up a {@code persistent://...} topic that has not
+ * yet been migrated to a scalable topic, the broker returns a synthetic layout that
+ * wraps the existing partitions as legacy segments. The V5 SDK must:
+ * <ul>
+ *   <li>Route to legacy segments using v4 partitioned-topic mod-N routing so per-key
+ *       destinations match what a v4 producer would do.</li>
+ *   <li>Attach per-segment v4 producers/consumers to the underlying
+ *       {@code persistent://...-partition-K} URIs.</li>
+ * </ul>
+ * These tests verify the SDK side end-to-end against a real broker.
+ */
+public class V5RegularTopicInteropTest extends V5ClientBaseTest {
+
+    @Test
+    public void testV5ProducerToPartitionedRegularTopicRoutesV4Compatibly() throws Exception {
+        // V5 producer writes to a 4-partition regular topic. The broker returns a
+        // synthetic layout; the V5 router uses Murmur3 mod-N over segment_id, which matches
+        // v4 partitioned-topic routing under the Murmur3_32Hash scheme. Each v4 consumer
+        // (one per partition) should see exactly the keys v4 routing puts in its partition.
+        String regular = "persistent://" + getNamespace() + "/regular-"
+                + UUID.randomUUID().toString().substring(0, 8);
+        admin.topics().createPartitionedTopic(regular, 4);
+
+        List<org.apache.pulsar.client.api.Consumer<String>> v4Consumers = new ArrayList<>();
+        for (int k = 0; k < 4; k++) {
+            v4Consumers.add(track(pulsarClient
+                    .newConsumer(org.apache.pulsar.client.api.Schema.STRING)
+                    .topic(regular + "-partition-" + k)
+                    .subscriptionName("v4-sub")
+                    .subscriptionInitialPosition(
+                            org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest)
+                    .subscribe()));
+        }
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(regular)
+                .create();
+
+        final int n = 64;
+        for (int i = 0; i < n; i++) {
+            producer.newMessage().key("k-" + i).value("v-" + i).send();
+        }
+
+        Set<String> receivedValues = new HashSet<>(); // add() == false flags a duplicate delivery
+        for (int k = 0; k < 4; k++) {
+            org.apache.pulsar.client.api.Message<String> m;
+            while ((m = v4Consumers.get(k).receive(2, java.util.concurrent.TimeUnit.SECONDS)) != null) {
+                int hash32 = Murmur3_32Hash.getInstance()
+                        .makeHash(m.getKey().getBytes(StandardCharsets.UTF_8));
+                assertEquals(signSafeMod(hash32, 4), k,
+                        "key=" + m.getKey() + " arrived in partition " + k
+                                + " but v4 routing places it in " + signSafeMod(hash32, 4));
+                assertTrue(receivedValues.add(m.getValue()), "duplicate delivery of " + m.getValue());
+            }
+        }
+        assertEquals(receivedValues.size(), n,
+                "every published message must be received exactly once across all partitions");
+    }
+
+    @Test
+    public void testV5ProducerToNonPartitionedRegularTopicV4ConsumerReceives() throws Exception {
+        // V5 producer writes to a non-partitioned regular topic. The broker returns a
+        // synthetic layout with a single legacy segment wrapping the persistent:// URI.
+        // A v4 consumer on the same URI should receive everything.
+        String regular = "persistent://" + getNamespace() + "/regular-np-"
+                + UUID.randomUUID().toString().substring(0, 8);
+        admin.topics().createNonPartitionedTopic(regular);
+
+        @Cleanup
+        org.apache.pulsar.client.api.Consumer<String> v4Consumer = pulsarClient
+                .newConsumer(org.apache.pulsar.client.api.Schema.STRING)
+                .topic(regular)
+                .subscriptionName("v4-sub")
+                .subscriptionInitialPosition(
+                        org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(regular)
+                .create();
+
+        final int n = 16;
+        for (int i = 0; i < n; i++) {
+            producer.newMessage().value("v-" + i).send();
+        }
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            org.apache.pulsar.client.api.Message<String> m =
+                    v4Consumer.receive(5, java.util.concurrent.TimeUnit.SECONDS);
+            assertNotNull(m, "v4 consumer must receive every V5 send");
+            received.add(m.getValue());
+        }
+        assertEquals(received.size(), n);
+        // No straggler messages beyond the n sends.
+        assertNull(v4Consumer.receive(500, java.util.concurrent.TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testV5QueueConsumerFromPartitionedRegularTopic() throws Exception {
+        // v4 producer writes to a 4-partition regular topic; V5 queue consumer
+        // subscribes via the synthetic layout (one per-segment v4 consumer per
+        // partition) and must receive every published message.
+        String regular = "persistent://" + getNamespace() + "/regular-q-"
+                + UUID.randomUUID().toString().substring(0, 8);
+        admin.topics().createPartitionedTopic(regular, 4);
+
+        @Cleanup
+        org.apache.pulsar.client.api.Producer<String> v4Producer = pulsarClient
+                .newProducer(org.apache.pulsar.client.api.Schema.STRING)
+                .topic(regular)
+                .create();
+
+        @Cleanup
+        QueueConsumer<String> v5Consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(regular)
+                .subscriptionName("v5-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        final int n = 64;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            v4Producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            org.apache.pulsar.client.api.v5.Message<String> m = v5Consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(m, "V5 queue consumer must drain every v4 send");
+            received.add(m.value());
+            v5Consumer.acknowledge(m.id());
+        }
+        assertEquals(received, sent);
+    }
+
+    @Test
+    public void testV5ProducerBuilderRejectsNonPersistent() {
+        // The V5 builder must reject non-persistent:// inputs synchronously, with a
+        // clear error rather than deferring to a broker-side failure.
+        String topic = "non-persistent://" + getNamespace() + "/regular-"
+                + UUID.randomUUID().toString().substring(0, 8);
+        UnsupportedOperationException ex = expectThrows(UnsupportedOperationException.class,
+                () -> v5Client.newProducer(Schema.string()).topic(topic).create());
+        assertTrue(ex.getMessage().contains("non-persistent"), ex.getMessage());
+    }
+
+    private static int signSafeMod(int dividend, int divisor) { // result in [0, divisor) for divisor > 0
+        int mod = dividend % divisor;
+        return mod < 0 ? mod + divisor : mod;
+    }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
index 119702e9497..ce66b6fb608 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
@@ -66,7 +66,7 @@ final class CheckpointConsumerBuilderV5<T> implements
CheckpointConsumerBuilder<
new
PulsarClientException.InvalidConfigurationException("Topic name is required"));
}
- TopicName topic = V5Utils.asScalableTopicName(topicName);
+ TopicName topic = V5Utils.parseScalableTopicInput(topicName);
if (consumerGroup != null && !consumerGroup.isEmpty()) {
// Managed: register with the broker's subscription coordinator
under the
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
index e4c043af6db..7da1ee33d64 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
@@ -79,7 +79,11 @@ final class ClientSegmentLayout {
HashRange range = HashRange.of((int) seg.getHashStart(), (int)
seg.getHashEnd());
String segTopicName = SegmentTopicName.fromParent(
parentTopic, range, seg.getSegmentId()).toString();
- ActiveSegment ref = new ActiveSegment(seg.getSegmentId(), range,
segTopicName);
+ // Legacy segments (synthetic-layout entries wrapping an existing,
externally
+ // managed persistent:// topic) carry that URI. Regular
controller-managed
+ // segments leave it null and attach to segTopicName instead.
+ String legacy = seg.hasLegacyTopicName() ?
seg.getLegacyTopicName() : null;
+ ActiveSegment ref = new ActiveSegment(seg.getSegmentId(), range,
segTopicName, legacy);
if (seg.getState() ==
org.apache.pulsar.common.api.proto.SegmentState.ACTIVE) {
activeSegments.add(ref);
} else if (seg.getState() ==
org.apache.pulsar.common.api.proto.SegmentState.SEALED) {
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
index 2041dab9b57..7fc31f2ad7a 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
@@ -65,6 +65,9 @@ final class DagWatchClient implements DagWatchSession,
AutoCloseable {
private volatile LayoutChangeListener listener;
private volatile ClientCnx cnx;
private volatile boolean closed = false;
+ /** Canonical topic://t/n/x identity returned by the broker. Resolved on
the first
+ * update; used as the parent topic when computing segment:// URIs for
real DAGs. */
+ private volatile TopicName resolvedTopicName;
DagWatchClient(PulsarClientImpl v4Client, TopicName topicName) {
this.v4Client = v4Client;
@@ -138,12 +141,24 @@ final class DagWatchClient implements DagWatchSession,
AutoCloseable {
* This is invoked from the Netty I/O thread.
*/
@Override
- public void onUpdate(ScalableTopicDAG dag) {
+ public void onUpdate(ScalableTopicDAG dag, String resolvedTopicName) {
if (closed) {
return;
}
- ClientSegmentLayout newLayout = ClientSegmentLayout.fromProto(dag,
topicName);
+ // Cache the canonical topic://... identity returned by the broker so
segment://
+ // URIs are computed against the resolved parent regardless of the
input domain.
+ // The broker should always set this on success; fall back to the
input name if
+ // an older broker doesn't.
+ TopicName resolvedTn;
+ if (resolvedTopicName != null) {
+ resolvedTn = TopicName.get(resolvedTopicName);
+ this.resolvedTopicName = resolvedTn;
+ } else {
+ resolvedTn = this.resolvedTopicName != null ?
this.resolvedTopicName : topicName;
+ }
+
+ ClientSegmentLayout newLayout = ClientSegmentLayout.fromProto(dag,
resolvedTn);
ClientSegmentLayout oldLayout = currentLayout.getAndSet(newLayout);
log.info().attr("oldEpoch", oldLayout != null ? oldLayout.epoch() :
"none")
@@ -236,8 +251,15 @@ final class DagWatchClient implements DagWatchSession,
AutoCloseable {
return sessionId;
}
+ /**
+ * Returns the canonical scalable-topic identity, falling back to the
user's input
+ * before the first response arrives. Once the broker has returned a
+ * {@code resolved_topic_name}, this always reflects the resolved form so
callers
+ * see the canonical {@code topic://...} name regardless of how they
spelled the input.
+ */
TopicName topicName() {
- return topicName;
+ TopicName resolved = resolvedTopicName;
+ return resolved != null ? resolved : topicName;
}
@Override
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
index fed80479a42..e407e912f2e 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
@@ -159,7 +159,7 @@ final class MultiTopicQueueConsumer<T> implements
QueueConsumerImpl<T> {
if (perTopic.containsKey(topicName)) {
return CompletableFuture.completedFuture(null);
}
- TopicName topic = V5Utils.asScalableTopicName(topicName);
+ TopicName topic = V5Utils.parseScalableTopicInput(topicName);
DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
// Per-topic message sink: tag each delivered message with the parent
scalable
// topic for ack routing + display, and forward into the shared mux.
No pump
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
index e49b755cca4..50990b18dd0 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
@@ -152,7 +152,7 @@ final class MultiTopicStreamConsumer<T> implements
StreamConsumer<T> {
if (perTopic.containsKey(topicName)) {
return CompletableFuture.completedFuture(null);
}
- TopicName topic = V5Utils.asScalableTopicName(topicName);
+ TopicName topic = V5Utils.parseScalableTopicInput(topicName);
// One ScalableConsumerClient session per topic, same as the
single-topic builder.
ScalableConsumerClient session = new ScalableConsumerClient(
client.v4Client(), topic,
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
index 922eb95885c..be9949fdb48 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
@@ -70,7 +70,7 @@ final class ProducerBuilderV5<T> implements
ProducerBuilder<T> {
new
PulsarClientException.InvalidConfigurationException("Topic name is required"));
}
- TopicName topicName = V5Utils.asScalableTopicName(topicStr);
+ TopicName topicName = V5Utils.parseScalableTopicInput(topicStr);
// Create DAG watch client and start the session
DagWatchClient dagWatch = new DagWatchClient(client.v4Client(),
topicName);
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
index d153a77b0f4..c1d8bdf261f 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
@@ -91,7 +91,7 @@ final class QueueConsumerBuilderV5<T> implements
QueueConsumerBuilder<T> {
return MultiTopicQueueConsumer.createAsync(
client, v5Schema, conf, namespaceName, propertyFilters);
}
- TopicName topic = V5Utils.asScalableTopicName(topicName);
+ TopicName topic = V5Utils.parseScalableTopicInput(topicName);
DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
return dagWatch.start()
.thenCompose(initialLayout ->
ScalableQueueConsumer.createAsync(
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
index 4bdefb216a7..8f8540af4ea 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
@@ -344,7 +344,9 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T> {
org.apache.pulsar.client.api.MessageId startMsgId =
resolveStartPosition(segment.segmentId());
var segConf = new
org.apache.pulsar.client.impl.conf.ReaderConfigurationData<T>();
- segConf.getTopicNames().add(segment.segmentTopicName());
+ // Legacy segments wrap an externally managed persistent:// topic;
regular ones use the
+ // computed segment:// URI. attachTopicName() collapses both into the
right URI.
+ segConf.getTopicNames().add(segment.attachTopicName());
segConf.setStartMessageId(startMsgId);
if (consumerName != null) {
segConf.setReaderName(consumerName + "-seg-" +
segment.segmentId());
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
index 30b6d32872b..ad9e49ecb15 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
@@ -310,7 +310,8 @@ final class ScalableConsumerClient implements
ScalableConsumerSession, AutoClose
segments.add(new ActiveSegment(
s.getSegmentId(),
HashRange.of((int) s.getHashStart(), (int) s.getHashEnd()),
- s.getSegmentTopic()));
+ s.getSegmentTopic(),
+ /*legacyTopicName*/ null));
}
return Collections.unmodifiableList(segments);
}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 2952318936a..8eef5183ead 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -422,7 +422,9 @@ final class ScalableQueueConsumer<T> implements
QueueConsumerImpl<T>, DagWatchCl
var segConf = consumerConf.clone();
segConf.getTopicNames().clear();
segConf.setTopicsPattern(null);
- segConf.getTopicNames().add(segment.segmentTopicName());
+ // Legacy segments wrap an externally managed persistent:// topic;
regular ones use the
+ // computed segment:// URI. attachTopicName() collapses both into the
right URI.
+ segConf.getTopicNames().add(segment.attachTopicName());
segConf.setSubscriptionType(SubscriptionType.Shared);
if (consumerConf.getConsumerName() != null) {
segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" +
segment.segmentId());
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index afd703ae9a4..c6e70553002 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -373,7 +373,9 @@ final class ScalableStreamConsumer<T>
var segConf = consumerConf.clone();
segConf.getTopicNames().clear();
segConf.setTopicsPattern(null);
- segConf.getTopicNames().add(segment.segmentTopicName());
+ // Legacy segments wrap an externally managed persistent:// topic;
regular ones use the
+ // computed segment:// URI. attachTopicName() collapses both into the
right URI.
+ segConf.getTopicNames().add(segment.attachTopicName());
segConf.setSubscriptionType(SubscriptionType.Exclusive);
if (consumerConf.getConsumerName() != null) {
segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" +
segment.segmentId());
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index 8d88490794e..70d0f7fd3ee 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -487,15 +487,17 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
private CompletableFuture<org.apache.pulsar.client.api.Producer<T>>
getOrCreateSegmentProducerAsync(
long segmentId) {
return segmentProducers.computeIfAbsent(segmentId, id -> {
- // Find the segment topic name
- String segmentTopicName = null;
+ // Find the segment and the URI to attach the per-segment v4
producer to.
+ // Regular segments use the computed segment:// URI; legacy
segments (synthetic
+ // layouts wrapping an externally managed persistent:// topic) use
that URI directly.
+ String attachTopicName = null;
for (var seg : activeSegments) {
if (seg.segmentId() == id) {
- segmentTopicName = seg.segmentTopicName();
+ attachTopicName = seg.attachTopicName();
break;
}
}
- if (segmentTopicName == null) {
+ if (attachTopicName == null) {
return CompletableFuture.failedFuture(
new PulsarClientException("Segment " + id + " not
found in active segments"));
}
@@ -506,7 +508,7 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
// initialSequenceId, accessMode, properties, ...) and not just
the few
// fields explicitly carried over.
var segConf = producerConf.clone();
- segConf.setTopicName(segmentTopicName);
+ segConf.setTopicName(attachTopicName);
if (producerConf.getProducerName() != null
&& !producerConf.getProducerName().isEmpty()) {
segConf.setProducerName(producerConf.getProducerName() +
"-seg-" + id);
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SegmentRouter.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SegmentRouter.java
index efa206f701a..1cf9cdf4abc 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SegmentRouter.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SegmentRouter.java
@@ -35,6 +35,11 @@ final class SegmentRouter {
/**
* Route a message key to the segment that owns its hash range.
*
+ * <p>If every active segment is a legacy segment (synthetic layout for a
not-yet-migrated
+ * regular topic), the routing switches to {@code
signSafeMod(murmurHash3_32(key), N)} over
+ * {@code segment_id} so V5 producers route the same way v4
partitioned-topic producers do
+ * — preserving per-key destinations while clients gradually upgrade.
+ *
* @param key the message key
* @param activeSegments the currently active segments (sorted by hash
range)
* @return the segment ID to route to
@@ -44,6 +49,9 @@ final class SegmentRouter {
if (activeSegments.isEmpty()) {
throw new IllegalStateException("No active segments");
}
+ if (allLegacy(activeSegments)) {
+ return routeModN(key, activeSegments);
+ }
int hash = hash(key);
for (var segment : activeSegments) {
if (segment.hashRange().contains(hash)) {
@@ -64,6 +72,39 @@ final class SegmentRouter {
return activeSegments.get(idx).segmentId();
}
+    /** True iff every active segment is legacy, i.e. the layout is a synthetic one for a regular topic. */
+    private static boolean allLegacy(List<ActiveSegment> activeSegments) {
+        for (var s : activeSegments) {
+            if (!s.isLegacy()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Mod-N routing over {@code segment_id}: {@code signSafeMod(murmurHash3_32(key), N)}, N = active count.
+     * NOTE(review): equals v4 routing only under HashingScheme.Murmur3_32Hash (v4 default: JavaStringHash).
+     */
+    private static long routeModN(String key, List<ActiveSegment> activeSegments) {
+        int hash32 = org.apache.pulsar.common.util.Murmur3_32Hash.getInstance()
+                .makeHash(key.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        int n = activeSegments.size();
+        int partition = signSafeMod(hash32, n);
+        for (var segment : activeSegments) {
+            if (segment.segmentId() == partition) {
+                return segment.segmentId();
+            }
+        }
+        throw new IllegalStateException( // assumes synthetic segment IDs are exactly 0..N-1
+                "Synthetic layout missing segment_id=" + partition + " (N=" + n + ")");
+    }
+
+    private static int signSafeMod(int dividend, int divisor) { // result in [0, divisor) for divisor > 0
+        int mod = dividend % divisor;
+        return mod < 0 ? mod + divisor : mod;
+    }
+
/**
* Compute the 16-bit hash for a key using MurmurHash3.
*/
@@ -81,7 +122,24 @@ final class SegmentRouter {
/**
* Represents an active segment with its hash range and ID.
+ *
+ * <p>{@code legacyTopicName} is non-null for <i>legacy segments</i> —
entries in a
+ * synthetic layout that wrap an existing, externally managed {@code
persistent://...}
+ * topic (e.g. one partition of a not-yet-migrated regular topic). The
per-segment v4
+ * producer/consumer attaches to {@link #attachTopicName()}, which returns
+ * {@code legacyTopicName} for legacy segments and the computed
+ * {@code segmentTopicName} otherwise.
*/
- record ActiveSegment(long segmentId, HashRange hashRange, String
segmentTopicName) {
+ record ActiveSegment(long segmentId, HashRange hashRange, String
segmentTopicName,
+ String legacyTopicName) {
+
+ boolean isLegacy() {
+ return legacyTopicName != null && !legacyTopicName.isEmpty();
+ }
+
+ /** The topic URI the per-segment v4 producer/consumer should attach
to. */
+ String attachTopicName() {
+ return isLegacy() ? legacyTopicName : segmentTopicName;
+ }
}
}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
index d18b0351f00..13ab896800c 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
@@ -87,7 +87,7 @@ final class StreamConsumerBuilderV5<T> implements
StreamConsumerBuilder<T> {
client, v5Schema, conf, namespaceName, propertyFilters);
}
- TopicName topic = V5Utils.asScalableTopicName(topicName);
+ TopicName topic = V5Utils.parseScalableTopicInput(topicName);
ScalableConsumerClient session = new ScalableConsumerClient(
client.v4Client(),
topic,
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5Utils.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5Utils.java
index f4c8ae38235..2031f83b2cb 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5Utils.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5Utils.java
@@ -30,21 +30,32 @@ final class V5Utils {
}
/**
- * Parse a topic string and ensure it uses the {@code topic://} domain.
+ * Parse a topic string passed to a V5 builder and validate it for the
scalable-topic
+ * lookup session.
*
- * <p>V5 scalable topics always use the {@code topic://} domain. If the
user passes
- * a bare name (e.g. {@code "my-topic"} or {@code "tenant/ns/my-topic"}),
the standard
- * {@link TopicName#get(String)} would default to {@code persistent://}.
This method
- * re-wraps the parsed name with the correct domain.
+ * <p>Accepts three input forms, preserving the domain so the broker can
decide
+ * whether the lookup resolves to a real DAG (natively scalable topic) or
a synthetic
+ * layout (regular topic that has not yet been migrated):
+ * <ul>
+ * <li>{@code topic://tenant/ns/x} — explicitly scalable.</li>
+ * <li>{@code persistent://tenant/ns/x} — regular topic; if it has been
migrated the
+ * broker promotes it to its {@code topic://} identity and returns
the real DAG,
+ * otherwise it returns a synthetic layout that wraps the existing
partitions.</li>
+ * <li>Short forms ({@code my-topic} or {@code tenant/ns/my-topic}) —
normalised by
+ * {@link TopicName#get(String)} to {@code
persistent://public/default/...} or
+ * {@code persistent://tenant/ns/...}; treated like any other
persistent input.</li>
+ * </ul>
*
- * <p>If the input already has the {@code topic://} scheme it is returned
as-is.
+ * <p>Rejects {@code non-persistent://} with {@link
UnsupportedOperationException}:
+ * scalable topics are always backed by managed ledgers, and the V5 SDK
has no path
+ * to a non-persistent topic.
*/
- static TopicName asScalableTopicName(String topic) {
+ static TopicName parseScalableTopicInput(String topic) {
TopicName tn = TopicName.get(topic);
- if (tn.getDomain() == TopicDomain.topic) {
- return tn;
+ if (tn.getDomain() == TopicDomain.non_persistent) {
+ throw new UnsupportedOperationException(
+ "V5 does not support non-persistent:// topics: " + topic);
}
- return TopicName.get(TopicDomain.topic.value(),
- tn.getNamespaceObject(), tn.getLocalName());
+ return tn;
}
}
diff --git a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SegmentRouterTest.java b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SegmentRouterTest.java
index 107499c5e0f..d0f7db03185 100644
--- a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SegmentRouterTest.java
+++ b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SegmentRouterTest.java
@@ -31,7 +31,12 @@ import org.testng.annotations.Test;
 public class SegmentRouterTest {
 private static ActiveSegment seg(long id, int start, int end) {
- return new ActiveSegment(id, HashRange.of(start, end), "persistent://t/n/seg-" + id);
+ return new ActiveSegment(id, HashRange.of(start, end), "persistent://t/n/seg-" + id, null);
+ }
+
+ /** Build a legacy segment (synthetic-layout entry wrapping an externally managed persistent:// topic). */
+ private static ActiveSegment legacySeg(long id, int start, int end, String underlying) {
+ return new ActiveSegment(id, HashRange.of(start, end), "segment://t/n/x/" + id, underlying);
 }
// --- route(key, ...) ---
@@ -123,6 +128,62 @@ public class SegmentRouterTest {
assertThrows(IllegalStateException.class, () ->
router.routeRoundRobin(List.of()));
}
+ // --- mod-N routing for synthetic layouts (all legacy segments) ---
+
+ @Test
+ public void testAllLegacySegmentsRouteModN() {
+ // Synthetic layout for a 4-partition regular topic: 4 legacy segments
+ // with segment_id == partition_index. Routing must match v4 partitioned-topic
+ // routing (signSafeMod(murmurHash3_32(key), N)).
+ SegmentRouter router = new SegmentRouter();
+ int n = 4;
+ List<ActiveSegment> segments = List.of(
+ legacySeg(0, 0x0000, 0x3FFF, "persistent://t/n/x-partition-0"),
+ legacySeg(1, 0x4000, 0x7FFF, "persistent://t/n/x-partition-1"),
+ legacySeg(2, 0x8000, 0xBFFF, "persistent://t/n/x-partition-2"),
+ legacySeg(3, 0xC000, 0xFFFF, "persistent://t/n/x-partition-3"));
+
+ // For a synthetic layout, the router must NOT use hash ranges — it must do
+ // mod-N over segment_id. Verify a handful of keys land on the expected
+ // partition computed exactly as v4 would.
+ for (String key : new String[]{"a", "customer-1", "customer-2", "order-99", ""}) {
+ int hash32 = org.apache.pulsar.common.util.Murmur3_32Hash.getInstance()
+ .makeHash(key.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ int mod = hash32 % n;
+ int expected = mod < 0 ? mod + n : mod;
+ assertEquals(router.route(key, segments), expected,
+ "key=" + key + " expected v4 mod-N partition " + expected);
+ }
+ }
+
+ @Test
+ public void testMixedLegacyAndRegularStillUsesHashRouting() {
+ // After migration, the DAG has sealed legacy parents + active range-based children.
+ // Routing on the active set is range-based — the all-legacy special-case only
+ // triggers when *every* active segment is a legacy segment.
+ SegmentRouter router = new SegmentRouter();
+ List<ActiveSegment> mixed = List.of(
+ legacySeg(0, 0x0000, 0x7FFF, "persistent://t/n/x-partition-0"),
+ seg(1, 0x8000, 0xFFFF));
+
+ // pick a key whose hash falls in the range owned by the *regular* segment;
+ // it must land there, not on segment_id=mod.
+ String regularKey = findKeyInRange(0x8000, 0xFFFF);
+ assertEquals(router.route(regularKey, mixed), 1L);
+ }
+
+ @Test
+ public void testAllLegacyRoutingIsDeterministic() {
+ SegmentRouter router = new SegmentRouter();
+ List<ActiveSegment> segments = List.of(
+ legacySeg(0, 0x0000, 0x7FFF, "persistent://t/n/x-partition-0"),
+ legacySeg(1, 0x8000, 0xFFFF, "persistent://t/n/x-partition-1"));
+ long first = router.route("stable", segments);
+ for (int i = 0; i < 20; i++) {
+ assertEquals(router.route("stable", segments), first);
+ }
+ }
+
// --- hash ---
@Test
diff --git a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/V5UtilsTest.java b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/V5UtilsTest.java
index b1374896202..f8422564a5c 100644
--- a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/V5UtilsTest.java
+++ b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/V5UtilsTest.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.client.impl.v5;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertSame;
+import static org.testng.Assert.expectThrows;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.testng.annotations.Test;
@@ -27,8 +27,9 @@ import org.testng.annotations.Test;
 public class V5UtilsTest {
 @Test
- public void testAsScalableTopicNameKeepsExistingTopicDomain() {
- TopicName tn = V5Utils.asScalableTopicName("topic://tenant/ns/my-topic");
+ public void testParseTopicInputAcceptsTopicDomain() {
+ // topic://... is the canonical scalable form — passed through unchanged.
+ TopicName tn = V5Utils.parseScalableTopicInput("topic://tenant/ns/my-topic");
 assertEquals(tn.getDomain(), TopicDomain.topic);
 assertEquals(tn.getTenant(), "tenant");
 assertEquals(tn.getNamespacePortion(), "ns");
@@ -36,43 +37,43 @@ public class V5UtilsTest {
 }
 @Test
- public void testAsScalableTopicNameReturnsSameInstanceWhenAlreadyTopicDomain() {
- // asScalableTopicName returns the parsed TopicName as-is when it already has the
- // topic:// domain. TopicName.get caches by fully-qualified name, so the same string
- // should yield an identical cached instance on the second call too.
- TopicName first = V5Utils.asScalableTopicName("topic://tenant/ns/my-topic");
- TopicName second = V5Utils.asScalableTopicName("topic://tenant/ns/my-topic");
- assertSame(first, second);
+ public void testParseTopicInputAcceptsPersistentDomain() {
+ // persistent://... is preserved — the broker decides whether the lookup
+ // resolves to a real DAG (migrated topic) or a synthetic layout (regular topic).
+ TopicName tn = V5Utils.parseScalableTopicInput("persistent://tenant/ns/my-topic");
+ assertEquals(tn.getDomain(), TopicDomain.persistent);
+ assertEquals(tn.getTenant(), "tenant");
+ assertEquals(tn.getNamespacePortion(), "ns");
+ assertEquals(tn.getLocalName(), "my-topic");
 }
 @Test
- public void testAsScalableTopicNameRewrapsBareLocalName() {
- // A bare name parses as persistent:// by default; V5 must rewrap it as topic://.
- TopicName tn = V5Utils.asScalableTopicName("my-topic");
- assertEquals(tn.getDomain(), TopicDomain.topic);
- assertEquals(tn.getLocalName(), "my-topic");
- // default namespace is public/default
+ public void testParseTopicInputNormalisesBareNameToPersistentDefault() {
+ // A bare local name normalises to persistent://public/default/... — same as v4.
+ TopicName tn = V5Utils.parseScalableTopicInput("my-topic");
+ assertEquals(tn.getDomain(), TopicDomain.persistent);
 assertEquals(tn.getTenant(), "public");
 assertEquals(tn.getNamespacePortion(), "default");
+ assertEquals(tn.getLocalName(), "my-topic");
 }
 @Test
- public void testAsScalableTopicNameRewrapsFullyQualifiedNonTopicDomain() {
- // A persistent://... name must be rewrapped as topic://...
- TopicName tn = V5Utils.asScalableTopicName("persistent://tenant/ns/my-topic");
- assertEquals(tn.getDomain(), TopicDomain.topic);
+ public void testParseTopicInputNormalisesShortFormToPersistent() {
+ // tenant/ns/my-topic (no scheme) normalises to persistent://tenant/ns/my-topic.
+ TopicName tn = V5Utils.parseScalableTopicInput("tenant/ns/my-topic");
+ assertEquals(tn.getDomain(), TopicDomain.persistent);
 assertEquals(tn.getTenant(), "tenant");
 assertEquals(tn.getNamespacePortion(), "ns");
 assertEquals(tn.getLocalName(), "my-topic");
 }
 @Test
- public void testAsScalableTopicNameRewrapsShortForm() {
- // tenant/ns/my-topic (no scheme) is parsed with the default persistent:// prefix.
- TopicName tn = V5Utils.asScalableTopicName("tenant/ns/my-topic");
- assertEquals(tn.getDomain(), TopicDomain.topic);
- assertEquals(tn.getTenant(), "tenant");
- assertEquals(tn.getNamespacePortion(), "ns");
- assertEquals(tn.getLocalName(), "my-topic");
+ public void testParseTopicInputRejectsNonPersistent() {
+ // Scalable topics are always backed by managed ledgers — non-persistent must be
+ // rejected at the SDK builder boundary, not deferred to a broker error.
+ UnsupportedOperationException ex = expectThrows(UnsupportedOperationException.class, () ->
+ V5Utils.parseScalableTopicInput("non-persistent://tenant/ns/my-topic"));
+ assertEquals(ex.getMessage(),
+ "V5 does not support non-persistent:// topics: non-persistent://tenant/ns/my-topic");
 }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 8b5623b4bb7..97430e72a22 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1367,7 +1367,8 @@ public class ClientCnx extends PulsarHandler {
 DagWatchSession session = dagWatchSessions.get(sessionId);
 if (session != null) {
- session.onUpdate(cmd.getDag());
+ String resolvedTopicName = cmd.hasResolvedTopicName() ? cmd.getResolvedTopicName() : null;
+ session.onUpdate(cmd.getDag(), resolvedTopicName);
 } else {
 log.warn().attr("sessionId", sessionId)
 .log("Received scalable topic update for unknown session");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java
index f48681ff6d0..fe25bf166a5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java
@@ -29,8 +29,13 @@ public interface DagWatchSession {
 /**
 * Called when the broker sends a DAG update for this session.
+ *
+ * @param dag the new DAG layout.
+ * @param resolvedTopicName the canonical {@code topic://t/n/x} identity of the topic
+ * this session is watching, as resolved by the broker.
+ * Set on every successful response regardless of input form, but may be {@code null} if the update command does not carry it.
 */
- void onUpdate(ScalableTopicDAG dag);
+ void onUpdate(ScalableTopicDAG dag, String resolvedTopicName);
/**
* Called when the broker sends an error for this session.