This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 953bf34a26d [feat][broker] PIP-475: synthetic-layout lookup for 
regular topics (#25822)
953bf34a26d is described below

commit 953bf34a26d647b4640f3087cc27f54f090c878a
Author: Matteo Merli <[email protected]>
AuthorDate: Thu May 21 02:43:24 2026 -0700

    [feat][broker] PIP-475: synthetic-layout lookup for regular topics (#25822)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |   9 ++
 .../broker/service/scalable/DagWatchSession.java   | 123 ++++++++++++++++++++-
 .../service/scalable/DagWatchSessionTest.java      | 121 +++++++++++++++++++-
 .../scalable/ScalableTopicControllerTest.java      |   6 +-
 .../org/apache/pulsar/common/naming/TopicName.java |  19 ++++
 .../apache/pulsar/common/protocol/Commands.java    |   8 +-
 .../apache/pulsar/common/scalable/SegmentInfo.java |  65 ++++++++---
 pulsar-common/src/main/proto/PulsarApi.proto       |  20 +++-
 .../apache/pulsar/common/naming/TopicNameTest.java |  22 ++++
 .../common/protocol/CommandsScalableTopicTest.java |  24 +++-
 10 files changed, 389 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 36bf129f160..fa7e919880f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -165,6 +165,7 @@ import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
@@ -787,6 +788,14 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return;
         }
 
+        // Scalable topics do not support non-persistent storage. Reject early 
with a
+        // clear error rather than failing later in segment infrastructure.
+        if (topicName.getDomain() == TopicDomain.non_persistent) {
+            ctx.writeAndFlush(Commands.newScalableTopicError(sessionId, 
ServerError.NotAllowedError,
+                    "Scalable topics do not support non-persistent:// 
topics"));
+            return;
+        }
+
         if (!this.service.getPulsar().isRunning()) {
             log.warn("ScalableTopicLookup rejected: broker not ready");
             ctx.close();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
index 2cc50301486..350125438e7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
@@ -32,8 +32,10 @@ import org.apache.pulsar.common.api.proto.ScalableTopicDAG;
 import org.apache.pulsar.common.api.proto.SegmentBrokerAddress;
 import org.apache.pulsar.common.api.proto.SegmentInfoProto;
 import org.apache.pulsar.common.api.proto.SegmentState;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.scalable.HashRange;
 import org.apache.pulsar.common.scalable.SegmentInfo;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
@@ -61,6 +63,9 @@ public class DagWatchSession implements 
ScalableTopicResources.MetadataPathListe
     private final BrokerService brokerService;
 
     private final String metadataPath;
+    /** Canonical {@code topic://...} identity returned to the client 
regardless of the
+     *  input form ({@code topic://}, {@code persistent://}, or short-form). */
+    private final String resolvedTopicName;
     private volatile boolean closed = false;
 
     public DagWatchSession(long sessionId,
@@ -74,6 +79,7 @@ public class DagWatchSession implements 
ScalableTopicResources.MetadataPathListe
         this.resources = resources;
         this.brokerService = brokerService;
         this.metadataPath = resources.topicPath(topicName);
+        this.resolvedTopicName = topicName.toScalableTopic().toString();
         this.log = LOG.with().attr("topic", topicName).attr("sessionId", 
sessionId).build();
     }
 
@@ -85,23 +91,121 @@ public class DagWatchSession implements 
ScalableTopicResources.MetadataPathListe
     /**
      * Start the session: load current metadata, set up watch, and return
      * the initial layout response.
+     *
+     * <p>If no scalable metadata exists at the canonical path:
+     * <ul>
+     *   <li>{@code topic://...} input → fail with {@code TopicNotFound} (the 
scalable
+     *       topic doesn't exist).</li>
+     *   <li>{@code persistent://...} input → build a synthetic layout that 
wraps the
+     *       existing regular (partitioned or non-partitioned) topic as one or 
more
+     *       legacy segments, so V5 clients can operate against the regular 
topic
+     *       through the scalable surface until the operator migrates it.</li>
+     * </ul>
+     * The metadata-store watch is registered regardless, so a subsequent 
migration
+     * that writes scalable metadata to the same path will be observed and the
+     * synthetic layout will be transparently replaced with the real DAG.
      */
     public CompletableFuture<ScalableTopicLayoutResponse> start() {
+        // A specific partition (persistent://t/n/x-partition-K) is not a valid
+        // scalable-topic lookup target — the synthetic layout models the whole
+        // partitioned topic. Reject it up front rather than producing a 
layout that
+        // wraps nonsensical -partition-K-partition-J names.
+        if (topicName.isPartitioned()) {
+            return CompletableFuture.failedFuture(new IllegalArgumentException(
+                    "Cannot open a scalable-topic lookup for an individual 
partition: " + topicName
+                            + "; use the base topic name " + 
topicName.getPartitionedTopicName()));
+        }
+
         // Register through the resources-level fan-out so close() can 
deregister us
         // and we don't accumulate stale store-level listeners over time.
         resources.registerPathListener(this);
 
         return resources.getScalableTopicMetadataAsync(topicName, true)
                 .thenCompose(optMd -> {
-                    if (optMd.isEmpty()) {
-                        return CompletableFuture.failedFuture(
-                                new IllegalStateException("Scalable topic not 
found: " + topicName));
+                    if (optMd.isPresent()) {
+                        return buildResponse(optMd.get());
                     }
-                    ScalableTopicMetadata metadata = optMd.get();
-                    return buildResponse(metadata);
+                    if (topicName.getDomain() == TopicDomain.persistent) {
+                        return buildSyntheticResponse();
+                    }
+                    return CompletableFuture.failedFuture(
+                            new IllegalStateException("Scalable topic not 
found: " + topicName));
                 });
     }
 
+    /**
+     * Build a synthetic layout for a not-yet-migrated regular topic. Each 
partition
+     * (or the whole topic, for non-partitioned) becomes an active legacy 
segment
+     * that wraps the existing {@code persistent://...} topic. The synthetic 
layout
+     * uses mod-N routing (signalled by all active leaves being legacy 
segments)
+     * so V5 producers route the same way v4 producers do.
+     *
+     * <p>Note: {@code fetchPartitionedTopicMetadataAsync} returns {@code 
partitions == 0}
+     * for both an existing non-partitioned topic and a topic that does not 
exist at all.
+     * In the latter case a single-segment synthetic layout is still returned; 
whether the
+     * underlying {@code persistent://...} topic gets auto-created is then 
decided by the
+     * namespace's auto-creation policy when the V5 producer/consumer first 
attaches —
+     * exactly the behaviour a v4 client would see.
+     */
+    private CompletableFuture<ScalableTopicLayoutResponse> 
buildSyntheticResponse() {
+        return brokerService.fetchPartitionedTopicMetadataAsync(topicName)
+                .thenApply(partitionedMd -> {
+                    int partitions = partitionedMd.partitions;
+                    long createdAtMs = System.currentTimeMillis();
+                    Map<Long, SegmentInfo> segments = new LinkedHashMap<>();
+                    if (partitions <= 0) {
+                        // Non-partitioned: one legacy segment covering the 
full hash range.
+                        // We're only called when topicName.getDomain() == 
persistent, so
+                        // toString() is the canonical persistent://t/n/x form.
+                        segments.put(0L, SegmentInfo.activeLegacy(
+                                0L,
+                                HashRange.of(0x0000, 0xFFFF),
+                                topicName.toString(),
+                                /*createdAtEpoch*/ 0L,
+                                createdAtMs));
+                    } else {
+                        // Partitioned: N legacy segments. The hash ranges 
here are cosmetic —
+                        // routing for synthetic layouts is mod-N over 
segment_id (see
+                        // SegmentRouter on the SDK side), so the ranges are 
never consulted
+                        // for routing. They're assigned for stable, 
human-readable layouts.
+                        for (int k = 0; k < partitions; k++) {
+                            HashRange range = syntheticRange(k, partitions);
+                            segments.put((long) k, SegmentInfo.activeLegacy(
+                                    k,
+                                    range,
+                                    topicName.getPartition(k).toString(),
+                                    /*createdAtEpoch*/ 0L,
+                                    createdAtMs));
+                        }
+                    }
+                    return new ScalableTopicLayoutResponse(
+                            /*epoch*/ 0L,
+                            segments,
+                            /*segmentBrokerAddresses*/ null,
+                            /*segmentBrokerAddressesTls*/ null,
+                            /*controllerBrokerUrl*/ null,
+                            /*controllerBrokerUrlTls*/ null);
+                });
+    }
+
+    /**
+     * Cosmetic hash range for partition {@code k} of {@code n} in a synthetic 
layout.
+     * When {@code n <= 0x10000} the ranges tile {@code [0x0000, 0xFFFF]} 
contiguously;
+     * beyond that there are more partitions than hash slots, so a degenerate 
single-slot
+     * range (clamped to the hash space) is used. Either way the value is 
never consulted
+     * for routing — synthetic layouts route mod-N over segment_id.
+     */
+    private static HashRange syntheticRange(int k, int n) {
+        if (n > 0x10000) {
+            int slot = Math.min(k, 0xFFFF);
+            return HashRange.of(slot, slot);
+        }
+        int width = 0x10000 / n;
+        int start = k * width;
+        int end = (k == n - 1) ? 0xFFFF : (start + width - 1);
+        return HashRange.of(start, end);
+    }
+
     /**
      * Invoked by the {@link ScalableTopicResources} fan-out for every 
metadata event
      * matching this session's topic path. The registry already path-filtered 
for us;
@@ -142,7 +246,10 @@ public class DagWatchSession implements 
ScalableTopicResources.MetadataPathListe
         }
         ScalableTopicDAG dag = buildDagProto(response);
         log.info().attr("epoch", response.epoch()).log("Pushing DAG update");
-        cnx.ctx().writeAndFlush(Commands.newScalableTopicUpdate(sessionId, 
dag));
+        // Always report the canonical topic://... identity so clients that 
looked up
+        // via persistent://... or short-form know the resolved name.
+        cnx.ctx().writeAndFlush(Commands.newScalableTopicUpdate(
+                sessionId, resolvedTopicName, dag));
     }
 
     private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse 
response) {
@@ -170,6 +277,10 @@ public class DagWatchSession implements 
ScalableTopicResources.MetadataPathListe
             if (seg.sealedAtMs() >= 0) {
                 segProto.setSealedAtMs(seg.sealedAtMs());
             }
+            // Legacy segments wrap an existing, externally managed 
persistent://... topic.
+            if (seg.legacyTopicName() != null) {
+                segProto.setLegacyTopicName(seg.legacyTopicName());
+            }
         }
 
         // Add broker addresses for active segments
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
index a14ef6d5780..849061e0520 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.scalable.HashRange;
 import org.apache.pulsar.common.scalable.SegmentInfo;
 import org.apache.pulsar.common.scalable.SegmentState;
@@ -104,6 +105,8 @@ public class DagWatchSessionTest {
 
     @Test
     public void testStartFailsWhenTopicMetadataMissing() {
+        // topic://... input + no scalable metadata = TopicNotFound. Synthetic 
layouts
+        // are only produced for persistent://... input (regular topics).
         when(resources.getScalableTopicMetadataAsync(TOPIC, true))
                 
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
 
@@ -124,6 +127,121 @@ public class DagWatchSessionTest {
         }
     }
 
+    // --- synthetic layout for not-yet-migrated regular topics ---
+
+    @Test
+    public void 
testStartBuildsSyntheticLayoutForNonPartitionedPersistentTopic() throws 
Exception {
+        // persistent:// input + no scalable metadata + non-partitioned 
regular topic
+        // (partitions=0) → synthetic layout with a single active legacy 
segment
+        // covering [0x0000, 0xFFFF] that wraps the existing persistent:// 
topic.
+        TopicName regular = TopicName.get("persistent://tenant/ns/my-regular");
+        String regularPath = "/admin/scalable-topics/tenant/ns/my-regular";
+        when(resources.topicPath(regular)).thenReturn(regularPath);
+        when(resources.getScalableTopicMetadataAsync(regular, true))
+                
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+        when(brokerService.fetchPartitionedTopicMetadataAsync(regular))
+                .thenReturn(CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0)));
+
+        DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, 
resources, brokerService);
+        ScalableTopicLayoutResponse response = s.start().get();
+
+        assertEquals(response.epoch(), 0L);
+        assertEquals(response.segments().size(), 1);
+        SegmentInfo seg = response.segments().get(0L);
+        assertNotNull(seg);
+        assertEquals(seg.segmentId(), 0L);
+        assertEquals(seg.hashRange().start(), 0x0000);
+        assertEquals(seg.hashRange().end(), 0xFFFF);
+        assertTrue(seg.isActive());
+        assertTrue(seg.isLegacy(), "non-partitioned regular topic must wrap as 
a legacy segment");
+        assertEquals(seg.legacyTopicName(), 
"persistent://tenant/ns/my-regular");
+    }
+
+    @Test
+    public void testStartBuildsSyntheticLayoutForPartitionedPersistentTopic() 
throws Exception {
+        // persistent:// input + no scalable metadata + 4-partition topic →
+        // synthetic layout with 4 active legacy segments wrapping each
+        // persistent://...-partition-K and equal-width contiguous hash ranges.
+        TopicName regular = 
TopicName.get("persistent://tenant/ns/my-partitioned");
+        String regularPath = "/admin/scalable-topics/tenant/ns/my-partitioned";
+        when(resources.topicPath(regular)).thenReturn(regularPath);
+        when(resources.getScalableTopicMetadataAsync(regular, true))
+                
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+        when(brokerService.fetchPartitionedTopicMetadataAsync(regular))
+                .thenReturn(CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(4)));
+
+        DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, 
resources, brokerService);
+        ScalableTopicLayoutResponse response = s.start().get();
+
+        assertEquals(response.epoch(), 0L);
+        assertEquals(response.segments().size(), 4);
+        for (int k = 0; k < 4; k++) {
+            SegmentInfo seg = response.segments().get((long) k);
+            assertNotNull(seg, "missing segment for partition " + k);
+            assertEquals(seg.segmentId(), k);
+            assertTrue(seg.isActive());
+            assertTrue(seg.isLegacy(), "partition " + k + " must wrap as a 
legacy segment");
+            assertEquals(seg.legacyTopicName(),
+                    "persistent://tenant/ns/my-partitioned-partition-" + k);
+        }
+        // Hash ranges cover [0x0000, 0xFFFF] contiguously and end at 0xFFFF 
inclusive
+        // on the last segment.
+        assertEquals(response.segments().get(0L).hashRange().start(), 0x0000);
+        assertEquals(response.segments().get(3L).hashRange().end(), 0xFFFF);
+        // No gaps between consecutive segments.
+        for (int k = 0; k < 3; k++) {
+            int endK = response.segments().get((long) k).hashRange().end();
+            int startK1 = response.segments().get((long) (k + 
1)).hashRange().start();
+            assertEquals(startK1, endK + 1, "gap between partition " + k + " 
and " + (k + 1));
+        }
+    }
+
+    @Test
+    public void testStartRejectsIndividualPartitionInput() throws Exception {
+        // A specific partition name must be rejected — the synthetic layout 
models the
+        // whole partitioned topic, and wrapping a single partition would 
otherwise
+        // produce nonsensical -partition-K-partition-J underlying names.
+        TopicName partition = 
TopicName.get("persistent://tenant/ns/my-partitioned-partition-3");
+
+        DagWatchSession s = new DagWatchSession(SESSION_ID, partition, cnx, 
resources, brokerService);
+        CompletableFuture<ScalableTopicLayoutResponse> future = s.start();
+
+        assertTrue(future.isCompletedExceptionally());
+        try {
+            future.get();
+            fail("expected failure");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof IllegalArgumentException, "got: 
" + e.getCause());
+            assertTrue(e.getCause().getMessage().contains("individual 
partition"),
+                    e.getCause().getMessage());
+        }
+    }
+
+    @Test
+    public void testSyntheticLayoutPushedToClientCarriesResolvedTopicName() {
+        // The synthetic-layout response goes through pushUpdate, which always 
emits the
+        // canonical topic://... identity in resolved_topic_name regardless of 
input form.
+        TopicName regular = TopicName.get("persistent://tenant/ns/my-regular");
+        ScalableTopicLayoutResponse response = new ScalableTopicLayoutResponse(
+                0L,
+                Map.of(0L, SegmentInfo.activeLegacy(0L, HashRange.of(0x0000, 
0xFFFF),
+                        "persistent://tenant/ns/my-regular", 0L, 12345L)),
+                null, null, null, null);
+
+        DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, 
resources, brokerService);
+        s.pushUpdate(response);
+
+        ArgumentCaptor<ByteBuf> captor = 
ArgumentCaptor.forClass(ByteBuf.class);
+        verify(ctx).writeAndFlush(captor.capture());
+        BaseCommand cmd = parseFrame(captor.getValue());
+        assertEquals(cmd.getScalableTopicUpdate().getResolvedTopicName(),
+                "topic://tenant/ns/my-regular");
+        // The legacy-segment marker round-trips through the wire format.
+        var seg = cmd.getScalableTopicUpdate().getDag().getSegmentAt(0);
+        assertTrue(seg.hasLegacyTopicName());
+        assertEquals(seg.getLegacyTopicName(), 
"persistent://tenant/ns/my-regular");
+    }
+
     @Test
     public void testStartRegistersWithResources() {
         // start() routes through the resources-level fan-out instead of 
registering
@@ -314,7 +432,8 @@ public class DagWatchSessionTest {
                 createdAt,
                 sealedAt,
                 createdAtMs,
-                sealedAtMs);
+                sealedAtMs,
+                null);
     }
 
     private static java.util.List<Long> toList(long[] arr) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index 183e66b68d9..db274bc8712 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -539,19 +539,19 @@ public class ScalableTopicControllerTest {
                 org.apache.pulsar.common.scalable.SegmentState.SEALED,
                 java.util.List.of(), java.util.List.of(3L),
                 /*createdAtEpoch*/ 0, /*sealedAtEpoch*/ 1,
-                /*createdAtMs*/ t0, /*sealedAtMs*/ t1);
+                /*createdAtMs*/ t0, /*sealedAtMs*/ t1, null);
         org.apache.pulsar.common.scalable.SegmentInfo seg1 = new 
org.apache.pulsar.common.scalable.SegmentInfo(
                 1L,
                 org.apache.pulsar.common.scalable.HashRange.of(0x4000, 0x7FFF),
                 org.apache.pulsar.common.scalable.SegmentState.ACTIVE,
                 java.util.List.of(), java.util.List.of(),
-                0, -1, t1, -1);
+                0, -1, t1, -1, null);
         org.apache.pulsar.common.scalable.SegmentInfo seg2 = new 
org.apache.pulsar.common.scalable.SegmentInfo(
                 2L,
                 org.apache.pulsar.common.scalable.HashRange.of(0x8000, 0xFFFF),
                 org.apache.pulsar.common.scalable.SegmentState.ACTIVE,
                 java.util.List.of(), java.util.List.of(),
-                0, -1, t3, -1);
+                0, -1, t3, -1, null);
 
         TopicName seekTopic = TopicName.get("topic://tenant/ns/seek-topic");
         ScalableTopicMetadata md = ScalableTopicMetadata.builder()
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 52e9a60ad9b..ea70d11a0ac 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -300,6 +300,25 @@ public class TopicName implements ServiceUnitId {
         return get(partitionName);
     }
 
+    /**
+     * Returns this topic re-expressed in the scalable {@code topic://...} 
domain,
+     * regardless of the original input domain. Used to derive the canonical
+     * scalable-topic identity for a name the user may have spelled as
+     * {@code persistent://...} or short-form.
+     *
+     * <p>Any {@code -partition-K} suffix is stripped, so a partition name like
+     * {@code persistent://t/n/x-partition-3} resolves to the base topic's 
scalable
+     * identity {@code topic://t/n/x}, not {@code topic://t/n/x-partition-3}.
+     */
+    public TopicName toScalableTopic() {
+        if (domain == TopicDomain.topic) {
+            return this;
+        }
+        TopicName base = isPartitioned() ? get(getPartitionedTopicName()) : 
this;
+        return get(TopicDomain.topic.value() + "://" + base.getNamespace()
+                + "/" + base.getEncodedLocalName());
+    }
+
     /**
      * @return partition index of the completeTopicName.
      * It returns -1 if the completeTopicName (topic) is not partitioned.
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 2c0c74fb378..e6cb2605315 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1700,10 +1700,16 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
-    public static ByteBuf newScalableTopicUpdate(long sessionId, 
ScalableTopicDAG dag) {
+    public static ByteBuf newScalableTopicUpdate(long sessionId, String 
resolvedTopicName,
+                                                 ScalableTopicDAG dag) {
         BaseCommand cmd = new 
BaseCommand().setType(Type.SCALABLE_TOPIC_UPDATE);
         CommandScalableTopicUpdate update = cmd.setScalableTopicUpdate()
                 .setSessionId(sessionId);
+        // resolved_topic_name is optional on the wire; guard against null 
because the
+        // lightproto setter would NPE rather than leave the field unset.
+        if (resolvedTopicName != null) {
+            update.setResolvedTopicName(resolvedTopicName);
+        }
         update.setDag().copyFrom(dag);
         return serializeWithSize(cmd);
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
index 848326dd026..0b727b23e18 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
@@ -35,15 +35,25 @@ import java.util.List;
  *       Used for retention-based segment GC and for timestamp-based seek.</li>
  * </ul>
  *
- * @param segmentId      monotonically increasing, unique within the topic
- * @param hashRange      inclusive hash range [start, end]
- * @param state          ACTIVE or SEALED
- * @param parentIds      parent segment IDs in the DAG (empty for initial/root 
segments)
- * @param childIds       child segment IDs in the DAG (empty for active leaf 
segments)
- * @param createdAtEpoch DAG epoch when this segment was created
- * @param sealedAtEpoch  DAG epoch when sealed (-1 if still active)
- * @param createdAtMs    wall-clock millis at creation time
- * @param sealedAtMs     wall-clock millis at seal time (-1 if still active)
+ * <p>A segment may be a <i>legacy segment</i> — one that is not managed by the
+ * scalable-topic controller and has no {@code segment://...} URI of its own; 
instead it
+ * wraps an existing, externally managed {@code persistent://...} topic. 
Legacy segments
+ * appear in the synthetic-layout response returned for a regular (partitioned 
or
+ * non-partitioned) topic that has not yet been migrated to a scalable topic.
+ * {@code legacyTopicName} is non-null exactly for legacy segments.
+ *
+ * @param segmentId          monotonically increasing, unique within the topic
+ * @param hashRange          inclusive hash range [start, end]
+ * @param state              ACTIVE or SEALED
+ * @param parentIds          parent segment IDs in the DAG (empty for 
initial/root segments)
+ * @param childIds           child segment IDs in the DAG (empty for active 
leaf segments)
+ * @param createdAtEpoch     DAG epoch when this segment was created
+ * @param sealedAtEpoch      DAG epoch when sealed (-1 if still active)
+ * @param createdAtMs        wall-clock millis at creation time
+ * @param sealedAtMs         wall-clock millis at seal time (-1 if still 
active)
+ * @param legacyTopicName    for legacy segments: the externally managed
+ *                           {@code persistent://...} topic this segment wraps.
+ *                           {@code null} for regular controller-managed 
segments.
  */
 public record SegmentInfo(
         long segmentId,
@@ -54,7 +64,8 @@ public record SegmentInfo(
         long createdAtEpoch,
         long sealedAtEpoch,
         long createdAtMs,
-        long sealedAtMs
+        long sealedAtMs,
+        String legacyTopicName
 ) {
     public SegmentInfo {
         parentIds = parentIds != null ? List.copyOf(parentIds) : List.of();
@@ -65,32 +76,56 @@ public record SegmentInfo(
     public static SegmentInfo active(long segmentId, HashRange hashRange,
                                      long createdAtEpoch, long createdAtMs) {
         return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE,
-                List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1);
+                List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1, 
null);
     }
 
     /** Create a new active segment with the given parent IDs. */
     public static SegmentInfo active(long segmentId, HashRange hashRange,
                                      List<Long> parentIds, long 
createdAtEpoch, long createdAtMs) {
         return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE,
-                parentIds, List.of(), createdAtEpoch, -1, createdAtMs, -1);
+                parentIds, List.of(), createdAtEpoch, -1, createdAtMs, -1, 
null);
+    }
+
+    /**
+     * Create a new active legacy segment that wraps the given externally 
managed
+     * {@code persistent://...} topic instead of having its own {@code 
segment://...} URI.
+     * Used by the synthetic-layout response for not-yet-migrated regular 
topics.
+     */
+    public static SegmentInfo activeLegacy(long segmentId, HashRange hashRange,
+                                           String legacyTopicName,
+                                           long createdAtEpoch, long 
createdAtMs) {
+        return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE,
+                List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1, 
legacyTopicName);
     }
 
     /** Return a sealed copy of this segment with the given child IDs. */
     public SegmentInfo sealed(long sealedAtEpoch, long sealedAtMs, List<Long> 
childIds) {
         return new SegmentInfo(segmentId, hashRange, SegmentState.SEALED,
-                parentIds, childIds, createdAtEpoch, sealedAtEpoch, 
createdAtMs, sealedAtMs);
+                parentIds, childIds, createdAtEpoch, sealedAtEpoch, 
createdAtMs, sealedAtMs,
+                legacyTopicName);
     }
 
     /** Return a copy with different parent IDs. */
     public SegmentInfo withParentIds(List<Long> parentIds) {
         return new SegmentInfo(segmentId, hashRange, state,
-                parentIds, childIds, createdAtEpoch, sealedAtEpoch, 
createdAtMs, sealedAtMs);
+                parentIds, childIds, createdAtEpoch, sealedAtEpoch, 
createdAtMs, sealedAtMs,
+                legacyTopicName);
     }
 
     /** Return a copy with different child IDs. */
     public SegmentInfo withChildIds(List<Long> childIds) {
         return new SegmentInfo(segmentId, hashRange, state,
-                parentIds, childIds, createdAtEpoch, sealedAtEpoch, 
createdAtMs, sealedAtMs);
+                parentIds, childIds, createdAtEpoch, sealedAtEpoch, 
createdAtMs, sealedAtMs,
+                legacyTopicName);
+    }
+
+    /**
+     * True if this is a legacy segment — one that wraps an existing, 
externally managed
+     * {@code persistent://...} topic rather than owning a controller-managed
+     * {@code segment://...} URI. An empty {@code legacyTopicName} does not 
count as legacy.
+     */
+    public boolean isLegacy() {
+        return legacyTopicName != null && !legacyTopicName.isEmpty();
     }
 
     public boolean isActive() {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index b2a698d33da..28501c8bc66 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -855,6 +855,14 @@ message SegmentInfoProto {
     // and timestamp-based seek; epoch above is a DAG generation number, not a 
clock.
     required uint64 created_at_ms    = 9;
     optional uint64 sealed_at_ms     = 10;
+
+    // Legacy-segment marker. When set, this segment is not managed by the 
scalable-topic
+    // controller and has no segment://... topic of its own — it wraps the 
named, externally
+    // managed persistent://... topic instead. Used by the synthetic-layout 
response the
+    // broker returns for a regular (partitioned or non-partitioned) topic 
that has not yet
+    // been migrated to a scalable topic. When absent, the segment URI is 
computed normally
+    // from the scalable topic name and segment_id 
(segment://<topic>/<descriptor>).
+    optional string legacy_topic_name = 11;
 }
 
 message SegmentBrokerAddress {
@@ -874,7 +882,11 @@ message ScalableTopicDAG {
 // Client -> Broker: Request scalable topic metadata and initiate watch session
 message CommandScalableTopicLookup {
     required uint64 session_id = 1;   // Client-assigned session ID
-    required string topic      = 2;   // e.g. "topic://tenant/ns/my-topic"
+    // Any of "topic://t/n/x", "persistent://t/n/x", or a short form like
+    // "my-topic" (normalized by the broker to 
persistent://public/default/my-topic).
+    // The broker resolves the input to the canonical topic://... identity and
+    // returns it in CommandScalableTopicUpdate.resolved_topic_name.
+    required string topic      = 2;
 }
 
 // Broker -> Client: Used for BOTH initial response and subsequent pushed 
updates
@@ -884,6 +896,12 @@ message CommandScalableTopicUpdate {
 
     optional ServerError error         = 3;
     optional string message            = 4;
+
+    // Canonical scalable-topic identity (always "topic://t/n/x") that the 
client
+    // should use for downstream operations. Set on every success response,
+    // including for inputs that were given as persistent://... or short-form.
+    // Absent on error responses.
+    optional string resolved_topic_name = 5;
 }
 
 // Client -> Broker: Close the DAG watch session
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
index 412d3696b1e..eb9254ecf4a 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
@@ -517,4 +517,26 @@ public class TopicNameTest {
         assertThrows(IllegalArgumentException.class,
                 () -> 
TopicName.toFullTopicName("non-persistent://tenant/cluster/ns/topic"));
     }
+
+    @Test
+    public void testToScalableTopic() {
+        // topic://... is returned as-is.
+        TopicName already = TopicName.get("topic://tenant/ns/x");
+        assertEquals(already.toScalableTopic(), already);
+
+        // persistent://... is re-expressed in the topic:// domain.
+        
assertEquals(TopicName.get("persistent://tenant/ns/x").toScalableTopic().toString(),
+                "topic://tenant/ns/x");
+
+        // Short forms normalise to persistent://public/default/... first, 
then to topic://.
+        assertEquals(TopicName.get("my-topic").toScalableTopic().toString(),
+                "topic://public/default/my-topic");
+        
assertEquals(TopicName.get("tenant/ns/my-topic").toScalableTopic().toString(),
+                "topic://tenant/ns/my-topic");
+
+        // A -partition-K suffix is stripped: the partition resolves to the 
base topic's
+        // scalable identity, not topic://.../x-partition-K.
+        
assertEquals(TopicName.get("persistent://tenant/ns/x-partition-3").toScalableTopic().toString(),
+                "topic://tenant/ns/x");
+    }
 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
index c2c92676362..a5d028889ee 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
@@ -101,12 +101,13 @@ public class CommandsScalableTopicTest {
                 .addParentId(0L);
         
dag.addSegmentBroker().setSegmentId(2L).setBrokerUrl("pulsar://broker-a:6650");
 
-        ByteBuf frame = Commands.newScalableTopicUpdate(77L, dag);
+        ByteBuf frame = Commands.newScalableTopicUpdate(77L, "topic://t/n/x", 
dag);
         BaseCommand cmd = parseFrame(frame);
 
         assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_UPDATE);
         assertTrue(cmd.hasScalableTopicUpdate());
         assertEquals(cmd.getScalableTopicUpdate().getSessionId(), 77L);
+        assertEquals(cmd.getScalableTopicUpdate().getResolvedTopicName(), 
"topic://t/n/x");
         assertFalse(cmd.getScalableTopicUpdate().hasError(),
                 "successful update must not carry an error field");
 
@@ -132,6 +133,27 @@ public class CommandsScalableTopicTest {
         assertEquals(got.getSegmentBrokerAt(0).getBrokerUrl(), 
"pulsar://broker-a:6650");
     }
 
+    @Test
+    public void 
testNewScalableTopicUpdateWithNullResolvedTopicNameLeavesFieldUnset() {
+        // resolved_topic_name is optional on the wire; a null must serialise 
cleanly with
+        // the field unset rather than NPE in the lightproto setter.
+        ScalableTopicDAG dag = new ScalableTopicDAG().setEpoch(1L);
+        dag.addSegment()
+                .setSegmentId(0L)
+                .setHashStart(0x0000)
+                .setHashEnd(0xFFFF)
+                .setState(SegmentState.ACTIVE)
+                .setCreatedAtEpoch(0L)
+                .setCreatedAtMs(System.currentTimeMillis());
+
+        ByteBuf frame = Commands.newScalableTopicUpdate(5L, null, dag);
+        BaseCommand cmd = parseFrame(frame);
+
+        assertEquals(cmd.getScalableTopicUpdate().getSessionId(), 5L);
+        assertFalse(cmd.getScalableTopicUpdate().hasResolvedTopicName(),
+                "null resolvedTopicName must leave the optional field unset");
+    }
+
     @Test
     public void testNewScalableTopicError() {
         ByteBuf frame = Commands.newScalableTopicError(15L, 
ServerError.TopicNotFound,


Reply via email to