codelipenghui commented on code in PR #21417:
URL: https://github.com/apache/pulsar/pull/21417#discussion_r1372536188


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java:
##########
@@ -110,4 +110,37 @@ public interface TableView<T> extends Closeable {
      * @return a future that can used to track when the table view has been 
closed.
      */
     CompletableFuture<Void> closeAsync();
+
+
+    /**
+     *

Review Comment:
   Remove this line.



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java:
##########
@@ -110,4 +110,37 @@ public interface TableView<T> extends Closeable {
      * @return a future that can used to track when the table view has been 
closed.
      */
     CompletableFuture<Void> closeAsync();
+
+

Review Comment:
   Remove the extra empty line.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -230,25 +230,35 @@ private void handleMessage(Message<T> msg) {
         }
     }
 
-    private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> 
reader) {
+    @Override
+    public CompletableFuture<Void> refreshAsync() {
+        return reader.thenCompose(reader -> readAllExistingMessages(reader, 
false));
+    }
+
+    @Override
+    public void refresh() throws PulsarClientException {
+        refreshAsync().join();
+    }
+
+    private CompletableFuture<Void> readAllExistingMessages(Reader<T> reader, 
boolean readTailMessages) {
         long startTime = System.nanoTime();
         AtomicLong messagesRead = new AtomicLong();
 
-        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
-        readAllExistingMessages(reader, future, startTime, messagesRead);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        readAllExistingMessages(reader, future, startTime, messagesRead, 
readTailMessages);
         return future;
     }
 
-    private void readAllExistingMessages(Reader<T> reader, 
CompletableFuture<Reader<T>> future, long startTime,
-                                         AtomicLong messagesRead) {
+    private void readAllExistingMessages(Reader<T> reader, 
CompletableFuture<Void> future, long startTime,
+                                         AtomicLong messagesRead, boolean 
readTailMessages) {
         reader.hasMessageAvailableAsync()
                 .thenAccept(hasMessage -> {
                    if (hasMessage) {
                        reader.readNextAsync()
                                .thenAccept(msg -> {
                                   messagesRead.incrementAndGet();
                                   handleMessage(msg);
-                                  readAllExistingMessages(reader, future, 
startTime, messagesRead);
+                                  readAllExistingMessages(reader, future, 
startTime, messagesRead, readTailMessages);

Review Comment:
   If I understand correctly, this PR will depend on 
https://github.com/apache/pulsar/pull/21270, right? Otherwise, the 
implementation will not conform to [what the API 
said](https://github.com/apache/pulsar/pull/21417/files#diff-4ec98b75d3bcad44ec9b4ee0a8853435c9f20e37880a917a544797a8942f4e41R123-R124).



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -232,33 +232,33 @@ private void handleMessage(Message<T> msg) {
 
     @Override
     public CompletableFuture<Void> refreshAsync() {
-        return reader.thenCompose(this::readAllExistingMessages);
+        return reader.thenCompose(reader -> readAllExistingMessages(reader, 
false));

Review Comment:
   Maybe we should consider having a callback queue for the refresh API. For 
example:
   
   ```
   | LastMessageIds -> Future | LastMessageIds -> Future | LastMessageIds -> 
Future | LastMessageIds -> Future |
   ```
   
   For the refresh request, we can try to get the last message IDs from the 
broker and then add it along with the Future to the queue.
   
   ```
   | [{topic1-> (1,2), topic2 -> {3,2}}] -> Future |
   ```
   
   Then returns the future to the caller.
   
   The tailing read will check the header of the queue and try to complete the 
future if all the last message IDs are read. 



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -129,6 +129,23 @@ private Set<String> publishMessages(String topic, int 
count, boolean enableBatch
         return keys;
     }
 
+    @Test
+    public void testRefreshAPI() throws Exception {
+        String topic = "persistent://public/default/testRefreshAPI";
+        admin.topics().createPartitionedTopic(topic, 3);
+        int count = 50;
+        Set<String> keys = this.publishMessages(topic, count, false);
+        @Cleanup
+        TableView<byte[]> tv = pulsarClient.newTableViewBuilder(Schema.BYTES)
+                .topic(topic)
+                .create();
+        // Call the refresh API to update the tableview. After this operation,
+        // there is no need to use 'Awaitility.await()' when assert the size 
and keys of 'tv'.

Review Comment:
   You should publish more messages since creating the table view will also 
load all the data from the server side before it is available for users.
   
   We should also consider stopping or slowing the `readTailMessages` method to 
make sure the refreshed data is contributed by the refresh API, not the 
`readTailMessages` method. Otherwise, you don't if this test can protect the 
newly added refresh API to avoid break changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to