This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 9091f31cf KUDU-2671 refactor PartitionSchema::CreatePartitions()
9091f31cf is described below

commit 9091f31cfc5c102b6a734fa20b015735251392d4
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Tue May 31 20:18:51 2022 -0700

    KUDU-2671 refactor PartitionSchema::CreatePartitions()
    
    This patch refactors the code of the PartitionSchema::CreatePartitions()
    method and the code around its invocations, addressing a few TODOs.
    As a consequence, this patch fixes a bug in the CreateTable path: prior
    to this patch, a client could submit a CreateTableRequestPB request in
    which the order of ranges in CreateTableRequestPB::split_rows_range_bounds
    and CreateTableRequestPB::partition_schema::custom_hash_schema_ranges
    differed, and the resulting table would be created with the wrong hash
    schemas.
    
    For example:
    
      requested:
        {{range_boundary_a0, range_boundary_a1}, hash_schema_a}
        {{range_boundary_b0, range_boundary_b1}, hash_schema_b}
    
      created:
        {{range_boundary_a0, range_boundary_a1}, hash_schema_b}
        {{range_boundary_b0, range_boundary_b1}, hash_schema_a}
    
    I'm planning to add a corresponding test scenario to catch regressions
    in a separate changelist, for ease of reviewing and tracking the changes.
    
    However, I updated the existing test scenarios affected by this change.
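    
    For reference, here is a minimal sketch (not itself part of this patch)
    of how the CreatePartitions() call sites change shape, based on the
    signatures in the diff below; 'ps' is a PartitionSchema, and 'schema'
    and 'partitions' stand for the table schema and the output vector:
    
      // Before: per-range hash schemas were passed alongside the range
      // bounds as a separate, parallel vector that had to stay in order.
      RETURN_NOT_OK(ps.CreatePartitions(
          split_rows, range_bounds, range_hash_schemas, schema, &partitions));
    
      // After: ranges with custom hash schemas are carried by the
      // PartitionSchema itself, so the common case needs no range arguments.
      RETURN_NOT_OK(ps.CreatePartitions(schema, &partitions));
    
      // A single range with a custom hash schema is handled directly.
      RETURN_NOT_OK(ps.CreatePartitionsForRange(
          range_bound, range_hash_schema, schema, &partitions));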
    
    Change-Id: I5bac1f8ee349577e2f912530a28776415ed0a5b0
    Reviewed-on: http://gerrit.cloudera.org:8080/18582
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com>
    Reviewed-by: Attila Bukor <abu...@apache.org>
---
 src/kudu/common/partition-test.cc                  | 571 +++++++++------------
 src/kudu/common/partition.cc                       | 285 +++++-----
 src/kudu/common/partition.h                        |  46 +-
 src/kudu/common/partition_pruner-test.cc           |  76 +--
 src/kudu/common/scan_spec-test.cc                  |  18 +-
 .../integration-tests/ts_tablet_manager-itest.cc   |   2 +-
 src/kudu/master/catalog_manager.cc                 |  98 ++--
 src/kudu/master/master-test.cc                     |   4 +-
 src/kudu/master/sys_catalog.cc                     |   2 +-
 src/kudu/tablet/tablet-harness.h                   |   2 +-
 src/kudu/tserver/tablet_server-test.cc             |   2 +-
 11 files changed, 531 insertions(+), 575 deletions(-)

diff --git a/src/kudu/common/partition-test.cc b/src/kudu/common/partition-test.cc
index 259ba28c3..d74936936 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -19,9 +19,7 @@
 
 #include <algorithm>
 #include <cstdint>
-#include <random>
 #include <string>
-#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -112,7 +110,7 @@ void CheckCreateRangePartitions(const vector<pair<optional<string>, optional<str
   }
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(splits, bounds, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(splits, bounds, schema, &partitions));
   ASSERT_EQ(expected_partition_ranges.size(), partitions.size());
 
   for (int i = 0; i < partitions.size(); i++) {
@@ -188,7 +186,7 @@ TEST_F(PartitionTest, TestCompoundRangeKeyEncoding) {
   }
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(splits, bounds, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(splits, bounds, schema, &partitions));
   ASSERT_EQ(4, partitions.size());
 
   EXPECT_TRUE(partitions[0].hash_buckets().empty());
@@ -475,7 +473,7 @@ TEST_F(PartitionTest, TestCreateHashPartitions) {
 
   vector<Partition> partitions;
   ASSERT_OK(
-      partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {}, schema, &partitions));
+      partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, schema, &partitions));
   ASSERT_EQ(3, partitions.size());
 
   EXPECT_EQ(0, partitions[0].hash_buckets()[0]);
@@ -564,7 +562,7 @@ TEST_F(PartitionTest, TestCreatePartitions) {
   // Split keys need not be passed in sorted order.
   vector<KuduPartialRow> split_rows = { split_b, split_a };
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(split_rows, {}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(split_rows, {}, schema, &partitions));
   ASSERT_EQ(12, partitions.size());
 
   EXPECT_EQ(0, partitions[0].hash_buckets()[0]);
@@ -912,31 +910,43 @@ void CheckSerializationFunctions(const PartitionSchemaPB& pb,
   ASSERT_EQ(partition_schema, partition_schema1);
 }
 
+void AddRangePartitionWithSchema(
+    const Schema& schema,
+    const KuduPartialRow& lower,
+    const KuduPartialRow& upper,
+    const PartitionSchema::HashSchema& range_hash_schema,
+    PartitionSchemaPB* pb) {
+  auto* range = pb->add_custom_hash_schema_ranges();
+  RowOperationsPBEncoder encoder(range->mutable_range_bounds());
+  encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+  encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+  for (const auto& dimension : range_hash_schema) {
+    auto* hash_dimension_pb = range->add_hash_schema();
+    PartitionSchema::HashDimension hash_dimension;
+    for (const auto& cid : dimension.column_ids) {
+      hash_dimension_pb->add_columns()->set_name(schema.column_by_id(cid).name());
+    }
+    hash_dimension_pb->set_num_buckets(dimension.num_buckets);
+    hash_dimension_pb->set_seed(dimension.seed);
+  }
+}
+
 } // anonymous namespace
 
-TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
-  // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
+TEST_F(PartitionTest, VaryingHashSchemasPerRange) {
+  // CREATE TABLE t (a STRING, b STRING, c STRING, PRIMARY KEY (a, b, c))
   // PARTITION BY [HASH BUCKET (a, c), HASH BUCKET (b), RANGE (a, b, c)];
   Schema schema({ ColumnSchema("a", STRING),
                   ColumnSchema("b", STRING),
                   ColumnSchema("c", STRING) },
                 { ColumnId(0), ColumnId(1), ColumnId(2) }, 3);
 
-  PartitionSchemaPB schema_builder;
-  // Table-wide hash schema defined below, 3 by 2 buckets so 6 total.
-  AddHashDimension(&schema_builder, { "a", "c" }, 3, 0);
-  AddHashDimension(&schema_builder, { "b" }, 2, 0);
-  PartitionSchema partition_schema;
-  ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema));
-  CheckSerializationFunctions(schema_builder, partition_schema, schema);
-
-  ASSERT_EQ("HASH (a, c) PARTITIONS 3, HASH (b) PARTITIONS 2, RANGE (a, b, c)",
-            partition_schema.DebugString(schema));
-
-  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
-  vector<PartitionSchema::HashSchema> range_hash_schemas;
-  vector<pair<pair<KuduPartialRow, KuduPartialRow>,
-              PartitionSchema::HashSchema>> bounds_with_hash_schemas;
+  PartitionSchemaPB ps_pb;
+  // Table-wide hash schema is defined below: 3 by 2 buckets, so 6 total.
+  const PartitionSchema::HashSchema table_wide_hash_schema =
+      { { { ColumnId(0), ColumnId(2) }, 3, 0}, { { ColumnId(1) }, 2, 0 } };
+  AddHashDimension(&ps_pb, { "a", "c" }, 3, 0);
+  AddHashDimension(&ps_pb, { "b" }, 2, 0);
 
   { // [(a1, _, c1), (a2, _, c2))
     KuduPartialRow lower(&schema);
@@ -945,11 +955,8 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
     ASSERT_OK(lower.SetStringCopy("c", "c1"));
     ASSERT_OK(upper.SetStringCopy("a", "a2"));
     ASSERT_OK(upper.SetStringCopy("c", "c2"));
-    PartitionSchema::HashSchema hash_schema_4_buckets = {{{ColumnId(0)}, 4, 0}};
-    bounds.emplace_back(lower, upper);
-    range_hash_schemas.emplace_back(hash_schema_4_buckets);
-    bounds_with_hash_schemas.emplace_back(make_pair(std::move(lower), std::move(upper)),
-                                          std::move(hash_schema_4_buckets));
+    AddRangePartitionWithSchema(
+        schema, lower, upper, {{{ColumnId(0)}, 4, 0}}, &ps_pb);
   }
 
   { // [(a3, b3, _), (a4, b4, _))
@@ -959,11 +966,8 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
     ASSERT_OK(lower.SetStringCopy("b", "b3"));
     ASSERT_OK(upper.SetStringCopy("a", "a4"));
     ASSERT_OK(upper.SetStringCopy("b", "b4"));
-    bounds.emplace_back(lower, upper);
-    range_hash_schemas.emplace_back(partition_schema.hash_schema());
-    // Use the table-wide hash schema for this range.
-    bounds_with_hash_schemas.emplace_back(make_pair(std::move(lower), std::move(upper)),
-                                          partition_schema.hash_schema());
+    AddRangePartitionWithSchema(
+        schema, lower, upper, table_wide_hash_schema, &ps_pb);
   }
 
   { // [(a5, b5, _), (a6, _, c6))
@@ -973,234 +977,180 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
     ASSERT_OK(lower.SetStringCopy("b", "b5"));
     ASSERT_OK(upper.SetStringCopy("a", "a6"));
     ASSERT_OK(upper.SetStringCopy("c", "c6"));
-    PartitionSchema::HashSchema hash_schema_2_buckets_by_3 = {
-        {{ColumnId(0)}, 2, 0},
-        {{ColumnId(1)}, 3, 0}
-    };
-    bounds.emplace_back(lower, upper);
-    range_hash_schemas.emplace_back(hash_schema_2_buckets_by_3);
-    bounds_with_hash_schemas.emplace_back(make_pair(std::move(lower), std::move(upper)),
-                                          std::move(hash_schema_2_buckets_by_3));
+    AddRangePartitionWithSchema(
+        schema, lower, upper,
+        {{{ColumnId(0)}, 2, 0}, {{ColumnId(1)}, 3, 0}}, &ps_pb);
   }
 
-  const auto check_partitions = [](const vector<Partition>& partitions) {
-    ASSERT_EQ(16, partitions.size());
-
-    ASSERT_EQ(1, partitions[0].hash_buckets().size());
-    EXPECT_EQ(0, partitions[0].hash_buckets()[0]);
-    EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[0].begin().range_key());
-    EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[0].end().range_key());
-    EXPECT_EQ(string("\0\0\0\0" "a1\0\0\0\0c1", 12),
-              partitions[0].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\0" "a2\0\0\0\0c2", 12),
-              partitions[0].end().ToString());
-
-    ASSERT_EQ(1, partitions[1].hash_buckets().size());
-    EXPECT_EQ(1, partitions[1].hash_buckets()[0]);
-    EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[1].begin().range_key());
-    EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[1].end().range_key());
-    EXPECT_EQ(string("\0\0\0\1" "a1\0\0\0\0c1", 12),
-              partitions[1].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\1" "a2\0\0\0\0c2", 12),
-              partitions[1].end().ToString());
-
-    ASSERT_EQ(1, partitions[2].hash_buckets().size());
-    EXPECT_EQ(2, partitions[2].hash_buckets()[0]);
-    EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[2].begin().range_key());
-    EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[2].end().range_key());
-    EXPECT_EQ(string("\0\0\0\2" "a1\0\0\0\0c1", 12),
-              partitions[2].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\2" "a2\0\0\0\0c2", 12),
-              partitions[2].end().ToString());
-
-    ASSERT_EQ(1, partitions[3].hash_buckets().size());
-    EXPECT_EQ(3, partitions[3].hash_buckets()[0]);
-    EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[3].begin().range_key());
-    EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[3].end().range_key());
-    EXPECT_EQ(string("\0\0\0\3" "a1\0\0\0\0c1", 12),
-              partitions[3].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\3" "a2\0\0\0\0c2", 12),
-              partitions[3].end().ToString());
-
-    ASSERT_EQ(2, partitions[4].hash_buckets().size());
-    EXPECT_EQ(0, partitions[4].hash_buckets()[0]);
-    EXPECT_EQ(0, partitions[4].hash_buckets()[1]);
-    EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[4].begin().range_key());
-    EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[4].end().range_key());
-    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a3\0\0b3\0\0", 16),
-              partitions[4].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a4\0\0b4\0\0", 16),
-              partitions[4].end().ToString());
-
-    ASSERT_EQ(2, partitions[5].hash_buckets().size());
-    EXPECT_EQ(0, partitions[5].hash_buckets()[0]);
-    EXPECT_EQ(1, partitions[5].hash_buckets()[1]);
-    EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[5].begin().range_key());
-    EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[5].end().range_key());
-    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a3\0\0b3\0\0", 16),
-              partitions[5].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a4\0\0b4\0\0", 16),
-              partitions[5].end().ToString());
-
-    ASSERT_EQ(2, partitions[6].hash_buckets().size());
-    EXPECT_EQ(1, partitions[6].hash_buckets()[0]);
-    EXPECT_EQ(0, partitions[6].hash_buckets()[1]);
-    EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[6].begin().range_key());
-    EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[6].end().range_key());
-    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a3\0\0b3\0\0", 16),
-              partitions[6].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a4\0\0b4\0\0", 16),
-              partitions[6].end().ToString());
-
-    ASSERT_EQ(2, partitions[7].hash_buckets().size());
-    EXPECT_EQ(1, partitions[7].hash_buckets()[0]);
-    EXPECT_EQ(1, partitions[7].hash_buckets()[1]);
-    EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[7].begin().range_key());
-    EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[7].end().range_key());
-    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a3\0\0b3\0\0", 16),
-              partitions[7].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a4\0\0b4\0\0", 16),
-              partitions[7].end().ToString());
-
-    ASSERT_EQ(2, partitions[8].hash_buckets().size());
-    EXPECT_EQ(2, partitions[8].hash_buckets()[0]);
-    EXPECT_EQ(0, partitions[8].hash_buckets()[1]);
-    EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[8].begin().range_key());
-    EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[8].end().range_key());
-    EXPECT_EQ(string("\0\0\0\2" "\0\0\0\0" "a3\0\0b3\0\0", 16),
-              partitions[8].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\2" "\0\0\0\0" "a4\0\0b4\0\0", 16),
-              partitions[8].end().ToString());
-
-    ASSERT_EQ(2, partitions[9].hash_buckets().size());
-    EXPECT_EQ(2, partitions[9].hash_buckets()[0]);
-    EXPECT_EQ(1, partitions[9].hash_buckets()[1]);
-    EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[9].begin().range_key());
-    EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[9].end().range_key());
-    EXPECT_EQ(string("\0\0\0\2" "\0\0\0\1" "a3\0\0b3\0\0", 16),
-              partitions[9].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\2" "\0\0\0\1" "a4\0\0b4\0\0", 16),
-              partitions[9].end().ToString());
-
-    ASSERT_EQ(2, partitions[10].hash_buckets().size());
-    EXPECT_EQ(0, partitions[10].hash_buckets()[0]);
-    EXPECT_EQ(0, partitions[10].hash_buckets()[1]);
-    EXPECT_EQ(string("a5\0\0b5\0\0", 8),
-              partitions[10].begin().range_key());
-    EXPECT_EQ(string("a6\0\0\0\0c6", 8),
-              partitions[10].end().range_key());
-    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a5\0\0b5\0\0", 16),
-              partitions[10].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a6\0\0\0\0c6", 16),
-              partitions[10].end().ToString());
-
-    ASSERT_EQ(2, partitions[11].hash_buckets().size());
-    EXPECT_EQ(0, partitions[11].hash_buckets()[0]);
-    EXPECT_EQ(1, partitions[11].hash_buckets()[1]);
-    EXPECT_EQ(string("a5\0\0b5\0\0", 8),partitions[11].begin().range_key());
-    EXPECT_EQ(string("a6\0\0\0\0c6", 8),partitions[11].end().range_key());
-    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a5\0\0b5\0\0", 16),
-              partitions[11].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a6\0\0\0\0c6", 16),
-              partitions[11].end().ToString());
-
-    ASSERT_EQ(2, partitions[12].hash_buckets().size());
-    EXPECT_EQ(0, partitions[12].hash_buckets()[0]);
-    EXPECT_EQ(2, partitions[12].hash_buckets()[1]);
-    EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[12].begin().range_key());
-    EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[12].end().range_key());
-    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2" "a5\0\0b5\0\0", 16),
-              partitions[12].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2" "a6\0\0\0\0c6", 16),
-              partitions[12].end().ToString());
-
-    ASSERT_EQ(2, partitions[13].hash_buckets().size());
-    EXPECT_EQ(1, partitions[13].hash_buckets()[0]);
-    EXPECT_EQ(0, partitions[13].hash_buckets()[1]);
-    EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[13].begin().range_key());
-    EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[13].end().range_key());
-    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a5\0\0b5\0\0", 16),
-              partitions[13].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a6\0\0\0\0c6", 16),
-              partitions[13].end().ToString());
-
-    ASSERT_EQ(2, partitions[14].hash_buckets().size());
-    EXPECT_EQ(1, partitions[14].hash_buckets()[0]);
-    EXPECT_EQ(1, partitions[14].hash_buckets()[1]);
-    EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[14].begin().range_key());
-    EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[14].end().range_key());
-    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a5\0\0b5\0\0", 16),
-              partitions[14].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a6\0\0\0\0c6", 16),
-              partitions[14].end().ToString());
-
-    ASSERT_EQ(2, partitions[15].hash_buckets().size());
-    EXPECT_EQ(1, partitions[15].hash_buckets()[0]);
-    EXPECT_EQ(2, partitions[15].hash_buckets()[1]);
-    EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[15].begin().range_key());
-    EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[15].end().range_key());
-    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a5\0\0b5\0\0", 16),
-              partitions[15].begin().ToString());
-    EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a6\0\0\0\0c6", 16),
-              partitions[15].end().ToString());
-  };
+  PartitionSchema ps;
+  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps));
+  CheckSerializationFunctions(ps_pb, ps, schema);
+
+  ASSERT_EQ("HASH (a, c) PARTITIONS 3, HASH (b) PARTITIONS 2, RANGE (a, b, c)",
+            ps.DebugString(schema));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(
-      {}, bounds, range_hash_schemas, schema, &partitions));
-  NO_FATALS(check_partitions(partitions));
-
-  bounds.clear();
-  range_hash_schemas.clear();
-  partitions.clear();
-
-  // Using std::random_shuffle to insert bounds and their hash schemas out of
-  // sorted order, yet resulting partitions will still be the same.
-  std::mt19937 gen(SeedRandom());
-  std::shuffle(bounds_with_hash_schemas.begin(), bounds_with_hash_schemas.end(), gen);
-
-  for (const auto& bounds_and_schema : bounds_with_hash_schemas) {
-    bounds.emplace_back(bounds_and_schema.first);
-    range_hash_schemas.emplace_back(bounds_and_schema.second);
-  }
+  ASSERT_OK(ps.CreatePartitions(schema, &partitions));
 
-  ASSERT_OK(partition_schema.CreatePartitions(
-      {}, bounds, range_hash_schemas, schema, &partitions));
-  NO_FATALS(check_partitions(partitions));
+  ASSERT_EQ(16, partitions.size());
 
-  // Not clearing bounds or range_hash_schemas, adding a split row to test
-  // incompatibility.
-  vector<KuduPartialRow> splits;
-  { // split: (a1, _, c12)
-    KuduPartialRow split(&schema);
-    ASSERT_OK(split.SetStringCopy("a", "a1"));
-    ASSERT_OK(split.SetStringCopy("c", "c12"));
-    splits.emplace_back(std::move(split));
-  }
+  ASSERT_EQ(1, partitions[0].hash_buckets().size());
+  EXPECT_EQ(0, partitions[0].hash_buckets()[0]);
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[0].begin().range_key());
+  EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[0].end().range_key());
+  EXPECT_EQ(string("\0\0\0\0" "a1\0\0\0\0c1", 12),
+            partitions[0].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\0" "a2\0\0\0\0c2", 12),
+            partitions[0].end().ToString());
 
-  // Expecting Status::InvalidArgument() due to 'splits' and schemas within
-  // 'range_hash_schemas' being defined at the same time.
-  {
-    const auto s = partition_schema.CreatePartitions(
-        splits, bounds, range_hash_schemas, schema, &partitions);
-    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(),
-                        "Both 'split_rows' and 'range_hash_schemas' "
-                        "cannot be populated at the same time");
-  }
+  ASSERT_EQ(1, partitions[1].hash_buckets().size());
+  EXPECT_EQ(1, partitions[1].hash_buckets()[0]);
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[1].begin().range_key());
+  EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[1].end().range_key());
+  EXPECT_EQ(string("\0\0\0\1" "a1\0\0\0\0c1", 12),
+            partitions[1].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\1" "a2\0\0\0\0c2", 12),
+            partitions[1].end().ToString());
 
-  // Adding another schema to range_hash_schemas to trigger
-  // Status::InvalidArgument() due to 'bounds and 'range_hash_schema' not being
-  // the same size.
-  {
-    range_hash_schemas.push_back({});
-    const auto s = partition_schema.CreatePartitions(
-        {}, bounds, range_hash_schemas, schema, &partitions);
-    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(),
-                        "4 vs 3: per range hash schemas and range bounds "
-                        "must have the same size");
-  }
+  ASSERT_EQ(1, partitions[2].hash_buckets().size());
+  EXPECT_EQ(2, partitions[2].hash_buckets()[0]);
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[2].begin().range_key());
+  EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[2].end().range_key());
+  EXPECT_EQ(string("\0\0\0\2" "a1\0\0\0\0c1", 12),
+            partitions[2].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\2" "a2\0\0\0\0c2", 12),
+            partitions[2].end().ToString());
+
+  ASSERT_EQ(1, partitions[3].hash_buckets().size());
+  EXPECT_EQ(3, partitions[3].hash_buckets()[0]);
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[3].begin().range_key());
+  EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[3].end().range_key());
+  EXPECT_EQ(string("\0\0\0\3" "a1\0\0\0\0c1", 12),
+            partitions[3].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\3" "a2\0\0\0\0c2", 12),
+            partitions[3].end().ToString());
+
+  ASSERT_EQ(2, partitions[4].hash_buckets().size());
+  EXPECT_EQ(0, partitions[4].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[4].hash_buckets()[1]);
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[4].begin().range_key());
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[4].end().range_key());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a3\0\0b3\0\0", 16),
+            partitions[4].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a4\0\0b4\0\0", 16),
+            partitions[4].end().ToString());
+
+  ASSERT_EQ(2, partitions[5].hash_buckets().size());
+  EXPECT_EQ(0, partitions[5].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[5].hash_buckets()[1]);
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[5].begin().range_key());
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[5].end().range_key());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a3\0\0b3\0\0", 16),
+            partitions[5].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a4\0\0b4\0\0", 16),
+            partitions[5].end().ToString());
+
+  ASSERT_EQ(2, partitions[6].hash_buckets().size());
+  EXPECT_EQ(1, partitions[6].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[6].hash_buckets()[1]);
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[6].begin().range_key());
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[6].end().range_key());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a3\0\0b3\0\0", 16),
+            partitions[6].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a4\0\0b4\0\0", 16),
+            partitions[6].end().ToString());
+
+  ASSERT_EQ(2, partitions[7].hash_buckets().size());
+  EXPECT_EQ(1, partitions[7].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[7].hash_buckets()[1]);
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[7].begin().range_key());
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[7].end().range_key());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a3\0\0b3\0\0", 16),
+            partitions[7].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a4\0\0b4\0\0", 16),
+            partitions[7].end().ToString());
+
+  ASSERT_EQ(2, partitions[8].hash_buckets().size());
+  EXPECT_EQ(2, partitions[8].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[8].hash_buckets()[1]);
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[8].begin().range_key());
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[8].end().range_key());
+  EXPECT_EQ(string("\0\0\0\2" "\0\0\0\0" "a3\0\0b3\0\0", 16),
+            partitions[8].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\2" "\0\0\0\0" "a4\0\0b4\0\0", 16),
+            partitions[8].end().ToString());
+
+  ASSERT_EQ(2, partitions[9].hash_buckets().size());
+  EXPECT_EQ(2, partitions[9].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[9].hash_buckets()[1]);
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[9].begin().range_key());
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[9].end().range_key());
+  EXPECT_EQ(string("\0\0\0\2" "\0\0\0\1" "a3\0\0b3\0\0", 16),
+            partitions[9].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\2" "\0\0\0\1" "a4\0\0b4\0\0", 16),
+            partitions[9].end().ToString());
+
+  ASSERT_EQ(2, partitions[10].hash_buckets().size());
+  EXPECT_EQ(0, partitions[10].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[10].hash_buckets()[1]);
+  EXPECT_EQ(string("a5\0\0b5\0\0", 8),
+            partitions[10].begin().range_key());
+  EXPECT_EQ(string("a6\0\0\0\0c6", 8),
+            partitions[10].end().range_key());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a5\0\0b5\0\0", 16),
+            partitions[10].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a6\0\0\0\0c6", 16),
+            partitions[10].end().ToString());
+
+  ASSERT_EQ(2, partitions[11].hash_buckets().size());
+  EXPECT_EQ(0, partitions[11].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[11].hash_buckets()[1]);
+  EXPECT_EQ(string("a5\0\0b5\0\0", 8),partitions[11].begin().range_key());
+  EXPECT_EQ(string("a6\0\0\0\0c6", 8),partitions[11].end().range_key());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a5\0\0b5\0\0", 16),
+            partitions[11].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a6\0\0\0\0c6", 16),
+            partitions[11].end().ToString());
+
+  ASSERT_EQ(2, partitions[12].hash_buckets().size());
+  EXPECT_EQ(0, partitions[12].hash_buckets()[0]);
+  EXPECT_EQ(2, partitions[12].hash_buckets()[1]);
+  EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[12].begin().range_key());
+  EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[12].end().range_key());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2" "a5\0\0b5\0\0", 16),
+            partitions[12].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2" "a6\0\0\0\0c6", 16),
+            partitions[12].end().ToString());
+
+  ASSERT_EQ(2, partitions[13].hash_buckets().size());
+  EXPECT_EQ(1, partitions[13].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[13].hash_buckets()[1]);
+  EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[13].begin().range_key());
+  EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[13].end().range_key());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a5\0\0b5\0\0", 16),
+            partitions[13].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a6\0\0\0\0c6", 16),
+            partitions[13].end().ToString());
+
+  ASSERT_EQ(2, partitions[14].hash_buckets().size());
+  EXPECT_EQ(1, partitions[14].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[14].hash_buckets()[1]);
+  EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[14].begin().range_key());
+  EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[14].end().range_key());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a5\0\0b5\0\0", 16),
+            partitions[14].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a6\0\0\0\0c6", 16),
+            partitions[14].end().ToString());
+
+  ASSERT_EQ(2, partitions[15].hash_buckets().size());
+  EXPECT_EQ(1, partitions[15].hash_buckets()[0]);
+  EXPECT_EQ(2, partitions[15].hash_buckets()[1]);
+  EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[15].begin().range_key());
+  EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[15].end().range_key());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a5\0\0b5\0\0", 16),
+            partitions[15].begin().ToString());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a6\0\0\0\0c6", 16),
+            partitions[15].end().ToString());
 }
 
 TEST_F(PartitionTest, CustomHashSchemasPerRangeOnly) {
@@ -1209,18 +1159,7 @@ TEST_F(PartitionTest, CustomHashSchemasPerRangeOnly) {
                 { ColumnId(0), ColumnId(1) }, 2);
 
   // No table-wide hash bucket schema.
-  PartitionSchemaPB schema_builder;
-  PartitionSchema partition_schema;
-  ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema));
-  CheckSerializationFunctions(schema_builder, partition_schema, schema);
-
-  ASSERT_EQ("RANGE (a, b)", partition_schema.DebugString(schema));
-
-  typedef pair<KuduPartialRow, KuduPartialRow> RangeBound;
-  vector<RangeBound> bounds;
-  vector<PartitionSchema::HashSchema> range_hash_schemas;
-  vector<pair<RangeBound, PartitionSchema::HashSchema>>
-      bounds_with_hash_schemas;
+  PartitionSchemaPB ps_pb;
 
   // [(a1, b1), (a2, b2))
   {
@@ -1230,19 +1169,18 @@ TEST_F(PartitionTest, CustomHashSchemasPerRangeOnly) {
     ASSERT_OK(lower.SetStringNoCopy("b", "b1"));
     ASSERT_OK(upper.SetStringNoCopy("a", "a2"));
     ASSERT_OK(upper.SetStringNoCopy("b", "b2"));
-    PartitionSchema::HashSchema hash_schema_2_buckets =
-        { { { ColumnId(0) }, 2, 0 } };
-    bounds.emplace_back(lower, upper);
-    range_hash_schemas.emplace_back(hash_schema_2_buckets);
-    bounds_with_hash_schemas.emplace_back(
-        make_pair(std::move(lower), std::move(upper)),
-        std::move(hash_schema_2_buckets));
+    AddRangePartitionWithSchema(
+        schema, lower, upper, { { { ColumnId(0) }, 2, 0 } }, &ps_pb);
   }
 
-  vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(
-      {}, bounds, range_hash_schemas, schema, &partitions));
+  PartitionSchema ps;
+  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps));
+  CheckSerializationFunctions(ps_pb, ps, schema);
 
+  ASSERT_EQ("RANGE (a, b)", ps.DebugString(schema));
+
+  vector<Partition> partitions;
+  ASSERT_OK(ps.CreatePartitions(schema, &partitions));
   ASSERT_EQ(2, partitions.size());
 
   {
@@ -1262,35 +1200,25 @@ TEST_F(PartitionTest, CustomHashSchemasPerRangeOnly) {
   }
 }
 
-TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) {
-  // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
+TEST_F(PartitionTest, VaryingHashSchemasPerUnboundedRanges) {
+  // CREATE TABLE t (a STRING, b STRING, c STRING, PRIMARY KEY (a, b, c))
   // PARTITION BY [HASH BUCKET (b), RANGE (a, b, c)];
   Schema schema({ ColumnSchema("a", STRING),
                   ColumnSchema("b", STRING),
                   ColumnSchema("c", STRING) },
                 { ColumnId(0), ColumnId(1), ColumnId(2) }, 3);
 
-  PartitionSchemaPB schema_builder;
-  // Table-wide hash schema defined below.
-  AddHashDimension(&schema_builder, { "b" }, 2, 0);
-  PartitionSchema partition_schema;
-  ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema));
-  CheckSerializationFunctions(schema_builder, partition_schema, schema);
-
-  ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b, c)",
-            partition_schema.DebugString(schema));
-
-  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
-  vector<PartitionSchema::HashSchema> range_hash_schemas;
+  PartitionSchemaPB ps_pb;
+  // Table-wide hash schema is defined below.
+  AddHashDimension(&ps_pb, { "b" }, 2, 0);
 
   { // [(_, _, _), (a1, _, c1))
     KuduPartialRow lower(&schema);
     KuduPartialRow upper(&schema);
     ASSERT_OK(upper.SetStringCopy("a", "a1"));
     ASSERT_OK(upper.SetStringCopy("c", "c1"));
-    PartitionSchema::HashSchema hash_schema_4_buckets = {{{ColumnId(0)}, 4, 0}};
-    bounds.emplace_back(lower, upper);
-    range_hash_schemas.emplace_back(hash_schema_4_buckets);
+    AddRangePartitionWithSchema(
+        schema, lower, upper, {{{ColumnId(0)}, 4, 0}}, &ps_pb);
   }
 
   { // [(a2, b2, _), (a3, b3, _))
@@ -1300,8 +1228,7 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) {
     ASSERT_OK(lower.SetStringCopy("b", "b2"));
     ASSERT_OK(upper.SetStringCopy("a", "a3"));
     ASSERT_OK(upper.SetStringCopy("b", "b3"));
-    bounds.emplace_back(lower, upper);
-    range_hash_schemas.emplace_back(PartitionSchema::HashSchema());
+    AddRangePartitionWithSchema(schema, lower, upper, {}, &ps_pb);
   }
 
   { // [(a4, b4, _), (_, _, _))
@@ -1313,12 +1240,19 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) {
         {{ColumnId(0)}, 2, 0},
         {{ColumnId(2)}, 3, 0}
     };
-    bounds.emplace_back(lower, upper);
-    range_hash_schemas.emplace_back(hash_schema_2_buckets_by_3);
+    AddRangePartitionWithSchema(
+        schema, lower, upper,
+        {{{ColumnId(0)}, 2, 0}, {{ColumnId(2)}, 3, 0}}, &ps_pb);
   }
 
+  PartitionSchema ps;
+  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps));
+  CheckSerializationFunctions(ps_pb, ps, schema);
+
+  ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b, c)", ps.DebugString(schema));
+
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions));
+  ASSERT_OK(ps.CreatePartitions(schema, &partitions));
   ASSERT_EQ(11, partitions.size());
   // Partitions below sorted by range, can verify that the partition keyspace is filled by checking
   // that the start key of the first partition and the end key of the last partition is cleared.
@@ -1407,33 +1341,23 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) {
 }
 
 TEST_F(PartitionTest, NoHashSchemasForLastUnboundedRange) {
-  // CREATE TABLE t (a VARCHAR, b VARCHAR, PRIMARY KEY (a, b))
+  // CREATE TABLE t (a STRING, b STRING, PRIMARY KEY (a, b))
   // PARTITION BY [HASH BUCKET (b), RANGE (a, b)];
   Schema schema({ ColumnSchema("a", STRING),
                   ColumnSchema("b", STRING) },
                 { ColumnId(0), ColumnId(1) }, 2);
 
-  PartitionSchemaPB schema_builder;
-  // Table-wide hash schema defined below.
-  AddHashDimension(&schema_builder, { "b" }, 2, 0);
-  PartitionSchema partition_schema;
-  ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema));
-  CheckSerializationFunctions(schema_builder, partition_schema, schema);
-
-  ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b)",
-            partition_schema.DebugString(schema));
-
-  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
-  vector<PartitionSchema::HashSchema> range_hash_schemas;
+  PartitionSchemaPB ps_pb;
+  // Table-wide hash schema is defined below.
+  AddHashDimension(&ps_pb, { "b" }, 2, 0);
 
   // [(_, _), (a1, _))
   {
     KuduPartialRow lower(&schema);
     KuduPartialRow upper(&schema);
     ASSERT_OK(upper.SetStringCopy("a", "a1"));
-    PartitionSchema::HashSchema hash_schema_3_buckets = {{{ColumnId(0)}, 3, 0}};
-    bounds.emplace_back(lower, upper);
-    range_hash_schemas.emplace_back(hash_schema_3_buckets);
+    AddRangePartitionWithSchema(
+        schema, lower, upper, {{{ColumnId(0)}, 3, 0}}, &ps_pb);
   }
 
   // [(a2, _), (a3, _))
@@ -1442,12 +1366,9 @@ TEST_F(PartitionTest, NoHashSchemasForLastUnboundedRange) {
     KuduPartialRow upper(&schema);
     ASSERT_OK(lower.SetStringCopy("a", "a2"));
     ASSERT_OK(upper.SetStringCopy("a", "a3"));
-    bounds.emplace_back(lower, upper);
-    PartitionSchema::HashSchema hash_schema_3_buckets_by_2 = {
-        {{ColumnId(0)}, 3, 0},
-        {{ColumnId(1)}, 2, 0}
-    };
-    range_hash_schemas.emplace_back(hash_schema_3_buckets_by_2);
+    AddRangePartitionWithSchema(
+        schema, lower, upper,
+        {{{ColumnId(0)}, 3, 0}, {{ColumnId(1)}, 2, 0}}, &ps_pb);
   }
 
   // [(a4, _), (_, _))
@@ -1455,13 +1376,17 @@ TEST_F(PartitionTest, NoHashSchemasForLastUnboundedRange) {
     KuduPartialRow lower(&schema);
     KuduPartialRow upper(&schema);
     ASSERT_OK(lower.SetStringCopy("a", "a4"));
-    bounds.emplace_back(lower, upper);
-    range_hash_schemas.emplace_back(PartitionSchema::HashSchema());
+    AddRangePartitionWithSchema(schema, lower, upper, {}, &ps_pb);
   }
 
+  PartitionSchema ps;
+  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps));
+  CheckSerializationFunctions(ps_pb, ps, schema);
+
+  ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b)", ps.DebugString(schema));
+
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(
-      {}, bounds, range_hash_schemas, schema, &partitions));
+  ASSERT_OK(ps.CreatePartitions(schema, &partitions));
   ASSERT_EQ(10, partitions.size());
 
   {
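
The test updates above all follow the same pattern: instead of building
parallel 'bounds' and 'range_hash_schemas' vectors, each range and its hash
schema are now described directly in the PartitionSchemaPB via the new
AddRangePartitionWithSchema() helper, deserialized with
PartitionSchema::FromPB(), and materialized with the new
CreatePartitions(schema, &partitions) overload. A condensed sketch of the
pattern, with illustrative column names and bucket counts:

  Schema schema({ ColumnSchema("a", STRING), ColumnSchema("b", STRING) },
                { ColumnId(0), ColumnId(1) }, 2);

  PartitionSchemaPB ps_pb;
  KuduPartialRow lower(&schema);
  KuduPartialRow upper(&schema);
  ASSERT_OK(lower.SetStringCopy("a", "a1"));
  ASSERT_OK(upper.SetStringCopy("a", "a2"));
  // Attach a 4-bucket hash schema on column 'a' to the range [a1, a2).
  AddRangePartitionWithSchema(
      schema, lower, upper, {{{ColumnId(0)}, 4, 0}}, &ps_pb);

  PartitionSchema ps;
  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps));

  vector<Partition> partitions;
  ASSERT_OK(ps.CreatePartitions(schema, &partitions));
  ASSERT_EQ(4, partitions.size());  // one partition per hash bucket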
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index cad1ab7ff..d02838979 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -436,7 +436,7 @@ Status PartitionSchema::EncodeRangeBounds(
   auto& bounds_whs = *bounds_with_hash_schemas;
   DCHECK(bounds_whs.empty());
   if (range_bounds.empty()) {
-    bounds_whs.emplace_back(RangeWithHashSchema{"", "", {}});
+    bounds_whs.emplace_back(RangeWithHashSchema{"", "", hash_schema_});
     return Status::OK();
   }
 
@@ -460,7 +460,7 @@ Status PartitionSchema::EncodeRangeBounds(
           "range partition lower bound must be less than the upper bound",
           RangePartitionDebugString(bound.first, bound.second));
     }
-    RangeWithHashSchema temp{std::move(lower), std::move(upper), {}};
+    RangeWithHashSchema temp{std::move(lower), std::move(upper), hash_schema_};
     if (!range_hash_schemas.empty()) {
       temp.hash_schema = range_hash_schemas[j++];
     }
@@ -499,11 +499,13 @@ Status PartitionSchema::SplitRangeBounds(
     const Schema& schema,
     const vector<string>& splits,
     RangesWithHashSchemas* bounds_with_hash_schemas) const {
+  DCHECK(bounds_with_hash_schemas);
   if (splits.empty()) {
     return Status::OK();
   }
 
-  auto expected_bounds = std::max(1UL, bounds_with_hash_schemas->size()) + splits.size();
+  const auto expected_bounds =
+      std::max(1UL, bounds_with_hash_schemas->size()) + splits.size();
   RangesWithHashSchemas new_bounds_with_hash_schemas;
   new_bounds_with_hash_schemas.reserve(expected_bounds);
 
@@ -526,12 +528,12 @@ Status PartitionSchema::SplitRangeBounds(
       // Split the current bound. Add the lower section to the result list,
       // and continue iterating on the upper section.
       new_bounds_with_hash_schemas.emplace_back(
-          RangeWithHashSchema{std::move(lower), *split, {}});
+          RangeWithHashSchema{std::move(lower), *split, hash_schema_});
       lower = *split;
     }
 
     new_bounds_with_hash_schemas.emplace_back(
-        RangeWithHashSchema{std::move(lower), upper, {}});
+        RangeWithHashSchema{std::move(lower), upper, hash_schema_});
   }
 
   if (split != splits.end()) {
@@ -546,46 +548,23 @@ Status PartitionSchema::SplitRangeBounds(
 Status PartitionSchema::CreatePartitions(
     const vector<KuduPartialRow>& split_rows,
     const vector<pair<KuduPartialRow, KuduPartialRow>>& range_bounds,
-    const vector<HashSchema>& range_hash_schemas,
     const Schema& schema,
     vector<Partition>* partitions) const {
-  const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
+  DCHECK(partitions);
 
-  if (!range_hash_schemas.empty()) {
-    if (!split_rows.empty()) {
-      return Status::InvalidArgument("Both 'split_rows' and 
'range_hash_schemas' cannot be "
-                                     "populated at the same time.");
-    }
-    if (range_bounds.size() != range_hash_schemas.size()) {
-      return Status::InvalidArgument(
-          Substitute("$0 vs $1: per range hash schemas and range bounds "
-                     "must have the same size",
-                     range_hash_schemas.size(), range_bounds.size()));
-    }
-  }
-
-  auto base_hash_partitions = GenerateHashPartitions(hash_schema_, hash_encoder);
-
-  std::unordered_set<int> range_column_idxs;
-  for (const ColumnId& column_id : range_schema_.column_ids) {
-    int column_idx = schema.find_column_by_id(column_id);
-    if (column_idx == Schema::kColumnNotFound) {
-      return Status::InvalidArgument(Substitute("range partition column ID $0 "
-                                                "not found in table schema.", column_id));
-    }
-    if (!InsertIfNotPresent(&range_column_idxs, column_idx)) {
-      return Status::InvalidArgument("duplicate column in range partition",
-                                     schema.column(column_idx).name());
-    }
-  }
+  RETURN_NOT_OK(CheckRangeSchema(schema));
 
   RangesWithHashSchemas bounds_with_hash_schemas;
-  vector<string> splits;
-  RETURN_NOT_OK(EncodeRangeBounds(range_bounds, range_hash_schemas, schema,
+  RETURN_NOT_OK(EncodeRangeBounds(range_bounds, {}, schema,
                                   &bounds_with_hash_schemas));
+  vector<string> splits;
   RETURN_NOT_OK(EncodeRangeSplits(split_rows, schema, &splits));
   RETURN_NOT_OK(SplitRangeBounds(schema, splits, &bounds_with_hash_schemas));
 
+  const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
+  const auto base_hash_partitions =
+      GenerateHashPartitions(hash_schema_, hash_encoder);
+
   // Even if no hash partitioning for a table is specified, there must be at
   // least one element in 'base_hash_partitions': it's used to build the result
   // set of range partitions.
@@ -594,112 +573,73 @@ Status PartitionSchema::CreatePartitions(
   DCHECK(base_hash_partitions.size() > 1 ||
          base_hash_partitions.front().hash_buckets().empty());
 
-  // Maps each partition to its respective hash schema within
-  // 'bounds_with_hash_schemas', needed for the logic later in this method
-  // for filling in holes in partition key space. The container is empty if no
-  // per-range hash schemas are present.
-  vector<size_t> partition_idx_to_hash_schema_idx;
-
-  if (range_hash_schemas.empty()) {
-    // Create a partition per range bound and hash bucket combination.
-    vector<Partition> new_partitions;
-    for (const Partition& base_partition : base_hash_partitions) {
-      for (const auto& bound : bounds_with_hash_schemas) {
-        Partition p = base_partition;
-        p.begin_.mutable_range_key()->append(bound.lower);
-        p.end_.mutable_range_key()->append(bound.upper);
-        new_partitions.emplace_back(std::move(p));
-      }
+  // Create a partition per range bound and hash bucket combination.
+  vector<Partition> new_partitions;
+  for (const Partition& base_partition : base_hash_partitions) {
+    for (const auto& bound : bounds_with_hash_schemas) {
+      Partition p = base_partition;
+      p.begin_.mutable_range_key()->append(bound.lower);
+      p.end_.mutable_range_key()->append(bound.upper);
+      new_partitions.emplace_back(std::move(p));
     }
-    *partitions = std::move(new_partitions);
-  } else {
-    // The number of ranges should match the size of range_hash_schemas.
-    DCHECK_EQ(range_hash_schemas.size(), bounds_with_hash_schemas.size());
-    // No split rows should be defined if range_hash_schemas is populated.
-    DCHECK(split_rows.empty());
-    vector<Partition> result_partitions;
-    // Iterate through each bound and its hash schemas to generate hash partitions.
-    for (size_t i = 0; i < bounds_with_hash_schemas.size(); ++i) {
-      const auto& bound = bounds_with_hash_schemas[i];
-      vector<Partition> current_bound_hash_partitions = GenerateHashPartitions(
-          bound.hash_schema, hash_encoder);
-      // Add range information to the partition key.
-      for (Partition& p : current_bound_hash_partitions) {
-        DCHECK(p.begin_.range_key().empty()) << p.begin_.DebugString();
-        p.begin_.mutable_range_key()->assign(bound.lower);
-        DCHECK(p.end().range_key().empty());
-        p.end_.mutable_range_key()->assign(bound.upper);
-        partition_idx_to_hash_schema_idx.emplace_back(i);
-      }
-      result_partitions.insert(result_partitions.end(),
-                               std::make_move_iterator(current_bound_hash_partitions.begin()),
-                               std::make_move_iterator(current_bound_hash_partitions.end()));
-    }
-    DCHECK_EQ(partition_idx_to_hash_schema_idx.size(), result_partitions.size());
-    *partitions = std::move(result_partitions);
   }
-  // Note: the following discussion and logic only takes effect when the table's
-  // partition schema includes at least one hash bucket component, and the
-  // absolute upper and/or absolute lower range bound is unbounded.
-  //
-  // At this point, we have the full set of partitions built up, but each
-  // partition only covers a finite slice of the partition key-space. Some
-  // operations involving partitions are easier (pruning, client meta cache) if
-  // it can be assumed that the partition keyspace does not have holes.
-  //
-  // In order to 'fill in' the partition key space, the absolute first and last
-  // partitions are extended to cover the rest of the lower and upper partition
-  // range by clearing the start and end partition key, respectively.
-  //
-  // When the table has two or more hash components, there will be gaps in
-  // between partitions at the boundaries of the component ranges. Similar to
-  // the absolute start and end case, these holes are filled by clearing the
-  // partition key beginning at the hash component. For a concrete example,
-  // see PartitionTest::TestCreatePartitions.
-  for (size_t partition_idx = 0; partition_idx < partitions->size(); ++partition_idx) {
-    auto& p = (*partitions)[partition_idx];
-    const auto hash_buckets_num = static_cast<int>(p.hash_buckets().size());
-    // Find the first nonzero-valued bucket from the end and truncate the
-    // partition key starting from that bucket onwards for zero-valued buckets.
-    //
-    // TODO(aserbin): is this really necessary -- zeros in hash key are the
-    //                minimum possible values, and the range part is already
-    //                empty?
-    if (p.begin().range_key().empty()) {
-      for (int i = hash_buckets_num - 1; i >= 0; --i) {
-        if (p.hash_buckets()[i] != 0) {
-          break;
-        }
-        p.begin_.mutable_hash_key()->erase(kEncodedBucketSize * i);
-      }
-    }
-    // Starting from the last hash bucket, truncate the partition key until we hit the first
-    // non-max-valued bucket, at which point, replace the encoding with the next-incremented
-    // bucket value. For example, the following range end partition keys should be transformed,
-    // where the key is (hash_comp1, hash_comp2, range_comp):
-    //
-    // [ (0, 0, "") -> (0, 1, "") ]
-    // [ (0, 1, "") -> (1, _, "") ]
-    // [ (1, 0, "") -> (1, 1, "") ]
-    // [ (1, 1, "") -> (_, _, "") ]
-    if (p.end().range_key().empty()) {
-      const auto& hash_schema = range_hash_schemas.empty()
-          ? hash_schema_
-          : bounds_with_hash_schemas[partition_idx_to_hash_schema_idx[partition_idx]].hash_schema;
-      for (int i = hash_buckets_num - 1; i >= 0; --i) {
-        p.end_.mutable_hash_key()->erase(kEncodedBucketSize * i);
-        const int32_t hash_bucket = p.hash_buckets()[i] + 1;
-        if (hash_bucket != hash_schema[i].num_buckets) {
-          hash_encoder.Encode(&hash_bucket, p.end_.mutable_hash_key());
-          break;
-        }
-      }
+
+  UpdatePartitionBoundaries(&new_partitions);
+  *partitions = std::move(new_partitions);
+
+  return Status::OK();
+}
+
+Status PartitionSchema::CreatePartitions(
+      const RangesWithHashSchemas& ranges_with_hash_schemas,
+      const Schema& schema,
+      vector<Partition>* partitions) const {
+  DCHECK(partitions);
+
+  RETURN_NOT_OK(CheckRangeSchema(schema));
+
+  const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
+  vector<Partition> result_partitions;
+  // Iterate through each bound and its hash schemas to generate hash partitions.
+  for (size_t i = 0; i < ranges_with_hash_schemas.size(); ++i) {
+    const auto& range = ranges_with_hash_schemas[i];
+    vector<Partition> current_bound_hash_partitions = GenerateHashPartitions(
+        range.hash_schema, hash_encoder);
+    // Add range information to the partition key.
+    for (Partition& p : current_bound_hash_partitions) {
+      DCHECK(p.begin_.range_key().empty()) << p.begin_.DebugString();
+      p.begin_.mutable_range_key()->assign(range.lower);
+      DCHECK(p.end().range_key().empty());
+      p.end_.mutable_range_key()->assign(range.upper);
     }
+    result_partitions.insert(result_partitions.end(),
+                             std::make_move_iterator(current_bound_hash_partitions.begin()),
+                             std::make_move_iterator(current_bound_hash_partitions.end()));
   }
 
+  UpdatePartitionBoundaries(&result_partitions);
+  *partitions = std::move(result_partitions);
+
   return Status::OK();
 }
 
+Status PartitionSchema::CreatePartitions(const Schema& schema,
+                                         vector<Partition>* partitions) const {
+  return CreatePartitions(ranges_with_hash_schemas_, schema, partitions);
+}
+
+Status PartitionSchema::CreatePartitionsForRange(
+    const pair<KuduPartialRow, KuduPartialRow>& range_bound,
+    const HashSchema& range_hash_schema,
+    const Schema& schema,
+    std::vector<Partition>* partitions) const {
+  RangesWithHashSchemas ranges_with_hash_schemas;
+  RETURN_NOT_OK(EncodeRangeBounds(
+      {range_bound}, {range_hash_schema}, schema, &ranges_with_hash_schemas));
+
+  return CreatePartitions(ranges_with_hash_schemas, schema, partitions);
+}
+
 template<typename Row>
 bool PartitionSchema::PartitionContainsRowImpl(const Partition& partition,
                                                const Row& row) const {
@@ -1376,6 +1316,83 @@ Status PartitionSchema::Validate(const Schema& schema) const {
   return Status::OK();
 }
 
+Status PartitionSchema::CheckRangeSchema(const Schema& schema) const {
+  std::unordered_set<int> range_column_idxs;
+  for (const ColumnId& column_id : range_schema_.column_ids) {
+    const auto column_idx = schema.find_column_by_id(column_id);
+    if (column_idx == Schema::kColumnNotFound) {
+      return Status::InvalidArgument(Substitute(
+          "range partition column ID $0 not found in table schema", 
column_id));
+    }
+    if (!InsertIfNotPresent(&range_column_idxs, column_idx)) {
+      return Status::InvalidArgument("duplicate column in range partition",
+                                     schema.column(column_idx).name());
+    }
+  }
+  return Status::OK();
+}
+
+void PartitionSchema::UpdatePartitionBoundaries(vector<Partition>* partitions) const {
+  // Note: the following discussion and logic only takes effect when the table's
+  // partition schema includes at least one hash bucket component, and the
+  // absolute upper and/or absolute lower range bound is unbounded.
+  //
+  // At this point, we have the full set of partitions built up, but each
+  // partition only covers a finite slice of the partition key-space. Some
+  // operations involving partitions are easier (pruning, client meta cache) if
+  // it can be assumed that the partition keyspace does not have holes.
+  //
+  // In order to 'fill in' the partition key space, the absolute first and last
+  // partitions are extended to cover the rest of the lower and upper partition
+  // range by clearing the start and end partition key, respectively.
+  //
+  // When the table has two or more hash components, there will be gaps in
+  // between partitions at the boundaries of the component ranges. Similar to
+  // the absolute start and end case, these holes are filled by clearing the
+  // partition key beginning at the hash component. For a concrete example,
+  // see PartitionTest::TestCreatePartitions.
+  const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
+  for (size_t partition_idx = 0; partition_idx < partitions->size(); ++partition_idx) {
+    auto& p = (*partitions)[partition_idx];
+    const auto& hash_schema = GetHashSchemaForRange(p.begin().range_key());
+    CHECK_EQ(hash_schema.size(), p.hash_buckets().size());
+    const auto hash_buckets_num = static_cast<int>(p.hash_buckets().size());
+    // Find the first nonzero-valued bucket from the end and truncate the
+    // partition key starting from that bucket onwards for zero-valued buckets.
+    //
+    // TODO(aserbin): is this really necessary -- zeros in hash key are the
+    //                minimum possible values, and the range part is already
+    //                empty?
+    if (p.begin().range_key().empty()) {
+      for (int i = hash_buckets_num - 1; i >= 0; --i) {
+        if (p.hash_buckets()[i] != 0) {
+          break;
+        }
+        p.begin_.mutable_hash_key()->erase(kEncodedBucketSize * i);
+      }
+    }
+    // Starting from the last hash bucket, truncate the partition key until we hit the first
+    // non-max-valued bucket, at which point, replace the encoding with the next-incremented
+    // bucket value. For example, the following range end partition keys should be transformed,
+    // where the key is (hash_comp1, hash_comp2, range_comp):
+    //
+    // [ (0, 0, "") -> (0, 1, "") ]
+    // [ (0, 1, "") -> (1, _, "") ]
+    // [ (1, 0, "") -> (1, 1, "") ]
+    // [ (1, 1, "") -> (_, _, "") ]
+    if (p.end().range_key().empty()) {
+      for (int i = hash_buckets_num - 1; i >= 0; --i) {
+        p.end_.mutable_hash_key()->erase(kEncodedBucketSize * i);
+        const int32_t hash_bucket = p.hash_buckets()[i] + 1;
+        if (hash_bucket != hash_schema[i].num_buckets) {
+          hash_encoder.Encode(&hash_bucket, p.end_.mutable_hash_key());
+          break;
+        }
+      }
+    }
+  }
+}
+
 namespace {
 
   // Increments an unset column in the row.
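
The behavioral fix is visible in UpdatePartitionBoundaries() above: the hash
schema for a partition is now resolved from the partition's own encoded range
key via GetHashSchemaForRange(), instead of through an index mapping built in
request order (the old 'partition_idx_to_hash_schema_idx'), which is what
allowed a reordered request to mismatch ranges and hash schemas.
GetHashSchemaForRange() itself is defined outside this diff; the following is
only a hedged sketch of the lookup idea, as a free function over the
RangeWithHashSchema fields shown above:

  // Sketch only (assumed logic, not code from this patch): return the hash
  // schema of the range whose [lower, upper) interval contains 'range_key';
  // an empty 'upper' means the range is unbounded above. Fall back to the
  // table-wide hash schema when no custom range matches.
  const PartitionSchema::HashSchema& LookupHashSchemaForRange(
      const PartitionSchema::RangesWithHashSchemas& ranges,
      const PartitionSchema::HashSchema& table_wide,
      const std::string& range_key) {
    for (const auto& r : ranges) {
      if (r.lower <= range_key && (r.upper.empty() || range_key < r.upper)) {
        return r.hash_schema;
      }
    }
    return table_wide;
  }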
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index fd03d5419..0cddc64d2 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -286,10 +286,11 @@ class PartitionSchema {
   // in each dimension of the hash schema.
   typedef std::vector<HashDimension> HashSchema;
 
+  // A structure representing a range with custom hash schema.
   struct RangeWithHashSchema {
-    std::string lower;
-    std::string upper;
-    HashSchema hash_schema;
+    std::string lower;      // encoded range key: lower boundary
+    std::string upper;      // encoded range key: upper boundary
+    HashSchema hash_schema; // hash schema for the range
   };
   typedef std::vector<RangeWithHashSchema> RangesWithHashSchemas;
 
@@ -322,14 +323,22 @@ class PartitionSchema {
   // of resulting partitions is the product of the number of hash buckets for
   // each hash bucket component, multiplied by
   // (split_rows.size() + max(1, range_bounds.size())).
-  // 'range_hash_schemas' contains each range's HashSchema,
-  // its order corresponds to the bounds in 'range_bounds'.
-  // If 'range_hash_schemas' is empty, the table wide hash schema is used per range.
-  // Size of 'range_hash_schemas' and 'range_bounds' are equal if 'range_hash_schema' isn't empty.
   Status CreatePartitions(
       const std::vector<KuduPartialRow>& split_rows,
      const std::vector<std::pair<KuduPartialRow, KuduPartialRow>>& range_bounds,
-      const std::vector<HashSchema>& range_hash_schemas,
+      const Schema& schema,
+      std::vector<Partition>* partitions) const WARN_UNUSED_RESULT;
+
+  // Similar to the method above, but uses the 'ranges_with_hash_schemas_'
+  // member field as the input to produce the result partitions.
+  Status CreatePartitions(
+      const Schema& schema,
+      std::vector<Partition>* partitions) const WARN_UNUSED_RESULT;
+
+  // Create the set of partitions for a single range with specified hash schema.
+  Status CreatePartitionsForRange(
+      const std::pair<KuduPartialRow, KuduPartialRow>& range_bound,
+      const HashSchema& range_hash_schema,
       const Schema& schema,
       std::vector<Partition>* partitions) const WARN_UNUSED_RESULT;
 
@@ -512,6 +521,13 @@ class PartitionSchema {
       const HashSchema& hash_schema,
       const KeyEncoder<std::string>& hash_encoder);
 
+  // Create the set of partitions given the specified ranges with per-range
+  // hash schemas. The 'partitions' output parameter must be non-null.
+  Status CreatePartitions(
+      const RangesWithHashSchemas& ranges_with_hash_schemas,
+      const Schema& schema,
+      std::vector<Partition>* partitions) const;
+
   // PartitionKeyDebugString implementation for row types.
   template<typename Row>
   std::string PartitionKeyDebugStringImpl(const Row& row) const;
@@ -579,6 +595,20 @@ class PartitionSchema {
   // appropriate error code for an invalid partition schema.
   Status Validate(const Schema& schema) const;
 
+  // Check the range partition schema for consistency: make sure the columns
+  // used in the range partitioning are present in the table's schema and there
+  // aren't any duplicate columns.
+  Status CheckRangeSchema(const Schema& schema) const;
+
+  // Update partitions' boundaries provided via the 'partitions' in-out parameter,
+  // assuming the 'partitions' are populated by calling the CreatePartitions()
+  // method. When a table has two or more hash components, there will be gaps
+  // in between partitions at the boundaries of the component ranges. This
+  // method fills in the address space to have the proper ordering of the
+  // serialized partition keys -- that's important for partition pruning and
+  // overall ordering of the serialized partition keys.
+  void UpdatePartitionBoundaries(std::vector<Partition>* partitions) const;
+
   // Validates the split rows, converts them to partition key form, and inserts
   // them into splits in sorted order.
   Status EncodeRangeSplits(const std::vector<KuduPartialRow>& split_rows,
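
For context, a caller-side sketch of how the reworked overloads divide the
work; the surrounding variables (partition_schema, split_rows, range_bounds,
schema) are hypothetical placeholders:

    std::vector<Partition> partitions;
    // Either: table-wide hash schema, with splits and range bounds passed
    // in explicitly by the caller ...
    RETURN_NOT_OK(partition_schema.CreatePartitions(
        split_rows, range_bounds, schema, &partitions));
    // ... or: ranges with custom hash schemas already held by the partition
    // schema itself (e.g. populated by PartitionSchema::FromPB()), in which
    // case no range information is passed in at all.
    RETURN_NOT_OK(partition_schema.CreatePartitions(schema, &partitions));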
diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc
index 1552eecef..5f4a41246 100644
--- a/src/kudu/common/partition_pruner-test.cc
+++ b/src/kudu/common/partition_pruner-test.cc
@@ -73,8 +73,6 @@ class PartitionPrunerTest : public KuduTest {
       const vector<ColumnNameAndIntValue>& lower_int_cols,
       const vector<ColumnNameAndIntValue>& upper_int_cols,
       const vector<ColumnNamesNumBucketsAndSeed>& hash_schemas,
-      vector<pair<KuduPartialRow, KuduPartialRow>>* bounds,
-      vector<PartitionSchema::HashSchema>* range_hash_schemas,
       PartitionSchemaPB* pb);
 };
 
@@ -137,8 +135,6 @@ void PartitionPrunerTest::AddRangePartitionWithSchema(
     const vector<ColumnNameAndIntValue>& lower_int_cols,
     const vector<ColumnNameAndIntValue>& upper_int_cols,
     const vector<ColumnNamesNumBucketsAndSeed>& hash_buckets_info,
-    vector<pair<KuduPartialRow, KuduPartialRow>>* bounds,
-    vector<PartitionSchema::HashSchema>* range_hash_schemas,
     PartitionSchemaPB* pb) {
   auto* range = pb->add_custom_hash_schema_ranges();
   RowOperationsPBEncoder encoder(range->mutable_range_bounds());
@@ -172,8 +168,6 @@ void PartitionPrunerTest::AddRangePartitionWithSchema(
     hash_dimension.seed = get<2>(hash_bucket_info);
     hash_schema.emplace_back(hash_dimension);
   }
-  range_hash_schemas->emplace_back(std::move(hash_schema));
-  bounds->emplace_back(lower, upper);
 }
 
 TEST_F(PartitionPrunerTest, TestPrimaryKeyRangePruning) {
@@ -202,7 +196,7 @@ TEST_F(PartitionPrunerTest, TestPrimaryKeyRangePruning) {
   ASSERT_OK(split2.SetInt8("c", 10));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, schema, &partitions));
 
   // Creates a scan with optional lower and upper bounds, and checks that the
   // expected number of tablets are pruned.
@@ -320,7 +314,7 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
   ASSERT_OK(split2.SetStringCopy("b", "r"));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, schema, &partitions));
 
   // Applies the specified lower and upper bound primary keys against the
   // schema, and checks that the expected number of partitions are pruned.
@@ -429,7 +423,7 @@ TEST_F(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning) {
   ASSERT_OK(split.SetInt8("b", 0));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({ split }, {}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({ split }, {}, schema, &partitions));
 
   // Applies the specified lower and upper bound primary keys against the
   // schema, and checks that the expected number of partitions are pruned.
@@ -518,7 +512,7 @@ TEST_F(PartitionPrunerTest, TestRangePruning) {
   ASSERT_OK(split2.SetStringCopy("b", "r"));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, schema, &partitions));
 
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
@@ -687,7 +681,7 @@ TEST_F(PartitionPrunerTest, TestHashPruning) {
     ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
     vector<Partition> partitions;
-    ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {},
+    ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {},
                                                        schema, &partitions));
 
 
@@ -763,7 +757,7 @@ TEST_F(PartitionPrunerTest, TestInListHashPruning) {
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {},
+  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {},
                                                      schema, &partitions));
 
 
@@ -841,10 +835,9 @@ TEST_F(PartitionPrunerTest, TestMultiColumnInListHashPruning) {
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {},
+  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {},
                                                      schema, &partitions));
 
-
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
   const auto check = [&] (const vector<ColumnPredicate>& predicates,
@@ -940,7 +933,7 @@ TEST_F(PartitionPrunerTest, TestPruning) {
 
   vector<Partition> partitions;
   ASSERT_OK(partition_schema.CreatePartitions(
-      vector<KuduPartialRow>{ split }, {}, {}, schema, &partitions));
+      vector<KuduPartialRow>{ split }, {}, schema, &partitions));
   ASSERT_EQ(4, partitions.size());
 
   // Applies the specified predicates to a scan and checks that the expected
@@ -1047,7 +1040,7 @@ TEST_F(PartitionPrunerTest, TestKudu2173) {
   KuduPartialRow split1(&schema);
   ASSERT_OK(split1.SetInt8("a", 10));
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({ split1 }, {}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({ split1 }, {}, schema, &partitions));
 
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
@@ -1099,9 +1092,6 @@ TEST_F(PartitionPrunerTest, DISABLED_TestHashSchemasPerRangePruning) {
   PartitionSchemaPB pb;
   CreatePartitionSchemaPB({"C"}, { {{"A"}, 2, 0}, {{"B"}, 2, 0} }, &pb);
 
-  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
-  vector<PartitionSchema::HashSchema> range_hash_schemas;
-
   // Need to add per range hash schema components to the field
   // 'ranges_with_hash_schemas_' of PartitionSchema because PartitionPruner will
   // use them to construct partition key ranges. Currently,
@@ -1112,26 +1102,24 @@ TEST_F(PartitionPrunerTest, DISABLED_TestHashSchemasPerRangePruning) {
 
   // [(_, _, a), (_, _, c))
   AddRangePartitionWithSchema(schema, {{"C", "a"}}, {{"C", "c"}}, {}, {},
-                              { {{"A"}, 3, 0} }, &bounds, &range_hash_schemas, 
&pb);
+                              { {{"A"}, 3, 0} }, &pb);
 
   // [(_, _, d), (_, _, f))
   AddRangePartitionWithSchema(schema, {{"C", "d"}}, {{"C", "f"}}, {}, {},
-                              { {{"A"}, 2, 0}, {{"B"}, 3, 0} },
-                              &bounds, &range_hash_schemas, &pb);
+                              { {{"A"}, 2, 0}, {{"B"}, 3, 0} }, &pb);
 
   // [(_, _, h), (_, _, j))
-  AddRangePartitionWithSchema(schema, {{"C", "h"}}, {{"C", "j"}}, {}, {},
-                              {}, &bounds, &range_hash_schemas, &pb);
+  AddRangePartitionWithSchema(schema, {{"C", "h"}}, {{"C", "j"}}, {}, {}, {}, 
&pb);
 
   // [(_, _, k), (_, _, m))
   AddRangePartitionWithSchema(schema, {{"C", "k"}}, {{"C", "m"}}, {}, {},
-                              { {{"B"}, 2, 0} }, &bounds, &range_hash_schemas, 
&pb);
+                              { {{"B"}, 2, 0} }, &pb);
 
   PartitionSchema partition_schema;
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(schema, &partitions));
   ASSERT_EQ(12, partitions.size());
 
   // Applies the specified predicates to a scan and checks that the expected
@@ -1291,26 +1279,23 @@ TEST_F(PartitionPrunerTest, TestHashSchemasPerRangeWithPartialPrimaryKeyRangePru
   PartitionSchemaPB pb;
   CreatePartitionSchemaPB({"a", "b"}, {}, &pb);
 
-  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
-  vector<PartitionSchema::HashSchema> range_hash_schemas;
-
   // [(0, 0, _), (2, 2, _))
   AddRangePartitionWithSchema(schema, {}, {}, {{"a", 0}, {"b", 0}}, {{"a", 2}, 
{"b", 2}},
-                              { {{"c"}, 2, 0} }, &bounds, &range_hash_schemas, 
&pb);
+                              { {{"c"}, 2, 0} }, &pb);
 
   // [(2, 2, _), (4, 4, _))
   AddRangePartitionWithSchema(schema, {}, {}, {{"a", 2}, {"b", 2}}, {{"a", 4}, 
{"b", 4}},
-                              { {{"c"}, 3, 0} }, &bounds, &range_hash_schemas, 
&pb);
+                              { {{"c"}, 3, 0} }, &pb);
 
   // [(4, 4, _), (6, 6, _))
   AddRangePartitionWithSchema(schema, {}, {}, {{"a", 4}, {"b", 4}}, {{"a", 6}, 
{"b", 6}},
-                              { {{"c"}, 4, 0} }, &bounds, &range_hash_schemas, 
&pb);
+                              { {{"c"}, 4, 0} }, &pb);
 
   PartitionSchema partition_schema;
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(schema, &partitions));
   ASSERT_EQ(9, partitions.size());
 
   Arena arena(1024);
@@ -1398,28 +1383,25 @@ TEST_F(PartitionPrunerTest, TestInListHashPruningPerRange) {
   PartitionSchemaPB pb;
   CreatePartitionSchemaPB({"A"}, { {{"B", "C"}, 3, 0} }, &pb);
 
-  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
-  vector<PartitionSchema::HashSchema> range_hash_schemas;
-
   // None of the ranges below uses the table-wide hash schema.
 
   // [(a, _, _), (c, _, _))
   AddRangePartitionWithSchema(schema, {{"A", "a"}}, {{"A", "c"}}, {}, {},
-                              { {{"B"}, 3, 0} }, &bounds, &range_hash_schemas, 
&pb);
+                              { {{"B"}, 3, 0} }, &pb);
 
   // [(c, _, _), (e, _, _))
   AddRangePartitionWithSchema(schema, {{"A", "c"}}, {{"A", "e"}}, {}, {},
-                              {}, &bounds, &range_hash_schemas, &pb);
+                              {}, &pb);
 
   // [(e, _, _), (g, _, _))
   AddRangePartitionWithSchema(schema, {{"A", "e"}}, {{"A", "g"}}, {}, {},
-                              { {{"C"}, 3, 0} }, &bounds, &range_hash_schemas, 
&pb);
+                              { {{"C"}, 3, 0} }, &pb);
 
   PartitionSchema partition_schema;
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(schema, &partitions));
   ASSERT_EQ(7, partitions.size());
 
   // Applies the specified predicates to a scan and checks that the expected
@@ -1497,30 +1479,26 @@ TEST_F(PartitionPrunerTest, DISABLED_TestSingleRangeElementAndBoundaryCase) {
   PartitionSchemaPB pb;
   CreatePartitionSchemaPB({"A"}, {}, &pb);
 
-  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
-  vector<PartitionSchema::HashSchema> range_hash_schemas;
-
   // [(_, _), (1, _))
   AddRangePartitionWithSchema(schema, {}, {}, {}, {{"A", 1}},
-                              {{{"B"}, 4, 0}}, &bounds, &range_hash_schemas, 
&pb);
+                              {{{"B"}, 4, 0}}, &pb);
 
   // [(1, _), (2, _))
   AddRangePartitionWithSchema(schema, {}, {}, {{"A", 1}}, {{"A", 2}},
-                              { {{"B"}, 2, 0} }, &bounds, &range_hash_schemas, 
&pb);
+                              { {{"B"}, 2, 0} }, &pb);
 
   // [(2, _), (3, _))
   AddRangePartitionWithSchema(schema, {}, {}, {{"A", 2}}, {{"A", 3}},
-                              { {{"B"}, 3, 0} }, &bounds, &range_hash_schemas, 
&pb);
+                              { {{"B"}, 3, 0} }, &pb);
 
   // [(3, _), (_, _))
-  AddRangePartitionWithSchema(schema, {}, {}, {{"A", 3}}, {},
-                              {}, &bounds, &range_hash_schemas, &pb);
+  AddRangePartitionWithSchema(schema, {}, {}, {{"A", 3}}, {}, {}, &pb);
 
   PartitionSchema partition_schema;
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(schema, &partitions));
   ASSERT_EQ(10, partitions.size());
 
   // Applies the specified predicates to a scan and checks that the expected
diff --git a/src/kudu/common/scan_spec-test.cc b/src/kudu/common/scan_spec-test.cc
index a43cdfeab..14a2a46de 100644
--- a/src/kudu/common/scan_spec-test.cc
+++ b/src/kudu/common/scan_spec-test.cc
@@ -432,7 +432,7 @@ TEST_F(CompositeIntKeysTest, OneHashKeyInListHashPruning) {
   GeneratePartitionSchema(schema, { { { "a" }, 3 } }, {}, &partition_schema);
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
   ASSERT_EQ(3, partitions.size());
 
   // Verify the split values can merge into the original set without overlapping.
@@ -474,7 +474,7 @@ TEST_F(CompositeIntKeysTest, OneHashKeyOneRangeKeyInListHashPruning) {
 
   vector<Partition> partitions;
   ASSERT_OK(partition_schema.CreatePartitions(
-      { split1, split2 }, {}, {}, schema, &partitions));
+      { split1, split2 }, {}, schema, &partitions));
   ASSERT_EQ(9, partitions.size());
 
   ASSERT_EQ("a IN () AND b IN (50, 100)",
@@ -552,7 +552,7 @@ TEST_F(CompositeIntKeysTest, OneHashKeyMultiRangeKeyInListHashPruning) {
 
   vector<Partition> partitions;
   ASSERT_OK(partition_schema.CreatePartitions(
-      { split1, split2 }, {}, {}, schema, &partitions));
+      { split1, split2 }, {}, schema, &partitions));
   ASSERT_EQ(9, partitions.size());
 
   ASSERT_EQ("PK >= (int8 a=4, int8 b=0, int8 c=-128) AND "
@@ -630,7 +630,7 @@ TEST_F(CompositeIntKeysTest, DifferentHashRangeKeyInListHashPruning) {
 
   vector<Partition> partitions;
   ASSERT_OK(partition_schema.CreatePartitions(
-      { split1, split2 }, {}, {}, schema, &partitions));
+      { split1, split2 }, {}, schema, &partitions));
   ASSERT_EQ(9, partitions.size());
 
   ASSERT_EQ("PK >= (int8 a=4, int8 b=0, int8 c=-128) AND "
@@ -702,7 +702,7 @@ TEST_F(CompositeIntKeysTest, HashKeyInListHashPruningEmptyDetect) {
       schema, { { { "a" }, 3 } }, {}, &partition_schema);
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
   ASSERT_EQ(3, partitions.size());
 
   ASSERT_EQ("PK >= (int8 a=4, int8 b=50, int8 c=-128) AND "
@@ -734,7 +734,7 @@ TEST_F(CompositeIntKeysTest, MultiHashKeyOneColumnInListHashPruning) {
       schema, { { { "a" }, 3 }, { { "b" }, 3 }, }, {}, &partition_schema);
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
   ASSERT_EQ(9, partitions.size());
 
   // p1, p2, p3 should have the same predicate values to be pushed on hash(a).
@@ -807,7 +807,7 @@ TEST_F(CompositeIntKeysTest, MultiHashColumnsInListHashPruning) {
       schema, { { { "a", "b" }, 3 } }, {}, &partition_schema);
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
   ASSERT_EQ(3, partitions.size());
 
   ASSERT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=-128) AND "
@@ -844,7 +844,7 @@ TEST_F(CompositeIntKeysTest, MultiHashKeyMultiHashInListHashPruning) {
       schema, { { { "a", "b" }, 3 }, { { "c" }, 3 } }, {}, &partition_schema);
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
   ASSERT_EQ(9, partitions.size());
 
   ASSERT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=40) AND "
@@ -922,7 +922,7 @@ TEST_F(CompositeIntKeysTest, NonKeyValuesInListHashPruning) {
       schema, { { { "a" }, 3 } }, {}, &partition_schema);
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
   ASSERT_EQ(3, partitions.size());
 
   ASSERT_EQ("d IN (1, 2, 3, 4, 5)",
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 57e6e6b43..1b898d0e6 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -1018,7 +1018,7 @@ Status GetPartitionForTxnStatusTablet(int64_t start_txn_id, int64_t end_txn_id,
   RETURN_NOT_OK(upper_bound.SetInt64(TxnStatusTablet::kTxnIdColName, end_txn_id));
   vector<Partition> ps;
   RETURN_NOT_OK(pschema.CreatePartitions(/*split_rows=*/{},
-      { std::make_pair(lower_bound, upper_bound) }, {}, schema, &ps));
+      { std::make_pair(lower_bound, upper_bound) }, schema, &ps));
   *partition = ps[0];
   *partition_schema = pschema;
   return Status::OK();
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index ab3380f23..374587be6 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1917,31 +1917,33 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
     }
   }
 
-  // TODO(aserbin): make sure range boundaries in
-  //                req.partition_schema().custom_hash_schema_ranges()
-  //                correspond to range_bounds?
-  vector<PartitionSchema::HashSchema> range_hash_schemas;
-  if (FLAGS_enable_per_range_hash_schemas) {
-    // TODO(aserbin): the signature of CreatePartitions() require the
-    //                'range_hash_schemas' parameters: update its signature
-    //                to remove the extra parameter and rely on its
-    //                'ranges_with_hash_schemas_' member field; the path in
-    //                CatalogManager::ApplyAlterPartitioningSteps() involving
-    //                CreatePartitions() should be updated correspondingly.
-    const auto& ps = req.partition_schema();
-    for (int i = 0; i < ps.custom_hash_schema_ranges_size(); i++) {
-      PartitionSchema::HashSchema hash_schema;
-      RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB(
-          schema, ps.custom_hash_schema_ranges(i).hash_schema(), &hash_schema));
-      range_hash_schemas.emplace_back(std::move(hash_schema));
+  vector<Partition> partitions;
+  if (const auto& ps = req.partition_schema();
+      FLAGS_enable_per_range_hash_schemas && 
!ps.custom_hash_schema_ranges().empty()) {
+    // TODO(aserbin): in addition, should switch to specifying range information
+    //                only via 'PartitionSchemaPB::custom_hash_schema_ranges' or
+    //                'CreateTableRequestPB::split_rows_range_bounds', don't mix
+    if (!split_rows.empty()) {
+      return Status::InvalidArgument(
+          "both split rows and custom hash schema ranges cannot be "
+          "populated at the same time");
     }
+    if (const auto ranges_with_hash_schemas_size =
+            partition_schema.ranges_with_hash_schemas().size();
+        range_bounds.size() != ranges_with_hash_schemas_size) {
+      return Status::InvalidArgument(
+          Substitute("$0 vs $1: per range hash schemas and range bounds "
+                     "must have the same size",
+                     ranges_with_hash_schemas_size, range_bounds.size()));
+    }
+    // Create partitions based on specified ranges with custom hash schemas.
+    RETURN_NOT_OK(partition_schema.CreatePartitions(schema, &partitions));
+  } else {
+    // Create partitions based on specified partition schema and split rows.
+    RETURN_NOT_OK(partition_schema.CreatePartitions(
+        split_rows, range_bounds, schema, &partitions));
   }
 
-  // Create partitions based on specified partition schema and split rows.
-  vector<Partition> partitions;
-  RETURN_NOT_OK(partition_schema.CreatePartitions(
-      split_rows, range_bounds, range_hash_schemas, schema, &partitions));
-
   // Check the restriction on the same number of hash dimensions across all the
   // ranges. Also, check that the table-wide hash schema has the same number
   // of hash dimensions as all the partitions with custom hash schemas.
@@ -2628,29 +2630,15 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
 
   vector<PartitionSchema::HashSchema> range_hash_schemas;
   for (const auto& step : steps) {
+    CHECK(step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION ||
+          step.type() == AlterTableRequestPB::DROP_RANGE_PARTITION);
+    const auto& range_bouds =
+        step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION
+        ? step.add_range_partition().range_bounds()
+        : step.drop_range_partition().range_bounds();
+    RowOperationsPBDecoder decoder(&range_bouds, &client_schema, &schema, 
nullptr);
     vector<DecodedRowOperation> ops;
-    if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION) {
-      if (FLAGS_enable_per_range_hash_schemas &&
-          step.add_range_partition().custom_hash_schema_size() > 0) {
-        const Schema schema = client_schema.CopyWithColumnIds();
-        PartitionSchema::HashSchema hash_schema;
-        RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB(
-            schema, step.add_range_partition().custom_hash_schema(), &hash_schema));
-        if (partition_schema.hash_schema().size() != hash_schema.size()) {
-          return Status::NotSupported(
-              "varying number of hash dimensions per range is not yet 
supported");
-        }
-        range_hash_schemas.emplace_back(std::move(hash_schema));
-      }
-      RowOperationsPBDecoder decoder(&step.add_range_partition().range_bounds(),
-                                     &client_schema, &schema, nullptr);
-      RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));
-    } else {
-      CHECK_EQ(step.type(), AlterTableRequestPB::DROP_RANGE_PARTITION);
-      RowOperationsPBDecoder decoder(&step.drop_range_partition().range_bounds(),
-                                     &client_schema, &schema, nullptr);
-      RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));
-    }
+    RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));
 
     if (ops.size() != 2) {
       return Status::InvalidArgument(
@@ -2677,8 +2665,26 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
     }
 
     vector<Partition> partitions;
-    RETURN_NOT_OK(partition_schema.CreatePartitions(
-        {}, {{ *ops[0].split_row, *ops[1].split_row }}, range_hash_schemas, schema, &partitions));
+    const pair<KuduPartialRow, KuduPartialRow> range_bound =
+        { *ops[0].split_row, *ops[1].split_row };
+    if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION &&
+        FLAGS_enable_per_range_hash_schemas &&
+        step.add_range_partition().custom_hash_schema_size() > 0) {
+      const Schema schema = client_schema.CopyWithColumnIds();
+      PartitionSchema::HashSchema hash_schema;
+      RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB(
+          schema, step.add_range_partition().custom_hash_schema(), &hash_schema));
+      if (partition_schema.hash_schema().size() != hash_schema.size()) {
+        return Status::NotSupported(
+            "varying number of hash dimensions per range is not yet 
supported");
+      }
+      RETURN_NOT_OK(partition_schema.CreatePartitionsForRange(
+          range_bound, hash_schema, schema, &partitions));
+    } else {
+      RETURN_NOT_OK(partition_schema.CreatePartitions(
+          {}, { range_bound }, schema, &partitions));
+    }
+
     switch (step.type()) {
       case AlterTableRequestPB::ADD_RANGE_PARTITION: {
         for (const Partition& partition : partitions) {
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index d69003f56..d91dbb653 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -1049,8 +1049,8 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
                            range_hash_schemas);
     ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(),
-                        "Both 'split_rows' and 'range_hash_schemas' cannot be "
-                        "populated at the same time.");
+                        "both split rows and custom hash schema ranges "
+                        "cannot be populated at the same time");
   }
 
   // No non-range columns.
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 5b61cd7f8..23cf59e59 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -332,7 +332,7 @@ Status SysCatalogTable::CreateNew(FsManager *fs_manager) {
 
   vector<KuduPartialRow> split_rows;
   vector<Partition> partitions;
-  RETURN_NOT_OK(partition_schema.CreatePartitions(split_rows, {}, {}, schema, &partitions));
+  RETURN_NOT_OK(partition_schema.CreatePartitions(split_rows, {}, schema, &partitions));
   DCHECK_EQ(1, partitions.size());
 
   RETURN_NOT_OK(tablet::TabletMetadata::CreateNew(fs_manager,
diff --git a/src/kudu/tablet/tablet-harness.h b/src/kudu/tablet/tablet-harness.h
index 9b23445e0..b02655447 100644
--- a/src/kudu/tablet/tablet-harness.h
+++ b/src/kudu/tablet/tablet-harness.h
@@ -53,7 +53,7 @@ static std::pair<PartitionSchema, Partition> CreateDefaultPartition(const Schema
 
   // Create the tablet partitions.
   std::vector<Partition> partitions;
-  CHECK_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions));
+  CHECK_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
   CHECK_EQ(1, partitions.size());
   return std::make_pair(partition_schema, partitions[0]);
 }
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 68fde5aad..15ad33065 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -4122,7 +4122,7 @@ TEST_F(TabletServerTest, TestWriteOutOfBounds) {
   ASSERT_OK(end_row.SetInt32("key", 20));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({ start_row, end_row }, {}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({ start_row, end_row }, {}, schema, &partitions));
 
   ASSERT_EQ(3, partitions.size());
 
