This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 248d9f8ce [java] KUDU-2671 support adding a range with custom hash 
schema
248d9f8ce is described below

commit 248d9f8cecd947d700f9e1135db17a30ea1e194a
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Thu Jun 2 17:40:30 2022 -0700

    [java] KUDU-2671 support adding a range with custom hash schema
    
    With this patch, Kudu Java client now supports adding a range with
    custom hash schema for a table.  The patch also contains test cases
    to cover the new functionality.
    
    This is a patch to complement [1] at the Kudu Java client side
    ([1] introduced corresponding changes at the Kudu C++ client).
    
    [1] https://gerrit.cloudera.org/#/c/18663/
    
    Change-Id: Ieaab7a79d4336de7ff6ec84b8c1806407e4fa44e
    Reviewed-on: http://gerrit.cloudera.org:8080/18589
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Mahesh Reddy <mre...@cloudera.com>
    Reviewed-by: Attila Bukor <abu...@apache.org>
---
 .../org/apache/kudu/client/AlterTableOptions.java  |  33 +++
 .../org/apache/kudu/client/TestAlterTable.java     | 262 +++++++++++++++++++
 .../java/org/apache/kudu/client/TestKuduTable.java | 279 ++++++++++++++++++++-
 3 files changed, 567 insertions(+), 7 deletions(-)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
index 2f01d5798..a2e4f6476 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
@@ -363,6 +363,39 @@ public class AlterTableOptions {
     return this;
   }
 
