This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 23ab89db1a1546d93fc3848052d076a9026b3068
Author: Mahesh Reddy <mre...@cloudera.com>
AuthorDate: Wed Dec 9 12:23:26 2020 -0800

    [master] KUDU-2671: Range specific hashing at table creation time.
    
    This patch updates CreateTableRequestPB to allow different
    hash schemas to be defined per range at table creation time.
    This new field is appropriately decoded in catalog_manager.cc.
    
    While this patch handles the logic for creating the correct
    partitions, it does not update the metadata for either the
    table or tablets. The new per-range schemas will need to be
    added to the table metadata in a following patch.
    
    The changes to kudu/common include some refactoring and
    putting functions back into an anonymous namespace.
    
    Change-Id: I8f0dcbc3324f8f2d6e99b4d169fdf5c7f7dff95d
    Reviewed-on: http://gerrit.cloudera.org:8080/16859
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/common/partition-test.cc                  |  10 +-
 src/kudu/common/partition.cc                       |  68 +++++++------
 src/kudu/common/partition.h                        |  11 ++-
 .../integration-tests/table_locations-itest.cc     | 110 ++++++++++++++++++++-
 src/kudu/master/catalog_manager.cc                 |  17 +++-
 src/kudu/master/master-test.cc                     |  73 ++++++++++++--
 src/kudu/master/master.proto                       |  12 +++
 7 files changed, 254 insertions(+), 47 deletions(-)

diff --git a/src/kudu/common/partition-test.cc 
b/src/kudu/common/partition-test.cc
index c5e0873..03670a4 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -985,7 +985,7 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
                 { ColumnId(0), ColumnId(1), ColumnId(2) }, 3);
 
   PartitionSchemaPB schema_builder;
-  // Table-wide HashSchema defined below, 3 by 2 buckets so 6 total.
+  // Table-wide hash schema defined below, 3 by 2 buckets so 6 total.
   AddHashBucketComponent(&schema_builder, { "a", "c" }, 3, 0);
   AddHashBucketComponent(&schema_builder, { "b" }, 2, 0);
   PartitionSchema partition_schema;
@@ -1080,5 +1080,13 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
   ASSERT_EQ("Invalid argument: Both 'split_rows' and 'range_hash_schemas' "
             "cannot be populated at the same time.", s.ToString());
 
+  // adding another schema to range_hash_schemas to trigger 
Status::InvalidArgument due to
+  // 'bounds and 'range_hash_schema' not being the same size.
+  range_hash_schemas.emplace_back(PartitionSchema::HashBucketSchemas());
+  Status s1 = partition_schema.CreatePartitions({}, bounds, range_hash_schemas,
+                                               schema, &partitions);
+  ASSERT_EQ("Invalid argument: The number of range bounds does not match the 
number of per "
+            "range hash schemas.", s1.ToString());
+
 }
 } // namespace kudu
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index 04c6f93..6414e0c 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -111,21 +111,10 @@ void Partition::FromPB(const PartitionPB& pb, Partition* 
partition) {
 }
 
 namespace {
-// Sets a repeated field of column identifiers to the provided column IDs.
-void SetColumnIdentifiers(const vector<ColumnId>& column_ids,
-                          
RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>* identifiers) {
-    identifiers->Reserve(column_ids.size());
-    for (const ColumnId& column_id : column_ids) {
-      identifiers->Add()->set_id(column_id);
-    }
-}
-} // namespace
-
 // Extracts the column IDs from a protobuf repeated field of column 
