This is an automated email from the ASF dual-hosted git repository. zhangyifan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 17fd93255 [java] update lastPropagatedTimestamp and resourceMetrics in gotNextRow callback 17fd93255 is described below commit 17fd93255e124191c840702dab3eed961aadee7f Author: shenxingwuying <shenxingwuy...@gmail.com> AuthorDate: Wed Apr 27 23:32:57 2022 +0800 [java] update lastPropagatedTimestamp and resourceMetrics in gotNextRow callback As Andrew Wong said in https://gerrit.cloudera.org/18420, the client should update lastPropagatedTimestamp and resourceMetrics when it receives the second or any later scan response from the tserver. Change-Id: I26308754ca741276204cc95cee4f8e4a91dbf331 Reviewed-on: http://gerrit.cloudera.org:8080/18453 Tested-by: Kudu Jenkins Reviewed-by: Yifan Zhang <chinazhangyi...@163.com> --- .../org/apache/kudu/client/AsyncKuduScanner.java | 21 +++++++++++++++++++++ .../apache/kudu/client/ITScannerMultiTablet.java | 17 +++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java index 77e39b5be..49c188901 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java @@ -689,10 +689,31 @@ public final class AsyncKuduScanner { new Callback<RowResultIterator, Response>() { @Override public RowResultIterator call(final Response resp) { + long lastPropagatedTimestamp = AsyncKuduClient.NO_TIMESTAMP; + if (readMode == ReadMode.READ_YOUR_WRITES && + resp.scanTimestamp != AsyncKuduClient.NO_TIMESTAMP) { + // For READ_YOUR_WRITES mode, update the latest propagated timestamp + // with the chosen snapshot timestamp sent back from the server, to + // avoid unnecessarily wait for subsequent reads. 
Since as long as + // the chosen snapshot timestamp of the next read is greater than + // the previous one, the scan does not violate READ_YOUR_WRITES + // session guarantees. + lastPropagatedTimestamp = resp.scanTimestamp; + } else if (resp.propagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) { + // Otherwise we just use the propagated timestamp returned from + // the server as the latest propagated timestamp. + lastPropagatedTimestamp = resp.propagatedTimestamp; + } + if (lastPropagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) { + client.updateLastPropagatedTimestamp(lastPropagatedTimestamp); + } numRowsReturned += resp.data.getNumRows(); if (isFaultTolerant && resp.lastPrimaryKey != null) { lastPrimaryKey = resp.lastPrimaryKey; } + if (resp.resourceMetricsPb != null) { + resourceMetrics.update(resp.resourceMetricsPb); + } if (!resp.more) { // We're done scanning this tablet. scanFinished(); return resp.data; diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java index 16a948693..1f61d2c73 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java @@ -53,6 +53,7 @@ public class ITScannerMultiTablet { ITScannerMultiTablet.class.getName() + "-" + System.currentTimeMillis(); protected static final int ROW_COUNT = 20000; protected static final int TABLET_COUNT = 3; + protected static final String METRIC_NAME = "total_duration_nanos"; private static Schema schema = getBasicSchema(); protected KuduTable table; @@ -222,6 +223,12 @@ public class ITScannerMultiTablet { int previousRow = Integer.MIN_VALUE; boolean faultInjected = !this.enableFaultInjection; int faultInjectionLowBound = (ROW_COUNT / TABLET_COUNT / 2); + boolean firstScanRequest = true; + + long firstScannedMetric = 0; + long firstPropagatedTimestamp = 0; + long 
lastScannedMetric = 0; + long lastPropagatedTimestamp = 0; while (scanner.hasMoreRows()) { RowResultIterator rri = scanner.nextRows(); while (rri.hasNext()) { @@ -234,11 +241,21 @@ public class ITScannerMultiTablet { if (!faultInjected && rowCount > faultInjectionLowBound) { harness.restartTabletServer(scanner.currentTablet()); faultInjected = true; + } else { + if (firstScanRequest) { + firstScannedMetric = scanner.getResourceMetrics().getMetric(METRIC_NAME); + firstPropagatedTimestamp = harness.getClient().getLastPropagatedTimestamp(); + firstScanRequest = false; + } + lastScannedMetric = scanner.getResourceMetrics().getMetric(METRIC_NAME); + lastPropagatedTimestamp = harness.getClient().getLastPropagatedTimestamp(); } previousRow = key; rowCount++; } } + assertTrue(lastScannedMetric != firstScannedMetric); + assertTrue(lastPropagatedTimestamp > firstPropagatedTimestamp); } catch (Exception e) { LOG.error("Scan error, {}", e.getMessage()); e.printStackTrace();