This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 86311b191da [feat][client] PIP-475: V5 SDK consumes synthetic layouts for regular topics (#25850)
86311b191da is described below

commit 86311b191da1df90e0d84d9f1b067bf52f04a133
Author: Matteo Merli <[email protected]>
AuthorDate: Thu May 21 16:29:18 2026 +0100

    [feat][client] PIP-475: V5 SDK consumes synthetic layouts for regular topics (#25850)
---
 .../client/api/v5/V5RegularTopicInteropTest.java   | 197 +++++++++++++++++++++
 .../impl/v5/CheckpointConsumerBuilderV5.java       |   2 +-
 .../pulsar/client/impl/v5/ClientSegmentLayout.java |   6 +-
 .../pulsar/client/impl/v5/DagWatchClient.java      |  28 ++-
 .../client/impl/v5/MultiTopicQueueConsumer.java    |   2 +-
 .../client/impl/v5/MultiTopicStreamConsumer.java   |   2 +-
 .../pulsar/client/impl/v5/ProducerBuilderV5.java   |   2 +-
 .../client/impl/v5/QueueConsumerBuilderV5.java     |   2 +-
 .../client/impl/v5/ScalableCheckpointConsumer.java |   4 +-
 .../client/impl/v5/ScalableConsumerClient.java     |   3 +-
 .../client/impl/v5/ScalableQueueConsumer.java      |   4 +-
 .../client/impl/v5/ScalableStreamConsumer.java     |   4 +-
 .../client/impl/v5/ScalableTopicProducer.java      |  12 +-
 .../pulsar/client/impl/v5/SegmentRouter.java       |  60 ++++++-
 .../client/impl/v5/StreamConsumerBuilderV5.java    |   2 +-
 .../org/apache/pulsar/client/impl/v5/V5Utils.java  |  33 ++--
 .../pulsar/client/impl/v5/SegmentRouterTest.java   |  63 ++++++-
 .../apache/pulsar/client/impl/v5/V5UtilsTest.java  |  55 +++---
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   3 +-
 .../apache/pulsar/client/impl/DagWatchSession.java |   7 +-
 20 files changed, 430 insertions(+), 61 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5RegularTopicInteropTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5RegularTopicInteropTest.java
new file mode 100644
index 00000000000..c58f5e7d4c2
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5RegularTopicInteropTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end interop tests for V5 clients against regular (non-scalable) topics.
+ *
+ * <p>PIP-475: when a V5 client looks up a {@code persistent://...} topic that has not
+ * yet been migrated to a scalable topic, the broker returns a synthetic layout that
+ * wraps the existing partitions as legacy segments. The V5 SDK must:
+ * <ul>
+ *   <li>Route to legacy segments using v4 partitioned-topic mod-N routing so per-key
+ *       destinations match what a v4 producer would do.</li>
+ *   <li>Attach per-segment v4 producers/consumers to the underlying
+ *       {@code persistent://...-partition-K} URIs.</li>
+ * </ul>
+ * These tests verify the SDK side end-to-end against a real broker.
+ */
+public class V5RegularTopicInteropTest extends V5ClientBaseTest {
+
+    @Test
+    public void testV5ProducerToPartitionedRegularTopicRoutesV4Compatibly() 
throws Exception {
+        // V5 producer writes to a 4-partition regular topic. The broker 
returns a
+        // synthetic layout; the V5 router uses mod-N over segment_id, which 
matches
+        // v4 partitioned-topic routing. Each v4 consumer (one per partition) 
should
+        // see exactly the keys whose v4 routing puts them in its partition.
+        String regular = "persistent://" + getNamespace() + "/regular-"
+                + UUID.randomUUID().toString().substring(0, 8);
+        admin.topics().createPartitionedTopic(regular, 4);
+
+        List<org.apache.pulsar.client.api.Consumer<String>> v4Consumers = new 
ArrayList<>();
+        for (int k = 0; k < 4; k++) {
+            v4Consumers.add(track(pulsarClient
+                    .newConsumer(org.apache.pulsar.client.api.Schema.STRING)
+                    .topic(regular + "-partition-" + k)
+                    .subscriptionName("v4-sub")
+                    .subscriptionInitialPosition(
+                            
org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest)
+                    .subscribe()));
+        }
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(regular)
+                .create();
+
+        final int n = 64;
+        for (int i = 0; i < n; i++) {
+            producer.newMessage().key("k-" + i).value("v-" + i).send();
+        }
+
+        Set<String> receivedValues = new HashSet<>();
+        for (int k = 0; k < 4; k++) {
+            org.apache.pulsar.client.api.Message<String> m;
+            while ((m = v4Consumers.get(k).receive(2, 
java.util.concurrent.TimeUnit.SECONDS)) != null) {
+                int hash32 = Murmur3_32Hash.getInstance()
+                        .makeHash(m.getKey().getBytes(StandardCharsets.UTF_8));
+                assertEquals(signSafeMod(hash32, 4), k,
+                        "key=" + m.getKey() + " arrived in partition " + k
+                                + " but v4 routing places it in " + 
signSafeMod(hash32, 4));
+                receivedValues.add(m.getValue());
+            }
+        }
+        assertEquals(receivedValues.size(), n,
+                "every published message must be received exactly once across 
all partitions");
+    }
+
+    @Test
+    public void testV5ProducerToNonPartitionedRegularTopicV4ConsumerReceives() 
throws Exception {
+        // V5 producer writes to a non-partitioned regular topic. The broker 
returns a
+        // synthetic layout with a single legacy segment wrapping the 
persistent:// URI.
+        // A v4 consumer on the same URI should receive everything.
+        String regular = "persistent://" + getNamespace() + "/regular-np-"
+                + UUID.randomUUID().toString().substring(0, 8);
+        admin.topics().createNonPartitionedTopic(regular);
+
+        @Cleanup
+        org.apache.pulsar.client.api.Consumer<String> v4Consumer = pulsarClient
+                .newConsumer(org.apache.pulsar.client.api.Schema.STRING)
+                .topic(regular)
+                .subscriptionName("v4-sub")
+                .subscriptionInitialPosition(
+                        
org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(regular)
+                .create();
+
+        final int n = 16;
+        for (int i = 0; i < n; i++) {
+            producer.newMessage().value("v-" + i).send();
+        }
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            org.apache.pulsar.client.api.Message<String> m =
+                    v4Consumer.receive(5, 
java.util.concurrent.TimeUnit.SECONDS);
+            assertNotNull(m, "v4 consumer must receive every V5 send");
+            received.add(m.getValue());
+        }
+        assertEquals(received.size(), n);
+        // No straggler messages beyond the n sends.
+        assertNull(v4Consumer.receive(500, 
java.util.concurrent.TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testV5QueueConsumerFromPartitionedRegularTopic() throws 
Exception {
+        // v4 producer writes to a 4-partition regular topic; V5 queue consumer
+        // subscribes via the synthetic layout (one per-segment v4 consumer per
+        // partition) and must receive every published message.
+        String regular = "persistent://" + getNamespace() + "/regular-q-"
+                + UUID.randomUUID().toString().substring(0, 8);
+        admin.topics().createPartitionedTopic(regular, 4);
+
+        @Cleanup
+        org.apache.pulsar.client.api.Producer<String> v4Producer = pulsarClient
+                .newProducer(org.apache.pulsar.client.api.Schema.STRING)
+                .topic(regular)
+                .create();
+
+        @Cleanup
+        QueueConsumer<String> v5Consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(regular)
+                .subscriptionName("v5-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        final int n = 64;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            v4Producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            org.apache.pulsar.client.api.v5.Message<String> m = 
v5Consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(m, "V5 queue consumer must drain every v4 send");
+            received.add(m.value());
+            v5Consumer.acknowledge(m.id());
+        }
+        assertEquals(received, sent);
+    }
+
+    @Test
+    public void testV5ProducerBuilderRejectsNonPersistent() {
+        // The V5 builder must reject non-persistent:// inputs synchronously, 
with a
+        // clear error rather than deferring to a broker-side failure.
+        String topic = "non-persistent://" + getNamespace() + "/regular-"
+                + UUID.randomUUID().toString().substring(0, 8);
+        UnsupportedOperationException ex = 
expectThrows(UnsupportedOperationException.class,
+                () -> 
v5Client.newProducer(Schema.string()).topic(topic).create());
+        assertTrue(ex.getMessage().contains("non-persistent"), 
ex.getMessage());
+    }
+
+    private static int signSafeMod(int dividend, int divisor) {
+        int mod = dividend % divisor;
+        return mod < 0 ? mod + divisor : mod;
+    }
+}
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
index 119702e9497..ce66b6fb608 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
@@ -66,7 +66,7 @@ final class CheckpointConsumerBuilderV5<T> implements 
CheckpointConsumerBuilder<
                     new 
PulsarClientException.InvalidConfigurationException("Topic name is required"));
         }
 
-        TopicName topic = V5Utils.asScalableTopicName(topicName);
+        TopicName topic = V5Utils.parseScalableTopicInput(topicName);
 
         if (consumerGroup != null && !consumerGroup.isEmpty()) {
             // Managed: register with the broker's subscription coordinator 
under the
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
index e4c043af6db..7da1ee33d64 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
@@ -79,7 +79,11 @@ final class ClientSegmentLayout {
             HashRange range = HashRange.of((int) seg.getHashStart(), (int) 
seg.getHashEnd());
             String segTopicName = SegmentTopicName.fromParent(
                     parentTopic, range, seg.getSegmentId()).toString();
-            ActiveSegment ref = new ActiveSegment(seg.getSegmentId(), range, 
segTopicName);
+            // Legacy segments (synthetic-layout entries wrapping an existing, 
externally
+            // managed persistent:// topic) carry that URI. Regular 
controller-managed
+            // segments leave it null and attach to segTopicName instead.
+            String legacy = seg.hasLegacyTopicName() ? 
seg.getLegacyTopicName() : null;
+            ActiveSegment ref = new ActiveSegment(seg.getSegmentId(), range, 
segTopicName, legacy);
             if (seg.getState() == 
org.apache.pulsar.common.api.proto.SegmentState.ACTIVE) {
                 activeSegments.add(ref);
             } else if (seg.getState() == 
org.apache.pulsar.common.api.proto.SegmentState.SEALED) {
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
index 2041dab9b57..7fc31f2ad7a 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
@@ -65,6 +65,9 @@ final class DagWatchClient implements DagWatchSession, 
AutoCloseable {
     private volatile LayoutChangeListener listener;
     private volatile ClientCnx cnx;
     private volatile boolean closed = false;
+    /** Canonical topic://t/n/x identity returned by the broker. Resolved on 
the first
+     *  update; used as the parent topic when computing segment:// URIs for 
real DAGs. */
+    private volatile TopicName resolvedTopicName;
 
     DagWatchClient(PulsarClientImpl v4Client, TopicName topicName) {
         this.v4Client = v4Client;
@@ -138,12 +141,24 @@ final class DagWatchClient implements DagWatchSession, 
AutoCloseable {
      * This is invoked from the Netty I/O thread.
      */
     @Override
-    public void onUpdate(ScalableTopicDAG dag) {
+    public void onUpdate(ScalableTopicDAG dag, String resolvedTopicName) {
         if (closed) {
             return;
         }
 
-        ClientSegmentLayout newLayout = ClientSegmentLayout.fromProto(dag, 
topicName);
+        // Cache the canonical topic://... identity returned by the broker so 
segment://
+        // URIs are computed against the resolved parent regardless of the 
input domain.
+        // The broker should always set this on success; fall back to the 
input name if
+        // an older broker doesn't.
+        TopicName resolvedTn;
+        if (resolvedTopicName != null) {
+            resolvedTn = TopicName.get(resolvedTopicName);
+            this.resolvedTopicName = resolvedTn;
+        } else {
+            resolvedTn = this.resolvedTopicName != null ? 
this.resolvedTopicName : topicName;
+        }
+
+        ClientSegmentLayout newLayout = ClientSegmentLayout.fromProto(dag, 
resolvedTn);
         ClientSegmentLayout oldLayout = currentLayout.getAndSet(newLayout);
 
         log.info().attr("oldEpoch", oldLayout != null ? oldLayout.epoch() : 
"none")
@@ -236,8 +251,15 @@ final class DagWatchClient implements DagWatchSession, 
AutoCloseable {
         return sessionId;
     }
 
+    /**
+     * Returns the canonical scalable-topic identity, falling back to the 
user's input
+     * before the first response arrives. Once the broker has returned a
+     * {@code resolved_topic_name}, this always reflects the resolved form so 
callers
+     * see the canonical {@code topic://...} name regardless of how they 
spelled the input.
+     */
     TopicName topicName() {
-        return topicName;
+        TopicName resolved = resolvedTopicName;
+        return resolved != null ? resolved : topicName;
     }
 
     @Override
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
index fed80479a42..e407e912f2e 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
@@ -159,7 +159,7 @@ final class MultiTopicQueueConsumer<T> implements 
QueueConsumerImpl<T> {
         if (perTopic.containsKey(topicName)) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicName topic = V5Utils.asScalableTopicName(topicName);
+        TopicName topic = V5Utils.parseScalableTopicInput(topicName);
         DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
         // Per-topic message sink: tag each delivered message with the parent 
scalable
         // topic for ack routing + display, and forward into the shared mux. 
No pump
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
index e49b755cca4..50990b18dd0 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
@@ -152,7 +152,7 @@ final class MultiTopicStreamConsumer<T> implements 
StreamConsumer<T> {
         if (perTopic.containsKey(topicName)) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicName topic = V5Utils.asScalableTopicName(topicName);
+        TopicName topic = V5Utils.parseScalableTopicInput(topicName);
         // One ScalableConsumerClient session per topic, same as the 
single-topic builder.
         ScalableConsumerClient session = new ScalableConsumerClient(
                 client.v4Client(), topic,
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
index 922eb95885c..be9949fdb48 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
@@ -70,7 +70,7 @@ final class ProducerBuilderV5<T> implements 
ProducerBuilder<T> {
                     new 
PulsarClientException.InvalidConfigurationException("Topic name is required"));
         }
 
-        TopicName topicName = V5Utils.asScalableTopicName(topicStr);
+        TopicName topicName = V5Utils.parseScalableTopicInput(topicStr);
 
         // Create DAG watch client and start the session
         DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), 
topicName);
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
index d153a77b0f4..c1d8bdf261f 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
@@ -91,7 +91,7 @@ final class QueueConsumerBuilderV5<T> implements 
QueueConsumerBuilder<T> {
             return MultiTopicQueueConsumer.createAsync(
                     client, v5Schema, conf, namespaceName, propertyFilters);
         }
-        TopicName topic = V5Utils.asScalableTopicName(topicName);
+        TopicName topic = V5Utils.parseScalableTopicInput(topicName);
         DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
         return dagWatch.start()
                 .thenCompose(initialLayout -> 
ScalableQueueConsumer.createAsync(
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
index 4bdefb216a7..8f8540af4ea 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
@@ -344,7 +344,9 @@ final class ScalableCheckpointConsumer<T> implements 
CheckpointConsumer<T> {
         org.apache.pulsar.client.api.MessageId startMsgId = 
resolveStartPosition(segment.segmentId());
 
         var segConf = new 
org.apache.pulsar.client.impl.conf.ReaderConfigurationData<T>();
-        segConf.getTopicNames().add(segment.segmentTopicName());
+        // Legacy segments wrap an externally managed persistent:// topic; 
regular ones use the
+        // computed segment:// URI. attachTopicName() collapses both into the 
right URI.
+        segConf.getTopicNames().add(segment.attachTopicName());
         segConf.setStartMessageId(startMsgId);
         if (consumerName != null) {
             segConf.setReaderName(consumerName + "-seg-" + 
segment.segmentId());
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
index 30b6d32872b..ad9e49ecb15 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
@@ -310,7 +310,8 @@ final class ScalableConsumerClient implements 
ScalableConsumerSession, AutoClose
             segments.add(new ActiveSegment(
                     s.getSegmentId(),
                     HashRange.of((int) s.getHashStart(), (int) s.getHashEnd()),
-                    s.getSegmentTopic()));
+                    s.getSegmentTopic(),
+                    /*legacyTopicName*/ null));
         }
         return Collections.unmodifiableList(segments);
     }
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 2952318936a..8eef5183ead 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -422,7 +422,9 @@ final class ScalableQueueConsumer<T> implements 
QueueConsumerImpl<T>, DagWatchCl
         var segConf = consumerConf.clone();
         segConf.getTopicNames().clear();
         segConf.setTopicsPattern(null);
-        segConf.getTopicNames().add(segment.segmentTopicName());
+        // Legacy segments wrap an externally managed persistent:// topic; 
regular ones use the
+        // computed segment:// URI. attachTopicName() collapses both into the 
right URI.
+        segConf.getTopicNames().add(segment.attachTopicName());
         segConf.setSubscriptionType(SubscriptionType.Shared);
         if (consumerConf.getConsumerName() != null) {
             segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" + 
segment.segmentId());
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index afd703ae9a4..c6e70553002 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -373,7 +373,9 @@ final class ScalableStreamConsumer<T>
         var segConf = consumerConf.clone();
         segConf.getTopicNames().clear();
         segConf.setTopicsPattern(null);
-        segConf.getTopicNames().add(segment.segmentTopicName());
+        // Legacy segments wrap an externally managed persistent:// topic; 
regular ones use the
+        // computed segment:// URI. attachTopicName() collapses both into the 
right URI.
+        segConf.getTopicNames().add(segment.attachTopicName());
         segConf.setSubscriptionType(SubscriptionType.Exclusive);
         if (consumerConf.getConsumerName() != null) {
             segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" + 
segment.segmentId());
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index 8d88490794e..70d0f7fd3ee 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -487,15 +487,17 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
     private CompletableFuture<org.apache.pulsar.client.api.Producer<T>> 
getOrCreateSegmentProducerAsync(
             long segmentId) {
         return segmentProducers.computeIfAbsent(segmentId, id -> {
-            // Find the segment topic name
-            String segmentTopicName = null;
+            // Find the segment and the URI to attach the per-segment v4 
producer to.
+            // Regular segments use the computed segment:// URI; legacy 
segments (synthetic
+            // layouts wrapping an externally managed persistent:// topic) use 
that URI directly.
+            String attachTopicName = null;
             for (var seg : activeSegments) {
                 if (seg.segmentId() == id) {
-                    segmentTopicName = seg.segmentTopicName();
+                    attachTopicName = seg.attachTopicName();
                     break;
                 }
             }
-            if (segmentTopicName == null) {
+            if (attachTopicName == null) {
                 return CompletableFuture.failedFuture(
                         new PulsarClientException("Segment " + id + " not 
found in active segments"));
             }
@@ -506,7 +508,7 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
             // initialSequenceId, accessMode, properties, ...) and not just 
the few
             // fields explicitly carried over.
             var segConf = producerConf.clone();
-            segConf.setTopicName(segmentTopicName);
+            segConf.setTopicName(attachTopicName);
             if (producerConf.getProducerName() != null
                     && !producerConf.getProducerName().isEmpty()) {
                 segConf.setProducerName(producerConf.getProducerName() + 
"-seg-" + id);
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SegmentRouter.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SegmentRouter.java
index efa206f701a..1cf9cdf4abc 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SegmentRouter.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SegmentRouter.java
@@ -35,6 +35,11 @@ final class SegmentRouter {
     /**
      * Route a message key to the segment that owns its hash range.
      *
+     * <p>If every active segment is a legacy segment (synthetic layout for a 
not-yet-migrated
+     * regular topic), the routing switches to {@code 
signSafeMod(murmurHash3_32(key), N)} over
+     * {@code segment_id} so V5 producers route the same way v4 
partitioned-topic producers do
+     * — preserving per-key destinations while clients gradually upgrade.
+     *
      * @param key the message key
      * @param activeSegments the currently active segments (sorted by hash 
range)
      * @return the segment ID to route to
@@ -44,6 +49,9 @@ final class SegmentRouter {
         if (activeSegments.isEmpty()) {
             throw new IllegalStateException("No active segments");
         }
+        if (allLegacy(activeSegments)) {
+            return routeModN(key, activeSegments);
+        }
         int hash = hash(key);
         for (var segment : activeSegments) {
             if (segment.hashRange().contains(hash)) {
@@ -64,6 +72,39 @@ final class SegmentRouter {
         return activeSegments.get(idx).segmentId();
     }
 
+    /** True iff every active segment is a legacy segment — signals a 
synthetic-layout topic. */
+    private static boolean allLegacy(List<ActiveSegment> activeSegments) {
+        for (var s : activeSegments) {
+            if (!s.isLegacy()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Mod-N routing over {@code segment_id}, matching v4 partitioned-topic 
routing
+     * ({@code signSafeMod(murmurHash3_32(key), N)}).
+     */
+    private static long routeModN(String key, List<ActiveSegment> 
activeSegments) {
+        int hash32 = org.apache.pulsar.common.util.Murmur3_32Hash.getInstance()
+                
.makeHash(key.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        int n = activeSegments.size();
+        int partition = signSafeMod(hash32, n);
+        for (var segment : activeSegments) {
+            if (segment.segmentId() == partition) {
+                return segment.segmentId();
+            }
+        }
+        throw new IllegalStateException(
+                "Synthetic layout missing segment_id=" + partition + " (N=" + 
n + ")");
+    }
+
+    private static int signSafeMod(int dividend, int divisor) {
+        int mod = dividend % divisor;
+        return mod < 0 ? mod + divisor : mod;
+    }
+
     /**
      * Compute the 16-bit hash for a key using MurmurHash3.
      */
@@ -81,7 +122,24 @@ final class SegmentRouter {
 
     /**
      * Represents an active segment with its hash range and ID.
+     *
+     * <p>{@code legacyTopicName} is non-null for <i>legacy segments</i> — 
entries in a
+     * synthetic layout that wrap an existing, externally managed {@code 
persistent://...}
+     * topic (e.g. one partition of a not-yet-migrated regular topic). The 
per-segment v4
+     * producer/consumer attaches to {@link #attachTopicName()}, which returns
+     * {@code legacyTopicName} for legacy segments and the computed
+     * {@code segmentTopicName} otherwise.
      */
-    record ActiveSegment(long segmentId, HashRange hashRange, String 
segmentTopicName) {
+    record ActiveSegment(long segmentId, HashRange hashRange, String 
segmentTopicName,
+                         String legacyTopicName) {
+
+        boolean isLegacy() {
+            return legacyTopicName != null && !legacyTopicName.isEmpty();
+        }
+
+        /** The topic URI the per-segment v4 producer/consumer should attach 
to. */
+        String attachTopicName() {
+            return isLegacy() ? legacyTopicName : segmentTopicName;
+        }
     }
 }
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
index d18b0351f00..13ab896800c 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
@@ -87,7 +87,7 @@ final class StreamConsumerBuilderV5<T> implements 
StreamConsumerBuilder<T> {
                     client, v5Schema, conf, namespaceName, propertyFilters);
         }
 
-        TopicName topic = V5Utils.asScalableTopicName(topicName);
+        TopicName topic = V5Utils.parseScalableTopicInput(topicName);
         ScalableConsumerClient session = new ScalableConsumerClient(
                 client.v4Client(),
                 topic,
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5Utils.java 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5Utils.java
index f4c8ae38235..2031f83b2cb 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5Utils.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5Utils.java
@@ -30,21 +30,32 @@ final class V5Utils {
     }
 
     /**
-     * Parse a topic string and ensure it uses the {@code topic://} domain.
+     * Parse a topic string passed to a V5 builder and validate it for the 
scalable-topic
+     * lookup session.
      *
-     * <p>V5 scalable topics always use the {@code topic://} domain. If the 
user passes
-     * a bare name (e.g. {@code "my-topic"} or {@code "tenant/ns/my-topic"}), 
the standard
-     * {@link TopicName#get(String)} would default to {@code persistent://}. 
This method
-     * re-wraps the parsed name with the correct domain.
+     * <p>Accepts three input forms, preserving the domain so the broker can 
decide
+     * whether the lookup resolves to a real DAG (natively scalable topic) or 
a synthetic
+     * layout (regular topic that has not yet been migrated):
+     * <ul>
+     *   <li>{@code topic://tenant/ns/x} — explicitly scalable.</li>
+     *   <li>{@code persistent://tenant/ns/x} — regular topic; if it has been 
migrated the
+     *       broker promotes it to its {@code topic://} identity and returns 
the real DAG,
+     *       otherwise it returns a synthetic layout that wraps the existing 
partitions.</li>
+     *   <li>Short forms ({@code my-topic} or {@code tenant/ns/my-topic}) — 
normalised by
+     *       {@link TopicName#get(String)} to {@code 
persistent://public/default/...} or
+     *       {@code persistent://tenant/ns/...}; treated like any other 
persistent input.</li>
+     * </ul>
      *
-     * <p>If the input already has the {@code topic://} scheme it is returned 
as-is.
+     * <p>Rejects {@code non-persistent://} with {@link 
UnsupportedOperationException}:
+     * scalable topics are always backed by managed ledgers, and the V5 SDK 
has no path
+     * to a non-persistent topic.
      */
-    static TopicName asScalableTopicName(String topic) {
+    static TopicName parseScalableTopicInput(String topic) {
         TopicName tn = TopicName.get(topic);
-        if (tn.getDomain() == TopicDomain.topic) {
-            return tn;
+        if (tn.getDomain() == TopicDomain.non_persistent) {
+            throw new UnsupportedOperationException(
+                    "V5 does not support non-persistent:// topics: " + topic);
         }
-        return TopicName.get(TopicDomain.topic.value(),
-                tn.getNamespaceObject(), tn.getLocalName());
+        return tn;
     }
 }
diff --git 
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SegmentRouterTest.java
 
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SegmentRouterTest.java
index 107499c5e0f..d0f7db03185 100644
--- 
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SegmentRouterTest.java
+++ 
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SegmentRouterTest.java
@@ -31,7 +31,12 @@ import org.testng.annotations.Test;
 public class SegmentRouterTest {
 
     private static ActiveSegment seg(long id, int start, int end) {
-        return new ActiveSegment(id, HashRange.of(start, end), 
"persistent://t/n/seg-" + id);
+        return new ActiveSegment(id, HashRange.of(start, end), 
"persistent://t/n/seg-" + id, null);
+    }
+
+    /** Build a legacy segment (synthetic-layout entry wrapping an externally 
managed persistent:// topic). */
+    private static ActiveSegment legacySeg(long id, int start, int end, String 
underlying) {
+        return new ActiveSegment(id, HashRange.of(start, end), 
"segment://t/n/x/" + id, underlying);
     }
 
     // --- route(key, ...) ---
@@ -123,6 +128,62 @@ public class SegmentRouterTest {
         assertThrows(IllegalStateException.class, () -> 
router.routeRoundRobin(List.of()));
     }
 
+    // --- mod-N routing for synthetic layouts (all legacy segments) ---
+
+    @Test
+    public void testAllLegacySegmentsRouteModN() {
+        // Synthetic layout for a 4-partition regular topic: 4 legacy segments
+        // with segment_id == partition_index. Routing must match v4 
partitioned-topic
+        // routing (signSafeMod(murmurHash3_32(key), N)).
+        SegmentRouter router = new SegmentRouter();
+        int n = 4;
+        List<ActiveSegment> segments = List.of(
+                legacySeg(0, 0x0000, 0x3FFF, "persistent://t/n/x-partition-0"),
+                legacySeg(1, 0x4000, 0x7FFF, "persistent://t/n/x-partition-1"),
+                legacySeg(2, 0x8000, 0xBFFF, "persistent://t/n/x-partition-2"),
+                legacySeg(3, 0xC000, 0xFFFF, 
"persistent://t/n/x-partition-3"));
+
+        // For a synthetic layout, the router must NOT use hash ranges — it 
must do
+        // mod-N over segment_id. Verify a handful of keys land on the expected
+        // partition computed exactly as v4 would.
+        for (String key : new String[]{"a", "customer-1", "customer-2", 
"order-99", ""}) {
+            int hash32 = 
org.apache.pulsar.common.util.Murmur3_32Hash.getInstance()
+                    
.makeHash(key.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            int mod = hash32 % n;
+            int expected = mod < 0 ? mod + n : mod;
+            assertEquals(router.route(key, segments), expected,
+                    "key=" + key + " expected v4 mod-N partition " + expected);
+        }
+    }
+
+    @Test
+    public void testMixedLegacyAndRegularStillUsesHashRouting() {
+        // After migration, the DAG has sealed legacy parents + active 
range-based children.
+        // Routing on the active set is range-based — the all-legacy 
special-case only
+        // triggers when *every* active segment is a legacy segment.
+        SegmentRouter router = new SegmentRouter();
+        List<ActiveSegment> mixed = List.of(
+                legacySeg(0, 0x0000, 0x7FFF, "persistent://t/n/x-partition-0"),
+                seg(1, 0x8000, 0xFFFF));
+
+        // Pick a key whose hash falls in the range owned by the *regular* 
segment;
+        // it must land there, not on segment_id=mod.
+        String regularKey = findKeyInRange(0x8000, 0xFFFF);
+        assertEquals(router.route(regularKey, mixed), 1L);
+    }
+
+    @Test
+    public void testAllLegacyRoutingIsDeterministic() {
+        SegmentRouter router = new SegmentRouter();
+        List<ActiveSegment> segments = List.of(
+                legacySeg(0, 0x0000, 0x7FFF, "persistent://t/n/x-partition-0"),
+                legacySeg(1, 0x8000, 0xFFFF, 
"persistent://t/n/x-partition-1"));
+        long first = router.route("stable", segments);
+        for (int i = 0; i < 20; i++) {
+            assertEquals(router.route("stable", segments), first);
+        }
+    }
+
     // --- hash ---
 
     @Test
diff --git 
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/V5UtilsTest.java
 
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/V5UtilsTest.java
index b1374896202..f8422564a5c 100644
--- 
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/V5UtilsTest.java
+++ 
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/V5UtilsTest.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.client.impl.v5;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertSame;
+import static org.testng.Assert.expectThrows;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.testng.annotations.Test;
@@ -27,8 +27,9 @@ import org.testng.annotations.Test;
 public class V5UtilsTest {
 
     @Test
-    public void testAsScalableTopicNameKeepsExistingTopicDomain() {
-        TopicName tn = 
V5Utils.asScalableTopicName("topic://tenant/ns/my-topic");
+    public void testParseTopicInputAcceptsTopicDomain() {
+        // topic://... is the canonical scalable form — passed through 
unchanged.
+        TopicName tn = 
V5Utils.parseScalableTopicInput("topic://tenant/ns/my-topic");
         assertEquals(tn.getDomain(), TopicDomain.topic);
         assertEquals(tn.getTenant(), "tenant");
         assertEquals(tn.getNamespacePortion(), "ns");
@@ -36,43 +37,43 @@ public class V5UtilsTest {
     }
 
     @Test
-    public void 
testAsScalableTopicNameReturnsSameInstanceWhenAlreadyTopicDomain() {
-        // asScalableTopicName returns the parsed TopicName as-is when it 
already has the
-        // topic:// domain. TopicName.get caches by fully-qualified name, so 
the same string
-        // should yield an identical cached instance on the second call too.
-        TopicName first = 
V5Utils.asScalableTopicName("topic://tenant/ns/my-topic");
-        TopicName second = 
V5Utils.asScalableTopicName("topic://tenant/ns/my-topic");
-        assertSame(first, second);
+    public void testParseTopicInputAcceptsPersistentDomain() {
+        // persistent://... is preserved — the broker decides whether the 
lookup
+        // resolves to a real DAG (migrated topic) or a synthetic layout 
(regular topic).
+        TopicName tn = 
V5Utils.parseScalableTopicInput("persistent://tenant/ns/my-topic");
+        assertEquals(tn.getDomain(), TopicDomain.persistent);
+        assertEquals(tn.getTenant(), "tenant");
+        assertEquals(tn.getNamespacePortion(), "ns");
+        assertEquals(tn.getLocalName(), "my-topic");
     }
 
     @Test
-    public void testAsScalableTopicNameRewrapsBareLocalName() {
-        // A bare name parses as persistent:// by default; V5 must rewrap it 
as topic://.
-        TopicName tn = V5Utils.asScalableTopicName("my-topic");
-        assertEquals(tn.getDomain(), TopicDomain.topic);
-        assertEquals(tn.getLocalName(), "my-topic");
-        // default namespace is public/default
+    public void testParseTopicInputNormalisesBareNameToPersistentDefault() {
+        // A bare local name normalises to persistent://public/default/... — 
same as v4.
+        TopicName tn = V5Utils.parseScalableTopicInput("my-topic");
+        assertEquals(tn.getDomain(), TopicDomain.persistent);
         assertEquals(tn.getTenant(), "public");
         assertEquals(tn.getNamespacePortion(), "default");
+        assertEquals(tn.getLocalName(), "my-topic");
     }
 
     @Test
-    public void testAsScalableTopicNameRewrapsFullyQualifiedNonTopicDomain() {
-        // A persistent://... name must be rewrapped as topic://...
-        TopicName tn = 
V5Utils.asScalableTopicName("persistent://tenant/ns/my-topic");
-        assertEquals(tn.getDomain(), TopicDomain.topic);
+    public void testParseTopicInputNormalisesShortFormToPersistent() {
+        // tenant/ns/my-topic (no scheme) normalises to 
persistent://tenant/ns/my-topic.
+        TopicName tn = V5Utils.parseScalableTopicInput("tenant/ns/my-topic");
+        assertEquals(tn.getDomain(), TopicDomain.persistent);
         assertEquals(tn.getTenant(), "tenant");
         assertEquals(tn.getNamespacePortion(), "ns");
         assertEquals(tn.getLocalName(), "my-topic");
     }
 
     @Test
-    public void testAsScalableTopicNameRewrapsShortForm() {
-        // tenant/ns/my-topic (no scheme) is parsed with the default 
persistent:// prefix.
-        TopicName tn = V5Utils.asScalableTopicName("tenant/ns/my-topic");
-        assertEquals(tn.getDomain(), TopicDomain.topic);
-        assertEquals(tn.getTenant(), "tenant");
-        assertEquals(tn.getNamespacePortion(), "ns");
-        assertEquals(tn.getLocalName(), "my-topic");
+    public void testParseTopicInputRejectsNonPersistent() {
+        // Scalable topics are always backed by managed ledgers — 
non-persistent must be
+        // rejected at the SDK builder boundary, not deferred to a broker 
error.
+        UnsupportedOperationException ex = 
expectThrows(UnsupportedOperationException.class, () ->
+                
V5Utils.parseScalableTopicInput("non-persistent://tenant/ns/my-topic"));
+        assertEquals(ex.getMessage(),
+                "V5 does not support non-persistent:// topics: 
non-persistent://tenant/ns/my-topic");
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 8b5623b4bb7..97430e72a22 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1367,7 +1367,8 @@ public class ClientCnx extends PulsarHandler {
 
         DagWatchSession session = dagWatchSessions.get(sessionId);
         if (session != null) {
-            session.onUpdate(cmd.getDag());
+            String resolvedTopicName = cmd.hasResolvedTopicName() ? 
cmd.getResolvedTopicName() : null;
+            session.onUpdate(cmd.getDag(), resolvedTopicName);
         } else {
             log.warn().attr("sessionId", sessionId)
                     .log("Received scalable topic update for unknown session");
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java
index f48681ff6d0..fe25bf166a5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java
@@ -29,8 +29,13 @@ public interface DagWatchSession {
 
     /**
      * Called when the broker sends a DAG update for this session.
+     *
+     * @param dag the new DAG layout.
+     * @param resolvedTopicName the canonical {@code topic://t/n/x} identity 
of the topic
+     *                          this session is watching, as resolved by the 
broker.
+     *                          Expected on every successful response regardless of 
input form; {@code null} if absent.
      */
-    void onUpdate(ScalableTopicDAG dag);
+    void onUpdate(ScalableTopicDAG dag, String resolvedTopicName);
 
     /**
      * Called when the broker sends an error for this session.


Reply via email to