java: Move scan token tests into their own class

A follow-up commit will add additional tests there.

Change-Id: Iefc23f0193cb24a00005a5554881eedbaac15929
Reviewed-on: http://gerrit.cloudera.org:8080/10724
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <danburk...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0cda8c8e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0cda8c8e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0cda8c8e

Branch: refs/heads/master
Commit: 0cda8c8ee9dbd982763c3706a6defde1fffc6d00
Parents: 8155d8c
Author: Mike Percy <mpe...@apache.org>
Authored: Fri Jun 22 19:44:14 2018 -0700
Committer: Mike Percy <mpe...@apache.org>
Committed: Tue Jul 3 02:58:04 2018 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/TestKuduClient.java  | 228 ------------------
 .../org/apache/kudu/client/TestScanToken.java   | 235 +++++++++++++++++++
 .../org/apache/kudu/util/ClientTestUtil.java    |  45 ++++
 3 files changed, 280 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0cda8c8e/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 3d84cb1..ce88fd7 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -49,10 +49,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
 import com.stumbleupon.async.Deferred;
 
 import org.apache.kudu.util.TimestampUtil;
@@ -586,238 +584,12 @@ public class TestKuduClient extends BaseKuduTest {
     ).size());
   }
 
-  /**
-   * Counts the rows in the provided scan tokens.
-   */
-  private int countScanTokenRows(List<KuduScanToken> tokens) throws Exception {
-    final AtomicInteger count = new AtomicInteger(0);
-    List<Thread> threads = new ArrayList<>();
-    for (final KuduScanToken token : tokens) {
-      final byte[] serializedToken = token.serialize();
-      Thread thread = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          try (KuduClient contextClient = new 
KuduClient.KuduClientBuilder(masterAddresses)
-              .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
-              .build()) {
-            KuduScanner scanner = 
KuduScanToken.deserializeIntoScanner(serializedToken, contextClient);
-            try {
-              int localCount = 0;
-              while (scanner.hasMoreRows()) {
-                localCount += Iterators.size(scanner.nextRows());
-              }
-              count.addAndGet(localCount);
-            } finally {
-              scanner.close();
-            }
-          } catch (Exception e) {
-            LOG.error("exception in parallel token scanner", e);
-          }
-        }
-      });
-      thread.run();
-      threads.add(thread);
-    }
-
-    for (Thread thread : threads) {
-      thread.join();
-    }
-    return count.get();
-  }
-
   @Test
   public void testGetAuthnToken() throws Exception {
     byte[] token = client.exportAuthenticationCredentials().join();
     assertNotNull(token);
   }
 
