This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f9502f3805 [feat][broker] PIP-475: regular-to-scalable topic migration command (#25875)
8f9502f3805 is described below

commit 8f9502f3805dca8b2febb3c7a9b3a46e534107b8
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 27 07:30:20 2026 -0700

    [feat][broker] PIP-475: regular-to-scalable topic migration command (#25875)
---
 pip/pip-475.md                                     | 135 ++++++++-------
 .../authorization/PulsarAuthorizationProvider.java |   1 +
 .../pulsar/broker/admin/v2/ScalableTopics.java     | 190 +++++++++++++++++++++
 .../pulsar/broker/service/BrokerService.java       |  23 +++
 .../service/scalable/ScalableTopicController.java  |  67 ++++++++
 .../broker/admin/ScalableTopicMigrationTest.java   | 161 +++++++++++++++++
 .../broker/service/SharedPulsarBaseTest.java       |   7 +
 .../scalable/ScalableTopicControllerTest.java      |  68 ++++++++
 .../apache/pulsar/client/admin/ScalableTopics.java |  21 +++
 .../policies/data/ScalableTopicMetadata.java       |  11 ++
 .../policies/data/ScalableTopicMetadataTest.java   |  23 ++-
 .../client/admin/internal/ScalableTopicsImpl.java  |  14 ++
 .../apache/pulsar/admin/cli/CmdScalableTopics.java |  19 +++
 .../client/impl/v5/ScalableQueueConsumer.java      |   9 +
 .../client/impl/v5/ScalableStreamConsumer.java     |   9 +
 .../client/impl/v5/ScalableTopicProducer.java      |  17 +-
 .../common/policies/data/TopicOperation.java       |   4 +
 .../common/scalable/ScalableTopicConstants.java    |  45 +++++
 18 files changed, 755 insertions(+), 69 deletions(-)

diff --git a/pip/pip-475.md b/pip/pip-475.md
index 0337de31dd7..11a22b9c0d7 100644
--- a/pip/pip-475.md
+++ b/pip/pip-475.md
@@ -77,7 +77,7 @@ The broker responds to the lookup session based on the input 
form and the topic'
 | Input form | Lookup response |
 |---|---|
 | `topic://t/n/x` | Real DAG layout (or `NotFound` if the scalable topic 
doesn't exist — same as today). |
-| `persistent://t/n/x` | If scalable metadata exists for the equivalent 
`topic://t/n/x`: real DAG layout, with the topic identity promoted to 
`topic://...` for all subsequent operations. Otherwise: a **synthetic layout** 
that models the regular topic's partitions as **special segments** (see 
[Special segments](#special-segments) below). |
+| `persistent://t/n/x` | If scalable metadata exists for the equivalent 
`topic://t/n/x`: real DAG layout, with the topic identity promoted to 
`topic://...` for all subsequent operations. Otherwise: a **synthetic layout** 
that models the regular topic's partitions as **legacy segments** (see [Legacy 
segments](#legacy-segments) below). |
 | `my-topic` (or any short / unscoped form) | Normalize to 
`persistent://public/default/my-topic` then apply the rule above. |
 | `non-persistent://...` | Reject at `create()` / `subscribe()` with 
`UnsupportedOperationException`. V5 does not support non-persistent topics. |
 
@@ -85,16 +85,16 @@ Because the lookup session is push-based, the broker can 
**update the layout in
 
 This gives the "once scalable, always scalable" guarantee (Goal 4) for free: 
once the lookup session has reported a real DAG, future updates can only refine 
the DAG via splits / merges; there is no "downgrade to synthetic" path the 
broker exposes.
 
-### Special segments
+### Legacy segments
 
-A special segment is the lookup-session encoding of "this slice of the 
keyspace is not yet a real `segment://...` topic — it's the existing 
`persistent://t/n/x[-partition-K]` topic instead." It carries the same fields 
as a regular segment plus a marker that points at the underlying 
`persistent://...` name.
+A legacy segment is the lookup-session encoding of "this slice of the keyspace 
is not yet a real `segment://...` topic — it's the existing 
`persistent://t/n/x[-partition-K]` topic instead." It carries the same fields 
as a regular segment plus a marker that points at the underlying 
`persistent://...` name.
 
-The V5 SDK's per-segment producer and consumer infrastructure already attaches 
to a topic name; the only difference for a special segment is that the name 
uses the `persistent://` domain instead of `segment://`. No separate code path, 
no separate wrapper class hierarchy.
+The V5 SDK's per-segment producer and consumer infrastructure already attaches 
to a topic name; the only difference for a legacy segment is that the name uses 
the `persistent://` domain instead of `segment://`. No separate code path, no 
separate wrapper class hierarchy.
 
 For routing, the layout carries the routing function as data:
 
-- **Synthetic layout for an N-partitioned regular topic**: N special segments, 
one per partition; routing is `signSafeMod(murmurHash3_32(key), N)` — exactly 
v4's partitioned-producer routing. Producers route the same way the v4 SDK 
would, ensuring per-key ordering during the migration window.
-- **Synthetic layout for a non-partitioned regular topic**: 1 special segment 
covering the full hash range; routing is trivial (no key matters).
+- **Synthetic layout for an N-partitioned regular topic**: N legacy segments, 
one per partition; routing is `signSafeMod(murmurHash3_32(key), N)` — exactly 
v4's partitioned-producer routing. Producers route the same way the v4 SDK 
would, ensuring per-key ordering during the migration window.
+- **Synthetic layout for a non-partitioned regular topic**: 1 legacy segment 
covering the full hash range; routing is trivial (no key matters).
 - **Real DAG (post-migration or natively scalable)**: range-based routing — 
the standard scalable-topic semantics. Producers route by hash range; per-key 
ordering across the migration boundary is preserved by the controller's 
drain-before-assign protocol (see [Routing across 
migration](#routing-across-migration) below).
 
 ### Migration protocol (operator's view)
@@ -108,35 +108,48 @@ Pre-migration (steady state):
     persistent://...-partition-K topics, with mod-N routing.
 
 Step 1 — Operator upgrades all clients to the V5 SDK.
-  Step 2 enforces this: the migration command inspects the topic's active
-  producer/consumer connections and fails with HTTP 409 if any v4
-  connections remain. Old code keeps working unchanged before migration
-  because the synthetic layout exposes the existing persistent topics
-  through the V5 surface; routing is mod-N so per-key destinations are
-  identical to v4 partitioned-topic routing.
+  Step 2 enforces this by default: the migration command inspects the
+  topic's attached producers/consumers and fails with HTTP 409 if any
+  *legacy v4* connection remains. V5 connections are recognised by a
+  reserved metadata marker the V5 SDK sets on every per-segment v4
+  producer/consumer it creates, so a topic served only by upgraded V5
+  clients passes the check even though those clients are attached to the
+  underlying persistent:// partitions via the synthetic layout. The
+  operator can override the check with --force (e.g. to evict a stale v4
+  client). Old code keeps working unchanged before migration because the
+  synthetic layout exposes the existing persistent topics through the V5
+  surface; routing is mod-N so per-key destinations are identical to v4
+  partitioned-topic routing.
 
 Step 2 — Operator runs:
-    pulsar-admin scalable-topics migrate-to-scalable persistent://t/n/x
+    pulsar-admin scalable-topics migrate persistent://t/n/x [--force]
 
   The broker:
-   2a. Validates that no v4 producer/consumer connections are attached to
-       the topic. If any are, fails with HTTP 409 and the connection count
-       in the error message. (Also fails 409 if scalable metadata already
-       exists, and 404 if the topic doesn't exist.)
+   2a. Validates that the topic exists and is not already scalable (else
+       404 / 409). Unless --force is set, validates that no legacy v4
+       producer/consumer connection is attached (counting only connections
+       without the V5-managed marker); if any are, fails with HTTP 409.
    2b. Builds ScalableTopicMetadata with:
-        • N sealed parent segments (or 1 for non-partitioned), each
-          wrapping the existing managed ledger for
-          persistent://t/n/x-partition-K (or persistent://t/n/x for
-          non-partitioned). The managed ledgers are unchanged; no data copy.
+        • N sealed legacy parent segments (or 1 for non-partitioned), each
+          wrapping the existing persistent://t/n/x-partition-K (or
+          persistent://t/n/x for non-partitioned). The managed ledgers are
+          unchanged; no data copy. Each parent spans the full hash range
+          (v4 mod-N routing scattered keys for any child range across every
+          partition).
         • N active child segments (or 1) with equal-width contiguous hash
           ranges covering [0x0000, 0xFFFF], using standard range-based
           routing. Each child has all N parents as predecessors in the DAG.
        New writes route to children; subscriptions drain the parents
        before consuming from children — the same drain-before-assign
        protocol the controller already uses for segment splits / merges.
-   2c. Atomically writes the metadata, taking the topic from
-       "regular" to "scalable" in one CAS on the metadata store.
-   2d. Pushes the new layout to every connected lookup session.
+   2c. Creates the backing segment topics for the new active children
+       (the sealed parents reuse the existing persistent:// managed ledgers).
+   2d. Atomically writes the metadata, taking the topic from
+       "regular" to "scalable" in one CAS on the metadata store. This is
+       the commit point; connected lookup sessions are pushed the real DAG
+       via the metadata-store watch.
+   2e. Terminates the old persistent:// topic(s) so no further v4 writes
+       can land — they become the drainable sealed parent segments.
 
 Step 3 — Connected V5 clients receive the layout-update push on their
   lookup session. Synthetic layout → real DAG. The SDK's existing
@@ -164,21 +177,21 @@ For non-partitioned topics (N=1) the same protocol 
applies trivially: one sealed
 
 ### 1. V5 SDK changes
 
-The SDK reuses its existing scalable code path — the lookup session, 
per-segment producer / consumer infrastructure, layout-change handler — for 
*every* topic, including regular ones. The only new bits are: how the lookup 
session is opened from `persistent://` / short-form input, how the SDK 
interprets a "special segment" entry in the layout, and how it carries the 
routing function carried by the layout.
+The SDK reuses its existing scalable code path — the lookup session, 
per-segment producer / consumer infrastructure, layout-change handler — for 
*every* topic, including regular ones. The only new bits are: how the lookup 
session is opened from `persistent://` / short-form input, how the SDK 
interprets a "legacy segment" entry in the layout, and how it carries the 
routing function carried by the layout.
 
 #### 1.1 Lookup session opens for any input form
 
 `PulsarClientV5` opens a lookup session for the user's topic regardless of 
domain. The existing scalable-topic lookup-session machinery is extended so the 
broker accepts `persistent://...` and short-form names in addition to 
`topic://...`. The session response carries:
 
 - The promoted topic identity (always `topic://t/n/x` after normalization).
-- The current layout — either a real DAG or a synthetic layout (see [Special 
segments](#special-segments) above).
+- The current layout — either a real DAG or a synthetic layout (see [Legacy 
segments](#legacy-segments) above).
 - The routing function (`mod-N` for the synthetic layout, `range-based` for 
the real DAG).
 
 `non-persistent://...` inputs are rejected at the V5 builder before the lookup 
is opened, with `UnsupportedOperationException`.
 
 #### 1.2 Special-segment handling in the per-segment infrastructure
 
-A regular `Segment` carries a `segment://...` URI. A special segment carries a 
`persistent://...` URI instead, plus a flag indicating it's special. The SDK's 
per-segment producer (`PerSegmentProducer`) and per-segment consumer 
(`PerSegmentConsumer`) attach to whatever URI the segment carries — the v4 
producer / consumer beneath them already accepts `persistent://` and 
`segment://` alike. No separate adapter classes are introduced.
+A regular `Segment` carries a `segment://...` URI. A legacy segment carries a 
`persistent://...` URI instead, plus a flag indicating it's special. The SDK's 
per-segment producer (`PerSegmentProducer`) and per-segment consumer 
(`PerSegmentConsumer`) attach to whatever URI the segment carries — the v4 
producer / consumer beneath them already accepts `persistent://` and 
`segment://` alike. No separate adapter classes are introduced.
 
 V5-specific features that don't apply on a regular topic surface as ordinary 
"this layout doesn't support that" errors at the API surface:
 
@@ -190,7 +203,7 @@ V5-specific features that don't apply on a regular topic 
surface as ordinary "th
 
 Layout updates pushed by the lookup session already trigger the SDK's 
per-segment reconcile: segments that disappeared get their per-segment 
producers / consumers torn down; new segments get fresh ones; segments whose 
URI changed get rebuilt on the new URI.
 
-A migration is exactly this: the special segment with URI 
`persistent://t/n/x-partition-K` is replaced in the new layout with a real 
segment whose URI is `segment://t/n/x/<descriptor>` — and that `segment://...` 
resolves to the same managed ledger. The reconciler tears down the per-segment 
v4 producer/consumer attached to the `persistent://` URI and reattaches one to 
the `segment://` URI; from the application's perspective the SDK's internal 
reconnect happens (as it does for any layout c [...]
+A migration is exactly this: the legacy segment with URI 
`persistent://t/n/x-partition-K` is replaced in the new layout with a real 
segment whose URI is `segment://t/n/x/<descriptor>` — and that `segment://...` 
resolves to the same managed ledger. The reconciler tears down the per-segment 
v4 producer/consumer attached to the `persistent://` URI and reattaches one to 
the `segment://` URI; from the application's perspective the SDK's internal 
reconnect happens (as it does for any layout ch [...]
 
 There is no `TopicMigratedException` exposed to the application by default. 
Applications that *want* to observe migrations can subscribe to the lookup 
session's layout-change events directly via a future hook — out of scope for 
this PIP.
 
@@ -198,13 +211,15 @@ There is no `TopicMigratedException` exposed to the 
application by default. Appl
 
 #### 2.1 Admin REST endpoint
 
-`POST 
/admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate-to-scalable` — 
requires `produce` permission on the topic (the same permission needed to write 
to it in the first place). Migration is irreversible but does not destroy data, 
so the blast radius is bounded by what a write-permissioned user can already 
do; super-user is not required.
+`POST /admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate` — 
authorized via a dedicated `TopicOperation.MIGRATE_TO_SCALABLE` operation, 
which the default authorization provider maps to `PRODUCE` permission on the 
topic (the same permission needed to write to it in the first place). Exposing 
it as its own operation lets third-party RBAC implementations restrict 
migration independently of produce. Migration is irreversible but does not 
destroy data, so the blast radius is bounde [...]
+
+Query parameter: `force` (boolean, default `false`).
 
 No request body.
 
-The broker counts v4 producer / consumer connections via the existing 
topic-stats path and rejects the migration with HTTP 409 if any are present, 
with the count in the error message.
+Unless `force=true`, the broker enumerates the producers/consumers attached to 
the source topic (across all partitions, via the topic-stats path) and rejects 
the migration with HTTP 409 if any *legacy v4* connection is present. A 
connection is recognised as V5-managed — and therefore excluded — by a reserved 
metadata marker the V5 SDK sets on every per-segment v4 producer/consumer it 
creates; everything else counts as a legacy v4 connection. `force=true` skips 
this check.
 
-Response: `204 No Content` on success. `409 Conflict` if scalable metadata 
already exists or v4 connections are still attached. `404` if the topic doesn't 
exist.
+Response: `204 No Content` on success. `409 Conflict` if scalable metadata 
already exists or legacy v4 connections are still attached (and `force` is not 
set). `404` if the topic doesn't exist.
 
 #### 2.2 Admin client
 
@@ -213,42 +228,43 @@ package org.apache.pulsar.client.admin;
 
 public interface ScalableTopics {
     /** Migrate an existing partitioned or non-partitioned topic to a scalable topic. */
-    void migrateToScalable(String topic) throws PulsarAdminException;
-    CompletableFuture<Void> migrateToScalableAsync(String topic);
+    void migrateToScalable(String topic) throws PulsarAdminException;            // force = false
+    void migrateToScalable(String topic, boolean force) throws PulsarAdminException;
+    CompletableFuture<Void> migrateToScalableAsync(String topic);                // force = false
+    CompletableFuture<Void> migrateToScalableAsync(String topic, boolean force);
 }
 ```
 
 #### 2.3 CLI
 
 ```
-pulsar-admin scalable-topics migrate-to-scalable persistent://t/n/x
+pulsar-admin scalable-topics migrate persistent://t/n/x [--force]
 ```
 
 #### 2.4 Broker-side migration steps
 
-Executed by the topic's owning broker, atomically as far as possible:
+Executed by the topic's owning broker:
 
-1. **Lock**: acquire a metadata-store lock on `/topics/t/n/x` to prevent 
concurrent migrations or competing admin operations.
-2. **Precheck**:
-   - Topic exists (as either partitioned or non-partitioned).
-   - No `ScalableTopicMetadata` already exists at the same path. If it does, 
fail 409.
-   - No v4 producer / consumer connections are attached. If any are, fail 409 
with the count in the error message.
-3. **Build initial layout**:
-   - N sealed parent segments (or 1 for non-partitioned), each wrapping the 
existing managed ledger for `persistent://t/n/x-partition-K` (or 
`persistent://t/n/x`).
-   - N active child segments (or 1 for non-partitioned) with equal-width 
contiguous hash ranges covering `[0x0000, 0xFFFF]`. Routing is range-based.
+1. **Precheck**:
+   - No `ScalableTopicMetadata` already exists at the path. If it does, fail 
409.
+   - Topic exists (as either partitioned or non-partitioned). If not, fail 404.
+   - Unless `force`, no *legacy v4* connection is attached (counting only 
producers/consumers whose metadata lacks the V5-managed marker). If any are, 
fail 409.
+2. **Build initial layout** — `ScalableTopicController.createMigratedMetadata`:
+   - N sealed legacy parent segments (or 1 for non-partitioned), each wrapping 
`persistent://t/n/x-partition-K` (or `persistent://t/n/x`). Each parent spans 
the full hash range.
+   - N active child segments (or 1) with equal-width contiguous hash ranges 
covering `[0x0000, 0xFFFF]`. Routing is range-based.
    - DAG edges: every child has all N parents as predecessors. The 
subscription controller's drain-before-assign protocol (already used for splits 
/ merges) preserves per-key ordering.
    - Set `epoch = 0`, `nextSegmentId = 2N`.
-4. **Atomic flip**: write `ScalableTopicMetadata` to `/topics/t/n/x` via 
metadata-store CAS. This is the commit point — once it succeeds, the topic is 
scalable.
-5. **Push the new layout to every connected lookup session.** V5 clients that 
were seeing the synthetic layout for this topic transition to the real DAG via 
the same machinery used for split / merge layout updates. No `TerminateTopic` 
is needed — the underlying managed ledgers don't change identity, only the 
layout-level segment names that wrap them do.
-6. **Release the lock**.
+3. **Create child segment topics**: materialize the backing topic for each new 
active child. Sealed legacy parents reuse the existing `persistent://` managed 
ledgers, so they are skipped.
+4. **Atomic flip**: write `ScalableTopicMetadata` to `/topics/t/n/x` via 
metadata-store CAS. This is the commit point — once it succeeds, the topic is 
scalable. Connected lookup sessions transition from the synthetic layout to the 
real DAG via the metadata-store watch.
+5. **Terminate the old topic(s)**: terminate `persistent://t/n/x` (or every 
partition, for a partitioned source) so no further v4 writes can land. 
Termination is exactly the sealed-parent state — existing data is still 
drainable by consumers, and the sealed legacy parent segments reference these 
terminated topics.
 
-Failures before step 4 leave the topic untouched; failures after step 4 leave 
the topic scalable. Step 5 is best-effort per session — late-joining lookups 
always read the freshest metadata directly.
+Failures before step 4 leave the topic untouched; failures after step 4 leave 
the topic scalable.
 
-### 3. Lookup-session guard for v4 clients
+### 3. v4 write guard
 
-The lookup session described above is V5-only. v4 clients use the older 
`CommandLookupTopic`. The broker must, for v4 lookups of `persistent://t/n/x` 
where `ScalableTopicMetadata` exists for `topic://t/n/x`, return a 
`TopicMigrated` redirect (binary protocol) / HTTP 410 Gone (REST). The error 
carries the new `topic://...` name. v4 clients translate this into a hard 
failure (they can't speak the scalable protocol); operators see a clear signal 
that some clients haven't upgraded yet.
+Step 5 above (terminating the old topics) is the primary guard: an 
already-connected v4 producer's `send()` fails with `TopicTerminatedError`, and 
a freshly-connected one hits the same wall. This keeps stale v4 producers from 
writing to a topic that has been migrated.
 
-This guard is what makes the "once scalable, always scalable" guarantee robust 
against stale v4 clients or v4-only tooling. V5 clients are guarded by the 
lookup session itself — once it has reported a real DAG, the broker only ever 
pushes refinements (split / merge), never a downgrade.
+To make the "once scalable, always scalable" guarantee robust even after the 
old (terminated, drained) topics are eventually removed by retention, the 
broker also refuses to **auto-create** a `persistent://t/n/x` topic when 
`ScalableTopicMetadata` exists for the equivalent `topic://t/n/x`. This 
prevents a stray v4 client from recreating a regular topic that would shadow 
the scalable one. The rejection reuses `TopicTerminatedError` (binary) / the 
equivalent REST status; no new error code  [...]
 
 ### 4. `migratedFrom` on `ScalableTopicMetadata`
 
@@ -263,24 +279,19 @@ No `legacyModNRouting` flag or other migration-specific 
routing knob is needed:
 ### REST API
 
 New endpoint:
-- `POST 
/admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate-to-scalable` — 
requires `produce` permission on the topic; no request body; 204 on success, 
409 if already scalable or v4 connections are still attached, 404 if topic 
missing.
+- `POST /admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate` — 
requires `produce` permission on the topic; query param `force` (default 
`false`); no request body; 204 on success, 409 if already scalable or legacy v4 
connections are still attached (and `force` is not set), 404 if topic missing.
 
-Modified endpoints:
-- All v4 lookup endpoints: when scalable metadata exists for the equivalent 
`topic://...`, return a `TopicMigrated` redirect with the new name.
+Modified behaviour:
+- Auto-creation of a `persistent://t/n/x` topic is refused when scalable 
metadata exists for the equivalent `topic://t/n/x` (reusing 
`TopicTerminatedError`); together with the post-migration termination of the 
old topics, this keeps v4 clients off a migrated topic.
 
 ### Binary protocol
 
-- The existing scalable-topic lookup-session command (PIP-468) is extended to 
accept `persistent://...` and short-form names in addition to `topic://...`. 
For non-scalable topics it returns a synthetic layout with special-segment 
entries.
-- New error code `TopicMigrated` — sent by the broker on the v4 lookup command 
(`CommandLookupTopic`) when the requested `persistent://...` name is shadowed 
by an existing scalable topic. Carries the new `topic://...` name in the error 
payload. V5 clients never see this error because they always use the scalable 
lookup session.
-
-### Configuration
-
-New broker config:
-- `enableScalableTopicMigration` (default `true`) — kill switch for the 
migration command. Operators on regulated infra may want to disable.
+- The existing scalable-topic lookup-session command (PIP-468) is extended to 
accept `persistent://...` and short-form names in addition to `topic://...`. 
For non-scalable topics it returns a synthetic layout with legacy-segment 
entries.
+- No new error code: the v4 write guard reuses the existing 
`TopicTerminatedError`.
 
 ### CLI
 
-- `pulsar-admin scalable-topics migrate-to-scalable <topic>`
+- `pulsar-admin scalable-topics migrate <topic> [--force]`
 
 ### V5 SDK behavior
 
@@ -298,7 +309,7 @@ The upgrade story this PIP supports is:
 
 1. Upgrade brokers to the version containing this PIP. Brokers handle both old 
`persistent://` clients and new `topic://` clients side by side; no behavior 
change for existing topics.
 2. Upgrade applications to the V5 SDK at the operator's pace. No topic changes 
required; V5 SDK uses the wrapper path.
-3. Once all applications on a given topic are V5, run `migrate-to-scalable`. 
The migration is atomic and one-way.
+3. Once all applications on a given topic are V5, run `migrate`. The migration 
is atomic and one-way.
 
 Old client versions continue to work with the cluster on un-migrated topics.
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 75657577abb..bf0164879e2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -679,6 +679,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
                             case GET_METADATA:
                                 return canLookupAsync(topicName, role, authData);
                             case PRODUCE:
+                            case MIGRATE_TO_SCALABLE:
                                 return canProduceAsync(topicName, role, authData);
                             case GET_SUBSCRIPTIONS:
                             case CONSUME:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 81a55da7f70..f76d9a11ff2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -54,10 +54,13 @@ import org.apache.pulsar.broker.resources.ScalableTopicResources;
 import org.apache.pulsar.broker.service.scalable.ScalableTopicController;
 import org.apache.pulsar.broker.service.scalable.ScalableTopicService;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.scalable.ScalableTopicConstants;
 import org.apache.pulsar.common.scalable.SegmentInfo;
 import org.apache.pulsar.common.scalable.SegmentTopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -227,6 +230,193 @@ public class ScalableTopics extends AdminResource {
         }
     }
 
+    // --- Migrate (PIP-475 regular-to-scalable) ---
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/migrate")
+    @ApiOperation(value = "Migrate an existing regular (partitioned or non-partitioned) topic "
+            + "to a scalable topic.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Topic migrated successfully"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have produce permission on the topic"),
+            @ApiResponse(code = 404, message = "Topic doesn't exist"),
+            @ApiResponse(code = 409, message = "Already a scalable topic, or legacy v4 clients are "
+                    + "still connected and force was not set"),
+            @ApiResponse(code = 500, message = "Internal server error")})
+    public void migrateToScalable(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Migrate even if legacy v4 clients are still connected to the topic")
+            @QueryParam("force") @DefaultValue("false") boolean force) {
+        validateNamespaceName(tenant, namespace);
+        // The scalable topic's canonical identity uses the topic:// domain; the migration
+        // source is the same name in the persistent:// domain.
+        TopicName scalableName = TopicName.get(TopicDomain.topic.value(), namespaceName, encodedTopic);
+        TopicName persistentBase =
+                TopicName.get(TopicDomain.persistent.value(), namespaceName, encodedTopic);
+
+        validateTopicOperationAsync(persistentBase, TopicOperation.MIGRATE_TO_SCALABLE)
+                .thenCompose(__ -> doMigrateToScalableAsync(scalableName, persistentBase, force))
+                .thenAccept(__ -> {
+                    log.info().attr("clientAppId", clientAppId()).attr("topic", scalableName)
+                            .attr("force", force).log("Migrated topic to scalable");
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    if (cause instanceof MetadataStoreException.AlreadyExistsException) {
+                        asyncResponse.resume(new RestException(Response.Status.CONFLICT,
+                                "Topic is already scalable: " + scalableName));
+                    } else {
+                        log.error().attr("clientAppId", clientAppId()).attr("topic", scalableName)
+                                .exception(ex).log("Failed to migrate topic to scalable");
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
+    }
+
+    /**
+     * Orchestrate a regular-to-scalable migration:
+     * <ol>
+     *   <li>reject if scalable metadata already exists;</li>
+     *   <li>resolve the source topic's existence and partition count;</li>
+     *   <li>unless {@code force}, reject if any legacy v4 client is still connected;</li>
+     *   <li>build the migrated layout (sealed legacy parents + active children);</li>
+     *   <li>create the new child segment topics;</li>
+     *   <li>atomically write the scalable metadata (the commit point: connected V5 lookup
+     *       sessions transition from the synthetic layout to the real DAG via the metadata
+     *       watch);</li>
+     *   <li>terminate the old topics so no further v4 writes can land; they become the
+     *       drainable sealed parent segments.</li>
+     * </ol>
+     */
+    private CompletableFuture<Void> doMigrateToScalableAsync(TopicName scalableName,
+                                                             TopicName persistentBase, boolean force) {
+        return resources().getScalableTopicMetadataAsync(scalableName).thenCompose(existing -> {
+            if (existing.isPresent()) {
+                throw new RestException(Response.Status.CONFLICT,
+                        "Topic is already scalable: " + scalableName);
+            }
+            return pulsar().getNamespaceService().checkTopicExistsAsync(persistentBase);
+        }).thenCompose(existsInfo -> {
+            boolean exists = existsInfo.isExists();
+            int partitions = existsInfo.getPartitions();
+            existsInfo.recycle();
+            if (!exists) {
+                throw new RestException(Response.Status.NOT_FOUND,
+                        "Topic does not exist: " + persistentBase);
+            }
+            CompletableFuture<Void> precheck = force
+                    ? CompletableFuture.completedFuture(null)
+                    : checkNoLegacyConnectionsAsync(persistentBase, partitions);
+            return precheck.thenApply(__ -> partitions);
+        }).thenCompose(partitions -> {
+            ScalableTopicMetadata metadata =
+                    ScalableTopicController.createMigratedMetadata(persistentBase, partitions);
+            return createMigratedChildTopicsAsync(scalableName, metadata)
+                    .thenCompose(__ -> resources().createScalableTopicAsync(scalableName, metadata))
+                    .thenCompose(__ -> terminateLegacyTopicsAsync(persistentBase, partitions));
+        });
+    }
+
+    /**
+     * Reject the migration if any producer/consumer attached to the source topic is a legacy
+     * v4 client, i.e. one whose metadata lacks the V5-managed marker. V5 clients (which attach to
+     * the synthetic layout's legacy segments and transition transparently) are excluded.
+     */
+    private CompletableFuture<Void> checkNoLegacyConnectionsAsync(TopicName persistentBase,
+                                                                  int partitions) {
+        final PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+        CompletableFuture<? extends TopicStats> statsFuture =
+                partitions > 0
+                        ? admin.topics().getPartitionedStatsAsync(persistentBase.toString(), false)
+                        : admin.topics().getStatsAsync(persistentBase.toString());
+        return statsFuture.thenAccept(stats -> {
+            long legacy = countLegacyConnections(stats);
+            if (legacy > 0) {
+                throw new RestException(Response.Status.CONFLICT,
+                        legacy + " legacy v4 client connection(s) still attached to " + persistentBase
+                                + "; disconnect them (or ensure all clients are V5) before migrating, "
+                                + "or retry with force=true");
+            }
+        });
+    }
+
+    private static long countLegacyConnections(TopicStats stats) {
+        long count = 0;
+        for (var publisher : stats.getPublishers()) {
+            if (!isV5Managed(publisher.getMetadata())) {
+                count++;
+            }
+        }
+        for (var subscription : stats.getSubscriptions().values()) {
+            for (var consumer : subscription.getConsumers()) {
+                if (!isV5Managed(consumer.getMetadata())) {
+                    count++;
+                }
+            }
+        }
+        return count;
+    }
+
+    private static boolean isV5Managed(Map<String, String> metadata) {
+        return metadata != null && ScalableTopicConstants.V5_MANAGED_METADATA_VALUE
+                .equals(metadata.get(ScalableTopicConstants.V5_MANAGED_METADATA_KEY));
+    }
+
+    /**
+     * Create the backing segment topic for each new <i>active child</i> in the migrated layout.
+     * The sealed legacy parents are skipped because they wrap existing {@code persistent://} topics
+     * that already have managed ledgers.
+     */
+    private CompletableFuture<Void> createMigratedChildTopicsAsync(
+            TopicName scalableName, ScalableTopicMetadata metadata) {
+        try {
+            var admin = pulsar().getAdminClient();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (SegmentInfo seg : metadata.getSegments().values()) {
+                if (seg.isLegacy()) {
+                    continue;
+                }
+                String segmentTopic = SegmentTopicName.fromParent(
+                        scalableName, seg.hashRange(), seg.segmentId()).toString();
+                futures.add(admin.scalableTopics().createSegmentAsync(segmentTopic, List.of()));
+            }
+            return FutureUtil.waitForAll(futures);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    /**
+     * Terminate the old topic(s) so no further v4 writes can land. The terminated topics
+     * become the drainable sealed parent segments of the new scalable topic.
+     */
+    private CompletableFuture<Void> terminateLegacyTopicsAsync(TopicName persistentBase,
+                                                               int partitions) {
+        try {
+            var admin = pulsar().getAdminClient();
+            CompletableFuture<?> terminate = partitions > 0
+                    ? admin.topics().terminatePartitionedTopicAsync(persistentBase.toString())
+                    : admin.topics().terminateTopicAsync(persistentBase.toString());
+            return terminate.thenAccept(ignored -> { });
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
     // --- Get metadata ---
 
     @GET
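
The legacy-connection precheck in the hunk above can be sketched on its own. This is a minimal illustration under stated assumptions, not the broker code: `MARKER_KEY` and `MARKER_VALUE` are hypothetical stand-ins for the constants in `ScalableTopicConstants` (their real values are not shown in this diff), and plain maps stand in for the publisher/consumer metadata carried by `TopicStats`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LegacyConnectionCountSketch {
    // Hypothetical marker; the real key/value live in ScalableTopicConstants.
    static final String MARKER_KEY = "example.v5.managed";
    static final String MARKER_VALUE = "true";

    // A connection is V5-managed only if its metadata carries the marker.
    static boolean isV5Managed(Map<String, String> metadata) {
        return metadata != null && MARKER_VALUE.equals(metadata.get(MARKER_KEY));
    }

    // Count every connection whose metadata is missing or lacks the marker.
    static long countLegacy(List<Map<String, String>> connections) {
        long count = 0;
        for (Map<String, String> metadata : connections) {
            if (!isV5Managed(metadata)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<Map<String, String>> connections = new ArrayList<>();
        Map<String, String> v5 = new HashMap<>();
        v5.put(MARKER_KEY, MARKER_VALUE);
        connections.add(v5);               // V5 client: not counted
        connections.add(new HashMap<>());  // v4 client with empty metadata: counted
        connections.add(null);             // v4 client with no metadata at all: counted
        System.out.println("legacy connections: " + countLegacy(connections));
    }
}
```

Treating missing metadata as legacy is the conservative choice: a client that cannot prove it is V5-managed blocks the migration unless `force` is set.
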
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 79e234bd727..d180048b1b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -120,6 +120,7 @@ import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
 import org.apache.pulsar.broker.resources.LocalPoliciesResources;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
@@ -3907,6 +3908,28 @@ public class BrokerService implements Closeable {
             return CompletableFuture.completedFuture(false);
         }
 
+        // PIP-475: never auto-create a persistent:// topic that has been migrated to a
+        // scalable topic. Recreating it would shadow the scalable topic and let a stray v4
+        // client write to a name that is now owned by topic://t/n/x. The migrated topic's
+        // old ledgers are terminated (not deleted) at migration time, so this check only matters
+        // once retention GC removes them and a v4 client looks the name up again. The check
+        // is metadata-cache-backed (present and absent verdicts are both cached).
+        if (topicName.getDomain() == TopicDomain.persistent
+                && pulsar.getConfiguration().isScalableTopicsEnabled()) {
+            ScalableTopicResources scalableResources =
+                    pulsar.getPulsarResources().getScalableTopicResources();
+            if (scalableResources != null) {
+                return scalableResources.getScalableTopicMetadataAsync(topicName.toScalableTopic())
+                        .thenCompose(scalableMetadata -> scalableMetadata.isPresent()
+                                ? CompletableFuture.completedFuture(false)
+                                : isAllowAutoTopicCreationResolvedAsync(topicName, policies));
+            }
+        }
+        return isAllowAutoTopicCreationResolvedAsync(topicName, policies);
+    }
+
+    private CompletableFuture<Boolean> isAllowAutoTopicCreationResolvedAsync(
+            final TopicName topicName, final Optional<Policies> policies) {
         //Other system topics can be created automatically
         if (pulsar.getConfiguration().isSystemTopicEnabled() && isSystemTopic(topicName)) {
             return CompletableFuture.completedFuture(true);
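
The auto-creation guard in the hunk above composes two asynchronous checks: the scalable-metadata lookup vetoes auto-creation, and only when it finds nothing does the regular policy check run. A minimal standalone sketch of that composition follows. The names `allowAutoCreate`, `scalableMetadataLookup` and `regularCheck` are hypothetical, and an `Optional<String>` stands in for the real metadata type.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class AutoCreateGuardSketch {
    // If scalable metadata exists for the name, refuse auto-creation outright;
    // otherwise defer to the regular (policy-based) decision.
    static CompletableFuture<Boolean> allowAutoCreate(
            CompletableFuture<Optional<String>> scalableMetadataLookup,
            Supplier<CompletableFuture<Boolean>> regularCheck) {
        return scalableMetadataLookup.thenCompose(metadata -> metadata.isPresent()
                ? CompletableFuture.completedFuture(false)
                : regularCheck.get());
    }

    public static void main(String[] args) {
        Supplier<CompletableFuture<Boolean>> policyAllows =
                () -> CompletableFuture.completedFuture(true);

        // Name already owned by a scalable topic: blocked even though policy allows it.
        boolean shadowed = allowAutoCreate(
                CompletableFuture.completedFuture(Optional.of("scalable-metadata")),
                policyAllows).join();

        // No scalable topic under that name: the regular policy decides.
        boolean free = allowAutoCreate(
                CompletableFuture.completedFuture(Optional.<String>empty()),
                policyAllows).join();

        System.out.println("shadowed=" + shadowed + ", free=" + free);
    }
}
```

Passing the regular check as a `Supplier` keeps it lazy, so the policy lookup is skipped entirely when the scalable topic already owns the name.
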
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 44d2230e49e..c5501db56e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -1173,4 +1173,71 @@ public class ScalableTopicController {
                 .properties(properties != null ? properties : Map.of())
                 .build();
     }
+
+    /**
+     * Build the initial scalable-topic layout for a regular-to-scalable migration (PIP-475).
+     *
+     * <p>Each of the {@code partitions} old partitions (or the whole topic, for a
+     * non-partitioned source where {@code partitions <= 0}) becomes a <b>sealed legacy
+     * parent</b> segment that wraps the existing {@code persistent://...} topic. Alongside
+     * them, {@code N} fresh <b>active children</b> are created with equal-width contiguous
+     * hash ranges tiling {@code [0x0000, MAX_HASH]} and standard range-based routing.
+     *
+     * <p>The parents each span the <i>full</i> hash range because v4 partitioned routing
+     * ({@code signSafeMod(hash, N)}) scattered keys for any child's range across every
+     * partition. Consequently every child lists <i>all</i> parents as predecessors: the
+     * subscription controller's drain-before-assign protocol then drains all parents before
+     * a consumer is assigned to any child, preserving per-key ordering across the migration.
+     *
+     * <p>Segment IDs (with {@code N = max(partitions, 1)}): parents are {@code 0..N-1}, children
+     * are {@code N..2N-1}, and {@code nextSegmentId == 2N}.
+     *
+     * @param persistentBase the source topic in the {@code persistent://} domain (its
+     *                       partitions are {@code persistentBase-partition-K})
+     * @param partitions     the source partition count; {@code <= 0} means non-partitioned
+     */
+    public static ScalableTopicMetadata createMigratedMetadata(TopicName persistentBase,
+                                                               int partitions) {
+        int n = Math.max(partitions, 1);
+        long nowMs = System.currentTimeMillis();
+        Map<Long, SegmentInfo> segments = new LinkedHashMap<>();
+
+        // Child IDs are N..2N-1; every child lists every parent (full fan-in).
+        List<Long> childIds = new ArrayList<>(n);
+        for (int j = 0; j < n; j++) {
+            childIds.add((long) (n + j));
+        }
+        List<Long> parentIds = new ArrayList<>(n);
+        for (int k = 0; k < n; k++) {
+            parentIds.add((long) k);
+        }
+
+        // N sealed legacy parents: the old partitions, each spanning the full hash range.
+        for (int k = 0; k < n; k++) {
+            String legacyTopic = partitions <= 0
+                    ? persistentBase.toString()
+                    : persistentBase.getPartition(k).toString();
+            SegmentInfo parent = SegmentInfo
+                    .activeLegacy(k, HashRange.of(0x0000, HashRange.MAX_HASH), legacyTopic, 0, nowMs)
+                    .sealed(0, nowMs, childIds);
+            segments.put((long) k, parent);
+        }
+
+        // N active range-based children tiling [0x0000, MAX_HASH]; each has all parents.
+        int rangeSize = (HashRange.MAX_HASH + 1) / n;
+        for (int j = 0; j < n; j++) {
+            long segId = n + j;
+            int start = j * rangeSize;
+            int end = (j == n - 1) ? HashRange.MAX_HASH : (start + rangeSize - 1);
+            SegmentInfo child = SegmentInfo.active(segId, HashRange.of(start, end), parentIds, 0, nowMs);
+            segments.put(segId, child);
+        }
+
+        return ScalableTopicMetadata.builder()
+                .epoch(0)
+                .nextSegmentId(2L * n)
+                .segments(segments)
+                .properties(Map.of())
+                .build();
+    }
 }
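
The child hash-range tiling computed in `createMigratedMetadata` can be checked in isolation. The sketch below mirrors the arithmetic from the hunk above. It assumes `MAX_HASH` is `0xFFFF` (the upper bound used by the test fixtures in this commit; the constant's definition is not part of this diff) and uses plain `int[]` pairs in place of `HashRange`.

```java
import java.util.ArrayList;
import java.util.List;

final class MigratedLayoutSketch {
    // Assumption: a 16-bit hash space; the real value is HashRange.MAX_HASH.
    static final int MAX_HASH = 0xFFFF;

    // Returns {start, end} (inclusive) for each active child of a migrated topic.
    static List<int[]> childRanges(int partitions) {
        int n = Math.max(partitions, 1);        // non-partitioned sources yield one child
        int rangeSize = (MAX_HASH + 1) / n;     // integer division; remainder goes to the last child
        List<int[]> ranges = new ArrayList<>(n);
        for (int j = 0; j < n; j++) {
            int start = j * rangeSize;
            int end = (j == n - 1) ? MAX_HASH : start + rangeSize - 1;
            ranges.add(new int[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        int partitions = 3;
        List<int[]> ranges = childRanges(partitions);
        for (int j = 0; j < ranges.size(); j++) {
            int[] r = ranges.get(j);
            // Child segment IDs start right after the parent IDs 0..N-1.
            System.out.printf("child %d -> [0x%04X, 0x%04X]%n", partitions + j, r[0], r[1]);
        }
    }
}
```

When the hash space does not divide evenly by the partition count, the last child is slightly wider than the others, so the ranges still cover the whole space without gaps.
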
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicMigrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicMigrationTest.java
new file mode 100644
index 00000000000..0c5f70f0d6d
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicMigrationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+import java.util.UUID;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ScalableTopicMetadata;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end tests for the PIP-475 regular-to-scalable migration admin command
+ * ({@code admin.scalableTopics().migrateToScalable(...)}).
+ */
+public class ScalableTopicMigrationTest extends SharedPulsarBaseTest {
+
+    private String baseName(String suffix) {
+        return getNamespace() + "/" + suffix + "-" + 
UUID.randomUUID().toString().substring(0, 8);
+    }
+
+    @Test
+    public void testMigratePartitionedTopic() throws Exception {
+        String topic = baseName("part");
+        admin.topics().createPartitionedTopic(topic, 4);
+
+        admin.scalableTopics().migrateToScalable(topic, false);
+
+        // Scalable metadata now exists: 4 sealed legacy parents (ids 0..3) + 
4 active
+        // children (ids 4..7).
+        ScalableTopicMetadata md = 
admin.scalableTopics().getMetadata("topic://" + topic);
+        assertEquals(md.getSegments().size(), 8);
+        assertEquals(md.getNextSegmentId(), 8L);
+        for (long id = 0; id < 4; id++) {
+            ScalableTopicMetadata.SegmentInfo parent = 
md.getSegments().get(id);
+            assertTrue(parent.isSealed(), "parent " + id + " must be sealed");
+            assertTrue(parent.isLegacy(), "parent " + id + " must wrap a 
legacy topic");
+            assertEquals(parent.getLegacyTopicName(),
+                    "persistent://" + topic + "-partition-" + id);
+        }
+        for (long id = 4; id < 8; id++) {
+            ScalableTopicMetadata.SegmentInfo child = md.getSegments().get(id);
+            assertTrue(child.isActive(), "child " + id + " must be active");
+            assertFalse(child.isLegacy(), "child " + id + " must be a regular 
segment");
+        }
+    }
+
+    @Test
+    public void testMigrateNonPartitionedTopic() throws Exception {
+        String topic = baseName("np");
+        admin.topics().createNonPartitionedTopic("persistent://" + topic);
+
+        admin.scalableTopics().migrateToScalable(topic, false);
+
+        ScalableTopicMetadata md = 
admin.scalableTopics().getMetadata("topic://" + topic);
+        assertEquals(md.getSegments().size(), 2);
+        assertTrue(md.getSegments().get(0L).isSealed());
+        assertEquals(md.getSegments().get(0L).getLegacyTopicName(), 
"persistent://" + topic);
+        assertTrue(md.getSegments().get(1L).isActive());
+    }
+
+    @Test
+    public void testMigrateFailsWhenAlreadyScalable() throws Exception {
+        String topic = baseName("already");
+        admin.topics().createNonPartitionedTopic("persistent://" + topic);
+        admin.scalableTopics().migrateToScalable(topic, false);
+
+        // A second migration must be rejected — the topic is already scalable.
+        PulsarAdminException ex = expectThrows(PulsarAdminException.class,
+                () -> admin.scalableTopics().migrateToScalable(topic, false));
+        assertEquals(ex.getStatusCode(), 409);
+    }
+
+    @Test
+    public void testMigrateFailsForNonExistentTopic() {
+        String topic = baseName("ghost");
+        PulsarAdminException ex = expectThrows(PulsarAdminException.class,
+                () -> admin.scalableTopics().migrateToScalable(topic, false));
+        assertEquals(ex.getStatusCode(), 404);
+    }
+
+    @Test
+    public void testMigrateRejectsLegacyV4ConnectionWithoutForce() throws 
Exception {
+        String topic = baseName("legacy");
+        admin.topics().createNonPartitionedTopic("persistent://" + topic);
+
+        // A plain v4 producer carries no V5-managed marker → counts as a 
legacy connection.
+        @Cleanup
+        Producer<byte[]> v4Producer = pulsarClient.newProducer()
+                .topic("persistent://" + topic)
+                .create();
+
+        PulsarAdminException ex = expectThrows(PulsarAdminException.class,
+                () -> admin.scalableTopics().migrateToScalable(topic, false));
+        assertEquals(ex.getStatusCode(), 409);
+
+        // No scalable metadata should have been written.
+        PulsarAdminException notFound = 
expectThrows(PulsarAdminException.class,
+                () -> admin.scalableTopics().getMetadata("topic://" + topic));
+        assertEquals(notFound.getStatusCode(), 404);
+    }
+
+    @Test
+    public void testMigrateWithForceSucceedsDespiteLegacyConnection() throws 
Exception {
+        String topic = baseName("force");
+        admin.topics().createNonPartitionedTopic("persistent://" + topic);
+
+        @Cleanup
+        Producer<byte[]> v4Producer = pulsarClient.newProducer()
+                .topic("persistent://" + topic)
+                .create();
+
+        // force=true bypasses the legacy-connection precheck.
+        admin.scalableTopics().migrateToScalable(topic, true);
+
+        ScalableTopicMetadata md = 
admin.scalableTopics().getMetadata("topic://" + topic);
+        assertEquals(md.getSegments().size(), 2);
+    }
+
+    @Test
+    public void testAutoCreateBlockedWhenScalableTopicShadowsName() throws 
Exception {
+        // A scalable topic owns topic://t/n/x; the broker must refuse to 
auto-create the
+        // shadowing persistent://t/n/x (the long-term "once scalable, always 
scalable"
+        // guard that survives even after a migrated topic's old ledgers are 
GC'd).
+        String name = baseName("shadow");
+
+        // Sanity: before the scalable topic exists, auto-create of the 
persistent name is
+        // allowed (confirms the shared cluster has auto-creation enabled).
+        assertTrue(getPulsar().getBrokerService()
+                        
.isAllowAutoTopicCreationAsync(TopicName.get("persistent://" + name)).get(),
+                "auto-create should be allowed before the name is claimed by a 
scalable topic");
+
+        admin.scalableTopics().createScalableTopic("topic://" + name, 1);
+
+        assertFalse(getPulsar().getBrokerService()
+                        
.isAllowAutoTopicCreationAsync(TopicName.get("persistent://" + name)).get(),
+                "auto-create must be blocked when a scalable topic shadows the 
persistent:// name");
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
index 3cdf1dc3ffc..a15971d1874 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
@@ -68,6 +68,13 @@ public abstract class SharedPulsarBaseTest {
         return pulsar.getBrokerServiceUrl();
     }
 
+    /**
+     * The shared {@link PulsarService}, for tests that need broker-internal 
accessors.
+     */
+    protected PulsarService getPulsar() {
+        return pulsar;
+    }
+
     /**
      * Returns the web service URL (http://...) for HTTP-based lookups and 
admin operations.
      */
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index db274bc8712..4a0cca1eb78 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -811,4 +811,72 @@ public class ScalableTopicControllerTest {
         ConsumerRegistration b = new ConsumerRegistration();
         assertEquals(a, b);
     }
+
+    // --- createMigratedMetadata (PIP-475 regular-to-scalable migration) ---
+
+    @Test
+    public void testCreateMigratedMetadataForPartitionedTopic() {
+        // 3-partition source → 3 sealed legacy parents (ids 0..2, full range, 
wrapping
+        // each -partition-K) + 3 active range-based children (ids 3..5, full 
fan-in).
+        TopicName base = TopicName.get("persistent://tenant/ns/my-topic");
+        ScalableTopicMetadata md = 
ScalableTopicController.createMigratedMetadata(base, 3);
+
+        assertEquals(md.getEpoch(), 0L);
+        assertEquals(md.getNextSegmentId(), 6L);
+        assertEquals(md.getSegments().size(), 6);
+
+        // Parents 0..2: sealed legacy, full hash range, children = [3,4,5], 
no parents.
+        for (int k = 0; k < 3; k++) {
+            org.apache.pulsar.common.scalable.SegmentInfo parent = 
md.getSegments().get((long) k);
+            assertTrue(parent.isSealed(), "parent " + k + " must be sealed");
+            assertTrue(parent.isLegacy(), "parent " + k + " must be a legacy 
segment");
+            assertEquals(parent.legacyTopicName(),
+                    "persistent://tenant/ns/my-topic-partition-" + k);
+            assertEquals(parent.hashRange().start(), 0x0000);
+            assertEquals(parent.hashRange().end(), 
org.apache.pulsar.common.scalable.HashRange.MAX_HASH);
+            assertTrue(parent.parentIds().isEmpty());
+            assertEquals(parent.childIds(), java.util.List.of(3L, 4L, 5L));
+        }
+
+        // Children 3..5: active, range-based tiling, parents = [0,1,2], not 
legacy.
+        int expectedWidth = 
(org.apache.pulsar.common.scalable.HashRange.MAX_HASH + 1) / 3;
+        for (int j = 0; j < 3; j++) {
+            long id = 3 + j;
+            org.apache.pulsar.common.scalable.SegmentInfo child = 
md.getSegments().get(id);
+            assertTrue(child.isActive(), "child " + id + " must be active");
+            assertFalse(child.isLegacy(), "child " + id + " must be a regular 
segment");
+            assertEquals(child.parentIds(), java.util.List.of(0L, 1L, 2L));
+            assertTrue(child.childIds().isEmpty());
+            int expectedStart = j * expectedWidth;
+            assertEquals(child.hashRange().start(), expectedStart);
+        }
+        // Children tile the full space: first starts at 0, last ends at 
MAX_HASH.
+        assertEquals(md.getSegments().get(3L).hashRange().start(), 0x0000);
+        assertEquals(md.getSegments().get(5L).hashRange().end(),
+                org.apache.pulsar.common.scalable.HashRange.MAX_HASH);
+    }
+
+    @Test
+    public void testCreateMigratedMetadataForNonPartitionedTopic() {
+        // Non-partitioned source (partitions <= 0) → 1 sealed legacy parent 
wrapping the
+        // base persistent:// topic + 1 active child covering the full range.
+        TopicName base = TopicName.get("persistent://tenant/ns/np-topic");
+        ScalableTopicMetadata md = 
ScalableTopicController.createMigratedMetadata(base, 0);
+
+        assertEquals(md.getNextSegmentId(), 2L);
+        assertEquals(md.getSegments().size(), 2);
+
+        org.apache.pulsar.common.scalable.SegmentInfo parent = 
md.getSegments().get(0L);
+        assertTrue(parent.isSealed());
+        assertTrue(parent.isLegacy());
+        assertEquals(parent.legacyTopicName(), 
"persistent://tenant/ns/np-topic");
+        assertEquals(parent.childIds(), java.util.List.of(1L));
+
+        org.apache.pulsar.common.scalable.SegmentInfo child = 
md.getSegments().get(1L);
+        assertTrue(child.isActive());
+        assertFalse(child.isLegacy());
+        assertEquals(child.parentIds(), java.util.List.of(0L));
+        assertEquals(child.hashRange().start(), 0x0000);
+        assertEquals(child.hashRange().end(), 
org.apache.pulsar.common.scalable.HashRange.MAX_HASH);
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
index f9fb01444a6..debf54231b9 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
@@ -108,6 +108,27 @@ public interface ScalableTopics {
     CompletableFuture<Void> createScalableTopicAsync(String topic, int 
numInitialSegments,
                                                       Map<String, String> 
properties);
 
+    /**
+     * Migrate an existing regular (partitioned or non-partitioned) topic to a scalable topic.
+     *
+     * <p>The old partitions become sealed parent segments of the new scalable topic and the
+     * old topics are terminated; new active segments take over. Fails if the topic is already
+     * scalable, if it doesn't exist, or if any legacy v4 client is still connected (unless
+     * {@code force} is set).
+     *
+     * @param topic Topic name in the format "tenant/namespace/topic"
+     * @param force Migrate even if legacy v4 clients are still connected
+     */
+    void migrateToScalable(String topic, boolean force) throws PulsarAdminException;
+
+    /**
+     * Migrate an existing regular topic to a scalable topic asynchronously.
+     *
+     * @param topic Topic name in the format "tenant/namespace/topic"
+     * @param force Migrate even if legacy v4 clients are still connected
+     */
+    CompletableFuture<Void> migrateToScalableAsync(String topic, boolean force);
+
     /**
      * Get scalable topic metadata.
      *
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java
index 18175add461..6f3123228f1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java
@@ -70,6 +70,13 @@ public class ScalableTopicMetadata {
         private long createdAtEpoch;
         private long sealedAtEpoch;
 
+        /**
+         * For a legacy segment, the externally managed {@code persistent://...} topic this
+         * segment wraps; {@code null} for regular controller-managed segments. Populated for
+         * the sealed parent segments produced by a regular-to-scalable migration (PIP-475).
+         */
+        private String legacyTopicName;
+
         public boolean isActive() {
             return "ACTIVE".equals(state);
         }
@@ -77,6 +84,10 @@ public class ScalableTopicMetadata {
         public boolean isSealed() {
             return "SEALED".equals(state);
         }
+
+        public boolean isLegacy() {
+            return legacyTopicName != null && !legacyTopicName.isEmpty();
+        }
     }
 
     /**
diff --git 
a/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java
 
b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java
index 3e9b95078da..4784ea9b1b9 100644
--- 
a/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java
+++ 
b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -118,11 +119,27 @@ public class ScalableTopicMetadataTest {
     @Test
     public void testSegmentInfoHelpersForUnknownStateAreFalse() {
         ScalableTopicMetadata.SegmentInfo seg = new 
ScalableTopicMetadata.SegmentInfo(
-                0L, hashRange(0, 0xFFFF), "UNKNOWN", List.of(), List.of(), 0L, 
-1L);
+                0L, hashRange(0, 0xFFFF), "UNKNOWN", List.of(), List.of(), 0L, 
-1L, null);
         assertFalse(seg.isActive());
         assertFalse(seg.isSealed());
     }
 
+    @Test
+    public void testSegmentInfoLegacyFlag() {
+        // A null/empty legacyTopicName is a regular controller-managed 
segment.
+        ScalableTopicMetadata.SegmentInfo regular = new 
ScalableTopicMetadata.SegmentInfo(
+                1L, hashRange(0, 0xFFFF), "ACTIVE", List.of(), List.of(), 0L, 
-1L, null);
+        assertFalse(regular.isLegacy());
+        assertNull(regular.getLegacyTopicName());
+
+        // A non-empty legacyTopicName marks a legacy segment wrapping a 
persistent:// topic.
+        ScalableTopicMetadata.SegmentInfo legacy = new 
ScalableTopicMetadata.SegmentInfo(
+                0L, hashRange(0, 0xFFFF), "SEALED", List.of(), List.of(2L), 
0L, 0L,
+                "persistent://tenant/ns/x-partition-0");
+        assertTrue(legacy.isLegacy());
+        assertEquals(legacy.getLegacyTopicName(), 
"persistent://tenant/ns/x-partition-0");
+    }
+
     @Test
     public void testSegmentInfoGetters() {
         ScalableTopicMetadata.SegmentInfo seg = sealedSegment(
@@ -166,7 +183,7 @@ public class ScalableTopicMetadataTest {
                                                                    long createdAtEpoch) {
         return new ScalableTopicMetadata.SegmentInfo(
                 id, hashRange(start, end), "ACTIVE",
-                List.of(), List.of(), createdAtEpoch, -1L);
+                List.of(), List.of(), createdAtEpoch, -1L, null);
     }
 
     private static ScalableTopicMetadata.SegmentInfo sealedSegment(long id, int start, int end,
@@ -176,7 +193,7 @@ public class ScalableTopicMetadataTest {
                                                                    long sealedAtEpoch) {
         return new ScalableTopicMetadata.SegmentInfo(
                 id, hashRange(start, end), "SEALED",
-                parents, children, createdAtEpoch, sealedAtEpoch);
+                parents, children, createdAtEpoch, sealedAtEpoch, null);
     }
 
     private static ScalableTopicMetadata.HashRange hashRange(int start, int end) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
index 57d26c7011a..09a81ea26af 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
@@ -114,6 +114,20 @@ public class ScalableTopicsImpl extends BaseResource implements ScalableTopics {
         return asyncPutRequest(path, entity);
     }
 
+    // --- Migrate ---
+
+    @Override
+    public void migrateToScalable(String topic, boolean force) throws PulsarAdminException {
+        sync(() -> migrateToScalableAsync(topic, force));
+    }
+
+    @Override
+    public CompletableFuture<Void> migrateToScalableAsync(String topic, boolean force) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn).path("migrate").queryParam("force", force);
+        return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
     // --- Get metadata ---
 
     @Override
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
index 0391653198f..5ef9f47f089 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
@@ -83,6 +83,24 @@ public class CmdScalableTopics extends CmdBase {
         }
     }
 
+    @Command(description = "Migrate an existing regular (partitioned or non-partitioned) topic"
+            + " to a scalable topic. Fails if legacy v4 clients are still connected unless"
+            + " --force is set.")
+    private class MigrateCmd extends CliCommand {
+        @Parameters(description = "tenant/namespace/topic", arity = "1")
+        private String topic;
+
+        @Option(names = {"-f", "--force"},
+                description = "Migrate even if legacy v4 clients are still connected")
+        private boolean force;
+
+        @Override
+        void run() throws Exception {
+            scalableTopics().migrateToScalable(topic, force);
+            print("Migrated topic " + topic + " to a scalable topic");
+        }
+    }
+
     @Command(description = "Get scalable topic metadata")
     private class GetMetadataCmd extends CliCommand {
         @Parameters(description = "tenant/namespace/topic", arity = "1")
@@ -204,6 +222,7 @@ public class CmdScalableTopics extends CmdBase {
         super("scalable-topics", admin);
         addCommand("list", new ListCmd());
         addCommand("create", new CreateCmd());
+        addCommand("migrate", new MigrateCmd());
         addCommand("get-metadata", new GetMetadataCmd());
         addCommand("stats", new GetStatsCmd());
         addCommand("delete", new DeleteCmd());
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 8eef5183ead..3ead54e6955 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.v5.SegmentRouter.ActiveSegment;
 import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.ScalableTopicConstants;
 import org.apache.pulsar.common.util.Backoff;
 
 /**
@@ -426,6 +427,14 @@ final class ScalableQueueConsumer<T> implements QueueConsumerImpl<T>, DagWatchCl
         // computed segment:// URI. attachTopicName() collapses both into the right URI.
         segConf.getTopicNames().add(segment.attachTopicName());
         segConf.setSubscriptionType(SubscriptionType.Shared);
+        // Only legacy segments wrap a persistent:// topic that the regular-to-scalable
+        // migration pre-check inspects, so mark just those connections as V5-managed —
+        // connections to real segment:// topics are never examined.
+        if (segment.isLegacy()) {
+            segConf.getProperties().put(
+                    ScalableTopicConstants.V5_MANAGED_METADATA_KEY,
+                    ScalableTopicConstants.V5_MANAGED_METADATA_VALUE);
+        }
         if (consumerConf.getConsumerName() != null) {
             segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" + segment.segmentId());
         }
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index c6e70553002..9cc3d7b110e 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.v5.SegmentRouter.ActiveSegment;
+import org.apache.pulsar.common.scalable.ScalableTopicConstants;
 
 /**
  * V5 StreamConsumer implementation for scalable topics.
@@ -377,6 +378,14 @@ final class ScalableStreamConsumer<T>
         // computed segment:// URI. attachTopicName() collapses both into the right URI.
         segConf.getTopicNames().add(segment.attachTopicName());
         segConf.setSubscriptionType(SubscriptionType.Exclusive);
+        // Only legacy segments wrap a persistent:// topic that the regular-to-scalable
+        // migration pre-check inspects, so mark just those connections as V5-managed —
+        // connections to real segment:// topics are never examined.
+        if (segment.isLegacy()) {
+            segConf.getProperties().put(
+                    ScalableTopicConstants.V5_MANAGED_METADATA_KEY,
+                    ScalableTopicConstants.V5_MANAGED_METADATA_VALUE);
+        }
         if (consumerConf.getConsumerName() != null) {
             segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" + segment.segmentId());
         }
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index 70d0f7fd3ee..dd1540feee7 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.v5.SegmentRouter.ActiveSegment;
+import org.apache.pulsar.common.scalable.ScalableTopicConstants;
 
 /**
  * V5 Producer implementation for scalable topics.
@@ -490,14 +491,14 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
             // Find the segment and the URI to attach the per-segment v4 producer to.
             // Regular segments use the computed segment:// URI; legacy segments (synthetic
             // layouts wrapping an externally managed persistent:// topic) use that URI directly.
that URI directly.
-            String attachTopicName = null;
+            ActiveSegment segment = null;
             for (var seg : activeSegments) {
                 if (seg.segmentId() == id) {
-                    attachTopicName = seg.attachTopicName();
+                    segment = seg;
                     break;
                 }
             }
-            if (attachTopicName == null) {
+            if (segment == null) {
                 return CompletableFuture.failedFuture(
                         new PulsarClientException("Segment " + id + " not found in active segments"));
             }
@@ -508,7 +509,15 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
             // initialSequenceId, accessMode, properties, ...) and not just the few
             // fields explicitly carried over.
             // fields explicitly carried over.
             var segConf = producerConf.clone();
-            segConf.setTopicName(attachTopicName);
+            segConf.setTopicName(segment.attachTopicName());
+            // Only legacy segments wrap a persistent:// topic that the regular-to-scalable
+            // migration pre-check (PIP-475) inspects, so mark just those connections as
+            // V5-managed — connections to real segment:// topics are never examined.
+            if (segment.isLegacy()) {
+                segConf.getProperties().put(
+                        ScalableTopicConstants.V5_MANAGED_METADATA_KEY,
+                        ScalableTopicConstants.V5_MANAGED_METADATA_VALUE);
+            }
             if (producerConf.getProducerName() != null
                     && !producerConf.getProducerName().isEmpty()) {
                 segConf.setProducerName(producerConf.getProducerName() + "-seg-" + id);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
index 4c74ffcd8f0..dfcf86350cf 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
@@ -56,4 +56,8 @@ public enum TopicOperation {
     SET_REPLICATED_SUBSCRIPTION_STATUS,
     GET_REPLICATED_SUBSCRIPTION_STATUS,
     TRIM_TOPIC,
+
+    // PIP-475: migrate a regular topic to a scalable topic. Maps to PRODUCE in the default
+    // authorization provider; a custom provider may restrict it independently.
+    MIGRATE_TO_SCALABLE,
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/ScalableTopicConstants.java b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/ScalableTopicConstants.java
new file mode 100644
index 00000000000..eab445e675e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/ScalableTopicConstants.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.scalable;
+
+/**
+ * Constants shared between the broker and the scalable-topic (V5) client.
+ */
+public final class ScalableTopicConstants {
+
+    private ScalableTopicConstants() {
+    }
+
+    /**
+     * Producer/consumer metadata key set by the V5 SDK on every per-segment v4
+     * producer and consumer it creates. Its presence marks a connection as
+     * V5-managed (driven by the scalable-topic surface) rather than a legacy v4
+     * client talking to the {@code persistent://} topic directly.
+     *
+     * <p>PIP-475 uses it during regular-to-scalable migration: the migration
+     * pre-check enumerates the producers/consumers attached to the source topic
+     * and treats any whose metadata lacks this key as a still-connected legacy v4
+     * client, which (absent {@code --force}) blocks the migration. V5 clients,
+     * which transition transparently via the lookup session, are excluded.
+     */
+    public static final String V5_MANAGED_METADATA_KEY = "__pulsar.v5.managed";
+
+    /** Value stored under {@link #V5_MANAGED_METADATA_KEY}. */
+    public static final String V5_MANAGED_METADATA_VALUE = "true";
+}
