java: Move scan token tests into their own class. A follow-up commit will add additional tests there.
Change-Id: Iefc23f0193cb24a00005a5554881eedbaac15929 Reviewed-on: http://gerrit.cloudera.org:8080/10724 Tested-by: Kudu Jenkins Reviewed-by: Dan Burkert <danburk...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0cda8c8e Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0cda8c8e Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0cda8c8e Branch: refs/heads/master Commit: 0cda8c8ee9dbd982763c3706a6defde1fffc6d00 Parents: 8155d8c Author: Mike Percy <mpe...@apache.org> Authored: Fri Jun 22 19:44:14 2018 -0700 Committer: Mike Percy <mpe...@apache.org> Committed: Tue Jul 3 02:58:04 2018 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/client/TestKuduClient.java | 228 ------------------ .../org/apache/kudu/client/TestScanToken.java | 235 +++++++++++++++++++ .../org/apache/kudu/util/ClientTestUtil.java | 45 ++++ 3 files changed, 280 insertions(+), 228 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/0cda8c8e/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java index 3d84cb1..ce88fd7 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java @@ -49,10 +49,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterators; import com.stumbleupon.async.Deferred; 
import org.apache.kudu.util.TimestampUtil; @@ -586,238 +584,12 @@ public class TestKuduClient extends BaseKuduTest { ).size()); } - /** - * Counts the rows in the provided scan tokens. - */ - private int countScanTokenRows(List<KuduScanToken> tokens) throws Exception { - final AtomicInteger count = new AtomicInteger(0); - List<Thread> threads = new ArrayList<>(); - for (final KuduScanToken token : tokens) { - final byte[] serializedToken = token.serialize(); - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - try (KuduClient contextClient = new KuduClient.KuduClientBuilder(masterAddresses) - .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP) - .build()) { - KuduScanner scanner = KuduScanToken.deserializeIntoScanner(serializedToken, contextClient); - try { - int localCount = 0; - while (scanner.hasMoreRows()) { - localCount += Iterators.size(scanner.nextRows()); - } - count.addAndGet(localCount); - } finally { - scanner.close(); - } - } catch (Exception e) { - LOG.error("exception in parallel token scanner", e); - } - } - }); - thread.run(); - threads.add(thread); - } - - for (Thread thread : threads) { - thread.join(); - } - return count.get(); - } - @Test public void testGetAuthnToken() throws Exception { byte[] token = client.exportAuthenticationCredentials().join(); assertNotNull(token); } - /** - * Tests scan tokens by creating a set of scan tokens, serializing them, and - * then executing them in parallel with separate client instances. This - * simulates the normal usecase of scan tokens being created at a central - * planner and distributed to remote task executors. - */ - @Test - public void testScanTokens() throws Exception { - int saveFetchTablets = AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP; - try { - // For this test, make sure that we cover the case that not all tablets - // are returned in a single batch. 
- AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP = 4; - - Schema schema = createManyStringsSchema(); - CreateTableOptions createOptions = new CreateTableOptions(); - createOptions.addHashPartitions(ImmutableList.of("key"), 8); - - PartialRow splitRow = schema.newPartialRow(); - splitRow.addString("key", "key_50"); - createOptions.addSplitRow(splitRow); - - syncClient.createTable(tableName, schema, createOptions); - - KuduSession session = syncClient.newSession(); - session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND); - KuduTable table = syncClient.openTable(tableName); - for (int i = 0; i < 100; i++) { - Insert insert = table.newInsert(); - PartialRow row = insert.getRow(); - row.addString("key", String.format("key_%02d", i)); - row.addString("c1", "c1_" + i); - row.addString("c2", "c2_" + i); - session.apply(insert); - } - session.flush(); - - KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table); - tokenBuilder.batchSizeBytes(0); - tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of()); - List<KuduScanToken> tokens = tokenBuilder.build(); - assertEquals(16, tokens.size()); - - // KUDU-1809, with batchSizeBytes configured to '0', - // the first call to the tablet server won't return - // any data. - { - KuduScanner scanner = tokens.get(0).intoScanner(syncClient); - assertEquals(0, scanner.nextRows().getNumRows()); - } - - for (KuduScanToken token : tokens) { - // Sanity check to make sure the debug printing does not throw. - LOG.debug(KuduScanToken.stringifySerializedToken(token.serialize(), syncClient)); - } - } finally { - AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP = saveFetchTablets; - } - } - - /** - * Tests scan token creation and execution on a table with non-covering range partitions. 
- */ - @Test - public void testScanTokensNonCoveringRangePartitions() throws Exception { - Schema schema = createManyStringsSchema(); - CreateTableOptions createOptions = new CreateTableOptions(); - createOptions.addHashPartitions(ImmutableList.of("key"), 2); - - PartialRow lower = schema.newPartialRow(); - PartialRow upper = schema.newPartialRow(); - lower.addString("key", "a"); - upper.addString("key", "f"); - createOptions.addRangePartition(lower, upper); - - lower = schema.newPartialRow(); - upper = schema.newPartialRow(); - lower.addString("key", "h"); - upper.addString("key", "z"); - createOptions.addRangePartition(lower, upper); - - PartialRow split = schema.newPartialRow(); - split.addString("key", "k"); - createOptions.addSplitRow(split); - - syncClient.createTable(tableName, schema, createOptions); - - KuduSession session = syncClient.newSession(); - session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND); - KuduTable table = syncClient.openTable(tableName); - for (char c = 'a'; c < 'f'; c++) { - Insert insert = table.newInsert(); - PartialRow row = insert.getRow(); - row.addString("key", "" + c); - row.addString("c1", "c1_" + c); - row.addString("c2", "c2_" + c); - session.apply(insert); - } - for (char c = 'h'; c < 'z'; c++) { - Insert insert = table.newInsert(); - PartialRow row = insert.getRow(); - row.addString("key", "" + c); - row.addString("c1", "c1_" + c); - row.addString("c2", "c2_" + c); - session.apply(insert); - } - session.flush(); - - KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table); - tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of()); - List<KuduScanToken> tokens = tokenBuilder.build(); - assertEquals(6, tokens.size()); - assertEquals('f' - 'a' + 'z' - 'h', countScanTokenRows(tokens)); - - for (KuduScanToken token : tokens) { - // Sanity check to make sure the debug printing does not throw. 
- LOG.debug(KuduScanToken.stringifySerializedToken(token.serialize(), syncClient)); - } - } - - /** - * Tests the results of creating scan tokens, altering the columns being - * scanned, and then executing the scan tokens. - */ - @Test - public void testScanTokensConcurrentAlterTable() throws Exception { - Schema schema = new Schema(ImmutableList.of( - new ColumnSchema.ColumnSchemaBuilder("key", Type.INT64).nullable(false).key(true).build(), - new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64).nullable(false).key(false).build() - )); - CreateTableOptions createOptions = new CreateTableOptions(); - createOptions.setRangePartitionColumns(ImmutableList.<String>of()); - createOptions.setNumReplicas(1); - syncClient.createTable(tableName, schema, createOptions); - - KuduTable table = syncClient.openTable(tableName); - - KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table); - List<KuduScanToken> tokens = tokenBuilder.build(); - assertEquals(1, tokens.size()); - KuduScanToken token = tokens.get(0); - - // Drop a column - syncClient.alterTable(tableName, new AlterTableOptions().dropColumn("a")); - try { - token.intoScanner(syncClient); - fail(); - } catch (IllegalArgumentException e) { - assertTrue(e.getMessage().contains("Unknown column")); - } - - // Add back the column with the wrong type. - syncClient.alterTable( - tableName, - new AlterTableOptions().addColumn( - new ColumnSchema.ColumnSchemaBuilder("a", Type.STRING).nullable(true).build())); - try { - token.intoScanner(syncClient); - fail(); - } catch (IllegalStateException e) { - assertTrue(e.getMessage().contains( - "invalid type INT64 for column 'a' in scan token, expected: STRING")); - } - - // Add the column with the wrong nullability. 
- syncClient.alterTable( - tableName, - new AlterTableOptions().dropColumn("a") - .addColumn(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64) - .nullable(true).build())); - try { - token.intoScanner(syncClient); - fail(); - } catch (IllegalStateException e) { - assertTrue(e.getMessage().contains( - "invalid nullability for column 'a' in scan token, expected: NOT NULL")); - } - - // Add the column with the correct type and nullability. - syncClient.alterTable( - tableName, - new AlterTableOptions().dropColumn("a") - .addColumn(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64) - .nullable(false) - .defaultValue(0L).build())); - token.intoScanner(syncClient); - } /** * Counts the rows in a table between two optional bounds. http://git-wip-us.apache.org/repos/asf/kudu/blob/0cda8c8e/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java new file mode 100644 index 0000000..1761cf7 --- /dev/null +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java @@ -0,0 +1,235 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.client; + +import com.google.common.collect.ImmutableList; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +import static org.apache.kudu.util.ClientTestUtil.countScanTokenRows; +import static org.apache.kudu.util.ClientTestUtil.createManyStringsSchema; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestScanToken extends BaseKuduTest { + + private String testTableName; + + @Before + public void setup() { + testTableName = getTestMethodNameWithTimestamp(); + } + + /** + * Tests scan tokens by creating a set of scan tokens, serializing them, and + * then executing them in parallel with separate client instances. This + * simulates the normal usecase of scan tokens being created at a central + * planner and distributed to remote task executors. + */ + @Test + public void testScanTokens() throws Exception { + int saveFetchTablets = AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP; + try { + // For this test, make sure that we cover the case that not all tablets + // are returned in a single batch. 
+ AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP = 4; + + Schema schema = createManyStringsSchema(); + CreateTableOptions createOptions = new CreateTableOptions(); + createOptions.addHashPartitions(ImmutableList.of("key"), 8); + + PartialRow splitRow = schema.newPartialRow(); + splitRow.addString("key", "key_50"); + createOptions.addSplitRow(splitRow); + + syncClient.createTable(testTableName, schema, createOptions); + + KuduSession session = syncClient.newSession(); + session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND); + KuduTable table = syncClient.openTable(testTableName); + for (int i = 0; i < 100; i++) { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addString("key", String.format("key_%02d", i)); + row.addString("c1", "c1_" + i); + row.addString("c2", "c2_" + i); + session.apply(insert); + } + session.flush(); + + KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table); + tokenBuilder.batchSizeBytes(0); + tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of()); + List<KuduScanToken> tokens = tokenBuilder.build(); + assertEquals(16, tokens.size()); + + // KUDU-1809, with batchSizeBytes configured to '0', + // the first call to the tablet server won't return + // any data. + { + KuduScanner scanner = tokens.get(0).intoScanner(syncClient); + assertEquals(0, scanner.nextRows().getNumRows()); + } + + for (KuduScanToken token : tokens) { + // Sanity check to make sure the debug printing does not throw. + LOG.debug(KuduScanToken.stringifySerializedToken(token.serialize(), syncClient)); + } + } finally { + AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP = saveFetchTablets; + } + } + + /** + * Tests scan token creation and execution on a table with non-covering range partitions. 
+ */ + @Test + public void testScanTokensNonCoveringRangePartitions() throws Exception { + Schema schema = createManyStringsSchema(); + CreateTableOptions createOptions = new CreateTableOptions(); + createOptions.addHashPartitions(ImmutableList.of("key"), 2); + + PartialRow lower = schema.newPartialRow(); + PartialRow upper = schema.newPartialRow(); + lower.addString("key", "a"); + upper.addString("key", "f"); + createOptions.addRangePartition(lower, upper); + + lower = schema.newPartialRow(); + upper = schema.newPartialRow(); + lower.addString("key", "h"); + upper.addString("key", "z"); + createOptions.addRangePartition(lower, upper); + + PartialRow split = schema.newPartialRow(); + split.addString("key", "k"); + createOptions.addSplitRow(split); + + syncClient.createTable(testTableName, schema, createOptions); + + KuduSession session = syncClient.newSession(); + session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND); + KuduTable table = syncClient.openTable(testTableName); + for (char c = 'a'; c < 'f'; c++) { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addString("key", "" + c); + row.addString("c1", "c1_" + c); + row.addString("c2", "c2_" + c); + session.apply(insert); + } + for (char c = 'h'; c < 'z'; c++) { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addString("key", "" + c); + row.addString("c1", "c1_" + c); + row.addString("c2", "c2_" + c); + session.apply(insert); + } + session.flush(); + + KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table); + tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of()); + List<KuduScanToken> tokens = tokenBuilder.build(); + assertEquals(6, tokens.size()); + assertEquals('f' - 'a' + 'z' - 'h', + countScanTokenRows(tokens, + syncClient.getMasterAddressesAsString(), + syncClient.getDefaultOperationTimeoutMs())); + + for (KuduScanToken token : tokens) { + // Sanity check to make sure the debug 
printing does not throw. + LOG.debug(KuduScanToken.stringifySerializedToken(token.serialize(), syncClient)); + } + } + + /** + * Tests the results of creating scan tokens, altering the columns being + * scanned, and then executing the scan tokens. + */ + @Test + public void testScanTokensConcurrentAlterTable() throws Exception { + Schema schema = new Schema(ImmutableList.of( + new ColumnSchema.ColumnSchemaBuilder("key", Type.INT64).nullable(false).key(true).build(), + new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64).nullable(false).key(false).build() + )); + CreateTableOptions createOptions = new CreateTableOptions(); + createOptions.setRangePartitionColumns(ImmutableList.<String>of()); + createOptions.setNumReplicas(1); + syncClient.createTable(testTableName, schema, createOptions); + + KuduTable table = syncClient.openTable(testTableName); + + KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table); + List<KuduScanToken> tokens = tokenBuilder.build(); + assertEquals(1, tokens.size()); + KuduScanToken token = tokens.get(0); + + // Drop a column + syncClient.alterTable(testTableName, new AlterTableOptions().dropColumn("a")); + try { + token.intoScanner(syncClient); + fail(); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Unknown column")); + } + + // Add back the column with the wrong type. + syncClient.alterTable( + testTableName, + new AlterTableOptions().addColumn( + new ColumnSchema.ColumnSchemaBuilder("a", Type.STRING).nullable(true).build())); + try { + token.intoScanner(syncClient); + fail(); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains( + "invalid type INT64 for column 'a' in scan token, expected: STRING")); + } + + // Add the column with the wrong nullability. 
+ syncClient.alterTable( + testTableName, + new AlterTableOptions().dropColumn("a") + .addColumn(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64) + .nullable(true).build())); + try { + token.intoScanner(syncClient); + fail(); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains( + "invalid nullability for column 'a' in scan token, expected: NOT NULL")); + } + + // Add the column with the correct type and nullability. + syncClient.alterTable( + testTableName, + new AlterTableOptions().dropColumn("a") + .addColumn(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64) + .nullable(false) + .defaultValue(0L).build())); + token.intoScanner(syncClient); + } + +} http://git-wip-us.apache.org/repos/asf/kudu/blob/0cda8c8e/java/kudu-client/src/test/java/org/apache/kudu/util/ClientTestUtil.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/util/ClientTestUtil.java b/java/kudu-client/src/test/java/org/apache/kudu/util/ClientTestUtil.java index 5476175..055955c 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/util/ClientTestUtil.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/util/ClientTestUtil.java @@ -18,6 +18,7 @@ package org.apache.kudu.util; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.stumbleupon.async.Callback; import com.stumbleupon.async.Deferred; @@ -30,8 +31,10 @@ import org.apache.kudu.client.AsyncKuduScanner; import org.apache.kudu.client.AsyncKuduSession; import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.Insert; +import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduException; import org.apache.kudu.client.KuduPredicate; +import org.apache.kudu.client.KuduScanToken; import org.apache.kudu.client.KuduScanner; import org.apache.kudu.client.KuduTable; import 
org.apache.kudu.client.PartialRow; @@ -40,6 +43,7 @@ import org.apache.kudu.client.RowResultIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -123,6 +127,47 @@ public abstract class ClientTestUtil { return countRowsInScan(scanBuilder.build()); } + /** + * Counts the rows in the provided scan tokens. + */ + public static int countScanTokenRows(List<KuduScanToken> tokens, final String masterAddresses, + final long operationTimeoutMs) + throws IOException, InterruptedException { + final AtomicInteger count = new AtomicInteger(0); + List<Thread> threads = new ArrayList<>(); + for (final KuduScanToken token : tokens) { + final byte[] serializedToken = token.serialize(); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try (KuduClient contextClient = new KuduClient.KuduClientBuilder(masterAddresses) + .defaultAdminOperationTimeoutMs(operationTimeoutMs) + .build()) { + KuduScanner scanner = KuduScanToken.deserializeIntoScanner(serializedToken, contextClient); + try { + int localCount = 0; + while (scanner.hasMoreRows()) { + localCount += Iterators.size(scanner.nextRows()); + } + count.addAndGet(localCount); + } finally { + scanner.close(); + } + } catch (Exception e) { + LOG.error("exception in parallel token scanner", e); + } + } + }); + thread.run(); + threads.add(thread); + } + + for (Thread thread : threads) { + thread.join(); + } + return count.get(); + } + public static List<String> scanTableToStrings(KuduTable table, KuduPredicate... predicates) throws Exception { List<String> rowStrings = Lists.newArrayList();