This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c50a6c2e91 [improve][client] PIP-229: Add a common interface to get 
fields of MessageIdData (#19414)
8c50a6c2e91 is described below

commit 8c50a6c2e91c81dbf187ce5e66cb39e2758a741e
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Apr 4 15:52:34 2023 +0800

    [improve][client] PIP-229: Add a common interface to get fields of 
MessageIdData (#19414)
---
 .../broker/service/PersistentFailoverE2ETest.java  |   8 +-
 .../broker/service/SubscriptionSeekTest.java       |   2 +-
 .../pulsar/client/api/CustomMessageIdTest.java     | 142 +++++++++++++
 .../api/PartitionedProducerConsumerTest.java       |   2 +-
 .../apache/pulsar/client/impl/ConsumerAckTest.java |   4 +-
 .../apache/pulsar/client/impl/MessageIdTest.java   |   3 -
 .../org/apache/pulsar/client/api/MessageIdAdv.java | 122 +++++++++++
 .../apache/pulsar/client/api/TopicMessageId.java   |  43 +++-
 .../impl/AcknowledgmentsGroupingTracker.java       |   2 +-
 .../pulsar/client/impl/BatchMessageAcker.java      |  95 ---------
 .../client/impl/BatchMessageAckerDisabled.java     |  50 -----
 .../pulsar/client/impl/BatchMessageIdImpl.java     |  82 +++-----
 .../pulsar/client/impl/ChunkMessageIdImpl.java     |   9 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |  15 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 222 ++++++++-------------
 .../pulsar/client/impl/MessageIdAdvUtils.java      |  74 +++++++
 .../apache/pulsar/client/impl/MessageIdImpl.java   |  71 +------
 .../org/apache/pulsar/client/impl/MessageImpl.java |   8 +-
 .../client/impl/MessagePayloadContextImpl.java     |   9 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |  44 ++--
 .../pulsar/client/impl/NegativeAcksTracker.java    |  10 +-
 ...NonPersistentAcknowledgmentGroupingTracker.java |   2 +-
 .../PersistentAcknowledgmentsGroupingTracker.java  | 158 +++++++--------
 .../apache/pulsar/client/impl/ResetCursorData.java |  20 +-
 .../pulsar/client/impl/TopicMessageIdImpl.java     |  39 +---
 .../pulsar/client/impl/TopicMessageImpl.java       |   2 +-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |   3 +-
 .../src/main/resources/findbugsExclude.xml         |   5 +
 .../impl/AcknowledgementsGroupingTrackerTest.java  |  13 +-
 .../client/impl/BatchMessageAckerDisabledTest.java |  47 -----
 .../pulsar/client/impl/BatchMessageAckerTest.java  |  83 --------
 .../pulsar/client/impl/BatchMessageIdImplTest.java |  33 +--
 .../client/impl/MessageIdSerializationTest.java    |   3 +-
 .../pulsar/functions/utils/FunctionCommon.java     |   3 +-
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |  36 ++--
 35 files changed, 686 insertions(+), 778 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index b263d4448d8..f5895ec3761 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -41,11 +41,11 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -370,8 +370,7 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
             }
             totalMessages++;
             consumer1.acknowledge(msg);
-            MessageIdImpl msgId = 
MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
-            receivedPtns.add(msgId.getPartitionIndex());
+            receivedPtns.add(((MessageIdAdv) 
msg.getMessageId()).getPartitionIndex());
         }
 
         assertTrue(Sets.difference(listener1.activePtns, 
receivedPtns).isEmpty());
@@ -387,8 +386,7 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
             }
             totalMessages++;
             consumer2.acknowledge(msg);
-            MessageIdImpl msgId = 
MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
-            receivedPtns.add(msgId.getPartitionIndex());
+            receivedPtns.add(((MessageIdAdv) 
msg.getMessageId()).getPartitionIndex());
         }
         assertTrue(Sets.difference(listener1.inactivePtns, 
receivedPtns).isEmpty());
         assertTrue(Sets.difference(listener2.activePtns, 
receivedPtns).isEmpty());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 93f2a42bcda..b11946069c9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -678,7 +678,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
             if (message == null) {
                 break;
             }
-            
received.add(MessageIdImpl.convertToMessageIdImpl(message.getMessageId()));
+            received.add(message.getMessageId());
         }
         int msgNumFromPartition1 = list.size() / 2;
         int msgNumFromPartition2 = 1;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
