This is an automated email from the ASF dual-hosted git repository.

zhangyifan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 17fd93255 [java] update lastPropagatedTimestamp and resourceMetrics in gotNextRow callback
17fd93255 is described below

commit 17fd93255e124191c840702dab3eed961aadee7f
Author: shenxingwuying <shenxingwuy...@gmail.com>
AuthorDate: Wed Apr 27 23:32:57 2022 +0800

    [java] update lastPropagatedTimestamp and resourceMetrics in gotNextRow callback
    
    As Andrew Wong pointed out in https://gerrit.cloudera.org/18420,
    the client should also update lastPropagatedTimestamp and resourceMetrics
    when it receives the second or any later scan response from the tserver.
    
    Change-Id: I26308754ca741276204cc95cee4f8e4a91dbf331
    Reviewed-on: http://gerrit.cloudera.org:8080/18453
    Tested-by: Kudu Jenkins
    Reviewed-by: Yifan Zhang <chinazhangyi...@163.com>
---
 .../org/apache/kudu/client/AsyncKuduScanner.java    | 21 +++++++++++++++++++++
 .../apache/kudu/client/ITScannerMultiTablet.java    | 17 +++++++++++++++++
 2 files changed, 38 insertions(+)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 77e39b5be..49c188901 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -689,10 +689,31 @@ public final class AsyncKuduScanner {
       new Callback<RowResultIterator, Response>() {
         @Override
         public RowResultIterator call(final Response resp) {
+          long lastPropagatedTimestamp = AsyncKuduClient.NO_TIMESTAMP;
+          if (readMode == ReadMode.READ_YOUR_WRITES &&
+              resp.scanTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+            // For READ_YOUR_WRITES mode, update the latest propagated 
timestamp
+            // with the chosen snapshot timestamp sent back from the server, to
+            // avoid unnecessarily wait for subsequent reads. Since as long as
+            // the chosen snapshot timestamp of the next read is greater than
+            // the previous one, the scan does not violate READ_YOUR_WRITES
+            // session guarantees.
+            lastPropagatedTimestamp = resp.scanTimestamp;
+          } else if (resp.propagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) 
{
+            // Otherwise we just use the propagated timestamp returned from
+            // the server as the latest propagated timestamp.
+            lastPropagatedTimestamp = resp.propagatedTimestamp;
+          }
+          if (lastPropagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+            client.updateLastPropagatedTimestamp(lastPropagatedTimestamp);
+          }
           numRowsReturned += resp.data.getNumRows();
           if (isFaultTolerant && resp.lastPrimaryKey != null) {
             lastPrimaryKey = resp.lastPrimaryKey;
           }
+          if (resp.resourceMetricsPb != null) {
+            resourceMetrics.update(resp.resourceMetricsPb);
+          }
           if (!resp.more) {  // We're done scanning this tablet.
             scanFinished();
             return resp.data;
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
index 16a948693..1f61d2c73 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
@@ -53,6 +53,7 @@ public class ITScannerMultiTablet {
       ITScannerMultiTablet.class.getName() + "-" + System.currentTimeMillis();
   protected static final int ROW_COUNT = 20000;
   protected static final int TABLET_COUNT = 3;
+  protected static final String METRIC_NAME = "total_duration_nanos";
 
   private static Schema schema = getBasicSchema();
   protected KuduTable table;
@@ -222,6 +223,12 @@ public class ITScannerMultiTablet {
           int previousRow = Integer.MIN_VALUE;
           boolean faultInjected = !this.enableFaultInjection;
           int faultInjectionLowBound = (ROW_COUNT / TABLET_COUNT / 2);
+          boolean firstScanRequest = true;
+
+          long firstScannedMetric = 0;
+          long firstPropagatedTimestamp = 0;
+          long lastScannedMetric = 0;
+          long lastPropagatedTimestamp = 0;
           while (scanner.hasMoreRows()) {
             RowResultIterator rri = scanner.nextRows();
             while (rri.hasNext()) {
@@ -234,11 +241,21 @@ public class ITScannerMultiTablet {
               if (!faultInjected && rowCount > faultInjectionLowBound) {
                 harness.restartTabletServer(scanner.currentTablet());
                 faultInjected = true;
+              } else {
+                if (firstScanRequest) {
+                  firstScannedMetric = 
scanner.getResourceMetrics().getMetric(METRIC_NAME);
+                  firstPropagatedTimestamp = 
harness.getClient().getLastPropagatedTimestamp();
+                  firstScanRequest = false;
+                }
+                lastScannedMetric = 
scanner.getResourceMetrics().getMetric(METRIC_NAME);
+                lastPropagatedTimestamp = 
harness.getClient().getLastPropagatedTimestamp();
               }
               previousRow = key;
               rowCount++;
             }
           }
+          assertTrue(lastScannedMetric != firstScannedMetric);
+          assertTrue(lastPropagatedTimestamp > firstPropagatedTimestamp);
         } catch (Exception e) {
           LOG.error("Scan error, {}", e.getMessage());
           e.printStackTrace();

Reply via email to