codelipenghui commented on code in PR #21270: URL: https://github.com/apache/pulsar/pull/21270#discussion_r1372529642
########## pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java: ########## @@ -235,20 +237,36 @@ private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) { AtomicLong messagesRead = new AtomicLong(); CompletableFuture<Reader<T>> future = new CompletableFuture<>(); - readAllExistingMessages(reader, future, startTime, messagesRead); + reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> { + Map<String, TopicMessageId> maxMessageIds = new HashMap<>(); + lastMessageIds.forEach(topicMessageId -> { + maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId); + }); + readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds); Review Comment: It's better to make `maxMessageIds` that passed to the `readAllExistingMessages` immutable to prohibit other threads from getting a chance to update the HashMap. ########## pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java: ########## @@ -235,20 +237,36 @@ private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) { AtomicLong messagesRead = new AtomicLong(); CompletableFuture<Reader<T>> future = new CompletableFuture<>(); - readAllExistingMessages(reader, future, startTime, messagesRead); + reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> { + Map<String, TopicMessageId> maxMessageIds = new HashMap<>(); + lastMessageIds.forEach(topicMessageId -> { + maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId); + }); + readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds); + }); Review Comment: The exception from `reader.getLastMessageIdsAsync()` should be handled. Otherwise, the `future` will never get a chance to be complete. ########## pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java: ########## @@ -235,20 +237,36 @@ private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) { AtomicLong messagesRead = new AtomicLong(); CompletableFuture<Reader<T>> future = new CompletableFuture<>(); - readAllExistingMessages(reader, future, startTime, messagesRead); + reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> { + Map<String, TopicMessageId> maxMessageIds = new HashMap<>(); + lastMessageIds.forEach(topicMessageId -> { + maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId); + }); + readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds); + }); return future; } private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Reader<T>> future, long startTime, - AtomicLong messagesRead) { + AtomicLong messagesRead, Map<String, TopicMessageId> maxMessageIds) { reader.hasMessageAvailableAsync() .thenAccept(hasMessage -> { if (hasMessage) { reader.readNextAsync() .thenAccept(msg -> { messagesRead.incrementAndGet(); handleMessage(msg); - readAllExistingMessages(reader, future, startTime, messagesRead); + // The message is read one by one in a single thread, + // so it's fine that uses a hashmap to store last message ID. + TopicMessageId maxMessageId = maxMessageIds.get(msg.getTopicName()); + if (maxMessageId != null && msg.getMessageId().compareTo(maxMessageId) >= 0) { + maxMessageIds.remove(msg.getTopicName()); + } + if (maxMessageIds.size() == 0) { Review Comment: nit ```suggestion if (maxMessageIds.isEmpty()) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org