new file mode 100644
index 00000000000..52bfc9dda37
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class CustomMessageIdTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider
+    public static Object[][] enableBatching() {
+        return new Object[][]{
+                { true },
+                { false }
+        };
+    }
+
+    @Test
+    public void testSeek() throws Exception {
+        final var topic = "persistent://my-property/my-ns/test-seek-" + 
System.currentTimeMillis();
+        @Cleanup final var producer = 
pulsarClient.newProducer(Schema.INT32).topic(topic).create();
+        final var msgIds = new ArrayList<SimpleMessageIdImpl>();
+        for (int i = 0; i < 10; i++) {
+            msgIds.add(new SimpleMessageIdImpl((MessageIdAdv) 
producer.send(i)));
+        }
+        @Cleanup final var consumer = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic).subscriptionName("sub").subscribe();
+        consumer.seek(msgIds.get(6));
+        final var msg = consumer.receive(3, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        assertEquals(msg.getValue(), 7);
+    }
+
+    @Test(dataProvider = "enableBatching")
+    public void testAcknowledgment(boolean enableBatching) throws Exception {
+        final var topic = "persistent://my-property/my-ns/test-ack-"
+                + enableBatching + System.currentTimeMillis();
+        final var producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(enableBatching)
+                .batchingMaxMessages(10)
+                .batchingMaxPublishDelay(300, TimeUnit.MILLISECONDS)
+                .create();
+        final var consumer = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub")
+                .enableBatchIndexAcknowledgment(true)
+                .isAckReceiptEnabled(true)
+                .subscribe();
+        for (int i = 0; i < 10; i++) {
+            producer.sendAsync(i);
+        }
+        final var msgIds = new ArrayList<SimpleMessageIdImpl>();
+        for (int i = 0; i < 10; i++) {
+            final var msg = consumer.receive();
+            final var msgId = new SimpleMessageIdImpl((MessageIdAdv) 
msg.getMessageId());
+            msgIds.add(msgId);
+            if (enableBatching) {
+                assertTrue(msgId.getBatchIndex() >= 0 && msgId.getBatchSize() 
> 0);
+            } else {
+                assertFalse(msgId.getBatchIndex() >= 0 && msgId.getBatchSize() 
> 0);
+            }
+        }
+        consumer.acknowledgeCumulative(msgIds.get(8));
+        consumer.redeliverUnacknowledgedMessages();
+        final var msg = consumer.receive(3, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        assertEquals(msg.getValue(), 9);
+    }
+
+    private record SimpleMessageIdImpl(long ledgerId, long entryId, int 
batchIndex, int batchSize)
+            implements MessageIdAdv {
+
+        public SimpleMessageIdImpl(MessageIdAdv msgId) {
+            this(msgId.getLedgerId(), msgId.getEntryId(), 
msgId.getBatchIndex(), msgId.getBatchSize());
+        }
+
+        @Override
+        public byte[] toByteArray() {
+            return new byte[0]; // never used
+        }
+
+        @Override
+        public long getLedgerId() {
+            return ledgerId;
+        }
+
+        @Override
+        public long getEntryId() {
+            return entryId;
+        }
+
+        @Override
+        public int getBatchIndex() {
+            return batchIndex;
+        }
+
+        @Override
+        public int getBatchSize() {
+            return batchSize;
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index cd384e58789..13ae991a0e8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -767,7 +767,7 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
 
             for (int i = 0; i < totalMessages; i ++) {
                 msg = consumer1.receive(5, TimeUnit.SECONDS);
-                
Assert.assertEquals(MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()).getPartitionIndex(),
 2);
+                Assert.assertEquals(((MessageIdAdv) 
msg.getMessageId()).getPartitionIndex(), 2);
                 consumer1.acknowledge(msg);
             }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
index 42da6090648..a83283bc267 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
@@ -176,10 +176,10 @@ public class ConsumerAckTest extends ProducerConsumerBase 
{
             messageIds.add(message.getMessageId());
         }
         MessageId firstEntryMessageId = messageIds.get(0);
-        MessageId secondEntryMessageId = ((BatchMessageIdImpl) 
messageIds.get(1)).toMessageIdImpl();
+        MessageId secondEntryMessageId = 
MessageIdAdvUtils.discardBatch(messageIds.get(1));
         // Verify messages 2 to N must be in the same entry
         for (int i = 2; i < messageIds.size(); i++) {
-            assertEquals(((BatchMessageIdImpl) 
messageIds.get(i)).toMessageIdImpl(), secondEntryMessageId);
+            assertEquals(MessageIdAdvUtils.discardBatch(messageIds.get(i)), 
secondEntryMessageId);
         }
 
         assertTrue(interceptor.individualAckedMessageIdList.isEmpty());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index ceb5c51e6aa..375bbff8a4d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -118,9 +118,6 @@ public class MessageIdTest extends BrokerTestBase {
             Message<byte[]> message = consumer.receive();
             assertEquals(new String(message.getData()), messagePrefix + i);
             MessageId messageId = message.getMessageId();
-            if (topicType == TopicType.PARTITIONED) {
-                messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
-            }
             assertTrue(messageIds.remove(messageId), "Failed to receive 
message");
         }
         log.info("Remaining message IDs = {}", messageIds);
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
new file mode 100644
index 00000000000..73ecfed0ad0
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.BitSet;
+
+/**
+ * The {@link MessageId} interface provided for advanced users.
+ * <p>
+ * All built-in MessageId implementations should be able to be cast to 
MessageIdAdv.
+ * </p>
+ */
+public interface MessageIdAdv extends MessageId {
+
+    /**
+     * Get the ledger ID.
+     *
+     * @return the ledger ID
+     */
+    long getLedgerId();
+
+    /**
+     * Get the entry ID.
+     *
+     * @return the entry ID
+     */
+    long getEntryId();
+
+    /**
+     * Get the partition index.
+     *
+     * @return -1 if the message is from a non-partitioned topic, otherwise 
the non-negative partition index
+     */
+    default int getPartitionIndex() {
+        return -1;
+    }
+
+    /**
+     * Get the batch index.
+     *
+     * @return -1 if the message is not in a batch
+     */
+    default int getBatchIndex() {
+        return -1;
+    }
+
+    /**
+     * Get the batch size.
+     *
+     * @return 0 if the message is not in a batch
+     */
+    default int getBatchSize() {
+        return 0;
+    }
+
+    /**
+     * Get the BitSet that indicates which messages in the batch have not been acknowledged yet.
+     *
+     * @implNote The message IDs of a batch should share a BitSet. For 
example, given 3 messages in the same batch whose
+     * size is 3, all message IDs of them should return "111" (i.e. a BitSet 
whose size is 3 and all bits are 1). If the
+     * 1st message has been acknowledged, the returned BitSet should become 
"011" (i.e. the 1st bit becomes 0).
+     *
+     * @return null if the message is a non-batched message
+     */
+    default BitSet getAckSet() {
+        return null;
+    }
+
+    /**
+     * Get the message ID of the first chunk if the current message ID 
represents the position of a chunked message.
+     *
+     * @implNote A chunked message is distributed across different BookKeeper 
entries. The message ID of a chunked
+     * message is composed of two message IDs that represent positions of the 
first and the last chunk. The message ID
+     * itself represents the position of the last chunk.
+     *
+     * @return null if the message is not a chunked message
+     */
+    default MessageIdAdv getFirstChunkMessageId() {
+        return null;
+    }
+
+    /**
+     * The default implementation of {@link Comparable#compareTo(Object)}.
+     */
+    default int compareTo(MessageId o) {
+        if (!(o instanceof MessageIdAdv)) {
+            throw new UnsupportedOperationException("Unknown MessageId type: "
+                    + ((o != null) ? o.getClass().getName() : "null"));
+        }
+        final MessageIdAdv other = (MessageIdAdv) o;
+        int result = Long.compare(this.getLedgerId(), other.getLedgerId());
+        if (result != 0) {
+            return result;
+        }
+        result = Long.compare(this.getEntryId(), other.getEntryId());
+        if (result != 0) {
+            return result;
+        }
+        // TODO: Correct the following comparison logic, see 
https://github.com/apache/pulsar/pull/18981
+        result = Integer.compare(this.getPartitionIndex(), 
other.getPartitionIndex());
+        if (result != 0) {
+            return result;
+        }
+        return Integer.compare(this.getBatchIndex(), other.getBatchIndex());
+    }
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
index f6109d5f8e8..b70267bb0fb 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.api;
 
+import java.util.BitSet;
+
 /**
  * The MessageId used for a consumer that subscribes multiple topics or 
partitioned topics.
  *
@@ -49,13 +51,13 @@ public interface TopicMessageId extends MessageId {
     /**
      * The simplest implementation of a TopicMessageId interface.
      */
-    class Impl implements TopicMessageId {
+    class Impl implements MessageIdAdv, TopicMessageId {
         private final String topic;
-        private final MessageId messageId;
+        private final MessageIdAdv messageId;
 
         public Impl(String topic, MessageId messageId) {
             this.topic = topic;
-            this.messageId = messageId;
+            this.messageId = (MessageIdAdv) messageId;
         }
 
         @Override
@@ -68,6 +70,41 @@ public interface TopicMessageId extends MessageId {
             return topic;
         }
 
+        @Override
+        public long getLedgerId() {
+            return messageId.getLedgerId();
+        }
+
+        @Override
+        public long getEntryId() {
+            return messageId.getEntryId();
+        }
+
+        @Override
+        public int getPartitionIndex() {
+            return messageId.getPartitionIndex();
+        }
+
+        @Override
+        public int getBatchIndex() {
+            return messageId.getBatchIndex();
+        }
+
+        @Override
+        public int getBatchSize() {
+            return messageId.getBatchSize();
+        }
+
+        @Override
+        public BitSet getAckSet() {
+            return messageId.getAckSet();
+        }
+
+        @Override
+        public MessageIdAdv getFirstChunkMessageId() {
+            return messageId.getFirstChunkMessageId();
+        }
+
         @Override
         public int compareTo(MessageId o) {
             return messageId.compareTo(o);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
index d46af1a99e7..60d7135e5e4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
@@ -31,7 +31,7 @@ public interface AcknowledgmentsGroupingTracker extends 
AutoCloseable {
 
     boolean isDuplicate(MessageId messageId);
 
-    CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType 
ackType, Map<String, Long> properties);
+    CompletableFuture<Void> addAcknowledgment(MessageId msgId, AckType 
ackType, Map<String, Long> properties);
 
     CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds, 
AckType ackType,
                                                   Map<String, Long> 
properties);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
deleted file mode 100644
index 1c9b66fd2ba..00000000000
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import java.util.BitSet;
-
-public class BatchMessageAcker {
-
-    private BatchMessageAcker() {
-        this.bitSet = new BitSet();
-        this.batchSize = 0;
-    }
-
-    static BatchMessageAcker newAcker(int batchSize) {
-        BitSet bitSet = new BitSet(batchSize);
-        bitSet.set(0, batchSize);
-        return new BatchMessageAcker(bitSet, batchSize);
-    }
-
-    // Use the param bitSet as the BatchMessageAcker's bitSet, don't care 
about the batchSize.
-    static BatchMessageAcker newAcker(BitSet bitSet) {
-        return new BatchMessageAcker(bitSet, -1);
-    }
-
-    // bitset shared across messages in the same batch.
-    private final int batchSize;
-    private final BitSet bitSet;
-    private volatile boolean prevBatchCumulativelyAcked = false;
-
-    BatchMessageAcker(BitSet bitSet, int batchSize) {
-        this.bitSet = bitSet;
-        this.batchSize = batchSize;
-    }
-
-    BitSet getBitSet() {
-        return bitSet;
-    }
-
-    public synchronized int getBatchSize() {
-        return batchSize;
-    }
-
-    public synchronized boolean ackIndividual(int batchIndex) {
-        bitSet.clear(batchIndex);
-        return bitSet.isEmpty();
-    }
-
-    public synchronized int getBitSetSize() {
-        return bitSet.size();
-    }
-
-    public synchronized boolean ackCumulative(int batchIndex) {
-        // +1 since to argument is exclusive
-        bitSet.clear(0, batchIndex + 1);
-        return bitSet.isEmpty();
-    }
-
-    // debug purpose
-    public synchronized int getOutstandingAcks() {
-        return bitSet.cardinality();
-    }
-
-    public void setPrevBatchCumulativelyAcked(boolean acked) {
-        this.prevBatchCumulativelyAcked = acked;
-    }
-
-    public boolean isPrevBatchCumulativelyAcked() {
-        return prevBatchCumulativelyAcked;
-    }
-
-    @Override
-    public String toString() {
-        return "BatchMessageAcker{"
-                + "batchSize=" + batchSize
-                + ", bitSet=" + bitSet
-                + ", prevBatchCumulativelyAcked=" + prevBatchCumulativelyAcked
-                + '}';
-    }
-}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java
deleted file mode 100644
index b70c928b296..00000000000
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import java.util.BitSet;
-
-class BatchMessageAckerDisabled extends BatchMessageAcker {
-
-    static final BatchMessageAckerDisabled INSTANCE = new 
BatchMessageAckerDisabled();
-
-    private BatchMessageAckerDisabled() {
-        super(new BitSet(), 0);
-    }
-
-    @Override
-    public synchronized int getBatchSize() {
-        return 0;
-    }
-
-    @Override
-    public boolean ackIndividual(int batchIndex) {
-        return true;
-    }
-
-    @Override
-    public boolean ackCumulative(int batchIndex) {
-        return true;
-    }
-
-    @Override
-    public int getOutstandingAcks() {
-        return 0;
-    }
-}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index ed28082ff6a..e9cddeb65d7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -18,20 +18,16 @@
  */
 package org.apache.pulsar.client.impl;
 
-import javax.annotation.Nonnull;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.TopicMessageId;
+import java.util.BitSet;
+import org.apache.pulsar.client.api.MessageIdAdv;
 
-/**
- */
 public class BatchMessageIdImpl extends MessageIdImpl {
 
     private static final long serialVersionUID = 1L;
-    static final int NO_BATCH = -1;
     private final int batchIndex;
     private final int batchSize;
 
-    private final transient BatchMessageAcker acker;
+    private final BitSet ackSet;
 
     // Private constructor used only for json deserialization
     @SuppressWarnings("unused")
@@ -40,59 +36,35 @@ public class BatchMessageIdImpl extends MessageIdImpl {
     }
 
     public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, 
int batchIndex) {
-        this(ledgerId, entryId, partitionIndex, batchIndex, 0, 
BatchMessageAckerDisabled.INSTANCE);
+        this(ledgerId, entryId, partitionIndex, batchIndex, 0, null);
     }
 
     public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, 
int batchIndex, int batchSize,
-                              BatchMessageAcker acker) {
+                              BitSet ackSet) {
         super(ledgerId, entryId, partitionIndex);
         this.batchIndex = batchIndex;
         this.batchSize = batchSize;
-        this.acker = acker;
+        this.ackSet = ackSet;
     }
 
-    public BatchMessageIdImpl(MessageIdImpl other) {
-        super(other.ledgerId, other.entryId, other.partitionIndex);
-        if (other instanceof BatchMessageIdImpl) {
-            BatchMessageIdImpl otherId = (BatchMessageIdImpl) other;
-            this.batchIndex = otherId.batchIndex;
-            this.batchSize = otherId.batchSize;
-            this.acker = otherId.acker;
-        } else {
-            this.batchIndex = NO_BATCH;
-            this.batchSize = 0;
-            this.acker = BatchMessageAckerDisabled.INSTANCE;
-        }
+    public BatchMessageIdImpl(MessageIdAdv other) {
+        this(other.getLedgerId(), other.getEntryId(), 
other.getPartitionIndex(),
+                other.getBatchIndex(), other.getBatchSize(), 
other.getAckSet());
     }
 
+    @Override
     public int getBatchIndex() {
         return batchIndex;
     }
 
-    @Override
-    public int compareTo(@Nonnull MessageId o) {
-        if (o instanceof MessageIdImpl) {
-            MessageIdImpl other = (MessageIdImpl) o;
-            int batchIndex = (o instanceof BatchMessageIdImpl) ? 
((BatchMessageIdImpl) o).batchIndex : NO_BATCH;
-            return messageIdCompare(
-                this.ledgerId, this.entryId, this.partitionIndex, 
this.batchIndex,
-                other.ledgerId, other.entryId, other.partitionIndex, batchIndex
-            );
-        } else if (o instanceof TopicMessageId) {
-            return compareTo(MessageIdImpl.convertToMessageIdImpl(o));
-        } else {
-            throw new UnsupportedOperationException("Unknown MessageId type: " 
+ o.getClass().getName());
-        }
-    }
-
     @Override
     public int hashCode() {
-        return messageIdHashCode(ledgerId, entryId, partitionIndex, 
batchIndex);
+        return MessageIdAdvUtils.hashCode(this);
     }
 
     @Override
     public boolean equals(Object o) {
-        return super.equals(o);
+        return MessageIdAdvUtils.equals(this, o);
     }
 
     @Override
@@ -106,39 +78,51 @@ public class BatchMessageIdImpl extends MessageIdImpl {
         return toByteArray(batchIndex, batchSize);
     }
 
+    @Deprecated
     public boolean ackIndividual() {
-        return acker.ackIndividual(batchIndex);
+        return MessageIdAdvUtils.acknowledge(this, true);
     }
 
+    @Deprecated
     public boolean ackCumulative() {
-        return acker.ackCumulative(batchIndex);
+        return MessageIdAdvUtils.acknowledge(this, false);
     }
 
+    @Deprecated
     public int getOutstandingAcksInSameBatch() {
-        return acker.getOutstandingAcks();
+        return 0;
     }
 
+    @Override
     public int getBatchSize() {
-        return acker.getBatchSize();
+        return batchSize;
     }
 
+    @Deprecated
     public int getOriginalBatchSize() {
         return this.batchSize;
     }
 
+    @Deprecated
     public MessageIdImpl prevBatchMessageId() {
-        return new MessageIdImpl(
-            ledgerId, entryId - 1, partitionIndex);
+        return (MessageIdImpl) MessageIdAdvUtils.prevMessageId(this);
     }
 
     // MessageIdImpl is widely used as the key of a hash map, in this case, we should convert the batch message id to
     // have the correct hash code.
+    @Deprecated
     public MessageIdImpl toMessageIdImpl() {
-        return new MessageIdImpl(ledgerId, entryId, partitionIndex);
+        return (MessageIdImpl) MessageIdAdvUtils.discardBatch(this);
     }
 
-    public BatchMessageAcker getAcker() {
-        return acker;
+    @Override
+    public BitSet getAckSet() {
+        return ackSet;
     }
 
+    static BitSet newAckSet(int batchSize) {
+        final BitSet ackSet = new BitSet(batchSize);
+        ackSet.set(0, batchSize);
+        return ackSet;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java
index 28d5047c8ef..29ce160442a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java
@@ -21,10 +21,10 @@ package org.apache.pulsar.client.impl;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.util.Objects;
-import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 
-public class ChunkMessageIdImpl extends MessageIdImpl implements MessageId {
+public class ChunkMessageIdImpl extends MessageIdImpl {
     private final MessageIdImpl firstChunkMsgId;
 
     public ChunkMessageIdImpl(MessageIdImpl firstChunkMsgId, MessageIdImpl lastChunkMsgId) {
@@ -32,11 +32,12 @@ public class ChunkMessageIdImpl extends MessageIdImpl implements MessageId {
         this.firstChunkMsgId = firstChunkMsgId;
     }
 
-    public MessageIdImpl getFirstChunkMessageId() {
+    @Override
+    public MessageIdAdv getFirstChunkMessageId() {
         return firstChunkMsgId;
     }
 
-    public MessageIdImpl getLastChunkMessageId() {
+    public MessageIdAdv getLastChunkMessageId() {
         return this;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 75d3b2edf6e..973b3302f41 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.Messages;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -82,7 +83,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected final ExecutorService internalPinnedExecutor;
     protected UnAckedMessageTracker unAckedMessageTracker;
     final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
-    protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
+    protected ConcurrentOpenHashMap<MessageIdAdv, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> 
pendingReceives;
     protected final int maxReceiverQueueSize;
     private volatile int currentReceiverQueueSize;
@@ -128,7 +129,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         // Always use growable queue since items can exceed the advertised size
         this.incomingMessages = new GrowableArrayBlockingQueue<>();
         this.unAckedChunkedMessageIdSequenceMap =
-                ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build();
+                ConcurrentOpenHashMap.<MessageIdAdv, MessageIdImpl[]>newBuilder().build();
         this.executorProvider = executorProvider;
         this.externalPinnedExecutor = executorProvider.getExecutor();
         this.internalPinnedExecutor = client.getInternalExecutorService();
@@ -223,14 +224,6 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         }
     }
 
-    protected MessageId normalizeMessageId(MessageId messageId) {
-        if (messageId instanceof BatchMessageIdImpl) {
-            // do not add each item in batch message into tracker
-            return ((BatchMessageIdImpl) messageId).toMessageIdImpl();
-        }
-        return messageId;
-    }
-
     protected void reduceCurrentReceiverQueueSize() {
         if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
             return;
@@ -1131,7 +1124,7 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
                                 ? ((TopicMessageImpl<T>) msg).getMessage() : 
msg));
             MessageId id;
             if (this instanceof ConsumerImpl) {
-                id = normalizeMessageId(msg.getMessageId());
+                id = MessageIdAdvUtils.discardBatch(msg.getMessageId());
             } else {
                 id = msg.getMessageId();
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index beaa34bf205..1feef6ca0a6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -35,6 +35,7 @@ import io.netty.util.Timeout;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -71,6 +72,7 @@ import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.Messages;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -154,12 +156,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     @Getter(AccessLevel.PACKAGE)
     private final int priorityLevel;
     private final SubscriptionMode subscriptionMode;
-    private volatile BatchMessageIdImpl startMessageId;
+    private volatile MessageIdAdv startMessageId;
 
-    private volatile BatchMessageIdImpl seekMessageId;
+    private volatile MessageIdAdv seekMessageId;
     private final AtomicBoolean duringSeek;
 
-    private final BatchMessageIdImpl initialStartMessageId;
+    private final MessageIdAdv initialStartMessageId;
 
     private final long startMessageRollbackDurationInSec;
 
@@ -178,7 +180,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     private final TopicName topicName;
     private final String topicNameWithoutPartition;
 
-    private final Map<MessageIdImpl, List<MessageImpl<T>>> 
possibleSendToDeadLetterTopicMessages;
+    private final Map<MessageIdAdv, List<MessageImpl<T>>> 
possibleSendToDeadLetterTopicMessages;
 
     private final DeadLetterPolicy deadLetterPolicy;
 
@@ -258,12 +260,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = conf.getSubscriptionMode();
         if (startMessageId != null) {
-            if (startMessageId instanceof ChunkMessageIdImpl) {
-                this.startMessageId = new BatchMessageIdImpl(
-                        ((ChunkMessageIdImpl) 
startMessageId).getFirstChunkMessageId());
-            } else {
-                this.startMessageId = new BatchMessageIdImpl((MessageIdImpl) 
startMessageId);
-            }
+            MessageIdAdv firstChunkMessageId = ((MessageIdAdv) 
startMessageId).getFirstChunkMessageId();
+            this.startMessageId = (firstChunkMessageId == null) ? 
(MessageIdAdv) startMessageId : firstChunkMessageId;
         }
         this.initialStartMessageId = this.startMessageId;
         this.startMessageRollbackDurationInSec = 
startMessageRollbackDurationInSec;
@@ -535,7 +533,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     protected CompletableFuture<Void> doAcknowledge(MessageId messageId, 
AckType ackType,
                                                     Map<String, Long> 
properties,
                                                     TransactionImpl txn) {
-        messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
         if (getState() != State.Ready && getState() != State.Connecting) {
             stats.incrementNumAcksFailed();
             PulsarClientException exception = new 
PulsarClientException("Consumer not ready. State: " + getState());
@@ -551,16 +548,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return doTransactionAcknowledgeForResponse(messageId, ackType, 
null, properties,
                     new TxnID(txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits()));
         }
-        return 
acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, 
ackType, properties);
+        return acknowledgmentsGroupingTracker.addAcknowledgment(messageId, 
ackType, properties);
     }
 
     @Override
     protected CompletableFuture<Void> doAcknowledge(List<MessageId> 
messageIdList, AckType ackType,
                                                     Map<String, Long> 
properties, TransactionImpl txn) {
-
-        for (MessageId messageId : messageIdList) {
-            checkArgument(messageId instanceof MessageIdImpl);
-        }
         if (getState() != State.Ready && getState() != State.Connecting) {
             stats.incrementNumAcksFailed();
             PulsarClientException exception = new 
PulsarClientException("Consumer not ready. State: " + getState());
@@ -572,7 +565,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return FutureUtil.failedFuture(exception);
         }
         if (txn != null) {
-            return doTransactionAcknowledgeForResponse(messageIdList, ackType, 
null,
+            return doTransactionAcknowledgeForResponse(messageIdList, ackType,
                     properties, new TxnID(txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits()));
         } else {
             return 
this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, 
ackType, properties);
@@ -592,7 +585,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     .InvalidMessageException("Cannot handle message with null 
messageId"));
         }
 
-        messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
         if (getState() != State.Ready && getState() != State.Connecting) {
             stats.incrementNumAcksFailed();
             PulsarClientException exception = new 
PulsarClientException("Consumer not ready. State: " + getState());
@@ -922,7 +914,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
      * Clear the internal receiver queue and returns the message id of what 
was the 1st message in the queue that was
      * not seen by the application.
      */
-    private BatchMessageIdImpl clearReceiverQueue() {
+    private MessageIdAdv clearReceiverQueue() {
         List<Message<?>> currentMessageQueue = new 
ArrayList<>(incomingMessages.size());
         incomingMessages.drainTo(currentMessageQueue);
         resetIncomingMessageSize();
@@ -934,17 +926,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         if (!currentMessageQueue.isEmpty()) {
-            MessageIdImpl nextMessageInQueue = (MessageIdImpl) 
currentMessageQueue.get(0).getMessageId();
-            BatchMessageIdImpl previousMessage;
-            if (nextMessageInQueue instanceof BatchMessageIdImpl) {
+            MessageIdAdv nextMessageInQueue = (MessageIdAdv) 
currentMessageQueue.get(0).getMessageId();
+            MessageIdAdv previousMessage;
+            if (MessageIdAdvUtils.isBatch(nextMessageInQueue)) {
                 // Get on the previous message within the current batch
                 previousMessage = new 
BatchMessageIdImpl(nextMessageInQueue.getLedgerId(),
                         nextMessageInQueue.getEntryId(), 
nextMessageInQueue.getPartitionIndex(),
-                        ((BatchMessageIdImpl) 
nextMessageInQueue).getBatchIndex() - 1);
+                        nextMessageInQueue.getBatchIndex() - 1);
             } else {
                 // Get on previous message in previous entry
-                previousMessage = new 
BatchMessageIdImpl(nextMessageInQueue.getLedgerId(),
-                        nextMessageInQueue.getEntryId() - 1, 
nextMessageInQueue.getPartitionIndex(), -1);
+                previousMessage = 
MessageIdAdvUtils.prevMessageId(nextMessageInQueue);
             }
             // release messages if they are pooled messages
             currentMessageQueue.forEach(Message::release);
@@ -1126,7 +1117,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                                   final Schema<V> schema,
                                                   final boolean 
containMetadata,
                                                   final BitSetRecyclable 
ackBitSet,
-                                                  final BatchMessageAcker 
acker,
+                                                  final BitSet 
ackSetInMessageId,
                                                   final int redeliveryCount,
                                                   final long consumerEpoch) {
         if (log.isDebugEnabled()) {
@@ -1161,7 +1152,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             }
 
             BatchMessageIdImpl batchMessageIdImpl = new 
BatchMessageIdImpl(messageId.getLedgerId(),
-                    messageId.getEntryId(), getPartitionIndex(), index, 
numMessages, acker);
+                    messageId.getEntryId(), getPartitionIndex(), index, 
numMessages, ackSetInMessageId);
 
             final ByteBuf payloadBuffer = (singleMessagePayload != null) ? 
singleMessagePayload : payload;
             final MessageImpl<V> message = 
MessageImpl.create(topicName.toString(), batchMessageIdImpl,
@@ -1525,7 +1516,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             possibleToDeadLetter = new ArrayList<>();
         }
 
-        BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);
+        BitSet ackSetInMessageId = BatchMessageIdImpl.newAckSet(batchSize);
         BitSetRecyclable ackBitSet = null;
         if (ackSet != null && ackSet.size() > 0) {
             ackBitSet = 
BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet));
@@ -1537,7 +1528,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             for (int i = 0; i < batchSize; ++i) {
                 final MessageImpl<T> message = newSingleMessage(i, batchSize, 
brokerEntryMetadata, msgMetadata,
                         singleMessageMetadata, uncompressedPayload, 
batchMessage, schema, true,
-                        ackBitSet, acker, redeliveryCount, consumerEpoch);
+                        ackBitSet, ackSetInMessageId, redeliveryCount, 
consumerEpoch);
                 if (message == null) {
                     skippedMessages++;
                     continue;
@@ -1634,7 +1625,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     protected void trackMessage(MessageId messageId, int redeliveryCount) {
         if (conf.getAckTimeoutMillis() > 0 && messageId instanceof 
MessageIdImpl) {
-            MessageId id = normalizeMessageId(messageId);
+            MessageId id = MessageIdAdvUtils.discardBatch(messageId);
             if (hasParentConsumer) {
                 //TODO: check parent consumer here
                 // we should no longer track this message, TopicsConsumer will 
take care from now onwards
@@ -1931,8 +1922,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return;
         }
 
-        checkArgument(messageIds.stream().findFirst().get() instanceof 
MessageIdImpl);
-
         if (conf.getSubscriptionType() != SubscriptionType.Shared
                 && conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
             // We cannot redeliver single messages if subscription type is not 
Shared
@@ -1942,11 +1931,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         ClientCnx cnx = cnx();
         if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= 
ProtocolVersion.v2.getValue()) {
             int messagesFromQueue = removeExpiredMessagesFromQueue(messageIds);
-            Iterable<List<MessageIdImpl>> batches = Iterables.partition(
-                messageIds.stream()
-                    .map(messageId -> (MessageIdImpl) messageId)
-                    .collect(Collectors.toSet()), 
MAX_REDELIVER_UNACKNOWLEDGED);
-            batches.forEach(ids -> {
+            Iterables.partition(messageIds, 
MAX_REDELIVER_UNACKNOWLEDGED).forEach(ids -> {
                 getRedeliveryMessageIdData(ids).thenAccept(messageIdData -> {
                     if (!messageIdData.isEmpty()) {
                         ByteBuf cmd = 
Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdData);
@@ -1985,11 +1970,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         notifyPendingBatchReceivedCallBack(op.future);
     }
 
-    private CompletableFuture<List<MessageIdData>> 
getRedeliveryMessageIdData(List<MessageIdImpl> messageIds) {
+    private CompletableFuture<List<MessageIdData>> 
getRedeliveryMessageIdData(List<MessageId> messageIds) {
         if (messageIds == null || messageIds.isEmpty()) {
             return CompletableFuture.completedFuture(Collections.emptyList());
         }
-        List<CompletableFuture<MessageIdData>> futures = 
messageIds.stream().map(messageId -> {
+        List<CompletableFuture<MessageIdData>> futures = 
messageIds.stream().map(originalMessageId -> {
+            final MessageIdAdv messageId = (MessageIdAdv) originalMessageId;
             CompletableFuture<Boolean> future = 
processPossibleToDLQ(messageId);
             return future.thenApply(sendToDLQ -> {
                 if (!sendToDLQ) {
@@ -2005,20 +1991,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 
futures.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(Collectors.toList()));
     }
 
-    private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl 
messageId) {
+    private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv 
messageId) {
         List<MessageImpl<T>> deadLetterMessages = null;
         if (possibleSendToDeadLetterTopicMessages != null) {
-            if (messageId instanceof BatchMessageIdImpl) {
-                messageId = new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(),
-                        getPartitionIndex());
-            }
-            deadLetterMessages = 
possibleSendToDeadLetterTopicMessages.get(messageId);
+            deadLetterMessages = 
possibleSendToDeadLetterTopicMessages.get(MessageIdAdvUtils.discardBatch(messageId));
         }
         CompletableFuture<Boolean> result = new CompletableFuture<>();
         if (deadLetterMessages != null) {
             initDeadLetterProducerIfNeeded();
             List<MessageImpl<T>> finalDeadLetterMessages = deadLetterMessages;
-            MessageIdImpl finalMessageId = messageId;
             deadLetterProducer.thenAcceptAsync(producerDLQ -> {
                 for (MessageImpl<T> message : finalDeadLetterMessages) {
                     String originMessageIdStr = 
message.getMessageId().toString();
@@ -2032,12 +2013,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     }
                     typedMessageBuilderNew.sendAsync()
                             .thenAccept(messageIdInDLQ -> {
-                                
possibleSendToDeadLetterTopicMessages.remove(finalMessageId);
-                                
acknowledgeAsync(finalMessageId).whenComplete((v, ex) -> {
+                                
possibleSendToDeadLetterTopicMessages.remove(messageId);
+                                acknowledgeAsync(messageId).whenComplete((v, 
ex) -> {
                                     if (ex != null) {
                                         log.warn("[{}] [{}] [{}] Failed to 
acknowledge the message {} of the original"
                                                         + " topic but send to 
the DLQ successfully.",
-                                                topicName, subscription, 
consumerName, finalMessageId, ex);
+                                                topicName, subscription, 
consumerName, messageId, ex);
                                         result.complete(false);
                                     } else {
                                         result.complete(true);
@@ -2047,11 +2028,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                 if (ex instanceof 
PulsarClientException.ProducerQueueIsFullError) {
                                     log.warn("[{}] [{}] [{}] Failed to send 
DLQ message to {} for message id {}: {}",
                                             topicName, subscription, 
consumerName,
-                                            
deadLetterPolicy.getDeadLetterTopic(), finalMessageId, ex.getMessage());
+                                            
deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage());
                                 } else {
                                     log.warn("[{}] [{}] [{}] Failed to send 
DLQ message to {} for message id {}",
                                             topicName, subscription, 
consumerName,
-                                            
deadLetterPolicy.getDeadLetterTopic(), finalMessageId, ex);
+                                            
deadLetterPolicy.getDeadLetterTopic(), messageId, ex);
                                 }
                                 result.complete(false);
                                 return null;
@@ -2154,8 +2135,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
         ClientCnx cnx = cnx();
 
-        BatchMessageIdImpl originSeekMessageId = seekMessageId;
-        seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
+        MessageIdAdv originSeekMessageId = seekMessageId;
+        seekMessageId = (MessageIdAdv) seekId;
         duringSeek.set(true);
         log.info("[{}][{}] Seeking subscription to {}", topic, subscription, 
seekBy);
 
@@ -2193,29 +2174,28 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     @Override
-    public CompletableFuture<Void> seekAsync(MessageId originalMessageId) {
-        final MessageIdImpl messageId = 
MessageIdImpl.convertToMessageIdImpl(originalMessageId);
+    public CompletableFuture<Void> seekAsync(MessageId messageId) {
         String seekBy = String.format("the message %s", messageId.toString());
         return seekAsyncCheckState(seekBy).orElseGet(() -> {
             long requestId = client.newRequestId();
-            ByteBuf seek = null;
-            if (messageId instanceof BatchMessageIdImpl) {
-                BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
-                // Initialize ack set
-                BitSetRecyclable ackSet = BitSetRecyclable.create();
-                ackSet.set(0, msgId.getBatchSize());
-                ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
-                long[] ackSetArr = ackSet.toLongArray();
-                ackSet.recycle();
-
-                seek = Commands.newSeek(consumerId, requestId, 
msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
-            } else if (messageId instanceof ChunkMessageIdImpl) {
-                ChunkMessageIdImpl msgId = (ChunkMessageIdImpl) messageId;
-                seek = Commands.newSeek(consumerId, requestId, 
msgId.getFirstChunkMessageId().getLedgerId(),
-                        msgId.getFirstChunkMessageId().getEntryId(), new 
long[0]);
+            final MessageIdAdv msgId = (MessageIdAdv) messageId;
+            final MessageIdAdv firstChunkMsgId = 
msgId.getFirstChunkMessageId();
+            final ByteBuf seek;
+            if (msgId.getFirstChunkMessageId() != null) {
+                seek = Commands.newSeek(consumerId, requestId, 
firstChunkMsgId.getLedgerId(),
+                        firstChunkMsgId.getEntryId(), new long[0]);
             } else {
-                seek = Commands.newSeek(
-                        consumerId, requestId, messageId.getLedgerId(), 
messageId.getEntryId(), new long[0]);
+                final long[] ackSetArr;
+                if (MessageIdAdvUtils.isBatch(msgId)) {
+                    final BitSetRecyclable ackSet = BitSetRecyclable.create();
+                    ackSet.set(0, msgId.getBatchSize());
+                    ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
+                    ackSetArr = ackSet.toLongArray();
+                    ackSet.recycle();
+                } else {
+                    ackSetArr = new long[0];
+                }
+                seek = Commands.newSeek(consumerId, requestId, 
msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
             }
             return seekAsyncInternal(requestId, seek, messageId, seekBy);
         });
@@ -2247,9 +2227,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 }
 
                 future.thenAccept(response -> {
-                    MessageIdImpl lastMessageId = 
MessageIdImpl.convertToMessageIdImpl(response.lastMessageId);
-                    MessageIdImpl markDeletePosition = MessageIdImpl
-                            
.convertToMessageIdImpl(response.markDeletePosition);
+                    MessageIdAdv lastMessageId = (MessageIdAdv) 
response.lastMessageId;
+                    MessageIdAdv markDeletePosition = (MessageIdAdv) 
response.markDeletePosition;
 
                     if (markDeletePosition != null && 
!(markDeletePosition.getEntryId() < 0
                             && markDeletePosition.getLedgerId() > 
lastMessageId.getLedgerId())) {
@@ -2434,16 +2413,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
     }
 
-    private MessageIdImpl getMessageIdImpl(Message<?> msg) {
-        MessageIdImpl messageId = (MessageIdImpl) msg.getMessageId();
-        if (messageId instanceof BatchMessageIdImpl) {
-            // messageIds contain MessageIdImpl, not BatchMessageIdImpl
-            messageId = new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), getPartitionIndex());
-        }
-        return messageId;
-    }
-
-
     private boolean isMessageUndecryptable(MessageMetadata msgMetadata) {
         return (msgMetadata.getEncryptionKeysCount() > 0 && 
conf.getCryptoKeyReader() == null
                 && conf.getCryptoFailureAction() == 
ConsumerCryptoFailureAction.CONSUME);
@@ -2486,7 +2455,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         int messagesFromQueue = 0;
         Message<T> peek = incomingMessages.peek();
         if (peek != null) {
-            MessageIdImpl messageId = getMessageIdImpl(peek);
+            MessageIdAdv messageId = 
MessageIdAdvUtils.discardBatch(peek.getMessageId());
             if (!messageIds.contains(messageId)) {
                 // first message is not expired, then no message is expired in 
queue.
                 return 0;
@@ -2497,7 +2466,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             while (message != null) {
                 decreaseIncomingMessageSize(message);
                 messagesFromQueue++;
-                MessageIdImpl id = getMessageIdImpl(message);
+                MessageIdAdv id = 
MessageIdAdvUtils.discardBatch(message.getMessageId());
                 if (!messageIds.contains(id)) {
                     messageIds.add(id);
                     break;
@@ -2691,33 +2660,26 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     private CompletableFuture<Void> 
doTransactionAcknowledgeForResponse(MessageId messageId, AckType ackType,
                                                                         
ValidationError validationError,
                                                                         
Map<String, Long> properties, TxnID txnID) {
-        BitSetRecyclable bitSetRecyclable = null;
-        long ledgerId;
-        long entryId;
-        ByteBuf cmd;
         long requestId = client.newRequestId();
-        if (messageId instanceof BatchMessageIdImpl) {
-            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-            bitSetRecyclable = BitSetRecyclable.create();
-            ledgerId = batchMessageId.getLedgerId();
-            entryId = batchMessageId.getEntryId();
+        final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+        final long ledgerId = messageIdAdv.getLedgerId();
+        final long entryId = messageIdAdv.getEntryId();
+        final ByteBuf cmd;
+        if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
+            BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
+            bitSetRecyclable.set(0, messageIdAdv.getBatchSize());
             if (ackType == AckType.Cumulative) {
-                batchMessageId.ackCumulative();
-                bitSetRecyclable.set(0, batchMessageId.getBatchSize());
-                bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1);
+                MessageIdAdvUtils.acknowledge(messageIdAdv, false);
+                bitSetRecyclable.clear(0, messageIdAdv.getBatchIndex() + 1);
             } else {
-                bitSetRecyclable.set(0, batchMessageId.getBatchSize());
-                bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+                bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
             }
             cmd = Commands.newAck(consumerId, ledgerId, entryId, 
bitSetRecyclable, ackType, validationError, properties,
-                    txnID.getLeastSigBits(), txnID.getMostSigBits(), 
requestId, batchMessageId.getBatchSize());
+                    txnID.getLeastSigBits(), txnID.getMostSigBits(), 
requestId, messageIdAdv.getBatchSize());
             bitSetRecyclable.recycle();
         } else {
-            MessageIdImpl singleMessage = (MessageIdImpl) messageId;
-            ledgerId = singleMessage.getLedgerId();
-            entryId = singleMessage.getEntryId();
-            cmd = Commands.newAck(consumerId, ledgerId, entryId, 
bitSetRecyclable, ackType,
-                    validationError, properties, txnID.getLeastSigBits(), 
txnID.getMostSigBits(), requestId);
+            cmd = Commands.newAck(consumerId, ledgerId, entryId, null, 
ackType, validationError, properties,
+                    txnID.getLeastSigBits(), txnID.getMostSigBits(), 
requestId);
         }
 
         if (ackType == AckType.Cumulative) {
@@ -2736,58 +2698,42 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     private CompletableFuture<Void> 
doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
-                                                                        
ValidationError validationError,
                                                                         
Map<String, Long> properties, TxnID txnID) {
-        BitSetRecyclable bitSetRecyclable = null;
-        long ledgerId;
-        long entryId;
-        ByteBuf cmd;
         long requestId = client.newRequestId();
         List<MessageIdData> messageIdDataList = new LinkedList<>();
         for (MessageId messageId : messageIds) {
-            if (messageId instanceof BatchMessageIdImpl) {
-                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
-                bitSetRecyclable = BitSetRecyclable.create();
+            final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+            final MessageIdData messageIdData = new MessageIdData();
+            messageIdData.setLedgerId(messageIdAdv.getLedgerId());
+            messageIdData.setEntryId(messageIdAdv.getEntryId());
+            if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
+                final BitSetRecyclable bitSetRecyclable = 
BitSetRecyclable.create();
+                bitSetRecyclable.set(0, messageIdAdv.getBatchSize());
                 if (ackType == AckType.Cumulative) {
-                    batchMessageId.ackCumulative();
-                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
-                    bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 
1);
+                    MessageIdAdvUtils.acknowledge(messageIdAdv, false);
+                    bitSetRecyclable.clear(0, messageIdAdv.getBatchIndex() + 
1);
                 } else {
-                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
-                    bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+                    bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
                 }
-                MessageIdData messageIdData = new MessageIdData();
-                messageIdData.setLedgerId(batchMessageId.getLedgerId());
-                messageIdData.setEntryId(batchMessageId.getEntryId());
-                messageIdData.setBatchSize(batchMessageId.getBatchSize());
-                long[] as = bitSetRecyclable.toLongArray();
-                for (int i = 0; i < as.length; i++) {
-                    messageIdData.addAckSet(as[i]);
+                for (long x : bitSetRecyclable.toLongArray()) {
+                    messageIdData.addAckSet(x);
                 }
                 bitSetRecyclable.recycle();
-                messageIdDataList.add(messageIdData);
-            } else {
-                MessageIdImpl singleMessage = (MessageIdImpl) messageId;
-                ledgerId = singleMessage.getLedgerId();
-                entryId = singleMessage.getEntryId();
-                MessageIdData messageIdData = new MessageIdData();
-                messageIdData.setLedgerId(ledgerId);
-                messageIdData.setEntryId(entryId);
-                messageIdDataList.add(messageIdData);
             }
 
+            messageIdDataList.add(messageIdData);
             if (ackType == AckType.Cumulative) {
                 unAckedMessageTracker.removeMessagesTill(messageId);
             } else {
                 unAckedMessageTracker.remove(messageId);
             }
         }
-        cmd = Commands.newAck(consumerId, messageIdDataList, ackType, 
validationError, properties,
+        final ByteBuf cmd = Commands.newAck(consumerId, messageIdDataList, 
ackType, null, properties,
                 txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId);
         return cnx().newAckForReceipt(cmd, requestId);
     }
 
-    public Map<MessageIdImpl, List<MessageImpl<T>>> 
getPossibleSendToDeadLetterTopicMessages() {
+    public Map<MessageIdAdv, List<MessageImpl<T>>> 
getPossibleSendToDeadLetterTopicMessages() {
         return possibleSendToDeadLetterTopicMessages;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
new file mode 100644
index 00000000000..c8b18524ec0
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.BitSet;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
+
+public class MessageIdAdvUtils {
+
+    static int hashCode(MessageIdAdv msgId) {
+        return (int) (31 * (msgId.getLedgerId() + 31 * msgId.getEntryId())
+                + (31 * (long) msgId.getPartitionIndex()) + 
msgId.getBatchIndex());
+    }
+
+    static boolean equals(MessageIdAdv lhs, Object o) {
+        if (!(o instanceof MessageIdAdv)) {
+            return false;
+        }
+        final MessageIdAdv rhs = (MessageIdAdv) o;
+        return lhs.getLedgerId() == rhs.getLedgerId()
+                && lhs.getEntryId() == rhs.getEntryId()
+                && lhs.getPartitionIndex() == rhs.getPartitionIndex()
+                && lhs.getBatchIndex() == rhs.getBatchIndex();
+    }
+
+    static boolean acknowledge(MessageIdAdv msgId, boolean individual) {
+        if (!isBatch(msgId)) {
+            return true;
+        }
+        final BitSet ackSet = msgId.getAckSet();
+        if (ackSet == null) {
+            // The internal MessageId implementation should never reach here. 
If users have implemented their own
+            // MessageId and getAckSet() is not overridden, return false to 
avoid acknowledging the current entry.
+            return false;
+        }
+        int batchIndex = msgId.getBatchIndex();
+        if (individual) {
+            ackSet.clear(batchIndex);
+        } else {
+            ackSet.clear(0, batchIndex + 1);
+        }
+        return ackSet.isEmpty();
+    }
+
+    static boolean isBatch(MessageIdAdv msgId) {
+        return msgId.getBatchIndex() >= 0 && msgId.getBatchSize() > 0;
+    }
+
+    static MessageIdAdv discardBatch(MessageId messageId) {
+        MessageIdAdv msgId = (MessageIdAdv) messageId;
+        return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), 
msgId.getPartitionIndex());
+    }
+
+    static MessageIdAdv prevMessageId(MessageIdAdv msgId) {
+        return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId() - 1, 
msgId.getPartitionIndex());
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 1a0f491a6a7..83ee7625783 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -18,21 +18,17 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.apache.pulsar.client.impl.BatchMessageIdImpl.NO_BATCH;
-import com.google.common.collect.ComparisonChain;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.io.IOException;
 import java.util.Objects;
-import javax.annotation.Nonnull;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.TopicMessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.common.api.proto.MessageIdData;
-import org.apache.pulsar.common.classification.InterfaceStability;
 import org.apache.pulsar.common.naming.TopicName;
 
-public class MessageIdImpl implements MessageId {
+public class MessageIdImpl implements MessageIdAdv {
     protected final long ledgerId;
     protected final long entryId;
     protected final int partitionIndex;
@@ -49,28 +45,29 @@ public class MessageIdImpl implements MessageId {
         this.partitionIndex = partitionIndex;
     }
 
+    @Override
     public long getLedgerId() {
         return ledgerId;
     }
 
+    @Override
     public long getEntryId() {
         return entryId;
     }
 
+    @Override
     public int getPartitionIndex() {
         return partitionIndex;
     }
 
     @Override
     public int hashCode() {
-        return messageIdHashCode(ledgerId, entryId, partitionIndex, NO_BATCH);
+        return MessageIdAdvUtils.hashCode(this);
     }
 
     @Override
     public boolean equals(Object o) {
-        return (o instanceof MessageId)
-                && !(o instanceof MultiMessageIdImpl)
-                && (compareTo((MessageId) o) == 0);
+        return MessageIdAdvUtils.equals(this, o);
     }
 
     @Override
@@ -100,7 +97,7 @@ public class MessageIdImpl implements MessageId {
         if (idData.hasBatchIndex()) {
             if (idData.hasBatchSize()) {
                 messageId = new BatchMessageIdImpl(idData.getLedgerId(), 
idData.getEntryId(), idData.getPartition(),
-                    idData.getBatchIndex(), idData.getBatchSize(), 
BatchMessageAcker.newAcker(idData.getBatchSize()));
+                    idData.getBatchIndex(), idData.getBatchSize(), 
BatchMessageIdImpl.newAckSet(idData.getBatchSize()));
             } else {
                 messageId = new BatchMessageIdImpl(idData.getLedgerId(), 
idData.getEntryId(), idData.getPartition(),
                     idData.getBatchIndex());
@@ -118,22 +115,6 @@ public class MessageIdImpl implements MessageId {
         return messageId;
     }
 
-    @InterfaceStability.Unstable
-    public static MessageIdImpl convertToMessageIdImpl(MessageId messageId) {
-        if (messageId instanceof TopicMessageId) {
-            if (messageId instanceof TopicMessageIdImpl) {
-                return (MessageIdImpl) ((TopicMessageIdImpl) 
messageId).getInnerMessageId();
-            } else {
-                try {
-                    return (MessageIdImpl) 
MessageId.fromByteArray(messageId.toByteArray());
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-        return (MessageIdImpl) messageId;
-    }
-
     public static MessageId fromByteArrayWithTopic(byte[] data, String 
topicName) throws IOException {
         return fromByteArrayWithTopic(data, TopicName.get(topicName));
     }
@@ -152,10 +133,10 @@ public class MessageIdImpl implements MessageId {
             if (idData.hasBatchSize()) {
                 messageId = new BatchMessageIdImpl(idData.getLedgerId(), 
idData.getEntryId(), idData.getPartition(),
                         idData.getBatchIndex(), idData.getBatchSize(),
-                        BatchMessageAcker.newAcker(idData.getBatchSize()));
+                        BatchMessageIdImpl.newAckSet(idData.getBatchSize()));
             } else {
                 messageId = new BatchMessageIdImpl(idData.getLedgerId(), 
idData.getEntryId(), idData.getPartition(),
-                        idData.getBatchIndex(), 0, 
BatchMessageAckerDisabled.INSTANCE);
+                        idData.getBatchIndex(), 0, null);
             }
         } else {
             messageId = new MessageIdImpl(idData.getLedgerId(), 
idData.getEntryId(), idData.getPartition());
@@ -207,36 +188,4 @@ public class MessageIdImpl implements MessageId {
         // there is no message batch so we pass -1
         return toByteArray(-1, 0);
     }
-
-    @Override
-    public int compareTo(@Nonnull MessageId o) {
-        if (o instanceof MessageIdImpl) {
-            MessageIdImpl other = (MessageIdImpl) o;
-            int batchIndex = (o instanceof BatchMessageIdImpl) ? 
((BatchMessageIdImpl) o).getBatchIndex() : NO_BATCH;
-            return messageIdCompare(
-                this.ledgerId, this.entryId, this.partitionIndex, NO_BATCH,
-                other.ledgerId, other.entryId, other.partitionIndex, batchIndex
-            );
-        } else if (o instanceof TopicMessageId) {
-            return compareTo(convertToMessageIdImpl(o));
-        } else {
-            throw new UnsupportedOperationException("Unknown MessageId type: " 
+ o.getClass().getName());
-        }
-    }
-
-    static int messageIdHashCode(long ledgerId, long entryId, int 
partitionIndex, int batchIndex) {
-        return (int) (31 * (ledgerId + 31 * entryId) + (31 * (long) 
partitionIndex) + batchIndex);
-    }
-
-    static int messageIdCompare(
-        long ledgerId1, long entryId1, int partitionIndex1, int batchIndex1,
-        long ledgerId2, long entryId2, int partitionIndex2, int batchIndex2
-    ) {
-        return ComparisonChain.start()
-            .compare(ledgerId1, ledgerId2)
-            .compare(entryId1, entryId2)
-            .compare(partitionIndex1, partitionIndex2)
-            .compare(batchIndex1, batchIndex2)
-            .result();
-    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 0b6fb608ee6..d369d639a73 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
 import lombok.Getter;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.impl.schema.AbstractSchema;
@@ -714,9 +715,10 @@ public class MessageImpl<T> implements Message<T> {
     @Override
     public Optional<Long> getIndex() {
         if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
-            if (msgMetadata.hasNumMessagesInBatch() && messageId instanceof 
BatchMessageIdImpl) {
-                int batchSize = ((BatchMessageIdImpl) 
messageId).getBatchSize();
-                int batchIndex = ((BatchMessageIdImpl) 
messageId).getBatchIndex();
+            MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+            if (msgMetadata.hasNumMessagesInBatch() && 
MessageIdAdvUtils.isBatch(messageIdAdv)) {
+                int batchSize = messageIdAdv.getBatchSize();
+                int batchIndex = messageIdAdv.getBatchIndex();
                 return Optional.of(brokerEntryMetadata.getIndex() - batchSize 
+ batchIndex + 1);
             }
             return Optional.of(brokerEntryMetadata.getIndex());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
index dcae86bd01a..f4c9aa27074 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
+import java.util.BitSet;
 import java.util.List;
 import lombok.NonNull;
 import org.apache.pulsar.client.api.Message;
@@ -50,7 +51,7 @@ public class MessagePayloadContextImpl implements 
MessagePayloadContext {
     private MessageIdImpl messageId;
     private ConsumerImpl<?> consumer;
     private int redeliveryCount;
-    private BatchMessageAcker acker;
+    private BitSet ackSetInMessageId;
     private BitSetRecyclable ackBitSet;
     private long consumerEpoch;
 
@@ -73,7 +74,7 @@ public class MessagePayloadContextImpl implements 
MessagePayloadContext {
         context.messageId = messageId;
         context.consumer = consumer;
         context.redeliveryCount = redeliveryCount;
-        context.acker = BatchMessageAcker.newAcker(context.getNumMessages());
+        context.ackSetInMessageId = 
BatchMessageIdImpl.newAckSet(context.getNumMessages());
         context.ackBitSet = (ackSet != null && ackSet.size() > 0)
                 ? 
BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet))
                 : null;
@@ -88,7 +89,7 @@ public class MessagePayloadContextImpl implements 
MessagePayloadContext {
         consumer = null;
         redeliveryCount = 0;
         consumerEpoch = DEFAULT_CONSUMER_EPOCH;
-        acker = null;
+        ackSetInMessageId = null;
         if (ackBitSet != null) {
             ackBitSet.recycle();
             ackBitSet = null;
@@ -134,7 +135,7 @@ public class MessagePayloadContextImpl implements 
MessagePayloadContext {
                     schema,
                     containMetadata,
                     ackBitSet,
-                    acker,
+                    ackSetInMessageId,
                     redeliveryCount,
                     consumerEpoch);
         } finally {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f993304b078..5fe0e4a82b8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.Messages;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
@@ -100,7 +101,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     private final MultiTopicConsumerStatsRecorderImpl stats;
     private final ConsumerConfigurationData<T> internalConfig;
 
-    private volatile BatchMessageIdImpl startMessageId = null;
+    private volatile MessageIdAdv startMessageId;
     private final long startMessageRollbackDurationInSec;
     MultiTopicsConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T> conf,
             ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
@@ -139,9 +140,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         this.consumers = new ConcurrentHashMap<>();
         this.pausedConsumers = new ConcurrentLinkedQueue<>();
         this.allTopicPartitionsNumber = new AtomicInteger(0);
-        this.startMessageId = startMessageId != null
-                ? new 
BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(startMessageId))
-                : null;
+        this.startMessageId = (MessageIdAdv) startMessageId;
         this.startMessageRollbackDurationInSec = 
startMessageRollbackDurationInSec;
         this.paused = conf.isStartPaused();
 
@@ -454,18 +453,15 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             return FutureUtil.failedFuture(new PulsarClientException("Consumer 
already closed"));
         }
 
-        TopicMessageId topicMessageId = (TopicMessageId) messageId;
-        ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getOwnerTopic());
+        ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) 
messageId).getOwnerTopic());
         if (consumer == null) {
             return FutureUtil.failedFuture(new 
PulsarClientException.NotConnectedException());
         }
-        MessageId innerMessageId = 
MessageIdImpl.convertToMessageIdImpl(topicMessageId);
         if (ackType == AckType.Cumulative) {
-            return consumer.acknowledgeCumulativeAsync(innerMessageId);
+            return consumer.acknowledgeCumulativeAsync(messageId);
         } else {
-            return consumer.doAcknowledgeWithTxn(innerMessageId, ackType, 
properties, txnImpl)
-                .thenRun(() ->
-                    unAckedMessageTracker.remove(topicMessageId));
+            return consumer.doAcknowledgeWithTxn(messageId, ackType, 
properties, txnImpl)
+                .thenRun(() -> unAckedMessageTracker.remove(messageId));
         }
     }
 
@@ -490,10 +486,9 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             }
             Map<String, List<MessageId>> topicToMessageIdMap = new HashMap<>();
             for (MessageId messageId : messageIdList) {
-                TopicMessageId topicMessageId = (TopicMessageId) messageId;
-                
topicToMessageIdMap.putIfAbsent(topicMessageId.getOwnerTopic(), new 
ArrayList<>());
-                topicToMessageIdMap.get(topicMessageId.getOwnerTopic())
-                        
.add(MessageIdImpl.convertToMessageIdImpl(topicMessageId));
+                String ownerTopic = ((TopicMessageId) 
messageId).getOwnerTopic();
+                topicToMessageIdMap.putIfAbsent(ownerTopic, new ArrayList<>());
+                topicToMessageIdMap.get(ownerTopic).add(messageId);
             }
             final Map<ConsumerImpl<T>, List<MessageId>> consumerToMessageIds = 
new IdentityHashMap<>();
             for (Map.Entry<String, List<MessageId>> entry : 
topicToMessageIdMap.entrySet()) {
@@ -549,10 +544,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     @Override
     public void negativeAcknowledge(MessageId messageId) {
         checkArgument(messageId instanceof TopicMessageId);
-        TopicMessageId topicMessageId = (TopicMessageId) messageId;
-
-        ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getOwnerTopic());
-        
consumer.negativeAcknowledge(MessageIdImpl.convertToMessageIdImpl(topicMessageId));
+        ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) 
messageId).getOwnerTopic());
+        consumer.negativeAcknowledge(messageId);
     }
 
     @Override
@@ -705,12 +698,11 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             return;
         }
         removeExpiredMessagesFromQueue(messageIds);
-        messageIds.stream().map(messageId -> (TopicMessageId) messageId)
-            .collect(Collectors.groupingBy(TopicMessageId::getOwnerTopic, 
Collectors.toSet()))
-            .forEach((topicName, messageIds1) ->
-                consumers.get(topicName)
-                    .redeliverUnacknowledgedMessages(messageIds1.stream()
-                        
.map(MessageIdImpl::convertToMessageIdImpl).collect(Collectors.toSet())));
+        messageIds.stream()
+                .collect(Collectors.groupingBy(
+                        msgId -> ((TopicMessageIdImpl) msgId).getOwnerTopic(), 
Collectors.toSet()))
+                .forEach((topicName, messageIds1) ->
+                        
consumers.get(topicName).redeliverUnacknowledgedMessages(messageIds1));
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 
@@ -1508,7 +1500,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
     public static boolean isIllegalMultiTopicsMessageId(MessageId messageId) {
         //only support earliest/latest
-        return !MessageId.earliest.equals(messageId) && 
!MessageId.latest.equals(messageId);
+        return !messageId.equals(MessageId.earliest) && 
!messageId.equals(MessageId.latest);
     }
 
     public void tryAcknowledgeMessage(Message<T> msg) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 70d57db3bb6..37f58a02180 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -95,14 +95,6 @@ class NegativeAcksTracker implements Closeable {
     }
 
     private synchronized void add(MessageId messageId, int redeliveryCount) {
-        messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
-
-        if (messageId instanceof BatchMessageIdImpl) {
-            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-            messageId = new MessageIdImpl(batchMessageId.getLedgerId(), 
batchMessageId.getEntryId(),
-                    batchMessageId.getPartitionIndex());
-        }
-
         if (nackedMessages == null) {
             nackedMessages = new HashMap<>();
         }
@@ -113,7 +105,7 @@ class NegativeAcksTracker implements Closeable {
         } else {
             backoffNs = nackDelayNanos;
         }
-        nackedMessages.put(messageId, System.nanoTime() + backoffNs);
+        nackedMessages.put(MessageIdAdvUtils.discardBatch(messageId), 
System.nanoTime() + backoffNs);
 
         if (this.timeout == null) {
             // Schedule a task and group all the redeliveries for same period. 
Leave a small buffer to allow for
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
index 32f8fb92230..e8951cd3d16 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
@@ -43,7 +43,7 @@ public class NonPersistentAcknowledgmentGroupingTracker 
implements Acknowledgmen
         return false;
     }
 
-    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, 
AckType ackType, Map<String,
+    public CompletableFuture<Void> addAcknowledgment(MessageId msgId, AckType 
ackType, Map<String,
             Long> properties) {
         // no-op
         return CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index fef0bcb8906..9086ccc4ef0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -43,6 +44,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.TimedCompletableFuture;
@@ -79,8 +81,8 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
      * This is a set of all the individual acks that the application has 
issued and that were not already sent to
      * broker.
      */
-    private final ConcurrentSkipListSet<MessageIdImpl> pendingIndividualAcks;
-    private final ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> 
pendingIndividualBatchIndexAcks;
+    private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks;
+    private final ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable> 
pendingIndividualBatchIndexAcks;
 
     private final ScheduledFuture<?> scheduledTask;
     private final boolean batchIndexAckEnabled;
@@ -113,18 +115,16 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
      */
     @Override
     public boolean isDuplicate(MessageId messageId) {
-        if (!(messageId instanceof MessageIdImpl)) {
+        if (!(messageId instanceof MessageIdAdv)) {
             throw new IllegalArgumentException("isDuplicated cannot accept "
                     + messageId.getClass().getName() + ": " + messageId);
         }
-        if (lastCumulativeAck.compareTo(messageId) >= 0) {
+        final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+        if (lastCumulativeAck.compareTo(messageIdAdv) >= 0) {
             // Already included in a cumulative ack
             return true;
         } else {
-            final MessageIdImpl messageIdImpl = (messageId instanceof 
BatchMessageIdImpl)
-                    ? ((BatchMessageIdImpl) messageId).toMessageIdImpl()
-                    : (MessageIdImpl) messageId;
-            return pendingIndividualAcks.contains(messageIdImpl);
+            return 
pendingIndividualAcks.contains(MessageIdAdvUtils.discardBatch(messageIdAdv));
         }
     }
 
@@ -135,10 +135,10 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             if (consumer.isAckReceiptEnabled()) {
                 Set<CompletableFuture<Void>> completableFutureSet = new 
HashSet<>();
                 messageIds.forEach(messageId ->
-                        
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, 
properties)));
+                        completableFutureSet.add(addAcknowledgment(messageId, 
ackType, properties)));
                 return FutureUtil.waitForAll(new 
ArrayList<>(completableFutureSet));
             } else {
-                messageIds.forEach(messageId -> 
addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                messageIds.forEach(messageId -> addAcknowledgment(messageId, 
ackType, properties));
                 return CompletableFuture.completedFuture(null);
             }
         } else {
@@ -162,46 +162,43 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
 
     private void addListAcknowledgment(List<MessageId> messageIds) {
         for (MessageId messageId : messageIds) {
-            if (messageId instanceof BatchMessageIdImpl) {
-                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
-                addIndividualAcknowledgment(batchMessageId.toMessageIdImpl(),
-                        batchMessageId,
+            MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+            if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
+                
addIndividualAcknowledgment(MessageIdAdvUtils.discardBatch(messageIdAdv),
+                        messageIdAdv,
                         this::doIndividualAckAsync,
                         this::doIndividualBatchAckAsync);
-            } else if (messageId instanceof MessageIdImpl) {
-                addIndividualAcknowledgment((MessageIdImpl) messageId,
+            } else {
+                addIndividualAcknowledgment(messageIdAdv,
                         null,
                         this::doIndividualAckAsync,
                         this::doIndividualBatchAckAsync);
-            } else {
-                throw new IllegalStateException("Unsupported message id type 
in addListAcknowledgement: "
-                        + messageId.getClass().getCanonicalName());
             }
         }
     }
 
     @Override
-    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, 
AckType ackType,
+    public CompletableFuture<Void> addAcknowledgment(MessageId msgId, AckType 
ackType,
                                                      Map<String, Long> 
properties) {
-        if (msgId instanceof BatchMessageIdImpl) {
-            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-            return addAcknowledgment(batchMessageId.toMessageIdImpl(), 
ackType, properties, batchMessageId);
+        MessageIdAdv msgIdAdv = (MessageIdAdv) msgId;
+        if (MessageIdAdvUtils.isBatch(msgIdAdv)) {
+            return addAcknowledgment(MessageIdAdvUtils.discardBatch(msgId), 
ackType, properties, msgIdAdv);
         } else {
-            return addAcknowledgment(msgId, ackType, properties, null);
+            return addAcknowledgment(msgIdAdv, ackType, properties, null);
         }
     }
 
     private CompletableFuture<Void> addIndividualAcknowledgment(
-            MessageIdImpl msgId,
-            @Nullable BatchMessageIdImpl batchMessageId,
-            Function<MessageIdImpl, CompletableFuture<Void>> 
individualAckFunction,
-            Function<BatchMessageIdImpl, CompletableFuture<Void>> 
batchAckFunction) {
+            MessageIdAdv msgId,
+            @Nullable MessageIdAdv batchMessageId,
+            Function<MessageIdAdv, CompletableFuture<Void>> 
individualAckFunction,
+            Function<MessageIdAdv, CompletableFuture<Void>> batchAckFunction) {
         if (batchMessageId != null) {
             consumer.onAcknowledge(batchMessageId, null);
         } else {
             consumer.onAcknowledge(msgId, null);
         }
-        if (batchMessageId == null || batchMessageId.ackIndividual()) {
+        if (batchMessageId == null || 
MessageIdAdvUtils.acknowledge(batchMessageId, true)) {
             consumer.getStats().incrementNumAcksSent((batchMessageId != null) 
? batchMessageId.getBatchSize() : 1);
             consumer.getUnAckedMessageTracker().remove(msgId);
             if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
@@ -215,10 +212,10 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
     }
 
-    private CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
+    private CompletableFuture<Void> addAcknowledgment(MessageIdAdv msgId,
                                                       AckType ackType,
                                                       Map<String, Long> 
properties,
-                                                      @Nullable 
BatchMessageIdImpl batchMessageId) {
+                                                      @Nullable MessageIdAdv 
batchMessageId) {
         switch (ackType) {
             case Individual:
                 return addIndividualAcknowledgment(msgId,
@@ -231,15 +228,12 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
                 } else {
                     consumer.onAcknowledgeCumulative(msgId, null);
                 }
-                if (batchMessageId == null || batchMessageId.ackCumulative()) {
+                if (batchMessageId == null || 
MessageIdAdvUtils.acknowledge(batchMessageId, false)) {
                     return doCumulativeAck(msgId, properties, null);
                 } else if (batchIndexAckEnabled) {
                     return doCumulativeBatchIndexAck(batchMessageId, 
properties);
                 } else {
-                    if 
(!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
-                        doCumulativeAck(batchMessageId.prevBatchMessageId(), 
properties, null);
-                        
batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
-                    }
+                    
doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, 
null);
                     return CompletableFuture.completedFuture(null);
                 }
             default:
@@ -247,7 +241,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
     }
 
-    private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, 
Map<String, Long> properties) {
+    private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, 
Map<String, Long> properties) {
         if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
             // We cannot group acks if the delay is 0 or when there are 
properties attached to it. Fortunately that's an
             // uncommon condition since it's only used for the compaction 
subscription.
@@ -267,13 +261,13 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
     }
 
 
-    private CompletableFuture<Void> doIndividualAckAsync(MessageIdImpl 
messageId) {
+    private CompletableFuture<Void> doIndividualAckAsync(MessageIdAdv 
messageId) {
         pendingIndividualAcks.add(messageId);
         pendingIndividualBatchIndexAcks.remove(messageId);
         return CompletableFuture.completedFuture(null);
     }
 
-    private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl 
batchMessageId,
+    private CompletableFuture<Void> doIndividualBatchAck(MessageIdAdv 
batchMessageId,
                                                          Map<String, Long> 
properties) {
         if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
             return doImmediateBatchIndexAck(batchMessageId, 
batchMessageId.getBatchIndex(),
@@ -283,7 +277,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
     }
 
-    private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl 
batchMessageId) {
+    private CompletableFuture<Void> doIndividualBatchAck(MessageIdAdv 
batchMessageId) {
         Optional<Lock> readLock = acquireReadLock();
         try {
             doIndividualBatchAckAsync(batchMessageId);
@@ -296,7 +290,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
     }
 
-    private CompletableFuture<Void> doCumulativeAck(MessageIdImpl messageId, 
Map<String, Long> properties,
+    private CompletableFuture<Void> doCumulativeAck(MessageIdAdv messageId, 
Map<String, Long> properties,
                                                     BitSetRecyclable bitSet) {
         
consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId));
         if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
@@ -314,29 +308,29 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
     }
 
-    private CompletableFuture<Void> 
doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
+    private CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv 
msgId) {
         ConcurrentBitSetRecyclable bitSet = 
pendingIndividualBatchIndexAcks.computeIfAbsent(
-                batchMessageId.toMessageIdImpl(), __ -> {
-                    ConcurrentBitSetRecyclable value;
-                    if (batchMessageId.getAcker() != null
-                            && !(batchMessageId.getAcker() instanceof 
BatchMessageAckerDisabled)) {
-                        value = 
ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());
+                MessageIdAdvUtils.discardBatch(msgId), __ -> {
+                    final BitSet ackSet = msgId.getAckSet();
+                    final ConcurrentBitSetRecyclable value;
+                    if (ackSet != null && !ackSet.isEmpty()) {
+                        value = ConcurrentBitSetRecyclable.create(ackSet);
                     } else {
                         value = ConcurrentBitSetRecyclable.create();
-                        value.set(0, batchMessageId.getOriginalBatchSize());
+                        value.set(0, msgId.getBatchSize());
                     }
                     return value;
                 });
-        bitSet.clear(batchMessageId.getBatchIndex());
+        bitSet.clear(msgId.getBatchIndex());
         return CompletableFuture.completedFuture(null);
     }
 
-    private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable 
bitSet) {
+    private void doCumulativeAckAsync(MessageIdAdv msgId, BitSetRecyclable 
bitSet) {
         // Handle concurrent updates from different threads
         lastCumulativeAck.update(msgId, bitSet);
     }
 
-    private CompletableFuture<Void> 
doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId,
+    private CompletableFuture<Void> doCumulativeBatchIndexAck(MessageIdAdv 
batchMessageId,
                                                               Map<String, 
Long> properties) {
         if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
             return doImmediateBatchIndexAck(batchMessageId, 
batchMessageId.getBatchIndex(),
@@ -349,7 +343,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
     }
 
-    private CompletableFuture<Void> doImmediateAck(MessageIdImpl msgId, 
AckType ackType, Map<String, Long> properties,
+    private CompletableFuture<Void> doImmediateAck(MessageIdAdv msgId, AckType 
ackType, Map<String, Long> properties,
                                                    BitSetRecyclable bitSet) {
         ClientCnx cnx = consumer.getClientCnx();
 
@@ -360,7 +354,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         return newImmediateAckAndFlush(consumer.consumerId, msgId, bitSet, 
ackType, properties, cnx);
     }
 
-    private CompletableFuture<Void> 
doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int 
batchSize,
+    private CompletableFuture<Void> doImmediateBatchIndexAck(MessageIdAdv 
msgId, int batchIndex, int batchSize,
                                                              AckType ackType, 
Map<String, Long> properties) {
         ClientCnx cnx = consumer.getClientCnx();
 
@@ -369,8 +363,8 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
                     .ConnectException("Consumer connect fail! consumer state:" 
+ consumer.getState()));
         }
         BitSetRecyclable bitSet;
-        if (msgId.getAcker() != null && !(msgId.getAcker() instanceof 
BatchMessageAckerDisabled)) {
-            bitSet = 
BitSetRecyclable.valueOf(msgId.getAcker().getBitSet().toLongArray());
+        if (msgId.getAckSet() != null) {
+            bitSet = BitSetRecyclable.valueOf(msgId.getAckSet().toLongArray());
         } else {
             bitSet = BitSetRecyclable.create();
             bitSet.set(0, batchSize);
@@ -382,7 +376,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
 
         CompletableFuture<Void> completableFuture = 
newMessageAckCommandAndWrite(cnx, consumer.consumerId,
-                msgId.ledgerId, msgId.entryId, bitSet, ackType, properties, 
true, null, null);
+                msgId.getLedgerId(), msgId.getEntryId(), bitSet, ackType, 
properties, true, null, null);
         bitSet.recycle();
         return completableFuture;
     }
@@ -414,7 +408,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         boolean shouldFlush = false;
         if (lastCumulativeAckToFlush != null) {
             shouldFlush = true;
-            final MessageIdImpl messageId = 
lastCumulativeAckToFlush.getMessageId();
+            final MessageIdAdv messageId = 
lastCumulativeAckToFlush.getMessageId();
             newMessageAckCommandAndWrite(cnx, consumer.consumerId, 
messageId.getLedgerId(), messageId.getEntryId(),
                     lastCumulativeAckToFlush.getBitSetRecyclable(), 
AckType.Cumulative,
                     Collections.emptyMap(), false,
@@ -429,7 +423,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
 {
                 // We can send 1 single protobuf command with all individual 
acks
                 while (true) {
-                    MessageIdImpl msgId = pendingIndividualAcks.pollFirst();
+                    MessageIdAdv msgId = pendingIndividualAcks.pollFirst();
                     if (msgId == null) {
                         break;
                     }
@@ -452,7 +446,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             } else {
                 // When talking to older brokers, send the acknowledgements 
individually
                 while (true) {
-                    MessageIdImpl msgId = pendingIndividualAcks.pollFirst();
+                    MessageIdAdv msgId = pendingIndividualAcks.pollFirst();
                     if (msgId == null) {
                         break;
                     }
@@ -465,12 +459,13 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
 
         if (!pendingIndividualBatchIndexAcks.isEmpty()) {
-            Iterator<Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable>> 
iterator =
+            Iterator<Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable>> 
iterator =
                     pendingIndividualBatchIndexAcks.entrySet().iterator();
 
             while (iterator.hasNext()) {
-                Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable> entry = 
iterator.next();
-                entriesToAck.add(Triple.of(entry.getKey().ledgerId, 
entry.getKey().entryId, entry.getValue()));
+                Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry = 
iterator.next();
+                entriesToAck.add(Triple.of(
+                        entry.getKey().getLedgerId(), 
entry.getKey().getEntryId(), entry.getValue()));
                 iterator.remove();
             }
         }
@@ -509,7 +504,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
     }
 
-    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, 
MessageIdImpl msgId,
+    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, 
MessageIdAdv msgId,
                                                             BitSetRecyclable 
bitSet, AckType ackType,
                                                             Map<String, Long> 
map, ClientCnx cnx) {
         MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
@@ -535,7 +530,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
                 completableFuture = CompletableFuture.completedFuture(null);
             }
         } else {
-            completableFuture = newMessageAckCommandAndWrite(cnx, consumerId, 
msgId.ledgerId, msgId.getEntryId(),
+            completableFuture = newMessageAckCommandAndWrite(cnx, consumerId, 
msgId.getLedgerId(), msgId.getEntryId(),
                     bitSet, ackType, map, true, null, null);
         }
         return completableFuture;
@@ -621,13 +616,13 @@ class LastCumulativeAck {
                     return new LastCumulativeAck();
                 }
             };
-    public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) 
MessageIdImpl.earliest;
+    public static final MessageIdAdv DEFAULT_MESSAGE_ID = (MessageIdAdv) 
MessageId.earliest;
 
-    private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
+    private volatile MessageIdAdv messageId = DEFAULT_MESSAGE_ID;
     private BitSetRecyclable bitSetRecyclable = null;
     private boolean flushRequired = false;
 
-    public synchronized void update(final MessageIdImpl messageId, final 
BitSetRecyclable bitSetRecyclable) {
+    public synchronized void update(final MessageIdAdv messageId, final 
BitSetRecyclable bitSetRecyclable) {
         if (compareTo(messageId) < 0) {
             if (this.bitSetRecyclable != null && this.bitSetRecyclable != 
bitSetRecyclable) {
                 this.bitSetRecyclable.recycle();
@@ -662,25 +657,22 @@ class LastCumulativeAck {
         flushRequired = false;
     }
 
-    public synchronized int compareTo(MessageId messageId) {
-        if (this.messageId instanceof BatchMessageIdImpl && (!(messageId 
instanceof BatchMessageIdImpl))) {
-            final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId;
-            final MessageIdImpl rhs = (MessageIdImpl) messageId;
-            return MessageIdImpl.messageIdCompare(
-                    lhs.getLedgerId(), lhs.getEntryId(), 
lhs.getPartitionIndex(), lhs.getBatchIndex(),
-                    rhs.getLedgerId(), rhs.getEntryId(), 
rhs.getPartitionIndex(), Integer.MAX_VALUE);
-        } else if (messageId instanceof BatchMessageIdImpl && 
(!(this.messageId instanceof BatchMessageIdImpl))){
-            final MessageIdImpl lhs = this.messageId;
-            final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId;
-            return MessageIdImpl.messageIdCompare(
-                    lhs.getLedgerId(), lhs.getEntryId(), 
lhs.getPartitionIndex(), Integer.MAX_VALUE,
-                    rhs.getLedgerId(), rhs.getEntryId(), 
rhs.getPartitionIndex(), rhs.getBatchIndex());
-        } else {
-            return this.messageId.compareTo(messageId);
+    public synchronized int compareTo(MessageIdAdv messageId) {
+        int result = Long.compare(this.messageId.getLedgerId(), 
messageId.getLedgerId());
+        if (result != 0) {
+            return result;
+        }
+        result = Long.compare(this.messageId.getEntryId(), 
messageId.getEntryId());
+        if (result != 0) {
+            return result;
         }
+        return Integer.compare(
+                (this.messageId.getBatchIndex() >= 0) ? 
this.messageId.getBatchIndex() : Integer.MAX_VALUE,
+                (messageId.getBatchIndex() >= 0) ? messageId.getBatchIndex() : 
Integer.MAX_VALUE
+        );
     }
 
-    private synchronized void set(final MessageIdImpl messageId, final 
BitSetRecyclable bitSetRecyclable) {
+    private synchronized void set(final MessageIdAdv messageId, final 
BitSetRecyclable bitSetRecyclable) {
         this.messageId = messageId;
         this.bitSetRecyclable = bitSetRecyclable;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
index 06f79024c4f..1c4230470db 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
@@ -22,6 +22,8 @@ import java.util.Map;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.TopicMessageId;
 
 @Data
 @NoArgsConstructor
@@ -67,18 +69,12 @@ public class ResetCursorData {
     }
 
     public ResetCursorData(MessageId messageId) {
-        if (messageId instanceof BatchMessageIdImpl) {
-            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-            this.ledgerId = batchMessageId.getLedgerId();
-            this.entryId = batchMessageId.getEntryId();
-            this.batchIndex = batchMessageId.getBatchIndex();
-            this.partitionIndex = batchMessageId.partitionIndex;
-        } else if (messageId instanceof MessageIdImpl) {
-            MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
-            this.ledgerId = messageIdImpl.getLedgerId();
-            this.entryId = messageIdImpl.getEntryId();
-            this.partitionIndex = messageIdImpl.partitionIndex;
-        }  else if (messageId instanceof TopicMessageIdImpl) {
+        MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+        this.ledgerId = messageIdAdv.getLedgerId();
+        this.entryId = messageIdAdv.getEntryId();
+        this.batchIndex = messageIdAdv.getBatchIndex();
+        this.partitionIndex = messageIdAdv.getPartitionIndex();
+        if (messageId instanceof TopicMessageId) {
             throw new IllegalArgumentException("Not supported operation on 
partitioned-topic");
         }
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index 941f18cf65a..189dc1c6083 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -21,16 +21,12 @@ package org.apache.pulsar.client.impl;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.TopicMessageId;
 
-public class TopicMessageIdImpl implements TopicMessageId {
+public class TopicMessageIdImpl extends TopicMessageId.Impl {
 
-    /** This topicPartitionName is get from ConsumerImpl, it contains 
partition part. */
-    private final String topicPartitionName;
     private final String topicName;
-    private final MessageId messageId;
 
     public TopicMessageIdImpl(String topicPartitionName, String topicName, 
MessageId messageId) {
-        this.messageId = messageId;
-        this.topicPartitionName = topicPartitionName;
+        super(topicPartitionName, messageId);
         this.topicName = topicName;
     }
 
@@ -49,40 +45,21 @@ public class TopicMessageIdImpl implements TopicMessageId {
      */
     @Deprecated
     public String getTopicPartitionName() {
-        return this.topicPartitionName;
+        return getOwnerTopic();
     }
 
+    @Deprecated
     public MessageId getInnerMessageId() {
-        return messageId;
-    }
-
-    @Override
-    public String toString() {
-        return messageId.toString();
-    }
-
-    @Override
-    public byte[] toByteArray() {
-        return messageId.toByteArray();
-    }
-
-    @Override
-    public int hashCode() {
-        return messageId.hashCode();
+        return new MessageIdImpl(getLedgerId(), getEntryId(), 
getPartitionIndex());
     }
 
     @Override
     public boolean equals(Object obj) {
-        return messageId.equals(obj);
+        return super.equals(obj);
     }
 
     @Override
-    public int compareTo(MessageId o) {
-        return messageId.compareTo(o);
-    }
-
-    @Override
-    public String getOwnerTopic() {
-        return topicPartitionName;
+    public int hashCode() {
+        return super.hashCode();
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index c3fcb0a16a3..d24ecbd6aa9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -70,7 +70,7 @@ public class TopicMessageImpl<T> implements Message<T> {
 
     @Deprecated
     public MessageId getInnerMessageId() {
-        return MessageIdImpl.convertToMessageIdImpl(messageId);
+        return messageId.getInnerMessageId();
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 42eb197d632..ae874b4da6d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -174,7 +174,8 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
                 }
                 waitingOnListenerForZeroQueueSize = true;
                 trackMessage(message);
-                
unAckedMessageTracker.add(normalizeMessageId(message.getMessageId()), 
message.getRedeliveryCount());
+                unAckedMessageTracker.add(
+                        
MessageIdAdvUtils.discardBatch(message.getMessageId()), 
message.getRedeliveryCount());
                 listener.received(ZeroQueueConsumerImpl.this, 
beforeConsume(message));
             } catch (Throwable t) {
                 log.error("[{}][{}] Message listener error in processing 
unqueued message: {}", topic, subscription,
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml 
b/pulsar-client/src/main/resources/findbugsExclude.xml
index e5f8babe841..92ec9e934ee 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -1007,4 +1007,9 @@
         <Method name="getThread"/>
         <Bug pattern="EI_EXPOSE_REP"/>
     </Match>
+    <Match>
+        <Class name="org.apache.pulsar.client.impl.BatchMessageIdImpl"/>
+        <Method name="getAckSet"/>
+        <Bug pattern="EI_EXPOSE_REP"/>
+    </Match>
 </FindBugsFilter>
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index ddca6951e49..0418a54c772 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.TimedCompletableFuture;
@@ -61,7 +62,7 @@ public class AcknowledgementsGroupingTrackerTest {
         eventLoopGroup = new NioEventLoopGroup(1);
         consumer = mock(ConsumerImpl.class);
         consumer.unAckedChunkedMessageIdSequenceMap =
-                ConcurrentOpenHashMap.<MessageIdImpl, 
MessageIdImpl[]>newBuilder().build();
+                ConcurrentOpenHashMap.<MessageIdAdv, 
MessageIdImpl[]>newBuilder().build();
         cnx = spy(new ClientCnxTest(new ClientConfigurationData(), 
eventLoopGroup));
         PulsarClientImpl client = mock(PulsarClientImpl.class);
         doReturn(client).when(consumer).getClient();
@@ -391,21 +392,21 @@ public class AcknowledgementsGroupingTrackerTest {
     public void testDoIndividualBatchAckAsync() throws Exception{
         ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
         AcknowledgmentsGroupingTracker tracker = new 
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
-        MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, 
BatchMessageAckerDisabled.INSTANCE);
+        MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, null);
         BitSet bitSet = new BitSet(20);
         for(int i = 0; i < 20; i ++) {
             bitSet.set(i, true);
         }
-        MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, 
BatchMessageAcker.newAcker(bitSet));
+        MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, bitSet);
         Method doIndividualBatchAckAsync = 
PersistentAcknowledgmentsGroupingTracker.class
-                .getDeclaredMethod("doIndividualBatchAckAsync", 
BatchMessageIdImpl.class);
+                .getDeclaredMethod("doIndividualBatchAckAsync", 
MessageIdAdv.class);
         doIndividualBatchAckAsync.setAccessible(true);
         doIndividualBatchAckAsync.invoke(tracker, messageId1);
         doIndividualBatchAckAsync.invoke(tracker, messageId2);
         Field pendingIndividualBatchIndexAcks = 
PersistentAcknowledgmentsGroupingTracker.class.getDeclaredField("pendingIndividualBatchIndexAcks");
         pendingIndividualBatchIndexAcks.setAccessible(true);
-        ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> 
batchIndexAcks =
-                (ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>) 
pendingIndividualBatchIndexAcks.get(tracker);
+        ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable> 
batchIndexAcks =
+                (ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable>) 
pendingIndividualBatchIndexAcks.get(tracker);
         MessageIdImpl position1 = new MessageIdImpl(5, 1, 0);
         MessageIdImpl position2 = new MessageIdImpl(3, 2, 0);
         assertTrue(batchIndexAcks.containsKey(position1));
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java
deleted file mode 100644
index 1b3795d878c..00000000000
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import org.testng.annotations.Test;
-
-public class BatchMessageAckerDisabledTest {
-
-    @Test
-    public void testAckIndividual() {
-        for (int i = 0; i < 10; i++) {
-            assertTrue(BatchMessageAckerDisabled.INSTANCE.ackIndividual(i));
-        }
-    }
-
-    @Test
-    public void testAckCumulative() {
-        for (int i = 0; i < 10; i++) {
-            assertTrue(BatchMessageAckerDisabled.INSTANCE.ackCumulative(i));
-        }
-    }
-
-    @Test
-    public void testGetOutstandingAcks() {
-        assertEquals(0, 
BatchMessageAckerDisabled.INSTANCE.getOutstandingAcks());
-    }
-
-}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
deleted file mode 100644
index d31fd18cba9..00000000000
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.BitSet;
-
-public class BatchMessageAckerTest {
-
-    private static final int BATCH_SIZE = 10;
-
-    private BatchMessageAcker acker;
-
-    @BeforeMethod
-    public void setup() {
-        acker = BatchMessageAcker.newAcker(10);
-    }
-
-    @Test
-    public void testAckers() {
-        assertEquals(BATCH_SIZE, acker.getOutstandingAcks());
-        assertEquals(BATCH_SIZE, acker.getBatchSize());
-
-        assertFalse(acker.ackIndividual(4));
-        for (int i = 0; i < BATCH_SIZE; i++) {
-            if (4 == i) {
-                assertFalse(acker.getBitSet().get(i));
-            } else {
-                assertTrue(acker.getBitSet().get(i));
-            }
-        }
-
-        assertFalse(acker.ackCumulative(6));
-        for (int i = 0; i < BATCH_SIZE; i++) {
-            if (i <= 6) {
-                assertFalse(acker.getBitSet().get(i));
-            } else {
-                assertTrue(acker.getBitSet().get(i));
-            }
-        }
-
-        for (int i = BATCH_SIZE - 1; i >= 8; i--) {
-            assertFalse(acker.ackIndividual(i));
-            assertFalse(acker.getBitSet().get(i));
-        }
-
-        assertTrue(acker.ackIndividual(7));
-        assertEquals(0, acker.getOutstandingAcks());
-    }
-
-    @Test
-    public void testBitSetAcker() {
-        BitSet bitSet = BitSet.valueOf(acker.getBitSet().toLongArray());
-        BatchMessageAcker bitSetAcker = BatchMessageAcker.newAcker(bitSet);
-
-        Assert.assertEquals(acker.getBitSet(), bitSetAcker.getBitSet());
-        Assert.assertEquals(acker.getOutstandingAcks(), 
bitSetAcker.getOutstandingAcks());
-    }
-
-}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
index 6bf9cd94348..10d805cdc4d 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
@@ -20,13 +20,8 @@ package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectWriter;
 import java.io.IOException;
 import java.util.Collections;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.testng.annotations.Test;
 
 public class BatchMessageIdImplTest {
@@ -123,36 +118,10 @@ public class BatchMessageIdImplTest {
         assertEquals(batchMsgId2.hashCode(), msgId2.hashCode());
     }
 
-    @Test
-    public void deserializationTest() {
-        // initialize BitSet with null
-        BatchMessageAcker ackerDisabled = new BatchMessageAcker(null, 0);
-        BatchMessageIdImpl batchMsgId = new BatchMessageIdImpl(0, 0, 0, 0, 0, 
ackerDisabled);
-
-        ObjectWriter writer = 
ObjectMapperFactory.create().writerWithDefaultPrettyPrinter();
-
-        try {
-            writer.writeValueAsString(batchMsgId);
-            fail("Shouldn't be deserialized");
-        } catch (JsonProcessingException e) {
-            // expected
-            assertTrue(e.getCause() instanceof NullPointerException);
-        }
-
-        // use the default BatchMessageAckerDisabled
-        BatchMessageIdImpl batchMsgIdToDeserialize = new BatchMessageIdImpl(0, 
0, 0, 0);
-
-        try {
-            writer.writeValueAsString(batchMsgIdToDeserialize);
-        } catch (JsonProcessingException e) {
-            fail("Should be successful");
-        }
-    }
-
     @Test
     public void serializeAndDeserializeTest() throws IOException {
         BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(1, 1, 0,
-            1, 10, BatchMessageAcker.newAcker(10));
+            1, 10, BatchMessageIdImpl.newAckSet(10));
         byte[] serialized = batchMessageId.toByteArray();
         BatchMessageIdImpl deserialized = (BatchMessageIdImpl) 
MessageIdImpl.fromByteArray(serialized);
         assertEquals(deserialized.getBatchSize(), 
batchMessageId.getBatchSize());
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
index 7f029635241..4173d6439b9 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
@@ -43,8 +43,7 @@ public class MessageIdSerializationTest {
 
     @Test
     public void testBatchSizeNotSet() throws Exception {
-        MessageId id = new BatchMessageIdImpl(1L, 2L, 3, 4, -1,
-                BatchMessageAckerDisabled.INSTANCE);
+        MessageId id = new BatchMessageIdImpl(1L, 2L, 3, 4, -1, null);
         byte[] serialized = id.toByteArray();
         assertEquals(MessageId.fromByteArray(serialized), id);
         assertEquals(MessageId.fromByteArrayWithTopic(serialized, "my-topic"), 
id);
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 28cce0fe622..7df173da0f1 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -48,6 +48,7 @@ import net.jodah.typetools.TypeResolver;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.auth.AuthenticationDataBasic;
@@ -325,7 +326,7 @@ public class FunctionCommon {
     }
 
     public static final long getSequenceId(MessageId messageId) {
-        MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(messageId);
+        MessageIdAdv msgId = (MessageIdAdv) messageId;
         long ledgerId = msgId.getLedgerId();
         long entryId = msgId.getEntryId();
 
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 10efc91ccda..ff0bfd391e8 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -389,6 +390,23 @@ public class KafkaConnectSink implements 
Sink<GenericObject> {
         int batchIdx;
     }
 
+    private static Method getMethodOfMessageId(MessageId messageId, String 
name) throws NoSuchMethodException {
+        Class<?> clazz = messageId.getClass();
+        NoSuchMethodException firstException = null;
+        while (clazz != null) {
+            try {
+                return clazz.getDeclaredMethod(name);
+            } catch (NoSuchMethodException e) {
+                if (firstException == null) {
+                    firstException = e;
+                }
+                clazz = clazz.getSuperclass();
+            }
+        }
+        assert firstException != null;
+        throw firstException;
+    }
+
     @VisibleForTesting
     static BatchMessageSequenceRef 
getMessageSequenceRefForBatchMessage(MessageId messageId) {
         long ledgerId;
@@ -396,23 +414,17 @@ public class KafkaConnectSink implements 
Sink<GenericObject> {
         int batchIdx;
         try {
             try {
-                messageId = (MessageId) 
messageId.getClass().getDeclaredMethod("getInnerMessageId").invoke(messageId);
-            } catch (NoSuchMethodException noSuchMethodException) {
-                // not a TopicMessageIdImpl
-            }
-
-            try {
-                batchIdx = (int) 
messageId.getClass().getDeclaredMethod("getBatchIndex").invoke(messageId);
+                batchIdx = (int) getMethodOfMessageId(messageId, 
"getBatchIndex").invoke(messageId);
+                if (batchIdx < 0) {
+                    return null;
+                }
             } catch (NoSuchMethodException noSuchMethodException) {
                 // not a BatchMessageIdImpl, returning null to use the 
standard sequenceId
                 return null;
             }
 
-            // if getBatchIndex exists it means messageId is a 
'BatchMessageIdImpl' instance.
-            final Class<?> messageIdImplClass = 
messageId.getClass().getSuperclass();
-
-            ledgerId = (long) 
messageIdImplClass.getDeclaredMethod("getLedgerId").invoke(messageId);
-            entryId = (long) 
messageIdImplClass.getDeclaredMethod("getEntryId").invoke(messageId);
+            ledgerId = (long) getMethodOfMessageId(messageId, 
"getLedgerId").invoke(messageId);
+            entryId = (long) getMethodOfMessageId(messageId, 
"getEntryId").invoke(messageId);
         } catch (IllegalAccessException | NoSuchMethodException | 
InvocationTargetException ex) {
             log.error("Unexpected error while retrieving sequenceId, messageId 
class: {}, error: {}",
                     messageId.getClass().getName(), ex.getMessage(), ex);


Reply via email to