codelipenghui commented on code in PR #21417: URL: https://github.com/apache/pulsar/pull/21417#discussion_r1372536188
##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java:
##########
@@ -110,4 +110,37 @@ public interface TableView<T> extends Closeable {
      * @return a future that can used to track when the table view has been closed.
      */
     CompletableFuture<Void> closeAsync();
+
+
+    /**
+     *

Review Comment:
   Remove this line.

##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java:
##########
@@ -110,4 +110,37 @@ public interface TableView<T> extends Closeable {
      * @return a future that can used to track when the table view has been closed.
      */
     CompletableFuture<Void> closeAsync();
+
+

Review Comment:
   Remove the extra empty line.

##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -230,25 +230,35 @@ private void handleMessage(Message<T> msg) {
         }
     }
-    private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
+    @Override
+    public CompletableFuture<Void> refreshAsync() {
+        return reader.thenCompose(reader -> readAllExistingMessages(reader, false));
+    }
+
+    @Override
+    public void refresh() throws PulsarClientException {
+        refreshAsync().join();
+    }
+
+    private CompletableFuture<Void> readAllExistingMessages(Reader<T> reader, boolean readTailMessages) {
         long startTime = System.nanoTime();
         AtomicLong messagesRead = new AtomicLong();
-        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
-        readAllExistingMessages(reader, future, startTime, messagesRead);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        readAllExistingMessages(reader, future, startTime, messagesRead, readTailMessages);
         return future;
     }
-    private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Reader<T>> future, long startTime,
-                                         AtomicLong messagesRead) {
+    private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Void> future, long startTime,
+                                         AtomicLong messagesRead, boolean readTailMessages) {
         reader.hasMessageAvailableAsync()
                 .thenAccept(hasMessage -> {
                     if (hasMessage) {
                         reader.readNextAsync()
                                 .thenAccept(msg -> {
                                     messagesRead.incrementAndGet();
                                     handleMessage(msg);
-                                    readAllExistingMessages(reader, future, startTime, messagesRead);
+                                    readAllExistingMessages(reader, future, startTime, messagesRead, readTailMessages);

Review Comment:
   If I understand correctly, this PR will depend on https://github.com/apache/pulsar/pull/21270, right? Otherwise, the implementation will not conform to [what the API said](https://github.com/apache/pulsar/pull/21417/files#diff-4ec98b75d3bcad44ec9b4ee0a8853435c9f20e37880a917a544797a8942f4e41R123-R124).

##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -232,33 +232,33 @@ private void handleMessage(Message<T> msg) {
     @Override
     public CompletableFuture<Void> refreshAsync() {
-        return reader.thenCompose(this::readAllExistingMessages);
+        return reader.thenCompose(reader -> readAllExistingMessages(reader, false));

Review Comment:
   Maybe we should consider having a callback queue for the refresh API. For example:

   ```
   | LastMessageIds -> Future | LastMessageIds -> Future | LastMessageIds -> Future | LastMessageIds -> Future |
   ```

   For a refresh request, we can get the last message IDs from the broker and add them, along with the Future, to the queue.

   ```
   | [{topic1 -> (1,2), topic2 -> (3,2)}] -> Future |
   ```

   Then return the future to the caller. The tailing read checks the head of the queue and completes the future once all of its last message IDs have been read.

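To make the callback-queue idea above concrete, here is a minimal, self-contained sketch. It is not the `TableViewImpl` code and uses no Pulsar classes. The class `RefreshQueueSketch`, its methods `refresh` and `onMessageRead`, and the modeling of a message ID as a single `long` per topic are all assumptions made for illustration. In a real implementation the last message IDs would come from the broker and would be `MessageId` objects.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the "LastMessageIds -> Future" queue from the comment above.
class RefreshQueueSketch {

    // One queued refresh request: the positions to reach, and the future to complete.
    private static final class PendingRefresh {
        final Map<String, Long> lastPositions;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        PendingRefresh(Map<String, Long> lastPositions) {
            this.lastPositions = lastPositions;
        }
    }

    private final Queue<PendingRefresh> pending = new ArrayDeque<>();
    // Highest position read so far per topic (assumption: positions are comparable longs).
    private final Map<String, Long> readPositions = new HashMap<>();

    // Called for a refresh request with the last positions reported at request time.
    synchronized CompletableFuture<Void> refresh(Map<String, Long> lastPositions) {
        PendingRefresh request = new PendingRefresh(new HashMap<>(lastPositions));
        pending.add(request);
        completeReady(); // the table view may already be caught up
        return request.future;
    }

    // Called by the tailing read after each message has been applied.
    synchronized void onMessageRead(String topic, long position) {
        readPositions.merge(topic, position, Math::max);
        completeReady();
    }

    // Complete requests from the head of the queue while their targets are reached.
    private void completeReady() {
        PendingRefresh head;
        while ((head = pending.peek()) != null && isSatisfied(head)) {
            pending.poll();
            head.future.complete(null);
        }
    }

    private boolean isSatisfied(PendingRefresh request) {
        for (Map.Entry<String, Long> target : request.lastPositions.entrySet()) {
            Long read = readPositions.get(target.getKey());
            if (read == null || read < target.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        RefreshQueueSketch queue = new RefreshQueueSketch();
        Map<String, Long> last = new HashMap<>();
        last.put("topic1", 2L);
        last.put("topic2", 3L);
        CompletableFuture<Void> future = queue.refresh(last);
        queue.onMessageRead("topic1", 2L);
        System.out.println("done after topic1 only: " + future.isDone());
        queue.onMessageRead("topic2", 3L);
        System.out.println("done after both topics: " + future.isDone());
    }
}
```

In this sketch the future is completed while the lock is held, which keeps the example short. A real implementation would probably complete it outside the lock, or on the client's executor, so that caller callbacks cannot block the tailing read.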
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -129,6 +129,23 @@ private Set<String> publishMessages(String topic, int count, boolean enableBatch
         return keys;
     }
+    @Test
+    public void testRefreshAPI() throws Exception {
+        String topic = "persistent://public/default/testRefreshAPI";
+        admin.topics().createPartitionedTopic(topic, 3);
+        int count = 50;
+        Set<String> keys = this.publishMessages(topic, count, false);
+        @Cleanup
+        TableView<byte[]> tv = pulsarClient.newTableViewBuilder(Schema.BYTES)
+                .topic(topic)
+                .create();
+        // Call the refresh API to update the tableview. After this operation,
+        // there is no need to use 'Awaitility.await()' when assert the size and keys of 'tv'.

Review Comment:
   You should publish more messages after the table view is created, because creating the table view also loads all the existing data from the server side before it becomes available to users.

   We should also consider stopping or slowing down the `readTailMessages` method to make sure the refreshed data is contributed by the refresh API, not by `readTailMessages`. Otherwise, you don't know whether this test protects the newly added refresh API against breaking changes.