This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a9186514089 [improve][client] PIP-224: Add getLastMessageIds API
(#20040)
a9186514089 is described below
commit a91865140893f7e9737f6ce9ffc052584c721884
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Apr 11 10:25:57 2023 +0800
[improve][client] PIP-224: Add getLastMessageIds API (#20040)
Co-authored-by: Baodi Shi <[email protected]>
---
.../apache/pulsar/client/api/TopicReaderTest.java | 6 ++++
.../pulsar/client/impl/TopicsConsumerImplTest.java | 37 ++++++++++++++++++++++
.../org/apache/pulsar/client/api/Consumer.java | 21 ++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 15 +++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 7 ++++
.../pulsar/client/impl/MultiMessageIdImpl.java | 1 +
.../client/impl/MultiTopicsConsumerImpl.java | 13 ++++++++
7 files changed, 100 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index c2eb957ee60..424081b904c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -1096,6 +1096,12 @@ public class TopicReaderTest extends
ProducerConsumerBase {
assertFalse(lastMsgId instanceof BatchMessageIdImpl);
assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId());
assertEquals(lastMsgId.getEntryId(), messageId.getEntryId());
+ List<TopicMessageId> lastMsgIds =
reader.getConsumer().getLastMessageIds();
+ assertEquals(lastMsgIds.size(), 1);
+ assertEquals(lastMsgIds.get(0).getOwnerTopic(), topicName);
+ MessageIdAdv lastMsgIdAdv = (MessageIdAdv) lastMsgIds.get(0);
+ assertEquals(lastMsgIdAdv.getLedgerId(), messageId.getLedgerId());
+ assertEquals(lastMsgIdAdv.getEntryId(), messageId.getEntryId());
reader.close();
CountDownLatch latch = new CountDownLatch(numOfMessage);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index ce4a0ae86ac..73fe9799642 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
@@ -42,7 +43,9 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -1097,6 +1100,11 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
admin.topics().createPartitionedTopic(topicName2, 2);
admin.topics().createPartitionedTopic(topicName3, 3);
+ final Set<String> topics = new HashSet<>();
+ topics.add(topicName1);
+ IntStream.range(0, 2).forEach(i -> topics.add(topicName2 +
TopicName.PARTITIONED_TOPIC_SUFFIX + i));
+ IntStream.range(0, 3).forEach(i -> topics.add(topicName3 +
TopicName.PARTITIONED_TOPIC_SUFFIX + i));
+
// 1. producer connect
Producer<byte[]> producer1 =
pulsarClient.newProducer().topic(topicName1)
.enableBatching(false)
@@ -1146,12 +1154,27 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
}
});
+ List<TopicMessageId> msgIds = consumer.getLastMessageIds();
+ assertEquals(msgIds.size(), 6);
+
assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()),
topics);
+ for (TopicMessageId msgId : msgIds) {
+ int numMessages = (int) ((MessageIdAdv) msgId).getEntryId() + 1;
+ if (msgId.getOwnerTopic().equals(topicName1)) {
+ assertEquals(numMessages, totalMessages);
+ } else if (msgId.getOwnerTopic().startsWith(topicName2)) {
+ assertEquals(numMessages, totalMessages / 2);
+ } else {
+ assertEquals(numMessages, totalMessages / 3);
+ }
+ }
+
for (int i = 0; i < totalMessages; i++) {
producer1.send((messagePredicate + "producer1-" + i).getBytes());
producer2.send((messagePredicate + "producer2-" + i).getBytes());
producer3.send((messagePredicate + "producer3-" + i).getBytes());
}
+
messageId = consumer.getLastMessageId();
assertTrue(messageId instanceof MultiMessageIdImpl);
MultiMessageIdImpl multiMessageId2 = (MultiMessageIdImpl) messageId;
@@ -1170,6 +1193,20 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
}
});
+ msgIds = consumer.getLastMessageIds();
+ assertEquals(msgIds.size(), 6);
+
assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()),
topics);
+ for (TopicMessageId msgId : msgIds) {
+ int numMessages = (int) ((MessageIdAdv) msgId).getEntryId() + 1;
+ if (msgId.getOwnerTopic().equals(topicName1)) {
+ assertEquals(numMessages, totalMessages * 2);
+ } else if (msgId.getOwnerTopic().startsWith(topicName2)) {
+ assertEquals(numMessages, totalMessages);
+ } else {
+ assertEquals(numMessages, totalMessages / 3 * 2);
+ }
+ }
+
consumer.unsubscribe();
consumer.close();
producer1.close();
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 69409900496..88ad24fe1f4 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import java.io.Closeable;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -539,16 +540,36 @@ public interface Consumer<T> extends Closeable,
MessageAcknowledger {
* Get the last message id available for consume.
*
* @return the last message id.
+ * @apiNote If the consumer is a multi-topics consumer, the returned value
cannot be used anywhere.
+ * @deprecated Use {@link Consumer#getLastMessageIds()} instead.
*/
+ @Deprecated
MessageId getLastMessageId() throws PulsarClientException;
/**
* Get the last message id available for consume.
*
* @return a future that can be used to track the completion of the
operation.
+ * @deprecated Use {@link Consumer#getLastMessageIdsAsync()}} instead.
*/
+ @Deprecated
CompletableFuture<MessageId> getLastMessageIdAsync();
+ /**
+ * Get all the last message id of the topics the consumer subscribed.
+ *
+ * @return the list of TopicMessageId instances of all the topics that the
consumer subscribed
+ * @throws PulsarClientException if failed to get last message id.
+ * @apiNote It's guaranteed that the owner topic of each TopicMessageId in
the returned list is different from owner
+ * topics of other TopicMessageId instances
+ */
+ List<TopicMessageId> getLastMessageIds() throws PulsarClientException;
+
+ /**
+ * The asynchronous version of {@link Consumer#getLastMessageIds()}.
+ */
+ CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();
+
/**
* @return Whether the consumer is connected to the broker
*/
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 973b3302f41..0db2a8e0ab9 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -730,6 +731,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
public abstract CompletableFuture<Void> closeAsync();
+ @Deprecated
@Override
public MessageId getLastMessageId() throws PulsarClientException {
try {
@@ -742,9 +744,22 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
}
+ @Deprecated
@Override
public abstract CompletableFuture<MessageId> getLastMessageIdAsync();
+ @Override
+ public List<TopicMessageId> getLastMessageIds() throws
PulsarClientException {
+ try {
+ return getLastMessageIdsAsync().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
+ } catch (ExecutionException e) {
+ throw PulsarClientException.unwrap(e);
+ }
+ }
+
private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) {
return SubscriptionType.Shared != type && SubscriptionType.Key_Shared
!= type;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index fb372566426..cc016093196 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2336,11 +2336,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
}
+ @Deprecated
@Override
public CompletableFuture<MessageId> getLastMessageIdAsync() {
return internalGetLastMessageIdAsync().thenApply(r -> r.lastMessageId);
}
+ @Override
+ public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
+ return getLastMessageIdAsync()
+ .thenApply(msgId ->
Collections.singletonList(TopicMessageId.create(topic, msgId)));
+ }
+
public CompletableFuture<GetLastMessageIdResponse>
internalGetLastMessageIdAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java
index 6e60239ffe5..f40e3476dd0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.MessageId;
* This is useful when MessageId is need for partition/multi-topics/pattern
consumer.
* e.g. seek(), ackCumulative(), getLastMessageId().
*/
+@Deprecated
public class MultiMessageIdImpl implements MessageId {
@Getter
private Map<String, MessageId> map;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 5fe0e4a82b8..ef0345de919 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1468,6 +1468,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return partitionsAutoUpdateTimeout;
}
+ @Deprecated
@Override
public CompletableFuture<MessageId> getLastMessageIdAsync() {
CompletableFuture<MessageId> returnFuture = new CompletableFuture<>();
@@ -1496,6 +1497,18 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return returnFuture;
}
+ @Override
+ public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
+ final List<CompletableFuture<List<TopicMessageId>>> futures =
consumers.values().stream()
+ .map(ConsumerImpl::getLastMessageIdsAsync)
+ .collect(Collectors.toList());
+ return FutureUtil.waitForAll(futures).thenApply(__ -> {
+ final List<TopicMessageId> messageIds = new ArrayList<>();
+
futures.stream().map(CompletableFuture::join).forEach(messageIds::addAll);
+ return messageIds;
+ });
+ }
+
private static final Logger log =
LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
public static boolean isIllegalMultiTopicsMessageId(MessageId messageId) {