This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 17c58a53931 [improve][client] PIP-224 Part 1: Add TopicMessageId for seek and acknowledge (#19158)
17c58a53931 is described below

commit 17c58a539315cc4ea39655d4328c5caf55f87d3d
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Jan 31 22:06:16 2023 +0800

    [improve][client] PIP-224 Part 1: Add TopicMessageId for seek and acknowledge (#19158)
---
 .../broker/service/PersistentFailoverE2ETest.java  |   5 +-
 .../broker/service/SubscriptionSeekTest.java       |   4 +-
 .../pulsar/client/api/MultiTopicsConsumerTest.java | 134 +++++++++++++++++++--
 .../api/PartitionedProducerConsumerTest.java       |   3 +-
 .../apache/pulsar/client/impl/MessageIdTest.java   |   5 +-
 .../pulsar/client/impl/NegativeAcksTest.java       |   3 +-
 .../org/apache/pulsar/client/api/Consumer.java     |  18 +--
 .../pulsar/client/api/MessageAcknowledger.java     |   6 +
 .../apache/pulsar/client/api/TopicMessageId.java   |  91 ++++++++++++++
 .../pulsar/client/impl/BatchMessageIdImpl.java     |   5 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  43 +++----
 .../apache/pulsar/client/impl/MessageIdImpl.java   |  25 ++--
 .../client/impl/MultiTopicsConsumerImpl.java       | 115 +++++++++++-------
 .../pulsar/client/impl/NegativeAcksTracker.java    |   5 +-
 .../pulsar/client/impl/TopicMessageIdImpl.java     |  10 +-
 .../pulsar/client/impl/TopicMessageImpl.java       |   7 +-
 .../impl/UnAckedTopicMessageRedeliveryTracker.java |   9 +-
 .../client/impl/UnAckedTopicMessageTracker.java    |   5 +-
 .../org/apache/pulsar/client/impl/MessageTest.java |   4 +-
 .../pulsar/functions/utils/FunctionCommon.java     |   5 +-
 .../apache/pulsar/websocket/ConsumerHandler.java   |   5 +-
 .../tests/integration/semantics/SemanticsTest.java |   4 +-
 22 files changed, 373 insertions(+), 138 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 7be0590fe53..ffc1444676b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -45,7 +45,6 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -337,7 +336,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
             }
             totalMessages++;
             consumer1.acknowledge(msg);
-            MessageIdImpl msgId = (MessageIdImpl) 
(((TopicMessageImpl)msg).getInnerMessageId());
+            MessageIdImpl msgId = 
MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
             receivedPtns.add(msgId.getPartitionIndex());
         }
 
@@ -354,7 +353,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
             }
             totalMessages++;
             consumer2.acknowledge(msg);
-            MessageIdImpl msgId = (MessageIdImpl) 
(((TopicMessageImpl)msg).getInnerMessageId());
+            MessageIdImpl msgId = 
MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
             receivedPtns.add(msgId.getPartitionIndex());
         }
         assertTrue(Sets.difference(listener1.inactivePtns, 
receivedPtns).isEmpty());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index b6f1771c088..2c2f62529d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -50,7 +50,6 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 import org.awaitility.Awaitility;
@@ -679,8 +678,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
             if (message == null) {
                 break;
             }
-            TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) 
message.getMessageId();
-            received.add(topicMessageId.getInnerMessageId());
+            
received.add(MessageIdImpl.convertToMessageIdImpl(message.getMessageId()));
         }
         int msgNumFromPartition1 = list.size() / 2;
         int msgNumFromPartition2 = 1;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index 6bd11de5a2f..b8ea87ab401 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -23,8 +23,15 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -32,34 +39,38 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import lombok.Cleanup;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
 import org.awaitility.Awaitility;
 import org.mockito.AdditionalAnswers;
 import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class MultiTopicsConsumerTest extends ProducerConsumerBase {
-    private static final Logger log = 
LoggerFactory.getLogger(MultiTopicsConsumerTest.class);
     private ScheduledExecutorService internalExecutorServiceDelegate;
 
-    @BeforeMethod(alwaysRun = true)
+    @BeforeClass(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
         super.internalSetup();
         super.producerBaseSetup();
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
@@ -231,4 +242,113 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase {
             Assert.assertEquals(consumer.batchReceive().size(), 1);
         });
     }
