This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 17c58a53931 [improve][client] PIP-224 Part 1: Add TopicMessageId for
seek and acknowledge (#19158)
17c58a53931 is described below
commit 17c58a539315cc4ea39655d4328c5caf55f87d3d
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Jan 31 22:06:16 2023 +0800
[improve][client] PIP-224 Part 1: Add TopicMessageId for seek and
acknowledge (#19158)
---
.../broker/service/PersistentFailoverE2ETest.java | 5 +-
.../broker/service/SubscriptionSeekTest.java | 4 +-
.../pulsar/client/api/MultiTopicsConsumerTest.java | 134 +++++++++++++++++++--
.../api/PartitionedProducerConsumerTest.java | 3 +-
.../apache/pulsar/client/impl/MessageIdTest.java | 5 +-
.../pulsar/client/impl/NegativeAcksTest.java | 3 +-
.../org/apache/pulsar/client/api/Consumer.java | 18 +--
.../pulsar/client/api/MessageAcknowledger.java | 6 +
.../apache/pulsar/client/api/TopicMessageId.java | 91 ++++++++++++++
.../pulsar/client/impl/BatchMessageIdImpl.java | 5 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 43 +++----
.../apache/pulsar/client/impl/MessageIdImpl.java | 25 ++--
.../client/impl/MultiTopicsConsumerImpl.java | 115 +++++++++++-------
.../pulsar/client/impl/NegativeAcksTracker.java | 5 +-
.../pulsar/client/impl/TopicMessageIdImpl.java | 10 +-
.../pulsar/client/impl/TopicMessageImpl.java | 7 +-
.../impl/UnAckedTopicMessageRedeliveryTracker.java | 9 +-
.../client/impl/UnAckedTopicMessageTracker.java | 5 +-
.../org/apache/pulsar/client/impl/MessageTest.java | 4 +-
.../pulsar/functions/utils/FunctionCommon.java | 5 +-
.../apache/pulsar/websocket/ConsumerHandler.java | 5 +-
.../tests/integration/semantics/SemanticsTest.java | 4 +-
22 files changed, 373 insertions(+), 138 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 7be0590fe53..ffc1444676b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -45,7 +45,6 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -337,7 +336,7 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
}
totalMessages++;
consumer1.acknowledge(msg);
- MessageIdImpl msgId = (MessageIdImpl)
(((TopicMessageImpl)msg).getInnerMessageId());
+ MessageIdImpl msgId =
MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
receivedPtns.add(msgId.getPartitionIndex());
}
@@ -354,7 +353,7 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
}
totalMessages++;
consumer2.acknowledge(msg);
- MessageIdImpl msgId = (MessageIdImpl)
(((TopicMessageImpl)msg).getInnerMessageId());
+ MessageIdImpl msgId =
MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
receivedPtns.add(msgId.getPartitionIndex());
}
assertTrue(Sets.difference(listener1.inactivePtns,
receivedPtns).isEmpty());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index b6f1771c088..2c2f62529d2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -50,7 +50,6 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.awaitility.Awaitility;
@@ -679,8 +678,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
if (message == null) {
break;
}
- TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl)
message.getMessageId();
- received.add(topicMessageId.getInnerMessageId());
+
received.add(MessageIdImpl.convertToMessageIdImpl(message.getMessageId()));
}
int msgNumFromPartition1 = list.size() / 2;
int msgNumFromPartition2 = 1;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index 6bd11de5a2f..b8ea87ab401 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -23,8 +23,15 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -32,34 +39,38 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class MultiTopicsConsumerTest extends ProducerConsumerBase {
- private static final Logger log =
LoggerFactory.getLogger(MultiTopicsConsumerTest.class);
private ScheduledExecutorService internalExecutorServiceDelegate;
- @BeforeMethod(alwaysRun = true)
+ @BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
@@ -231,4 +242,113 @@ public class MultiTopicsConsumerTest extends
ProducerConsumerBase {
Assert.assertEquals(consumer.batchReceive().size(), 1);
});
}
+
+ @Test(timeOut = 30000)
+ public void testAcknowledgeWrongMessageId() throws Exception {
+ final var topic1 = newTopicName();
+ final var topic2 = newTopicName();
+
+ @Cleanup final var singleTopicConsumer = pulsarClient.newConsumer()
+ .topic(topic1)
+ .subscriptionName("sub-1")
+ .isAckReceiptEnabled(true)
+ .subscribe();
+ assertTrue(singleTopicConsumer instanceof ConsumerImpl);
+
+ @Cleanup final var multiTopicsConsumer = pulsarClient.newConsumer()
+ .topics(List.of(topic1, topic2))
+ .subscriptionName("sub-2")
+ .isAckReceiptEnabled(true)
+ .subscribe();
+ assertTrue(multiTopicsConsumer instanceof MultiTopicsConsumerImpl);
+
+ @Cleanup final var producer =
pulsarClient.newProducer().topic(topic1).create();
+ final var nonTopicMessageIds = new ArrayList<MessageId>();
+ nonTopicMessageIds.add(producer.send(new byte[]{ 0x00 }));
+ nonTopicMessageIds.add(singleTopicConsumer.receive().getMessageId());
+
+ // Multi-topics consumers can only acknowledge TopicMessageId,
otherwise NotAllowedException will be thrown
+ for (var msgId : nonTopicMessageIds) {
+ assertFalse(msgId instanceof TopicMessageId);
+
Assert.assertThrows(PulsarClientException.NotAllowedException.class,
+ () -> multiTopicsConsumer.acknowledge(msgId));
+
Assert.assertThrows(PulsarClientException.NotAllowedException.class,
+ () ->
multiTopicsConsumer.acknowledge(Collections.singletonList(msgId)));
+
Assert.assertThrows(PulsarClientException.NotAllowedException.class,
+ () -> multiTopicsConsumer.acknowledgeCumulative(msgId));
+ }
+
+ // Single-topic consumer can acknowledge TopicMessageId
+ final var topicMessageId =
multiTopicsConsumer.receive().getMessageId();
+ assertTrue(topicMessageId instanceof TopicMessageId);
+ assertFalse(topicMessageId instanceof MessageIdImpl);
+ singleTopicConsumer.acknowledge(topicMessageId);
+ }
+
+ @DataProvider
+ public static Object[][] messageIdFromProducer() {
+ return new Object[][] { { true }, { false } };
+ }
+
+ @Test(timeOut = 30000, dataProvider = "messageIdFromProducer")
+ public void testSeekCustomTopicMessageId(boolean messageIdFromProducer)
throws Exception {
+ final var topic = TopicName.get(newTopicName()).toString();
+ final var numPartitions = 3;
+ admin.topics().createPartitionedTopic(topic, numPartitions);
+
+ @Cleanup final var producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .messageRouter(new MessageRouter() {
+ int index = 0;
+ @Override
+ public int choosePartition(Message<?> msg, TopicMetadata
metadata) {
+ return index++ % metadata.numPartitions();
+ }
+ })
+ .create();
+ @Cleanup final var consumer =
pulsarClient.newConsumer(Schema.INT32).topic(topic)
+ .subscriptionName("sub").subscribe();
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
+
+ final var msgIds = new HashMap<String, List<MessageId>>();
+ final var numMessagesPerPartition = 10;
+ final var numMessages = numPartitions * numMessagesPerPartition;
+ for (int i = 0; i < numMessages; i++) {
+ var msgId = (MessageIdImpl) producer.send(i);
+ if (messageIdFromProducer) {
+ msgIds.computeIfAbsent(topic +
TopicName.PARTITIONED_TOPIC_SUFFIX + msgId.getPartitionIndex(),
+ __ -> new ArrayList<>()).add(msgId);
+ } else {
+ var topicMessageId = (TopicMessageId)
consumer.receive().getMessageId();
+ msgIds.computeIfAbsent(topicMessageId.getOwnerTopic(), __ ->
new ArrayList<>()).add(topicMessageId);
+ }
+ }
+
+ final var partitions = IntStream.range(0, numPartitions)
+ .mapToObj(i -> topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i)
+ .collect(Collectors.toSet());
+ assertEquals(msgIds.keySet(), partitions);
+
+ for (var partition : partitions) {
+ final var msgIdList = msgIds.get(partition);
+ assertEquals(msgIdList.size(), numMessagesPerPartition);
+ if (messageIdFromProducer) {
+ consumer.seek(TopicMessageId.create(partition,
msgIdList.get(numMessagesPerPartition / 2)));
+ } else {
+ consumer.seek(msgIdList.get(numMessagesPerPartition / 2));
+ }
+ }
+
+ var topicMsgIds = new HashMap<String, List<TopicMessageId>>();
+ for (int i = 0; i < ((numMessagesPerPartition / 2 - 1) *
numPartitions); i++) {
+ TopicMessageId topicMessageId = (TopicMessageId)
consumer.receive().getMessageId();
+ topicMsgIds.computeIfAbsent(topicMessageId.getOwnerTopic(), __ ->
new ArrayList<>()).add(topicMessageId);
+ }
+ assertEquals(topicMsgIds.keySet(), partitions);
+ for (var partition : partitions) {
+ assertEquals(topicMsgIds.get(partition),
+ msgIds.get(partition).subList(numMessagesPerPartition / 2
+ 1, numMessagesPerPartition));
+ }
+ consumer.close();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index e0e1bf20e7c..cd384e58789 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
@@ -768,7 +767,7 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
for (int i = 0; i < totalMessages; i ++) {
msg = consumer1.receive(5, TimeUnit.SECONDS);
-
Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(),
2);
+
Assert.assertEquals(MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()).getPartitionIndex(),
2);
consumer1.acknowledge(msg);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index c4cdcbd19d5..ceb5c51e6aa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -119,7 +119,7 @@ public class MessageIdTest extends BrokerTestBase {
assertEquals(new String(message.getData()), messagePrefix + i);
MessageId messageId = message.getMessageId();
if (topicType == TopicType.PARTITIONED) {
- messageId = ((TopicMessageIdImpl)
messageId).getInnerMessageId();
+ messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
}
assertTrue(messageIds.remove(messageId), "Failed to receive
message");
}
@@ -166,9 +166,6 @@ public class MessageIdTest extends BrokerTestBase {
for (int i = 0; i < numberOfMessages; i++) {
MessageId messageId = consumer.receive().getMessageId();
- if (topicType == TopicType.PARTITIONED) {
- messageId = ((TopicMessageIdImpl)
messageId).getInnerMessageId();
- }
assertTrue(messageIds.remove(messageId), "Failed to receive
Message");
}
log.info("Remaining message IDs = {}", messageIds);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 679d6c1a19e..b4d01e263bc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.awaitility.Awaitility;
import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
@@ -292,7 +293,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
.subscribe();
MessageId messageId = new MessageIdImpl(3, 1, 0);
- TopicMessageIdImpl topicMessageId = new TopicMessageIdImpl("topic-1",
"topic-1", messageId);
+ TopicMessageId topicMessageId = TopicMessageId.create("topic-1",
messageId);
BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(3, 1, 0, 0);
BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(3, 1, 0,
1);
BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(3, 1, 0,
2);
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 9a3ef7833df..69409900496 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -471,7 +471,9 @@ public interface Consumer<T> extends Closeable,
MessageAcknowledger {
* <li><code>MessageId.latest</code> : Reset the subscription on the
latest message in the topic
* </ul>
*
- * <p>Note: For multi-topics consumer, you can only seek to the earliest
or latest message.
+ * <p>Note: For multi-topics consumer, if `messageId` is a {@link
TopicMessageId}, the seek operation will happen
+ * on the owner topic of the message, which is returned by {@link
TopicMessageId#getOwnerTopic()}. Otherwise, you
+ * can only seek to the earliest or latest message for all topics
subscribed.
*
* @param messageId
* the message id where to reposition the subscription
@@ -519,19 +521,7 @@ public interface Consumer<T> extends Closeable,
MessageAcknowledger {
CompletableFuture<Void> seekAsync(Function<String, Object> function);
/**
- * Reset the subscription associated with this consumer to a specific
message id.
- *
- * <p>The message id can either be a specific message or represent the
first or last messages in the topic.
- * <ul>
- * <li><code>MessageId.earliest</code> : Reset the subscription on the
earliest message available in the topic
- * <li><code>MessageId.latest</code> : Reset the subscription on the
latest message in the topic
- * </ul>
- *
- * <p>Note: For multi-topics consumer, you can only seek to the earliest
or latest message.
- *
- * @param messageId
- * the message id where to reposition the subscription
- * @return a future to track the completion of the seek operation
+ * The asynchronous version of {@link Consumer#seek(MessageId)}.
*/
CompletableFuture<Void> seekAsync(MessageId messageId);
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
index c0a53983c5a..d1bab3abb5b 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
@@ -49,6 +49,8 @@ public interface MessageAcknowledger {
*
* @throws PulsarClientException.AlreadyClosedException}
* if the consumer was already closed
+ * @throws PulsarClientException.NotAllowedException
+ * if `messageId` is not a {@link TopicMessageId} when
multiple topics are subscribed
*/
void acknowledge(MessageId messageId) throws PulsarClientException;
@@ -59,6 +61,8 @@ public interface MessageAcknowledger {
/**
* Acknowledge the consumption of a list of message.
* @param messageIdList the list of message IDs.
+ * @throws PulsarClientException.NotAllowedException
+ * if any message id in the list is not a {@link TopicMessageId} when
multiple topics are subscribed
*/
void acknowledge(List<MessageId> messageIdList) throws
PulsarClientException;
@@ -82,6 +86,8 @@ public interface MessageAcknowledger {
* The {@code MessageId} to be cumulatively acknowledged
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
+ * @throws PulsarClientException.NotAllowedException
+ * if `messageId` is not a {@link TopicMessageId} when
multiple topics are subscribed
*/
void acknowledgeCumulative(MessageId messageId) throws
PulsarClientException;
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
new file mode 100644
index 00000000000..f6109d5f8e8
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * The MessageId used for a consumer that subscribes multiple topics or
partitioned topics.
+ *
+ * <p>
+ * It's guaranteed that {@link Message#getMessageId()} must return a
TopicMessageId instance if the Message is received
+ * from a consumer that subscribes multiple topics or partitioned topics.
+ * The topic name used in APIs related to this class like `getOwnerTopic` and
`create` must be the full topic name. For
+ * example, "my-topic" is invalid while "persistent://public/default/my-topic"
is valid.
+ * If the topic is a partitioned topic, the topic name should be the name of
the specific partition, e.g.
+ * "persistent://public/default/my-topic-partition-0".
+ * </p>
+ */
+public interface TopicMessageId extends MessageId {
+
+ /**
+ * Return the owner topic name of a message.
+ *
+ * @return the owner topic
+ */
+ String getOwnerTopic();
+
+ static TopicMessageId create(String topic, MessageId messageId) {
+ if (messageId instanceof TopicMessageId) {
+ return (TopicMessageId) messageId;
+ }
+ return new Impl(topic, messageId);
+ }
+
+ /**
+ * The simplest implementation of a TopicMessageId interface.
+ */
+ class Impl implements TopicMessageId {
+ private final String topic;
+ private final MessageId messageId;
+
+ public Impl(String topic, MessageId messageId) {
+ this.topic = topic;
+ this.messageId = messageId;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ return messageId.toByteArray();
+ }
+
+ @Override
+ public String getOwnerTopic() {
+ return topic;
+ }
+
+ @Override
+ public int compareTo(MessageId o) {
+ return messageId.compareTo(o);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return messageId.equals(obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return messageId.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return messageId.toString();
+ }
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index 7e3a143dff8..ed28082ff6a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
import javax.annotation.Nonnull;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.TopicMessageId;
/**
*/
@@ -77,8 +78,8 @@ public class BatchMessageIdImpl extends MessageIdImpl {
this.ledgerId, this.entryId, this.partitionIndex,
this.batchIndex,
other.ledgerId, other.entryId, other.partitionIndex, batchIndex
);
- } else if (o instanceof TopicMessageIdImpl) {
- return compareTo(((TopicMessageIdImpl) o).getInnerMessageId());
+ } else if (o instanceof TopicMessageId) {
+ return compareTo(MessageIdImpl.convertToMessageIdImpl(o));
} else {
throw new UnsupportedOperationException("Unknown MessageId type: "
+ o.getClass().getName());
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8fef7399836..a59a67adbaa 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -79,6 +79,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -533,7 +534,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
protected CompletableFuture<Void> doAcknowledge(MessageId messageId,
AckType ackType,
Map<String, Long>
properties,
TransactionImpl txn) {
- checkArgument(messageId instanceof MessageIdImpl);
+ messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
PulsarClientException exception = new
PulsarClientException("Consumer not ready. State: " + getState());
@@ -590,10 +591,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
.InvalidMessageException("Cannot handle message with null
messageId"));
}
- if (messageId instanceof TopicMessageIdImpl) {
- messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
- }
- checkArgument(messageId instanceof MessageIdImpl);
+ messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
PulsarClientException exception = new
PulsarClientException("Consumer not ready. State: " + getState());
@@ -629,7 +627,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (retryLetterProducer != null) {
try {
MessageImpl<T> retryMessage = (MessageImpl<T>)
getMessageImpl(message);
- String originMessageIdStr = getOriginMessageIdStr(message);
+ String originMessageIdStr = message.getMessageId().toString();
String originTopicNameStr = getOriginTopicNameStr(message);
SortedMap<String, String> propertiesMap =
getPropertiesMap(message, originMessageIdStr,
originTopicNameStr);
@@ -721,22 +719,19 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return propertiesMap;
}
- private String getOriginMessageIdStr(Message<?> message) {
- if (message instanceof TopicMessageImpl) {
- return ((TopicMessageIdImpl)
message.getMessageId()).getInnerMessageId().toString();
- } else if (message instanceof MessageImpl) {
- return message.getMessageId().toString();
- }
- return null;
- }
-
private String getOriginTopicNameStr(Message<?> message) {
- if (message instanceof TopicMessageImpl) {
- return ((TopicMessageIdImpl)
message.getMessageId()).getTopicName();
- } else if (message instanceof MessageImpl) {
+ MessageId messageId = message.getMessageId();
+ if (messageId instanceof TopicMessageId) {
+ String topic = ((TopicMessageId) messageId).getOwnerTopic();
+ int index = topic.lastIndexOf(TopicName.PARTITIONED_TOPIC_SUFFIX);
+ if (index < 0) {
+ return topic;
+ } else {
+ return topic.substring(0, index);
+ }
+ } else {
return message.getTopicName();
}
- return null;
}
private MessageImpl<?> getMessageImpl(Message<?> message) {
@@ -2008,7 +2003,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
MessageIdImpl finalMessageId = messageId;
deadLetterProducer.thenAcceptAsync(producerDLQ -> {
for (MessageImpl<T> message : finalDeadLetterMessages) {
- String originMessageIdStr = getOriginMessageIdStr(message);
+ String originMessageIdStr =
message.getMessageId().toString();
String originTopicNameStr = getOriginTopicNameStr(message);
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
@@ -2177,7 +2172,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
@Override
- public CompletableFuture<Void> seekAsync(MessageId messageId) {
+ public CompletableFuture<Void> seekAsync(MessageId originalMessageId) {
+ final MessageIdImpl messageId =
MessageIdImpl.convertToMessageIdImpl(originalMessageId);
String seekBy = String.format("the message %s", messageId.toString());
return seekAsyncCheckState(seekBy).orElseGet(() -> {
long requestId = client.newRequestId();
@@ -2197,8 +2193,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
seek = Commands.newSeek(consumerId, requestId,
msgId.getFirstChunkMessageId().getLedgerId(),
msgId.getFirstChunkMessageId().getEntryId(), new
long[0]);
} else {
- MessageIdImpl msgId = (MessageIdImpl) messageId;
- seek = Commands.newSeek(consumerId, requestId,
msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
+ seek = Commands.newSeek(
+ consumerId, requestId, messageId.getLedgerId(),
messageId.getEntryId(), new long[0]);
}
return seekAsyncInternal(requestId, seek, messageId, seekBy);
});
@@ -2573,6 +2569,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
this.connectionHandler.grabCnx();
}
+ @Deprecated
public String getTopicNameWithoutPartition() {
return topicNameWithoutPartition;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 02298e0f9d6..1a0f491a6a7 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -27,7 +27,9 @@ import java.io.IOException;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.common.api.proto.MessageIdData;
+import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.naming.TopicName;
public class MessageIdImpl implements MessageId {
@@ -116,15 +118,20 @@ public class MessageIdImpl implements MessageId {
return messageId;
}
+ @InterfaceStability.Unstable
public static MessageIdImpl convertToMessageIdImpl(MessageId messageId) {
- if (messageId instanceof BatchMessageIdImpl) {
- return (BatchMessageIdImpl) messageId;
- } else if (messageId instanceof MessageIdImpl) {
- return (MessageIdImpl) messageId;
- } else if (messageId instanceof TopicMessageIdImpl) {
- return convertToMessageIdImpl(((TopicMessageIdImpl)
messageId).getInnerMessageId());
+ if (messageId instanceof TopicMessageId) {
+ if (messageId instanceof TopicMessageIdImpl) {
+ return (MessageIdImpl) ((TopicMessageIdImpl)
messageId).getInnerMessageId();
+ } else {
+ try {
+ return (MessageIdImpl)
MessageId.fromByteArray(messageId.toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
- return null;
+ return (MessageIdImpl) messageId;
}
public static MessageId fromByteArrayWithTopic(byte[] data, String
topicName) throws IOException {
@@ -210,8 +217,8 @@ public class MessageIdImpl implements MessageId {
this.ledgerId, this.entryId, this.partitionIndex, NO_BATCH,
other.ledgerId, other.entryId, other.partitionIndex, batchIndex
);
- } else if (o instanceof TopicMessageIdImpl) {
- return compareTo(((TopicMessageIdImpl) o).getInnerMessageId());
+ } else if (o instanceof TopicMessageId) {
+ return compareTo(convertToMessageIdImpl(o));
} else {
throw new UnsupportedOperationException("Unknown MessageId type: "
+ o.getClass().getName());
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 224276ba5f0..341a91e9734 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -59,6 +59,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
@@ -290,8 +291,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
// Must be called from the internalPinnedExecutor thread
private void messageReceived(ConsumerImpl<T> consumer, Message<T> message)
{
checkArgument(message instanceof MessageImpl);
- TopicMessageImpl<T> topicMessage = new
TopicMessageImpl<>(consumer.getTopic(),
- consumer.getTopicNameWithoutPartition(), message, consumer);
+ TopicMessageImpl<T> topicMessage = new
TopicMessageImpl<>(consumer.getTopic(), message, consumer);
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message from topics-consumer {}",
@@ -443,26 +443,26 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
protected CompletableFuture<Void> doAcknowledge(MessageId messageId,
AckType ackType,
Map<String, Long>
properties,
TransactionImpl txnImpl) {
- checkArgument(messageId instanceof TopicMessageIdImpl);
- TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
+ if (!(messageId instanceof TopicMessageId)) {
+ return FutureUtil.failedFuture(new
PulsarClientException.NotAllowedException(
+ "Only TopicMessageId is allowed to acknowledge for a
multi-topics consumer, while messageId is "
+ + messageId.getClass().getName()));
+ }
if (getState() != State.Ready) {
return FutureUtil.failedFuture(new PulsarClientException("Consumer
already closed"));
}
+ TopicMessageId topicMessageId = (TopicMessageId) messageId;
+ ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getOwnerTopic());
+ if (consumer == null) {
+ return FutureUtil.failedFuture(new
PulsarClientException.NotConnectedException());
+ }
+ MessageId innerMessageId =
MessageIdImpl.convertToMessageIdImpl(topicMessageId);
if (ackType == AckType.Cumulative) {
- Consumer individualConsumer =
consumers.get(topicMessageId.getTopicPartitionName());
- if (individualConsumer != null) {
- MessageId innerId = topicMessageId.getInnerMessageId();
- return individualConsumer.acknowledgeCumulativeAsync(innerId);
- } else {
- return FutureUtil.failedFuture(new
PulsarClientException.NotConnectedException());
- }
+ return consumer.acknowledgeCumulativeAsync(innerMessageId);
} else {
- ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getTopicPartitionName());
-
- MessageId innerId = topicMessageId.getInnerMessageId();
- return consumer.doAcknowledgeWithTxn(innerId, ackType, properties,
txnImpl)
+ return consumer.doAcknowledgeWithTxn(innerMessageId, ackType,
properties, txnImpl)
.thenRun(() ->
unAckedMessageTracker.remove(topicMessageId));
}
@@ -473,31 +473,34 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
AckType ackType,
Map<String, Long>
properties,
TransactionImpl txn) {
+ for (MessageId messageId : messageIdList) {
+ if (!(messageId instanceof TopicMessageId)) {
+ return FutureUtil.failedFuture(new
PulsarClientException.NotAllowedException(
+ "Only TopicMessageId is allowed to acknowledge for a
multi-topics consumer, while messageId is "
+ + messageId.getClass().getName()));
+ }
+ }
List<CompletableFuture<Void>> resultFutures = new ArrayList<>();
if (ackType == AckType.Cumulative) {
messageIdList.forEach(messageId ->
resultFutures.add(doAcknowledge(messageId, ackType, properties, txn)));
- return CompletableFuture.allOf(resultFutures.toArray(new
CompletableFuture[0]));
} else {
if (getState() != State.Ready) {
return FutureUtil.failedFuture(new
PulsarClientException("Consumer already closed"));
}
Map<String, List<MessageId>> topicToMessageIdMap = new HashMap<>();
for (MessageId messageId : messageIdList) {
- if (!(messageId instanceof TopicMessageIdImpl)) {
- return FutureUtil.failedFuture(
- new IllegalArgumentException("messageId is not
instance of TopicMessageIdImpl"));
- }
- TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl)
messageId;
-
topicToMessageIdMap.putIfAbsent(topicMessageId.getTopicPartitionName(), new
ArrayList<>());
-
topicToMessageIdMap.get(topicMessageId.getTopicPartitionName()).add(topicMessageId.getInnerMessageId());
+ TopicMessageId topicMessageId = (TopicMessageId) messageId;
+
topicToMessageIdMap.putIfAbsent(topicMessageId.getOwnerTopic(), new
ArrayList<>());
+ topicToMessageIdMap.get(topicMessageId.getOwnerTopic())
+
.add(MessageIdImpl.convertToMessageIdImpl(topicMessageId));
}
topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> {
ConsumerImpl<T> consumer = consumers.get(topicPartitionName);
resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds,
ackType, properties, txn)
.thenAccept((res) ->
messageIdList.forEach(unAckedMessageTracker::remove)));
});
- return CompletableFuture.allOf(resultFutures.toArray(new
CompletableFuture[0]));
}
+ return CompletableFuture.allOf(resultFutures.toArray(new
CompletableFuture[0]));
}
@Override
@@ -510,21 +513,25 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return FutureUtil.failedFuture(new PulsarClientException
.InvalidMessageException("Cannot handle message with null
messageId"));
}
- checkArgument(messageId instanceof TopicMessageIdImpl);
- TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
+ if (!(messageId instanceof TopicMessageId)) {
+ return FutureUtil.failedFuture(new
PulsarClientException.NotAllowedException(
+ "Only TopicMessageId is allowed for reconsumeLater for a
multi-topics consumer, while messageId is "
+ + message.getClass().getName()));
+ }
+ TopicMessageId topicMessageId = (TopicMessageId) messageId;
if (getState() != State.Ready) {
return FutureUtil.failedFuture(new PulsarClientException("Consumer
already closed"));
}
if (ackType == AckType.Cumulative) {
- Consumer<?> individualConsumer =
consumers.get(topicMessageId.getTopicPartitionName());
+ Consumer<T> individualConsumer =
consumers.get(topicMessageId.getOwnerTopic());
if (individualConsumer != null) {
return
individualConsumer.reconsumeLaterCumulativeAsync(message, delayTime, unit);
} else {
return FutureUtil.failedFuture(new
PulsarClientException.NotConnectedException());
}
} else {
- ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getTopicPartitionName());
+ ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getOwnerTopic());
return consumer.doReconsumeLater(message, ackType,
customProperties, delayTime, unit)
.thenRun(()
->unAckedMessageTracker.remove(topicMessageId));
}
@@ -532,20 +539,18 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
@Override
public void negativeAcknowledge(MessageId messageId) {
- checkArgument(messageId instanceof TopicMessageIdImpl);
- TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
+ checkArgument(messageId instanceof TopicMessageId);
+ TopicMessageId topicMessageId = (TopicMessageId) messageId;
- ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getTopicPartitionName());
- consumer.negativeAcknowledge(topicMessageId.getInnerMessageId());
+ ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getOwnerTopic());
+
consumer.negativeAcknowledge(MessageIdImpl.convertToMessageIdImpl(topicMessageId));
}
@Override
public void negativeAcknowledge(Message<?> message) {
MessageId messageId = message.getMessageId();
- checkArgument(messageId instanceof TopicMessageIdImpl);
- TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
-
- ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getTopicPartitionName());
+ checkArgument(messageId instanceof TopicMessageId);
+ ConsumerImpl<T> consumer = consumers.get(((TopicMessageId)
messageId).getOwnerTopic());
consumer.negativeAcknowledge(message);
}
@@ -680,7 +685,9 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return;
}
- checkArgument(messageIds.stream().findFirst().get() instanceof
TopicMessageIdImpl);
+ for (MessageId messageId : messageIds) {
+ checkArgument(messageId instanceof TopicMessageId);
+ }
if (conf.getSubscriptionType() != SubscriptionType.Shared
&& conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
@@ -689,12 +696,12 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return;
}
removeExpiredMessagesFromQueue(messageIds);
- messageIds.stream().map(messageId -> (TopicMessageIdImpl) messageId)
-
.collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName,
Collectors.toSet()))
+ messageIds.stream().map(messageId -> (TopicMessageId) messageId)
+ .collect(Collectors.groupingBy(TopicMessageId::getOwnerTopic,
Collectors.toSet()))
.forEach((topicName, messageIds1) ->
consumers.get(topicName)
.redeliverUnacknowledgedMessages(messageIds1.stream()
- .map(mid ->
mid.getInnerMessageId()).collect(Collectors.toSet())));
+
.map(MessageIdImpl::convertToMessageIdImpl).collect(Collectors.toSet())));
resumeReceivingFromPausedConsumersIfNeeded();
}
@@ -748,19 +755,35 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
@Override
public CompletableFuture<Void> seekAsync(MessageId messageId) {
- MessageIdImpl targetMessageId =
MessageIdImpl.convertToMessageIdImpl(messageId);
- if (targetMessageId == null ||
isIllegalMultiTopicsMessageId(messageId)) {
+ final Consumer<T> internalConsumer;
+ if (messageId instanceof TopicMessageId) {
+ TopicMessageId topicMessageId = (TopicMessageId) messageId;
+ internalConsumer = consumers.get(topicMessageId.getOwnerTopic());
+ if (internalConsumer == null) {
+ return FutureUtil.failedFuture(new
PulsarClientException.NotAllowedException(
+ "The owner topic " + topicMessageId.getOwnerTopic() +
" is not subscribed"));
+ }
+ } else {
+ internalConsumer = null;
+ }
+ if (internalConsumer == null &&
isIllegalMultiTopicsMessageId(messageId)) {
return FutureUtil.failedFuture(
new PulsarClientException("Illegal messageId, messageId
can only be earliest/latest")
);
}
- List<CompletableFuture<Void>> futures = new
ArrayList<>(consumers.size());
- consumers.values().forEach(consumerImpl ->
futures.add(consumerImpl.seekAsync(targetMessageId)));
+
+ final CompletableFuture<Void> seekFuture;
+ if (internalConsumer == null) {
+ List<CompletableFuture<Void>> futures = new
ArrayList<>(consumers.size());
+ consumers.values().forEach(consumerImpl ->
futures.add(consumerImpl.seekAsync(messageId)));
+ seekFuture = FutureUtil.waitForAll(futures);
+ } else {
+ seekFuture = internalConsumer.seekAsync(messageId);
+ }
unAckedMessageTracker.clear();
clearIncomingMessages();
-
- return FutureUtil.waitForAll(futures);
+ return seekFuture;
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 8680f0f0e6c..70d57db3bb6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -95,10 +95,7 @@ class NegativeAcksTracker implements Closeable {
}
private synchronized void add(MessageId messageId, int redeliveryCount) {
- if (messageId instanceof TopicMessageIdImpl) {
- TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
- messageId = topicMessageId.getInnerMessageId();
- }
+ messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index c20960950d5..941f18cf65a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -19,8 +19,9 @@
package org.apache.pulsar.client.impl;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.TopicMessageId;
-public class TopicMessageIdImpl implements MessageId {
+public class TopicMessageIdImpl implements TopicMessageId {
/** This topicPartitionName is get from ConsumerImpl, it contains
partition part. */
private final String topicPartitionName;
@@ -37,6 +38,7 @@ public class TopicMessageIdImpl implements MessageId {
* Get the topic name without partition part of this message.
* @return the name of the topic on which this message was published
*/
+ @Deprecated
public String getTopicName() {
return this.topicName;
}
@@ -45,6 +47,7 @@ public class TopicMessageIdImpl implements MessageId {
* Get the topic name which contains partition part for this message.
* @return the topic name which contains Partition part
*/
+ @Deprecated
public String getTopicPartitionName() {
return this.topicPartitionName;
}
@@ -77,4 +80,9 @@ public class TopicMessageIdImpl implements MessageId {
public int compareTo(MessageId o) {
return messageId.compareTo(o);
}
+
+ @Override
+ public String getOwnerTopic() {
+ return topicPartitionName;
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index f6c33cc930f..c3fcb0a16a3 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -36,14 +36,13 @@ public class TopicMessageImpl<T> implements Message<T> {
final ConsumerImpl receivedByconsumer;
TopicMessageImpl(String topicPartitionName,
- String topicName,
Message<T> msg,
ConsumerImpl receivedByConsumer) {
this.topicPartitionName = topicPartitionName;
this.receivedByconsumer = receivedByConsumer;
this.msg = msg;
- this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName,
msg.getMessageId());
+ this.messageId = new TopicMessageIdImpl(topicPartitionName,
topicPartitionName, msg.getMessageId());
}
/**
@@ -59,6 +58,7 @@ public class TopicMessageImpl<T> implements Message<T> {
* Get the topic name which contains partition part for this message.
* @return the topic name which contains Partition part
*/
+ @Deprecated
public String getTopicPartitionName() {
return topicPartitionName;
}
@@ -68,8 +68,9 @@ public class TopicMessageImpl<T> implements Message<T> {
return messageId;
}
+ @Deprecated
public MessageId getInnerMessageId() {
- return messageId.getInnerMessageId();
+ return MessageIdImpl.convertToMessageIdImpl(messageId);
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
index 907e6109e19..823dd4ad5f4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
public class UnAckedTopicMessageRedeliveryTracker extends
UnAckedMessageRedeliveryTracker {
@@ -41,8 +42,8 @@ public class UnAckedTopicMessageRedeliveryTracker extends
UnAckedMessageRedelive
Entry<UnackMessageIdWrapper, HashSet<UnackMessageIdWrapper>>
entry = iterator.next();
UnackMessageIdWrapper messageIdWrapper = entry.getKey();
MessageId messageId = messageIdWrapper.getMessageId();
- if (messageId instanceof TopicMessageIdImpl
- && ((TopicMessageIdImpl)
messageId).getTopicPartitionName().contains(topicName)) {
+ if (messageId instanceof TopicMessageId
+ && ((TopicMessageId)
messageId).getOwnerTopic().contains(topicName)) {
HashSet<UnackMessageIdWrapper> exist =
redeliveryMessageIdPartitionMap.get(messageIdWrapper);
entry.getValue().remove(messageIdWrapper);
iterator.remove();
@@ -54,8 +55,8 @@ public class UnAckedTopicMessageRedeliveryTracker extends
UnAckedMessageRedelive
Iterator<MessageId> iteratorAckTimeOut =
ackTimeoutMessages.keySet().iterator();
while (iterator.hasNext()) {
MessageId messageId = iteratorAckTimeOut.next();
- if (messageId instanceof TopicMessageIdImpl
- && ((TopicMessageIdImpl)
messageId).getTopicPartitionName().contains(topicName)) {
+ if (messageId instanceof TopicMessageId
+ && ((TopicMessageId)
messageId).getOwnerTopic().contains(topicName)) {
iterator.remove();
removed++;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
index e8e80c5e690..1cbab584404 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
public class UnAckedTopicMessageTracker extends UnAckedMessageTracker {
@@ -39,8 +40,8 @@ public class UnAckedTopicMessageTracker extends
UnAckedMessageTracker {
while (iterator.hasNext()) {
Entry<MessageId, HashSet<MessageId>> entry = iterator.next();
MessageId messageId = entry.getKey();
- if (messageId instanceof TopicMessageIdImpl
- && ((TopicMessageIdImpl)
messageId).getTopicPartitionName().contains(topicName)) {
+ if (messageId instanceof TopicMessageId
+ && ((TopicMessageId)
messageId).getOwnerTopic().contains(topicName)) {
entry.getValue().remove(messageId);
iterator.remove();
removed++;
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
index ad56df918c0..feb4539971a 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
@@ -63,7 +63,7 @@ public class MessageTest {
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<byte[]> msg = MessageImpl.create(builder, payload,
Schema.BYTES, null);
msg.setMessageId(new MessageIdImpl(-1, -1, -1));
- TopicMessageImpl<byte[]> topicMessage = new
TopicMessageImpl<>(topicName, topicName, msg, null);
+ TopicMessageImpl<byte[]> topicMessage = new
TopicMessageImpl<>(topicName, msg, null);
assertTrue(topicMessage.isReplicated());
assertEquals(msg.getReplicatedFrom(), from);
@@ -76,7 +76,7 @@ public class MessageTest {
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<byte[]> msg = MessageImpl.create(builder, payload,
Schema.BYTES, null);
msg.setMessageId(new MessageIdImpl(-1, -1, -1));
- TopicMessageImpl<byte[]> topicMessage = new
TopicMessageImpl<>(topicName, topicName, msg, null);
+ TopicMessageImpl<byte[]> topicMessage = new
TopicMessageImpl<>(topicName, msg, null);
assertFalse(topicMessage.isReplicated());
assertNull(topicMessage.getReplicatedFrom());
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 96f6f4708a7..bda99a39478 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -47,7 +47,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.nar.NarClassLoader;
@@ -315,9 +314,7 @@ public class FunctionCommon {
}
public static final long getSequenceId(MessageId messageId) {
- MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof
TopicMessageIdImpl)
- ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
- : messageId);
+ MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(messageId);
long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 1aa99722512..579b4233399 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
@@ -292,8 +293,8 @@ public class ConsumerHandler extends
AbstractWebSocketHandler {
private void handleAck(ConsumerCommand command) throws IOException {
// We should have received an ack
- MessageId msgId =
MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
- topic.toString());
+ TopicMessageId msgId = TopicMessageId.create(topic.toString(),
+
MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received ack request of message {} from {} ",
consumer.getTopic(),
subscription, msgId,
getRemote().getInetSocketAddress().toString());
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
index 0d523ff56ea..76f09675245 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
@@ -40,8 +40,8 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
@@ -219,7 +219,7 @@ public class SemanticsTest extends PulsarTestSuite {
Message<String> m = consumer.receive();
int topicIdx;
if (numTopics > 1) {
- String topic = ((TopicMessageIdImpl)
m.getMessageId()).getTopicPartitionName();
+ String topic = ((TopicMessageId)
m.getMessageId()).getOwnerTopic();
String[] topicParts = StringUtils.split(topic, '-');
topicIdx =
Integer.parseInt(topicParts[topicParts.length - 1]);