Repository: kudu
Updated Branches:
  refs/heads/master 3f194c3a1 -> a95441846


KUDU-16: implement Java client limits

This patch implements scan limits for the Java client. Each
AsyncKuduScanner maintains a count of the rows it has already returned
over the whole table scan, and uses that count to set the remaining
limit on each per-tablet scan request.

Change-Id: I83635fbbc7714318f8b95d91b7f178e9ca7ebff7
Reviewed-on: http://gerrit.cloudera.org:8080/9926
Reviewed-by: Grant Henke <granthe...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a9544184
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a9544184
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a9544184

Branch: refs/heads/master
Commit: a95441846bb1381ff2543ffcddf3854184a111ea
Parents: 3f194c3
Author: Andrew Wong <aw...@cloudera.com>
Authored: Wed Apr 4 12:48:53 2018 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Sat Apr 7 00:15:23 2018 +0000

----------------------------------------------------------------------
 .../kudu/client/AbstractKuduScannerBuilder.java |  4 --
 .../apache/kudu/client/AsyncKuduScanner.java    | 12 +++--
 .../org/apache/kudu/client/TestKuduClient.java  | 48 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/a9544184/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
index f27c62b..4485eec 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
@@ -197,10 +197,6 @@ public abstract class AbstractKuduScannerBuilder
    * Sets a limit on the number of rows that will be returned by the scanner. 
There's no limit
    * by default.
    *
-   * WARNING: Currently setting the limit has no effect.
-   * See <a href="https://issues.apache.org/jira/browse/KUDU-16">KUDU-16</a> for more
-   * information.
-   *
    * @param limit a positive long
    * @return this instance
    */

http://git-wip-us.apache.org/repos/asf/kudu/blob/a9544184/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 24581af..a2dd160 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -209,6 +209,8 @@ public final class AsyncKuduScanner {
 
   private boolean hasMore = true;
 
+  private long numRowsReturned = 0;
+
   /**
    * The tabletSlice currently being scanned.
    * If null, we haven't started scanning.
@@ -436,6 +438,8 @@ public final class AsyncKuduScanner {
             lastPrimaryKey = resp.lastPrimaryKey;
           }
 
+          numRowsReturned += resp.data.getNumRows();
+
           if (!resp.more || resp.scannerId == null) {
             scanFinished();
             return Deferred.fromResult(resp.data); // there might be data to 
return
@@ -519,6 +523,7 @@ public final class AsyncKuduScanner {
   private final Callback<RowResultIterator, Response> gotNextRow =
       new Callback<RowResultIterator, Response>() {
         public RowResultIterator call(final Response resp) {
+          numRowsReturned += resp.data.getNumRows();
           if (!resp.more) {  // We're done scanning this tablet.
             scanFinished();
             return resp.data;
@@ -566,8 +571,9 @@ public final class AsyncKuduScanner {
   void scanFinished() {
     Partition partition = tablet.getPartition();
     pruner.removePartitionKeyRange(partition.getPartitionKeyEnd());
-    // Stop scanning if we have scanned until or past the end partition key.
-    if (!pruner.hasMorePartitionKeyRanges()) {
+    // Stop scanning if we have scanned until or past the end partition key, or
+    // if we have fulfilled the limit.
+    if (!pruner.hasMorePartitionKeyRanges() || numRowsReturned >= limit) {
       hasMore = false;
       closed = true; // the scanner is closed on the other side at this point
       return;
@@ -819,7 +825,7 @@ public final class AsyncKuduScanner {
           // is the easiest way.
           AsyncKuduScanner.this.tablet = super.getTablet();
           NewScanRequestPB.Builder newBuilder = NewScanRequestPB.newBuilder();
-          newBuilder.setLimit(limit); // currently ignored
+          newBuilder.setLimit(limit - AsyncKuduScanner.this.numRowsReturned);
           
newBuilder.addAllProjectedColumns(ProtobufHelper.schemaToListPb(schema));
           
newBuilder.setTabletId(UnsafeByteOperations.unsafeWrap(tablet.getTabletIdAsBytes()));
           newBuilder.setOrderMode(AsyncKuduScanner.this.getOrderMode());

http://git-wip-us.apache.org/repos/asf/kudu/blob/a9544184/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index d527bd0..0d61e58 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -490,6 +490,54 @@ public class TestKuduClient extends BaseKuduTest {
   }
 
   /**
+   * Test scanning with limits, using both sync and async scanners.
+   */
+  @Test
+  public void testScanWithLimit() throws Exception {
+    syncClient.createTable(tableName, basicSchema, 
getBasicTableOptionsWithNonCoveredRange());
+    KuduTable table = syncClient.openTable(tableName);
+    KuduSession session = syncClient.newSession();
+    int num_rows = 100;
+    for (int key = 0; key < num_rows; key++) {
+      session.apply(createBasicSchemaInsert(table, key));
+    }
+
+    // Non-positive limits must be rejected with an IllegalArgumentException.
+    int non_positives[] = { -1, 0 };
+    for (int limit : non_positives) {
+      try {
+        KuduScanner scanner = syncClient.newScannerBuilder(table)
+                                        .limit(limit)
+                                        .build();
+        fail();
+      } catch (IllegalArgumentException e) {
+        assertTrue(e.getMessage().contains("Need a strictly positive number"));
+      }
+    }
+
+    // Limits just below, at, and above the row count should return min(num_rows, limit) rows.
+    int limits[] = { num_rows - 1, num_rows, num_rows + 1 };
+    for (int limit : limits) {
+      KuduScanner scanner = syncClient.newScannerBuilder(table)
+                                      .limit(limit)
+                                      .build();
+      int count = 0;
+      while (scanner.hasMoreRows()) {
+        count += scanner.nextRows().getNumRows();
+      }
+      assertEquals(Math.min(num_rows, limit), count);
+    }
+
+    // Repeat the same limit checks with async scanners.
+    for (int limit : limits) {
+      AsyncKuduScanner scanner = new 
AsyncKuduScanner.AsyncKuduScannerBuilder(client, table)
+                                                     .limit(limit)
+                                                     .build();
+      assertEquals(Math.min(limit, num_rows), countRowsInScan(scanner));
+    }
+  }
+
+  /**
    * Test scanning with predicates.
    */
   @Test

Reply via email to