identifiers.
-Status PartitionSchema::ExtractColumnIds(
-    const RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>& identifiers,
-    const Schema& schema,
-    vector<ColumnId>* column_ids) {
+Status ExtractColumnIds(const 
RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>& identifiers,
+                        const Schema& schema,
+                        vector<ColumnId>* column_ids) {
   vector<ColumnId> new_column_ids;
   new_column_ids.reserve(identifiers.size());
   for (const auto& identifier : identifiers) {
@@ -136,7 +125,7 @@ Status PartitionSchema::ExtractColumnIds(
           return Status::InvalidArgument("unknown column id", 
SecureDebugString(identifier));
         }
         new_column_ids.emplace_back(std::move(column_id));
-      continue;
+        continue;
       }
       case PartitionSchemaPB_ColumnIdentifierPB::kName: {
         int32_t column_idx = schema.find_column(identifier.name());
@@ -144,7 +133,7 @@ Status PartitionSchema::ExtractColumnIds(
           return Status::InvalidArgument("unknown column", 
SecureDebugString(identifier));
         }
         new_column_ids.emplace_back(schema.column_id(column_idx));
-      continue;
+        continue;
       }
       default: return Status::InvalidArgument("unknown column", 
SecureDebugString(identifier));
     }
@@ -153,12 +142,22 @@ Status PartitionSchema::ExtractColumnIds(
   return Status::OK();
 }
 
-Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
-                               const Schema& schema,
-                               PartitionSchema* partition_schema) {
-  partition_schema->Clear();
+// Sets a repeated field of column identifiers to the provided column IDs.
+void SetColumnIdentifiers(const vector<ColumnId>& column_ids,
+                          
RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>* identifiers) {
+  identifiers->Reserve(column_ids.size());
+  for (const ColumnId& column_id : column_ids) {
+    identifiers->Add()->set_id(column_id);
+  }
+}
+} // namespace
+
 
