This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 250eb90bc [master] KUDU-2671: Range specific hashing during table alter op. 250eb90bc is described below commit 250eb90bc0e1f4f472f44de8a23ce213595d5ee7 Author: Abhishek Chennaka <achenn...@cloudera.com> AuthorDate: Tue May 10 10:09:31 2022 -0400 [master] KUDU-2671: Range specific hashing during table alter op. This commit has the changes needed on the master side to support this functionality. A basic test is added to test the functionality as well. Change-Id: Iea9e3317d172c9ae76662c44b21fca9a4819930a Reviewed-on: http://gerrit.cloudera.org:8080/18515 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Alexey Serbin <ale...@apache.org> --- src/kudu/master/catalog_manager.cc | 16 ++++- src/kudu/master/master-test.cc | 128 ++++++++++++++++++++++++++++++++++--- 2 files changed, 134 insertions(+), 10 deletions(-) diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index 2928466f4..ab3380f23 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -2618,7 +2618,6 @@ Status CatalogManager::ApplyAlterPartitioningSteps( PartitionSchema partition_schema; RETURN_NOT_OK(PartitionSchema::FromPB( l.data().pb.partition_schema(), schema, &partition_schema)); - TableInfo::TabletInfoMap existing_tablets = table->tablet_map(); TableInfo::TabletInfoMap new_tablets; auto abort_mutations = MakeScopedCleanup([&new_tablets]() { @@ -2627,9 +2626,22 @@ Status CatalogManager::ApplyAlterPartitioningSteps( } }); + vector<PartitionSchema::HashSchema> range_hash_schemas; for (const auto& step : steps) { vector<DecodedRowOperation> ops; if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION) { + if (FLAGS_enable_per_range_hash_schemas && + step.add_range_partition().custom_hash_schema_size() > 0) { + const Schema schema = client_schema.CopyWithColumnIds(); + PartitionSchema::HashSchema hash_schema; + RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB( + schema, step.add_range_partition().custom_hash_schema(), &hash_schema)); + if (partition_schema.hash_schema().size() != hash_schema.size()) { + return Status::NotSupported( + "varying number of hash dimensions per range is not yet supported"); + } + range_hash_schemas.emplace_back(std::move(hash_schema)); + } RowOperationsPBDecoder decoder(&step.add_range_partition().range_bounds(), &client_schema, &schema, nullptr); RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops)); @@ -2666,7 +2678,7 @@ Status CatalogManager::ApplyAlterPartitioningSteps( vector<Partition> partitions; RETURN_NOT_OK(partition_schema.CreatePartitions( - {}, {{ *ops[0].split_row, *ops[1].split_row }}, {}, schema, &partitions)); + {}, {{ *ops[0].split_row, *ops[1].split_row }}, range_hash_schemas, schema, &partitions)); switch (step.type()) { case AlterTableRequestPB::ADD_RANGE_PARTITION: { for (const Partition& partition : partitions) { diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc index db345476c..5e13ed68c 100644 --- a/src/kudu/master/master-test.cc +++ b/src/kudu/master/master-test.cc @@ -197,6 +197,7 @@ class MasterTest : public KuduTest { const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds = {}, const vector<HashSchema>& range_hash_schemas = {}); + Status CreateTable(const string& name, const Schema& schema, const optional<TableTypePB>& type, @@ -204,7 +205,9 @@ class MasterTest : public KuduTest { const optional<string>& comment, const vector<KuduPartialRow>& split_rows, const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds, - const vector<HashSchema>& range_hash_schemas); + const vector<HashSchema>& range_hash_schemas, + const HashSchema& table_wide_hash_schema, + CreateTableResponsePB* resp); shared_ptr<Messenger> client_messenger_; unique_ptr<MiniMaster> mini_master_; @@ -223,8 +226,9 @@ Status MasterTest::CreateTable(const string& name, KuduPartialRow split2(&schema); RETURN_NOT_OK(split2.SetInt32("key", 20)); + CreateTableResponsePB resp; return CreateTable( - name, schema, type, owner, comment, { split1, split2 }, {}, {}); + name, schema, type, owner, comment, { split1, split2 }, {}, {}, {}, &resp); } Status MasterTest::CreateTable( @@ -233,8 +237,9 @@ Status MasterTest::CreateTable( const vector<KuduPartialRow>& split_rows, const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds, const vector<HashSchema>& range_hash_schemas) { + CreateTableResponsePB resp; return CreateTable( - name, schema, none, none, none, split_rows, bounds, range_hash_schemas); + name, schema, none, none, none, split_rows, bounds, range_hash_schemas, {}, &resp); } Status MasterTest::CreateTable( @@ -245,7 +250,9 @@ Status MasterTest::CreateTable( const optional<string>& comment, const vector<KuduPartialRow>& split_rows, const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds, - const vector<HashSchema>& range_hash_schemas) { + const vector<HashSchema>& range_hash_schemas, + const HashSchema& table_wide_hash_schema, + CreateTableResponsePB* resp) { if (!range_hash_schemas.empty() && range_hash_schemas.size() != bounds.size()) { return Status::InvalidArgument( @@ -268,6 +275,16 @@ Status MasterTest::CreateTable( } auto* ps_pb = req.mutable_partition_schema(); + + for (const auto& hash_dimension : table_wide_hash_schema) { + auto* hash_schema = ps_pb->add_hash_schema(); + for (const string& col_name : hash_dimension.columns) { + hash_schema->add_columns()->set_name(col_name); + } + hash_schema->set_num_buckets(hash_dimension.num_buckets); + hash_schema->set_seed(hash_dimension.num_buckets); + } + for (size_t i = 0; i < range_hash_schemas.size(); ++i) { auto* range = ps_pb->add_custom_hash_schema_ranges(); RowOperationsPBEncoder encoder(range->mutable_range_bounds()); @@ -296,10 +313,9 @@ Status MasterTest::CreateTable( controller.RequireServerFeature(MasterFeatures::RANGE_PARTITION_BOUNDS); } - CreateTableResponsePB resp; - RETURN_NOT_OK(proxy_->CreateTable(req, &resp, &controller)); - if (resp.has_error()) { - RETURN_NOT_OK(StatusFromPB(resp.error().status())); + RETURN_NOT_OK(proxy_->CreateTable(req, resp, &controller)); + if (resp->has_error()) { + RETURN_NOT_OK(StatusFromPB(resp->error().status())); } return Status::OK(); } @@ -935,6 +951,102 @@ TEST_F(MasterTest, ListTablesWithTableFilter) { } } +class AlterTableWithRangeSpecificHashSchema : public MasterTest, + public ::testing::WithParamInterface<bool> {}; + +TEST_P(AlterTableWithRangeSpecificHashSchema, TestAlterTableWithDifferentHashDimensions) { + constexpr const char* const kTableName = "testtb"; + const Schema kTableSchema({ColumnSchema("key", INT32), + ColumnSchema("val", INT32)}, 2); + FLAGS_enable_per_range_hash_schemas = true; // enable for testing. + FLAGS_default_num_replicas = 1; + + // Create a table with one partition + KuduPartialRow a_lower(&kTableSchema); + KuduPartialRow a_upper(&kTableSchema); + ASSERT_OK(a_lower.SetInt32("key", 0)); + ASSERT_OK(a_upper.SetInt32("key", 100)); + CreateTableResponsePB create_table_resp; + ASSERT_OK(CreateTable( + kTableName, kTableSchema, none, none, none, {}, {{a_lower, a_upper}}, + {}, {{{"key"}, 2, 0}}, &create_table_resp)); + + // Populate the custom hash schemas with different hash dimension count based on + // the test case + HashSchema custom_range_hash_schema; + const bool has_different_dimensions_count = GetParam(); + if (has_different_dimensions_count) { + custom_range_hash_schema = {{{"key"}, 3, 0}, + {{"val"}, 3, 0}}; + } else { + custom_range_hash_schema = {{{"key"}, 3, 0}}; + } + + //Create AlterTableRequestPB and populate it for the alter table operation + AlterTableRequestPB req; + AlterTableResponsePB resp; + RpcController controller; + req.mutable_table()->set_table_name(kTableName); + req.mutable_table()->set_table_id(create_table_resp.table_id()); + + AlterTableRequestPB::Step* step = req.add_alter_schema_steps(); + step->set_type(AlterTableRequestPB::ADD_RANGE_PARTITION); + KuduPartialRow lower(&kTableSchema); + ASSERT_OK(lower.SetInt32("key", 200)); + KuduPartialRow upper(&kTableSchema); + ASSERT_OK(upper.SetInt32("key", 300)); + RowOperationsPBEncoder splits_encoder( + step->mutable_add_range_partition()->mutable_range_bounds()); + splits_encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower); + splits_encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper); + for (const auto& hash_dimension: custom_range_hash_schema) { + auto* hash_dimension_pb = step->mutable_add_range_partition()->add_custom_hash_schema(); + for (const string& col_name: hash_dimension.columns) { + hash_dimension_pb->add_columns()->set_name(col_name); + } + hash_dimension_pb->set_num_buckets(hash_dimension.num_buckets); + hash_dimension_pb->set_seed(hash_dimension.seed); + } + + ColumnSchemaPB* col1 = req.mutable_schema()->add_columns(); + col1->set_name("key"); + col1->set_type(INT32); + col1->set_is_key(true); + + ColumnSchemaPB* col2 = req.mutable_schema()->add_columns(); + col2->set_name("val"); + col2->set_type(INT32); + col2->set_is_key(true); + + // Check the number of tablets in the table + std::vector<scoped_refptr<TableInfo>> tables; + { + CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager()); + master_->catalog_manager()->GetAllTables(&tables); + } + ASSERT_EQ(1, tables.size()); + ASSERT_EQ(2, tables.front()->num_tablets()); + + // Submit the alter table request + proxy_->AlterTable(req, &resp, &controller); + if (has_different_dimensions_count) { + ASSERT_TRUE(resp.has_error()); + ASSERT_STR_CONTAINS(resp.error().status().DebugString(), + "varying number of hash dimensions per range is not yet supported"); + } else { + ASSERT_FALSE(resp.has_error()); + { + CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager()); + master_->catalog_manager()->GetAllTables(&tables); + } + ASSERT_EQ(1, tables.size()); + ASSERT_EQ(5, tables.front()->num_tablets()); + } +} + +INSTANTIATE_TEST_SUITE_P(AlterTableWithCustomHashSchema, + AlterTableWithRangeSpecificHashSchema, ::testing::Bool()); + TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) { constexpr const char* const kTableName = "testtb"; const Schema kTableSchema({ ColumnSchema("key", INT32), ColumnSchema("val", INT32) }, 1);