+  /**
+   * Similar to the other addRangePartition() methods, but instead of adding a
+   * range with the table-wide hash schema, this method adds a range with
+   * a custom hash schema.
+   *
+   * @param range the range with custom hash schema
+   * @return this instance
+   */
+  public AlterTableOptions 
addRangePartition(RangePartitionWithCustomHashSchema range) {
+    Preconditions.checkNotNull(range);
+    AlterTableRequestPB.Step.Builder step = pb.addAlterSchemaStepsBuilder();
+    step.setType(AlterTableRequestPB.StepType.ADD_RANGE_PARTITION);
+    AlterTableRequestPB.AddRangePartition.Builder rangeBuilder =
+        AlterTableRequestPB.AddRangePartition.newBuilder();
+    rangeBuilder.setRangeBounds(
+        new Operation.OperationsEncoder().encodeLowerAndUpperBounds(
+            range.getLowerBound(), range.getUpperBound(),
+            range.getLowerBoundType(), range.getUpperBoundType()));
+    for (org.apache.kudu.Common.PartitionSchemaPB.HashBucketSchemaPB 
hashSchema :
+        range.toPB().getHashSchemaList()) {
+      Common.PartitionSchemaPB.HashBucketSchemaPB.Builder hbs =
+          rangeBuilder.addCustomHashSchemaBuilder();
+      hbs.mergeFrom(hashSchema);
+    }
+    step.setAddRangePartition(rangeBuilder);
+    if (!pb.hasSchema()) {
+      pb.setSchema(ProtobufHelper.schemaToPb(range.getLowerBound().getSchema(),
+          EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT,
+              SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID)));
+    }
+    return this;
+  }
+
   /**
    * Drop the range partition from the table with the specified inclusive 
lower bound and exclusive
    * upper bound. The bounds must match exactly, and may not span multiple 
range partitions.
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
index 377428437..79ddc39c4 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
@@ -512,6 +512,268 @@ public class TestAlterTable {
     assertEquals(100, countRowsInTable(table));
   }
 
+  /**
+   * Test altering a table, adding range partitions with custom hash schema
+   * per range.
+   */
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testAlterAddRangeWithCustomHashSchema() throws Exception {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(2);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c0", Type.INT32)
+        .nullable(false)
+        .key(true)
+        .build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32)
+        .nullable(false)
+        .build());
+    final Schema schema = new Schema(columns);
+
+    CreateTableOptions createOptions =
+        new CreateTableOptions()
+            .setRangePartitionColumns(ImmutableList.of("c0"))
+            .addHashPartitions(ImmutableList.of("c0"), 2, 0)
+            .setNumReplicas(1);
+
+    {
+      // Add range partition with the table-wide hash schema (to be added upon
+      // creating the new table).
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", -100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 100);
+      createOptions.addRangePartition(lower, upper);
+    }
+
+    client.createTable(tableName, schema, createOptions);
+
+    // Alter the table: add a range partition with custom hash schema.
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", 100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 200);
+      RangePartitionWithCustomHashSchema range =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      range.addHashPartitions(ImmutableList.of("c0"), 3, 0);
+      client.alterTable(tableName, new 
AlterTableOptions().addRangePartition(range));
+    }
+
+    KuduTable table = client.openTable(tableName);
+
+    // Insert some rows and then drop partitions, ensuring the row count comes
+    // as expected.
+    insertRows(table, -100, 100);
+    assertEquals(200, countRowsInTable(table));
+    insertRows(table, 100, 200);
+    assertEquals(300, countRowsInTable(table));
+
+    {
+      AlterTableOptions alter = new AlterTableOptions();
+      alter.setWait(true);
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", -100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 100);
+      alter.dropRangePartition(lower, upper);
+      client.alterTable(tableName, alter);
+      assertEquals(100, countRowsInTable(table));
+    }
+
+    {
+      AlterTableOptions alter = new AlterTableOptions();
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", 100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 200);
+      alter.dropRangePartition(lower, upper);
+      client.alterTable(tableName, alter);
+      assertEquals(0, countRowsInTable(table));
+    }
+
+    // Make sure it's possible to delete/drop the table after adding and then
+    // dropping a range with custom hash schema.
+    client.deleteTable(tableName);
+  }
+
+  /**
+   * Test altering a table, adding range partitions with custom hash schema
+   * per range and dropping partition in the middle, resulting in non-covered
+   * ranges between partition with the table-wide and custom hash schemas.
+   */
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testAlterAddRangeWithCustomHashSchemaNonCoveredRange() throws 
Exception {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(2);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c0", Type.INT32)
+        .nullable(false)
+        .key(true)
+        .build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32)
+        .nullable(false)
+        .build());
+    final Schema schema = new Schema(columns);
+
+    CreateTableOptions createOptions =
+        new CreateTableOptions()
+            .setRangePartitionColumns(ImmutableList.of("c0"))
+            .addHashPartitions(ImmutableList.of("c0"), 2, 0)
+            .setNumReplicas(1);
+
+    // Add 3 range partitions with the table-wide hash schema.
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", -300);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", -200);
+      createOptions.addRangePartition(lower, upper);
+    }
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", -100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 100);
+      createOptions.addRangePartition(lower, upper);
+    }
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", 200);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 300);
+      createOptions.addRangePartition(lower, upper);
+    }
+
+    client.createTable(tableName, schema, createOptions);
+
+    // Add range partitions with custom hash schemas, interlaced with the
+    // partitions having the table-wide hash schema.
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", -400);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", -300);
+      RangePartitionWithCustomHashSchema range =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      range.addHashPartitions(ImmutableList.of("c0"), 3, 0);
+      client.alterTable(tableName, new 
AlterTableOptions().addRangePartition(range));
+    }
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", -200);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", -100);
+      RangePartitionWithCustomHashSchema range =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      range.addHashPartitions(ImmutableList.of("c0"), 4, 0);
+      client.alterTable(tableName, new 
AlterTableOptions().addRangePartition(range));
+    }
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", 100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 200);
+      RangePartitionWithCustomHashSchema range =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      range.addHashPartitions(ImmutableList.of("c0"), 5, 0);
+      client.alterTable(tableName, new 
AlterTableOptions().addRangePartition(range));
+    }
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", 300);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 400);
+      RangePartitionWithCustomHashSchema range =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      range.addHashPartitions(ImmutableList.of("c0"), 6, 0);
+      client.alterTable(tableName, new 
AlterTableOptions().addRangePartition(range));
+    }
+
+    KuduTable table = client.openTable(tableName);
+
+    // Insert some rows and then drop partitions, ensuring the row count comes
+    // as expected.
+    insertRows(table, -400, 0);
+    assertEquals(400, countRowsInTable(table));
+
+    insertRows(table, 0, 400);
+    assertEquals(800, countRowsInTable(table));
+
+    // Drop one range with table-wide hash schema in the very middle of the
+    // covered ranges.
+    {
+      AlterTableOptions alter = new AlterTableOptions();
+      alter.setWait(true);
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", -100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 100);
+      alter.dropRangePartition(lower, upper);
+      client.alterTable(tableName, alter);
+    }
+    assertEquals(600, countRowsInTable(table));
+
+    {
+      AlterTableOptions alter = new AlterTableOptions();
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", -400);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", -300);
+      alter.dropRangePartition(lower, upper);
+      client.alterTable(tableName, alter);
+    }
+    assertEquals(500, countRowsInTable(table));
+
+    {
+      AlterTableOptions alter = new AlterTableOptions();
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", 100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", 200);
+      alter.dropRangePartition(lower, upper);
+      client.alterTable(tableName, alter);
+    }
+    assertEquals(400, countRowsInTable(table));
+
+    {
+      AlterTableOptions alter = new AlterTableOptions();
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt("c0", -200);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt("c0", -100);
+      alter.dropRangePartition(lower, upper);
+      client.alterTable(tableName, alter);
+    }
+    assertEquals(300, countRowsInTable(table));
+
+    // Make sure it's possible to delete/drop the table after adding and then
+    // dropping a range with custom hash schema.
+    client.deleteTable(tableName);
+  }
+
   @Test
   public void testAlterExtraConfigs() throws Exception {
     KuduTable table = createTable(ImmutableList.of());
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
index 5175ee95d..ce0e7081f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
@@ -594,8 +594,9 @@ public class TestKuduTable {
         }
       }
       assertEquals(2, buckets.size());