-  for (const PartitionSchemaPB_HashBucketSchemaPB& hash_bucket_pb : 
pb.hash_bucket_schemas()) {
+Status PartitionSchema::ExtractHashBucketSchemasFromPB(
+    const Schema& schema,
+    const RepeatedPtrField<PartitionSchemaPB_HashBucketSchemaPB>& 
hash_buckets_pb,
+    HashBucketSchemas* hash_bucket_schemas) {
+  for (const PartitionSchemaPB_HashBucketSchemaPB& hash_bucket_pb : 
hash_buckets_pb) {
     HashBucketSchema hash_bucket;
     RETURN_NOT_OK(ExtractColumnIds(hash_bucket_pb.columns(), schema, 
&hash_bucket.column_ids));
 
@@ -170,8 +169,17 @@ Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
 
     hash_bucket.seed = hash_bucket_pb.seed();
     hash_bucket.num_buckets = hash_bucket_pb.num_buckets();
-    partition_schema->hash_bucket_schemas_.push_back(hash_bucket);
+    hash_bucket_schemas->push_back(std::move(hash_bucket));
   }
+  return Status::OK();
+}
+
+Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
+                               const Schema& schema,
+                               PartitionSchema* partition_schema) {
+  partition_schema->Clear();
+  RETURN_NOT_OK(ExtractHashBucketSchemasFromPB(schema, 
pb.hash_bucket_schemas(),
+                             &partition_schema->hash_bucket_schemas_));
 
   if (pb.has_range_schema()) {
     const PartitionSchemaPB_RangeSchemaPB& range_pb = pb.range_schema();
@@ -407,12 +415,14 @@ Status PartitionSchema::CreatePartitions(const 
vector<KuduPartialRow>& split_row
                                          vector<Partition>* partitions) const {
   const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
 
-  if (!split_rows.empty()) {
-    for (const auto& hash_schemas : range_hash_schemas) {
-      if (!hash_schemas.empty()) {
-        return Status::InvalidArgument("Both 'split_rows' and 
'range_hash_schemas' cannot be "
-                                       "populated at the same time.");
+  if (!range_hash_schemas.empty()) {
+    if (!split_rows.empty()) {
+      return Status::InvalidArgument("Both 'split_rows' and 
'range_hash_schemas' cannot be "
+                                     "populated at the same time.");
       }
+    if (range_bounds.size() != range_hash_schemas.size()) {
+      return Status::InvalidArgument("The number of range bounds does not 
match the number of per "
+                                     "range hash schemas.");
     }
   }
 
@@ -440,8 +450,10 @@ Status PartitionSchema::CreatePartitions(const 
vector<KuduPartialRow>& split_row
   RETURN_NOT_OK(SplitRangeBounds(schema, std::move(splits), 
&bounds_with_hash_schemas));
 
   if (!range_hash_schemas.empty()) {
-    // Hash schemas per range cannot be applied to split rows.
+    // The number of ranges should match the size of range_hash_schemas.
     DCHECK_EQ(range_hash_schemas.size(), bounds_with_hash_schemas.size());
+    // No split rows should be defined if range_hash_schemas is populated.
+    DCHECK(split_rows.empty());
     vector<Partition> result_partitions;
     // Iterate through each bound and its hash schemas to generate hash 
partitions.
     for (const auto& bound : bounds_with_hash_schemas) {
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index 22a065f..d76802e 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -41,7 +41,7 @@ class ConstContiguousRow;
 class KuduPartialRow;
 class PartitionPB;
 class PartitionSchemaPB;
-class PartitionSchemaPB_ColumnIdentifierPB;
+class PartitionSchemaPB_HashBucketSchemaPB;
 template <typename Buffer> class KeyEncoder;
 
 // A Partition describes the set of rows that a Tablet is responsible for
@@ -163,11 +163,12 @@ class PartitionSchema {
     HashBucketSchemas hash_schemas;
   };
 
-  // Extracts the column IDs from a protobuf repeated field of column 
identifiers.
-  static Status ExtractColumnIds(
-      const 
google::protobuf::RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>& 
identifiers,
+  // Extracts HashBucketSchemas from a protobuf repeated field of hash buckets.
+  static Status ExtractHashBucketSchemasFromPB(
       const Schema& schema,
-      std::vector<ColumnId>* column_ids);
+      const 
google::protobuf::RepeatedPtrField<PartitionSchemaPB_HashBucketSchemaPB>&
+          hash_buckets_pb,
+      HashBucketSchemas* hash_bucket_schemas);
 
   // Deserializes a protobuf message into a partition schema.
   static Status FromPB(const PartitionSchemaPB& pb,
diff --git a/src/kudu/integration-tests/table_locations-itest.cc 
b/src/kudu/integration-tests/table_locations-itest.cc
index ccb7106..c5ecebb 100644
--- a/src/kudu/integration-tests/table_locations-itest.cc
+++ b/src/kudu/integration-tests/table_locations-itest.cc
@@ -93,6 +93,7 @@ DEFINE_int32(benchmark_runtime_secs, 5, "Number of seconds to 
run the benchmark"
 DEFINE_int32(benchmark_num_threads, 16, "Number of threads to run the 
benchmark");
 DEFINE_int32(benchmark_num_tablets, 60, "Number of tablets to create");
 
+DECLARE_bool(enable_per_range_hash_schemas);
 DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 DECLARE_int32(follower_unavailable_considered_failed_sec);
 DECLARE_int32(heartbeat_interval_ms);
@@ -153,10 +154,19 @@ class TableLocationsTest : public KuduTest {
 
   virtual void SetUpConfig() {}
 
+  struct HashBucketSchema {
+    vector<string> columns;
+    int32_t num_buckets;
+    uint32_t seed;
+  };
+
   Status CreateTable(const string& table_name,
                      const Schema& schema,
                      const vector<KuduPartialRow>& split_rows,
-                     const vector<pair<KuduPartialRow, KuduPartialRow>>& 
bounds);
+                     const vector<pair<KuduPartialRow, KuduPartialRow>>& 
bounds,
+                     const vector<vector<HashBucketSchema>>& range_hash_schema,
+                     const vector<HashBucketSchema>& table_hash_schema);
+
 
   void CreateTable(const string& table_name, int num_splits);
 
@@ -172,7 +182,10 @@ Status TableLocationsTest::CreateTable(const string& 
table_name,
                                        const Schema& schema,
                                        const vector<KuduPartialRow>& 
split_rows = {},
                                        const vector<pair<KuduPartialRow,
-                                                         KuduPartialRow>>& 
bounds = {}) {
+                                                         KuduPartialRow>>& 
bounds = {},
+                                       const vector<vector<HashBucketSchema>>&
+                                           range_hash_schema = {},
+                                       const vector<HashBucketSchema>& 
table_hash_schema = {}) {
 
   CreateTableRequestPB req;
   CreateTableResponsePB resp;
@@ -189,6 +202,30 @@ Status TableLocationsTest::CreateTable(const string& 
table_name,
     encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, bound.second);
   }
 
+  for (const auto& hash_schemas : range_hash_schema) {
+    auto* range_hash_schemas_pb = req.add_range_hash_schemas();
+    for (const auto& hash_schema : hash_schemas) {
+      auto* hash_schema_pb = range_hash_schemas_pb->add_hash_schemas();
+      for (const string& col_name : hash_schema.columns) {
+        hash_schema_pb->add_columns()->set_name(col_name);
+      }
+      hash_schema_pb->set_num_buckets(hash_schema.num_buckets);
+      hash_schema_pb->set_seed(hash_schema.seed);
+    }
+  }
+
+  if (!table_hash_schema.empty()) {
+    auto* partition_schema_pb = req.mutable_partition_schema();
+    for (const auto& hash_schema : table_hash_schema) {
+      auto* hash_schema_pb = partition_schema_pb->add_hash_bucket_schemas();
+      for (const string& col_name : hash_schema.columns) {
+        hash_schema_pb->add_columns()->set_name(col_name);
+      }
+      hash_schema_pb->set_num_buckets(hash_schema.num_buckets);
+      hash_schema_pb->set_seed(hash_schema.seed);
+    }
+  }
+
   return proxy_->CreateTable(req, &resp, &controller);
 }
 
@@ -417,6 +454,75 @@ TEST_F(TableLocationsTest, TestGetTableLocations) {
   }
 }
 
+TEST_F(TableLocationsTest, TestRangeSpecificHashing) {
+  const string table_name = "test";
+  Schema schema({ ColumnSchema("key", STRING), ColumnSchema("val", STRING) }, 
2);
+  KuduPartialRow row(&schema);
+
+  FLAGS_enable_per_range_hash_schemas = true; // enable for testing.
+
+  vector<pair<KuduPartialRow, KuduPartialRow>> bounds(3, { row, row });
+  ASSERT_OK(bounds[0].first.SetStringNoCopy(0, "a"));
+  ASSERT_OK(bounds[0].second.SetStringNoCopy(0, "b"));
+  ASSERT_OK(bounds[1].first.SetStringNoCopy(0, "c"));
+  ASSERT_OK(bounds[1].second.SetStringNoCopy(0, "d"));
+  ASSERT_OK(bounds[2].first.SetStringNoCopy(0, "e"));
+  ASSERT_OK(bounds[2].second.SetStringNoCopy(0, "f"));
+
+  vector<vector<HashBucketSchema>> range_hash_schema;
+  vector<HashBucketSchema> hash_schema_4_by_2 = { { { "key" }, 4, 0 }, { { 
"val" }, 2, 0} };
+  range_hash_schema.emplace_back(hash_schema_4_by_2);
+  vector<HashBucketSchema> hash_schema_6 = { { { "key" }, 6, 2 } };
+  range_hash_schema.emplace_back(hash_schema_6);
+
+  // Table-wide hash schema, applied to range by default if no per-range 
schema is specified.
+  vector<HashBucketSchema> table_hash_schema_5 = { { { "val" }, 5, 4 } };
+  range_hash_schema.emplace_back(vector<HashBucketSchema>());
+
+  ASSERT_OK(CreateTable(table_name, schema, {}, bounds, range_hash_schema, 
table_hash_schema_5));
+  NO_FATALS(CheckMasterTableCreation(table_name, 19));
+
+  GetTableLocationsRequestPB req;
+  GetTableLocationsResponsePB resp;
+  RpcController controller;
+  req.mutable_table()->set_table_name(table_name);
+  req.set_max_returned_locations(19);
+  ASSERT_OK(proxy_->GetTableLocations(req, &resp, &controller));
+  SCOPED_TRACE(SecureDebugString(resp));
+
+  ASSERT_FALSE(resp.has_error());
+  ASSERT_EQ(19, resp.tablet_locations().size());
+
+  vector<string> partition_key_starts =  {
+      string("\0\0\0\0" "\0\0\0\0" "a" "\0\0", 11),
+      string("\0\0\0\0" "\0\0\0\1" "a" "\0\0", 11),
+      string("\0\0\0\1" "\0\0\0\0" "a" "\0\0", 11),
+      string("\0\0\0\1" "\0\0\0\1" "a" "\0\0", 11),
+      string("\0\0\0\2" "\0\0\0\0" "a" "\0\0", 11),
+      string("\0\0\0\2" "\0\0\0\1" "a" "\0\0", 11),
+      string("\0\0\0\3" "\0\0\0\0" "a" "\0\0", 11),
+      string("\0\0\0\3" "\0\0\0\1" "a" "\0\0", 11),
+      string("\0\0\0\0" "c" "\0\0", 7),
+      string("\0\0\0\1" "c" "\0\0", 7),
+      string("\0\0\0\2" "c" "\0\0", 7),
+      string("\0\0\0\3" "c" "\0\0", 7),
+      string("\0\0\0\4" "c" "\0\0", 7),
+      string("\0\0\0\5" "c" "\0\0", 7),
+      string("\0\0\0\0" "e" "\0\0", 7),
+      string("\0\0\0\1" "e" "\0\0", 7),
+      string("\0\0\0\2" "e" "\0\0", 7),
+      string("\0\0\0\3" "e" "\0\0", 7),
+      string("\0\0\0\4" "e" "\0\0", 7)
+  };
+  // Sorting partition keys to match the tablets that are returned in sorted 
order.
+  sort(partition_key_starts.begin(), partition_key_starts.end());
+  ASSERT_EQ(partition_key_starts.size(), resp.tablet_locations_size());
+  for (int i = 0; i < resp.tablet_locations_size(); i++) {
+    EXPECT_EQ(partition_key_starts[i],
+              resp.tablet_locations(i).partition().partition_key_start());
+  }
+}
+
 TEST_F(TableLocationsWithTSLocationTest, TestGetTSLocation) {
   const string table_name = "test";
   Schema schema({ ColumnSchema("key", STRING) }, 1);
diff --git a/src/kudu/master/catalog_manager.cc 
b/src/kudu/master/catalog_manager.cc
index c4eed38..00d62f2 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -327,6 +327,10 @@ DEFINE_uint32(table_locations_cache_capacity_mb, 0,
               "of 0 means table locations are not be cached");
 TAG_FLAG(table_locations_cache_capacity_mb, advanced);
 
+DEFINE_bool(enable_per_range_hash_schemas, false,
+            "Whether the ability to specify different hash schemas per range 
is enabled");
+TAG_FLAG(enable_per_range_hash_schemas, unsafe);
+
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_int64(tsk_rotation_seconds);
 
@@ -1710,7 +1714,6 @@ Status CatalogManager::CreateTable(const 
CreateTableRequestPB* orig_req,
           
RETURN_NOT_OK(partition_schema.MakeUpperBoundRangePartitionKeyExclusive(
                 ops[i].split_row.get()));
         }
-
         range_bounds.emplace_back(*op.split_row, *ops[i].split_row);
         break;
       }
@@ -1719,10 +1722,20 @@ Status CatalogManager::CreateTable(const 
CreateTableRequestPB* orig_req,
     }
   }
 
+  PartitionSchema::RangeHashSchema range_hash_schemas;
+  if (FLAGS_enable_per_range_hash_schemas) {
+    for (int i = 0; i < req.range_hash_schemas_size(); i++) {
+      PartitionSchema::HashBucketSchemas hash_bucket_schemas;
+      RETURN_NOT_OK(PartitionSchema::ExtractHashBucketSchemasFromPB(
+          schema, req.range_hash_schemas(i).hash_schemas(), 
&hash_bucket_schemas));
+      range_hash_schemas.emplace_back(std::move(hash_bucket_schemas));
+    }
+  }
+
   // Create partitions based on specified partition schema and split rows.
   vector<Partition> partitions;
   RETURN_NOT_OK(partition_schema.CreatePartitions(split_rows, range_bounds,
-                                                  {}, schema, &partitions));
+                                                  range_hash_schemas, schema, 
&partitions));
 
   // If they didn't specify a num_replicas, set it based on the default.
   if (!req.has_num_replicas()) {
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index a897b00..a0116ec 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -117,6 +117,7 @@ using std::vector;
 using strings::Substitute;
 
 DECLARE_bool(catalog_manager_check_ts_count_for_create_table);
+DECLARE_bool(enable_per_range_hash_schemas);
 DECLARE_bool(master_support_authz_tokens);
 DECLARE_bool(mock_table_metrics_for_testing);
 DECLARE_bool(raft_prepare_replacement_before_eviction);
@@ -162,6 +163,12 @@ class MasterTest : public KuduTest {
     KuduTest::TearDown();
   }
 
+  struct HashBucketSchema {
+    vector<string> columns;
+    int32_t num_buckets;
+    uint32_t seed;
+  };
+
   void DoListTables(const ListTablesRequestPB& req, ListTablesResponsePB* 
resp);
   void DoListAllTables(ListTablesResponsePB* resp);
   Status CreateTable(const string& table_name,
@@ -172,7 +179,8 @@ class MasterTest : public KuduTest {
                      const vector<KuduPartialRow>& split_rows,
                      const vector<pair<KuduPartialRow, KuduPartialRow>>& 
bounds,
                      const optional<string>& owner,
-                     const optional<TableTypePB>& table_type = boost::none);
+                     const optional<TableTypePB>& table_type = boost::none,
+                     const vector<vector<HashBucketSchema>>& range_hash_schema 
= {});
 
   shared_ptr<Messenger> client_messenger_;
   unique_ptr<MiniMaster> mini_master_;
@@ -546,7 +554,8 @@ Status MasterTest::CreateTable(const string& table_name,
                                const vector<KuduPartialRow>& split_rows,
                                const vector<pair<KuduPartialRow, 
KuduPartialRow>>& bounds,
                                const optional<string>& owner,
-                               const optional<TableTypePB>& table_type) {
+                               const optional<TableTypePB>& table_type,
+                               const vector<vector<HashBucketSchema>>& 
range_hash_schema) {
   CreateTableRequestPB req;
   CreateTableResponsePB resp;
   RpcController controller;
@@ -565,6 +574,18 @@ Status MasterTest::CreateTable(const string& table_name,
     encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, bound.second);
   }
 
+  for (const auto& hash_schemas : range_hash_schema) {
+    auto* hash_schemas_pb = req.add_range_hash_schemas();
+    for (const auto& hash_schema : hash_schemas) {
+      auto* hash_bucket_schema_pb = hash_schemas_pb->add_hash_schemas();
+      for (const string& col_name : hash_schema.columns) {
+        hash_bucket_schema_pb->add_columns()->set_name(col_name);
+      }
+      hash_bucket_schema_pb->set_num_buckets(hash_schema.num_buckets);
+      hash_bucket_schema_pb->set_seed(hash_schema.seed);
+    }
+  }
+
   if (owner) {
     req.set_owner(*owner);
   }
@@ -805,6 +826,43 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
                         "least one range partition column");
   }
 
+  // No split rows and range specific hashing concurrently.
+  {
+    FLAGS_enable_per_range_hash_schemas = true; // enable for testing.
+    KuduPartialRow split1(&kTableSchema);
+    ASSERT_OK(split1.SetInt32("key", 1));
+    KuduPartialRow a_lower(&kTableSchema);
+    KuduPartialRow a_upper(&kTableSchema);
+    ASSERT_OK(a_lower.SetInt32("key", 0));
+    ASSERT_OK(a_upper.SetInt32("key", 100));
+    vector<vector<HashBucketSchema>> range_hash_schema = { 
vector<HashBucketSchema>() };
+    Status s = CreateTable(kTableName, kTableSchema, { split1 }, { { a_lower, 
a_upper } },
+                           none, none, range_hash_schema);
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "Both 'split_rows' and 'range_hash_schemas' cannot be "
+                        "populated at the same time.");
+  }
+
+  // The number of range bounds must match the size of user defined hash 
schemas.
+  {
+    FLAGS_enable_per_range_hash_schemas = true; // enable for testing.
+    KuduPartialRow a_lower(&kTableSchema);
+    KuduPartialRow a_upper(&kTableSchema);
+    ASSERT_OK(a_lower.SetInt32("key", 0));
+    ASSERT_OK(a_upper.SetInt32("key", 100));
+    vector<HashBucketSchema> hash_schemas_4 = { { {"key"}, 4, 0 } };
+    vector<HashBucketSchema> hash_schemas_2 = { { {"val"}, 2, 0 } };
+    vector<vector<HashBucketSchema>> range_hash_schema = 
{std::move(hash_schemas_2),
+                                                          
std::move(hash_schemas_4)};
+    Status s = CreateTable(kTableName, kTableSchema, { }, { { a_lower, a_upper 
} },
+                           none, none, range_hash_schema);
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "The number of range bounds does not match the number 
of per "
+                        "range hash schemas.");
+  }
+
   // No non-range columns.
   {
     KuduPartialRow split(&kTableSchema);
@@ -827,8 +885,7 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
     ASSERT_OK(b_lower.SetInt32("key", 50));
     ASSERT_OK(b_upper.SetInt32("key", 150));
     Status s = CreateTable(kTableName, kTableSchema, { }, { { a_lower, a_upper 
},
-                                                            { b_lower, b_upper 
} },
-                                                            none);
+                                                            { b_lower, b_upper 
} }, none);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(), "Invalid argument: overlapping range 
partition");
   }
@@ -842,8 +899,7 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
     ASSERT_OK(split.SetInt32("key", 200));
 
     Status s = CreateTable(kTableName, kTableSchema, { split },
-                           { { bound_lower, bound_upper } },
-                           none);
+                           { { bound_lower, bound_upper } }, none);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(), "Invalid argument: split out of bounds");
   }
