This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8c50a6c2e91 [improve][client] PIP-229: Add a common interface to get
fields of MessageIdData (#19414)
8c50a6c2e91 is described below
commit 8c50a6c2e91c81dbf187ce5e66cb39e2758a741e
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Apr 4 15:52:34 2023 +0800
[improve][client] PIP-229: Add a common interface to get fields of
MessageIdData (#19414)
---
.../broker/service/PersistentFailoverE2ETest.java | 8 +-
.../broker/service/SubscriptionSeekTest.java | 2 +-
.../pulsar/client/api/CustomMessageIdTest.java | 142 +++++++++++++
.../api/PartitionedProducerConsumerTest.java | 2 +-
.../apache/pulsar/client/impl/ConsumerAckTest.java | 4 +-
.../apache/pulsar/client/impl/MessageIdTest.java | 3 -
.../org/apache/pulsar/client/api/MessageIdAdv.java | 122 +++++++++++
.../apache/pulsar/client/api/TopicMessageId.java | 43 +++-
.../impl/AcknowledgmentsGroupingTracker.java | 2 +-
.../pulsar/client/impl/BatchMessageAcker.java | 95 ---------
.../client/impl/BatchMessageAckerDisabled.java | 50 -----
.../pulsar/client/impl/BatchMessageIdImpl.java | 82 +++-----
.../pulsar/client/impl/ChunkMessageIdImpl.java | 9 +-
.../apache/pulsar/client/impl/ConsumerBase.java | 15 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 222 ++++++++-------------
.../pulsar/client/impl/MessageIdAdvUtils.java | 74 +++++++
.../apache/pulsar/client/impl/MessageIdImpl.java | 71 +------
.../org/apache/pulsar/client/impl/MessageImpl.java | 8 +-
.../client/impl/MessagePayloadContextImpl.java | 9 +-
.../client/impl/MultiTopicsConsumerImpl.java | 44 ++--
.../pulsar/client/impl/NegativeAcksTracker.java | 10 +-
...NonPersistentAcknowledgmentGroupingTracker.java | 2 +-
.../PersistentAcknowledgmentsGroupingTracker.java | 158 +++++++--------
.../apache/pulsar/client/impl/ResetCursorData.java | 20 +-
.../pulsar/client/impl/TopicMessageIdImpl.java | 39 +---
.../pulsar/client/impl/TopicMessageImpl.java | 2 +-
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 3 +-
.../src/main/resources/findbugsExclude.xml | 5 +
.../impl/AcknowledgementsGroupingTrackerTest.java | 13 +-
.../client/impl/BatchMessageAckerDisabledTest.java | 47 -----
.../pulsar/client/impl/BatchMessageAckerTest.java | 83 --------
.../pulsar/client/impl/BatchMessageIdImplTest.java | 33 +--
.../client/impl/MessageIdSerializationTest.java | 3 +-
.../pulsar/functions/utils/FunctionCommon.java | 3 +-
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 36 ++--
35 files changed, 686 insertions(+), 778 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index b263d4448d8..f5895ec3761 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -41,11 +41,11 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -370,8 +370,7 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
}
totalMessages++;
consumer1.acknowledge(msg);
- MessageIdImpl msgId =
MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
- receivedPtns.add(msgId.getPartitionIndex());
+ receivedPtns.add(((MessageIdAdv)
msg.getMessageId()).getPartitionIndex());
}
assertTrue(Sets.difference(listener1.activePtns,
receivedPtns).isEmpty());
@@ -387,8 +386,7 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
}
totalMessages++;
consumer2.acknowledge(msg);
- MessageIdImpl msgId =
MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
- receivedPtns.add(msgId.getPartitionIndex());
+ receivedPtns.add(((MessageIdAdv)
msg.getMessageId()).getPartitionIndex());
}
assertTrue(Sets.difference(listener1.inactivePtns,
receivedPtns).isEmpty());
assertTrue(Sets.difference(listener2.activePtns,
receivedPtns).isEmpty());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 93f2a42bcda..b11946069c9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -678,7 +678,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
if (message == null) {
break;
}
-
received.add(MessageIdImpl.convertToMessageIdImpl(message.getMessageId()));
+ received.add(message.getMessageId());
}
int msgNumFromPartition1 = list.size() / 2;
int msgNumFromPartition2 = 1;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
new file mode 100644
index 00000000000..52bfc9dda37
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class CustomMessageIdTest extends ProducerConsumerBase {
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @DataProvider
+ public static Object[][] enableBatching() {
+ return new Object[][]{
+ { true },
+ { false }
+ };
+ }
+
+ @Test
+ public void testSeek() throws Exception {
+ final var topic = "persistent://my-property/my-ns/test-seek-" +
System.currentTimeMillis();
+ @Cleanup final var producer =
pulsarClient.newProducer(Schema.INT32).topic(topic).create();
+ final var msgIds = new ArrayList<SimpleMessageIdImpl>();
+ for (int i = 0; i < 10; i++) {
+ msgIds.add(new SimpleMessageIdImpl((MessageIdAdv)
producer.send(i)));
+ }
+ @Cleanup final var consumer = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic).subscriptionName("sub").subscribe();
+ consumer.seek(msgIds.get(6));
+ final var msg = consumer.receive(3, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ assertEquals(msg.getValue(), 7);
+ }
+
+ @Test(dataProvider = "enableBatching")
+ public void testAcknowledgment(boolean enableBatching) throws Exception {
+ final var topic = "persistent://my-property/my-ns/test-ack-"
+ + enableBatching + System.currentTimeMillis();
+ final var producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .enableBatching(enableBatching)
+ .batchingMaxMessages(10)
+ .batchingMaxPublishDelay(300, TimeUnit.MILLISECONDS)
+ .create();
+ final var consumer = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("sub")
+ .enableBatchIndexAcknowledgment(true)
+ .isAckReceiptEnabled(true)
+ .subscribe();
+ for (int i = 0; i < 10; i++) {
+ producer.sendAsync(i);
+ }
+ final var msgIds = new ArrayList<SimpleMessageIdImpl>();
+ for (int i = 0; i < 10; i++) {
+ final var msg = consumer.receive();
+ final var msgId = new SimpleMessageIdImpl((MessageIdAdv)
msg.getMessageId());
+ msgIds.add(msgId);
+ if (enableBatching) {
+ assertTrue(msgId.getBatchIndex() >= 0 && msgId.getBatchSize()
> 0);
+ } else {
+ assertFalse(msgId.getBatchIndex() >= 0 && msgId.getBatchSize()
> 0);
+ }
+ }
+ consumer.acknowledgeCumulative(msgIds.get(8));
+ consumer.redeliverUnacknowledgedMessages();
+ final var msg = consumer.receive(3, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ assertEquals(msg.getValue(), 9);
+ }
+
+ private record SimpleMessageIdImpl(long ledgerId, long entryId, int
batchIndex, int batchSize)
+ implements MessageIdAdv {
+
+ public SimpleMessageIdImpl(MessageIdAdv msgId) {
+ this(msgId.getLedgerId(), msgId.getEntryId(),
msgId.getBatchIndex(), msgId.getBatchSize());
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ return new byte[0]; // never used
+ }
+
+ @Override
+ public long getLedgerId() {
+ return ledgerId;
+ }
+
+ @Override
+ public long getEntryId() {
+ return entryId;
+ }
+
+ @Override
+ public int getBatchIndex() {
+ return batchIndex;
+ }
+
+ @Override
+ public int getBatchSize() {
+ return batchSize;
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index cd384e58789..13ae991a0e8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -767,7 +767,7 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
for (int i = 0; i < totalMessages; i ++) {
msg = consumer1.receive(5, TimeUnit.SECONDS);
-
Assert.assertEquals(MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()).getPartitionIndex(),
2);
+ Assert.assertEquals(((MessageIdAdv)
msg.getMessageId()).getPartitionIndex(), 2);
consumer1.acknowledge(msg);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
index 42da6090648..a83283bc267 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
@@ -176,10 +176,10 @@ public class ConsumerAckTest extends ProducerConsumerBase
{
messageIds.add(message.getMessageId());
}
MessageId firstEntryMessageId = messageIds.get(0);
- MessageId secondEntryMessageId = ((BatchMessageIdImpl)
messageIds.get(1)).toMessageIdImpl();
+ MessageId secondEntryMessageId =
MessageIdAdvUtils.discardBatch(messageIds.get(1));
// Verify messages 2 to N must be in the same entry
for (int i = 2; i < messageIds.size(); i++) {
- assertEquals(((BatchMessageIdImpl)
messageIds.get(i)).toMessageIdImpl(), secondEntryMessageId);
+ assertEquals(MessageIdAdvUtils.discardBatch(messageIds.get(i)),
secondEntryMessageId);
}
assertTrue(interceptor.individualAckedMessageIdList.isEmpty());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index ceb5c51e6aa..375bbff8a4d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -118,9 +118,6 @@ public class MessageIdTest extends BrokerTestBase {
Message<byte[]> message = consumer.receive();
assertEquals(new String(message.getData()), messagePrefix + i);
MessageId messageId = message.getMessageId();
- if (topicType == TopicType.PARTITIONED) {
- messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
- }
assertTrue(messageIds.remove(messageId), "Failed to receive
message");
}
log.info("Remaining message IDs = {}", messageIds);
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
new file mode 100644
index 00000000000..73ecfed0ad0
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.BitSet;
+
+/**
+ * The {@link MessageId} interface provided for advanced users.
+ * <p>
+ * All built-in MessageId implementations should be able to be cast to
MessageIdAdv.
+ * </p>
+ */
+public interface MessageIdAdv extends MessageId {
+
+ /**
+ * Get the ledger ID.
+ *
+ * @return the ledger ID
+ */
+ long getLedgerId();
+
+ /**
+ * Get the entry ID.
+ *
+ * @return the entry ID
+ */
+ long getEntryId();
+
+ /**
+ * Get the partition index.
+ *
+ * @return -1 if the message is from a non-partitioned topic, otherwise
the non-negative partition index
+ */
+ default int getPartitionIndex() {
+ return -1;
+ }
+
+ /**
+ * Get the batch index.
+ *
+ * @return -1 if the message is not in a batch
+ */
+ default int getBatchIndex() {
+ return -1;
+ }
+
+ /**
+ * Get the batch size.
+ *
+ * @return 0 if the message is not in a batch
+ */
+ default int getBatchSize() {
+ return 0;
+ }
+
+ /**
+ * Get the BitSet that indicates which messages in the batch have not been acknowledged yet.
+ *
+ * @implNote The message IDs of a batch should share a BitSet. For
example, given 3 messages in the same batch whose
+ * size is 3, all message IDs of them should return "111" (i.e. a BitSet
whose size is 3 and all bits are 1). If the
+ * 1st message has been acknowledged, the returned BitSet should become
"011" (i.e. the 1st bit becomes 0).
+ *
+ * @return null if the message is a non-batched message
+ */
+ default BitSet getAckSet() {
+ return null;
+ }
+
+ /**
+ * Get the message ID of the first chunk if the current message ID
represents the position of a chunked message.
+ *
+ * @implNote A chunked message is distributed across different BookKeeper
entries. The message ID of a chunked
+ * message is composed of two message IDs that represent positions of the
first and the last chunk. The message ID
+ * itself represents the position of the last chunk.
+ *
+ * @return null if the message is not a chunked message
+ */
+ default MessageIdAdv getFirstChunkMessageId() {
+ return null;
+ }
+
+ /**
+ * The default implementation of {@link Comparable#compareTo(Object)}.
+ */
+ default int compareTo(MessageId o) {
+ if (!(o instanceof MessageIdAdv)) {
+ throw new UnsupportedOperationException("Unknown MessageId type: "
+ + ((o != null) ? o.getClass().getName() : "null"));
+ }
+ final MessageIdAdv other = (MessageIdAdv) o;
+ int result = Long.compare(this.getLedgerId(), other.getLedgerId());
+ if (result != 0) {
+ return result;
+ }
+ result = Long.compare(this.getEntryId(), other.getEntryId());
+ if (result != 0) {
+ return result;
+ }
+ // TODO: Correct the following comparison logic, see
https://github.com/apache/pulsar/pull/18981
+ result = Integer.compare(this.getPartitionIndex(),
other.getPartitionIndex());
+ if (result != 0) {
+ return result;
+ }
+ return Integer.compare(this.getBatchIndex(), other.getBatchIndex());
+ }
+}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
index f6109d5f8e8..b70267bb0fb 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.api;
+import java.util.BitSet;
+
/**
* The MessageId used for a consumer that subscribes multiple topics or
partitioned topics.
*
@@ -49,13 +51,13 @@ public interface TopicMessageId extends MessageId {
/**
* The simplest implementation of a TopicMessageId interface.
*/
- class Impl implements TopicMessageId {
+ class Impl implements MessageIdAdv, TopicMessageId {
private final String topic;
- private final MessageId messageId;
+ private final MessageIdAdv messageId;
public Impl(String topic, MessageId messageId) {
this.topic = topic;
- this.messageId = messageId;
+ this.messageId = (MessageIdAdv) messageId;
}
@Override
@@ -68,6 +70,41 @@ public interface TopicMessageId extends MessageId {
return topic;
}
+ @Override
+ public long getLedgerId() {
+ return messageId.getLedgerId();
+ }
+
+ @Override
+ public long getEntryId() {
+ return messageId.getEntryId();
+ }
+
+ @Override
+ public int getPartitionIndex() {
+ return messageId.getPartitionIndex();
+ }
+
+ @Override
+ public int getBatchIndex() {
+ return messageId.getBatchIndex();
+ }
+
+ @Override
+ public int getBatchSize() {
+ return messageId.getBatchSize();
+ }
+
+ @Override
+ public BitSet getAckSet() {
+ return messageId.getAckSet();
+ }
+
+ @Override
+ public MessageIdAdv getFirstChunkMessageId() {
+ return messageId.getFirstChunkMessageId();
+ }
+
@Override
public int compareTo(MessageId o) {
return messageId.compareTo(o);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
index d46af1a99e7..60d7135e5e4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
@@ -31,7 +31,7 @@ public interface AcknowledgmentsGroupingTracker extends
AutoCloseable {
boolean isDuplicate(MessageId messageId);
- CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType
ackType, Map<String, Long> properties);
+ CompletableFuture<Void> addAcknowledgment(MessageId msgId, AckType
ackType, Map<String, Long> properties);
CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
AckType ackType,
Map<String, Long>
properties);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
deleted file mode 100644
index 1c9b66fd2ba..00000000000
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import java.util.BitSet;
-
-public class BatchMessageAcker {
-
- private BatchMessageAcker() {
- this.bitSet = new BitSet();
- this.batchSize = 0;
- }
-
- static BatchMessageAcker newAcker(int batchSize) {
- BitSet bitSet = new BitSet(batchSize);
- bitSet.set(0, batchSize);
- return new BatchMessageAcker(bitSet, batchSize);
- }
-
- // Use the param bitSet as the BatchMessageAcker's bitSet, don't care
about the batchSize.
- static BatchMessageAcker newAcker(BitSet bitSet) {
- return new BatchMessageAcker(bitSet, -1);
- }
-
- // bitset shared across messages in the same batch.
- private final int batchSize;
- private final BitSet bitSet;
- private volatile boolean prevBatchCumulativelyAcked = false;
-
- BatchMessageAcker(BitSet bitSet, int batchSize) {
- this.bitSet = bitSet;
- this.batchSize = batchSize;
- }
-
- BitSet getBitSet() {
- return bitSet;
- }
-
- public synchronized int getBatchSize() {
- return batchSize;
- }
-
- public synchronized boolean ackIndividual(int batchIndex) {
- bitSet.clear(batchIndex);
- return bitSet.isEmpty();
- }
-
- public synchronized int getBitSetSize() {
- return bitSet.size();
- }
-
- public synchronized boolean ackCumulative(int batchIndex) {
- // +1 since to argument is exclusive
- bitSet.clear(0, batchIndex + 1);
- return bitSet.isEmpty();
- }
-
- // debug purpose
- public synchronized int getOutstandingAcks() {
- return bitSet.cardinality();
- }
-
- public void setPrevBatchCumulativelyAcked(boolean acked) {
- this.prevBatchCumulativelyAcked = acked;
- }
-
- public boolean isPrevBatchCumulativelyAcked() {
- return prevBatchCumulativelyAcked;
- }
-
- @Override
- public String toString() {
- return "BatchMessageAcker{"
- + "batchSize=" + batchSize
- + ", bitSet=" + bitSet
- + ", prevBatchCumulativelyAcked=" + prevBatchCumulativelyAcked
- + '}';
- }
-}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java
deleted file mode 100644
index b70c928b296..00000000000
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import java.util.BitSet;
-
-class BatchMessageAckerDisabled extends BatchMessageAcker {
-
- static final BatchMessageAckerDisabled INSTANCE = new
BatchMessageAckerDisabled();
-
- private BatchMessageAckerDisabled() {
- super(new BitSet(), 0);
- }
-
- @Override
- public synchronized int getBatchSize() {
- return 0;
- }
-
- @Override
- public boolean ackIndividual(int batchIndex) {
- return true;
- }
-
- @Override
- public boolean ackCumulative(int batchIndex) {
- return true;
- }
-
- @Override
- public int getOutstandingAcks() {
- return 0;
- }
-}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index ed28082ff6a..e9cddeb65d7 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -18,20 +18,16 @@
*/
package org.apache.pulsar.client.impl;
-import javax.annotation.Nonnull;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.TopicMessageId;
+import java.util.BitSet;
+import org.apache.pulsar.client.api.MessageIdAdv;
-/**
- */
public class BatchMessageIdImpl extends MessageIdImpl {
private static final long serialVersionUID = 1L;
- static final int NO_BATCH = -1;
private final int batchIndex;
private final int batchSize;
- private final transient BatchMessageAcker acker;
+ private final BitSet ackSet;
// Private constructor used only for json deserialization
@SuppressWarnings("unused")
@@ -40,59 +36,35 @@ public class BatchMessageIdImpl extends MessageIdImpl {
}
public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex,
int batchIndex) {
- this(ledgerId, entryId, partitionIndex, batchIndex, 0,
BatchMessageAckerDisabled.INSTANCE);
+ this(ledgerId, entryId, partitionIndex, batchIndex, 0, null);
}
public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex,
int batchIndex, int batchSize,
- BatchMessageAcker acker) {
+ BitSet ackSet) {
super(ledgerId, entryId, partitionIndex);
this.batchIndex = batchIndex;
this.batchSize = batchSize;
- this.acker = acker;
+ this.ackSet = ackSet;
}
- public BatchMessageIdImpl(MessageIdImpl other) {
- super(other.ledgerId, other.entryId, other.partitionIndex);
- if (other instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl otherId = (BatchMessageIdImpl) other;
- this.batchIndex = otherId.batchIndex;
- this.batchSize = otherId.batchSize;
- this.acker = otherId.acker;
- } else {
- this.batchIndex = NO_BATCH;
- this.batchSize = 0;
- this.acker = BatchMessageAckerDisabled.INSTANCE;
- }
+ public BatchMessageIdImpl(MessageIdAdv other) {
+ this(other.getLedgerId(), other.getEntryId(),
other.getPartitionIndex(),
+ other.getBatchIndex(), other.getBatchSize(),
other.getAckSet());
}
+ @Override
public int getBatchIndex() {
return batchIndex;
}
- @Override
- public int compareTo(@Nonnull MessageId o) {
- if (o instanceof MessageIdImpl) {
- MessageIdImpl other = (MessageIdImpl) o;
- int batchIndex = (o instanceof BatchMessageIdImpl) ?
((BatchMessageIdImpl) o).batchIndex : NO_BATCH;
- return messageIdCompare(
- this.ledgerId, this.entryId, this.partitionIndex,
this.batchIndex,
- other.ledgerId, other.entryId, other.partitionIndex, batchIndex
- );
- } else if (o instanceof TopicMessageId) {
- return compareTo(MessageIdImpl.convertToMessageIdImpl(o));
- } else {
- throw new UnsupportedOperationException("Unknown MessageId type: "
+ o.getClass().getName());
- }
- }
-
@Override
public int hashCode() {
- return messageIdHashCode(ledgerId, entryId, partitionIndex,
batchIndex);
+ return MessageIdAdvUtils.hashCode(this);
}
@Override
public boolean equals(Object o) {
- return super.equals(o);
+ return MessageIdAdvUtils.equals(this, o);
}
@Override
@@ -106,39 +78,51 @@ public class BatchMessageIdImpl extends MessageIdImpl {
return toByteArray(batchIndex, batchSize);
}
+ @Deprecated
public boolean ackIndividual() {
- return acker.ackIndividual(batchIndex);
+ return MessageIdAdvUtils.acknowledge(this, true);
}
+ @Deprecated
public boolean ackCumulative() {
- return acker.ackCumulative(batchIndex);
+ return MessageIdAdvUtils.acknowledge(this, false);
}
+ @Deprecated
public int getOutstandingAcksInSameBatch() {
- return acker.getOutstandingAcks();
+ return 0;
}
+ @Override
public int getBatchSize() {
- return acker.getBatchSize();
+ return batchSize;
}
+ @Deprecated
public int getOriginalBatchSize() {
return this.batchSize;
}
+ @Deprecated
public MessageIdImpl prevBatchMessageId() {
- return new MessageIdImpl(
- ledgerId, entryId - 1, partitionIndex);
+ return (MessageIdImpl) MessageIdAdvUtils.prevMessageId(this);
}
// MessageIdImpl is widely used as the key of a hash map, in this case, we
should convert the batch message id to
// have the correct hash code.
+ @Deprecated
public MessageIdImpl toMessageIdImpl() {
- return new MessageIdImpl(ledgerId, entryId, partitionIndex);
+ return (MessageIdImpl) MessageIdAdvUtils.discardBatch(this);
}
- public BatchMessageAcker getAcker() {
- return acker;
+ @Override
+ public BitSet getAckSet() {
+ return ackSet;
}
+ static BitSet newAckSet(int batchSize) {
+ final BitSet ackSet = new BitSet(batchSize);
+ ackSet.set(0, batchSize);
+ return ackSet;
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java
index 28d5047c8ef..29ce160442a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java
@@ -21,10 +21,10 @@ package org.apache.pulsar.client.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Objects;
-import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.common.api.proto.MessageIdData;
-public class ChunkMessageIdImpl extends MessageIdImpl implements MessageId {
+public class ChunkMessageIdImpl extends MessageIdImpl {
private final MessageIdImpl firstChunkMsgId;
public ChunkMessageIdImpl(MessageIdImpl firstChunkMsgId, MessageIdImpl
lastChunkMsgId) {
@@ -32,11 +32,12 @@ public class ChunkMessageIdImpl extends MessageIdImpl
implements MessageId {
this.firstChunkMsgId = firstChunkMsgId;
}
- public MessageIdImpl getFirstChunkMessageId() {
+ @Override
+ public MessageIdAdv getFirstChunkMessageId() {
return firstChunkMsgId;
}
- public MessageIdImpl getLastChunkMessageId() {
+ public MessageIdAdv getLastChunkMessageId() {
return this;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 75d3b2edf6e..973b3302f41 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -82,7 +83,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
protected final ExecutorService internalPinnedExecutor;
protected UnAckedMessageTracker unAckedMessageTracker;
final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
- protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]>
unAckedChunkedMessageIdSequenceMap;
+ protected ConcurrentOpenHashMap<MessageIdAdv, MessageIdImpl[]>
unAckedChunkedMessageIdSequenceMap;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>>
pendingReceives;
protected final int maxReceiverQueueSize;
private volatile int currentReceiverQueueSize;
@@ -128,7 +129,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
// Always use growable queue since items can exceed the advertised size
this.incomingMessages = new GrowableArrayBlockingQueue<>();
this.unAckedChunkedMessageIdSequenceMap =
- ConcurrentOpenHashMap.<MessageIdImpl,
MessageIdImpl[]>newBuilder().build();
+ ConcurrentOpenHashMap.<MessageIdAdv,
MessageIdImpl[]>newBuilder().build();
this.executorProvider = executorProvider;
this.externalPinnedExecutor = executorProvider.getExecutor();
this.internalPinnedExecutor = client.getInternalExecutorService();
@@ -223,14 +224,6 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
}
- protected MessageId normalizeMessageId(MessageId messageId) {
- if (messageId instanceof BatchMessageIdImpl) {
- // do not add each item in batch message into tracker
- return ((BatchMessageIdImpl) messageId).toMessageIdImpl();
- }
- return messageId;
- }
-
protected void reduceCurrentReceiverQueueSize() {
if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
return;
@@ -1131,7 +1124,7 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
? ((TopicMessageImpl<T>) msg).getMessage() :
msg));
MessageId id;
if (this instanceof ConsumerImpl) {
- id = normalizeMessageId(msg.getMessageId());
+ id = MessageIdAdvUtils.discardBatch(msg.getMessageId());
} else {
id = msg.getMessageId();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index beaa34bf205..1feef6ca0a6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -35,6 +35,7 @@ import io.netty.util.Timeout;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@@ -71,6 +72,7 @@ import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -154,12 +156,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
@Getter(AccessLevel.PACKAGE)
private final int priorityLevel;
private final SubscriptionMode subscriptionMode;
- private volatile BatchMessageIdImpl startMessageId;
+ private volatile MessageIdAdv startMessageId;
- private volatile BatchMessageIdImpl seekMessageId;
+ private volatile MessageIdAdv seekMessageId;
private final AtomicBoolean duringSeek;
- private final BatchMessageIdImpl initialStartMessageId;
+ private final MessageIdAdv initialStartMessageId;
private final long startMessageRollbackDurationInSec;
@@ -178,7 +180,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final TopicName topicName;
private final String topicNameWithoutPartition;
- private final Map<MessageIdImpl, List<MessageImpl<T>>>
possibleSendToDeadLetterTopicMessages;
+ private final Map<MessageIdAdv, List<MessageImpl<T>>>
possibleSendToDeadLetterTopicMessages;
private final DeadLetterPolicy deadLetterPolicy;
@@ -258,12 +260,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
this.consumerId = client.newConsumerId();
this.subscriptionMode = conf.getSubscriptionMode();
if (startMessageId != null) {
- if (startMessageId instanceof ChunkMessageIdImpl) {
- this.startMessageId = new BatchMessageIdImpl(
- ((ChunkMessageIdImpl)
startMessageId).getFirstChunkMessageId());
- } else {
- this.startMessageId = new BatchMessageIdImpl((MessageIdImpl)
startMessageId);
- }
+ MessageIdAdv firstChunkMessageId = ((MessageIdAdv)
startMessageId).getFirstChunkMessageId();
+ this.startMessageId = (firstChunkMessageId == null) ?
(MessageIdAdv) startMessageId : firstChunkMessageId;
}
this.initialStartMessageId = this.startMessageId;
this.startMessageRollbackDurationInSec =
startMessageRollbackDurationInSec;
@@ -535,7 +533,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
protected CompletableFuture<Void> doAcknowledge(MessageId messageId,
AckType ackType,
Map<String, Long>
properties,
TransactionImpl txn) {
- messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
PulsarClientException exception = new
PulsarClientException("Consumer not ready. State: " + getState());
@@ -551,16 +548,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return doTransactionAcknowledgeForResponse(messageId, ackType,
null, properties,
new TxnID(txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits()));
}
- return
acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId,
ackType, properties);
+ return acknowledgmentsGroupingTracker.addAcknowledgment(messageId,
ackType, properties);
}
@Override
protected CompletableFuture<Void> doAcknowledge(List<MessageId>
messageIdList, AckType ackType,
Map<String, Long>
properties, TransactionImpl txn) {
-
- for (MessageId messageId : messageIdList) {
- checkArgument(messageId instanceof MessageIdImpl);
- }
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
PulsarClientException exception = new
PulsarClientException("Consumer not ready. State: " + getState());
@@ -572,7 +565,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return FutureUtil.failedFuture(exception);
}
if (txn != null) {
- return doTransactionAcknowledgeForResponse(messageIdList, ackType,
null,
+ return doTransactionAcknowledgeForResponse(messageIdList, ackType,
properties, new TxnID(txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits()));
} else {
return
this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList,
ackType, properties);
@@ -592,7 +585,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
.InvalidMessageException("Cannot handle message with null
messageId"));
}
- messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
PulsarClientException exception = new
PulsarClientException("Consumer not ready. State: " + getState());
@@ -922,7 +914,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
* Clear the internal receiver queue and returns the message id of what
was the 1st message in the queue that was
* not seen by the application.
*/
- private BatchMessageIdImpl clearReceiverQueue() {
+ private MessageIdAdv clearReceiverQueue() {
List<Message<?>> currentMessageQueue = new
ArrayList<>(incomingMessages.size());
incomingMessages.drainTo(currentMessageQueue);
resetIncomingMessageSize();
@@ -934,17 +926,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
if (!currentMessageQueue.isEmpty()) {
- MessageIdImpl nextMessageInQueue = (MessageIdImpl)
currentMessageQueue.get(0).getMessageId();
- BatchMessageIdImpl previousMessage;
- if (nextMessageInQueue instanceof BatchMessageIdImpl) {
+ MessageIdAdv nextMessageInQueue = (MessageIdAdv)
currentMessageQueue.get(0).getMessageId();
+ MessageIdAdv previousMessage;
+ if (MessageIdAdvUtils.isBatch(nextMessageInQueue)) {
// Get on the previous message within the current batch
previousMessage = new
BatchMessageIdImpl(nextMessageInQueue.getLedgerId(),
nextMessageInQueue.getEntryId(),
nextMessageInQueue.getPartitionIndex(),
- ((BatchMessageIdImpl)
nextMessageInQueue).getBatchIndex() - 1);
+ nextMessageInQueue.getBatchIndex() - 1);
} else {
// Get on previous message in previous entry
- previousMessage = new
BatchMessageIdImpl(nextMessageInQueue.getLedgerId(),
- nextMessageInQueue.getEntryId() - 1,
nextMessageInQueue.getPartitionIndex(), -1);
+ previousMessage =
MessageIdAdvUtils.prevMessageId(nextMessageInQueue);
}
// release messages if they are pooled messages
currentMessageQueue.forEach(Message::release);
@@ -1126,7 +1117,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
final Schema<V> schema,
final boolean
containMetadata,
final BitSetRecyclable
ackBitSet,
- final BatchMessageAcker
acker,
+ final BitSet
ackSetInMessageId,
final int redeliveryCount,
final long consumerEpoch) {
if (log.isDebugEnabled()) {
@@ -1161,7 +1152,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
BatchMessageIdImpl batchMessageIdImpl = new
BatchMessageIdImpl(messageId.getLedgerId(),
- messageId.getEntryId(), getPartitionIndex(), index,
numMessages, acker);
+ messageId.getEntryId(), getPartitionIndex(), index,
numMessages, ackSetInMessageId);
final ByteBuf payloadBuffer = (singleMessagePayload != null) ?
singleMessagePayload : payload;
final MessageImpl<V> message =
MessageImpl.create(topicName.toString(), batchMessageIdImpl,
@@ -1525,7 +1516,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
possibleToDeadLetter = new ArrayList<>();
}
- BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);
+ BitSet ackSetInMessageId = BatchMessageIdImpl.newAckSet(batchSize);
BitSetRecyclable ackBitSet = null;
if (ackSet != null && ackSet.size() > 0) {
ackBitSet =
BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet));
@@ -1537,7 +1528,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
for (int i = 0; i < batchSize; ++i) {
final MessageImpl<T> message = newSingleMessage(i, batchSize,
brokerEntryMetadata, msgMetadata,
singleMessageMetadata, uncompressedPayload,
batchMessage, schema, true,
- ackBitSet, acker, redeliveryCount, consumerEpoch);
+ ackBitSet, ackSetInMessageId, redeliveryCount,
consumerEpoch);
if (message == null) {
skippedMessages++;
continue;
@@ -1634,7 +1625,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
protected void trackMessage(MessageId messageId, int redeliveryCount) {
if (conf.getAckTimeoutMillis() > 0 && messageId instanceof
MessageIdImpl) {
- MessageId id = normalizeMessageId(messageId);
+ MessageId id = MessageIdAdvUtils.discardBatch(messageId);
if (hasParentConsumer) {
//TODO: check parent consumer here
// we should no longer track this message, TopicsConsumer will
take care from now onwards
@@ -1931,8 +1922,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return;
}
- checkArgument(messageIds.stream().findFirst().get() instanceof
MessageIdImpl);
-
if (conf.getSubscriptionType() != SubscriptionType.Shared
&& conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
// We cannot redeliver single messages if subscription type is not
Shared
@@ -1942,11 +1931,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
ClientCnx cnx = cnx();
if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >=
ProtocolVersion.v2.getValue()) {
int messagesFromQueue = removeExpiredMessagesFromQueue(messageIds);
- Iterable<List<MessageIdImpl>> batches = Iterables.partition(
- messageIds.stream()
- .map(messageId -> (MessageIdImpl) messageId)
- .collect(Collectors.toSet()),
MAX_REDELIVER_UNACKNOWLEDGED);
- batches.forEach(ids -> {
+ Iterables.partition(messageIds,
MAX_REDELIVER_UNACKNOWLEDGED).forEach(ids -> {
getRedeliveryMessageIdData(ids).thenAccept(messageIdData -> {
if (!messageIdData.isEmpty()) {
ByteBuf cmd =
Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdData);
@@ -1985,11 +1970,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
notifyPendingBatchReceivedCallBack(op.future);
}
- private CompletableFuture<List<MessageIdData>>
getRedeliveryMessageIdData(List<MessageIdImpl> messageIds) {
+ private CompletableFuture<List<MessageIdData>>
getRedeliveryMessageIdData(List<MessageId> messageIds) {
if (messageIds == null || messageIds.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyList());
}
- List<CompletableFuture<MessageIdData>> futures =
messageIds.stream().map(messageId -> {
+ List<CompletableFuture<MessageIdData>> futures =
messageIds.stream().map(originalMessageId -> {
+ final MessageIdAdv messageId = (MessageIdAdv) originalMessageId;
CompletableFuture<Boolean> future =
processPossibleToDLQ(messageId);
return future.thenApply(sendToDLQ -> {
if (!sendToDLQ) {
@@ -2005,20 +1991,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
futures.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(Collectors.toList()));
}
- private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl
messageId) {
+ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv
messageId) {
List<MessageImpl<T>> deadLetterMessages = null;
if (possibleSendToDeadLetterTopicMessages != null) {
- if (messageId instanceof BatchMessageIdImpl) {
- messageId = new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(),
- getPartitionIndex());
- }
- deadLetterMessages =
possibleSendToDeadLetterTopicMessages.get(messageId);
+ deadLetterMessages =
possibleSendToDeadLetterTopicMessages.get(MessageIdAdvUtils.discardBatch(messageId));
}
CompletableFuture<Boolean> result = new CompletableFuture<>();
if (deadLetterMessages != null) {
initDeadLetterProducerIfNeeded();
List<MessageImpl<T>> finalDeadLetterMessages = deadLetterMessages;
- MessageIdImpl finalMessageId = messageId;
deadLetterProducer.thenAcceptAsync(producerDLQ -> {
for (MessageImpl<T> message : finalDeadLetterMessages) {
String originMessageIdStr =
message.getMessageId().toString();
@@ -2032,12 +2013,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
typedMessageBuilderNew.sendAsync()
.thenAccept(messageIdInDLQ -> {
-
possibleSendToDeadLetterTopicMessages.remove(finalMessageId);
-
acknowledgeAsync(finalMessageId).whenComplete((v, ex) -> {
+
possibleSendToDeadLetterTopicMessages.remove(messageId);
+ acknowledgeAsync(messageId).whenComplete((v,
ex) -> {
if (ex != null) {
log.warn("[{}] [{}] [{}] Failed to
acknowledge the message {} of the original"
+ " topic but send to
the DLQ successfully.",
- topicName, subscription,
consumerName, finalMessageId, ex);
+ topicName, subscription,
consumerName, messageId, ex);
result.complete(false);
} else {
result.complete(true);
@@ -2047,11 +2028,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (ex instanceof
PulsarClientException.ProducerQueueIsFullError) {
log.warn("[{}] [{}] [{}] Failed to send
DLQ message to {} for message id {}: {}",
topicName, subscription,
consumerName,
-
deadLetterPolicy.getDeadLetterTopic(), finalMessageId, ex.getMessage());
+
deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage());
} else {
log.warn("[{}] [{}] [{}] Failed to send
DLQ message to {} for message id {}",
topicName, subscription,
consumerName,
-
deadLetterPolicy.getDeadLetterTopic(), finalMessageId, ex);
+
deadLetterPolicy.getDeadLetterTopic(), messageId, ex);
}
result.complete(false);
return null;
@@ -2154,8 +2135,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
ClientCnx cnx = cnx();
- BatchMessageIdImpl originSeekMessageId = seekMessageId;
- seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
+ MessageIdAdv originSeekMessageId = seekMessageId;
+ seekMessageId = (MessageIdAdv) seekId;
duringSeek.set(true);
log.info("[{}][{}] Seeking subscription to {}", topic, subscription,
seekBy);
@@ -2193,29 +2174,28 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
@Override
- public CompletableFuture<Void> seekAsync(MessageId originalMessageId) {
- final MessageIdImpl messageId =
MessageIdImpl.convertToMessageIdImpl(originalMessageId);
+ public CompletableFuture<Void> seekAsync(MessageId messageId) {
String seekBy = String.format("the message %s", messageId.toString());
return seekAsyncCheckState(seekBy).orElseGet(() -> {
long requestId = client.newRequestId();
- ByteBuf seek = null;
- if (messageId instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
- // Initialize ack set
- BitSetRecyclable ackSet = BitSetRecyclable.create();
- ackSet.set(0, msgId.getBatchSize());
- ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
- long[] ackSetArr = ackSet.toLongArray();
- ackSet.recycle();
-
- seek = Commands.newSeek(consumerId, requestId,
msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
- } else if (messageId instanceof ChunkMessageIdImpl) {
- ChunkMessageIdImpl msgId = (ChunkMessageIdImpl) messageId;
- seek = Commands.newSeek(consumerId, requestId,
msgId.getFirstChunkMessageId().getLedgerId(),
- msgId.getFirstChunkMessageId().getEntryId(), new
long[0]);
+ final MessageIdAdv msgId = (MessageIdAdv) messageId;
+ final MessageIdAdv firstChunkMsgId =
msgId.getFirstChunkMessageId();
+ final ByteBuf seek;
+ if (msgId.getFirstChunkMessageId() != null) {
+ seek = Commands.newSeek(consumerId, requestId,
firstChunkMsgId.getLedgerId(),
+ firstChunkMsgId.getEntryId(), new long[0]);
} else {
- seek = Commands.newSeek(
- consumerId, requestId, messageId.getLedgerId(),
messageId.getEntryId(), new long[0]);
+ final long[] ackSetArr;
+ if (MessageIdAdvUtils.isBatch(msgId)) {
+ final BitSetRecyclable ackSet = BitSetRecyclable.create();
+ ackSet.set(0, msgId.getBatchSize());
+ ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
+ ackSetArr = ackSet.toLongArray();
+ ackSet.recycle();
+ } else {
+ ackSetArr = new long[0];
+ }
+ seek = Commands.newSeek(consumerId, requestId,
msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
}
return seekAsyncInternal(requestId, seek, messageId, seekBy);
});
@@ -2247,9 +2227,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
future.thenAccept(response -> {
- MessageIdImpl lastMessageId =
MessageIdImpl.convertToMessageIdImpl(response.lastMessageId);
- MessageIdImpl markDeletePosition = MessageIdImpl
-
.convertToMessageIdImpl(response.markDeletePosition);
+ MessageIdAdv lastMessageId = (MessageIdAdv)
response.lastMessageId;
+ MessageIdAdv markDeletePosition = (MessageIdAdv)
response.markDeletePosition;
if (markDeletePosition != null &&
!(markDeletePosition.getEntryId() < 0
&& markDeletePosition.getLedgerId() >
lastMessageId.getLedgerId())) {
@@ -2434,16 +2413,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
}
- private MessageIdImpl getMessageIdImpl(Message<?> msg) {
- MessageIdImpl messageId = (MessageIdImpl) msg.getMessageId();
- if (messageId instanceof BatchMessageIdImpl) {
- // messageIds contain MessageIdImpl, not BatchMessageIdImpl
- messageId = new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex());
- }
- return messageId;
- }
-
-
private boolean isMessageUndecryptable(MessageMetadata msgMetadata) {
return (msgMetadata.getEncryptionKeysCount() > 0 &&
conf.getCryptoKeyReader() == null
&& conf.getCryptoFailureAction() ==
ConsumerCryptoFailureAction.CONSUME);
@@ -2486,7 +2455,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
int messagesFromQueue = 0;
Message<T> peek = incomingMessages.peek();
if (peek != null) {
- MessageIdImpl messageId = getMessageIdImpl(peek);
+ MessageIdAdv messageId =
MessageIdAdvUtils.discardBatch(peek.getMessageId());
if (!messageIds.contains(messageId)) {
// first message is not expired, then no message is expired in
queue.
return 0;
@@ -2497,7 +2466,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
while (message != null) {
decreaseIncomingMessageSize(message);
messagesFromQueue++;
- MessageIdImpl id = getMessageIdImpl(message);
+ MessageIdAdv id =
MessageIdAdvUtils.discardBatch(message.getMessageId());
if (!messageIds.contains(id)) {
messageIds.add(id);
break;
@@ -2691,33 +2660,26 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private CompletableFuture<Void>
doTransactionAcknowledgeForResponse(MessageId messageId, AckType ackType,
ValidationError validationError,
Map<String, Long> properties, TxnID txnID) {
- BitSetRecyclable bitSetRecyclable = null;
- long ledgerId;
- long entryId;
- ByteBuf cmd;
long requestId = client.newRequestId();
- if (messageId instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
- bitSetRecyclable = BitSetRecyclable.create();
- ledgerId = batchMessageId.getLedgerId();
- entryId = batchMessageId.getEntryId();
+ final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+ final long ledgerId = messageIdAdv.getLedgerId();
+ final long entryId = messageIdAdv.getEntryId();
+ final ByteBuf cmd;
+ if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
+ BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
+ bitSetRecyclable.set(0, messageIdAdv.getBatchSize());
if (ackType == AckType.Cumulative) {
- batchMessageId.ackCumulative();
- bitSetRecyclable.set(0, batchMessageId.getBatchSize());
- bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1);
+ MessageIdAdvUtils.acknowledge(messageIdAdv, false);
+ bitSetRecyclable.clear(0, messageIdAdv.getBatchIndex() + 1);
} else {
- bitSetRecyclable.set(0, batchMessageId.getBatchSize());
- bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+ bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
}
cmd = Commands.newAck(consumerId, ledgerId, entryId,
bitSetRecyclable, ackType, validationError, properties,
- txnID.getLeastSigBits(), txnID.getMostSigBits(),
requestId, batchMessageId.getBatchSize());
+ txnID.getLeastSigBits(), txnID.getMostSigBits(),
requestId, messageIdAdv.getBatchSize());
bitSetRecyclable.recycle();
} else {
- MessageIdImpl singleMessage = (MessageIdImpl) messageId;
- ledgerId = singleMessage.getLedgerId();
- entryId = singleMessage.getEntryId();
- cmd = Commands.newAck(consumerId, ledgerId, entryId,
bitSetRecyclable, ackType,
- validationError, properties, txnID.getLeastSigBits(),
txnID.getMostSigBits(), requestId);
+ cmd = Commands.newAck(consumerId, ledgerId, entryId, null,
ackType, validationError, properties,
+ txnID.getLeastSigBits(), txnID.getMostSigBits(),
requestId);
}
if (ackType == AckType.Cumulative) {
@@ -2736,58 +2698,42 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
private CompletableFuture<Void>
doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
-
ValidationError validationError,
Map<String, Long> properties, TxnID txnID) {
- BitSetRecyclable bitSetRecyclable = null;
- long ledgerId;
- long entryId;
- ByteBuf cmd;
long requestId = client.newRequestId();
List<MessageIdData> messageIdDataList = new LinkedList<>();
for (MessageId messageId : messageIds) {
- if (messageId instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
messageId;
- bitSetRecyclable = BitSetRecyclable.create();
+ final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+ final MessageIdData messageIdData = new MessageIdData();
+ messageIdData.setLedgerId(messageIdAdv.getLedgerId());
+ messageIdData.setEntryId(messageIdAdv.getEntryId());
+ if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
+ final BitSetRecyclable bitSetRecyclable =
BitSetRecyclable.create();
+ bitSetRecyclable.set(0, messageIdAdv.getBatchSize());
if (ackType == AckType.Cumulative) {
- batchMessageId.ackCumulative();
- bitSetRecyclable.set(0, batchMessageId.getBatchSize());
- bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() +
1);
+ MessageIdAdvUtils.acknowledge(messageIdAdv, false);
+ bitSetRecyclable.clear(0, messageIdAdv.getBatchIndex() +
1);
} else {
- bitSetRecyclable.set(0, batchMessageId.getBatchSize());
- bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+ bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
}
- MessageIdData messageIdData = new MessageIdData();
- messageIdData.setLedgerId(batchMessageId.getLedgerId());
- messageIdData.setEntryId(batchMessageId.getEntryId());
- messageIdData.setBatchSize(batchMessageId.getBatchSize());
- long[] as = bitSetRecyclable.toLongArray();
- for (int i = 0; i < as.length; i++) {
- messageIdData.addAckSet(as[i]);
+ for (long x : bitSetRecyclable.toLongArray()) {
+ messageIdData.addAckSet(x);
}
bitSetRecyclable.recycle();
- messageIdDataList.add(messageIdData);
- } else {
- MessageIdImpl singleMessage = (MessageIdImpl) messageId;
- ledgerId = singleMessage.getLedgerId();
- entryId = singleMessage.getEntryId();
- MessageIdData messageIdData = new MessageIdData();
- messageIdData.setLedgerId(ledgerId);
- messageIdData.setEntryId(entryId);
- messageIdDataList.add(messageIdData);
}
+ messageIdDataList.add(messageIdData);
if (ackType == AckType.Cumulative) {
unAckedMessageTracker.removeMessagesTill(messageId);
} else {
unAckedMessageTracker.remove(messageId);
}
}
- cmd = Commands.newAck(consumerId, messageIdDataList, ackType,
validationError, properties,
+ final ByteBuf cmd = Commands.newAck(consumerId, messageIdDataList,
ackType, null, properties,
txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId);
return cnx().newAckForReceipt(cmd, requestId);
}
- public Map<MessageIdImpl, List<MessageImpl<T>>>
getPossibleSendToDeadLetterTopicMessages() {
+ public Map<MessageIdAdv, List<MessageImpl<T>>>
getPossibleSendToDeadLetterTopicMessages() {
return possibleSendToDeadLetterTopicMessages;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
new file mode 100644
index 00000000000..c8b18524ec0
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.BitSet;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
+
+public class MessageIdAdvUtils {
+
+ static int hashCode(MessageIdAdv msgId) {
+ return (int) (31 * (msgId.getLedgerId() + 31 * msgId.getEntryId())
+ + (31 * (long) msgId.getPartitionIndex()) +
msgId.getBatchIndex());
+ }
+
+ static boolean equals(MessageIdAdv lhs, Object o) {
+ if (!(o instanceof MessageIdAdv)) {
+ return false;
+ }
+ final MessageIdAdv rhs = (MessageIdAdv) o;
+ return lhs.getLedgerId() == rhs.getLedgerId()
+ && lhs.getEntryId() == rhs.getEntryId()
+ && lhs.getPartitionIndex() == rhs.getPartitionIndex()
+ && lhs.getBatchIndex() == rhs.getBatchIndex();
+ }
+
+ static boolean acknowledge(MessageIdAdv msgId, boolean individual) {
+ if (!isBatch(msgId)) {
+ return true;
+ }
+ final BitSet ackSet = msgId.getAckSet();
+ if (ackSet == null) {
+ // The internal MessageId implementation should never reach here.
If users have implemented their own
+ // MessageId and getAckSet() is not overridden, return false to
avoid acknowledging the current entry.
+ return false;
+ }
+ int batchIndex = msgId.getBatchIndex();
+ if (individual) {
+ ackSet.clear(batchIndex);
+ } else {
+ ackSet.clear(0, batchIndex + 1);
+ }
+ return ackSet.isEmpty();
+ }
+
+ static boolean isBatch(MessageIdAdv msgId) {
+ return msgId.getBatchIndex() >= 0 && msgId.getBatchSize() > 0;
+ }
+
+ static MessageIdAdv discardBatch(MessageId messageId) {
+ MessageIdAdv msgId = (MessageIdAdv) messageId;
+ return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(),
msgId.getPartitionIndex());
+ }
+
+ static MessageIdAdv prevMessageId(MessageIdAdv msgId) {
+ return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId() - 1,
msgId.getPartitionIndex());
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 1a0f491a6a7..83ee7625783 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -18,21 +18,17 @@
*/
package org.apache.pulsar.client.impl;
-import static org.apache.pulsar.client.impl.BatchMessageIdImpl.NO_BATCH;
-import com.google.common.collect.ComparisonChain;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.util.Objects;
-import javax.annotation.Nonnull;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.TopicMessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.common.api.proto.MessageIdData;
-import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.naming.TopicName;
-public class MessageIdImpl implements MessageId {
+public class MessageIdImpl implements MessageIdAdv {
protected final long ledgerId;
protected final long entryId;
protected final int partitionIndex;
@@ -49,28 +45,29 @@ public class MessageIdImpl implements MessageId {
this.partitionIndex = partitionIndex;
}
+ @Override
public long getLedgerId() {
return ledgerId;
}
+ @Override
public long getEntryId() {
return entryId;
}
+ @Override
public int getPartitionIndex() {
return partitionIndex;
}
@Override
public int hashCode() {
- return messageIdHashCode(ledgerId, entryId, partitionIndex, NO_BATCH);
+ return MessageIdAdvUtils.hashCode(this);
}
@Override
public boolean equals(Object o) {
- return (o instanceof MessageId)
- && !(o instanceof MultiMessageIdImpl)
- && (compareTo((MessageId) o) == 0);
+ return MessageIdAdvUtils.equals(this, o);
}
@Override
@@ -100,7 +97,7 @@ public class MessageIdImpl implements MessageId {
if (idData.hasBatchIndex()) {
if (idData.hasBatchSize()) {
messageId = new BatchMessageIdImpl(idData.getLedgerId(),
idData.getEntryId(), idData.getPartition(),
- idData.getBatchIndex(), idData.getBatchSize(),
BatchMessageAcker.newAcker(idData.getBatchSize()));
+ idData.getBatchIndex(), idData.getBatchSize(),
BatchMessageIdImpl.newAckSet(idData.getBatchSize()));
} else {
messageId = new BatchMessageIdImpl(idData.getLedgerId(),
idData.getEntryId(), idData.getPartition(),
idData.getBatchIndex());
@@ -118,22 +115,6 @@ public class MessageIdImpl implements MessageId {
return messageId;
}
- @InterfaceStability.Unstable
- public static MessageIdImpl convertToMessageIdImpl(MessageId messageId) {
- if (messageId instanceof TopicMessageId) {
- if (messageId instanceof TopicMessageIdImpl) {
- return (MessageIdImpl) ((TopicMessageIdImpl)
messageId).getInnerMessageId();
- } else {
- try {
- return (MessageIdImpl)
MessageId.fromByteArray(messageId.toByteArray());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- return (MessageIdImpl) messageId;
- }
-
public static MessageId fromByteArrayWithTopic(byte[] data, String
topicName) throws IOException {
return fromByteArrayWithTopic(data, TopicName.get(topicName));
}
@@ -152,10 +133,10 @@ public class MessageIdImpl implements MessageId {
if (idData.hasBatchSize()) {
messageId = new BatchMessageIdImpl(idData.getLedgerId(),
idData.getEntryId(), idData.getPartition(),
idData.getBatchIndex(), idData.getBatchSize(),
- BatchMessageAcker.newAcker(idData.getBatchSize()));
+ BatchMessageIdImpl.newAckSet(idData.getBatchSize()));
} else {
messageId = new BatchMessageIdImpl(idData.getLedgerId(),
idData.getEntryId(), idData.getPartition(),
- idData.getBatchIndex(), 0,
BatchMessageAckerDisabled.INSTANCE);
+ idData.getBatchIndex(), 0, null);
}
} else {
messageId = new MessageIdImpl(idData.getLedgerId(),
idData.getEntryId(), idData.getPartition());
@@ -207,36 +188,4 @@ public class MessageIdImpl implements MessageId {
// there is no message batch so we pass -1
return toByteArray(-1, 0);
}
-
- @Override
- public int compareTo(@Nonnull MessageId o) {
- if (o instanceof MessageIdImpl) {
- MessageIdImpl other = (MessageIdImpl) o;
- int batchIndex = (o instanceof BatchMessageIdImpl) ?
((BatchMessageIdImpl) o).getBatchIndex() : NO_BATCH;
- return messageIdCompare(
- this.ledgerId, this.entryId, this.partitionIndex, NO_BATCH,
- other.ledgerId, other.entryId, other.partitionIndex, batchIndex
- );
- } else if (o instanceof TopicMessageId) {
- return compareTo(convertToMessageIdImpl(o));
- } else {
- throw new UnsupportedOperationException("Unknown MessageId type: "
+ o.getClass().getName());
- }
- }
-
- static int messageIdHashCode(long ledgerId, long entryId, int
partitionIndex, int batchIndex) {
- return (int) (31 * (ledgerId + 31 * entryId) + (31 * (long)
partitionIndex) + batchIndex);
- }
-
- static int messageIdCompare(
- long ledgerId1, long entryId1, int partitionIndex1, int batchIndex1,
- long ledgerId2, long entryId2, int partitionIndex2, int batchIndex2
- ) {
- return ComparisonChain.start()
- .compare(ledgerId1, ledgerId2)
- .compare(entryId1, entryId2)
- .compare(partitionIndex1, partitionIndex2)
- .compare(batchIndex1, batchIndex2)
- .result();
- }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 0b6fb608ee6..d369d639a73 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.schema.AbstractSchema;
@@ -714,9 +715,10 @@ public class MessageImpl<T> implements Message<T> {
@Override
public Optional<Long> getIndex() {
if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
- if (msgMetadata.hasNumMessagesInBatch() && messageId instanceof
BatchMessageIdImpl) {
- int batchSize = ((BatchMessageIdImpl)
messageId).getBatchSize();
- int batchIndex = ((BatchMessageIdImpl)
messageId).getBatchIndex();
+ MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+ if (msgMetadata.hasNumMessagesInBatch() &&
MessageIdAdvUtils.isBatch(messageIdAdv)) {
+ int batchSize = messageIdAdv.getBatchSize();
+ int batchIndex = messageIdAdv.getBatchIndex();
return Optional.of(brokerEntryMetadata.getIndex() - batchSize
+ batchIndex + 1);
}
return Optional.of(brokerEntryMetadata.getIndex());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
index dcae86bd01a..f4c9aa27074 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
+import java.util.BitSet;
import java.util.List;
import lombok.NonNull;
import org.apache.pulsar.client.api.Message;
@@ -50,7 +51,7 @@ public class MessagePayloadContextImpl implements
MessagePayloadContext {
private MessageIdImpl messageId;
private ConsumerImpl<?> consumer;
private int redeliveryCount;
- private BatchMessageAcker acker;
+ private BitSet ackSetInMessageId;
private BitSetRecyclable ackBitSet;
private long consumerEpoch;
@@ -73,7 +74,7 @@ public class MessagePayloadContextImpl implements
MessagePayloadContext {
context.messageId = messageId;
context.consumer = consumer;
context.redeliveryCount = redeliveryCount;
- context.acker = BatchMessageAcker.newAcker(context.getNumMessages());
+ context.ackSetInMessageId =
BatchMessageIdImpl.newAckSet(context.getNumMessages());
context.ackBitSet = (ackSet != null && ackSet.size() > 0)
?
BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet))
: null;
@@ -88,7 +89,7 @@ public class MessagePayloadContextImpl implements
MessagePayloadContext {
consumer = null;
redeliveryCount = 0;
consumerEpoch = DEFAULT_CONSUMER_EPOCH;
- acker = null;
+ ackSetInMessageId = null;
if (ackBitSet != null) {
ackBitSet.recycle();
ackBitSet = null;
@@ -134,7 +135,7 @@ public class MessagePayloadContextImpl implements
MessagePayloadContext {
schema,
containMetadata,
ackBitSet,
- acker,
+ ackSetInMessageId,
redeliveryCount,
consumerEpoch);
} finally {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f993304b078..5fe0e4a82b8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
@@ -100,7 +101,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
private final MultiTopicConsumerStatsRecorderImpl stats;
private final ConsumerConfigurationData<T> internalConfig;
- private volatile BatchMessageIdImpl startMessageId = null;
+ private volatile MessageIdAdv startMessageId;
private final long startMessageRollbackDurationInSec;
MultiTopicsConsumerImpl(PulsarClientImpl client,
ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>>
subscribeFuture, Schema<T> schema,
@@ -139,9 +140,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
this.consumers = new ConcurrentHashMap<>();
this.pausedConsumers = new ConcurrentLinkedQueue<>();
this.allTopicPartitionsNumber = new AtomicInteger(0);
- this.startMessageId = startMessageId != null
- ? new
BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(startMessageId))
- : null;
+ this.startMessageId = (MessageIdAdv) startMessageId;
this.startMessageRollbackDurationInSec =
startMessageRollbackDurationInSec;
this.paused = conf.isStartPaused();
@@ -454,18 +453,15 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return FutureUtil.failedFuture(new PulsarClientException("Consumer
already closed"));
}
- TopicMessageId topicMessageId = (TopicMessageId) messageId;
- ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getOwnerTopic());
+ ConsumerImpl<T> consumer = consumers.get(((TopicMessageId)
messageId).getOwnerTopic());
if (consumer == null) {
return FutureUtil.failedFuture(new
PulsarClientException.NotConnectedException());
}
- MessageId innerMessageId =
MessageIdImpl.convertToMessageIdImpl(topicMessageId);
if (ackType == AckType.Cumulative) {
- return consumer.acknowledgeCumulativeAsync(innerMessageId);
+ return consumer.acknowledgeCumulativeAsync(messageId);
} else {
- return consumer.doAcknowledgeWithTxn(innerMessageId, ackType,
properties, txnImpl)
- .thenRun(() ->
- unAckedMessageTracker.remove(topicMessageId));
+ return consumer.doAcknowledgeWithTxn(messageId, ackType,
properties, txnImpl)
+ .thenRun(() -> unAckedMessageTracker.remove(messageId));
}
}
@@ -490,10 +486,9 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
Map<String, List<MessageId>> topicToMessageIdMap = new HashMap<>();
for (MessageId messageId : messageIdList) {
- TopicMessageId topicMessageId = (TopicMessageId) messageId;
-
topicToMessageIdMap.putIfAbsent(topicMessageId.getOwnerTopic(), new
ArrayList<>());
- topicToMessageIdMap.get(topicMessageId.getOwnerTopic())
-
.add(MessageIdImpl.convertToMessageIdImpl(topicMessageId));
+ String ownerTopic = ((TopicMessageId)
messageId).getOwnerTopic();
+ topicToMessageIdMap.putIfAbsent(ownerTopic, new ArrayList<>());
+ topicToMessageIdMap.get(ownerTopic).add(messageId);
}
final Map<ConsumerImpl<T>, List<MessageId>> consumerToMessageIds =
new IdentityHashMap<>();
for (Map.Entry<String, List<MessageId>> entry :
topicToMessageIdMap.entrySet()) {
@@ -549,10 +544,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
@Override
public void negativeAcknowledge(MessageId messageId) {
checkArgument(messageId instanceof TopicMessageId);
- TopicMessageId topicMessageId = (TopicMessageId) messageId;
-
- ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getOwnerTopic());
-
consumer.negativeAcknowledge(MessageIdImpl.convertToMessageIdImpl(topicMessageId));
+ ConsumerImpl<T> consumer = consumers.get(((TopicMessageId)
messageId).getOwnerTopic());
+ consumer.negativeAcknowledge(messageId);
}
@Override
@@ -705,12 +698,11 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return;
}
removeExpiredMessagesFromQueue(messageIds);
- messageIds.stream().map(messageId -> (TopicMessageId) messageId)
- .collect(Collectors.groupingBy(TopicMessageId::getOwnerTopic,
Collectors.toSet()))
- .forEach((topicName, messageIds1) ->
- consumers.get(topicName)
- .redeliverUnacknowledgedMessages(messageIds1.stream()
-
.map(MessageIdImpl::convertToMessageIdImpl).collect(Collectors.toSet())));
+ messageIds.stream()
+ .collect(Collectors.groupingBy(
+ msgId -> ((TopicMessageIdImpl) msgId).getOwnerTopic(),
Collectors.toSet()))
+ .forEach((topicName, messageIds1) ->
+
consumers.get(topicName).redeliverUnacknowledgedMessages(messageIds1));
resumeReceivingFromPausedConsumersIfNeeded();
}
@@ -1508,7 +1500,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
public static boolean isIllegalMultiTopicsMessageId(MessageId messageId) {
//only support earliest/latest
- return !MessageId.earliest.equals(messageId) &&
!MessageId.latest.equals(messageId);
+ return !messageId.equals(MessageId.earliest) &&
!messageId.equals(MessageId.latest);
}
public void tryAcknowledgeMessage(Message<T> msg) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 70d57db3bb6..37f58a02180 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -95,14 +95,6 @@ class NegativeAcksTracker implements Closeable {
}
private synchronized void add(MessageId messageId, int redeliveryCount) {
- messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
-
- if (messageId instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
- messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
batchMessageId.getEntryId(),
- batchMessageId.getPartitionIndex());
- }
-
if (nackedMessages == null) {
nackedMessages = new HashMap<>();
}
@@ -113,7 +105,7 @@ class NegativeAcksTracker implements Closeable {
} else {
backoffNs = nackDelayNanos;
}
- nackedMessages.put(messageId, System.nanoTime() + backoffNs);
+ nackedMessages.put(MessageIdAdvUtils.discardBatch(messageId),
System.nanoTime() + backoffNs);
if (this.timeout == null) {
// Schedule a task and group all the redeliveries for same period.
Leave a small buffer to allow for
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
index 32f8fb92230..e8951cd3d16 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
@@ -43,7 +43,7 @@ public class NonPersistentAcknowledgmentGroupingTracker
implements Acknowledgmen
return false;
}
- public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
AckType ackType, Map<String,
+ public CompletableFuture<Void> addAcknowledgment(MessageId msgId, AckType
ackType, Map<String,
Long> properties) {
// no-op
return CompletableFuture.completedFuture(null);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index fef0bcb8906..9086ccc4ef0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -43,6 +44,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.TimedCompletableFuture;
@@ -79,8 +81,8 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
* This is a set of all the individual acks that the application has
issued and that were not already sent to
* broker.
*/
- private final ConcurrentSkipListSet<MessageIdImpl> pendingIndividualAcks;
- private final ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>
pendingIndividualBatchIndexAcks;
+ private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks;
+ private final ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable>
pendingIndividualBatchIndexAcks;
private final ScheduledFuture<?> scheduledTask;
private final boolean batchIndexAckEnabled;
@@ -113,18 +115,16 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
*/
@Override
public boolean isDuplicate(MessageId messageId) {
- if (!(messageId instanceof MessageIdImpl)) {
+ if (!(messageId instanceof MessageIdAdv)) {
throw new IllegalArgumentException("isDuplicated cannot accept "
+ messageId.getClass().getName() + ": " + messageId);
}
- if (lastCumulativeAck.compareTo(messageId) >= 0) {
+ final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+ if (lastCumulativeAck.compareTo(messageIdAdv) >= 0) {
// Already included in a cumulative ack
return true;
} else {
- final MessageIdImpl messageIdImpl = (messageId instanceof
BatchMessageIdImpl)
- ? ((BatchMessageIdImpl) messageId).toMessageIdImpl()
- : (MessageIdImpl) messageId;
- return pendingIndividualAcks.contains(messageIdImpl);
+ return
pendingIndividualAcks.contains(MessageIdAdvUtils.discardBatch(messageIdAdv));
}
}
@@ -135,10 +135,10 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
if (consumer.isAckReceiptEnabled()) {
Set<CompletableFuture<Void>> completableFutureSet = new
HashSet<>();
messageIds.forEach(messageId ->
-
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType,
properties)));
+ completableFutureSet.add(addAcknowledgment(messageId,
ackType, properties)));
return FutureUtil.waitForAll(new
ArrayList<>(completableFutureSet));
} else {
- messageIds.forEach(messageId ->
addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+ messageIds.forEach(messageId -> addAcknowledgment(messageId,
ackType, properties));
return CompletableFuture.completedFuture(null);
}
} else {
@@ -162,46 +162,43 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
private void addListAcknowledgment(List<MessageId> messageIds) {
for (MessageId messageId : messageIds) {
- if (messageId instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
messageId;
- addIndividualAcknowledgment(batchMessageId.toMessageIdImpl(),
- batchMessageId,
+ MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+ if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
+
addIndividualAcknowledgment(MessageIdAdvUtils.discardBatch(messageIdAdv),
+ messageIdAdv,
this::doIndividualAckAsync,
this::doIndividualBatchAckAsync);
- } else if (messageId instanceof MessageIdImpl) {
- addIndividualAcknowledgment((MessageIdImpl) messageId,
+ } else {
+ addIndividualAcknowledgment(messageIdAdv,
null,
this::doIndividualAckAsync,
this::doIndividualBatchAckAsync);
- } else {
- throw new IllegalStateException("Unsupported message id type
in addListAcknowledgement: "
- + messageId.getClass().getCanonicalName());
}
}
}
@Override
- public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
AckType ackType,
+ public CompletableFuture<Void> addAcknowledgment(MessageId msgId, AckType
ackType,
Map<String, Long>
properties) {
- if (msgId instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
- return addAcknowledgment(batchMessageId.toMessageIdImpl(),
ackType, properties, batchMessageId);
+ MessageIdAdv msgIdAdv = (MessageIdAdv) msgId;
+ if (MessageIdAdvUtils.isBatch(msgIdAdv)) {
+ return addAcknowledgment(MessageIdAdvUtils.discardBatch(msgId),
ackType, properties, msgIdAdv);
} else {
- return addAcknowledgment(msgId, ackType, properties, null);
+ return addAcknowledgment(msgIdAdv, ackType, properties, null);
}
}
private CompletableFuture<Void> addIndividualAcknowledgment(
- MessageIdImpl msgId,
- @Nullable BatchMessageIdImpl batchMessageId,
- Function<MessageIdImpl, CompletableFuture<Void>>
individualAckFunction,
- Function<BatchMessageIdImpl, CompletableFuture<Void>>
batchAckFunction) {
+ MessageIdAdv msgId,
+ @Nullable MessageIdAdv batchMessageId,
+ Function<MessageIdAdv, CompletableFuture<Void>>
individualAckFunction,
+ Function<MessageIdAdv, CompletableFuture<Void>> batchAckFunction) {
if (batchMessageId != null) {
consumer.onAcknowledge(batchMessageId, null);
} else {
consumer.onAcknowledge(msgId, null);
}
- if (batchMessageId == null || batchMessageId.ackIndividual()) {
+ if (batchMessageId == null ||
MessageIdAdvUtils.acknowledge(batchMessageId, true)) {
consumer.getStats().incrementNumAcksSent((batchMessageId != null)
? batchMessageId.getBatchSize() : 1);
consumer.getUnAckedMessageTracker().remove(msgId);
if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
@@ -215,10 +212,10 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
}
- private CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
+ private CompletableFuture<Void> addAcknowledgment(MessageIdAdv msgId,
AckType ackType,
Map<String, Long>
properties,
- @Nullable
BatchMessageIdImpl batchMessageId) {
+ @Nullable MessageIdAdv
batchMessageId) {
switch (ackType) {
case Individual:
return addIndividualAcknowledgment(msgId,
@@ -231,15 +228,12 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
} else {
consumer.onAcknowledgeCumulative(msgId, null);
}
- if (batchMessageId == null || batchMessageId.ackCumulative()) {
+ if (batchMessageId == null ||
MessageIdAdvUtils.acknowledge(batchMessageId, false)) {
return doCumulativeAck(msgId, properties, null);
} else if (batchIndexAckEnabled) {
return doCumulativeBatchIndexAck(batchMessageId,
properties);
} else {
- if
(!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
- doCumulativeAck(batchMessageId.prevBatchMessageId(),
properties, null);
-
batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
- }
+
doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties,
null);
return CompletableFuture.completedFuture(null);
}
default:
@@ -247,7 +241,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
}
- private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId,
Map<String, Long> properties) {
+ private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId,
Map<String, Long> properties) {
if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
// We cannot group acks if the delay is 0 or when there are
properties attached to it. Fortunately that's an
// uncommon condition since it's only used for the compaction
subscription.
@@ -267,13 +261,13 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
- private CompletableFuture<Void> doIndividualAckAsync(MessageIdImpl
messageId) {
+ private CompletableFuture<Void> doIndividualAckAsync(MessageIdAdv
messageId) {
pendingIndividualAcks.add(messageId);
pendingIndividualBatchIndexAcks.remove(messageId);
return CompletableFuture.completedFuture(null);
}
- private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl
batchMessageId,
+ private CompletableFuture<Void> doIndividualBatchAck(MessageIdAdv
batchMessageId,
Map<String, Long>
properties) {
if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
return doImmediateBatchIndexAck(batchMessageId,
batchMessageId.getBatchIndex(),
@@ -283,7 +277,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
}
- private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl
batchMessageId) {
+ private CompletableFuture<Void> doIndividualBatchAck(MessageIdAdv
batchMessageId) {
Optional<Lock> readLock = acquireReadLock();
try {
doIndividualBatchAckAsync(batchMessageId);
@@ -296,7 +290,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
}
- private CompletableFuture<Void> doCumulativeAck(MessageIdImpl messageId,
Map<String, Long> properties,
+ private CompletableFuture<Void> doCumulativeAck(MessageIdAdv messageId,
Map<String, Long> properties,
BitSetRecyclable bitSet) {
consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId));
if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
@@ -314,29 +308,29 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
}
- private CompletableFuture<Void>
doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
+ private CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv
msgId) {
ConcurrentBitSetRecyclable bitSet =
pendingIndividualBatchIndexAcks.computeIfAbsent(
- batchMessageId.toMessageIdImpl(), __ -> {
- ConcurrentBitSetRecyclable value;
- if (batchMessageId.getAcker() != null
- && !(batchMessageId.getAcker() instanceof
BatchMessageAckerDisabled)) {
- value =
ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());
+ MessageIdAdvUtils.discardBatch(msgId), __ -> {
+ final BitSet ackSet = msgId.getAckSet();
+ final ConcurrentBitSetRecyclable value;
+ if (ackSet != null && !ackSet.isEmpty()) {
+ value = ConcurrentBitSetRecyclable.create(ackSet);
} else {
value = ConcurrentBitSetRecyclable.create();
- value.set(0, batchMessageId.getOriginalBatchSize());
+ value.set(0, msgId.getBatchSize());
}
return value;
});
- bitSet.clear(batchMessageId.getBatchIndex());
+ bitSet.clear(msgId.getBatchIndex());
return CompletableFuture.completedFuture(null);
}
- private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable
bitSet) {
+ private void doCumulativeAckAsync(MessageIdAdv msgId, BitSetRecyclable
bitSet) {
// Handle concurrent updates from different threads
lastCumulativeAck.update(msgId, bitSet);
}
- private CompletableFuture<Void>
doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId,
+ private CompletableFuture<Void> doCumulativeBatchIndexAck(MessageIdAdv
batchMessageId,
Map<String,
Long> properties) {
if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
return doImmediateBatchIndexAck(batchMessageId,
batchMessageId.getBatchIndex(),
@@ -349,7 +343,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
}
- private CompletableFuture<Void> doImmediateAck(MessageIdImpl msgId,
AckType ackType, Map<String, Long> properties,
+ private CompletableFuture<Void> doImmediateAck(MessageIdAdv msgId, AckType
ackType, Map<String, Long> properties,
BitSetRecyclable bitSet) {
ClientCnx cnx = consumer.getClientCnx();
@@ -360,7 +354,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
return newImmediateAckAndFlush(consumer.consumerId, msgId, bitSet,
ackType, properties, cnx);
}
- private CompletableFuture<Void>
doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int
batchSize,
+ private CompletableFuture<Void> doImmediateBatchIndexAck(MessageIdAdv
msgId, int batchIndex, int batchSize,
AckType ackType,
Map<String, Long> properties) {
ClientCnx cnx = consumer.getClientCnx();
@@ -369,8 +363,8 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
.ConnectException("Consumer connect fail! consumer state:"
+ consumer.getState()));
}
BitSetRecyclable bitSet;
- if (msgId.getAcker() != null && !(msgId.getAcker() instanceof
BatchMessageAckerDisabled)) {
- bitSet =
BitSetRecyclable.valueOf(msgId.getAcker().getBitSet().toLongArray());
+ if (msgId.getAckSet() != null) {
+ bitSet = BitSetRecyclable.valueOf(msgId.getAckSet().toLongArray());
} else {
bitSet = BitSetRecyclable.create();
bitSet.set(0, batchSize);
@@ -382,7 +376,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
CompletableFuture<Void> completableFuture =
newMessageAckCommandAndWrite(cnx, consumer.consumerId,
- msgId.ledgerId, msgId.entryId, bitSet, ackType, properties,
true, null, null);
+ msgId.getLedgerId(), msgId.getEntryId(), bitSet, ackType,
properties, true, null, null);
bitSet.recycle();
return completableFuture;
}
@@ -414,7 +408,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
boolean shouldFlush = false;
if (lastCumulativeAckToFlush != null) {
shouldFlush = true;
- final MessageIdImpl messageId =
lastCumulativeAckToFlush.getMessageId();
+ final MessageIdAdv messageId =
lastCumulativeAckToFlush.getMessageId();
newMessageAckCommandAndWrite(cnx, consumer.consumerId,
messageId.getLedgerId(), messageId.getEntryId(),
lastCumulativeAckToFlush.getBitSetRecyclable(),
AckType.Cumulative,
Collections.emptyMap(), false,
@@ -429,7 +423,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
if
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
{
// We can send 1 single protobuf command with all individual
acks
while (true) {
- MessageIdImpl msgId = pendingIndividualAcks.pollFirst();
+ MessageIdAdv msgId = pendingIndividualAcks.pollFirst();
if (msgId == null) {
break;
}
@@ -452,7 +446,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
} else {
// When talking to older brokers, send the acknowledgements
individually
while (true) {
- MessageIdImpl msgId = pendingIndividualAcks.pollFirst();
+ MessageIdAdv msgId = pendingIndividualAcks.pollFirst();
if (msgId == null) {
break;
}
@@ -465,12 +459,13 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
if (!pendingIndividualBatchIndexAcks.isEmpty()) {
- Iterator<Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable>>
iterator =
+ Iterator<Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable>>
iterator =
pendingIndividualBatchIndexAcks.entrySet().iterator();
while (iterator.hasNext()) {
- Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable> entry =
iterator.next();
- entriesToAck.add(Triple.of(entry.getKey().ledgerId,
entry.getKey().entryId, entry.getValue()));
+ Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry =
iterator.next();
+ entriesToAck.add(Triple.of(
+ entry.getKey().getLedgerId(),
entry.getKey().getEntryId(), entry.getValue()));
iterator.remove();
}
}
@@ -509,7 +504,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
}
- private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId,
MessageIdImpl msgId,
+ private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId,
MessageIdAdv msgId,
BitSetRecyclable
bitSet, AckType ackType,
Map<String, Long>
map, ClientCnx cnx) {
MessageIdImpl[] chunkMsgIds =
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
@@ -535,7 +530,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
completableFuture = CompletableFuture.completedFuture(null);
}
} else {
- completableFuture = newMessageAckCommandAndWrite(cnx, consumerId,
msgId.ledgerId, msgId.getEntryId(),
+ completableFuture = newMessageAckCommandAndWrite(cnx, consumerId,
msgId.getLedgerId(), msgId.getEntryId(),
bitSet, ackType, map, true, null, null);
}
return completableFuture;
@@ -621,13 +616,13 @@ class LastCumulativeAck {
return new LastCumulativeAck();
}
};
- public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl)
MessageIdImpl.earliest;
+ public static final MessageIdAdv DEFAULT_MESSAGE_ID = (MessageIdAdv)
MessageId.earliest;
- private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
+ private volatile MessageIdAdv messageId = DEFAULT_MESSAGE_ID;
private BitSetRecyclable bitSetRecyclable = null;
private boolean flushRequired = false;
- public synchronized void update(final MessageIdImpl messageId, final
BitSetRecyclable bitSetRecyclable) {
+ public synchronized void update(final MessageIdAdv messageId, final
BitSetRecyclable bitSetRecyclable) {
if (compareTo(messageId) < 0) {
if (this.bitSetRecyclable != null && this.bitSetRecyclable !=
bitSetRecyclable) {
this.bitSetRecyclable.recycle();
@@ -662,25 +657,22 @@ class LastCumulativeAck {
flushRequired = false;
}
- public synchronized int compareTo(MessageId messageId) {
- if (this.messageId instanceof BatchMessageIdImpl && (!(messageId
instanceof BatchMessageIdImpl))) {
- final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId;
- final MessageIdImpl rhs = (MessageIdImpl) messageId;
- return MessageIdImpl.messageIdCompare(
- lhs.getLedgerId(), lhs.getEntryId(),
lhs.getPartitionIndex(), lhs.getBatchIndex(),
- rhs.getLedgerId(), rhs.getEntryId(),
rhs.getPartitionIndex(), Integer.MAX_VALUE);
- } else if (messageId instanceof BatchMessageIdImpl &&
(!(this.messageId instanceof BatchMessageIdImpl))){
- final MessageIdImpl lhs = this.messageId;
- final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId;
- return MessageIdImpl.messageIdCompare(
- lhs.getLedgerId(), lhs.getEntryId(),
lhs.getPartitionIndex(), Integer.MAX_VALUE,
- rhs.getLedgerId(), rhs.getEntryId(),
rhs.getPartitionIndex(), rhs.getBatchIndex());
- } else {
- return this.messageId.compareTo(messageId);
+ public synchronized int compareTo(MessageIdAdv messageId) {
+ int result = Long.compare(this.messageId.getLedgerId(),
messageId.getLedgerId());
+ if (result != 0) {
+ return result;
+ }
+ result = Long.compare(this.messageId.getEntryId(),
messageId.getEntryId());
+ if (result != 0) {
+ return result;
}
+ return Integer.compare(
+ (this.messageId.getBatchIndex() >= 0) ?
this.messageId.getBatchIndex() : Integer.MAX_VALUE,
+ (messageId.getBatchIndex() >= 0) ? messageId.getBatchIndex() :
Integer.MAX_VALUE
+ );
}
- private synchronized void set(final MessageIdImpl messageId, final
BitSetRecyclable bitSetRecyclable) {
+ private synchronized void set(final MessageIdAdv messageId, final
BitSetRecyclable bitSetRecyclable) {
this.messageId = messageId;
this.bitSetRecyclable = bitSetRecyclable;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
index 06f79024c4f..1c4230470db 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
@@ -22,6 +22,8 @@ import java.util.Map;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.TopicMessageId;
@Data
@NoArgsConstructor
@@ -67,18 +69,12 @@ public class ResetCursorData {
}
public ResetCursorData(MessageId messageId) {
- if (messageId instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
- this.ledgerId = batchMessageId.getLedgerId();
- this.entryId = batchMessageId.getEntryId();
- this.batchIndex = batchMessageId.getBatchIndex();
- this.partitionIndex = batchMessageId.partitionIndex;
- } else if (messageId instanceof MessageIdImpl) {
- MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
- this.ledgerId = messageIdImpl.getLedgerId();
- this.entryId = messageIdImpl.getEntryId();
- this.partitionIndex = messageIdImpl.partitionIndex;
- } else if (messageId instanceof TopicMessageIdImpl) {
+ MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+ this.ledgerId = messageIdAdv.getLedgerId();
+ this.entryId = messageIdAdv.getEntryId();
+ this.batchIndex = messageIdAdv.getBatchIndex();
+ this.partitionIndex = messageIdAdv.getPartitionIndex();
+ if (messageId instanceof TopicMessageId) {
throw new IllegalArgumentException("Not supported operation on
partitioned-topic");
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index 941f18cf65a..189dc1c6083 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -21,16 +21,12 @@ package org.apache.pulsar.client.impl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.TopicMessageId;
-public class TopicMessageIdImpl implements TopicMessageId {
+public class TopicMessageIdImpl extends TopicMessageId.Impl {
- /** This topicPartitionName is get from ConsumerImpl, it contains
partition part. */
- private final String topicPartitionName;
private final String topicName;
- private final MessageId messageId;
public TopicMessageIdImpl(String topicPartitionName, String topicName,
MessageId messageId) {
- this.messageId = messageId;
- this.topicPartitionName = topicPartitionName;
+ super(topicPartitionName, messageId);
this.topicName = topicName;
}
@@ -49,40 +45,21 @@ public class TopicMessageIdImpl implements TopicMessageId {
*/
@Deprecated
public String getTopicPartitionName() {
- return this.topicPartitionName;
+ return getOwnerTopic();
}
+ @Deprecated
public MessageId getInnerMessageId() {
- return messageId;
- }
-
- @Override
- public String toString() {
- return messageId.toString();
- }
-
- @Override
- public byte[] toByteArray() {
- return messageId.toByteArray();
- }
-
- @Override
- public int hashCode() {
- return messageId.hashCode();
+ return new MessageIdImpl(getLedgerId(), getEntryId(),
getPartitionIndex());
}
@Override
public boolean equals(Object obj) {
- return messageId.equals(obj);
+ return super.equals(obj);
}
@Override
- public int compareTo(MessageId o) {
- return messageId.compareTo(o);
- }
-
- @Override
- public String getOwnerTopic() {
- return topicPartitionName;
+ public int hashCode() {
+ return super.hashCode();
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index c3fcb0a16a3..d24ecbd6aa9 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -70,7 +70,7 @@ public class TopicMessageImpl<T> implements Message<T> {
@Deprecated
public MessageId getInnerMessageId() {
- return MessageIdImpl.convertToMessageIdImpl(messageId);
+ return messageId.getInnerMessageId();
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 42eb197d632..ae874b4da6d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -174,7 +174,8 @@ public class ZeroQueueConsumerImpl<T> extends
ConsumerImpl<T> {
}
waitingOnListenerForZeroQueueSize = true;
trackMessage(message);
-
unAckedMessageTracker.add(normalizeMessageId(message.getMessageId()),
message.getRedeliveryCount());
+ unAckedMessageTracker.add(
+
MessageIdAdvUtils.discardBatch(message.getMessageId()),
message.getRedeliveryCount());
listener.received(ZeroQueueConsumerImpl.this,
beforeConsume(message));
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing
unqueued message: {}", topic, subscription,
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml
b/pulsar-client/src/main/resources/findbugsExclude.xml
index e5f8babe841..92ec9e934ee 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -1007,4 +1007,9 @@
<Method name="getThread"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
+ <Match>
+ <Class name="org.apache.pulsar.client.impl.BatchMessageIdImpl"/>
+ <Method name="getAckSet"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
</FindBugsFilter>
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index ddca6951e49..0418a54c772 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.TimedCompletableFuture;
@@ -61,7 +62,7 @@ public class AcknowledgementsGroupingTrackerTest {
eventLoopGroup = new NioEventLoopGroup(1);
consumer = mock(ConsumerImpl.class);
consumer.unAckedChunkedMessageIdSequenceMap =
- ConcurrentOpenHashMap.<MessageIdImpl,
MessageIdImpl[]>newBuilder().build();
+ ConcurrentOpenHashMap.<MessageIdAdv,
MessageIdImpl[]>newBuilder().build();
cnx = spy(new ClientCnxTest(new ClientConfigurationData(),
eventLoopGroup));
PulsarClientImpl client = mock(PulsarClientImpl.class);
doReturn(client).when(consumer).getClient();
@@ -391,21 +392,21 @@ public class AcknowledgementsGroupingTrackerTest {
public void testDoIndividualBatchAckAsync() throws Exception{
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
AcknowledgmentsGroupingTracker tracker = new
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
- MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10,
BatchMessageAckerDisabled.INSTANCE);
+ MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, null);
BitSet bitSet = new BitSet(20);
for(int i = 0; i < 20; i ++) {
bitSet.set(i, true);
}
- MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20,
BatchMessageAcker.newAcker(bitSet));
+ MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, bitSet);
Method doIndividualBatchAckAsync =
PersistentAcknowledgmentsGroupingTracker.class
- .getDeclaredMethod("doIndividualBatchAckAsync",
BatchMessageIdImpl.class);
+ .getDeclaredMethod("doIndividualBatchAckAsync",
MessageIdAdv.class);
doIndividualBatchAckAsync.setAccessible(true);
doIndividualBatchAckAsync.invoke(tracker, messageId1);
doIndividualBatchAckAsync.invoke(tracker, messageId2);
Field pendingIndividualBatchIndexAcks =
PersistentAcknowledgmentsGroupingTracker.class.getDeclaredField("pendingIndividualBatchIndexAcks");
pendingIndividualBatchIndexAcks.setAccessible(true);
- ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>
batchIndexAcks =
- (ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>)
pendingIndividualBatchIndexAcks.get(tracker);
+ ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable>
batchIndexAcks =
+ (ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable>)
pendingIndividualBatchIndexAcks.get(tracker);
MessageIdImpl position1 = new MessageIdImpl(5, 1, 0);
MessageIdImpl position2 = new MessageIdImpl(3, 2, 0);
assertTrue(batchIndexAcks.containsKey(position1));
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java
deleted file mode 100644
index 1b3795d878c..00000000000
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import org.testng.annotations.Test;
-
-public class BatchMessageAckerDisabledTest {
-
- @Test
- public void testAckIndividual() {
- for (int i = 0; i < 10; i++) {
- assertTrue(BatchMessageAckerDisabled.INSTANCE.ackIndividual(i));
- }
- }
-
- @Test
- public void testAckCumulative() {
- for (int i = 0; i < 10; i++) {
- assertTrue(BatchMessageAckerDisabled.INSTANCE.ackCumulative(i));
- }
- }
-
- @Test
- public void testGetOutstandingAcks() {
- assertEquals(0,
BatchMessageAckerDisabled.INSTANCE.getOutstandingAcks());
- }
-
-}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
deleted file mode 100644
index d31fd18cba9..00000000000
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.BitSet;
-
-public class BatchMessageAckerTest {
-
- private static final int BATCH_SIZE = 10;
-
- private BatchMessageAcker acker;
-
- @BeforeMethod
- public void setup() {
- acker = BatchMessageAcker.newAcker(10);
- }
-
- @Test
- public void testAckers() {
- assertEquals(BATCH_SIZE, acker.getOutstandingAcks());
- assertEquals(BATCH_SIZE, acker.getBatchSize());
-
- assertFalse(acker.ackIndividual(4));
- for (int i = 0; i < BATCH_SIZE; i++) {
- if (4 == i) {
- assertFalse(acker.getBitSet().get(i));
- } else {
- assertTrue(acker.getBitSet().get(i));
- }
- }
-
- assertFalse(acker.ackCumulative(6));
- for (int i = 0; i < BATCH_SIZE; i++) {
- if (i <= 6) {
- assertFalse(acker.getBitSet().get(i));
- } else {
- assertTrue(acker.getBitSet().get(i));
- }
- }
-
- for (int i = BATCH_SIZE - 1; i >= 8; i--) {
- assertFalse(acker.ackIndividual(i));
- assertFalse(acker.getBitSet().get(i));
- }
-
- assertTrue(acker.ackIndividual(7));
- assertEquals(0, acker.getOutstandingAcks());
- }
-
- @Test
- public void testBitSetAcker() {
- BitSet bitSet = BitSet.valueOf(acker.getBitSet().toLongArray());
- BatchMessageAcker bitSetAcker = BatchMessageAcker.newAcker(bitSet);
-
- Assert.assertEquals(acker.getBitSet(), bitSetAcker.getBitSet());
- Assert.assertEquals(acker.getOutstandingAcks(),
bitSetAcker.getOutstandingAcks());
- }
-
-}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
index 6bf9cd94348..10d805cdc4d 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
@@ -20,13 +20,8 @@ package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectWriter;
import java.io.IOException;
import java.util.Collections;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.annotations.Test;
public class BatchMessageIdImplTest {
@@ -123,36 +118,10 @@ public class BatchMessageIdImplTest {
assertEquals(batchMsgId2.hashCode(), msgId2.hashCode());
}
- @Test
- public void deserializationTest() {
- // initialize BitSet with null
- BatchMessageAcker ackerDisabled = new BatchMessageAcker(null, 0);
- BatchMessageIdImpl batchMsgId = new BatchMessageIdImpl(0, 0, 0, 0, 0,
ackerDisabled);
-
- ObjectWriter writer =
ObjectMapperFactory.create().writerWithDefaultPrettyPrinter();
-
- try {
- writer.writeValueAsString(batchMsgId);
- fail("Shouldn't be deserialized");
- } catch (JsonProcessingException e) {
- // expected
- assertTrue(e.getCause() instanceof NullPointerException);
- }
-
- // use the default BatchMessageAckerDisabled
- BatchMessageIdImpl batchMsgIdToDeserialize = new BatchMessageIdImpl(0,
0, 0, 0);
-
- try {
- writer.writeValueAsString(batchMsgIdToDeserialize);
- } catch (JsonProcessingException e) {
- fail("Should be successful");
- }
- }
-
@Test
public void serializeAndDeserializeTest() throws IOException {
BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(1, 1, 0,
- 1, 10, BatchMessageAcker.newAcker(10));
+ 1, 10, BatchMessageIdImpl.newAckSet(10));
byte[] serialized = batchMessageId.toByteArray();
BatchMessageIdImpl deserialized = (BatchMessageIdImpl)
MessageIdImpl.fromByteArray(serialized);
assertEquals(deserialized.getBatchSize(),
batchMessageId.getBatchSize());
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
index 7f029635241..4173d6439b9 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
@@ -43,8 +43,7 @@ public class MessageIdSerializationTest {
@Test
public void testBatchSizeNotSet() throws Exception {
- MessageId id = new BatchMessageIdImpl(1L, 2L, 3, 4, -1,
- BatchMessageAckerDisabled.INSTANCE);
+ MessageId id = new BatchMessageIdImpl(1L, 2L, 3, 4, -1, null);
byte[] serialized = id.toByteArray();
assertEquals(MessageId.fromByteArray(serialized), id);
assertEquals(MessageId.fromByteArrayWithTopic(serialized, "my-topic"),
id);
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 28cce0fe622..7df173da0f1 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -48,6 +48,7 @@ import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationDataBasic;
@@ -325,7 +326,7 @@ public class FunctionCommon {
}
public static final long getSequenceId(MessageId messageId) {
- MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(messageId);
+ MessageIdAdv msgId = (MessageIdAdv) messageId;
long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 10efc91ccda..ff0bfd391e8 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -389,6 +390,23 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
int batchIdx;
}
+ private static Method getMethodOfMessageId(MessageId messageId, String
name) throws NoSuchMethodException {
+ Class<?> clazz = messageId.getClass();
+ NoSuchMethodException firstException = null;
+ while (clazz != null) {
+ try {
+ return clazz.getDeclaredMethod(name);
+ } catch (NoSuchMethodException e) {
+ if (firstException == null) {
+ firstException = e;
+ }
+ clazz = clazz.getSuperclass();
+ }
+ }
+ assert firstException != null;
+ throw firstException;
+ }
+
@VisibleForTesting
static BatchMessageSequenceRef
getMessageSequenceRefForBatchMessage(MessageId messageId) {
long ledgerId;
@@ -396,23 +414,17 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
int batchIdx;
try {
try {
- messageId = (MessageId)
messageId.getClass().getDeclaredMethod("getInnerMessageId").invoke(messageId);
- } catch (NoSuchMethodException noSuchMethodException) {
- // not a TopicMessageIdImpl
- }
-
- try {
- batchIdx = (int)
messageId.getClass().getDeclaredMethod("getBatchIndex").invoke(messageId);
+ batchIdx = (int) getMethodOfMessageId(messageId,
"getBatchIndex").invoke(messageId);
+ if (batchIdx < 0) {
+ return null;
+ }
} catch (NoSuchMethodException noSuchMethodException) {
// not a BatchMessageIdImpl, returning null to use the
standard sequenceId
return null;
}
- // if getBatchIndex exists it means messageId is a
'BatchMessageIdImpl' instance.
- final Class<?> messageIdImplClass =
messageId.getClass().getSuperclass();
-
- ledgerId = (long)
messageIdImplClass.getDeclaredMethod("getLedgerId").invoke(messageId);
- entryId = (long)
messageIdImplClass.getDeclaredMethod("getEntryId").invoke(messageId);
+ ledgerId = (long) getMethodOfMessageId(messageId,
"getLedgerId").invoke(messageId);
+ entryId = (long) getMethodOfMessageId(messageId,
"getEntryId").invoke(messageId);
} catch (IllegalAccessException | NoSuchMethodException |
InvocationTargetException ex) {
log.error("Unexpected error while retrieving sequenceId, messageId
class: {}, error: {}",
messageId.getClass().getName(), ex.getMessage(), ex);