-  /**
-   * Tests scan tokens by creating a set of scan tokens, serializing them, and
-   * then executing them in parallel with separate client instances. This
-   * simulates the normal usecase of scan tokens being created at a central
-   * planner and distributed to remote task executors.
-   */
-  @Test
-  public void testScanTokens() throws Exception {
-    int saveFetchTablets = AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP;
-    try {
-      // For this test, make sure that we cover the case that not all tablets
-      // are returned in a single batch.
-      AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP = 4;
-
-      Schema schema = createManyStringsSchema();
-      CreateTableOptions createOptions = new CreateTableOptions();
-      createOptions.addHashPartitions(ImmutableList.of("key"), 8);
-
-      PartialRow splitRow = schema.newPartialRow();
-      splitRow.addString("key", "key_50");
-      createOptions.addSplitRow(splitRow);
-
-      syncClient.createTable(tableName, schema, createOptions);
-
-      KuduSession session = syncClient.newSession();
-      
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
-      KuduTable table = syncClient.openTable(tableName);
-      for (int i = 0; i < 100; i++) {
-        Insert insert = table.newInsert();
-        PartialRow row = insert.getRow();
-        row.addString("key", String.format("key_%02d", i));
-        row.addString("c1", "c1_" + i);
-        row.addString("c2", "c2_" + i);
-        session.apply(insert);
-      }
-      session.flush();
-
-      KuduScanToken.KuduScanTokenBuilder tokenBuilder = 
syncClient.newScanTokenBuilder(table);
-      tokenBuilder.batchSizeBytes(0);
-      tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
-      List<KuduScanToken> tokens = tokenBuilder.build();
-      assertEquals(16, tokens.size());
-
-      // KUDU-1809, with batchSizeBytes configured to '0',
-      // the first call to the tablet server won't return
-      // any data.
-      {
-        KuduScanner scanner = tokens.get(0).intoScanner(syncClient);
-        assertEquals(0, scanner.nextRows().getNumRows());
-      }
-
-      for (KuduScanToken token : tokens) {
-        // Sanity check to make sure the debug printing does not throw.
-        LOG.debug(KuduScanToken.stringifySerializedToken(token.serialize(), 
syncClient));
-      }
-    } finally {
-      AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP = saveFetchTablets;
-    }
-  }
-
-  /**
-   * Tests scan token creation and execution on a table with non-covering 
range partitions.
-   */
-  @Test
-  public void testScanTokensNonCoveringRangePartitions() throws Exception {
-    Schema schema = createManyStringsSchema();
-    CreateTableOptions createOptions = new CreateTableOptions();
-    createOptions.addHashPartitions(ImmutableList.of("key"), 2);
-
-    PartialRow lower = schema.newPartialRow();
-    PartialRow upper = schema.newPartialRow();
-    lower.addString("key", "a");
-    upper.addString("key", "f");
-    createOptions.addRangePartition(lower, upper);
-
-    lower = schema.newPartialRow();
-    upper = schema.newPartialRow();
-    lower.addString("key", "h");
-    upper.addString("key", "z");
-    createOptions.addRangePartition(lower, upper);
-
-    PartialRow split = schema.newPartialRow();
-    split.addString("key", "k");
-    createOptions.addSplitRow(split);
-
-    syncClient.createTable(tableName, schema, createOptions);
-
-    KuduSession session = syncClient.newSession();
-    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
-    KuduTable table = syncClient.openTable(tableName);
-    for (char c = 'a'; c < 'f'; c++) {
-      Insert insert = table.newInsert();
-      PartialRow row = insert.getRow();
-      row.addString("key", "" + c);
-      row.addString("c1", "c1_" + c);
-      row.addString("c2", "c2_" + c);
-      session.apply(insert);
-    }
-    for (char c = 'h'; c < 'z'; c++) {
-      Insert insert = table.newInsert();
-      PartialRow row = insert.getRow();
-      row.addString("key", "" + c);
-      row.addString("c1", "c1_" + c);
-      row.addString("c2", "c2_" + c);
-      session.apply(insert);
-    }
-    session.flush();
-
-    KuduScanToken.KuduScanTokenBuilder tokenBuilder = 
syncClient.newScanTokenBuilder(table);
-    tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
-    List<KuduScanToken> tokens = tokenBuilder.build();
-    assertEquals(6, tokens.size());
-    assertEquals('f' - 'a' + 'z' - 'h', countScanTokenRows(tokens));
-
-    for (KuduScanToken token : tokens) {
-      // Sanity check to make sure the debug printing does not throw.
-      LOG.debug(KuduScanToken.stringifySerializedToken(token.serialize(), 
syncClient));
-    }
-  }
-
-  /**
-   * Tests the results of creating scan tokens, altering the columns being
-   * scanned, and then executing the scan tokens.
-   */
-  @Test
-  public void testScanTokensConcurrentAlterTable() throws Exception {
-    Schema schema = new Schema(ImmutableList.of(
-        new ColumnSchema.ColumnSchemaBuilder("key", 
Type.INT64).nullable(false).key(true).build(),
-        new ColumnSchema.ColumnSchemaBuilder("a", 
Type.INT64).nullable(false).key(false).build()
-    ));
-    CreateTableOptions createOptions = new CreateTableOptions();
-    createOptions.setRangePartitionColumns(ImmutableList.<String>of());
-    createOptions.setNumReplicas(1);
-    syncClient.createTable(tableName, schema, createOptions);
-
-    KuduTable table = syncClient.openTable(tableName);
-
-    KuduScanToken.KuduScanTokenBuilder tokenBuilder = 
syncClient.newScanTokenBuilder(table);
-    List<KuduScanToken> tokens = tokenBuilder.build();
-    assertEquals(1, tokens.size());
-    KuduScanToken token = tokens.get(0);
-
-    // Drop a column
-    syncClient.alterTable(tableName, new AlterTableOptions().dropColumn("a"));
-    try {
-      token.intoScanner(syncClient);
-      fail();
-    } catch (IllegalArgumentException e) {
-      assertTrue(e.getMessage().contains("Unknown column"));
-    }
-
-    // Add back the column with the wrong type.
-    syncClient.alterTable(
-        tableName,
-        new AlterTableOptions().addColumn(
-            new ColumnSchema.ColumnSchemaBuilder("a", 
Type.STRING).nullable(true).build()));
-    try {
-      token.intoScanner(syncClient);
-      fail();
-    } catch (IllegalStateException e) {
-      assertTrue(e.getMessage().contains(
-          "invalid type INT64 for column 'a' in scan token, expected: 
STRING"));
-    }
-
-    // Add the column with the wrong nullability.
-    syncClient.alterTable(
-        tableName,
-        new AlterTableOptions().dropColumn("a")
-                               .addColumn(new 
ColumnSchema.ColumnSchemaBuilder("a", Type.INT64)
-                                                          
.nullable(true).build()));
-    try {
-      token.intoScanner(syncClient);
-      fail();
-    } catch (IllegalStateException e) {
-      assertTrue(e.getMessage().contains(
-          "invalid nullability for column 'a' in scan token, expected: NOT 
NULL"));
-    }
-
-    // Add the column with the correct type and nullability.
-    syncClient.alterTable(
-        tableName,
-        new AlterTableOptions().dropColumn("a")
-                               .addColumn(new 
ColumnSchema.ColumnSchemaBuilder("a", Type.INT64)
-                                                          .nullable(false)
-                                                          
.defaultValue(0L).build()));
-    token.intoScanner(syncClient);
-  }
 
   /**
    * Counts the rows in a table between two optional bounds.

http://git-wip-us.apache.org/repos/asf/kudu/blob/0cda8c8e/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
new file mode 100644
index 0000000..1761cf7
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.kudu.util.ClientTestUtil.countScanTokenRows;
+import static org.apache.kudu.util.ClientTestUtil.createManyStringsSchema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestScanToken extends BaseKuduTest {
+
+  private String testTableName;
+
+  @Before
+  public void setup() {
+    testTableName = getTestMethodNameWithTimestamp();
+  }
+
+  /**
+   * Tests scan tokens by creating a set of scan tokens, serializing them, and
+   * then executing them in parallel with separate client instances. This
+   * simulates the normal usecase of scan tokens being created at a central
+   * planner and distributed to remote task executors.
+   */
+  @Test
+  public void testScanTokens() throws Exception {
+    int saveFetchTablets = AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP;
+    try {
+      // For this test, make sure that we cover the case that not all tablets
+      // are returned in a single batch.
+      AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP = 4;
+
+      Schema schema = createManyStringsSchema();
+      CreateTableOptions createOptions = new CreateTableOptions();
+      createOptions.addHashPartitions(ImmutableList.of("key"), 8);
+
+      PartialRow splitRow = schema.newPartialRow();
+      splitRow.addString("key", "key_50");
+      createOptions.addSplitRow(splitRow);
+
+      syncClient.createTable(testTableName, schema, createOptions);
+
+      KuduSession session = syncClient.newSession();
+      
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+      KuduTable table = syncClient.openTable(testTableName);
+      for (int i = 0; i < 100; i++) {
+        Insert insert = table.newInsert();
+        PartialRow row = insert.getRow();
+        row.addString("key", String.format("key_%02d", i));
+        row.addString("c1", "c1_" + i);
+        row.addString("c2", "c2_" + i);
+        session.apply(insert);
+      }
+      session.flush();
+
+      KuduScanToken.KuduScanTokenBuilder tokenBuilder = 
syncClient.newScanTokenBuilder(table);
+      tokenBuilder.batchSizeBytes(0);
+      tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
+      List<KuduScanToken> tokens = tokenBuilder.build();
+      assertEquals(16, tokens.size());
+
+      // KUDU-1809, with batchSizeBytes configured to '0',
+      // the first call to the tablet server won't return
+      // any data.
+      {
+        KuduScanner scanner = tokens.get(0).intoScanner(syncClient);
+        assertEquals(0, scanner.nextRows().getNumRows());
+      }
+
+      for (KuduScanToken token : tokens) {
+        // Sanity check to make sure the debug printing does not throw.
+        LOG.debug(KuduScanToken.stringifySerializedToken(token.serialize(), 
syncClient));
+      }
+    } finally {
+      AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP = saveFetchTablets;
+    }
+  }
+
+  /**
+   * Tests scan token creation and execution on a table with non-covering 
range partitions.
+   */
+  @Test
+  public void testScanTokensNonCoveringRangePartitions() throws Exception {
+    Schema schema = createManyStringsSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.addHashPartitions(ImmutableList.of("key"), 2);
+
+    PartialRow lower = schema.newPartialRow();
+    PartialRow upper = schema.newPartialRow();
+    lower.addString("key", "a");
+    upper.addString("key", "f");
+    createOptions.addRangePartition(lower, upper);
+
+    lower = schema.newPartialRow();
+    upper = schema.newPartialRow();
+    lower.addString("key", "h");
+    upper.addString("key", "z");
+    createOptions.addRangePartition(lower, upper);
+
+    PartialRow split = schema.newPartialRow();
+    split.addString("key", "k");
+    createOptions.addSplitRow(split);
+
+    syncClient.createTable(testTableName, schema, createOptions);
+
+    KuduSession session = syncClient.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+    KuduTable table = syncClient.openTable(testTableName);
+    for (char c = 'a'; c < 'f'; c++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addString("key", "" + c);
+      row.addString("c1", "c1_" + c);
+      row.addString("c2", "c2_" + c);
+      session.apply(insert);
+    }
+    for (char c = 'h'; c < 'z'; c++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addString("key", "" + c);
+      row.addString("c1", "c1_" + c);
+      row.addString("c2", "c2_" + c);
+      session.apply(insert);
+    }
+    session.flush();
+
+    KuduScanToken.KuduScanTokenBuilder tokenBuilder = 
syncClient.newScanTokenBuilder(table);
+    tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
+    List<KuduScanToken> tokens = tokenBuilder.build();
+    assertEquals(6, tokens.size());
+    assertEquals('f' - 'a' + 'z' - 'h',
+                 countScanTokenRows(tokens,
+                                    syncClient.getMasterAddressesAsString(),
+                                    
syncClient.getDefaultOperationTimeoutMs()));
+
+    for (KuduScanToken token : tokens) {
+      // Sanity check to make sure the debug printing does not throw.
+      LOG.debug(KuduScanToken.stringifySerializedToken(token.serialize(), 
syncClient));
+    }
+  }
+
+  /**
+   * Tests the results of creating scan tokens, altering the columns being
+   * scanned, and then executing the scan tokens.
+   */
+  @Test
+  public void testScanTokensConcurrentAlterTable() throws Exception {
+    Schema schema = new Schema(ImmutableList.of(
+        new ColumnSchema.ColumnSchemaBuilder("key", 
Type.INT64).nullable(false).key(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder("a", 
Type.INT64).nullable(false).key(false).build()
+    ));
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.setRangePartitionColumns(ImmutableList.<String>of());
+    createOptions.setNumReplicas(1);
+    syncClient.createTable(testTableName, schema, createOptions);
+
+    KuduTable table = syncClient.openTable(testTableName);
+
+    KuduScanToken.KuduScanTokenBuilder tokenBuilder = 
syncClient.newScanTokenBuilder(table);
+    List<KuduScanToken> tokens = tokenBuilder.build();
+    assertEquals(1, tokens.size());
+    KuduScanToken token = tokens.get(0);
+
+    // Drop a column
+    syncClient.alterTable(testTableName, new 
AlterTableOptions().dropColumn("a"));
+    try {
+      token.intoScanner(syncClient);
+      fail();
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("Unknown column"));
+    }
+
+    // Add back the column with the wrong type.
+    syncClient.alterTable(
+        testTableName,
+        new AlterTableOptions().addColumn(
+            new ColumnSchema.ColumnSchemaBuilder("a", 
Type.STRING).nullable(true).build()));
+    try {
+      token.intoScanner(syncClient);
+      fail();
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains(
+          "invalid type INT64 for column 'a' in scan token, expected: 
STRING"));
+    }
+
+    // Add the column with the wrong nullability.
+    syncClient.alterTable(
+        testTableName,
+        new AlterTableOptions().dropColumn("a")
+                               .addColumn(new 
ColumnSchema.ColumnSchemaBuilder("a", Type.INT64)
+                                                          
.nullable(true).build()));
+    try {
+      token.intoScanner(syncClient);
+      fail();
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains(
+          "invalid nullability for column 'a' in scan token, expected: NOT 
NULL"));
+    }
+
+    // Add the column with the correct type and nullability.
+    syncClient.alterTable(
+        testTableName,
+        new AlterTableOptions().dropColumn("a")
+                               .addColumn(new 
ColumnSchema.ColumnSchemaBuilder("a", Type.INT64)
+                                                          .nullable(false)
+                                                          
.defaultValue(0L).build()));
+    token.intoScanner(syncClient);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/0cda8c8e/java/kudu-client/src/test/java/org/apache/kudu/util/ClientTestUtil.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/util/ClientTestUtil.java 
b/java/kudu-client/src/test/java/org/apache/kudu/util/ClientTestUtil.java
index 5476175..055955c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/util/ClientTestUtil.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/util/ClientTestUtil.java
@@ -18,6 +18,7 @@
 package org.apache.kudu.util;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
@@ -30,8 +31,10 @@ import org.apache.kudu.client.AsyncKuduScanner;
 import org.apache.kudu.client.AsyncKuduSession;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanToken;
 import org.apache.kudu.client.KuduScanner;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.PartialRow;
@@ -40,6 +43,7 @@ import org.apache.kudu.client.RowResultIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -123,6 +127,47 @@ public abstract class ClientTestUtil {
     return countRowsInScan(scanBuilder.build());
   }
 
+  /**
+   * Counts the rows in the provided scan tokens.
+   */
+  public static int countScanTokenRows(List<KuduScanToken> tokens, final 
String masterAddresses,
+                                       final long operationTimeoutMs)
+      throws IOException, InterruptedException {
+    final AtomicInteger count = new AtomicInteger(0);
+    List<Thread> threads = new ArrayList<>();
+    for (final KuduScanToken token : tokens) {
+      final byte[] serializedToken = token.serialize();
+      Thread thread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try (KuduClient contextClient = new 
KuduClient.KuduClientBuilder(masterAddresses)
+                   .defaultAdminOperationTimeoutMs(operationTimeoutMs)
+                   .build()) {
+            KuduScanner scanner = 
KuduScanToken.deserializeIntoScanner(serializedToken, contextClient);
+            try {
+              int localCount = 0;
+              while (scanner.hasMoreRows()) {
+                localCount += Iterators.size(scanner.nextRows());
+              }
+              count.addAndGet(localCount);
+            } finally {
+              scanner.close();
+            }
+          } catch (Exception e) {
+            LOG.error("exception in parallel token scanner", e);
+          }
+        }
+      });
+      thread.run();
+      threads.add(thread);
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    return count.get();
+  }
+
   public static List<String> scanTableToStrings(KuduTable table,
                                                 KuduPredicate... predicates) 
throws Exception {
     List<String> rowStrings = Lists.newArrayList();

Reply via email to