This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c7a7df1603 [feat] PIP-468: Multi-topic QueueConsumer / StreamConsumer (#25651)
7c7a7df1603 is described below

commit 7c7a7df1603197a9492bd1222ca19f90d0d2c430
Author: Matteo Merli <[email protected]>
AuthorDate: Sat May 2 06:09:23 2026 -0700

    [feat] PIP-468: Multi-topic QueueConsumer / StreamConsumer (#25651)
---
 .../api/v5/V5MultiTopicQueueConsumerTest.java      | 149 ++++++
 .../api/v5/V5MultiTopicStreamConsumerTest.java     | 208 ++++++++
 .../pulsar/client/api/v5/QueueConsumerBuilder.java |  45 +-
 .../client/api/v5/StreamConsumerBuilder.java       |  29 +-
 .../org/apache/pulsar/client/api/v5/Examples.java  |  17 +-
 .../client/impl/v5/AsyncQueueConsumerV5.java       |   4 +-
 .../apache/pulsar/client/impl/v5/MessageIdV5.java  | 262 ++++++++--
 .../apache/pulsar/client/impl/v5/MessageV5.java    |  43 +-
 .../client/impl/v5/MultiTopicQueueConsumer.java    | 426 ++++++++++++++++
 .../client/impl/v5/MultiTopicStreamConsumer.java   | 542 +++++++++++++++++++++
 .../client/impl/v5/QueueConsumerBuilderV5.java     |  49 +-
 .../pulsar/client/impl/v5/QueueConsumerImpl.java   |  35 ++
 .../client/impl/v5/ScalableQueueConsumer.java      |  46 +-
 .../client/impl/v5/ScalableStreamConsumer.java     |  53 +-
 .../client/impl/v5/StreamConsumerBuilderV5.java    |  38 +-
 .../pulsar/client/impl/v5/MessageIdV5Test.java     |  58 +++
 16 files changed, 1872 insertions(+), 132 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicQueueConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicQueueConsumerTest.java
new file mode 100644
index 00000000000..4cf5f0ff666
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicQueueConsumerTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end tests for {@link QueueConsumerBuilder#namespace}: a multi-topic
+ * QueueConsumer follows the matching set live, multiplexes from every per-topic
+ * consumer into one user-visible queue, and routes individual acks back to the
+ * right topic for redelivery purposes.
+ */
+public class V5MultiTopicQueueConsumerTest extends V5ClientBaseTest {
+
+    private String topicName(String suffix) {
+        return "topic://" + getNamespace() + "/" + suffix + "-"
+                + UUID.randomUUID().toString().substring(0, 8);
+    }
+
+    @Test
+    public void receivesFromAllTopicsInNamespace() throws Exception {
+        String topicA = topicName("a");
+        String topicB = topicName("b");
+        admin.scalableTopics().createScalableTopic(topicA, 1);
+        admin.scalableTopics().createScalableTopic(topicB, 1);
+
+        @Cleanup
+        Producer<String> pa = 
v5Client.newProducer(Schema.string()).topic(topicA).create();
+        @Cleanup
+        Producer<String> pb = 
v5Client.newProducer(Schema.string()).topic(topicB).create();
+
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .namespace(getNamespace())
+                .subscriptionName("multi-q")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Send to both topics; the multi-topic consumer must receive both 
sets.
+        Set<String> expected = new HashSet<>();
+        for (int i = 0; i < 5; i++) {
+            String va = "a-" + i;
+            String vb = "b-" + i;
+            pa.newMessage().value(va).send();
+            pb.newMessage().value(vb).send();
+            expected.add(va);
+            expected.add(vb);
+        }
+
+        Set<String> received = new HashSet<>();
+        long deadline = System.currentTimeMillis() + 30_000L;
+        while (received.size() < expected.size() && System.currentTimeMillis() 
< deadline) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+            if (msg != null) {
+                received.add(msg.value());
+                consumer.acknowledge(msg.id());
+            }
+        }
+        assertEquals(received, expected, "should receive every message 
produced to either topic");
+    }
+
+    @Test
+    public void picksUpTopicCreatedAfterSubscribe() throws Exception {
+        // Fresh namespace, no topics yet — initial snapshot is empty. 
Earliest so the
+        // race between "topic created" and "per-topic consumer attached via 
Diff" can't
+        // drop messages produced in that window.
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .namespace(getNamespace())
+                .subscriptionName("multi-q-late")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Create a topic AFTER subscribe; the watcher's Diff event must 
trigger the
+        // consumer to attach.
+        String lateTopic = topicName("late");
+        admin.scalableTopics().createScalableTopic(lateTopic, 1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(lateTopic).create();
+        producer.newMessage().value("late-message").send();
+
+        Message<String> msg = consumer.receive(Duration.ofSeconds(15));
+        assertTrue(msg != null, "expected to receive message from late-added 
topic");
+        assertEquals(msg.value(), "late-message");
+        assertEquals(msg.topic(), lateTopic, "topic() should surface the 
parent scalable topic");
+        consumer.acknowledge(msg.id());
+    }
+
+    @Test
+    public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception 
{
+        String aliceTopic = topicName("alice");
+        String bobTopic = topicName("bob");
+        admin.scalableTopics().createScalableTopic(aliceTopic, 1, 
Map.of("owner", "alice"));
+        admin.scalableTopics().createScalableTopic(bobTopic, 1, 
Map.of("owner", "bob"));
+
+        @Cleanup
+        Producer<String> pa = 
v5Client.newProducer(Schema.string()).topic(aliceTopic).create();
+        @Cleanup
+        Producer<String> pb = 
v5Client.newProducer(Schema.string()).topic(bobTopic).create();
+
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .namespace(getNamespace(), Map.of("owner", "alice"))
+                .subscriptionName("multi-q-filter")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        pa.newMessage().value("alice-msg").send();
+        pb.newMessage().value("bob-msg").send();
+
+        // Only alice's message reaches this consumer.
+        Message<String> got = consumer.receive(Duration.ofSeconds(10));
+        assertTrue(got != null, "expected one message");
+        assertEquals(got.value(), "alice-msg");
+        consumer.acknowledge(got.id());
+
+        // Confirm bob's message never arrives within a generous window.
+        Message<String> empty = consumer.receive(Duration.ofSeconds(2));
+        assertTrue(empty == null, "bob's message must be filtered out, got " + 
empty);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicStreamConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicStreamConsumerTest.java
new file mode 100644
index 00000000000..f3299b1c74b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicStreamConsumerTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end tests for {@link StreamConsumerBuilder#namespace}: a multi-topic
+ * StreamConsumer follows the matching set live, multiplexes from every per-topic
+ * consumer into one user-visible queue, and supports cumulative ack across
+ * topics via per-message position vectors.
+ */
+public class V5MultiTopicStreamConsumerTest extends V5ClientBaseTest {
+
+    private String topicName(String suffix) {
+        return "topic://" + getNamespace() + "/" + suffix + "-"
+                + UUID.randomUUID().toString().substring(0, 8);
+    }
+
+    @Test
+    public void receivesFromAllTopicsInNamespace() throws Exception {
+        String topicA = topicName("a");
+        String topicB = topicName("b");
+        admin.scalableTopics().createScalableTopic(topicA, 1);
+        admin.scalableTopics().createScalableTopic(topicB, 1);
+
+        @Cleanup
+        Producer<String> pa = 
v5Client.newProducer(Schema.string()).topic(topicA).create();
+        @Cleanup
+        Producer<String> pb = 
v5Client.newProducer(Schema.string()).topic(topicB).create();
+
+        @Cleanup
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .namespace(getNamespace())
+                .subscriptionName("multi-stream")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Set<String> expected = new HashSet<>();
+        for (int i = 0; i < 5; i++) {
+            String va = "a-" + i;
+            String vb = "b-" + i;
+            pa.newMessage().value(va).send();
+            pb.newMessage().value(vb).send();
+            expected.add(va);
+            expected.add(vb);
+        }
+
+        Set<String> received = new HashSet<>();
+        MessageId last = null;
+        long deadline = System.currentTimeMillis() + 30_000L;
+        while (received.size() < expected.size() && System.currentTimeMillis() 
< deadline) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+            if (msg != null) {
+                received.add(msg.value());
+                last = msg.id();
+            }
+        }
+        assertEquals(received, expected, "should receive every message 
produced to either topic");
+        // Cumulative ack must succeed across both per-topic consumers — 
exercises the
+        // multi-topic position vector embedded in the message id.
+        assertNotNull(last);
+        consumer.acknowledgeCumulative(last);
+    }
+
+    @Test
+    public void picksUpTopicCreatedAfterSubscribe() throws Exception {
+        @Cleanup
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .namespace(getNamespace())
+                .subscriptionName("multi-stream-late")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        String lateTopic = topicName("late");
+        admin.scalableTopics().createScalableTopic(lateTopic, 1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(lateTopic).create();
+        producer.newMessage().value("late-message").send();
+
+        Message<String> msg = consumer.receive(Duration.ofSeconds(15));
+        assertTrue(msg != null, "expected to receive message from late-added 
topic");
+        assertEquals(msg.value(), "late-message");
+        assertEquals(msg.topic(), lateTopic, "topic() should surface the 
parent scalable topic");
+        consumer.acknowledgeCumulative(msg.id());
+    }
+
+    @Test
+    public void cumulativeAckCoversEveryTopicSeenSoFar() throws Exception {
+        // Two topics, interleaved producers. After we cumulatively ack the 
LAST message,
+        // closing and re-subscribing must NOT redeliver any of the previous 
messages —
+        // that would only happen if the per-topic ack didn't fire for the 
topic that's
+        // not the message's own.
+        String topicA = topicName("a");
+        String topicB = topicName("b");
+        admin.scalableTopics().createScalableTopic(topicA, 1);
+        admin.scalableTopics().createScalableTopic(topicB, 1);
+
+        @Cleanup
+        Producer<String> pa = 
v5Client.newProducer(Schema.string()).topic(topicA).create();
+        @Cleanup
+        Producer<String> pb = 
v5Client.newProducer(Schema.string()).topic(topicB).create();
+
+        StreamConsumer<String> first = 
v5Client.newStreamConsumer(Schema.string())
+                .namespace(getNamespace())
+                .subscriptionName("multi-stream-cumulative")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int n = 5;
+        for (int i = 0; i < n; i++) {
+            pa.newMessage().value("a-" + i).send();
+            pb.newMessage().value("b-" + i).send();
+        }
+
+        // Drain everything, remember the last message id.
+        Set<String> drained = new HashSet<>();
+        MessageId last = null;
+        long deadline = System.currentTimeMillis() + 20_000L;
+        while (drained.size() < 2 * n && System.currentTimeMillis() < 
deadline) {
+            Message<String> msg = first.receive(Duration.ofSeconds(1));
+            if (msg != null) {
+                drained.add(msg.value());
+                last = msg.id();
+            }
+        }
+        assertEquals(drained.size(), 2 * n, "first consumer should drain every 
message");
+        assertNotNull(last);
+        // Cumulative ack — should fan out to BOTH topics' per-segment acks.
+        first.acknowledgeCumulative(last);
+        // Block briefly so the async ack flushes through to the broker before 
we close.
+        Thread.sleep(500);
+        first.close();
+
+        // Re-subscribe with the same name. If the cumulative ack covered both 
topics,
+        // there's nothing to re-deliver. If it only acked the message's OWN 
topic, the
+        // other topic would re-deliver from the start.
+        @Cleanup
+        StreamConsumer<String> second = 
v5Client.newStreamConsumer(Schema.string())
+                .namespace(getNamespace())
+                .subscriptionName("multi-stream-cumulative")
+                .subscribe();
+
+        Message<String> stale = second.receive(Duration.ofSeconds(2));
+        assertTrue(stale == null,
+                "cumulative ack should have covered every topic; got 
redelivery: "
+                        + (stale != null ? stale.value() : ""));
+    }
+
+    @Test
+    public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception 
{
+        String aliceTopic = topicName("alice");
+        String bobTopic = topicName("bob");
+        admin.scalableTopics().createScalableTopic(aliceTopic, 1, 
Map.of("owner", "alice"));
+        admin.scalableTopics().createScalableTopic(bobTopic, 1, 
Map.of("owner", "bob"));
+
+        @Cleanup
+        Producer<String> pa = 
v5Client.newProducer(Schema.string()).topic(aliceTopic).create();
+        @Cleanup
+        Producer<String> pb = 
v5Client.newProducer(Schema.string()).topic(bobTopic).create();
+
+        @Cleanup
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .namespace(getNamespace(), Map.of("owner", "alice"))
+                .subscriptionName("multi-stream-filter")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        pa.newMessage().value("alice-msg").send();
+        pb.newMessage().value("bob-msg").send();
+
+        Message<String> got = consumer.receive(Duration.ofSeconds(10));
+        assertTrue(got != null, "expected one message");
+        assertEquals(got.value(), "alice-msg");
+
+        Message<String> empty = consumer.receive(Duration.ofSeconds(2));
+        assertTrue(empty == null, "bob's message must be filtered out, got " + 
empty);
+    }
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
index a6249efaca3..db03f3c5469 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
@@ -19,10 +19,8 @@
 package org.apache.pulsar.client.api.v5;
 
 import java.time.Duration;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.regex.Pattern;
 import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
 import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
 import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
@@ -52,38 +50,37 @@ public interface QueueConsumerBuilder<T> {
     CompletableFuture<QueueConsumer<T>> subscribeAsync();
 
     // --- Topic selection ---
+    // Either {@link #topic(String)} or {@link #namespace} must be set, not 
both.
 
     /**
-     * The topic(s) to subscribe to.
+     * Subscribe to a single scalable topic by name.
      *
-     * @param topicNames one or more topic names
+     * @param topicName the fully-qualified topic name (e.g. {@code 
topic://tenant/ns/name})
      * @return this builder instance for chaining
      */
-    QueueConsumerBuilder<T> topic(String... topicNames);
+    QueueConsumerBuilder<T> topic(String topicName);
 
     /**
-     * The topics to subscribe to.
+     * Subscribe to every scalable topic under a namespace. The matching set 
follows
+     * live: when topics are created in or deleted from the namespace, the 
consumer
+     * attaches / detaches automatically.
      *
-     * @param topicNames the list of topic names
+     * @param namespace the namespace in {@code tenant/namespace} form
      * @return this builder instance for chaining
      */
-    QueueConsumerBuilder<T> topics(List<String> topicNames);
+    QueueConsumerBuilder<T> namespace(String namespace);
 
     /**
-     * Subscribe to all topics matching a regex pattern.
+     * Subscribe to scalable topics under a namespace whose properties match 
every
+     * key/value pair in {@code propertyFilters} (AND semantics). An empty map 
is
+     * equivalent to {@link #namespace(String)} — every topic in the namespace.
+     * The matching set follows live as topic properties change.
      *
-     * @param pattern the compiled regex pattern to match topic names against
+     * @param namespace       the namespace in {@code tenant/namespace} form
+     * @param propertyFilters property name/value pairs that all must match
      * @return this builder instance for chaining
      */
-    QueueConsumerBuilder<T> topicsPattern(Pattern pattern);
-
-    /**
-     * Subscribe to all topics matching a regex pattern (string form).
-     *
-     * @param regex the regex pattern string to match topic names against
-     * @return this builder instance for chaining
-     */
-    QueueConsumerBuilder<T> topicsPattern(String regex);
+    QueueConsumerBuilder<T> namespace(String namespace, Map<String, String> 
propertyFilters);
 
     // --- Subscription ---
 
@@ -190,16 +187,6 @@ public interface QueueConsumerBuilder<T> {
      */
     QueueConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy policy);
 
-    // --- Pattern subscription ---
-
-    /**
-     * How often to re-discover topics matching the pattern.
-     *
-     * @param interval the auto-discovery interval
-     * @return this builder instance for chaining
-     */
-    QueueConsumerBuilder<T> patternAutoDiscoveryPeriod(Duration interval);
-
     // --- Encryption ---
 
     /**
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
index ad6f0df0da6..0963f792035 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
@@ -48,14 +48,37 @@ public interface StreamConsumerBuilder<T> {
     CompletableFuture<StreamConsumer<T>> subscribeAsync();
 
     // --- Required ---
+    // Either {@link #topic(String)} or {@link #namespace} must be set, not 
both.
 
     /**
-     * The topic(s) to subscribe to.
+     * Subscribe to a single scalable topic by name.
      *
-     * @param topicNames one or more topic names
+     * @param topicName the fully-qualified topic name (e.g. {@code 
topic://tenant/ns/name})
      * @return this builder instance for chaining
      */
-    StreamConsumerBuilder<T> topic(String... topicNames);
+    StreamConsumerBuilder<T> topic(String topicName);
+
+    /**
+     * Subscribe to every scalable topic under a namespace. The matching set 
follows
+     * live: when topics are created in or deleted from the namespace, the 
consumer
+     * attaches / detaches automatically.
+     *
+     * @param namespace the namespace in {@code tenant/namespace} form
+     * @return this builder instance for chaining
+     */
+    StreamConsumerBuilder<T> namespace(String namespace);
+
+    /**
+     * Subscribe to scalable topics under a namespace whose properties match 
every
+     * key/value pair in {@code propertyFilters} (AND semantics). An empty map 
is
+     * equivalent to {@link #namespace(String)} — every topic in the namespace.
+     * The matching set follows live as topic properties change.
+     *
+     * @param namespace       the namespace in {@code tenant/namespace} form
+     * @param propertyFilters property name/value pairs that all must match
+     * @return this builder instance for chaining
+     */
+    StreamConsumerBuilder<T> namespace(String namespace, Map<String, String> 
propertyFilters);
 
     /**
      * The subscription name.
diff --git a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
index 2cc17d67b68..582a41a0574 100644
--- a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
+++ b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.api.v5;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.v5.async.AsyncProducer;
 import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer;
@@ -394,15 +395,21 @@ public class Examples {
     }
 
     // ==================================================================================
-    // 8. Multi-topic queue consumer with pattern
+    // 8. Multi-topic queue consumer over a namespace, optionally filtered by property
     // ==================================================================================
 
-    /** Subscribe to all topics matching a pattern. */
-    void patternSubscription(PulsarClient client) throws Exception {
+    /**
+     * Subscribe to every scalable topic in a namespace whose properties match 
the
+     * given key/value pairs. The matching set follows live: when topics are 
created
+     * with matching properties, the consumer attaches automatically; when 
they're
+     * deleted or change properties out of the filter, it detaches. Pass
+     * {@code Map.of()} (or use the single-arg overload) to subscribe to every
+     * scalable topic in the namespace.
+     */
+    void namespaceSubscription(PulsarClient client) throws Exception {
         try (var consumer = client.newQueueConsumer(Schema.string())
-                .topicsPattern("persistent://public/default/events-.*")
+                .namespace("public/default", Map.of("kind", "events"))
                 .subscriptionName("all-events")
-                .patternAutoDiscoveryPeriod(Duration.ofMinutes(1))
                 .subscribe()) {
 
             while (true) {
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java
index f66e1227794..7242b5e53f7 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java
@@ -29,9 +29,9 @@ import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer;
  */
 final class AsyncQueueConsumerV5<T> implements AsyncQueueConsumer<T> {
 
-    private final ScalableQueueConsumer<T> consumer;
+    private final QueueConsumerImpl<T> consumer;
 
-    AsyncQueueConsumerV5(ScalableQueueConsumer<T> consumer) {
+    AsyncQueueConsumerV5(QueueConsumerImpl<T> consumer) {
         this.consumer = consumer;
     }
 
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java
index 3a39986f3c7..97a47eff60f 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java
@@ -20,9 +20,16 @@ package org.apache.pulsar.client.impl.v5;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import org.apache.pulsar.client.api.v5.MessageId;
+import org.apache.pulsar.client.api.MessageId;
+
+// The v5 MessageId interface and the underlying v4 MessageId class share the same simple
+// name in different packages — a real conflict that Java imports can't resolve. We import
+// the v4 type as `MessageId` (used heavily) and refer to v5 via FQN in the few places it
+// appears (the `implements` clause and `compareTo`).
 
 /**
  * V5 MessageId implementation that wraps a v4 MessageId and includes a 
position
@@ -36,17 +43,20 @@ import org.apache.pulsar.client.api.v5.MessageId;
  * <p>For non-cumulative consumers (QueueConsumer) and readers 
(CheckpointConsumer),
  * only the single segment ID and v4 message ID are needed; the position 
vector is
  * empty.
+ *
+ * <p>Multi-topic consumer wrappers additionally tag the id with the parent 
scalable
+ * topic (for ack routing) and a cross-topic position vector (for cumulative 
ack
+ * across topics). Both fields are optional and round-trip through
+ * {@link #toByteArray} / {@link #fromByteArray}.
  */
-public final class MessageIdV5 implements MessageId {
+public final class MessageIdV5 implements 
org.apache.pulsar.client.api.v5.MessageId {
 
     static final long NO_SEGMENT = -1;
 
-    static final MessageIdV5 EARLIEST = new MessageIdV5(
-            org.apache.pulsar.client.api.MessageId.earliest, NO_SEGMENT, 
Map.of());
-    static final MessageIdV5 LATEST = new MessageIdV5(
-            org.apache.pulsar.client.api.MessageId.latest, NO_SEGMENT, 
Map.of());
+    static final MessageIdV5 EARLIEST = new MessageIdV5(MessageId.earliest, 
NO_SEGMENT, Map.of());
+    static final MessageIdV5 LATEST = new MessageIdV5(MessageId.latest, 
NO_SEGMENT, Map.of());
 
-    private final org.apache.pulsar.client.api.MessageId v4MessageId;
+    private final MessageId v4MessageId;
     private final long segmentId;
 
     /**
@@ -54,30 +64,73 @@ public final class MessageIdV5 implements MessageId {
      * taken at the moment this message was delivered to the application.
      * Used by StreamConsumer for cumulative ack across all segments.
      */
-    private final Map<Long, org.apache.pulsar.client.api.MessageId> 
positionVector;
+    private final Map<Long, MessageId> positionVector;
+
+    /**
+     * Parent scalable topic this message was delivered from. Set only by 
multi-topic
+     * consumer wrappers so they can route a subsequent ack back to the right
+     * per-topic consumer; {@code null} for single-topic consumers (where the 
consumer
+     * already knows its topic).
+     */
+    private final String parentTopic;
+
+    /**
+     * Cross-topic position vector for multi-topic StreamConsumer cumulative 
ack.
+     * Maps parent topic name → (segment id → latest-delivered msgId at the 
moment
+     * this message entered the multiplexed queue). {@code null} for 
single-topic
+     * messages — use {@link #positionVector()} instead.
+     */
+    private final Map<String, Map<Long, MessageId>> multiTopicVector;
 
     /**
-     * Create a MessageIdV5 with a position vector for cumulative ack support.
+     * Create a MessageIdV5 with a position vector for single-topic cumulative 
ack.
      */
-    public MessageIdV5(org.apache.pulsar.client.api.MessageId v4MessageId,
+    public MessageIdV5(MessageId v4MessageId,
                        long segmentId,
-                       Map<Long, org.apache.pulsar.client.api.MessageId> 
positionVector) {
-        this.v4MessageId = Objects.requireNonNull(v4MessageId);
-        this.segmentId = segmentId;
-        this.positionVector = Map.copyOf(positionVector);
+                       Map<Long, MessageId> positionVector) {
+        this(v4MessageId, segmentId, positionVector, null, null);
+    }
+
+    /**
+     * Constructor for multi-topic queue consumer messages: parent topic tag 
for ack
+     * routing; no cross-topic position vector (queue consumers don't do 
cumulative
+     * ack).
+     */
+    public MessageIdV5(MessageId v4MessageId,
+                       long segmentId,
+                       Map<Long, MessageId> positionVector,
+                       String parentTopic) {
+        this(v4MessageId, segmentId, positionVector, parentTopic, null);
     }
 
     /**
      * Create a MessageIdV5 without a position vector (for individual ack / 
reader use).
      */
-    public MessageIdV5(org.apache.pulsar.client.api.MessageId v4MessageId, 
long segmentId) {
-        this(v4MessageId, segmentId, Map.of());
+    public MessageIdV5(MessageId v4MessageId, long segmentId) {
+        this(v4MessageId, segmentId, Map.of(), null, null);
+    }
+
+    /**
+     * Full constructor for multi-topic StreamConsumer messages: parent topic 
+ the
+     * cross-topic position vector captured at enqueue time. Used by the 
multi-topic
+     * stream consumer's pump.
+     */
+    public MessageIdV5(MessageId v4MessageId,
+                       long segmentId,
+                       Map<Long, MessageId> positionVector,
+                       String parentTopic,
+                       Map<String, Map<Long, MessageId>> multiTopicVector) {
+        this.v4MessageId = Objects.requireNonNull(v4MessageId);
+        this.segmentId = segmentId;
+        this.positionVector = Map.copyOf(positionVector);
+        this.parentTopic = parentTopic;
+        this.multiTopicVector = multiTopicVector;
     }
 
     /**
      * Get the underlying v4 MessageId. Package-private for internal use.
      */
-    org.apache.pulsar.client.api.MessageId v4MessageId() {
+    MessageId v4MessageId() {
         return v4MessageId;
     }
 
@@ -92,36 +145,123 @@ public final class MessageIdV5 implements MessageId {
      * Get the position vector — the latest delivered message ID per segment 
at the
      * time this message was delivered. Used by StreamConsumer for cumulative 
ack.
      */
-    Map<Long, org.apache.pulsar.client.api.MessageId> positionVector() {
+    Map<Long, MessageId> positionVector() {
         return positionVector;
     }
 
+    /**
+     * Parent scalable topic when this message was delivered through a 
multi-topic
+     * consumer; {@code null} for single-topic consumers. Package-private — 
used by
+     * multi-topic ack routing only.
+     */
+    String parentTopic() {
+        return parentTopic;
+    }
+
+    /**
+     * Cross-topic position vector: parent topic name mapped to that topic's
+     * per-segment message ids. Only the five-argument constructor sets it; every
+     * other constructor passes {@code null}, so callers must handle {@code null}.
+     * The map is returned exactly as it was handed to the constructor — no
+     * defensive copy is made on the way in or out. Package-private.
+     */
+    Map<String, Map<Long, MessageId>> multiTopicVector() {
+        return multiTopicVector;
+    }
+
+    /**
+     * Wire format. Every variable-length section starts with a length or count
+     * prefix. Trailing sections may be absent (older serialised forms wrote only
+     * sections 1-3); the reader detects that by running out of bytes.
+     *
+     * <pre>
+     * 1. segmentId               : 8 bytes
+     * 2. v4MessageId             : 4-byte length + bytes
+     * 3. positionVector          : 4-byte count + repeating { 8-byte segId,
+     *                                                          4-byte len, 
idBytes }
+     * 4. parentTopic             : 4-byte length (-1 = null) + UTF-8 bytes
+     * 5. multiTopicVector        : 4-byte count (-1 = null) + repeating {
+     *                                4-byte topic name length, UTF-8 bytes,
+     *                                4-byte segCount, repeating { 8-byte 
segId,
+     *                                                              4-byte 
len, idBytes } }
+     * </pre>
+     *
+     * <p>Sections 4 and 5 are present in every new id; older serialisations 
from a
+     * pre-multi-topic build are still readable — the reader treats the missing
+     * sections as null.
+     */
     @Override
     public byte[] toByteArray() {
         byte[] v4Bytes = v4MessageId.toByteArray();
-        // Format: [8 bytes segmentId] [4 bytes v4Length] [v4Bytes]
-        //         [4 bytes numPositions] [for each: [8 bytes segId] [4 bytes 
idLen] [idBytes]]
-        int totalSize = 8 + 4 + v4Bytes.length + 4;
-        var serializedPositions = new java.util.HashMap<Long, byte[]>();
-        for (var entry : positionVector.entrySet()) {
-            byte[] idBytes = entry.getValue().toByteArray();
-            serializedPositions.put(entry.getKey(), idBytes);
-            totalSize += 8 + 4 + idBytes.length;
+
+        // Pre-serialise position-vector entries so we can size the buffer.
+        Map<Long, byte[]> serializedPositions = 
serializeSegmentVector(positionVector);
+        int positionBytes = 4; // count
+        for (var entry : serializedPositions.entrySet()) {
+            positionBytes += 8 + 4 + entry.getValue().length;
+        }
+
+        // Section 4: parent topic.
+        byte[] parentTopicBytes = parentTopic == null
+                ? null : parentTopic.getBytes(StandardCharsets.UTF_8);
+
+        // Section 5: pre-serialise the multi-topic vector tree.
+        Map<byte[], Map<Long, byte[]>> serializedMulti = null;
+        int multiBytes = 4; // count or -1
+        if (multiTopicVector != null) {
+            serializedMulti = new HashMap<>(multiTopicVector.size());
+            for (var entry : multiTopicVector.entrySet()) {
+                byte[] topicBytes = 
entry.getKey().getBytes(StandardCharsets.UTF_8);
+                Map<Long, byte[]> inner = 
serializeSegmentVector(entry.getValue());
+                serializedMulti.put(topicBytes, inner);
+                multiBytes += 4 + topicBytes.length + 4;
+                for (var seg : inner.entrySet()) {
+                    multiBytes += 8 + 4 + seg.getValue().length;
+                }
+            }
         }
 
+        int totalSize = 8 + 4 + v4Bytes.length + positionBytes
+                + 4 + (parentTopicBytes == null ? 0 : parentTopicBytes.length)
+                + multiBytes;
+
         ByteBuffer buf = ByteBuffer.allocate(totalSize);
         buf.putLong(segmentId);
         buf.putInt(v4Bytes.length);
         buf.put(v4Bytes);
-        buf.putInt(positionVector.size());
+
+        buf.putInt(serializedPositions.size());
         for (var entry : serializedPositions.entrySet()) {
             buf.putLong(entry.getKey());
             buf.putInt(entry.getValue().length);
             buf.put(entry.getValue());
         }
+
+        if (parentTopicBytes == null) {
+            buf.putInt(-1);
+        } else {
+            buf.putInt(parentTopicBytes.length);
+            buf.put(parentTopicBytes);
+        }
+
+        if (serializedMulti == null) {
+            buf.putInt(-1);
+        } else {
+            buf.putInt(serializedMulti.size());
+            for (var entry : serializedMulti.entrySet()) {
+                buf.putInt(entry.getKey().length);
+                buf.put(entry.getKey());
+                buf.putInt(entry.getValue().size());
+                for (var seg : entry.getValue().entrySet()) {
+                    buf.putLong(seg.getKey());
+                    buf.putInt(seg.getValue().length);
+                    buf.put(seg.getValue());
+                }
+            }
+        }
         return buf.array();
     }
 
+    /**
+     * Serialise every message id of a per-segment vector with {@code toByteArray()},
+     * keeping the segment id as the key.
+     */
+    private static Map<Long, byte[]> serializeSegmentVector(Map<Long, MessageId> vector) {
+        Map<Long, byte[]> serialized = new HashMap<>(vector.size());
+        vector.forEach((segId, id) -> serialized.put(segId, id.toByteArray()));
+        return serialized;
+    }
+
     static MessageIdV5 fromByteArray(byte[] data) throws IOException {
         if (data == null || data.length < 12) {
             throw new IOException("Invalid MessageIdV5 data: too short");
@@ -134,30 +274,66 @@ public final class MessageIdV5 implements MessageId {
         }
         byte[] v4Bytes = new byte[v4Length];
         buf.get(v4Bytes);
-        org.apache.pulsar.client.api.MessageId v4Id =
-                org.apache.pulsar.client.api.MessageId.fromByteArray(v4Bytes);
+        MessageId v4Id = MessageId.fromByteArray(v4Bytes);
+
+        // Section 3: position vector (single-topic / per-segment).
+        Map<Long, MessageId> positions = Map.of();
+        if (buf.hasRemaining()) {
+            positions = readSegmentVector(buf);
+        }
+
+        // Section 4: parent topic. Length -1 sentinel means "absent".
+        String parentTopic = null;
+        if (buf.hasRemaining()) {
+            int parentLen = buf.getInt();
+            if (parentLen >= 0) {
+                if (parentLen > buf.remaining()) {
+                    throw new IOException("Invalid MessageIdV5 data: bad 
parent-topic length");
+                }
+                byte[] parentBytes = new byte[parentLen];
+                buf.get(parentBytes);
+                parentTopic = new String(parentBytes, StandardCharsets.UTF_8);
+            }
+        }
 
-        // Read position vector if present
-        Map<Long, org.apache.pulsar.client.api.MessageId> positions = Map.of();
+        // Section 5: cross-topic vector. Count -1 sentinel means "absent".
+        Map<String, Map<Long, MessageId>> multiTopic = null;
         if (buf.hasRemaining()) {
-            int numPositions = buf.getInt();
-            var posMap = new java.util.HashMap<Long, 
org.apache.pulsar.client.api.MessageId>();
-            for (int i = 0; i < numPositions; i++) {
-                long posSegId = buf.getLong();
-                int idLen = buf.getInt();
-                byte[] idBytes = new byte[idLen];
-                buf.get(idBytes);
-                posMap.put(posSegId,
-                        
org.apache.pulsar.client.api.MessageId.fromByteArray(idBytes));
+            int topicCount = buf.getInt();
+            if (topicCount >= 0) {
+                multiTopic = new HashMap<>(topicCount);
+                for (int i = 0; i < topicCount; i++) {
+                    int topicLen = buf.getInt();
+                    byte[] topicBytes = new byte[topicLen];
+                    buf.get(topicBytes);
+                    String topic = new String(topicBytes, 
StandardCharsets.UTF_8);
+                    Map<Long, MessageId> inner = readSegmentVector(buf);
+                    multiTopic.put(topic, inner);
+                }
             }
-            positions = posMap;
         }
 
-        return new MessageIdV5(v4Id, segmentId, positions);
+        return new MessageIdV5(v4Id, segmentId, positions, parentTopic, 
multiTopic);
+    }
+
+    /**
+     * Decode one per-segment vector: a 4-byte entry count followed by that many
+     * {@code {8-byte segId, 4-byte len, idBytes}} records.
+     *
+     * @param buf buffer positioned at the 4-byte count; its position is advanced
+     *        past the vector on successful return
+     * @return a mutable map from segment id to the decoded message id
+     * @throws IOException if the buffer is truncated, if the count or an id length
+     *         is negative or larger than the bytes left, or if
+     *         {@code MessageId.fromByteArray} rejects an id
+     */
+    private static Map<Long, MessageId> readSegmentVector(ByteBuffer buf) throws IOException {
+        if (buf.remaining() < Integer.BYTES) {
+            throw new IOException("Invalid MessageIdV5 data: truncated segment vector");
+        }
+        int count = buf.getInt();
+        // Each entry occupies at least 12 bytes (segId + length prefix), so a count
+        // larger than remaining / 12 cannot be genuine. Validating before sizing the
+        // map keeps corrupt input from triggering IllegalArgumentException (negative
+        // capacity) or a huge allocation.
+        int minEntryBytes = Long.BYTES + Integer.BYTES;
+        if (count < 0 || count > buf.remaining() / minEntryBytes) {
+            throw new IOException("Invalid MessageIdV5 data: bad segment vector count");
+        }
+        Map<Long, MessageId> out = new HashMap<>(count);
+        for (int i = 0; i < count; i++) {
+            // Guard the fixed-size header so truncation surfaces as IOException
+            // instead of an unchecked BufferUnderflowException.
+            if (buf.remaining() < minEntryBytes) {
+                throw new IOException("Invalid MessageIdV5 data: truncated segment vector");
+            }
+            long segId = buf.getLong();
+            int idLen = buf.getInt();
+            if (idLen < 0 || idLen > buf.remaining()) {
+                throw new IOException("Invalid MessageIdV5 data: bad inner id length");
+            }
+            byte[] idBytes = new byte[idLen];
+            buf.get(idBytes);
+            out.put(segId, MessageId.fromByteArray(idBytes));
+        }
+        return out;
     }
 
     @Override
-    public int compareTo(MessageId other) {
+    public int compareTo(org.apache.pulsar.client.api.v5.MessageId other) {
         if (!(other instanceof MessageIdV5 o)) {
             throw new IllegalArgumentException("Cannot compare with " + 
other.getClass());
         }
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
index 910bb114e51..1a9444911f4 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
@@ -31,13 +31,19 @@ final class MessageV5<T> implements Message<T> {
 
     private final org.apache.pulsar.client.api.Message<T> v4Message;
     private final MessageIdV5 messageId;
+    /**
+     * Optional override for {@link #topic()}. Set by multi-topic consumer 
wrappers to
+     * the parent scalable topic name (the v4 message's own topic is the 
segment topic,
+     * which is internal). Null for single-topic consumers — {@code topic()} 
falls back
+     * to the v4 message topic in that case.
+     */
+    private final String topicOverride;
 
     /**
      * Create with a simple segment ID (for queue consumer, checkpoint 
consumer, producer).
      */
     MessageV5(org.apache.pulsar.client.api.Message<T> v4Message, long 
segmentId) {
-        this.v4Message = v4Message;
-        this.messageId = new MessageIdV5(v4Message.getMessageId(), segmentId);
+        this(v4Message, new MessageIdV5(v4Message.getMessageId(), segmentId), 
null);
     }
 
     /**
@@ -45,8 +51,36 @@ final class MessageV5<T> implements Message<T> {
      * (for stream consumer cumulative ack support).
      */
     MessageV5(org.apache.pulsar.client.api.Message<T> v4Message, MessageIdV5 
messageId) {
+        this(v4Message, messageId, null);
+    }
+
+    /**
+     * Create with an explicit topic override — used by multi-topic consumer 
wrappers
+     * to surface the parent scalable topic via {@link #topic()} instead of the
+     * underlying segment topic. {@code topicOverride} may be {@code null}.
+     */
+    MessageV5(org.apache.pulsar.client.api.Message<T> v4Message, MessageIdV5 
messageId,
+              String topicOverride) {
         this.v4Message = v4Message;
         this.messageId = messageId;
+        this.topicOverride = topicOverride;
+    }
+
+    /**
+     * Re-brand this message with a parent scalable topic. Used by multi-topic 
consumer
+     * wrappers when forwarding from a per-topic consumer's queue into the 
shared
+     * multiplexed queue: the message id picks up the parent for ack routing, 
and
+     * {@link #topic()} starts returning the parent.
+     */
+    MessageV5<T> withTopicOverride(String parentTopic) {
+        MessageIdV5 newId = new MessageIdV5(messageId.v4MessageId(), 
messageId.segmentId(),
+                messageId.positionVector(), parentTopic);
+        return new MessageV5<>(v4Message, newId, parentTopic);
+    }
+
+    /** Underlying v4 message — exposed to multi-topic wrappers that re-build 
with a new id. */
+    org.apache.pulsar.client.api.Message<T> v4Message() {
+        return v4Message;
     }
 
     @Override
@@ -98,7 +132,10 @@ final class MessageV5<T> implements Message<T> {
 
     @Override
     public String topic() {
-        return v4Message.getTopicName();
+        // Multi-topic consumer wrappers set topicOverride to the parent 
scalable topic
+        // so the user-visible topic() matches the topic they subscribed to 
(the
+        // v4 message carries the internal segment topic).
+        return topicOverride != null ? topicOverride : 
v4Message.getTopicName();
     }
 
     @Override
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
new file mode 100644
index 00000000000..102d6de6c52
--- /dev/null
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.MessageId;
+import org.apache.pulsar.client.api.v5.PulsarClientException;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.Transaction;
+import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Multi-topic {@link QueueConsumer} that subscribes to every scalable topic 
in a
+ * namespace matching a (possibly empty) set of property filters. The matching 
set
+ * follows live: when topics enter or leave the filter, the consumer attaches /
+ * detaches automatically via a long-lived {@link ScalableTopicsWatcher} 
session
+ * to the broker.
+ *
+ * <p>Internals:
+ * <ul>
+ *   <li>One {@link ScalableQueueConsumer} per matched topic.</li>
+ *   <li>Each per-topic consumer is created with a sink that forwards delivered
+ *       messages directly into the shared multiplexed queue, tagging each message
+ *       with the parent topic so the subsequent ack can be routed back. No pump
+ *       thread is involved.</li>
+ *   <li>The watcher's {@code Snapshot} replaces the active set; {@code Diff}
+ *       applies removals before additions to handle a rapid remove-then-add of
+ *       the same topic name. A removal closes the per-topic consumer without an
+ *       explicit ack flush.</li>
+ *   <li>Per-topic add failures retry forever with exponential backoff (100 ms
+ *       initial, 30 min cap).</li>
+ * </ul>
+ */
+final class MultiTopicQueueConsumer<T> implements QueueConsumerImpl<T> {
+
+    private static final Logger LOG = 
Logger.get(MultiTopicQueueConsumer.class);
+    /**
+     * Cap for per-topic subscribe retries. Matches v4 consumer reconnect 
semantics —
+     * the consumer never gives up; the user just sees no messages from the 
topic
+     * until the broker / topic recovers.
+     */
+    private static final Duration RETRY_MAX = Duration.ofMinutes(30);
+    private final Logger log;
+
+    private final PulsarClientV5 client;
+    private final Schema<T> v5Schema;
+    private final ConsumerConfigurationData<T> consumerConf;
+    private final NamespaceName namespace;
+    private final Map<String, String> propertyFilters;
+    private final String subscriptionName;
+
+    private final ScalableTopicsWatcher watcher;
+    private final ConcurrentHashMap<String, PerTopicState<T>> perTopic = new 
ConcurrentHashMap<>();
+    private final LinkedTransferQueue<MessageV5<T>> mux = new 
LinkedTransferQueue<>();
+
+    private volatile boolean closed = false;
+    private final AsyncQueueConsumerV5<T> asyncView;
+
+    private MultiTopicQueueConsumer(PulsarClientV5 client,
+                                    Schema<T> v5Schema,
+                                    ConsumerConfigurationData<T> consumerConf,
+                                    NamespaceName namespace,
+                                    Map<String, String> propertyFilters,
+                                    ScalableTopicsWatcher watcher) {
+        this.client = client;
+        this.v5Schema = v5Schema;
+        this.consumerConf = consumerConf;
+        this.namespace = namespace;
+        this.propertyFilters = propertyFilters;
+        this.subscriptionName = consumerConf.getSubscriptionName();
+        this.watcher = watcher;
+        this.log = LOG.with()
+                .attr("namespace", namespace)
+                .attr("subscription", subscriptionName)
+                .attr("filters", propertyFilters)
+                .build();
+        this.asyncView = new AsyncQueueConsumerV5<>(this);
+    }
+
+    static <T> CompletableFuture<QueueConsumer<T>> createAsync(PulsarClientV5 
client,
+                                                                Schema<T> 
v5Schema,
+                                                                
ConsumerConfigurationData<T> consumerConf,
+                                                                NamespaceName 
namespace,
+                                                                Map<String, 
String> propertyFilters) {
+        ScalableTopicsWatcher watcher = new ScalableTopicsWatcher(
+                client.v4Client(), namespace, propertyFilters);
+        MultiTopicQueueConsumer<T> consumer = new MultiTopicQueueConsumer<>(
+                client, v5Schema, consumerConf, namespace, propertyFilters, 
watcher);
+        return watcher.start()
+                .thenCompose(initial -> consumer.openInitial(initial))
+                .thenApply(__ -> {
+                    watcher.setListener(consumer.new WatcherListener());
+                    return (QueueConsumer<T>) consumer;
+                })
+                .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, 
___) -> {
+                    throw ex instanceof CompletionException ce ? ce : new 
CompletionException(ex);
+                }));
+    }
+
+    /**
+     * Start one {@code openTopic(topic, retry=false)} attempt per topic in the
+     * initial snapshot and combine them. Nothing blocks here: the returned future
+     * completes once every attempt has completed, and is already complete when the
+     * snapshot is empty.
+     */
+    private CompletableFuture<Void> openInitial(List<String> topics) {
+        CompletableFuture<?>[] pending = topics.stream()
+                .map(name -> openTopic(name, /* retry= */ false))
+                .toArray(CompletableFuture[]::new);
+        // allOf() over an empty array is already completed, which covers the
+        // empty-snapshot case without a separate branch.
+        return CompletableFuture.allOf(pending);
+    }
+
+    /**
+     * Subscribe to one topic and register the resulting consumer in {@code perTopic}.
+     *
+     * <p>Returns an already-completed future, doing nothing, when this consumer is
+     * closed or the topic is already attached.
+     *
+     * @param topicName parent scalable topic to attach
+     * @param retry when {@code true}, a failure is logged, a background retry with
+     *        exponential backoff is scheduled (unless closed) and the returned future
+     *        completes normally, so Snapshot / Diff processing is not held up; when
+     *        {@code false}, the failure is logged and propagated through the returned
+     *        future, so the initial subscribe fails instead of silently missing a topic
+     * @return future that completes when this attempt has finished
+     */
+    private CompletableFuture<Void> openTopic(String topicName, boolean retry) {
+        if (closed || perTopic.containsKey(topicName)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicName topic = V5Utils.asScalableTopicName(topicName);
+        DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
+        // Per-topic message sink: tag each delivered message with the parent scalable
+        // topic for ack routing + display, and forward into the shared mux. No pump
+        // thread; per-segment v4 receive loops fire this sink directly.
+        java.util.function.Consumer<MessageV5<T>> sink = msg -> {
+            if (!closed) {
+                mux.add(msg.withTopicOverride(topicName));
+            }
+        };
+        return dagWatch.start()
+                .thenCompose(layout -> ScalableQueueConsumer.createAsyncImpl(
+                        client, v5Schema, perTopicConf(topicName), dagWatch, layout, sink))
+                .thenAccept(qc -> {
+                    if (closed) {
+                        // Closed while the subscribe was in flight; do not register.
+                        qc.closeAsync();
+                        return;
+                    }
+                    PerTopicState<T> existing =
+                            perTopic.putIfAbsent(topicName, new PerTopicState<>(topicName, qc));
+                    if (existing != null) {
+                        // Concurrent open; drop the dup.
+                        qc.closeAsync();
+                        return;
+                    }
+                    // Attached: forget any backoff accumulated by earlier failed attempts.
+                    resetBackoff(topicName);
+                    log.info().attr("topic", topicName).log("Per-topic consumer attached");
+                })
+                .exceptionally(ex -> {
+                    Throwable cause = ex instanceof CompletionException ce && ce.getCause() != null
+                            ? ce.getCause() : ex;
+                    log.warn().attr("topic", topicName).exceptionMessage(cause)
+                            .log("Per-topic subscribe failed");
+                    // NOTE(review): dagWatch is not released on this path — confirm whether
+                    // DagWatchClient needs an explicit close when start() or
+                    // createAsyncImpl fails.
+                    if (!retry) {
+                        // Initial-snapshot open: surface the failure so the caller's
+                        // allOf() fails rather than returning a consumer that is
+                        // missing this topic with no retry scheduled.
+                        throw ex instanceof CompletionException ce ? ce : new CompletionException(ex);
+                    }
+                    if (!closed) {
+                        scheduleRetry(topicName);
+                    }
+                    return null;
+                });
+    }
+
+    private void scheduleRetry(String topicName) {
+        long delayMs = nextBackoff(topicName);
+        log.info().attr("topic", topicName).attr("delayMs", delayMs)
+                .log("Retrying per-topic subscribe after backoff");
+        client.v4Client().timer().newTimeout(timeout -> openTopic(topicName, 
/* retry= */ true),
+                delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    private final ConcurrentHashMap<String, AtomicLong> retryDelays = new 
ConcurrentHashMap<>();
+
+    /**
+     * Returns the delay (ms) to use for the next retry of {@code topicName} and
+     * doubles the stored delay, capped at {@link #RETRY_MAX}. The first call for a
+     * topic returns 100 ms. The read-and-double is a single atomic update, so
+     * concurrent callers for the same topic cannot lose a doubling.
+     */
+    private long nextBackoff(String topicName) {
+        return retryDelays.computeIfAbsent(topicName, t -> new AtomicLong(100))
+                .getAndUpdate(current -> Math.min(current * 2, RETRY_MAX.toMillis()));
+    }
+
+    private void resetBackoff(String topicName) {
+        retryDelays.remove(topicName);
+    }
+
+    /**
+     * Clone the user's consumer config for a per-topic consumer. Each 
per-topic
+     * consumer needs a unique {@code consumerName} so the broker can 
disambiguate
+     * them on the same subscription; we suffix with the topic name.
+     */
+    private ConsumerConfigurationData<T> perTopicConf(String topicName) {
+        var conf = consumerConf.clone();
+        if (consumerConf.getConsumerName() != null) {
+            // Disambiguate across topics — the broker side cares about 
uniqueness on
+            // the same Shared subscription per segment.
+            String localName = TopicName.get(topicName).getLocalName();
+            conf.setConsumerName(consumerConf.getConsumerName() + "-" + 
localName);
+        }
+        return conf;
+    }
+
+    /**
+     * Close per-topic consumer for a topic that has dropped out of the 
matching set.
+     * No explicit ack flush — Shared subscription acks are independent per 
message
+     * and the per-topic consumer's existing close already flushes pending 
acks via
+     * its v4 segment consumers.
+     */
+    private CompletableFuture<Void> closeTopic(String topicName) {
+        retryDelays.remove(topicName);
+        PerTopicState<T> state = perTopic.remove(topicName);
+        if (state == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return state.consumer.closeAsync()
+                .thenRun(() -> log.info().attr("topic", topicName)
+                        .log("Per-topic consumer detached"));
+    }
+
+    // --- QueueConsumer ---
+
+    @Override
+    public String topic() {
+        return "namespace://" + namespace;
+    }
+
+    @Override
+    public String subscription() {
+        return subscriptionName;
+    }
+
+    @Override
+    public String consumerName() {
+        return consumerConf.getConsumerName();
+    }
+
+    @Override
+    public Message<T> receive() throws PulsarClientException {
+        try {
+            return mux.take();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Receive interrupted", e);
+        }
+    }
+
+    @Override
+    public Message<T> receive(Duration timeout) throws PulsarClientException {
+        try {
+            return mux.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Receive interrupted", e);
+        }
+    }
+
+    @Override
+    public void acknowledge(MessageId messageId) {
+        routeAck(messageId, ptc -> ptc.acknowledge(messageId));
+    }
+
+    @Override
+    public void acknowledge(MessageId messageId, Transaction txn) {
+        routeAck(messageId, ptc -> ptc.acknowledge(messageId, txn));
+    }
+
+    @Override
+    public void negativeAcknowledge(MessageId messageId) {
+        routeAck(messageId, ptc -> ptc.negativeAcknowledge(messageId));
+    }
+
+    /** Look up the per-topic consumer via the parent topic tag and delegate. 
*/
+    private void routeAck(MessageId messageId, 
java.util.function.Consumer<QueueConsumer<T>> action) {
+        if (!(messageId instanceof MessageIdV5 id)) {
+            throw new IllegalArgumentException("Expected MessageIdV5, got: " + 
messageId.getClass());
+        }
+        String parent = id.parentTopic();
+        if (parent == null) {
+            throw new IllegalStateException("MessageIdV5 missing parent topic 
— was the message"
+                    + " delivered through a multi-topic consumer?");
+        }
+        PerTopicState<T> state = perTopic.get(parent);
+        if (state == null) {
+            // Topic was removed between deliver and ack. Fine — broker has 
dropped the
+            // session for that topic. Drop the ack silently.
+            log.debug().attr("topic", parent)
+                    .log("Ack for removed topic; dropping");
+            return;
+        }
+        action.accept(state.consumer);
+    }
+
+    @Override
+    public AsyncQueueConsumer<T> async() {
+        return asyncView;
+    }
+
+    @Override
+    public void close() throws PulsarClientException {
+        try {
+            closeAsync().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Close interrupted", e);
+        } catch (ExecutionException e) {
+            throw new PulsarClientException(e.getCause());
+        }
+    }
+
+    /**
+     * Asynchronous receive implemented on top of the blocking {@link #receive()}.
+     * The task is submitted through {@code CompletableFuture.supplyAsync} without
+     * an explicit executor, so each outstanding call occupies one thread of that
+     * default executor until a message arrives. A {@link PulsarClientException}
+     * from {@code receive()} completes the future exceptionally, wrapped in a
+     * {@link CompletionException}.
+     */
+    @Override
+    public CompletableFuture<Message<T>> receiveAsync() {
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                return receive();
+            } catch (PulsarClientException e) {
+                // Lambdas cannot throw checked exceptions; rewrap for the future.
+                throw new CompletionException(e);
+            }
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        if (closed) {
+            return CompletableFuture.completedFuture(null);
+        }
+        closed = true;
+        watcher.close();
+        List<CompletableFuture<Void>> closes = new ArrayList<>();
+        for (var topic : new HashSet<>(perTopic.keySet())) {
+            closes.add(closeTopic(topic));
+        }
+        return 
CompletableFuture.allOf(closes.toArray(CompletableFuture[]::new));
+    }
+
+    // --- Watcher listener ---
+
+    /**
+     * Applies watcher events to the set of attached per-topic consumers. Removals
+     * are always processed before additions so a rapid remove-then-add of the same
+     * name closes the old consumer and then reopens. Backoff state is cleared
+     * before each open is started, so a delay recorded by a fast-failing first
+     * attempt is not wiped afterwards.
+     */
+    private final class WatcherListener implements ScalableTopicsWatcher.Listener {
+        /**
+         * Reconcile against a full snapshot: close every attached topic missing
+         * from {@code topics}, then open every topic not yet attached.
+         */
+        @Override
+        public void onSnapshot(List<String> topics) {
+            Set<String> target = new HashSet<>(topics);
+            Set<String> current = new HashSet<>(perTopic.keySet());
+            // Remove first so a rapid remove-then-add of same name closes-then-reopens.
+            for (String t : current) {
+                if (!target.contains(t)) {
+                    closeTopic(t);
+                }
+            }
+            for (String t : target) {
+                if (!current.contains(t)) {
+                    openFresh(t);
+                }
+            }
+        }
+
+        /** Apply an incremental change: close {@code removed}, then open {@code added}. */
+        @Override
+        public void onDiff(List<String> added, List<String> removed) {
+            // Apply removed before added — covers rapid remove-then-add of same name.
+            for (String t : removed) {
+                closeTopic(t);
+            }
+            for (String t : added) {
+                openFresh(t);
+            }
+        }
+
+        /** Start a retrying open for a newly matched topic with a fresh backoff. */
+        private void openFresh(String topicName) {
+            // Reset BEFORE opening: the open may fail (and record its first backoff
+            // step) before this method returns.
+            resetBackoff(topicName);
+            openTopic(topicName, /* retry= */ true);
+        }
+    }
+
+    // --- Per-topic state ---
+
+    /**
+     * Per-topic bookkeeping. Messages flow directly into the shared mux via 
the
+     * sink the wrapper installed on the per-topic consumer at create-time, so
+     * there's no pump thread to start/stop here — just hold a reference to the
+     * underlying consumer for ack routing and clean shutdown.
+     */
+    private static final class PerTopicState<T> {
+        private final String parentTopic;
+        private final ScalableQueueConsumer<T> consumer;
+
+        PerTopicState(String parentTopic, ScalableQueueConsumer<T> consumer) {
+            this.parentTopic = parentTopic;
+            this.consumer = consumer;
+        }
+    }
+}
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
new file mode 100644
index 00000000000..e49b755cca4
--- /dev/null
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.MessageId;
+import org.apache.pulsar.client.api.v5.Messages;
+import org.apache.pulsar.client.api.v5.PulsarClientException;
+import org.apache.pulsar.client.api.v5.StreamConsumer;
+import org.apache.pulsar.client.api.v5.Transaction;
+import org.apache.pulsar.client.api.v5.async.AsyncStreamConsumer;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Multi-topic {@link StreamConsumer} over the union of scalable topics in a
+ * namespace matching a (possibly empty) set of property filters.
+ *
+ * <p>Cumulative ack across topics works via a per-message position-vector
+ * snapshot: each message that enters the multiplexed queue carries
+ * {@code Map<TopicName, Map<SegmentId, MessageId>>} captured at enqueue time.
+ * On {@code acknowledgeCumulative(msg)}, the wrapper fans out to every
+ * per-topic consumer with the right segment vector — same semantics as the
+ * single-topic case, just lifted one level.
+ *
+ * <p>For Removed-mid-stream topics we flush acks up to {@code latestDelivered}
+ * for that topic before closing the per-topic consumer, so the user's
+ * processing-acked invariant is preserved if the topic is later re-added.
+ */
+final class MultiTopicStreamConsumer<T> implements StreamConsumer<T> {
+
+    // Class-level logger; the per-instance logger below is derived from it.
+    private static final Logger LOG = 
Logger.get(MultiTopicStreamConsumer.class);
+    /**
+     * Cap for per-topic subscribe retries. Matches v4 consumer reconnect 
semantics.
+     */
+    private static final Duration RETRY_MAX = Duration.ofMinutes(30);
+
+    // Instance logger tagged with namespace, subscription and filters (built in the constructor).
+    private final Logger log;
+
+    // Collaborators and configuration handed to the constructor.
+    private final PulsarClientV5 client;
+    private final Schema<T> v5Schema;
+    private final ConsumerConfigurationData<T> consumerConf;
+    private final NamespaceName namespace;
+    private final Map<String, String> propertyFilters;
+    private final String subscriptionName;
+
+    // Source of the topic list; closed in closeAsync().
+    private final ScalableTopicsWatcher watcher;
+    // Currently attached per-topic consumers, keyed by topic name.
+    private final ConcurrentHashMap<String, PerTopic<T>> perTopic = new 
ConcurrentHashMap<>();
+    // Shared queue fed by onPerTopicMessage() and drained by the receive methods.
+    private final LinkedTransferQueue<MessageV5<T>> mux = new 
LinkedTransferQueue<>();
+
+    /**
+     * Tracks the latest delivered message id per (parent topic, segment id) 
across
+     * every per-topic consumer. Snapshotted at enqueue time for each delivered
+     * message so cumulative ack covers everything visible up to that message.
+     */
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Long, 
org.apache.pulsar.client.api.MessageId>>
+            latestDeliveredPerTopicSegment = new ConcurrentHashMap<>();
+
+    // Set to true by closeAsync(); checked before opening topics and before enqueueing messages.
+    private volatile boolean closed = false;
+    // Async facade returned by async().
+    private final AsyncStreamConsumerV5Multi asyncView;
+
+    private MultiTopicStreamConsumer(PulsarClientV5 client,
+                                     Schema<T> v5Schema,
+                                     ConsumerConfigurationData<T> consumerConf,
+                                     NamespaceName namespace,
+                                     Map<String, String> propertyFilters,
+                                     ScalableTopicsWatcher watcher) {
+        this.client = client;
+        this.v5Schema = v5Schema;
+        this.consumerConf = consumerConf;
+        this.namespace = namespace;
+        this.propertyFilters = propertyFilters;
+        this.subscriptionName = consumerConf.getSubscriptionName();
+        this.watcher = watcher;
+        this.log = LOG.with()
+                .attr("namespace", namespace)
+                .attr("subscription", subscriptionName)
+                .attr("filters", propertyFilters)
+                .build();
+        this.asyncView = new AsyncStreamConsumerV5Multi();
+    }
+
+    /**
+     * Creates a multi-topic stream consumer for {@code namespace}.
+     *
+     * <p>Starts the topics watcher, opens a per-topic consumer for every topic in the
+     * initial list, and only then installs the watcher listener. If any stage fails,
+     * the partially built consumer is closed and the returned future completes
+     * exceptionally with the original failure (wrapped in a
+     * {@link CompletionException} when it is not one already).
+     */
+    static <T> CompletableFuture<StreamConsumer<T>> createAsync(PulsarClientV5 client,
+                                                                Schema<T> v5Schema,
+                                                                ConsumerConfigurationData<T> consumerConf,
+                                                                NamespaceName namespace,
+                                                                Map<String, String> propertyFilters) {
+        ScalableTopicsWatcher topicsWatcher = new ScalableTopicsWatcher(
+                client.v4Client(), namespace, propertyFilters);
+        MultiTopicStreamConsumer<T> multi = new MultiTopicStreamConsumer<>(
+                client, v5Schema, consumerConf, namespace, propertyFilters, topicsWatcher);
+
+        CompletableFuture<StreamConsumer<T>> ready = topicsWatcher.start()
+                .thenCompose(initialTopics -> multi.openInitial(initialTopics))
+                .thenApply(ignored -> {
+                    topicsWatcher.setListener(multi.new WatcherListener());
+                    return (StreamConsumer<T>) multi;
+                });
+
+        // On failure: close what was built so far, then surface the original error
+        // regardless of how the close itself ended.
+        return ready.exceptionallyCompose(failure ->
+                multi.closeAsync().handle((ignoredResult, ignoredCloseError) -> {
+                    throw failure instanceof CompletionException ce ? ce : new CompletionException(failure);
+                }));
+    }
+
+    /**
+     * Opens a per-topic consumer for each name in {@code topics}, without retry, and
+     * returns a future that completes once all of those open attempts have completed.
+     * An empty list yields an already-completed future.
+     */
+    private CompletableFuture<Void> openInitial(List<String> topics) {
+        if (topics.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        CompletableFuture<?>[] pending = topics.stream()
+                .map(name -> openTopic(name, /* retry= */ false))
+                .toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(pending);
+    }
+
+    /**
+     * Opens the per-topic stream consumer for {@code topicName} and registers it in
+     * {@link #perTopic}.
+     *
+     * <p>Does nothing when this consumer is closed or the topic is already attached.
+     * If the consumer was closed, or another attempt registered the topic first, while
+     * the subscribe was in flight, the freshly created per-topic consumer is closed
+     * again instead of being registered.
+     *
+     * @param topicName name of the scalable topic to attach
+     * @param retry     when {@code true}, a failed subscribe schedules another attempt
+     *                  via {@link #scheduleRetry(String)} unless this consumer is closed
+     * @return a future that completes normally in every case: subscribe failures are
+     *         logged and swallowed by the trailing {@code exceptionally} stage
+     */
+    private CompletableFuture<Void> openTopic(String topicName, boolean retry) {
+        if (closed || perTopic.containsKey(topicName)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicName topic = V5Utils.asScalableTopicName(topicName);
+
+        // Build the per-topic configuration once and reuse its consumer name for the
+        // session. When no consumer name is configured, perTopicConsumerName() draws a
+        // fresh random id on every call, so computing the name separately for the
+        // session and for the configuration would leave them with different names.
+        ConsumerConfigurationData<T> topicConf = perTopicConf(topicName);
+
+        // One ScalableConsumerClient session per topic, same as the single-topic builder.
+        ScalableConsumerClient session = new ScalableConsumerClient(
+                client.v4Client(), topic,
+                consumerConf.getSubscriptionName(),
+                topicConf.getConsumerName(),
+                ScalableConsumerType.STREAM);
+
+        // Per-topic message sink: every message delivered by the per-topic consumer is
+        // handed to onPerTopicMessage(), which forwards it into the shared mux. No pump
+        // thread is involved.
+        java.util.function.Consumer<MessageV5<T>> sink = msg ->
+                onPerTopicMessage(topicName, msg);
+
+        return session.start()
+                .thenCompose(initialAssignment -> ScalableStreamConsumer.createAsyncImpl(
+                        client, v5Schema, topicConf, session,
+                        topicName, initialAssignment, sink))
+                .thenAccept(sc -> {
+                    if (closed) {
+                        // Closed while subscribing: do not register, release the consumer.
+                        sc.closeAsync();
+                        return;
+                    }
+                    PerTopic<T> state = new PerTopic<>(topicName, sc);
+                    PerTopic<T> existing = perTopic.putIfAbsent(topicName, state);
+                    if (existing != null) {
+                        // A concurrent attempt registered this topic first; keep that one.
+                        sc.closeAsync();
+                        return;
+                    }
+                    log.info().attr("topic", topicName).log("Per-topic stream consumer attached");
+                })
+                .exceptionally(ex -> {
+                    Throwable cause = ex instanceof CompletionException ce && ce.getCause() != null
+                            ? ce.getCause() : ex;
+                    if (retry && !closed) {
+                        scheduleRetry(topicName);
+                    }
+                    // NOTE(review): with retry == false the failure is only logged here, so
+                    // callers (e.g. openInitial) never observe it — confirm this is intended.
+                    log.warn().attr("topic", topicName).exceptionMessage(cause)
+                            .log("Per-topic stream subscribe failed");
+                    return null;
+                });
+    }
+
+    private void scheduleRetry(String topicName) {
+        long delayMs = nextBackoff(topicName);
+        log.info().attr("topic", topicName).attr("delayMs", delayMs)
+                .log("Retrying per-topic stream subscribe");
+        client.v4Client().timer().newTimeout(timeout -> openTopic(topicName, 
/* retry= */ true),
+                delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    /** Next retry delay in milliseconds per topic; an absent entry means "start at 100 ms". */
+    private final ConcurrentHashMap<String, AtomicLong> retryDelays = new ConcurrentHashMap<>();
+
+    /**
+     * Returns the delay to wait before the next subscribe retry for {@code topicName}
+     * and doubles the stored delay for the attempt after that, capped at
+     * {@link #RETRY_MAX}. The first call for a topic returns 100 ms.
+     *
+     * <p>The read-and-double step is a single atomic update, so concurrent callers for
+     * the same topic each observe a distinct step of the sequence instead of racing
+     * on a separate {@code get()} and {@code set()}.
+     *
+     * @param topicName topic whose backoff sequence is advanced
+     * @return the delay, in milliseconds, to use for the retry being scheduled now
+     */
+    private long nextBackoff(String topicName) {
+        final long capMillis = RETRY_MAX.toMillis();
+        return retryDelays
+                .computeIfAbsent(topicName, t -> new AtomicLong(100))
+                .getAndUpdate(delay -> Math.min(delay * 2, capMillis));
+    }
+
+    /**
+     * Forgets the stored retry delay for {@code topicName}, so the next call to
+     * {@code nextBackoff} for that topic starts again from the initial delay.
+     */
+    private void resetBackoff(String topicName) {
+        retryDelays.remove(topicName);
+    }
+
+    /**
+     * Per-topic consumer name. Each topic gets a distinct name so the 
broker's per-topic
+     * coordinator can register them as separate consumers (same 
Exclusive-per-segment
+     * semantics, no cross-topic identity coupling).
+     */
+    private String perTopicConsumerName(String topicName) {
+        String localName = TopicName.get(topicName).getLocalName();
+        if (consumerConf.getConsumerName() != null) {
+            return consumerConf.getConsumerName() + "-" + localName;
+        }
+        return "v5-stream-" + V5RandomIds.randomAlphanumeric(8) + "-" + 
localName;
+    }
+
+    private ConsumerConfigurationData<T> perTopicConf(String topicName) {
+        var conf = consumerConf.clone();
+        conf.setConsumerName(perTopicConsumerName(topicName));
+        return conf;
+    }
+
+    /**
+     * Close per-topic consumer, flushing pending cumulative acks up to 
whatever was
+     * last delivered for that topic. If the topic later re-appears 
(re-Added), a
+     * fresh consumer subscribes and resumes from the broker-side cursor — 
already
+     * advanced past the messages we've delivered to the user.
+     */
+    private CompletableFuture<Void> closeTopic(String topicName) {
+        retryDelays.remove(topicName);
+        PerTopic<T> state = perTopic.remove(topicName);
+        if (state == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // Flush: ack everything we delivered for this topic.
+        ConcurrentHashMap<Long, org.apache.pulsar.client.api.MessageId> latest 
=
+                latestDeliveredPerTopicSegment.remove(topicName);
+        if (latest != null && !latest.isEmpty()) {
+            state.consumer.ackUpToVector(new HashMap<>(latest));
+        }
+        return state.consumer.closeAsync()
+                .thenRun(() -> log.info().attr("topic", topicName)
+                        .log("Per-topic stream consumer detached"));
+    }
+
+    // --- StreamConsumer ---
+
+    /** Returns a synthetic identifier of the form {@code "namespace://" + namespace}. */
+    @Override
+    public String topic() {
+        return "namespace://" + namespace;
+    }
+
+    /** Returns the subscription name captured from the consumer configuration at construction. */
+    @Override
+    public String subscription() {
+        return subscriptionName;
+    }
+
+    /**
+     * Returns the consumer name from the shared configuration, not the per-topic
+     * names derived from it. Presumably {@code null} when no name was configured
+     * ({@code perTopicConsumerName} null-checks the same getter) — confirm.
+     */
+    @Override
+    public String consumerName() {
+        return consumerConf.getConsumerName();
+    }
+
+    /**
+     * Blocks until a message is available on the shared queue and returns it.
+     *
+     * @throws PulsarClientException if the calling thread is interrupted while waiting;
+     *         the interrupt flag is restored before throwing
+     */
+    @Override
+    public Message<T> receive() throws PulsarClientException {
+        final MessageV5<T> next;
+        try {
+            next = mux.take();
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Receive interrupted", interrupted);
+        }
+        return next;
+    }
+
+    /**
+     * Waits up to {@code timeout} (millisecond resolution) for a message on the shared
+     * queue.
+     *
+     * @return the next message, or {@code null} if none arrived within the timeout
+     * @throws PulsarClientException if the calling thread is interrupted while waiting;
+     *         the interrupt flag is restored before throwing
+     */
+    @Override
+    public Message<T> receive(Duration timeout) throws PulsarClientException {
+        final long waitMillis = timeout.toMillis();
+        final MessageV5<T> next;
+        try {
+            next = mux.poll(waitMillis, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Receive interrupted", interrupted);
+        }
+        return next;
+    }
+
+    /**
+     * Collects up to {@code maxNumMessages} messages from the shared queue.
+     *
+     * <p>Keeps polling until either the batch is full or the deadline derived from
+     * {@code timeout} has passed (a poll that times out ends the loop). Afterwards any
+     * messages that are already queued are drained without waiting, up to the
+     * remaining capacity of the batch.
+     *
+     * @throws PulsarClientException if the calling thread is interrupted while waiting;
+     *         the interrupt flag is restored before throwing
+     */
+    @Override
+    public Messages<T> receiveMulti(int maxNumMessages, Duration timeout) throws PulsarClientException {
+        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
+        final List<Message<T>> collected = new ArrayList<>();
+        try {
+            // Timed phase: wait for messages while there is both room and time left.
+            for (long left = deadlineNanos - System.nanoTime();
+                 collected.size() < maxNumMessages && left > 0;
+                 left = deadlineNanos - System.nanoTime()) {
+                MessageV5<T> next = mux.poll(left, TimeUnit.NANOSECONDS);
+                if (next == null) {
+                    break;
+                }
+                collected.add(next);
+            }
+            // Non-blocking phase: take whatever else is already sitting in the queue.
+            List<MessageV5<T>> leftovers = new ArrayList<>();
+            mux.drainTo(leftovers, maxNumMessages - collected.size());
+            collected.addAll(leftovers);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Receive interrupted", interrupted);
+        }
+        return new MessagesV5<>(collected);
+    }
+
+    /**
+     * Cumulatively acknowledges {@code messageId} by applying its per-topic position
+     * vectors to every per-topic consumer that is still attached.
+     */
+    @Override
+    public void acknowledgeCumulative(MessageId messageId) {
+        fanOutCumulativeAck(messageId, (perTopicConsumer, segmentPositions) ->
+                perTopicConsumer.ackUpToVector(segmentPositions));
+    }
+
+    /**
+     * Cumulative acknowledge variant that accepts a transaction.
+     *
+     * <p>NOTE(review): {@code txn} is not referenced anywhere in this method body; the
+     * fan-out below is the same call the non-transactional overload makes. Confirm
+     * whether the transaction should be forwarded to the per-topic consumers.
+     */
+    @Override
+    public void acknowledgeCumulative(MessageId messageId, Transaction txn) {
+        // Transactions on multi-topic are best-effort across per-topic 
consumers — each
+        // per-topic ack is independently transactional. See note in the 
design doc.
+        fanOutCumulativeAck(messageId, (sc, vector) -> 
sc.ackUpToVector(vector));
+    }
+
+    /**
+     * Applies {@code action} to each attached per-topic consumer named in the
+     * multi-topic vector carried by {@code messageId}, passing that topic's
+     * per-segment positions.
+     *
+     * <p>Topics in the vector with no attached consumer are skipped; the expectation
+     * is that {@code closeTopic} already flushed their acks when they were removed.
+     *
+     * @throws IllegalArgumentException if {@code messageId} is not a {@link MessageIdV5}
+     * @throws IllegalStateException    if the id carries no multi-topic vector
+     */
+    private void fanOutCumulativeAck(MessageId messageId,
+                                     java.util.function.BiConsumer<ScalableStreamConsumer<T>,
+                                             Map<Long, org.apache.pulsar.client.api.MessageId>> action) {
+        if (!(messageId instanceof MessageIdV5)) {
+            throw new IllegalArgumentException("Expected MessageIdV5, got: " + messageId.getClass());
+        }
+        Map<String, Map<Long, org.apache.pulsar.client.api.MessageId>> perTopicPositions =
+                ((MessageIdV5) messageId).multiTopicVector();
+        if (perTopicPositions == null) {
+            throw new IllegalStateException("MessageIdV5 missing multi-topic vector — was the"
+                    + " message delivered through a multi-topic stream consumer?");
+        }
+        perTopicPositions.forEach((topicName, segmentPositions) -> {
+            PerTopic<T> attached = perTopic.get(topicName);
+            if (attached != null) {
+                action.accept(attached.consumer, segmentPositions);
+            }
+        });
+    }
+
+    /** Returns the async facade created once in the constructor; every call returns the same instance. */
+    @Override
+    public AsyncStreamConsumer<T> async() {
+        return asyncView;
+    }
+
+    /**
+     * Closes this consumer and blocks until {@link #closeAsync()} has completed.
+     *
+     * @throws PulsarClientException wrapping the cause if the asynchronous close
+     *         failed, or if the calling thread is interrupted while waiting (the
+     *         interrupt flag is restored before throwing)
+     */
+    @Override
+    public void close() throws PulsarClientException {
+        final CompletableFuture<Void> done = closeAsync();
+        try {
+            done.get();
+        } catch (ExecutionException failed) {
+            throw new PulsarClientException(failed.getCause());
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Close interrupted", interrupted);
+        }
+    }
+
+    /**
+     * Marks this consumer closed, closes the topics watcher and closes every attached
+     * per-topic consumer.
+     *
+     * <p>If the consumer is already marked closed, an already-completed future is
+     * returned immediately.
+     * NOTE(review): the check and the assignment of {@code closed} are separate steps,
+     * so two concurrent callers may both run the shutdown sequence — confirm this is
+     * acceptable.
+     *
+     * @return a future that completes when all per-topic closes have completed
+     */
+    CompletableFuture<Void> closeAsync() {
+        if (closed) {
+            return CompletableFuture.completedFuture(null);
+        }
+        closed = true;
+        watcher.close();
+        // Iterate over a copy of the key set: closeTopic() removes entries from perTopic.
+        CompletableFuture<?>[] pending = new HashSet<>(perTopic.keySet()).stream()
+                .map(name -> closeTopic(name))
+                .toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(pending);
+    }
+
+    // --- Watcher listener ---
+
+    /**
+     * Reacts to topic-list updates from the watcher by closing per-topic consumers for
+     * topics that went away and opening (with retry) consumers for topics that
+     * appeared. In both callbacks removals are applied before additions.
+     */
+    private final class WatcherListener implements ScalableTopicsWatcher.Listener {
+        @Override
+        public void onSnapshot(List<String> topics) {
+            final Set<String> wanted = new HashSet<>(topics);
+            final Set<String> attached = new HashSet<>(perTopic.keySet());
+            // Attached but no longer listed: detach.
+            attached.stream()
+                    .filter(name -> !wanted.contains(name))
+                    .forEach(name -> closeTopic(name));
+            // Listed but not attached yet: attach.
+            wanted.stream()
+                    .filter(name -> !attached.contains(name))
+                    .forEach(name -> {
+                        openTopic(name, /* retry= */ true);
+                        resetBackoff(name);
+                    });
+        }
+
+        @Override
+        public void onDiff(List<String> added, List<String> removed) {
+            removed.forEach(gone -> closeTopic(gone));
+            added.forEach(fresh -> {
+                openTopic(fresh, /* retry= */ true);
+                resetBackoff(fresh);
+            });
+        }
+    }
+
+    /**
+     * Per-topic message handler installed as the sink on each per-topic
+     * {@link ScalableStreamConsumer}.
+     *
+     * <p>The inbound message id already carries the single-topic position vector. This
+     * method merges that vector into the latest-delivered entry for
+     * {@code parentTopic}, copies the whole cross-topic map into a snapshot, wraps the
+     * message with a new {@link MessageIdV5} carrying that snapshot, and enqueues it on
+     * the shared mux. Messages arriving after the consumer is closed are dropped.
+     *
+     * <p>The merge, the snapshot and the enqueue all happen under one lock. Were the
+     * merge or the enqueue outside it, a concurrent delivery for another topic could
+     * snapshot this message's position and be enqueued first; cumulatively acking that
+     * other message would then also acknowledge this one before the user had received
+     * it. Holding the lock across all three steps makes the order in which positions
+     * become visible match the order of the queue.
+     *
+     * <p>NOTE(review): {@code closeTopic} removes entries from the same map without
+     * taking this lock — confirm whether that path needs the same guard.
+     *
+     * @param parentTopic name of the topic whose per-topic consumer delivered the message
+     * @param msg         message as produced by the per-topic consumer
+     */
+    private void onPerTopicMessage(String parentTopic, MessageV5<T> msg) {
+        if (closed) {
+            return;
+        }
+        MessageIdV5 origId = (MessageIdV5) msg.id();
+
+        synchronized (latestDeliveredPerTopicSegment) {
+            // Adopt the message's own position vector as the latest-delivered slice
+            // for its parent topic.
+            latestDeliveredPerTopicSegment
+                    .computeIfAbsent(parentTopic, k -> new ConcurrentHashMap<>())
+                    .putAll(origId.positionVector());
+
+            // Deep-copy the cross-topic view so later deliveries cannot alter what this
+            // message's id reports.
+            Map<String, Map<Long, org.apache.pulsar.client.api.MessageId>> snapshot =
+                    new HashMap<>(latestDeliveredPerTopicSegment.size());
+            for (var e : latestDeliveredPerTopicSegment.entrySet()) {
+                snapshot.put(e.getKey(), new HashMap<>(e.getValue()));
+            }
+
+            MessageIdV5 newId = new MessageIdV5(
+                    origId.v4MessageId(), origId.segmentId(),
+                    origId.positionVector(), parentTopic, snapshot);
+            // LinkedTransferQueue.add never blocks, so enqueueing under the lock is cheap.
+            mux.add(new MessageV5<>(msg.v4Message(), newId, parentTopic));
+        }
+    }
+
+    // --- Per-topic state ---
+
+    /**
+     * Immutable holder pairing a parent topic name with its per-topic stream consumer.
+     * It manages no pump thread: messages reach the shared mux through the sink
+     * installed on the per-topic consumer when it was created, so this class only keeps
+     * the reference needed for ack routing and clean shutdown.
+     */
+    private static final class PerTopic<T> {
+        /** Name of the parent topic this entry belongs to. */
+        private final String parentTopic;
+        /** Per-topic stream consumer attached for that parent topic. */
+        private final ScalableStreamConsumer<T> consumer;
+
+        PerTopic(String topic, ScalableStreamConsumer<T> delegate) {
+            parentTopic = topic;
+            consumer = delegate;
+        }
+    }
+
+    // --- Async view ---
+
+    private final class AsyncStreamConsumerV5Multi implements 
AsyncStreamConsumer<T> {
+        @Override
+        public CompletableFuture<Message<T>> receive() {
+            return CompletableFuture.supplyAsync(() -> {
+                try {
+                    return MultiTopicStreamConsumer.this.receive();
+                } catch (PulsarClientException e) {
+                    throw new CompletionException(e);
+                }
+            });
+        }
+
+        @Override
+        public CompletableFuture<Message<T>> receive(Duration timeout) {
+            return CompletableFuture.supplyAsync(() -> {
+                try {
+                    return MultiTopicStreamConsumer.this.receive(timeout);
+                } catch (PulsarClientException e) {
+                    throw new CompletionException(e);
+                }
+            });
+        }
+
+        @Override
+        public CompletableFuture<List<Message<T>>> receiveMulti(int 
maxNumMessages, Duration timeout) {
+            return CompletableFuture.supplyAsync(() -> {
+                try {
+                    Messages<T> ms = 
MultiTopicStreamConsumer.this.receiveMulti(maxNumMessages, timeout);
+                    List<Message<T>> out = new ArrayList<>();
+                    for (Message<T> m : ms) {
+                        out.add(m);
+                    }
+                    return out;
+                } catch (PulsarClientException e) {
+                    throw new CompletionException(e);
+                }
+            });
+        }
+
+        @Override
+        public void acknowledgeCumulative(MessageId messageId) {
+            MultiTopicStreamConsumer.this.acknowledgeCumulative(messageId);
+        }
+
+        @Override
+        public void acknowledgeCumulative(MessageId messageId, Transaction 
txn) {
+            MultiTopicStreamConsumer.this.acknowledgeCumulative(messageId, 
txn);
+        }
+
+        @Override
+        public CompletableFuture<Void> close() {
+            return MultiTopicStreamConsumer.this.closeAsync();
+        }
+    }
+}
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
index a762964c7d7..a268e70219b 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
@@ -19,11 +19,9 @@
 package org.apache.pulsar.client.impl.v5;
 
 import java.time.Duration;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
 import org.apache.pulsar.client.api.v5.PulsarClientException;
 import org.apache.pulsar.client.api.v5.QueueConsumer;
 import org.apache.pulsar.client.api.v5.QueueConsumerBuilder;
@@ -34,6 +32,7 @@ import 
org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
 import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 
 /**
@@ -44,7 +43,11 @@ final class QueueConsumerBuilderV5<T> implements 
QueueConsumerBuilder<T> {
     private final PulsarClientV5 client;
     private final Schema<T> v5Schema;
     private final ConsumerConfigurationData<T> conf = new 
ConsumerConfigurationData<>();
+    // Exactly one of {topicName, namespaceName} must be set at subscribe() 
time —
+    // single-topic vs multi-topic mode.
     private String topicName;
+    private NamespaceName namespaceName;
+    private Map<String, String> propertyFilters;
 
     QueueConsumerBuilderV5(PulsarClientV5 client, Schema<T> v5Schema) {
         this.client = client;
@@ -65,48 +68,44 @@ final class QueueConsumerBuilderV5<T> implements 
QueueConsumerBuilder<T> {
 
     @Override
     public CompletableFuture<QueueConsumer<T>> subscribeAsync() {
-        if (topicName == null || topicName.isEmpty()) {
+        boolean topicSet = topicName != null && !topicName.isEmpty();
+        boolean namespaceSet = namespaceName != null;
+        if (topicSet == namespaceSet) {
             return CompletableFuture.failedFuture(
-                    new 
PulsarClientException.InvalidConfigurationException("Topic name is required"));
+                    new PulsarClientException.InvalidConfigurationException(
+                            "Exactly one of .topic(name) or .namespace(...) 
must be set"));
         }
         if (conf.getSubscriptionName() == null || 
conf.getSubscriptionName().isEmpty()) {
             return CompletableFuture.failedFuture(
                     new 
PulsarClientException.InvalidConfigurationException("Subscription name is 
required"));
         }
 
+        if (namespaceSet) {
+            return MultiTopicQueueConsumer.createAsync(
+                    client, v5Schema, conf, namespaceName, propertyFilters);
+        }
         TopicName topic = V5Utils.asScalableTopicName(topicName);
         DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
-
         return dagWatch.start()
                 .thenCompose(initialLayout -> 
ScalableQueueConsumer.createAsync(
                         client, v5Schema, conf, dagWatch, initialLayout));
     }
 
     @Override
-    public QueueConsumerBuilderV5<T> topic(String... topicNames) {
-        if (topicNames.length > 0) {
-            this.topicName = topicNames[0];
-        }
+    public QueueConsumerBuilderV5<T> topic(String topicName) {
+        this.topicName = topicName;
         return this;
     }
 
     @Override
-    public QueueConsumerBuilderV5<T> topics(List<String> topicNames) {
-        if (!topicNames.isEmpty()) {
-            this.topicName = topicNames.get(0);
-        }
-        return this;
+    public QueueConsumerBuilderV5<T> namespace(String namespace) {
+        return namespace(namespace, Map.of());
     }
 
     @Override
-    public QueueConsumerBuilderV5<T> topicsPattern(Pattern pattern) {
-        conf.setTopicsPattern(pattern);
-        return this;
-    }
-
-    @Override
-    public QueueConsumerBuilderV5<T> topicsPattern(String regex) {
-        conf.setTopicsPattern(Pattern.compile(regex));
+    public QueueConsumerBuilderV5<T> namespace(String namespace, Map<String, 
String> propertyFilters) {
+        this.namespaceName = NamespaceName.get(namespace);
+        this.propertyFilters = propertyFilters == null ? Map.of() : 
Map.copyOf(propertyFilters);
         return this;
     }
 
@@ -206,12 +205,6 @@ final class QueueConsumerBuilderV5<T> implements 
QueueConsumerBuilder<T> {
         return this;
     }
 
-    @Override
-    public QueueConsumerBuilderV5<T> patternAutoDiscoveryPeriod(Duration 
interval) {
-        conf.setPatternAutoDiscoveryPeriod((int) interval.getSeconds());
-        return this;
-    }
-
     @Override
     public QueueConsumerBuilderV5<T> encryptionPolicy(EncryptionPolicy policy) 
{
         
conf.setCryptoKeyReader(CryptoKeyReaderAdapter.wrap(policy.keyReader()));
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerImpl.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerImpl.java
new file mode 100644
index 00000000000..61536f5b442
--- /dev/null
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerImpl.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+
+/**
+ * Internal extension of {@link QueueConsumer} that exposes the async hooks
+ * needed by {@link AsyncQueueConsumerV5}. Implemented by both
+ * {@link ScalableQueueConsumer} (single-topic) and {@link 
MultiTopicQueueConsumer}
+ * so the async wrapper works against either without duplication.
+ */
+interface QueueConsumerImpl<T> extends QueueConsumer<T> {
+    /**
+     * Asynchronous receive hook.
+     *
+     * @return a future yielding a {@link Message}; completion and failure semantics
+     *         are defined by the implementing consumer
+     */
+    CompletableFuture<Message<T>> receiveAsync();
+
+    /**
+     * Asynchronous close hook.
+     *
+     * @return a future signalling the outcome of the close; exact semantics are
+     *         defined by the implementing consumer
+     */
+    CompletableFuture<Void> closeAsync();
+}
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 61f5e87644d..31cb556b70e 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -51,7 +51,7 @@ import org.apache.pulsar.common.util.Backoff;
  * Individual acknowledgments and negative acknowledgments are routed to
  * the correct segment consumer via the segment ID in {@link MessageIdV5}.
  */
-final class ScalableQueueConsumer<T> implements QueueConsumer<T>, 
DagWatchClient.LayoutChangeListener {
+final class ScalableQueueConsumer<T> implements QueueConsumerImpl<T>, 
DagWatchClient.LayoutChangeListener {
 
     private static final Logger LOG = Logger.get(ScalableQueueConsumer.class);
     private final Logger log;
@@ -72,6 +72,13 @@ final class ScalableQueueConsumer<T> implements 
QueueConsumer<T>, DagWatchClient
     private final ConcurrentHashMap<Long, 
CompletableFuture<org.apache.pulsar.client.api.Consumer<T>>>
             segmentConsumers = new ConcurrentHashMap<>();
     private final LinkedTransferQueue<MessageV5<T>> messageQueue = new 
LinkedTransferQueue<>();
+    /**
+     * Where each per-segment receive loop deposits a freshly-arrived message. Defaults
+     * to enqueueing on {@link #messageQueue} for the user's {@link #receive()} to pull;
+     * the multi-topic wrapper overrides this to forward directly into its shared
+     * multiplexed queue, so no per-topic pump thread is needed.
+     */
+    private final java.util.function.Consumer<MessageV5<T>> messageSink;
 
     private volatile boolean closed = false;
     private final AsyncQueueConsumerV5<T> asyncView;
@@ -88,7 +95,8 @@ final class ScalableQueueConsumer<T> implements 
QueueConsumer<T>, DagWatchClient
     private ScalableQueueConsumer(PulsarClientV5 client,
                                   Schema<T> v5Schema,
                                   ConsumerConfigurationData<T> consumerConf,
-                                  DagWatchClient dagWatch) {
+                                  DagWatchClient dagWatch,
+                                  java.util.function.Consumer<MessageV5<T>> 
messageSink) {
         this.client = client;
         this.v5Schema = v5Schema;
         this.v4Schema = SchemaAdapter.toV4(v5Schema);
@@ -96,6 +104,10 @@ final class ScalableQueueConsumer<T> implements 
QueueConsumer<T>, DagWatchClient
         this.dagWatch = dagWatch;
         this.topicName = dagWatch.topicName().toString();
         this.subscriptionName = consumerConf.getSubscriptionName();
+        // Default sink enqueues on the local messageQueue for receive()/receive(timeout).
+        // Multi-topic mode passes a sink that forwards into the shared mux instead — no
+        // per-topic pump thread needed.
+        this.messageSink = messageSink != null ? messageSink : 
messageQueue::add;
         this.log = LOG.with().attr("topic", topicName).attr("subscription", 
subscriptionName).build();
         this.asyncView = new AsyncQueueConsumerV5<>(this);
     }
@@ -111,11 +123,29 @@ final class ScalableQueueConsumer<T> implements 
QueueConsumer<T>, DagWatchClient
                                                                
ConsumerConfigurationData<T> consumerConf,
                                                                DagWatchClient 
dagWatch,
                                                                
ClientSegmentLayout initialLayout) {
-        ScalableQueueConsumer<T> consumer = new 
ScalableQueueConsumer<>(client, v5Schema, consumerConf, dagWatch);
+        return createAsyncImpl(client, v5Schema, consumerConf, dagWatch, 
initialLayout, null)
+                .thenApply(c -> c);
+    }
+
+    /**
+     * Like {@link #createAsync} but resolves to the concrete impl type and accepts an
+     * optional external message sink. Used by {@link MultiTopicQueueConsumer}: it
+     * passes a sink that forwards into the shared multiplexed queue, so per-segment
+     * v4 receive loops deliver messages to the wrapper without any pump thread.
+     */
+    static <T> CompletableFuture<ScalableQueueConsumer<T>> createAsyncImpl(
+            PulsarClientV5 client,
+            Schema<T> v5Schema,
+            ConsumerConfigurationData<T> consumerConf,
+            DagWatchClient dagWatch,
+            ClientSegmentLayout initialLayout,
+            java.util.function.Consumer<MessageV5<T>> messageSink) {
+        ScalableQueueConsumer<T> consumer = new ScalableQueueConsumer<>(
+                client, v5Schema, consumerConf, dagWatch, messageSink);
         return consumer.subscribeSegments(initialLayout)
                 .thenApply(__ -> {
                     dagWatch.setListener(consumer);
-                    return (QueueConsumer<T>) consumer;
+                    return consumer;
                 })
                 .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, 
___) -> {
                     throw ex instanceof CompletionException ce ? ce : new 
CompletionException(ex);
@@ -209,7 +239,8 @@ final class ScalableQueueConsumer<T> implements 
QueueConsumer<T>, DagWatchClient
 
     // --- Async internals ---
 
-    CompletableFuture<Message<T>> receiveAsync() {
+    @Override
+    public CompletableFuture<Message<T>> receiveAsync() {
         return CompletableFuture.supplyAsync(() -> {
             try {
                 return receive();
@@ -219,7 +250,8 @@ final class ScalableQueueConsumer<T> implements 
QueueConsumer<T>, DagWatchClient
         });
     }
 
-    CompletableFuture<Void> closeAsync() {
+    @Override
+    public CompletableFuture<Void> closeAsync() {
         closed = true;
         dagWatch.close();
 
@@ -358,7 +390,7 @@ final class ScalableQueueConsumer<T> implements 
QueueConsumer<T>, DagWatchClient
 
     private void startReceiveLoop(org.apache.pulsar.client.api.Consumer<T> 
v4Consumer, long segmentId) {
         v4Consumer.receiveAsync().thenAccept(v4Msg -> {
-            messageQueue.add(new MessageV5<>(v4Msg, segmentId));
+            messageSink.accept(new MessageV5<>(v4Msg, segmentId));
             if (!closed) {
                 startReceiveLoop(v4Consumer, segmentId);
             }
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index ddffea7d84d..afd703ae9a4 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -86,6 +86,13 @@ final class ScalableStreamConsumer<T>
             new ConcurrentHashMap<>();
 
     private final LinkedTransferQueue<MessageV5<T>> messageQueue = new 
LinkedTransferQueue<>();
+    /**
+     * Where each per-segment receive loop deposits a freshly-arrived message. Defaults
+     * to enqueueing on {@link #messageQueue} for the user's {@link #receive()} to pull;
+     * the multi-topic wrapper overrides this to forward into its shared multiplexed
+     * queue, applying its own multi-topic position-vector capture in the process.
+     */
+    private final java.util.function.Consumer<MessageV5<T>> messageSink;
 
     private volatile boolean closed = false;
     private final AsyncStreamConsumerV5<T> asyncView;
@@ -94,7 +101,8 @@ final class ScalableStreamConsumer<T>
                                    Schema<T> v5Schema,
                                    ConsumerConfigurationData<T> consumerConf,
                                    ScalableConsumerClient session,
-                                   String topicName) {
+                                   String topicName,
+                                   java.util.function.Consumer<MessageV5<T>> 
messageSink) {
         this.client = client;
         this.v5Schema = v5Schema;
         this.v4Schema = SchemaAdapter.toV4(v5Schema);
@@ -102,6 +110,7 @@ final class ScalableStreamConsumer<T>
         this.session = session;
         this.topicName = topicName;
         this.subscriptionName = consumerConf.getSubscriptionName();
+        this.messageSink = messageSink != null ? messageSink : 
messageQueue::add;
         this.log = LOG.with().attr("topic", topicName).attr("subscription", 
subscriptionName).build();
         this.asyncView = new AsyncStreamConsumerV5<>(this);
     }
@@ -118,18 +127,54 @@ final class ScalableStreamConsumer<T>
                                                                 
ScalableConsumerClient session,
                                                                 String 
topicName,
                                                                 
List<ActiveSegment> initialAssignment) {
+        return createAsyncImpl(client, v5Schema, consumerConf, session, 
topicName, initialAssignment, null)
+                .thenApply(c -> c);
+    }
+
+    /**
+     * Like {@link #createAsync} but resolves to the concrete impl type and accepts an
+     * optional external message sink. Used by {@link MultiTopicStreamConsumer}: it
+     * passes a sink that forwards into the shared multiplexed queue, replacing the
+     * per-topic pump thread with direct delivery.
+     */
+    static <T> CompletableFuture<ScalableStreamConsumer<T>> createAsyncImpl(
+            PulsarClientV5 client,
+            Schema<T> v5Schema,
+            ConsumerConfigurationData<T> consumerConf,
+            ScalableConsumerClient session,
+            String topicName,
+            List<ActiveSegment> initialAssignment,
+            java.util.function.Consumer<MessageV5<T>> messageSink) {
         ScalableStreamConsumer<T> consumer = new ScalableStreamConsumer<>(
-                client, v5Schema, consumerConf, session, topicName);
+                client, v5Schema, consumerConf, session, topicName, 
messageSink);
         return consumer.subscribeAssigned(initialAssignment)
                 .thenApply(__ -> {
                     session.setListener(consumer);
-                    return (StreamConsumer<T>) consumer;
+                    return consumer;
                 })
                 .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, 
___) -> {
                     throw ex instanceof CompletionException ce ? ce : new 
CompletionException(ex);
                 }));
     }
 
+    /**
+     * Multi-topic ack hook. Synthesises a {@link MessageIdV5} carrying the supplied
+     * vector and routes it through the regular cumulative-ack path so segments are
+     * acked up to the recorded positions. Used by {@link MultiTopicStreamConsumer}
+     * to fan out a cumulative ack across every per-topic consumer.
+     */
+    void ackUpToVector(java.util.Map<Long, 
org.apache.pulsar.client.api.MessageId> vector) {
+        if (vector == null || vector.isEmpty()) {
+            return;
+        }
+        // The constructed id only needs the positionVector; v4MessageId / segmentId are
+        // unused on the cumulative-ack path. Pick any value for the non-vector slots —
+        // earliest is convenient and won't accidentally satisfy a peer's check.
+        var synthetic = new 
MessageIdV5(org.apache.pulsar.client.api.MessageId.earliest,
+                MessageIdV5.NO_SEGMENT, vector);
+        acknowledgeCumulative(synthetic);
+    }
+
     @Override
     public String topic() {
         return topicName;
@@ -358,7 +403,7 @@ final class ScalableStreamConsumer<T>
 
             // Create the V5 message with the position vector embedded in the 
ID
             var msgId = new MessageIdV5(v4Msg.getMessageId(), segmentId, 
positionVector);
-            messageQueue.add(new MessageV5<>(v4Msg, msgId));
+            messageSink.accept(new MessageV5<>(v4Msg, msgId));
 
             if (!closed) {
                 startReceiveLoop(v4Consumer, segmentId);
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
index d3c356ecb55..6b9f6b67e07 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
@@ -42,7 +42,11 @@ final class StreamConsumerBuilderV5<T> implements 
StreamConsumerBuilder<T> {
     private final PulsarClientV5 client;
     private final Schema<T> v5Schema;
     private final ConsumerConfigurationData<T> conf = new 
ConsumerConfigurationData<>();
+    // Exactly one of {topicName, namespaceName} must be set at subscribe() time —
+    // single-topic vs multi-topic mode.
     private String topicName;
+    private org.apache.pulsar.common.naming.NamespaceName namespaceName;
+    private Map<String, String> propertyFilters;
 
     StreamConsumerBuilderV5(PulsarClientV5 client, Schema<T> v5Schema) {
         this.client = client;
@@ -63,21 +67,29 @@ final class StreamConsumerBuilderV5<T> implements 
StreamConsumerBuilder<T> {
 
     @Override
     public CompletableFuture<StreamConsumer<T>> subscribeAsync() {
-        if (topicName == null || topicName.isEmpty()) {
+        boolean topicSet = topicName != null && !topicName.isEmpty();
+        boolean namespaceSet = namespaceName != null;
+        if (topicSet == namespaceSet) {
             return CompletableFuture.failedFuture(
-                    new 
PulsarClientException.InvalidConfigurationException("Topic name is required"));
+                    new PulsarClientException.InvalidConfigurationException(
+                            "Exactly one of .topic(name) or .namespace(...) 
must be set"));
         }
         if (conf.getSubscriptionName() == null || 
conf.getSubscriptionName().isEmpty()) {
             return CompletableFuture.failedFuture(
                     new 
PulsarClientException.InvalidConfigurationException("Subscription name is 
required"));
         }
-
-        TopicName topic = V5Utils.asScalableTopicName(topicName);
         // Default the consumer name to a stable random when the user didn't set one —
         // ScalableConsumerClient uses it as the registration key with the controller.
         if (conf.getConsumerName() == null || 
conf.getConsumerName().isEmpty()) {
             conf.setConsumerName("v5-stream-" + 
V5RandomIds.randomAlphanumeric(8));
         }
+
+        if (namespaceSet) {
+            return MultiTopicStreamConsumer.createAsync(
+                    client, v5Schema, conf, namespaceName, propertyFilters);
+        }
+
+        TopicName topic = V5Utils.asScalableTopicName(topicName);
         ScalableConsumerClient session = new ScalableConsumerClient(
                 client.v4Client(),
                 topic,
@@ -91,10 +103,20 @@ final class StreamConsumerBuilderV5<T> implements 
StreamConsumerBuilder<T> {
     }
 
     @Override
-    public StreamConsumerBuilderV5<T> topic(String... topicNames) {
-        if (topicNames.length > 0) {
-            this.topicName = topicNames[0];
-        }
+    public StreamConsumerBuilderV5<T> topic(String topicName) {
+        this.topicName = topicName;
+        return this;
+    }
+
+    @Override
+    public StreamConsumerBuilderV5<T> namespace(String namespace) {
+        return namespace(namespace, Map.of());
+    }
+
+    @Override
+    public StreamConsumerBuilderV5<T> namespace(String namespace, Map<String, 
String> propertyFilters) {
+        this.namespaceName = 
org.apache.pulsar.common.naming.NamespaceName.get(namespace);
+        this.propertyFilters = propertyFilters == null ? Map.of() : 
Map.copyOf(propertyFilters);
         return this;
     }
 
diff --git 
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java
 
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java
index 577023070ad..ebdbf73c0b4 100644
--- 
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java
+++ 
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java
@@ -168,6 +168,64 @@ public class MessageIdV5Test {
         assertThrows(java.io.IOException.class, () -> 
MessageIdV5.fromByteArray(null));
     }
 
+    @Test
+    public void testRoundtripWithParentTopic() throws Exception {
+        // Multi-topic queue consumer ids carry a parent-topic tag for ack routing
+        // but no cross-topic vector. Both must survive serialisation.
+        MessageIdV5 original = new MessageIdV5(v4(11, 22, 0), 3L,
+                Map.of(0L, v4(1, 2, 0)),
+                "topic://tenant/ns/my-topic");
+
+        MessageIdV5 decoded = 
MessageIdV5.fromByteArray(original.toByteArray());
+
+        assertEquals(decoded.segmentId(), 3L);
+        assertEquals(decoded.parentTopic(), "topic://tenant/ns/my-topic");
+        assertEquals(decoded.positionVector().size(), 1);
+        // No multi-topic vector was set; round-trip must preserve null.
+        assertEquals(decoded.multiTopicVector(), null);
+    }
+
+    @Test
+    public void testRoundtripWithMultiTopicVector() throws Exception {
+        // Multi-topic stream consumer ids carry a cross-topic position vector and
+        // the parent topic. Whole tree must survive byte-level round trip so an
+        // application that serialises the id and sends it through can still call
+        // acknowledgeCumulative against the right per-topic / per-segment positions.
+        Map<String, Map<Long, MessageId>> multi = Map.of(
+                "topic://tenant/ns/a", Map.of(0L, v4(1, 1, 0), 1L, v4(2, 2, 
0)),
+                "topic://tenant/ns/b", Map.of(0L, v4(3, 3, 0)));
+        MessageIdV5 original = new MessageIdV5(v4(99, 100, 0), 5L,
+                Map.of(0L, v4(1, 1, 0), 1L, v4(2, 2, 0)),
+                "topic://tenant/ns/a",
+                multi);
+
+        MessageIdV5 decoded = 
MessageIdV5.fromByteArray(original.toByteArray());
+
+        assertEquals(decoded.segmentId(), 5L);
+        assertEquals(decoded.parentTopic(), "topic://tenant/ns/a");
+        assertEquals(decoded.positionVector().size(), 2);
+        assertNotNull(decoded.multiTopicVector());
+        assertEquals(decoded.multiTopicVector().keySet(),
+                java.util.Set.of("topic://tenant/ns/a", 
"topic://tenant/ns/b"));
+        assertEquals(decoded.multiTopicVector().get("topic://tenant/ns/a"),
+                Map.of(0L, v4(1, 1, 0), 1L, v4(2, 2, 0)));
+        assertEquals(decoded.multiTopicVector().get("topic://tenant/ns/b"),
+                Map.of(0L, v4(3, 3, 0)));
+    }
+
+    @Test
+    public void testRoundtripPreservesNullMultiTopicVector() throws Exception {
+        // Single-topic ids leave both new fields null; we must not accidentally
+        // hydrate them on decode.
+        MessageIdV5 original = new MessageIdV5(v4(7, 8, 0), 1L,
+                Map.of(0L, v4(1, 2, 0)));
+
+        MessageIdV5 decoded = 
MessageIdV5.fromByteArray(original.toByteArray());
+
+        assertEquals(decoded.parentTopic(), null);
+        assertEquals(decoded.multiTopicVector(), null);
+    }
+
     @Test
     public void testFromByteArrayRejectsTooShort() {
         assertThrows(java.io.IOException.class, () -> 
MessageIdV5.fromByteArray(new byte[3]));

Reply via email to