+
+    @Test(timeOut = 30000)
+    public void testAcknowledgeWrongMessageId() throws Exception {
+        final var topic1 = newTopicName();
+        final var topic2 = newTopicName();
+
+        @Cleanup final var singleTopicConsumer = pulsarClient.newConsumer()
+                .topic(topic1)
+                .subscriptionName("sub-1")
+                .isAckReceiptEnabled(true)
+                .subscribe();
+        assertTrue(singleTopicConsumer instanceof ConsumerImpl);
+
+        @Cleanup final var multiTopicsConsumer = pulsarClient.newConsumer()
+                .topics(List.of(topic1, topic2))
+                .subscriptionName("sub-2")
+                .isAckReceiptEnabled(true)
+                .subscribe();
+        assertTrue(multiTopicsConsumer instanceof MultiTopicsConsumerImpl);
+
+        @Cleanup final var producer = 
pulsarClient.newProducer().topic(topic1).create();
+        final var nonTopicMessageIds = new ArrayList<MessageId>();
+        nonTopicMessageIds.add(producer.send(new byte[]{ 0x00 }));
+        nonTopicMessageIds.add(singleTopicConsumer.receive().getMessageId());
+
+        // Multi-topics consumers can only acknowledge TopicMessageId, 
otherwise NotAllowedException will be thrown
+        for (var msgId : nonTopicMessageIds) {
+            assertFalse(msgId instanceof TopicMessageId);
+            
Assert.assertThrows(PulsarClientException.NotAllowedException.class,
+                    () -> multiTopicsConsumer.acknowledge(msgId));
+            
Assert.assertThrows(PulsarClientException.NotAllowedException.class,
+                    () -> 
multiTopicsConsumer.acknowledge(Collections.singletonList(msgId)));
+            
Assert.assertThrows(PulsarClientException.NotAllowedException.class,
+                    () -> multiTopicsConsumer.acknowledgeCumulative(msgId));
+        }
+
+        // Single-topic consumer can acknowledge TopicMessageId
+        final var topicMessageId = 
multiTopicsConsumer.receive().getMessageId();
+        assertTrue(topicMessageId instanceof TopicMessageId);
+        assertFalse(topicMessageId instanceof MessageIdImpl);
+        singleTopicConsumer.acknowledge(topicMessageId);
+    }
+
+    @DataProvider
+    public static Object[][] messageIdFromProducer() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    @Test(timeOut = 30000, dataProvider = "messageIdFromProducer")
+    public void testSeekCustomTopicMessageId(boolean messageIdFromProducer) 
throws Exception {
+        final var topic = TopicName.get(newTopicName()).toString();
+        final var numPartitions = 3;
+        admin.topics().createPartitionedTopic(topic, numPartitions);
+
+        @Cleanup final var producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .messageRouter(new MessageRouter() {
+                    int index = 0;
+                    @Override
+                    public int choosePartition(Message<?> msg, TopicMetadata 
metadata) {
+                        return index++ % metadata.numPartitions();
+                    }
+                })
+                .create();
+        @Cleanup final var consumer = 
pulsarClient.newConsumer(Schema.INT32).topic(topic)
+                .subscriptionName("sub").subscribe();
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
+
+        final var msgIds = new HashMap<String, List<MessageId>>();
+        final var numMessagesPerPartition = 10;
+        final var numMessages = numPartitions * numMessagesPerPartition;
+        for (int i = 0; i < numMessages; i++) {
+            var msgId = (MessageIdImpl) producer.send(i);
+            if (messageIdFromProducer) {
+                msgIds.computeIfAbsent(topic + 
TopicName.PARTITIONED_TOPIC_SUFFIX + msgId.getPartitionIndex(),
+                        __ -> new ArrayList<>()).add(msgId);
+            } else {
+                var topicMessageId = (TopicMessageId) 
consumer.receive().getMessageId();
+                msgIds.computeIfAbsent(topicMessageId.getOwnerTopic(), __ -> 
new ArrayList<>()).add(topicMessageId);
+            }
+        }
+
+        final var partitions = IntStream.range(0, numPartitions)
+                .mapToObj(i -> topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i)
+                .collect(Collectors.toSet());
+        assertEquals(msgIds.keySet(), partitions);
+
+        for (var partition : partitions) {
+            final var msgIdList = msgIds.get(partition);
+            assertEquals(msgIdList.size(), numMessagesPerPartition);
+            if (messageIdFromProducer) {
+                consumer.seek(TopicMessageId.create(partition, 
msgIdList.get(numMessagesPerPartition / 2)));
+            } else {
+                consumer.seek(msgIdList.get(numMessagesPerPartition / 2));
+            }
+        }
+
+        var topicMsgIds = new HashMap<String, List<TopicMessageId>>();
+        for (int i = 0; i < ((numMessagesPerPartition / 2 - 1) * 
numPartitions); i++) {
+            TopicMessageId topicMessageId = (TopicMessageId) 
consumer.receive().getMessageId();
+            topicMsgIds.computeIfAbsent(topicMessageId.getOwnerTopic(), __ -> 
new ArrayList<>()).add(topicMessageId);
+        }
+        assertEquals(topicMsgIds.keySet(), partitions);
+        for (var partition : partitions) {
+            assertEquals(topicMsgIds.get(partition),
+                    msgIds.get(partition).subList(numMessagesPerPartition / 2 
+ 1, numMessagesPerPartition));
+        }
+        consumer.close();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index e0e1bf20e7c..cd384e58789 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.awaitility.Awaitility;
@@ -768,7 +767,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
 
             for (int i = 0; i < totalMessages; i ++) {
                 msg = consumer1.receive(5, TimeUnit.SECONDS);
-                
Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(),
 2);
+                
Assert.assertEquals(MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()).getPartitionIndex(),
 2);
                 consumer1.acknowledge(msg);
             }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index c4cdcbd19d5..ceb5c51e6aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -119,7 +119,7 @@ public class MessageIdTest extends BrokerTestBase {
             assertEquals(new String(message.getData()), messagePrefix + i);
             MessageId messageId = message.getMessageId();
             if (topicType == TopicType.PARTITIONED) {
-                messageId = ((TopicMessageIdImpl) 
messageId).getInnerMessageId();
+                messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
             }
             assertTrue(messageIds.remove(messageId), "Failed to receive 
message");
         }
@@ -166,9 +166,6 @@ public class MessageIdTest extends BrokerTestBase {
 
         for (int i = 0; i < numberOfMessages; i++) {
             MessageId messageId = consumer.receive().getMessageId();
-            if (topicType == TopicType.PARTITIONED) {
-                messageId = ((TopicMessageIdImpl) 
messageId).getInnerMessageId();
-            }
             assertTrue(messageIds.remove(messageId), "Failed to receive 
Message");
         }
         log.info("Remaining message IDs = {}", messageIds);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 679d6c1a19e..b4d01e263bc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.awaitility.Awaitility;
 import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
 import org.testng.Assert;
@@ -292,7 +293,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
                 .subscribe();
 
         MessageId messageId = new MessageIdImpl(3, 1, 0);
-        TopicMessageIdImpl topicMessageId = new TopicMessageIdImpl("topic-1", 
"topic-1", messageId);
+        TopicMessageId topicMessageId = TopicMessageId.create("topic-1", 
messageId);
         BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(3, 1, 0, 0);
         BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(3, 1, 0, 
1);
         BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(3, 1, 0, 
2);
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 9a3ef7833df..69409900496 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -471,7 +471,9 @@ public interface Consumer<T> extends Closeable, MessageAcknowledger {
      * <li><code>MessageId.latest</code> : Reset the subscription on the 
latest message in the topic
      * </ul>
      *
-     * <p>Note: For multi-topics consumer, you can only seek to the earliest 
or latest message.
+     * <p>Note: For multi-topics consumer, if `messageId` is a {@link 
TopicMessageId}, the seek operation will happen
+     * on the owner topic of the message, which is returned by {@link 
TopicMessageId#getOwnerTopic()}. Otherwise, you
+     * can only seek to the earliest or latest message for all topics 
subscribed.
      *
      * @param messageId
      *            the message id where to reposition the subscription
@@ -519,19 +521,7 @@ public interface Consumer<T> extends Closeable, MessageAcknowledger {
     CompletableFuture<Void> seekAsync(Function<String, Object> function);
 
     /**
-     * Reset the subscription associated with this consumer to a specific 
message id.
-     *
-     * <p>The message id can either be a specific message or represent the 
first or last messages in the topic.
-     * <ul>
-     * <li><code>MessageId.earliest</code> : Reset the subscription on the 
earliest message available in the topic
-     * <li><code>MessageId.latest</code> : Reset the subscription on the 
latest message in the topic
-     * </ul>
-     *
-     * <p>Note: For multi-topics consumer, you can only seek to the earliest 
or latest message.
-     *
-     * @param messageId
-     *            the message id where to reposition the subscription
-     * @return a future to track the completion of the seek operation
+     * The asynchronous version of {@link Consumer#seek(MessageId)}.
      */
     CompletableFuture<Void> seekAsync(MessageId messageId);
 
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
index c0a53983c5a..d1bab3abb5b 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
@@ -49,6 +49,8 @@ public interface MessageAcknowledger {
      *
      * @throws PulsarClientException.AlreadyClosedException}
      *             if the consumer was already closed
+     * @throws PulsarClientException.NotAllowedException
+     *             if `messageId` is not a {@link TopicMessageId} when 
multiple topics are subscribed
      */
     void acknowledge(MessageId messageId) throws PulsarClientException;
 
@@ -59,6 +61,8 @@ public interface MessageAcknowledger {
     /**
      * Acknowledge the consumption of a list of message.
      * @param messageIdList the list of message IDs.
+     * @throws PulsarClientException.NotAllowedException
+     *     if any message id in the list is not a {@link TopicMessageId} when 
multiple topics are subscribed
      */
     void acknowledge(List<MessageId> messageIdList) throws 
PulsarClientException;
 
@@ -82,6 +86,8 @@ public interface MessageAcknowledger {
      *            The {@code MessageId} to be cumulatively acknowledged
      * @throws PulsarClientException.AlreadyClosedException
      *             if the consumer was already closed
+     * @throws PulsarClientException.NotAllowedException
+     *             if `messageId` is not a {@link TopicMessageId} when 
multiple topics are subscribed
      */
     void acknowledgeCumulative(MessageId messageId) throws 
PulsarClientException;
 
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
new file mode 100644
index 00000000000..f6109d5f8e8
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * The MessageId used for a consumer that subscribes multiple topics or 
partitioned topics.
+ *
+ * <p>
+ * It's guaranteed that {@link Message#getMessageId()} must return a 
TopicMessageId instance if the Message is received
+ * from a consumer that subscribes multiple topics or partitioned topics.
+ * The topic name used in APIs related to this class like `getOwnerTopic` and 
`create` must be the full topic name. For
+ * example, "my-topic" is invalid while "persistent://public/default/my-topic" 
is valid.
+ * If the topic is a partitioned topic, the topic name should be the name of 
the specific partition, e.g.
+ * "persistent://public/default/my-topic-partition-0".
+ * </p>
+ */
+public interface TopicMessageId extends MessageId {
+
+    /**
+     * Return the owner topic name of a message.
+     *
+     * @return the owner topic
+     */
+    String getOwnerTopic();
+
+    static TopicMessageId create(String topic, MessageId messageId) {
+        if (messageId instanceof TopicMessageId) {
+            return (TopicMessageId) messageId;
+        }
+        return new Impl(topic, messageId);
+    }
+
+    /**
+     * The simplest implementation of a TopicMessageId interface.
+     */
+    class Impl implements TopicMessageId {
+        private final String topic;
+        private final MessageId messageId;
+
+        public Impl(String topic, MessageId messageId) {
+            this.topic = topic;
+            this.messageId = messageId;
+        }
+
+        @Override
+        public byte[] toByteArray() {
+            return messageId.toByteArray();
+        }
+
+        @Override
+        public String getOwnerTopic() {
+            return topic;
+        }
+
+        @Override
+        public int compareTo(MessageId o) {
+            return messageId.compareTo(o);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return messageId.equals(obj);
+        }
+
+        @Override
+        public int hashCode() {
+            return messageId.hashCode();
+        }
+
+        @Override
+        public String toString() {
+            return messageId.toString();
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index 7e3a143dff8..ed28082ff6a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import javax.annotation.Nonnull;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.TopicMessageId;
 
 /**
  */
@@ -77,8 +78,8 @@ public class BatchMessageIdImpl extends MessageIdImpl {
                 this.ledgerId, this.entryId, this.partitionIndex, 
this.batchIndex,
                 other.ledgerId, other.entryId, other.partitionIndex, batchIndex
             );
-        } else if (o instanceof TopicMessageIdImpl) {
-            return compareTo(((TopicMessageIdImpl) o).getInnerMessageId());
+        } else if (o instanceof TopicMessageId) {
+            return compareTo(MessageIdImpl.convertToMessageIdImpl(o));
         } else {
             throw new UnsupportedOperationException("Unknown MessageId type: " 
+ o.getClass().getName());
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8fef7399836..a59a67adbaa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -79,6 +79,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -533,7 +534,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     protected CompletableFuture<Void> doAcknowledge(MessageId messageId, 
AckType ackType,
                                                     Map<String, Long> 
properties,
                                                     TransactionImpl txn) {
-        checkArgument(messageId instanceof MessageIdImpl);
+        messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
         if (getState() != State.Ready && getState() != State.Connecting) {
             stats.incrementNumAcksFailed();
             PulsarClientException exception = new 
PulsarClientException("Consumer not ready. State: " + getState());
@@ -590,10 +591,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     .InvalidMessageException("Cannot handle message with null 
messageId"));
         }
 
-        if (messageId instanceof TopicMessageIdImpl) {
-            messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
-        }
-        checkArgument(messageId instanceof MessageIdImpl);
+        messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
         if (getState() != State.Ready && getState() != State.Connecting) {
             stats.incrementNumAcksFailed();
             PulsarClientException exception = new 
PulsarClientException("Consumer not ready. State: " + getState());
@@ -629,7 +627,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         if (retryLetterProducer != null) {
             try {
                 MessageImpl<T> retryMessage = (MessageImpl<T>) 
getMessageImpl(message);
-                String originMessageIdStr = getOriginMessageIdStr(message);
+                String originMessageIdStr = message.getMessageId().toString();
                 String originTopicNameStr = getOriginTopicNameStr(message);
                 SortedMap<String, String> propertiesMap =
                         getPropertiesMap(message, originMessageIdStr, 
originTopicNameStr);
@@ -721,22 +719,19 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         return propertiesMap;
     }
 
-    private String getOriginMessageIdStr(Message<?> message) {
-        if (message instanceof TopicMessageImpl) {
-            return ((TopicMessageIdImpl) 
message.getMessageId()).getInnerMessageId().toString();
-        } else if (message instanceof MessageImpl) {
-            return message.getMessageId().toString();
-        }
-        return null;
-    }
-
     private String getOriginTopicNameStr(Message<?> message) {
-        if (message instanceof TopicMessageImpl) {
-            return ((TopicMessageIdImpl) 
message.getMessageId()).getTopicName();
-        } else if (message instanceof MessageImpl) {
+        MessageId messageId = message.getMessageId();
+        if (messageId instanceof TopicMessageId) {
+            String topic = ((TopicMessageId) messageId).getOwnerTopic();
+            int index = topic.lastIndexOf(TopicName.PARTITIONED_TOPIC_SUFFIX);
+            if (index < 0) {
+                return topic;
+            } else {
+                return topic.substring(0, index);
+            }
+        } else {
             return message.getTopicName();
         }
-        return null;
     }
 
     private MessageImpl<?> getMessageImpl(Message<?> message) {
@@ -2008,7 +2003,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             MessageIdImpl finalMessageId = messageId;
             deadLetterProducer.thenAcceptAsync(producerDLQ -> {
                 for (MessageImpl<T> message : finalDeadLetterMessages) {
-                    String originMessageIdStr = getOriginMessageIdStr(message);
+                    String originMessageIdStr = 
message.getMessageId().toString();
                     String originTopicNameStr = getOriginTopicNameStr(message);
                     TypedMessageBuilder<byte[]> typedMessageBuilderNew =
                             
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
@@ -2177,7 +2172,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     }
 
     @Override
-    public CompletableFuture<Void> seekAsync(MessageId messageId) {
+    public CompletableFuture<Void> seekAsync(MessageId originalMessageId) {
+        final MessageIdImpl messageId = 
MessageIdImpl.convertToMessageIdImpl(originalMessageId);
         String seekBy = String.format("the message %s", messageId.toString());
         return seekAsyncCheckState(seekBy).orElseGet(() -> {
             long requestId = client.newRequestId();
@@ -2197,8 +2193,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 seek = Commands.newSeek(consumerId, requestId, 
msgId.getFirstChunkMessageId().getLedgerId(),
                         msgId.getFirstChunkMessageId().getEntryId(), new 
long[0]);
             } else {
-                MessageIdImpl msgId = (MessageIdImpl) messageId;
-                seek = Commands.newSeek(consumerId, requestId, 
msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
+                seek = Commands.newSeek(
+                        consumerId, requestId, messageId.getLedgerId(), 
messageId.getEntryId(), new long[0]);
             }
             return seekAsyncInternal(requestId, seek, messageId, seekBy);
         });
@@ -2573,6 +2569,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         this.connectionHandler.grabCnx();
     }
 
+    @Deprecated
     public String getTopicNameWithoutPartition() {
         return topicNameWithoutPartition;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 02298e0f9d6..1a0f491a6a7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -27,7 +27,9 @@ import java.io.IOException;
 import java.util.Objects;
 import javax.annotation.Nonnull;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.common.api.proto.MessageIdData;
+import org.apache.pulsar.common.classification.InterfaceStability;
 import org.apache.pulsar.common.naming.TopicName;
 
 public class MessageIdImpl implements MessageId {
@@ -116,15 +118,20 @@ public class MessageIdImpl implements MessageId {
         return messageId;
     }
 
+    @InterfaceStability.Unstable
     public static MessageIdImpl convertToMessageIdImpl(MessageId messageId) {
-        if (messageId instanceof BatchMessageIdImpl) {
-            return (BatchMessageIdImpl) messageId;
-        } else if (messageId instanceof MessageIdImpl) {
-            return (MessageIdImpl) messageId;
-        } else if (messageId instanceof TopicMessageIdImpl) {
-            return convertToMessageIdImpl(((TopicMessageIdImpl) 
messageId).getInnerMessageId());
+        if (messageId instanceof TopicMessageId) {
+            if (messageId instanceof TopicMessageIdImpl) {
+                return (MessageIdImpl) ((TopicMessageIdImpl) 
messageId).getInnerMessageId();
+            } else {
+                try {
+                    return (MessageIdImpl) 
MessageId.fromByteArray(messageId.toByteArray());
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
-        return null;
+        return (MessageIdImpl) messageId;
     }
 
     public static MessageId fromByteArrayWithTopic(byte[] data, String 
topicName) throws IOException {
@@ -210,8 +217,8 @@ public class MessageIdImpl implements MessageId {
                 this.ledgerId, this.entryId, this.partitionIndex, NO_BATCH,
                 other.ledgerId, other.entryId, other.partitionIndex, batchIndex
             );
-        } else if (o instanceof TopicMessageIdImpl) {
-            return compareTo(((TopicMessageIdImpl) o).getInnerMessageId());
+        } else if (o instanceof TopicMessageId) {
+            return compareTo(convertToMessageIdImpl(o));
         } else {
             throw new UnsupportedOperationException("Unknown MessageId type: " 
+ o.getClass().getName());
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 224276ba5f0..341a91e9734 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -59,6 +59,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ConsumerName;
@@ -290,8 +291,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     // Must be called from the internalPinnedExecutor thread
     private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) 
{
         checkArgument(message instanceof MessageImpl);
-        TopicMessageImpl<T> topicMessage = new 
TopicMessageImpl<>(consumer.getTopic(),
-                consumer.getTopicNameWithoutPartition(), message, consumer);
+        TopicMessageImpl<T> topicMessage = new 
TopicMessageImpl<>(consumer.getTopic(), message, consumer);
 
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Received message from topics-consumer {}",
@@ -443,26 +443,26 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     protected CompletableFuture<Void> doAcknowledge(MessageId messageId, 
AckType ackType,
                                                     Map<String, Long> 
properties,
                                                     TransactionImpl txnImpl) {
-        checkArgument(messageId instanceof TopicMessageIdImpl);
-        TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
+        if (!(messageId instanceof TopicMessageId)) {
+            return FutureUtil.failedFuture(new 
PulsarClientException.NotAllowedException(
+                    "Only TopicMessageId is allowed to acknowledge for a 
multi-topics consumer, while messageId is "
+                            + messageId.getClass().getName()));
+        }
 
         if (getState() != State.Ready) {
             return FutureUtil.failedFuture(new PulsarClientException("Consumer 
already closed"));
         }
 
+        TopicMessageId topicMessageId = (TopicMessageId) messageId;
+        ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getOwnerTopic());
+        if (consumer == null) {
+            return FutureUtil.failedFuture(new 
PulsarClientException.NotConnectedException());
+        }
+        MessageId innerMessageId = 
MessageIdImpl.convertToMessageIdImpl(topicMessageId);
         if (ackType == AckType.Cumulative) {
-            Consumer individualConsumer = 
consumers.get(topicMessageId.getTopicPartitionName());
-            if (individualConsumer != null) {
-                MessageId innerId = topicMessageId.getInnerMessageId();
-                return individualConsumer.acknowledgeCumulativeAsync(innerId);
-            } else {
-                return FutureUtil.failedFuture(new 
PulsarClientException.NotConnectedException());
-            }
+            return consumer.acknowledgeCumulativeAsync(innerMessageId);
         } else {
-            ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getTopicPartitionName());
-
-            MessageId innerId = topicMessageId.getInnerMessageId();
-            return consumer.doAcknowledgeWithTxn(innerId, ackType, properties, 
txnImpl)
+            return consumer.doAcknowledgeWithTxn(innerMessageId, ackType, 
properties, txnImpl)
                 .thenRun(() ->
                     unAckedMessageTracker.remove(topicMessageId));
         }
@@ -473,31 +473,34 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                                                     AckType ackType,
                                                     Map<String, Long> 
properties,
                                                     TransactionImpl txn) {
+        for (MessageId messageId : messageIdList) {
+            if (!(messageId instanceof TopicMessageId)) {
+                return FutureUtil.failedFuture(new 
PulsarClientException.NotAllowedException(
+                        "Only TopicMessageId is allowed to acknowledge for a 
multi-topics consumer, while messageId is "
+                                + messageId.getClass().getName()));
+            }
+        }
         List<CompletableFuture<Void>> resultFutures = new ArrayList<>();
         if (ackType == AckType.Cumulative) {
             messageIdList.forEach(messageId -> 
resultFutures.add(doAcknowledge(messageId, ackType, properties, txn)));
-            return CompletableFuture.allOf(resultFutures.toArray(new 
CompletableFuture[0]));
         } else {
             if (getState() != State.Ready) {
                 return FutureUtil.failedFuture(new 
PulsarClientException("Consumer already closed"));
             }
             Map<String, List<MessageId>> topicToMessageIdMap = new HashMap<>();
             for (MessageId messageId : messageIdList) {
-                if (!(messageId instanceof TopicMessageIdImpl)) {
-                    return FutureUtil.failedFuture(
-                            new IllegalArgumentException("messageId is not 
instance of TopicMessageIdImpl"));
-                }
-                TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) 
messageId;
-                
topicToMessageIdMap.putIfAbsent(topicMessageId.getTopicPartitionName(), new 
ArrayList<>());
-                
topicToMessageIdMap.get(topicMessageId.getTopicPartitionName()).add(topicMessageId.getInnerMessageId());
+                TopicMessageId topicMessageId = (TopicMessageId) messageId;
+                
topicToMessageIdMap.putIfAbsent(topicMessageId.getOwnerTopic(), new 
ArrayList<>());
+                topicToMessageIdMap.get(topicMessageId.getOwnerTopic())
+                        
.add(MessageIdImpl.convertToMessageIdImpl(topicMessageId));
             }
             topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> {
                 ConsumerImpl<T> consumer = consumers.get(topicPartitionName);
                 resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, 
ackType, properties, txn)
                         .thenAccept((res) -> 
messageIdList.forEach(unAckedMessageTracker::remove)));
             });
-            return CompletableFuture.allOf(resultFutures.toArray(new 
CompletableFuture[0]));
         }
+        return CompletableFuture.allOf(resultFutures.toArray(new 
CompletableFuture[0]));
     }
 
     @Override
@@ -510,21 +513,25 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             return FutureUtil.failedFuture(new PulsarClientException
                     .InvalidMessageException("Cannot handle message with null 
messageId"));
         }
-        checkArgument(messageId instanceof TopicMessageIdImpl);
-        TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
+        if (!(messageId instanceof TopicMessageId)) {
+            return FutureUtil.failedFuture(new 
PulsarClientException.NotAllowedException(
+                    "Only TopicMessageId is allowed for reconsumeLater for a 
multi-topics consumer, while messageId is "
+                            + message.getClass().getName()));
+        }
+        TopicMessageId topicMessageId = (TopicMessageId) messageId;
         if (getState() != State.Ready) {
             return FutureUtil.failedFuture(new PulsarClientException("Consumer 
already closed"));
         }
 
         if (ackType == AckType.Cumulative) {
-            Consumer<?> individualConsumer = 
consumers.get(topicMessageId.getTopicPartitionName());
+            Consumer<T> individualConsumer = 
consumers.get(topicMessageId.getOwnerTopic());
             if (individualConsumer != null) {
                 return 
individualConsumer.reconsumeLaterCumulativeAsync(message, delayTime, unit);
             } else {
                 return FutureUtil.failedFuture(new 
PulsarClientException.NotConnectedException());
             }
         } else {
-            ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getTopicPartitionName());
+            ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getOwnerTopic());
             return consumer.doReconsumeLater(message, ackType, 
customProperties, delayTime, unit)
                      .thenRun(() 
->unAckedMessageTracker.remove(topicMessageId));
         }
@@ -532,20 +539,18 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
     @Override
     public void negativeAcknowledge(MessageId messageId) {
-        checkArgument(messageId instanceof TopicMessageIdImpl);
-        TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
+        checkArgument(messageId instanceof TopicMessageId);
+        TopicMessageId topicMessageId = (TopicMessageId) messageId;
 
-        ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getTopicPartitionName());
-        consumer.negativeAcknowledge(topicMessageId.getInnerMessageId());
+        ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getOwnerTopic());
+        
consumer.negativeAcknowledge(MessageIdImpl.convertToMessageIdImpl(topicMessageId));
     }
 
     @Override
     public void negativeAcknowledge(Message<?> message) {
         MessageId messageId = message.getMessageId();
-        checkArgument(messageId instanceof TopicMessageIdImpl);
-        TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
-
-        ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getTopicPartitionName());
+        checkArgument(messageId instanceof TopicMessageId);
+        ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) 
messageId).getOwnerTopic());
         consumer.negativeAcknowledge(message);
     }
 
@@ -680,7 +685,9 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             return;
         }
 
-        checkArgument(messageIds.stream().findFirst().get() instanceof 
TopicMessageIdImpl);
+        for (MessageId messageId : messageIds) {
+            checkArgument(messageId instanceof TopicMessageId);
+        }
 
         if (conf.getSubscriptionType() != SubscriptionType.Shared
                 && conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
@@ -689,12 +696,12 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             return;
         }
         removeExpiredMessagesFromQueue(messageIds);
-        messageIds.stream().map(messageId -> (TopicMessageIdImpl) messageId)
-            
.collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, 
Collectors.toSet()))
+        messageIds.stream().map(messageId -> (TopicMessageId) messageId)
+            .collect(Collectors.groupingBy(TopicMessageId::getOwnerTopic, 
Collectors.toSet()))
             .forEach((topicName, messageIds1) ->
                 consumers.get(topicName)
                     .redeliverUnacknowledgedMessages(messageIds1.stream()
-                        .map(mid -> 
mid.getInnerMessageId()).collect(Collectors.toSet())));
+                        
.map(MessageIdImpl::convertToMessageIdImpl).collect(Collectors.toSet())));
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 
@@ -748,19 +755,35 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
     @Override
     public CompletableFuture<Void> seekAsync(MessageId messageId) {
-        MessageIdImpl targetMessageId = 
MessageIdImpl.convertToMessageIdImpl(messageId);
-        if (targetMessageId == null || 
isIllegalMultiTopicsMessageId(messageId)) {
+        final Consumer<T> internalConsumer;
+        if (messageId instanceof TopicMessageId) {
+            TopicMessageId topicMessageId = (TopicMessageId) messageId;
+            internalConsumer = consumers.get(topicMessageId.getOwnerTopic());
+            if (internalConsumer == null) {
+                return FutureUtil.failedFuture(new 
PulsarClientException.NotAllowedException(
+                        "The owner topic " + topicMessageId.getOwnerTopic() + 
" is not subscribed"));
+            }
+        } else {
+            internalConsumer = null;
+        }
+        if (internalConsumer == null && 
isIllegalMultiTopicsMessageId(messageId)) {
             return FutureUtil.failedFuture(
                     new PulsarClientException("Illegal messageId, messageId 
can only be earliest/latest")
             );
         }
-        List<CompletableFuture<Void>> futures = new 
ArrayList<>(consumers.size());
-        consumers.values().forEach(consumerImpl -> 
futures.add(consumerImpl.seekAsync(targetMessageId)));
+
+        final CompletableFuture<Void> seekFuture;
+        if (internalConsumer == null) {
+            List<CompletableFuture<Void>> futures = new 
ArrayList<>(consumers.size());
+            consumers.values().forEach(consumerImpl -> 
futures.add(consumerImpl.seekAsync(messageId)));
+            seekFuture = FutureUtil.waitForAll(futures);
+        } else {
+            seekFuture = internalConsumer.seekAsync(messageId);
+        }
 
         unAckedMessageTracker.clear();
         clearIncomingMessages();
-
-        return FutureUtil.waitForAll(futures);
+        return seekFuture;
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 8680f0f0e6c..70d57db3bb6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -95,10 +95,7 @@ class NegativeAcksTracker implements Closeable {
     }
 
     private synchronized void add(MessageId messageId, int redeliveryCount) {
-        if (messageId instanceof TopicMessageIdImpl) {
-            TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
-            messageId = topicMessageId.getInnerMessageId();
-        }
+        messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
 
         if (messageId instanceof BatchMessageIdImpl) {
             BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index c20960950d5..941f18cf65a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -19,8 +19,9 @@
 package org.apache.pulsar.client.impl;
 
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.TopicMessageId;
 
-public class TopicMessageIdImpl implements MessageId {
+public class TopicMessageIdImpl implements TopicMessageId {
 
     /** This topicPartitionName is get from ConsumerImpl, it contains 
partition part. */
     private final String topicPartitionName;
@@ -37,6 +38,7 @@ public class TopicMessageIdImpl implements MessageId {
      * Get the topic name without partition part of this message.
      * @return the name of the topic on which this message was published
      */
+    @Deprecated
     public String getTopicName() {
         return this.topicName;
     }
@@ -45,6 +47,7 @@ public class TopicMessageIdImpl implements MessageId {
      * Get the topic name which contains partition part for this message.
      * @return the topic name which contains Partition part
      */
+    @Deprecated
     public String getTopicPartitionName() {
         return this.topicPartitionName;
     }
@@ -77,4 +80,9 @@ public class TopicMessageIdImpl implements MessageId {
     public int compareTo(MessageId o) {
         return messageId.compareTo(o);
     }
+
+    @Override
+    public String getOwnerTopic() {
+        return topicPartitionName;
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index f6c33cc930f..c3fcb0a16a3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -36,14 +36,13 @@ public class TopicMessageImpl<T> implements Message<T> {
     final ConsumerImpl receivedByconsumer;
 
     TopicMessageImpl(String topicPartitionName,
-                     String topicName,
                      Message<T> msg,
                      ConsumerImpl receivedByConsumer) {
         this.topicPartitionName = topicPartitionName;
         this.receivedByconsumer = receivedByConsumer;
 
         this.msg = msg;
-        this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName, 
msg.getMessageId());
+        this.messageId = new TopicMessageIdImpl(topicPartitionName, 
topicPartitionName, msg.getMessageId());
     }
 
     /**
@@ -59,6 +58,7 @@ public class TopicMessageImpl<T> implements Message<T> {
      * Get the topic name which contains partition part for this message.
      * @return the topic name which contains Partition part
      */
+    @Deprecated
     public String getTopicPartitionName() {
         return topicPartitionName;
     }
@@ -68,8 +68,9 @@ public class TopicMessageImpl<T> implements Message<T> {
         return messageId;
     }
 
+    @Deprecated
     public MessageId getInnerMessageId() {
-        return messageId.getInnerMessageId();
+        return MessageIdImpl.convertToMessageIdImpl(messageId);
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
index 907e6109e19..823dd4ad5f4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 
 public class UnAckedTopicMessageRedeliveryTracker extends 
UnAckedMessageRedeliveryTracker {
@@ -41,8 +42,8 @@ public class UnAckedTopicMessageRedeliveryTracker extends 
UnAckedMessageRedelive
                 Entry<UnackMessageIdWrapper, HashSet<UnackMessageIdWrapper>> 
entry = iterator.next();
                 UnackMessageIdWrapper messageIdWrapper = entry.getKey();
                 MessageId messageId = messageIdWrapper.getMessageId();
-                if (messageId instanceof TopicMessageIdImpl
-                        && ((TopicMessageIdImpl) 
messageId).getTopicPartitionName().contains(topicName)) {
+                if (messageId instanceof TopicMessageId
+                        && ((TopicMessageId) 
messageId).getOwnerTopic().contains(topicName)) {
                     HashSet<UnackMessageIdWrapper> exist = 
redeliveryMessageIdPartitionMap.get(messageIdWrapper);
                     entry.getValue().remove(messageIdWrapper);
                     iterator.remove();
@@ -54,8 +55,8 @@ public class UnAckedTopicMessageRedeliveryTracker extends 
UnAckedMessageRedelive
             Iterator<MessageId> iteratorAckTimeOut = 
ackTimeoutMessages.keySet().iterator();
             while (iterator.hasNext()) {
                 MessageId messageId = iteratorAckTimeOut.next();
-                if (messageId instanceof TopicMessageIdImpl
-                        && ((TopicMessageIdImpl) 
messageId).getTopicPartitionName().contains(topicName)) {
+                if (messageId instanceof TopicMessageId
+                        && ((TopicMessageId) 
messageId).getOwnerTopic().contains(topicName)) {
                     iterator.remove();
                     removed++;
                 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
index e8e80c5e690..1cbab584404 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 
 public class UnAckedTopicMessageTracker extends UnAckedMessageTracker {
@@ -39,8 +40,8 @@ public class UnAckedTopicMessageTracker extends 
UnAckedMessageTracker {
             while (iterator.hasNext()) {
                 Entry<MessageId, HashSet<MessageId>> entry = iterator.next();
                 MessageId messageId = entry.getKey();
-                if (messageId instanceof TopicMessageIdImpl
-                        && ((TopicMessageIdImpl) 
messageId).getTopicPartitionName().contains(topicName)) {
+                if (messageId instanceof TopicMessageId
+                        && ((TopicMessageId) 
messageId).getOwnerTopic().contains(topicName)) {
                     entry.getValue().remove(messageId);
                     iterator.remove();
                     removed++;
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
index ad56df918c0..feb4539971a 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
@@ -63,7 +63,7 @@ public class MessageTest {
         ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
         MessageImpl<byte[]> msg = MessageImpl.create(builder, payload, 
Schema.BYTES, null);
         msg.setMessageId(new MessageIdImpl(-1, -1, -1));
-        TopicMessageImpl<byte[]> topicMessage = new 
TopicMessageImpl<>(topicName, topicName, msg, null);
+        TopicMessageImpl<byte[]> topicMessage = new 
TopicMessageImpl<>(topicName, msg, null);
 
         assertTrue(topicMessage.isReplicated());
         assertEquals(msg.getReplicatedFrom(), from);
@@ -76,7 +76,7 @@ public class MessageTest {
         ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
         MessageImpl<byte[]> msg = MessageImpl.create(builder, payload, 
Schema.BYTES, null);
         msg.setMessageId(new MessageIdImpl(-1, -1, -1));
-        TopicMessageImpl<byte[]> topicMessage = new 
TopicMessageImpl<>(topicName, topicName, msg, null);
+        TopicMessageImpl<byte[]> topicMessage = new 
TopicMessageImpl<>(topicName, msg, null);
 
         assertFalse(topicMessage.isReplicated());
         assertNull(topicMessage.getReplicatedFrom());
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 96f6f4708a7..bda99a39478 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -47,7 +47,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.nar.NarClassLoader;
@@ -315,9 +314,7 @@ public class FunctionCommon {
     }
 
     public static final long getSequenceId(MessageId messageId) {
-        MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof 
TopicMessageIdImpl)
-                ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
-                : messageId);
+        MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(messageId);
         long ledgerId = msgId.getLedgerId();
         long entryId = msgId.getEntryId();
 
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 1aa99722512..579b4233399 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
@@ -292,8 +293,8 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
 
     private void handleAck(ConsumerCommand command) throws IOException {
         // We should have received an ack
-        MessageId msgId = 
MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
-                topic.toString());
+        TopicMessageId msgId = TopicMessageId.create(topic.toString(),
+                
MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
         if (log.isDebugEnabled()) {
             log.debug("[{}/{}] Received ack request of message {} from {} ", 
consumer.getTopic(),
                     subscription, msgId, 
getRemote().getInetSocketAddress().toString());
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
index 0d523ff56ea..76f09675245 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
@@ -40,8 +40,8 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
@@ -219,7 +219,7 @@ public class SemanticsTest extends PulsarTestSuite {
                     Message<String> m = consumer.receive();
                     int topicIdx;
                     if (numTopics > 1) {
-                        String topic = ((TopicMessageIdImpl) 
m.getMessageId()).getTopicPartitionName();
+                        String topic = ((TopicMessageId) 
m.getMessageId()).getOwnerTopic();
 
                         String[] topicParts = StringUtils.split(topic, '-');
                         topicIdx = 
Integer.parseInt(topicParts[topicParts.length - 1]);

Reply via email to