-      assertTrue(buckets.contains(0));
-      assertTrue(buckets.contains(1));
+      for (int i = 0; i < buckets.size(); ++i) {
+        assertTrue(String.format("must have bucket %d", i), 
buckets.contains(i));
+      }
 
       final List<Partition> rangePartitions =
           table.getRangePartitions(client.getDefaultOperationTimeoutMs());
@@ -692,11 +693,9 @@ public class TestKuduTable {
       // Check the generated scan tokens cover all the tablets for the range:
       // all hash bucket indices should be present.
       assertEquals(5, buckets.size());
-      assertTrue(buckets.contains(0));
-      assertTrue(buckets.contains(1));
-      assertTrue(buckets.contains(2));
-      assertTrue(buckets.contains(3));
-      assertTrue(buckets.contains(4));
+      for (int i = 0; i < buckets.size(); ++i) {
+        assertTrue(String.format("must have bucket %d", i), 
buckets.contains(i));
+      }
     }
   }
 
@@ -1676,6 +1675,272 @@ public class TestKuduTable {
         table2.getSchema().getColumn("value").getComment());
   }
 
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testAlterTableAddRangePartitionCustomHashSchemaOverlapped() 
throws Exception {
+    final List<ColumnSchema> columns = ImmutableList.of(
+        new ColumnSchema.ColumnSchemaBuilder("key", 
Type.INT32).key(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
+    final Schema schema = new Schema(columns);
+
+    CreateTableOptions options = getBasicCreateTableOptions();
+    // Add table-wide schema for the table.
+    options.addHashPartitions(ImmutableList.of("key"), 2, 0);
+
+    client.createTable(tableName, schema, options);
+
+    // Originally, there are no range partitions in the newly created table.
+    assertEquals(
+        ImmutableList.of("UNBOUNDED"),
+        client.openTable(tableName).getFormattedRangePartitions(10000));
+
+    PartialRow lower = schema.newPartialRow();
+    lower.addInt(0, -1);
+    PartialRow upper = schema.newPartialRow();
+    upper.addInt(0, 1);
+
+    RangePartitionWithCustomHashSchema range =
+        new RangePartitionWithCustomHashSchema(
+            lower,
+            upper,
+            RangePartitionBound.INCLUSIVE_BOUND,
+            RangePartitionBound.EXCLUSIVE_BOUND);
+    range.addHashPartitions(ImmutableList.of("key"), 3, 0);
+
+    try {
+      client.alterTable(tableName, new 
AlterTableOptions().addRangePartition(range));
+      fail("should not be able to add a partition which overlaps with existing 
unbounded one");
+    } catch (KuduException ex) {
+      final String errmsg = ex.getMessage();
+      assertTrue(errmsg, ex.getStatus().isInvalidArgument());
+      assertTrue(errmsg, errmsg.matches(".*new range partition conflicts with 
existing one:.*"));
+    }
+  }
+
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testAlterTableAddRangePartitionCustomHashSchema() throws 
Exception {
+    final List<ColumnSchema> columns = ImmutableList.of(
+        new ColumnSchema.ColumnSchemaBuilder("key", 
Type.INT32).key(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder("value", 
Type.STRING).nullable(true).build());
+    final Schema schema = new Schema(columns);
+
+    CreateTableOptions builder = getBasicCreateTableOptions();
+    // Add table-wide schema for the table.
+    builder.addHashPartitions(ImmutableList.of("key"), 2, 0);
+
+    // Add a range partition with table-wide hash schema.
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt(0, -100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt(0, 100);
+      builder.addRangePartition(lower, upper);
+    }
+
+    client.createTable(tableName, schema, builder);
+
+    assertEquals(
+        ImmutableList.of("-100 <= VALUES < 100"),
+        client.openTable(tableName).getFormattedRangePartitions(10000));
+
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt(0, 100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt(0, 200);
+
+      RangePartitionWithCustomHashSchema range =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      range.addHashPartitions(ImmutableList.of("key"), 7, 0);
+
+      client.alterTable(
+          tableName, new AlterTableOptions().addRangePartition(range));
+    }
+
+    final KuduTable table = client.openTable(tableName);
+
+    List<String> expected = ImmutableList.of(
+        "-100 <= VALUES < 100", "100 <= VALUES < 200");
+    assertEquals(
+        expected,
+        client.openTable(tableName).getFormattedRangePartitions(10000));
+
+    final PartitionSchema ps = 
client.openTable(tableName).getPartitionSchema();
+    assertTrue(ps.hasCustomHashSchemas());
+
+    {
+      // NOTE: use schema from server since ColumnIDs are needed for row 
encoding
+      final PartialRow rowLower = table.getSchema().newPartialRow();
+      rowLower.addInt(0, -100);
+
+      final PartialRow rowUpper = table.getSchema().newPartialRow();
+      rowUpper.addInt(0, 100);
+
+      // There should be 2 tablets for the range with the table-wide hash 
schema
+      // added during the creation of the table.
+      {
+        final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+            KeyEncoder.encodeRangePartitionKey(rowLower, ps.getRangeSchema()));
+        // There should be just one dimension with 2 buckets.
+        assertEquals(1, s.size());
+        assertEquals(2, s.get(0).getNumBuckets());
+      }
+      {
+        final byte[] rowLowerEnc = ps.encodePartitionKey(rowLower);
+        final byte[] rowUpperEnc = ps.encodePartitionKey(rowUpper);
+
+        // The range part comes after the hash part in an encoded partition 
key.
+        // The hash part contains 4 * number_of_hash_dimensions bytes.
+        byte[] hashLower = Arrays.copyOfRange(rowLowerEnc, 4, 
rowLowerEnc.length);
+        byte[] hashUpper = Arrays.copyOfRange(rowUpperEnc, 4, 
rowUpperEnc.length);
+
+        Set<Integer> buckets = new HashSet();
+        for (KuduScanToken token : new 
KuduScanToken.KuduScanTokenBuilder(asyncClient, table)
+            .addPredicate(KuduPredicate.newComparisonPredicate(
+                columns.get(0), KuduPredicate.ComparisonOp.GREATER_EQUAL, 
-100))
+            .addPredicate(KuduPredicate.newComparisonPredicate(
+                columns.get(0), KuduPredicate.ComparisonOp.LESS, 100))
+            .setTimeout(client.getDefaultOperationTimeoutMs()).build()) {
+          final Partition p = token.getTablet().getPartition();
+          assertEquals(0, Bytes.memcmp(p.getRangeKeyStart(), hashLower));
+          assertEquals(0, Bytes.memcmp(p.getRangeKeyEnd(), hashUpper));
+          assertEquals(1, p.getHashBuckets().size());
+          buckets.add(p.getHashBuckets().get(0));
+        }
+
+        // Check that the generated scan tokens cover all the tablets for the 
range:
+        // all hash bucket indices should be present.
+        assertEquals(2, buckets.size());
+        for (int i = 0; i < buckets.size(); ++i) {
+          assertTrue(String.format("must have bucket %d", i), 
buckets.contains(i));
+        }
+      }
+    }
+
+    {
+      // NOTE: use schema from server since ColumnIDs are needed for row 
encoding
+      final PartialRow rowLower = table.getSchema().newPartialRow();
+      rowLower.addInt(0, 100);
+
+      final PartialRow rowUpper = table.getSchema().newPartialRow();
+      rowUpper.addInt(0, 200);
+
+      // There should be 7 tablets for the newly added range, since the range
+      // was added with a custom hash schema of 7 hash buckets.
+      {
+        final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+            KeyEncoder.encodeRangePartitionKey(rowLower, ps.getRangeSchema()));
+        // There should be just one dimension with 7 buckets.
+        assertEquals(1, s.size());
+        assertEquals(7, s.get(0).getNumBuckets());
+      }
+      {
+        final byte[] rowLowerEnc = ps.encodePartitionKey(rowLower);
+        final byte[] rowUpperEnc = ps.encodePartitionKey(rowUpper);
+
+        // The range part comes after the hash part in an encoded partition 
key.
+        // The hash part contains 4 * number_of_hash_dimensions bytes.
+        byte[] hashLower = Arrays.copyOfRange(rowLowerEnc, 4, 
rowLowerEnc.length);
+        byte[] hashUpper = Arrays.copyOfRange(rowUpperEnc, 4, 
rowUpperEnc.length);
+
+        Set<Integer> buckets = new HashSet();
+        for (KuduScanToken token : new 
KuduScanToken.KuduScanTokenBuilder(asyncClient, table)
+            .addPredicate(KuduPredicate.newComparisonPredicate(
+                columns.get(0), KuduPredicate.ComparisonOp.GREATER_EQUAL, 100))
+            .addPredicate(KuduPredicate.newComparisonPredicate(
+                columns.get(0), KuduPredicate.ComparisonOp.LESS, 200))
+            .setTimeout(client.getDefaultOperationTimeoutMs()).build()) {
+          final Partition p = token.getTablet().getPartition();
+          assertEquals(0, Bytes.memcmp(p.getRangeKeyStart(), hashLower));
+          assertEquals(0, Bytes.memcmp(p.getRangeKeyEnd(), hashUpper));
+          assertEquals(1, p.getHashBuckets().size());
+          buckets.add(p.getHashBuckets().get(0));
+        }
+
+        // Check that the generated scan tokens cover all the tablets for the 
range:
+        // all hash bucket indices should be present.
+        assertEquals(7, buckets.size());
+        for (int i = 0; i < buckets.size(); ++i) {
+          assertTrue(String.format("must have bucket %d", i), 
buckets.contains(i));
+        }
+      }
+    }
+
+    // Make sure it's possible to insert into the newly added range.
+    KuduSession session = client.newSession();
+    {
+      for (int key = 0; key < 9; ++key) {
+        insertDefaultRow(table, session, key);
+      }
+      session.flush();
+
+      List<String> rowStrings = scanTableToStrings(table);
+      assertEquals(9, rowStrings.size());
+      for (int i = 0; i < rowStrings.size(); i++) {
+        StringBuilder expectedRow = new StringBuilder();
+        expectedRow.append(String.format("INT32 key=%d, STRING value=NULL", 
i));
+        assertEquals(expectedRow.toString(), rowStrings.get(i));
+      }
+
+      rowStrings = scanTableToStrings(table,
+          KuduPredicate.newComparisonPredicate(schema.getColumn("key"), 
GREATER_EQUAL, 8));
+      assertEquals(1, rowStrings.size());
+      StringBuilder expectedRow = new StringBuilder();
+      expectedRow.append(String.format("INT32 key=8, STRING value=NULL"));
+      assertEquals(expectedRow.toString(), rowStrings.get(0));
+    }
+
+    // Insert more rows: those should go into both ranges -- the range with
+    // the table-wide and the newly added range with custom hash schema.
+    {
+      for (int key = 9; key < 200; ++key) {
+        insertDefaultRow(table, session, key);
+      }
+      session.flush();
+
+      List<String> rowStrings = scanTableToStrings(table);
+      assertEquals(200, rowStrings.size());
+      rowStrings = scanTableToStrings(table,
+          KuduPredicate.newComparisonPredicate(schema.getColumn("key"), 
GREATER_EQUAL, 100));
+      assertEquals(100, rowStrings.size());
+
+      rowStrings = scanTableToStrings(table,
+          KuduPredicate.newComparisonPredicate(schema.getColumn("key"), 
GREATER_EQUAL, 180));
+      assertEquals(20, rowStrings.size());
+    }
+
+    // Insert more rows into the range with table-wide hash schema.
+    {
+      for (int key = -100; key < 0; ++key) {
+        insertDefaultRow(table, session, key);
+      }
+      session.flush();
+
+      List<String> rowStrings = scanTableToStrings(table);
+      assertEquals(300, rowStrings.size());
+
+      rowStrings = scanTableToStrings(table,
+          KuduPredicate.newComparisonPredicate(schema.getColumn("key"), LESS, 
0));
+      assertEquals(100, rowStrings.size());
+
+      // Predicate to have one part of the rows in the range with table-wide 
hash
+      // schema, and the other part from the range with custom hash schema.
+      rowStrings = scanTableToStrings(table,
+          KuduPredicate.newComparisonPredicate(schema.getColumn("key"), 
GREATER_EQUAL, 50),
+          KuduPredicate.newComparisonPredicate(schema.getColumn("key"), LESS, 
150));
+      assertEquals(100, rowStrings.size());
+    }
+  }
+
   @Test(timeout = 100000)
   @SuppressWarnings("deprecation")
   public void testDimensionLabel() throws Exception {

Reply via email to