@@ -857,8 +913,7 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
     ASSERT_OK(split.SetInt32("key", -120));
 
     Status s = CreateTable(kTableName, kTableSchema, { split },
-                           { { bound_lower, bound_upper } },
-                           none);
+                           { { bound_lower, bound_upper } }, none);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(), "Invalid argument: split out of bounds");
   }
@@ -935,7 +990,7 @@ TEST_F(MasterTest, TestCreateTableOwnerNameTooLong) {
     
"xyz01234567899abcdefghijklmnopqrstuvwxyz01234567899abcdefghijklmnopqrstuvwxyz01234567899";
 
   const Schema kTableSchema({ ColumnSchema("key", INT32), ColumnSchema("val", 
INT32) }, 1);
-  Status s = CreateTable(kTableName, kTableSchema, { }, { }, kOwnerName);
+  Status s = CreateTable(kTableName, kTableSchema, {}, {}, kOwnerName);
   ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "invalid owner name");
 }
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 750761f..01514b4 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -490,6 +490,10 @@ message GetTabletLocationsResponsePB {
 // ============================================================================
 //  Catalog
 // ============================================================================
+message PerRangeHashBucketSchemasPB {
+  repeated PartitionSchemaPB.HashBucketSchemaPB hash_schemas = 1;
+}
+
 message CreateTableRequestPB {
   required string name = 1;
   required SchemaPB schema = 2;
@@ -497,7 +501,15 @@ message CreateTableRequestPB {
   // repeated PartialRowPB split_rows = 5;
   // Holds either the split rows or the range bounds (or both) of the table.
   optional RowOperationsPB split_rows_range_bounds = 6;
+  // Holds the table's partition schema, the partition schema's hash bucket 
schemas
+  // are the default for any range where 'range_hash_schemas' is empty.
   optional PartitionSchemaPB partition_schema = 7;
+  // Holds the hash bucket schemas for each range during table creation.
+  // Only populated when 'split_rows_range_bounds' specifies range bounds, 
must be empty if any
+  // split rows are specified. If this field is set, its size must match the 
number of ranges
+  // specified by range bounds and they must be in the same order. If this 
field is empty,
+  // 'partition_schema' is assumed for every range bound.
+  repeated PerRangeHashBucketSchemasPB range_hash_schemas = 12;
   optional int32 num_replicas = 4;
 
   // If set, uses the provided value as the table owner when creating the 
table.

Reply via email to