This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
     new 9091f31cf  KUDU-2671 refactor PartitionSchema::CreatePartitions()
9091f31cf is described below

commit 9091f31cfc5c102b6a734fa20b015735251392d4
Author:     Alexey Serbin <ale...@apache.org>
AuthorDate: Tue May 31 20:18:51 2022 -0700

    KUDU-2671 refactor PartitionSchema::CreatePartitions()

    This patch refactors the code of the PartitionSchema::CreatePartitions()
    method and the code around its invocations, addressing a few TODOs.

    As a consequence, this patch fixes a bug in the CreateTable path: prior
    to this patch, a client could submit a CreateTableRequestPB request in
    which the order of the ranges in
    CreateTableRequestPB::split_rows_range_bounds and
    CreateTableRequestPB::partition_schema::custom_hash_schema_ranges
    differed, and the resulting table would be created with the wrong hash
    schemas. For example:

      requested:
        {{range_boundary_a0, range_boundary_a1}, hash_schema_a}
        {{range_boundary_b0, range_boundary_b1}, hash_schema_b}

      created:
        {{range_boundary_a0, range_boundary_a1}, hash_schema_b}
        {{range_boundary_b0, range_boundary_b1}, hash_schema_a}

    I'm planning to add a corresponding test scenario to catch regressions
    in a separate changelist for ease of reviewing and tracking the changes.
    However, I updated the existing test scenarios affected by this change.

    Change-Id: I5bac1f8ee349577e2f912530a28776415ed0a5b0
    Reviewed-on: http://gerrit.cloudera.org:8080/18582
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com>
    Reviewed-by: Attila Bukor <abu...@apache.org>
---
 src/kudu/common/partition-test.cc                  | 571 +++++++++------------
 src/kudu/common/partition.cc                       | 285 +++++-----
 src/kudu/common/partition.h                        |  46 +-
 src/kudu/common/partition_pruner-test.cc           |  76 +--
 src/kudu/common/scan_spec-test.cc                  |  18 +-
 .../integration-tests/ts_tablet_manager-itest.cc   |   2 +-
 src/kudu/master/catalog_manager.cc                 |  98 ++--
 src/kudu/master/master-test.cc                     |   4 +-
 src/kudu/master/sys_catalog.cc                     |   2 +-
 src/kudu/tablet/tablet-harness.h                   |   2 +-
 src/kudu/tserver/tablet_server-test.cc             |   2 +-
 11 files changed, 531 insertions(+), 575 deletions(-)

diff --git a/src/kudu/common/partition-test.cc b/src/kudu/common/partition-test.cc
index 259ba28c3..d74936936 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -19,9 +19,7 @@

 #include <algorithm>
 #include <cstdint>
-#include <random>
 #include <string>
-#include <type_traits>
 #include <utility>
 #include <vector>

@@ -112,7 +110,7 @@ void CheckCreateRangePartitions(const vector<pair<optional<string>, optional<str
   }

   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(splits, bounds, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(splits, bounds, schema, &partitions));
   ASSERT_EQ(expected_partition_ranges.size(), partitions.size());

   for (int i = 0; i < partitions.size(); i++) {
@@ -188,7 +186,7 @@ TEST_F(PartitionTest, TestCompoundRangeKeyEncoding) {
   }

   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(splits, bounds, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(splits, bounds, schema, &partitions));

   ASSERT_EQ(4, partitions.size());
   EXPECT_TRUE(partitions[0].hash_buckets().empty());
@@ -475,7 +473,7 @@ TEST_F(PartitionTest, TestCreateHashPartitions) {

   vector<Partition> partitions;
   ASSERT_OK(
-      partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {}, schema, &partitions));
+      partition_schema.CreatePartitions(vector<KuduPartialRow>(), {},
schema, &partitions)); ASSERT_EQ(3, partitions.size()); EXPECT_EQ(0, partitions[0].hash_buckets()[0]); @@ -564,7 +562,7 @@ TEST_F(PartitionTest, TestCreatePartitions) { // Split keys need not be passed in sorted order. vector<KuduPartialRow> split_rows = { split_b, split_a }; vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions(split_rows, {}, {}, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions(split_rows, {}, schema, &partitions)); ASSERT_EQ(12, partitions.size()); EXPECT_EQ(0, partitions[0].hash_buckets()[0]); @@ -912,31 +910,43 @@ void CheckSerializationFunctions(const PartitionSchemaPB& pb, ASSERT_EQ(partition_schema, partition_schema1); } +void AddRangePartitionWithSchema( + const Schema& schema, + const KuduPartialRow& lower, + const KuduPartialRow& upper, + const PartitionSchema::HashSchema& range_hash_schema, + PartitionSchemaPB* pb) { + auto* range = pb->add_custom_hash_schema_ranges(); + RowOperationsPBEncoder encoder(range->mutable_range_bounds()); + encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower); + encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper); + for (const auto& dimension : range_hash_schema) { + auto* hash_dimension_pb = range->add_hash_schema(); + PartitionSchema::HashDimension hash_dimension; + for (const auto& cid : dimension.column_ids) { + hash_dimension_pb->add_columns()->set_name(schema.column_by_id(cid).name()); + } + hash_dimension_pb->set_num_buckets(dimension.num_buckets); + hash_dimension_pb->set_seed(dimension.seed); + } +} + } // anonymous namespace -TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) { - // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c)) +TEST_F(PartitionTest, VaryingHashSchemasPerRange) { + // CREATE TABLE t (a STRING, b STRING, c STRING, PRIMARY KEY (a, b, c)) // PARTITION BY [HASH BUCKET (a, c), HASH BUCKET (b), RANGE (a, b, c)]; Schema schema({ ColumnSchema("a", STRING), ColumnSchema("b", STRING), ColumnSchema("c", STRING) }, { ColumnId(0), ColumnId(1), ColumnId(2) }, 3); - PartitionSchemaPB schema_builder; - // Table-wide hash schema defined below, 3 by 2 buckets so 6 total. - AddHashDimension(&schema_builder, { "a", "c" }, 3, 0); - AddHashDimension(&schema_builder, { "b" }, 2, 0); - PartitionSchema partition_schema; - ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema)); - CheckSerializationFunctions(schema_builder, partition_schema, schema); - - ASSERT_EQ("HASH (a, c) PARTITIONS 3, HASH (b) PARTITIONS 2, RANGE (a, b, c)", - partition_schema.DebugString(schema)); - - vector<pair<KuduPartialRow, KuduPartialRow>> bounds; - vector<PartitionSchema::HashSchema> range_hash_schemas; - vector<pair<pair<KuduPartialRow, KuduPartialRow>, - PartitionSchema::HashSchema>> bounds_with_hash_schemas; + PartitionSchemaPB ps_pb; + // Table-wide hash schema is defined below: 3 by 2 buckets, so 6 total. 
+ const PartitionSchema::HashSchema table_wide_hash_schema = + { { { ColumnId(0), ColumnId(2) }, 3, 0}, { { ColumnId(1) }, 2, 0 } }; + AddHashDimension(&ps_pb, { "a", "c" }, 3, 0); + AddHashDimension(&ps_pb, { "b" }, 2, 0); { // [(a1, _, c1), (a2, _, c2)) KuduPartialRow lower(&schema); @@ -945,11 +955,8 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) { ASSERT_OK(lower.SetStringCopy("c", "c1")); ASSERT_OK(upper.SetStringCopy("a", "a2")); ASSERT_OK(upper.SetStringCopy("c", "c2")); - PartitionSchema::HashSchema hash_schema_4_buckets = {{{ColumnId(0)}, 4, 0}}; - bounds.emplace_back(lower, upper); - range_hash_schemas.emplace_back(hash_schema_4_buckets); - bounds_with_hash_schemas.emplace_back(make_pair(std::move(lower), std::move(upper)), - std::move(hash_schema_4_buckets)); + AddRangePartitionWithSchema( + schema, lower, upper, {{{ColumnId(0)}, 4, 0}}, &ps_pb); } { // [(a3, b3, _), (a4, b4, _)) @@ -959,11 +966,8 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) { ASSERT_OK(lower.SetStringCopy("b", "b3")); ASSERT_OK(upper.SetStringCopy("a", "a4")); ASSERT_OK(upper.SetStringCopy("b", "b4")); - bounds.emplace_back(lower, upper); - range_hash_schemas.emplace_back(partition_schema.hash_schema()); - // Use the table-wide hash schema for this range. - bounds_with_hash_schemas.emplace_back(make_pair(std::move(lower), std::move(upper)), - partition_schema.hash_schema()); + AddRangePartitionWithSchema( + schema, lower, upper, table_wide_hash_schema, &ps_pb); } { // [(a5, b5, _), (a6, _, c6)) @@ -973,234 +977,180 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) { ASSERT_OK(lower.SetStringCopy("b", "b5")); ASSERT_OK(upper.SetStringCopy("a", "a6")); ASSERT_OK(upper.SetStringCopy("c", "c6")); - PartitionSchema::HashSchema hash_schema_2_buckets_by_3 = { - {{ColumnId(0)}, 2, 0}, - {{ColumnId(1)}, 3, 0} - }; - bounds.emplace_back(lower, upper); - range_hash_schemas.emplace_back(hash_schema_2_buckets_by_3); - bounds_with_hash_schemas.emplace_back(make_pair(std::move(lower), std::move(upper)), - std::move(hash_schema_2_buckets_by_3)); + AddRangePartitionWithSchema( + schema, lower, upper, + {{{ColumnId(0)}, 2, 0}, {{ColumnId(1)}, 3, 0}}, &ps_pb); } - const auto check_partitions = [](const vector<Partition>& partitions) { - ASSERT_EQ(16, partitions.size()); - - ASSERT_EQ(1, partitions[0].hash_buckets().size()); - EXPECT_EQ(0, partitions[0].hash_buckets()[0]); - EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[0].begin().range_key()); - EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[0].end().range_key()); - EXPECT_EQ(string("\0\0\0\0" "a1\0\0\0\0c1", 12), - partitions[0].begin().ToString()); - EXPECT_EQ(string("\0\0\0\0" "a2\0\0\0\0c2", 12), - partitions[0].end().ToString()); - - ASSERT_EQ(1, partitions[1].hash_buckets().size()); - EXPECT_EQ(1, partitions[1].hash_buckets()[0]); - EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[1].begin().range_key()); - EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[1].end().range_key()); - EXPECT_EQ(string("\0\0\0\1" "a1\0\0\0\0c1", 12), - partitions[1].begin().ToString()); - EXPECT_EQ(string("\0\0\0\1" "a2\0\0\0\0c2", 12), - partitions[1].end().ToString()); - - ASSERT_EQ(1, partitions[2].hash_buckets().size()); - EXPECT_EQ(2, partitions[2].hash_buckets()[0]); - EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[2].begin().range_key()); - EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[2].end().range_key()); - EXPECT_EQ(string("\0\0\0\2" "a1\0\0\0\0c1", 12), - partitions[2].begin().ToString()); - EXPECT_EQ(string("\0\0\0\2" "a2\0\0\0\0c2", 12), - 
partitions[2].end().ToString()); - - ASSERT_EQ(1, partitions[3].hash_buckets().size()); - EXPECT_EQ(3, partitions[3].hash_buckets()[0]); - EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[3].begin().range_key()); - EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[3].end().range_key()); - EXPECT_EQ(string("\0\0\0\3" "a1\0\0\0\0c1", 12), - partitions[3].begin().ToString()); - EXPECT_EQ(string("\0\0\0\3" "a2\0\0\0\0c2", 12), - partitions[3].end().ToString()); - - ASSERT_EQ(2, partitions[4].hash_buckets().size()); - EXPECT_EQ(0, partitions[4].hash_buckets()[0]); - EXPECT_EQ(0, partitions[4].hash_buckets()[1]); - EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[4].begin().range_key()); - EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[4].end().range_key()); - EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a3\0\0b3\0\0", 16), - partitions[4].begin().ToString()); - EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a4\0\0b4\0\0", 16), - partitions[4].end().ToString()); - - ASSERT_EQ(2, partitions[5].hash_buckets().size()); - EXPECT_EQ(0, partitions[5].hash_buckets()[0]); - EXPECT_EQ(1, partitions[5].hash_buckets()[1]); - EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[5].begin().range_key()); - EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[5].end().range_key()); - EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a3\0\0b3\0\0", 16), - partitions[5].begin().ToString()); - EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a4\0\0b4\0\0", 16), - partitions[5].end().ToString()); - - ASSERT_EQ(2, partitions[6].hash_buckets().size()); - EXPECT_EQ(1, partitions[6].hash_buckets()[0]); - EXPECT_EQ(0, partitions[6].hash_buckets()[1]); - EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[6].begin().range_key()); - EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[6].end().range_key()); - EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a3\0\0b3\0\0", 16), - partitions[6].begin().ToString()); - EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a4\0\0b4\0\0", 16), - partitions[6].end().ToString()); - - ASSERT_EQ(2, partitions[7].hash_buckets().size()); - EXPECT_EQ(1, partitions[7].hash_buckets()[0]); - EXPECT_EQ(1, partitions[7].hash_buckets()[1]); - EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[7].begin().range_key()); - EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[7].end().range_key()); - EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a3\0\0b3\0\0", 16), - partitions[7].begin().ToString()); - EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a4\0\0b4\0\0", 16), - partitions[7].end().ToString()); - - ASSERT_EQ(2, partitions[8].hash_buckets().size()); - EXPECT_EQ(2, partitions[8].hash_buckets()[0]); - EXPECT_EQ(0, partitions[8].hash_buckets()[1]); - EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[8].begin().range_key()); - EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[8].end().range_key()); - EXPECT_EQ(string("\0\0\0\2" "\0\0\0\0" "a3\0\0b3\0\0", 16), - partitions[8].begin().ToString()); - EXPECT_EQ(string("\0\0\0\2" "\0\0\0\0" "a4\0\0b4\0\0", 16), - partitions[8].end().ToString()); - - ASSERT_EQ(2, partitions[9].hash_buckets().size()); - EXPECT_EQ(2, partitions[9].hash_buckets()[0]); - EXPECT_EQ(1, partitions[9].hash_buckets()[1]); - EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[9].begin().range_key()); - EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[9].end().range_key()); - EXPECT_EQ(string("\0\0\0\2" "\0\0\0\1" "a3\0\0b3\0\0", 16), - partitions[9].begin().ToString()); - EXPECT_EQ(string("\0\0\0\2" "\0\0\0\1" "a4\0\0b4\0\0", 16), - partitions[9].end().ToString()); - - ASSERT_EQ(2, partitions[10].hash_buckets().size()); - EXPECT_EQ(0, partitions[10].hash_buckets()[0]); - 
EXPECT_EQ(0, partitions[10].hash_buckets()[1]); - EXPECT_EQ(string("a5\0\0b5\0\0", 8), - partitions[10].begin().range_key()); - EXPECT_EQ(string("a6\0\0\0\0c6", 8), - partitions[10].end().range_key()); - EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a5\0\0b5\0\0", 16), - partitions[10].begin().ToString()); - EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a6\0\0\0\0c6", 16), - partitions[10].end().ToString()); - - ASSERT_EQ(2, partitions[11].hash_buckets().size()); - EXPECT_EQ(0, partitions[11].hash_buckets()[0]); - EXPECT_EQ(1, partitions[11].hash_buckets()[1]); - EXPECT_EQ(string("a5\0\0b5\0\0", 8),partitions[11].begin().range_key()); - EXPECT_EQ(string("a6\0\0\0\0c6", 8),partitions[11].end().range_key()); - EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a5\0\0b5\0\0", 16), - partitions[11].begin().ToString()); - EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a6\0\0\0\0c6", 16), - partitions[11].end().ToString()); - - ASSERT_EQ(2, partitions[12].hash_buckets().size()); - EXPECT_EQ(0, partitions[12].hash_buckets()[0]); - EXPECT_EQ(2, partitions[12].hash_buckets()[1]); - EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[12].begin().range_key()); - EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[12].end().range_key()); - EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2" "a5\0\0b5\0\0", 16), - partitions[12].begin().ToString()); - EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2" "a6\0\0\0\0c6", 16), - partitions[12].end().ToString()); - - ASSERT_EQ(2, partitions[13].hash_buckets().size()); - EXPECT_EQ(1, partitions[13].hash_buckets()[0]); - EXPECT_EQ(0, partitions[13].hash_buckets()[1]); - EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[13].begin().range_key()); - EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[13].end().range_key()); - EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a5\0\0b5\0\0", 16), - partitions[13].begin().ToString()); - EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a6\0\0\0\0c6", 16), - partitions[13].end().ToString()); - - ASSERT_EQ(2, partitions[14].hash_buckets().size()); - EXPECT_EQ(1, partitions[14].hash_buckets()[0]); - EXPECT_EQ(1, partitions[14].hash_buckets()[1]); - EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[14].begin().range_key()); - EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[14].end().range_key()); - EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a5\0\0b5\0\0", 16), - partitions[14].begin().ToString()); - EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a6\0\0\0\0c6", 16), - partitions[14].end().ToString()); - - ASSERT_EQ(2, partitions[15].hash_buckets().size()); - EXPECT_EQ(1, partitions[15].hash_buckets()[0]); - EXPECT_EQ(2, partitions[15].hash_buckets()[1]); - EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[15].begin().range_key()); - EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[15].end().range_key()); - EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a5\0\0b5\0\0", 16), - partitions[15].begin().ToString()); - EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a6\0\0\0\0c6", 16), - partitions[15].end().ToString()); - }; + PartitionSchema ps; + ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps)); + CheckSerializationFunctions(ps_pb, ps, schema); + + ASSERT_EQ("HASH (a, c) PARTITIONS 3, HASH (b) PARTITIONS 2, RANGE (a, b, c)", + ps.DebugString(schema)); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions( - {}, bounds, range_hash_schemas, schema, &partitions)); - NO_FATALS(check_partitions(partitions)); - - bounds.clear(); - range_hash_schemas.clear(); - partitions.clear(); - - // Using std::random_shuffle to insert bounds and their hash schemas out of - // sorted order, yet resulting partitions will still be the 
same. - std::mt19937 gen(SeedRandom()); - std::shuffle(bounds_with_hash_schemas.begin(), bounds_with_hash_schemas.end(), gen); - - for (const auto& bounds_and_schema : bounds_with_hash_schemas) { - bounds.emplace_back(bounds_and_schema.first); - range_hash_schemas.emplace_back(bounds_and_schema.second); - } + ASSERT_OK(ps.CreatePartitions(schema, &partitions)); - ASSERT_OK(partition_schema.CreatePartitions( - {}, bounds, range_hash_schemas, schema, &partitions)); - NO_FATALS(check_partitions(partitions)); + ASSERT_EQ(16, partitions.size()); - // Not clearing bounds or range_hash_schemas, adding a split row to test - // incompatibility. - vector<KuduPartialRow> splits; - { // split: (a1, _, c12) - KuduPartialRow split(&schema); - ASSERT_OK(split.SetStringCopy("a", "a1")); - ASSERT_OK(split.SetStringCopy("c", "c12")); - splits.emplace_back(std::move(split)); - } + ASSERT_EQ(1, partitions[0].hash_buckets().size()); + EXPECT_EQ(0, partitions[0].hash_buckets()[0]); + EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[0].begin().range_key()); + EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[0].end().range_key()); + EXPECT_EQ(string("\0\0\0\0" "a1\0\0\0\0c1", 12), + partitions[0].begin().ToString()); + EXPECT_EQ(string("\0\0\0\0" "a2\0\0\0\0c2", 12), + partitions[0].end().ToString()); - // Expecting Status::InvalidArgument() due to 'splits' and schemas within - // 'range_hash_schemas' being defined at the same time. - { - const auto s = partition_schema.CreatePartitions( - splits, bounds, range_hash_schemas, schema, &partitions); - ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); - ASSERT_STR_CONTAINS(s.ToString(), - "Both 'split_rows' and 'range_hash_schemas' " - "cannot be populated at the same time"); - } + ASSERT_EQ(1, partitions[1].hash_buckets().size()); + EXPECT_EQ(1, partitions[1].hash_buckets()[0]); + EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[1].begin().range_key()); + EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[1].end().range_key()); + EXPECT_EQ(string("\0\0\0\1" "a1\0\0\0\0c1", 12), + partitions[1].begin().ToString()); + EXPECT_EQ(string("\0\0\0\1" "a2\0\0\0\0c2", 12), + partitions[1].end().ToString()); - // Adding another schema to range_hash_schemas to trigger - // Status::InvalidArgument() due to 'bounds and 'range_hash_schema' not being - // the same size. 
- { - range_hash_schemas.push_back({}); - const auto s = partition_schema.CreatePartitions( - {}, bounds, range_hash_schemas, schema, &partitions); - ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); - ASSERT_STR_CONTAINS(s.ToString(), - "4 vs 3: per range hash schemas and range bounds " - "must have the same size"); - } + ASSERT_EQ(1, partitions[2].hash_buckets().size()); + EXPECT_EQ(2, partitions[2].hash_buckets()[0]); + EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[2].begin().range_key()); + EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[2].end().range_key()); + EXPECT_EQ(string("\0\0\0\2" "a1\0\0\0\0c1", 12), + partitions[2].begin().ToString()); + EXPECT_EQ(string("\0\0\0\2" "a2\0\0\0\0c2", 12), + partitions[2].end().ToString()); + + ASSERT_EQ(1, partitions[3].hash_buckets().size()); + EXPECT_EQ(3, partitions[3].hash_buckets()[0]); + EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[3].begin().range_key()); + EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[3].end().range_key()); + EXPECT_EQ(string("\0\0\0\3" "a1\0\0\0\0c1", 12), + partitions[3].begin().ToString()); + EXPECT_EQ(string("\0\0\0\3" "a2\0\0\0\0c2", 12), + partitions[3].end().ToString()); + + ASSERT_EQ(2, partitions[4].hash_buckets().size()); + EXPECT_EQ(0, partitions[4].hash_buckets()[0]); + EXPECT_EQ(0, partitions[4].hash_buckets()[1]); + EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[4].begin().range_key()); + EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[4].end().range_key()); + EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a3\0\0b3\0\0", 16), + partitions[4].begin().ToString()); + EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a4\0\0b4\0\0", 16), + partitions[4].end().ToString()); + + ASSERT_EQ(2, partitions[5].hash_buckets().size()); + EXPECT_EQ(0, partitions[5].hash_buckets()[0]); + EXPECT_EQ(1, partitions[5].hash_buckets()[1]); + EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[5].begin().range_key()); + EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[5].end().range_key()); + EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a3\0\0b3\0\0", 16), + partitions[5].begin().ToString()); + EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a4\0\0b4\0\0", 16), + partitions[5].end().ToString()); + + ASSERT_EQ(2, partitions[6].hash_buckets().size()); + EXPECT_EQ(1, partitions[6].hash_buckets()[0]); + EXPECT_EQ(0, partitions[6].hash_buckets()[1]); + EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[6].begin().range_key()); + EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[6].end().range_key()); + EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a3\0\0b3\0\0", 16), + partitions[6].begin().ToString()); + EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a4\0\0b4\0\0", 16), + partitions[6].end().ToString()); + + ASSERT_EQ(2, partitions[7].hash_buckets().size()); + EXPECT_EQ(1, partitions[7].hash_buckets()[0]); + EXPECT_EQ(1, partitions[7].hash_buckets()[1]); + EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[7].begin().range_key()); + EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[7].end().range_key()); + EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a3\0\0b3\0\0", 16), + partitions[7].begin().ToString()); + EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a4\0\0b4\0\0", 16), + partitions[7].end().ToString()); + + ASSERT_EQ(2, partitions[8].hash_buckets().size()); + EXPECT_EQ(2, partitions[8].hash_buckets()[0]); + EXPECT_EQ(0, partitions[8].hash_buckets()[1]); + EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[8].begin().range_key()); + EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[8].end().range_key()); + EXPECT_EQ(string("\0\0\0\2" "\0\0\0\0" "a3\0\0b3\0\0", 16), + 
partitions[8].begin().ToString()); + EXPECT_EQ(string("\0\0\0\2" "\0\0\0\0" "a4\0\0b4\0\0", 16), + partitions[8].end().ToString()); + + ASSERT_EQ(2, partitions[9].hash_buckets().size()); + EXPECT_EQ(2, partitions[9].hash_buckets()[0]); + EXPECT_EQ(1, partitions[9].hash_buckets()[1]); + EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[9].begin().range_key()); + EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[9].end().range_key()); + EXPECT_EQ(string("\0\0\0\2" "\0\0\0\1" "a3\0\0b3\0\0", 16), + partitions[9].begin().ToString()); + EXPECT_EQ(string("\0\0\0\2" "\0\0\0\1" "a4\0\0b4\0\0", 16), + partitions[9].end().ToString()); + + ASSERT_EQ(2, partitions[10].hash_buckets().size()); + EXPECT_EQ(0, partitions[10].hash_buckets()[0]); + EXPECT_EQ(0, partitions[10].hash_buckets()[1]); + EXPECT_EQ(string("a5\0\0b5\0\0", 8), + partitions[10].begin().range_key()); + EXPECT_EQ(string("a6\0\0\0\0c6", 8), + partitions[10].end().range_key()); + EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a5\0\0b5\0\0", 16), + partitions[10].begin().ToString()); + EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a6\0\0\0\0c6", 16), + partitions[10].end().ToString()); + + ASSERT_EQ(2, partitions[11].hash_buckets().size()); + EXPECT_EQ(0, partitions[11].hash_buckets()[0]); + EXPECT_EQ(1, partitions[11].hash_buckets()[1]); + EXPECT_EQ(string("a5\0\0b5\0\0", 8),partitions[11].begin().range_key()); + EXPECT_EQ(string("a6\0\0\0\0c6", 8),partitions[11].end().range_key()); + EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a5\0\0b5\0\0", 16), + partitions[11].begin().ToString()); + EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a6\0\0\0\0c6", 16), + partitions[11].end().ToString()); + + ASSERT_EQ(2, partitions[12].hash_buckets().size()); + EXPECT_EQ(0, partitions[12].hash_buckets()[0]); + EXPECT_EQ(2, partitions[12].hash_buckets()[1]); + EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[12].begin().range_key()); + EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[12].end().range_key()); + EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2" "a5\0\0b5\0\0", 16), + partitions[12].begin().ToString()); + EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2" "a6\0\0\0\0c6", 16), + partitions[12].end().ToString()); + + ASSERT_EQ(2, partitions[13].hash_buckets().size()); + EXPECT_EQ(1, partitions[13].hash_buckets()[0]); + EXPECT_EQ(0, partitions[13].hash_buckets()[1]); + EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[13].begin().range_key()); + EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[13].end().range_key()); + EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a5\0\0b5\0\0", 16), + partitions[13].begin().ToString()); + EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a6\0\0\0\0c6", 16), + partitions[13].end().ToString()); + + ASSERT_EQ(2, partitions[14].hash_buckets().size()); + EXPECT_EQ(1, partitions[14].hash_buckets()[0]); + EXPECT_EQ(1, partitions[14].hash_buckets()[1]); + EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[14].begin().range_key()); + EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[14].end().range_key()); + EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a5\0\0b5\0\0", 16), + partitions[14].begin().ToString()); + EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a6\0\0\0\0c6", 16), + partitions[14].end().ToString()); + + ASSERT_EQ(2, partitions[15].hash_buckets().size()); + EXPECT_EQ(1, partitions[15].hash_buckets()[0]); + EXPECT_EQ(2, partitions[15].hash_buckets()[1]); + EXPECT_EQ(string("a5\0\0b5\0\0", 8), partitions[15].begin().range_key()); + EXPECT_EQ(string("a6\0\0\0\0c6", 8), partitions[15].end().range_key()); + EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a5\0\0b5\0\0", 16), + 
partitions[15].begin().ToString()); + EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a6\0\0\0\0c6", 16), + partitions[15].end().ToString()); } TEST_F(PartitionTest, CustomHashSchemasPerRangeOnly) { @@ -1209,18 +1159,7 @@ TEST_F(PartitionTest, CustomHashSchemasPerRangeOnly) { { ColumnId(0), ColumnId(1) }, 2); // No table-wide hash bucket schema. - PartitionSchemaPB schema_builder; - PartitionSchema partition_schema; - ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema)); - CheckSerializationFunctions(schema_builder, partition_schema, schema); - - ASSERT_EQ("RANGE (a, b)", partition_schema.DebugString(schema)); - - typedef pair<KuduPartialRow, KuduPartialRow> RangeBound; - vector<RangeBound> bounds; - vector<PartitionSchema::HashSchema> range_hash_schemas; - vector<pair<RangeBound, PartitionSchema::HashSchema>> - bounds_with_hash_schemas; + PartitionSchemaPB ps_pb; // [(a1, b1), (a2, b2)) { @@ -1230,19 +1169,18 @@ TEST_F(PartitionTest, CustomHashSchemasPerRangeOnly) { ASSERT_OK(lower.SetStringNoCopy("b", "b1")); ASSERT_OK(upper.SetStringNoCopy("a", "a2")); ASSERT_OK(upper.SetStringNoCopy("b", "b2")); - PartitionSchema::HashSchema hash_schema_2_buckets = - { { { ColumnId(0) }, 2, 0 } }; - bounds.emplace_back(lower, upper); - range_hash_schemas.emplace_back(hash_schema_2_buckets); - bounds_with_hash_schemas.emplace_back( - make_pair(std::move(lower), std::move(upper)), - std::move(hash_schema_2_buckets)); + AddRangePartitionWithSchema( + schema, lower, upper, { { { ColumnId(0) }, 2, 0 } }, &ps_pb); } - vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions( - {}, bounds, range_hash_schemas, schema, &partitions)); + PartitionSchema ps; + ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps)); + CheckSerializationFunctions(ps_pb, ps, schema); + ASSERT_EQ("RANGE (a, b)", ps.DebugString(schema)); + + vector<Partition> partitions; + ASSERT_OK(ps.CreatePartitions(schema, &partitions)); ASSERT_EQ(2, partitions.size()); { @@ -1262,35 +1200,25 @@ TEST_F(PartitionTest, CustomHashSchemasPerRangeOnly) { } } -TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) { - // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c)) +TEST_F(PartitionTest, VaryingHashSchemasPerUnboundedRanges) { + // CREATE TABLE t (a STRING, b STRING, c STRING, PRIMARY KEY (a, b, c)) // PARTITION BY [HASH BUCKET (b), RANGE (a, b, c)]; Schema schema({ ColumnSchema("a", STRING), ColumnSchema("b", STRING), ColumnSchema("c", STRING) }, { ColumnId(0), ColumnId(1), ColumnId(2) }, 3); - PartitionSchemaPB schema_builder; - // Table-wide hash schema defined below. - AddHashDimension(&schema_builder, { "b" }, 2, 0); - PartitionSchema partition_schema; - ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema)); - CheckSerializationFunctions(schema_builder, partition_schema, schema); - - ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b, c)", - partition_schema.DebugString(schema)); - - vector<pair<KuduPartialRow, KuduPartialRow>> bounds; - vector<PartitionSchema::HashSchema> range_hash_schemas; + PartitionSchemaPB ps_pb; + // Table-wide hash schema is defined below. 
+ AddHashDimension(&ps_pb, { "b" }, 2, 0); { // [(_, _, _), (a1, _, c1)) KuduPartialRow lower(&schema); KuduPartialRow upper(&schema); ASSERT_OK(upper.SetStringCopy("a", "a1")); ASSERT_OK(upper.SetStringCopy("c", "c1")); - PartitionSchema::HashSchema hash_schema_4_buckets = {{{ColumnId(0)}, 4, 0}}; - bounds.emplace_back(lower, upper); - range_hash_schemas.emplace_back(hash_schema_4_buckets); + AddRangePartitionWithSchema( + schema, lower, upper, {{{ColumnId(0)}, 4, 0}}, &ps_pb); } { // [(a2, b2, _), (a3, b3, _)) @@ -1300,8 +1228,7 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) { ASSERT_OK(lower.SetStringCopy("b", "b2")); ASSERT_OK(upper.SetStringCopy("a", "a3")); ASSERT_OK(upper.SetStringCopy("b", "b3")); - bounds.emplace_back(lower, upper); - range_hash_schemas.emplace_back(PartitionSchema::HashSchema()); + AddRangePartitionWithSchema(schema, lower, upper, {}, &ps_pb); } { // [(a4, b4, _), (_, _, _)) @@ -1313,12 +1240,19 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) { {{ColumnId(0)}, 2, 0}, {{ColumnId(2)}, 3, 0} }; - bounds.emplace_back(lower, upper); - range_hash_schemas.emplace_back(hash_schema_2_buckets_by_3); + AddRangePartitionWithSchema( + schema, lower, upper, + {{{ColumnId(0)}, 2, 0}, {{ColumnId(2)}, 3, 0}}, &ps_pb); } + PartitionSchema ps; + ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps)); + CheckSerializationFunctions(ps_pb, ps, schema); + + ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b, c)", ps.DebugString(schema)); + vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions)); + ASSERT_OK(ps.CreatePartitions(schema, &partitions)); ASSERT_EQ(11, partitions.size()); // Partitions below sorted by range, can verify that the partition keyspace is filled by checking // that the start key of the first partition and the end key of the last partition is cleared. @@ -1407,33 +1341,23 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) { } TEST_F(PartitionTest, NoHashSchemasForLastUnboundedRange) { - // CREATE TABLE t (a VARCHAR, b VARCHAR, PRIMARY KEY (a, b)) + // CREATE TABLE t (a STRING, b STRING, PRIMARY KEY (a, b)) // PARTITION BY [HASH BUCKET (b), RANGE (a, b)]; Schema schema({ ColumnSchema("a", STRING), ColumnSchema("b", STRING) }, { ColumnId(0), ColumnId(1) }, 2); - PartitionSchemaPB schema_builder; - // Table-wide hash schema defined below. - AddHashDimension(&schema_builder, { "b" }, 2, 0); - PartitionSchema partition_schema; - ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema)); - CheckSerializationFunctions(schema_builder, partition_schema, schema); - - ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b)", - partition_schema.DebugString(schema)); - - vector<pair<KuduPartialRow, KuduPartialRow>> bounds; - vector<PartitionSchema::HashSchema> range_hash_schemas; + PartitionSchemaPB ps_pb; + // Table-wide hash schema is defined below. 
+ AddHashDimension(&ps_pb, { "b" }, 2, 0); // [(_, _), (a1, _)) { KuduPartialRow lower(&schema); KuduPartialRow upper(&schema); ASSERT_OK(upper.SetStringCopy("a", "a1")); - PartitionSchema::HashSchema hash_schema_3_buckets = {{{ColumnId(0)}, 3, 0}}; - bounds.emplace_back(lower, upper); - range_hash_schemas.emplace_back(hash_schema_3_buckets); + AddRangePartitionWithSchema( + schema, lower, upper, {{{ColumnId(0)}, 3, 0}}, &ps_pb); } // [(a2, _), (a3, _)) @@ -1442,12 +1366,9 @@ TEST_F(PartitionTest, NoHashSchemasForLastUnboundedRange) { KuduPartialRow upper(&schema); ASSERT_OK(lower.SetStringCopy("a", "a2")); ASSERT_OK(upper.SetStringCopy("a", "a3")); - bounds.emplace_back(lower, upper); - PartitionSchema::HashSchema hash_schema_3_buckets_by_2 = { - {{ColumnId(0)}, 3, 0}, - {{ColumnId(1)}, 2, 0} - }; - range_hash_schemas.emplace_back(hash_schema_3_buckets_by_2); + AddRangePartitionWithSchema( + schema, lower, upper, + {{{ColumnId(0)}, 3, 0}, {{ColumnId(1)}, 2, 0}}, &ps_pb); } // [(a4, _), (_, _)) @@ -1455,13 +1376,17 @@ TEST_F(PartitionTest, NoHashSchemasForLastUnboundedRange) { KuduPartialRow lower(&schema); KuduPartialRow upper(&schema); ASSERT_OK(lower.SetStringCopy("a", "a4")); - bounds.emplace_back(lower, upper); - range_hash_schemas.emplace_back(PartitionSchema::HashSchema()); + AddRangePartitionWithSchema(schema, lower, upper, {}, &ps_pb); } + PartitionSchema ps; + ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps)); + CheckSerializationFunctions(ps_pb, ps, schema); + + ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b)", ps.DebugString(schema)); + vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions( - {}, bounds, range_hash_schemas, schema, &partitions)); + ASSERT_OK(ps.CreatePartitions(schema, &partitions)); ASSERT_EQ(10, partitions.size()); { diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc index cad1ab7ff..d02838979 100644 --- a/src/kudu/common/partition.cc +++ b/src/kudu/common/partition.cc @@ -436,7 +436,7 @@ Status PartitionSchema::EncodeRangeBounds( auto& bounds_whs = *bounds_with_hash_schemas; DCHECK(bounds_whs.empty()); if (range_bounds.empty()) { - bounds_whs.emplace_back(RangeWithHashSchema{"", "", {}}); + bounds_whs.emplace_back(RangeWithHashSchema{"", "", hash_schema_}); return Status::OK(); } @@ -460,7 +460,7 @@ Status PartitionSchema::EncodeRangeBounds( "range partition lower bound must be less than the upper bound", RangePartitionDebugString(bound.first, bound.second)); } - RangeWithHashSchema temp{std::move(lower), std::move(upper), {}}; + RangeWithHashSchema temp{std::move(lower), std::move(upper), hash_schema_}; if (!range_hash_schemas.empty()) { temp.hash_schema = range_hash_schemas[j++]; } @@ -499,11 +499,13 @@ Status PartitionSchema::SplitRangeBounds( const Schema& schema, const vector<string>& splits, RangesWithHashSchemas* bounds_with_hash_schemas) const { + DCHECK(bounds_with_hash_schemas); if (splits.empty()) { return Status::OK(); } - auto expected_bounds = std::max(1UL, bounds_with_hash_schemas->size()) + splits.size(); + const auto expected_bounds = + std::max(1UL, bounds_with_hash_schemas->size()) + splits.size(); RangesWithHashSchemas new_bounds_with_hash_schemas; new_bounds_with_hash_schemas.reserve(expected_bounds); @@ -526,12 +528,12 @@ Status PartitionSchema::SplitRangeBounds( // Split the current bound. Add the lower section to the result list, // and continue iterating on the upper section. 
new_bounds_with_hash_schemas.emplace_back( - RangeWithHashSchema{std::move(lower), *split, {}}); + RangeWithHashSchema{std::move(lower), *split, hash_schema_}); lower = *split; } new_bounds_with_hash_schemas.emplace_back( - RangeWithHashSchema{std::move(lower), upper, {}}); + RangeWithHashSchema{std::move(lower), upper, hash_schema_}); } if (split != splits.end()) { @@ -546,46 +548,23 @@ Status PartitionSchema::SplitRangeBounds( Status PartitionSchema::CreatePartitions( const vector<KuduPartialRow>& split_rows, const vector<pair<KuduPartialRow, KuduPartialRow>>& range_bounds, - const vector<HashSchema>& range_hash_schemas, const Schema& schema, vector<Partition>* partitions) const { - const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32)); + DCHECK(partitions); - if (!range_hash_schemas.empty()) { - if (!split_rows.empty()) { - return Status::InvalidArgument("Both 'split_rows' and 'range_hash_schemas' cannot be " - "populated at the same time."); - } - if (range_bounds.size() != range_hash_schemas.size()) { - return Status::InvalidArgument( - Substitute("$0 vs $1: per range hash schemas and range bounds " - "must have the same size", - range_hash_schemas.size(), range_bounds.size())); - } - } - - auto base_hash_partitions = GenerateHashPartitions(hash_schema_, hash_encoder); - - std::unordered_set<int> range_column_idxs; - for (const ColumnId& column_id : range_schema_.column_ids) { - int column_idx = schema.find_column_by_id(column_id); - if (column_idx == Schema::kColumnNotFound) { - return Status::InvalidArgument(Substitute("range partition column ID $0 " - "not found in table schema.", column_id)); - } - if (!InsertIfNotPresent(&range_column_idxs, column_idx)) { - return Status::InvalidArgument("duplicate column in range partition", - schema.column(column_idx).name()); - } - } + RETURN_NOT_OK(CheckRangeSchema(schema)); RangesWithHashSchemas bounds_with_hash_schemas; - vector<string> splits; - RETURN_NOT_OK(EncodeRangeBounds(range_bounds, range_hash_schemas, schema, + RETURN_NOT_OK(EncodeRangeBounds(range_bounds, {}, schema, &bounds_with_hash_schemas)); + vector<string> splits; RETURN_NOT_OK(EncodeRangeSplits(split_rows, schema, &splits)); RETURN_NOT_OK(SplitRangeBounds(schema, splits, &bounds_with_hash_schemas)); + const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32)); + const auto base_hash_partitions = + GenerateHashPartitions(hash_schema_, hash_encoder); + // Even if no hash partitioning for a table is specified, there must be at // least one element in 'base_hash_partitions': it's used to build the result // set of range partitions. @@ -594,112 +573,73 @@ Status PartitionSchema::CreatePartitions( DCHECK(base_hash_partitions.size() > 1 || base_hash_partitions.front().hash_buckets().empty()); - // Maps each partition to its respective hash schema within - // 'bounds_with_hash_schemas', needed for the logic later in this method - // for filling in holes in partition key space. The container is empty if no - // per-range hash schemas are present. - vector<size_t> partition_idx_to_hash_schema_idx; - - if (range_hash_schemas.empty()) { - // Create a partition per range bound and hash bucket combination. 
- vector<Partition> new_partitions; - for (const Partition& base_partition : base_hash_partitions) { - for (const auto& bound : bounds_with_hash_schemas) { - Partition p = base_partition; - p.begin_.mutable_range_key()->append(bound.lower); - p.end_.mutable_range_key()->append(bound.upper); - new_partitions.emplace_back(std::move(p)); - } + // Create a partition per range bound and hash bucket combination. + vector<Partition> new_partitions; + for (const Partition& base_partition : base_hash_partitions) { + for (const auto& bound : bounds_with_hash_schemas) { + Partition p = base_partition; + p.begin_.mutable_range_key()->append(bound.lower); + p.end_.mutable_range_key()->append(bound.upper); + new_partitions.emplace_back(std::move(p)); } - *partitions = std::move(new_partitions); - } else { - // The number of ranges should match the size of range_hash_schemas. - DCHECK_EQ(range_hash_schemas.size(), bounds_with_hash_schemas.size()); - // No split rows should be defined if range_hash_schemas is populated. - DCHECK(split_rows.empty()); - vector<Partition> result_partitions; - // Iterate through each bound and its hash schemas to generate hash partitions. - for (size_t i = 0; i < bounds_with_hash_schemas.size(); ++i) { - const auto& bound = bounds_with_hash_schemas[i]; - vector<Partition> current_bound_hash_partitions = GenerateHashPartitions( - bound.hash_schema, hash_encoder); - // Add range information to the partition key. - for (Partition& p : current_bound_hash_partitions) { - DCHECK(p.begin_.range_key().empty()) << p.begin_.DebugString(); - p.begin_.mutable_range_key()->assign(bound.lower); - DCHECK(p.end().range_key().empty()); - p.end_.mutable_range_key()->assign(bound.upper); - partition_idx_to_hash_schema_idx.emplace_back(i); - } - result_partitions.insert(result_partitions.end(), - std::make_move_iterator(current_bound_hash_partitions.begin()), - std::make_move_iterator(current_bound_hash_partitions.end())); - } - DCHECK_EQ(partition_idx_to_hash_schema_idx.size(), result_partitions.size()); - *partitions = std::move(result_partitions); } - // Note: the following discussion and logic only takes effect when the table's - // partition schema includes at least one hash bucket component, and the - // absolute upper and/or absolute lower range bound is unbounded. - // - // At this point, we have the full set of partitions built up, but each - // partition only covers a finite slice of the partition key-space. Some - // operations involving partitions are easier (pruning, client meta cache) if - // it can be assumed that the partition keyspace does not have holes. - // - // In order to 'fill in' the partition key space, the absolute first and last - // partitions are extended to cover the rest of the lower and upper partition - // range by clearing the start and end partition key, respectively. - // - // When the table has two or more hash components, there will be gaps in - // between partitions at the boundaries of the component ranges. Similar to - // the absolute start and end case, these holes are filled by clearing the - // partition key beginning at the hash component. For a concrete example, - // see PartitionTest::TestCreatePartitions. 
- for (size_t partition_idx = 0; partition_idx < partitions->size(); ++partition_idx) { - auto& p = (*partitions)[partition_idx]; - const auto hash_buckets_num = static_cast<int>(p.hash_buckets().size()); - // Find the first nonzero-valued bucket from the end and truncate the - // partition key starting from that bucket onwards for zero-valued buckets. - // - // TODO(aserbin): is this really necessary -- zeros in hash key are the - // minimum possible values, and the range part is already - // empty? - if (p.begin().range_key().empty()) { - for (int i = hash_buckets_num - 1; i >= 0; --i) { - if (p.hash_buckets()[i] != 0) { - break; - } - p.begin_.mutable_hash_key()->erase(kEncodedBucketSize * i); - } - } - // Starting from the last hash bucket, truncate the partition key until we hit the first - // non-max-valued bucket, at which point, replace the encoding with the next-incremented - // bucket value. For example, the following range end partition keys should be transformed, - // where the key is (hash_comp1, hash_comp2, range_comp): - // - // [ (0, 0, "") -> (0, 1, "") ] - // [ (0, 1, "") -> (1, _, "") ] - // [ (1, 0, "") -> (1, 1, "") ] - // [ (1, 1, "") -> (_, _, "") ] - if (p.end().range_key().empty()) { - const auto& hash_schema = range_hash_schemas.empty() - ? hash_schema_ - : bounds_with_hash_schemas[partition_idx_to_hash_schema_idx[partition_idx]].hash_schema; - for (int i = hash_buckets_num - 1; i >= 0; --i) { - p.end_.mutable_hash_key()->erase(kEncodedBucketSize * i); - const int32_t hash_bucket = p.hash_buckets()[i] + 1; - if (hash_bucket != hash_schema[i].num_buckets) { - hash_encoder.Encode(&hash_bucket, p.end_.mutable_hash_key()); - break; - } - } + + UpdatePartitionBoundaries(&new_partitions); + *partitions = std::move(new_partitions); + + return Status::OK(); +} + +Status PartitionSchema::CreatePartitions( + const RangesWithHashSchemas& ranges_with_hash_schemas, + const Schema& schema, + vector<Partition>* partitions) const { + DCHECK(partitions); + + RETURN_NOT_OK(CheckRangeSchema(schema)); + + const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32)); + vector<Partition> result_partitions; + // Iterate through each bound and its hash schemas to generate hash partitions. + for (size_t i = 0; i < ranges_with_hash_schemas.size(); ++i) { + const auto& range = ranges_with_hash_schemas[i]; + vector<Partition> current_bound_hash_partitions = GenerateHashPartitions( + range.hash_schema, hash_encoder); + // Add range information to the partition key. 
+ for (Partition& p : current_bound_hash_partitions) { + DCHECK(p.begin_.range_key().empty()) << p.begin_.DebugString(); + p.begin_.mutable_range_key()->assign(range.lower); + DCHECK(p.end().range_key().empty()); + p.end_.mutable_range_key()->assign(range.upper); } + result_partitions.insert(result_partitions.end(), + std::make_move_iterator(current_bound_hash_partitions.begin()), + std::make_move_iterator(current_bound_hash_partitions.end())); } + UpdatePartitionBoundaries(&result_partitions); + *partitions = std::move(result_partitions); + return Status::OK(); } +Status PartitionSchema::CreatePartitions(const Schema& schema, + vector<Partition>* partitions) const { + return CreatePartitions(ranges_with_hash_schemas_, schema, partitions); +} + +Status PartitionSchema::CreatePartitionsForRange( + const pair<KuduPartialRow, KuduPartialRow>& range_bound, + const HashSchema& range_hash_schema, + const Schema& schema, + std::vector<Partition>* partitions) const { + RangesWithHashSchemas ranges_with_hash_schemas; + RETURN_NOT_OK(EncodeRangeBounds( + {range_bound}, {range_hash_schema}, schema, &ranges_with_hash_schemas)); + + return CreatePartitions(ranges_with_hash_schemas, schema, partitions); +} + template<typename Row> bool PartitionSchema::PartitionContainsRowImpl(const Partition& partition, const Row& row) const { @@ -1376,6 +1316,83 @@ Status PartitionSchema::Validate(const Schema& schema) const { return Status::OK(); } +Status PartitionSchema::CheckRangeSchema(const Schema& schema) const { + std::unordered_set<int> range_column_idxs; + for (const ColumnId& column_id : range_schema_.column_ids) { + const auto column_idx = schema.find_column_by_id(column_id); + if (column_idx == Schema::kColumnNotFound) { + return Status::InvalidArgument(Substitute( + "range partition column ID $0 not found in table schema", column_id)); + } + if (!InsertIfNotPresent(&range_column_idxs, column_idx)) { + return Status::InvalidArgument("duplicate column in range partition", + schema.column(column_idx).name()); + } + } + return Status::OK(); +} + +void PartitionSchema::UpdatePartitionBoundaries(vector<Partition>* partitions) const { + // Note: the following discussion and logic only takes effect when the table's + // partition schema includes at least one hash bucket component, and the + // absolute upper and/or absolute lower range bound is unbounded. + // + // At this point, we have the full set of partitions built up, but each + // partition only covers a finite slice of the partition key-space. Some + // operations involving partitions are easier (pruning, client meta cache) if + // it can be assumed that the partition keyspace does not have holes. + // + // In order to 'fill in' the partition key space, the absolute first and last + // partitions are extended to cover the rest of the lower and upper partition + // range by clearing the start and end partition key, respectively. + // + // When the table has two or more hash components, there will be gaps in + // between partitions at the boundaries of the component ranges. Similar to + // the absolute start and end case, these holes are filled by clearing the + // partition key beginning at the hash component. For a concrete example, + // see PartitionTest::TestCreatePartitions. 
+ const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32)); + for (size_t partition_idx = 0; partition_idx < partitions->size(); ++partition_idx) { + auto& p = (*partitions)[partition_idx]; + const auto& hash_schema = GetHashSchemaForRange(p.begin().range_key()); + CHECK_EQ(hash_schema.size(), p.hash_buckets().size()); + const auto hash_buckets_num = static_cast<int>(p.hash_buckets().size()); + // Find the first nonzero-valued bucket from the end and truncate the + // partition key starting from that bucket onwards for zero-valued buckets. + // + // TODO(aserbin): is this really necessary -- zeros in hash key are the + // minimum possible values, and the range part is already + // empty? + if (p.begin().range_key().empty()) { + for (int i = hash_buckets_num - 1; i >= 0; --i) { + if (p.hash_buckets()[i] != 0) { + break; + } + p.begin_.mutable_hash_key()->erase(kEncodedBucketSize * i); + } + } + // Starting from the last hash bucket, truncate the partition key until we hit the first + // non-max-valued bucket, at which point, replace the encoding with the next-incremented + // bucket value. For example, the following range end partition keys should be transformed, + // where the key is (hash_comp1, hash_comp2, range_comp): + // + // [ (0, 0, "") -> (0, 1, "") ] + // [ (0, 1, "") -> (1, _, "") ] + // [ (1, 0, "") -> (1, 1, "") ] + // [ (1, 1, "") -> (_, _, "") ] + if (p.end().range_key().empty()) { + for (int i = hash_buckets_num - 1; i >= 0; --i) { + p.end_.mutable_hash_key()->erase(kEncodedBucketSize * i); + const int32_t hash_bucket = p.hash_buckets()[i] + 1; + if (hash_bucket != hash_schema[i].num_buckets) { + hash_encoder.Encode(&hash_bucket, p.end_.mutable_hash_key()); + break; + } + } + } + } +} + namespace { // Increments an unset column in the row. diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h index fd03d5419..0cddc64d2 100644 --- a/src/kudu/common/partition.h +++ b/src/kudu/common/partition.h @@ -286,10 +286,11 @@ class PartitionSchema { // in each dimension of the hash schema. typedef std::vector<HashDimension> HashSchema; + // A structure representing a range with custom hash schema. struct RangeWithHashSchema { - std::string lower; - std::string upper; - HashSchema hash_schema; + std::string lower; // encoded range key: lower boundary + std::string upper; // encoded range key: upper boundary + HashSchema hash_schema; // hash schema for the range }; typedef std::vector<RangeWithHashSchema> RangesWithHashSchemas; @@ -322,14 +323,22 @@ class PartitionSchema { // of resulting partitions is the product of the number of hash buckets for // each hash bucket component, multiplied by // (split_rows.size() + max(1, range_bounds.size())). - // 'range_hash_schemas' contains each range's HashSchema, - // its order corresponds to the bounds in 'range_bounds'. - // If 'range_hash_schemas' is empty, the table wide hash schema is used per range. - // Size of 'range_hash_schemas' and 'range_bounds' are equal if 'range_hash_schema' isn't empty. Status CreatePartitions( const std::vector<KuduPartialRow>& split_rows, const std::vector<std::pair<KuduPartialRow, KuduPartialRow>>& range_bounds, - const std::vector<HashSchema>& range_hash_schemas, + const Schema& schema, + std::vector<Partition>* partitions) const WARN_UNUSED_RESULT; + + // Similar to the method above, but uses the 'ranges_with_hash_schemas_' + // member field as the input to produce the result partitions. 
+ Status CreatePartitions( + const Schema& schema, + std::vector<Partition>* partitions) const WARN_UNUSED_RESULT; + + // Create the set of partitions for a single range with specified hash schema. + Status CreatePartitionsForRange( + const std::pair<KuduPartialRow, KuduPartialRow>& range_bound, + const HashSchema& range_hash_schema, const Schema& schema, std::vector<Partition>* partitions) const WARN_UNUSED_RESULT; @@ -512,6 +521,13 @@ class PartitionSchema { const HashSchema& hash_schema, const KeyEncoder<std::string>& hash_encoder); + // Create the set of partitions given the specified ranges with per-range + // hash schemas. The 'partitions' output parameter must be non-null. + Status CreatePartitions( + const RangesWithHashSchemas& ranges_with_hash_schemas, + const Schema& schema, + std::vector<Partition>* partitions) const; + // PartitionKeyDebugString implementation for row types. template<typename Row> std::string PartitionKeyDebugStringImpl(const Row& row) const; @@ -579,6 +595,20 @@ class PartitionSchema { // appropriate error code for an invalid partition schema. Status Validate(const Schema& schema) const; + // Check the range partition schema for consistency: make sure the columns + // used in the range partitioning are present in the table's schema and there + // aren't any duplicate columns. + Status CheckRangeSchema(const Schema& schema) const; + + // Update partitions' boundaries provided with 'partitions' in-out parameter, + // assuming the 'partitions' are populated by calling the CreatePartitions() + // method. When a table has two or more hash components, there will be gaps + // in between partitions at the boundaries of the component ranges. This + // method fills in the address space to have the proper ordering of the + // serialized partition keys -- that's important for partition pruning and + // overall ordering of the serialized partition keys. + void UpdatePartitionBoundaries(std::vector<Partition>* partitions) const; + // Validates the split rows, converts them to partition key form, and inserts // them into splits in sorted order. 
Status EncodeRangeSplits(const std::vector<KuduPartialRow>& split_rows, diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc index 1552eecef..5f4a41246 100644 --- a/src/kudu/common/partition_pruner-test.cc +++ b/src/kudu/common/partition_pruner-test.cc @@ -73,8 +73,6 @@ class PartitionPrunerTest : public KuduTest { const vector<ColumnNameAndIntValue>& lower_int_cols, const vector<ColumnNameAndIntValue>& upper_int_cols, const vector<ColumnNamesNumBucketsAndSeed>& hash_schemas, - vector<pair<KuduPartialRow, KuduPartialRow>>* bounds, - vector<PartitionSchema::HashSchema>* range_hash_schemas, PartitionSchemaPB* pb); }; @@ -137,8 +135,6 @@ void PartitionPrunerTest::AddRangePartitionWithSchema( const vector<ColumnNameAndIntValue>& lower_int_cols, const vector<ColumnNameAndIntValue>& upper_int_cols, const vector<ColumnNamesNumBucketsAndSeed>& hash_buckets_info, - vector<pair<KuduPartialRow, KuduPartialRow>>* bounds, - vector<PartitionSchema::HashSchema>* range_hash_schemas, PartitionSchemaPB* pb) { auto* range = pb->add_custom_hash_schema_ranges(); RowOperationsPBEncoder encoder(range->mutable_range_bounds()); @@ -172,8 +168,6 @@ void PartitionPrunerTest::AddRangePartitionWithSchema( hash_dimension.seed = get<2>(hash_bucket_info); hash_schema.emplace_back(hash_dimension); } - range_hash_schemas->emplace_back(std::move(hash_schema)); - bounds->emplace_back(lower, upper); } TEST_F(PartitionPrunerTest, TestPrimaryKeyRangePruning) { @@ -202,7 +196,7 @@ TEST_F(PartitionPrunerTest, TestPrimaryKeyRangePruning) { ASSERT_OK(split2.SetInt8("c", 10)); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, {}, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, schema, &partitions)); // Creates a scan with optional lower and upper bounds, and checks that the // expected number of tablets are pruned. @@ -320,7 +314,7 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) { ASSERT_OK(split2.SetStringCopy("b", "r")); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, {}, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, schema, &partitions)); // Applies the specified lower and upper bound primary keys against the // schema, and checks that the expected number of partitions are pruned. @@ -429,7 +423,7 @@ TEST_F(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning) { ASSERT_OK(split.SetInt8("b", 0)); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({ split }, {}, {}, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions({ split }, {}, schema, &partitions)); // Applies the specified lower and upper bound primary keys against the // schema, and checks that the expected number of partitions are pruned. @@ -518,7 +512,7 @@ TEST_F(PartitionPrunerTest, TestRangePruning) { ASSERT_OK(split2.SetStringCopy("b", "r")); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, {}, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, schema, &partitions)); // Applies the specified predicates to a scan and checks that the expected // number of partitions are pruned. 
@@ -687,7 +681,7 @@ TEST_F(PartitionPrunerTest, TestHashPruning) { ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema)); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {}, + ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, schema, &partitions)); @@ -763,7 +757,7 @@ TEST_F(PartitionPrunerTest, TestInListHashPruning) { ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema)); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {}, + ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, schema, &partitions)); @@ -841,10 +835,9 @@ TEST_F(PartitionPrunerTest, TestMultiColumnInListHashPruning) { ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema)); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {}, + ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, schema, &partitions)); - // Applies the specified predicates to a scan and checks that the expected // number of partitions are pruned. const auto check = [&] (const vector<ColumnPredicate>& predicates, @@ -940,7 +933,7 @@ TEST_F(PartitionPrunerTest, TestPruning) { vector<Partition> partitions; ASSERT_OK(partition_schema.CreatePartitions( - vector<KuduPartialRow>{ split }, {}, {}, schema, &partitions)); + vector<KuduPartialRow>{ split }, {}, schema, &partitions)); ASSERT_EQ(4, partitions.size()); // Applies the specified predicates to a scan and checks that the expected @@ -1047,7 +1040,7 @@ TEST_F(PartitionPrunerTest, TestKudu2173) { KuduPartialRow split1(&schema); ASSERT_OK(split1.SetInt8("a", 10)); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({ split1 }, {}, {}, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions({ split1 }, {}, schema, &partitions)); // Applies the specified predicates to a scan and checks that the expected // number of partitions are pruned. @@ -1099,9 +1092,6 @@ TEST_F(PartitionPrunerTest, DISABLED_TestHashSchemasPerRangePruning) { PartitionSchemaPB pb; CreatePartitionSchemaPB({"C"}, { {{"A"}, 2, 0}, {{"B"}, 2, 0} }, &pb); - vector<pair<KuduPartialRow, KuduPartialRow>> bounds; - vector<PartitionSchema::HashSchema> range_hash_schemas; - // Need to add per range hash schema components to the field // 'ranges_with_hash_schemas_' of PartitionSchema because PartitionPruner will // use them to construct partition key ranges. 
Currently, @@ -1112,26 +1102,24 @@ TEST_F(PartitionPrunerTest, DISABLED_TestHashSchemasPerRangePruning) { // [(_, _, a), (_, _, c)) AddRangePartitionWithSchema(schema, {{"C", "a"}}, {{"C", "c"}}, {}, {}, - { {{"A"}, 3, 0} }, &bounds, &range_hash_schemas, &pb); + { {{"A"}, 3, 0} }, &pb); // [(_, _, d), (_, _, f)) AddRangePartitionWithSchema(schema, {{"C", "d"}}, {{"C", "f"}}, {}, {}, - { {{"A"}, 2, 0}, {{"B"}, 3, 0} }, - &bounds, &range_hash_schemas, &pb); + { {{"A"}, 2, 0}, {{"B"}, 3, 0} }, &pb); // [(_, _, h), (_, _, j)) - AddRangePartitionWithSchema(schema, {{"C", "h"}}, {{"C", "j"}}, {}, {}, - {}, &bounds, &range_hash_schemas, &pb); + AddRangePartitionWithSchema(schema, {{"C", "h"}}, {{"C", "j"}}, {}, {}, {}, &pb); // [(_, _, k), (_, _, m)) AddRangePartitionWithSchema(schema, {{"C", "k"}}, {{"C", "m"}}, {}, {}, - { {{"B"}, 2, 0} }, &bounds, &range_hash_schemas, &pb); + { {{"B"}, 2, 0} }, &pb); PartitionSchema partition_schema; ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema)); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions(schema, &partitions)); ASSERT_EQ(12, partitions.size()); // Applies the specified predicates to a scan and checks that the expected @@ -1291,26 +1279,23 @@ TEST_F(PartitionPrunerTest, TestHashSchemasPerRangeWithPartialPrimaryKeyRangePru PartitionSchemaPB pb; CreatePartitionSchemaPB({"a", "b"}, {}, &pb); - vector<pair<KuduPartialRow, KuduPartialRow>> bounds; - vector<PartitionSchema::HashSchema> range_hash_schemas; - // [(0, 0, _), (2, 2, _)) AddRangePartitionWithSchema(schema, {}, {}, {{"a", 0}, {"b", 0}}, {{"a", 2}, {"b", 2}}, - { {{"c"}, 2, 0} }, &bounds, &range_hash_schemas, &pb); + { {{"c"}, 2, 0} }, &pb); // [(2, 2, _), (4, 4, _)) AddRangePartitionWithSchema(schema, {}, {}, {{"a", 2}, {"b", 2}}, {{"a", 4}, {"b", 4}}, - { {{"c"}, 3, 0} }, &bounds, &range_hash_schemas, &pb); + { {{"c"}, 3, 0} }, &pb); // [(4, 4, _), (6, 6, _)) AddRangePartitionWithSchema(schema, {}, {}, {{"a", 4}, {"b", 4}}, {{"a", 6}, {"b", 6}}, - { {{"c"}, 4, 0} }, &bounds, &range_hash_schemas, &pb); + { {{"c"}, 4, 0} }, &pb); PartitionSchema partition_schema; ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema)); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions(schema, &partitions)); ASSERT_EQ(9, partitions.size()); Arena arena(1024); @@ -1398,28 +1383,25 @@ TEST_F(PartitionPrunerTest, TestInListHashPruningPerRange) { PartitionSchemaPB pb; CreatePartitionSchemaPB({"A"}, { {{"B", "C"}, 3, 0} }, &pb); - vector<pair<KuduPartialRow, KuduPartialRow>> bounds; - vector<PartitionSchema::HashSchema> range_hash_schemas; - // None of the ranges below uses the table-wide hash schema. 
// [(a, _, _), (c, _, _)) AddRangePartitionWithSchema(schema, {{"A", "a"}}, {{"A", "c"}}, {}, {}, - { {{"B"}, 3, 0} }, &bounds, &range_hash_schemas, &pb); + { {{"B"}, 3, 0} }, &pb); // [(c, _, _), (e, _, _)) AddRangePartitionWithSchema(schema, {{"A", "c"}}, {{"A", "e"}}, {}, {}, - {}, &bounds, &range_hash_schemas, &pb); + {}, &pb); // [(e, _, _), (g, _, _)) AddRangePartitionWithSchema(schema, {{"A", "e"}}, {{"A", "g"}}, {}, {}, - { {{"C"}, 3, 0} }, &bounds, &range_hash_schemas, &pb); + { {{"C"}, 3, 0} }, &pb); PartitionSchema partition_schema; ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema)); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions(schema, &partitions)); ASSERT_EQ(7, partitions.size()); // Applies the specified predicates to a scan and checks that the expected @@ -1497,30 +1479,26 @@ TEST_F(PartitionPrunerTest, DISABLED_TestSingleRangeElementAndBoundaryCase) { PartitionSchemaPB pb; CreatePartitionSchemaPB({"A"}, {}, &pb); - vector<pair<KuduPartialRow, KuduPartialRow>> bounds; - vector<PartitionSchema::HashSchema> range_hash_schemas; - // [(_, _), (1, _)) AddRangePartitionWithSchema(schema, {}, {}, {}, {{"A", 1}}, - {{{"B"}, 4, 0}}, &bounds, &range_hash_schemas, &pb); + {{{"B"}, 4, 0}}, &pb); // [(1, _), (2, _)) AddRangePartitionWithSchema(schema, {}, {}, {{"A", 1}}, {{"A", 2}}, - { {{"B"}, 2, 0} }, &bounds, &range_hash_schemas, &pb); + { {{"B"}, 2, 0} }, &pb); // [(2, _), (3, _)) AddRangePartitionWithSchema(schema, {}, {}, {{"A", 2}}, {{"A", 3}}, - { {{"B"}, 3, 0} }, &bounds, &range_hash_schemas, &pb); + { {{"B"}, 3, 0} }, &pb); // [(3, _), (_, _)) - AddRangePartitionWithSchema(schema, {}, {}, {{"A", 3}}, {}, - {}, &bounds, &range_hash_schemas, &pb); + AddRangePartitionWithSchema(schema, {}, {}, {{"A", 3}}, {}, {}, &pb); PartitionSchema partition_schema; ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema)); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions(schema, &partitions)); ASSERT_EQ(10, partitions.size()); // Applies the specified predicates to a scan and checks that the expected diff --git a/src/kudu/common/scan_spec-test.cc b/src/kudu/common/scan_spec-test.cc index a43cdfeab..14a2a46de 100644 --- a/src/kudu/common/scan_spec-test.cc +++ b/src/kudu/common/scan_spec-test.cc @@ -432,7 +432,7 @@ TEST_F(CompositeIntKeysTest, OneHashKeyInListHashPruning) { GeneratePartitionSchema(schema, { { { "a" }, 3 } }, {}, &partition_schema); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions)); ASSERT_EQ(3, partitions.size()); // Verify the splitted values can merge into original set without overlapping. 
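For readers following the partition_pruner-test.cc changes above: with this refactoring the per-range hash schemas no longer travel through separate 'bounds'/'range_hash_schemas' vectors; they are carried in PartitionSchemaPB::custom_hash_schema_ranges, materialized by PartitionSchema::FromPB(), and consumed by the new no-range-argument CreatePartitions() overload. A minimal sketch of that call pattern follows; the helper name CreatePartitionsFromCustomRanges() is illustrative only and is not part of the patch.

  // Sketch: assumes 'pb' already carries one or more
  // PartitionSchemaPB::custom_hash_schema_ranges entries (e.g. those built by
  // the AddRangePartitionWithSchema() test helper above).
  #include <vector>

  #include "kudu/common/common.pb.h"
  #include "kudu/common/partition.h"
  #include "kudu/common/schema.h"
  #include "kudu/util/status.h"

  using kudu::Partition;
  using kudu::PartitionSchema;
  using kudu::PartitionSchemaPB;
  using kudu::Schema;
  using kudu::Status;

  Status CreatePartitionsFromCustomRanges(const PartitionSchemaPB& pb,
                                          const Schema& schema,
                                          std::vector<Partition>* partitions) {
    PartitionSchema partition_schema;
    // FromPB() populates the per-range hash schemas internally, so no separate
    // 'bounds' or 'range_hash_schemas' vectors need to be threaded through.
    RETURN_NOT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
    // The new overload takes only the schema and the output vector.
    return partition_schema.CreatePartitions(schema, partitions);
  }
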
@@ -474,7 +474,7 @@ TEST_F(CompositeIntKeysTest, OneHashKeyOneRangeKeyInListHashPruning) { vector<Partition> partitions; ASSERT_OK(partition_schema.CreatePartitions( - { split1, split2 }, {}, {}, schema, &partitions)); + { split1, split2 }, {}, schema, &partitions)); ASSERT_EQ(9, partitions.size()); ASSERT_EQ("a IN () AND b IN (50, 100)", @@ -552,7 +552,7 @@ TEST_F(CompositeIntKeysTest, OneHashKeyMultiRangeKeyInListHashPruning) { vector<Partition> partitions; ASSERT_OK(partition_schema.CreatePartitions( - { split1, split2 }, {}, {}, schema, &partitions)); + { split1, split2 }, {}, schema, &partitions)); ASSERT_EQ(9, partitions.size()); ASSERT_EQ("PK >= (int8 a=4, int8 b=0, int8 c=-128) AND " @@ -630,7 +630,7 @@ TEST_F(CompositeIntKeysTest, DifferentHashRangeKeyInListHashPruning) { vector<Partition> partitions; ASSERT_OK(partition_schema.CreatePartitions( - { split1, split2 }, {}, {}, schema, &partitions)); + { split1, split2 }, {}, schema, &partitions)); ASSERT_EQ(9, partitions.size()); ASSERT_EQ("PK >= (int8 a=4, int8 b=0, int8 c=-128) AND " @@ -702,7 +702,7 @@ TEST_F(CompositeIntKeysTest, HashKeyInListHashPruningEmptyDetect) { schema, { { { "a" }, 3 } }, {}, &partition_schema); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions)); ASSERT_EQ(3, partitions.size()); ASSERT_EQ("PK >= (int8 a=4, int8 b=50, int8 c=-128) AND " @@ -734,7 +734,7 @@ TEST_F(CompositeIntKeysTest, MultiHashKeyOneColumnInListHashPruning) { schema, { { { "a" }, 3 }, { { "b" }, 3 }, }, {}, &partition_schema); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions)); ASSERT_EQ(9, partitions.size()); // p1, p2, p3 should have the same predicate values to be pushed on hash(a). 
@@ -807,7 +807,7 @@ TEST_F(CompositeIntKeysTest, MultiHashColumnsInListHashPruning) { schema, { { { "a", "b" }, 3 } }, {}, &partition_schema); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions)); ASSERT_EQ(3, partitions.size()); ASSERT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=-128) AND " @@ -844,7 +844,7 @@ TEST_F(CompositeIntKeysTest, MultiHashKeyMultiHashInListHashPruning) { schema, { { { "a", "b" }, 3 }, { { "c" }, 3 } }, {}, &partition_schema); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions)); ASSERT_EQ(9, partitions.size()); ASSERT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=40) AND " @@ -922,7 +922,7 @@ TEST_F(CompositeIntKeysTest, NonKeyValuesInListHashPruning) { schema, { { { "a" }, 3 } }, {}, &partition_schema); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions)); ASSERT_EQ(3, partitions.size()); ASSERT_EQ("d IN (1, 2, 3, 4, 5)", diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc index 57e6e6b43..1b898d0e6 100644 --- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc +++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc @@ -1018,7 +1018,7 @@ Status GetPartitionForTxnStatusTablet(int64_t start_txn_id, int64_t end_txn_id, RETURN_NOT_OK(upper_bound.SetInt64(TxnStatusTablet::kTxnIdColName, end_txn_id)); vector<Partition> ps; RETURN_NOT_OK(pschema.CreatePartitions(/*split_rows=*/{}, - { std::make_pair(lower_bound, upper_bound) }, {}, schema, &ps)); + { std::make_pair(lower_bound, upper_bound) }, schema, &ps)); *partition = ps[0]; *partition_schema = pschema; return Status::OK(); diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index ab3380f23..374587be6 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -1917,31 +1917,33 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, } } - // TODO(aserbin): make sure range boundaries in - // req.partition_schema().custom_hash_schema_ranges() - // correspond to range_bounds? - vector<PartitionSchema::HashSchema> range_hash_schemas; - if (FLAGS_enable_per_range_hash_schemas) { - // TODO(aserbin): the signature of CreatePartitions() require the - // 'range_hash_schemas' parameters: update its signature - // to remove the extra parameter and rely on its - // 'ranges_with_hash_schemas_' member field; the path in - // CatalogManager::ApplyAlterPartitioningSteps() involving - // CreatePartitions() should be updated correspondingly. 
- const auto& ps = req.partition_schema(); - for (int i = 0; i < ps.custom_hash_schema_ranges_size(); i++) { - PartitionSchema::HashSchema hash_schema; - RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB( - schema, ps.custom_hash_schema_ranges(i).hash_schema(), &hash_schema)); - range_hash_schemas.emplace_back(std::move(hash_schema)); + vector<Partition> partitions; + if (const auto& ps = req.partition_schema(); + FLAGS_enable_per_range_hash_schemas && !ps.custom_hash_schema_ranges().empty()) { + // TODO(aserbin): in addition, should switch to specifying range information + // only via 'PartitionSchemaPB::custom_hash_schema_ranges' or + // 'CreateTableRequestPB::split_rows_range_bounds', don't mix + if (!split_rows.empty()) { + return Status::InvalidArgument( + "both split rows and custom hash schema ranges cannot be " + "populated at the same time"); } + if (const auto ranges_with_hash_schemas_size = + partition_schema.ranges_with_hash_schemas().size(); + range_bounds.size() != ranges_with_hash_schemas_size) { + return Status::InvalidArgument( + Substitute("$0 vs $1: per range hash schemas and range bounds " + "must have the same size", + ranges_with_hash_schemas_size, range_bounds.size())); + } + // Create partitions based on specified ranges with custom hash schemas. + RETURN_NOT_OK(partition_schema.CreatePartitions(schema, &partitions)); + } else { + // Create partitions based on specified partition schema and split rows. + RETURN_NOT_OK(partition_schema.CreatePartitions( + split_rows, range_bounds, schema, &partitions)); } - // Create partitions based on specified partition schema and split rows. - vector<Partition> partitions; - RETURN_NOT_OK(partition_schema.CreatePartitions( - split_rows, range_bounds, range_hash_schemas, schema, &partitions)); - // Check the restriction on the same number of hash dimensions across all the // ranges. Also, check that the table-wide hash schema has the same number // of hash dimensions as all the partitions with custom hash schemas. @@ -2628,29 +2630,15 @@ Status CatalogManager::ApplyAlterPartitioningSteps( vector<PartitionSchema::HashSchema> range_hash_schemas; for (const auto& step : steps) { + CHECK(step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION || + step.type() == AlterTableRequestPB::DROP_RANGE_PARTITION); + const auto& range_bouds = + step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION + ? 
step.add_range_partition().range_bounds() + : step.drop_range_partition().range_bounds(); + RowOperationsPBDecoder decoder(&range_bouds, &client_schema, &schema, nullptr); vector<DecodedRowOperation> ops; - if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION) { - if (FLAGS_enable_per_range_hash_schemas && - step.add_range_partition().custom_hash_schema_size() > 0) { - const Schema schema = client_schema.CopyWithColumnIds(); - PartitionSchema::HashSchema hash_schema; - RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB( - schema, step.add_range_partition().custom_hash_schema(), &hash_schema)); - if (partition_schema.hash_schema().size() != hash_schema.size()) { - return Status::NotSupported( - "varying number of hash dimensions per range is not yet supported"); - } - range_hash_schemas.emplace_back(std::move(hash_schema)); - } - RowOperationsPBDecoder decoder(&step.add_range_partition().range_bounds(), - &client_schema, &schema, nullptr); - RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops)); - } else { - CHECK_EQ(step.type(), AlterTableRequestPB::DROP_RANGE_PARTITION); - RowOperationsPBDecoder decoder(&step.drop_range_partition().range_bounds(), - &client_schema, &schema, nullptr); - RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops)); - } + RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops)); if (ops.size() != 2) { return Status::InvalidArgument( @@ -2677,8 +2665,26 @@ Status CatalogManager::ApplyAlterPartitioningSteps( } vector<Partition> partitions; - RETURN_NOT_OK(partition_schema.CreatePartitions( - {}, {{ *ops[0].split_row, *ops[1].split_row }}, range_hash_schemas, schema, &partitions)); + const pair<KuduPartialRow, KuduPartialRow> range_bound = + { *ops[0].split_row, *ops[1].split_row }; + if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION && + FLAGS_enable_per_range_hash_schemas && + step.add_range_partition().custom_hash_schema_size() > 0) { + const Schema schema = client_schema.CopyWithColumnIds(); + PartitionSchema::HashSchema hash_schema; + RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB( + schema, step.add_range_partition().custom_hash_schema(), &hash_schema)); + if (partition_schema.hash_schema().size() != hash_schema.size()) { + return Status::NotSupported( + "varying number of hash dimensions per range is not yet supported"); + } + RETURN_NOT_OK(partition_schema.CreatePartitionsForRange( + range_bound, hash_schema, schema, &partitions)); + } else { + RETURN_NOT_OK(partition_schema.CreatePartitions( + {}, { range_bound }, schema, &partitions)); + } + switch (step.type()) { case AlterTableRequestPB::ADD_RANGE_PARTITION: { for (const Partition& partition : partitions) { diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc index d69003f56..d91dbb653 100644 --- a/src/kudu/master/master-test.cc +++ b/src/kudu/master/master-test.cc @@ -1049,8 +1049,8 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) { range_hash_schemas); ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); ASSERT_STR_CONTAINS(s.ToString(), - "Both 'split_rows' and 'range_hash_schemas' cannot be " - "populated at the same time."); + "both split rows and custom hash schema ranges " + "cannot be populated at the same time"); } // No non-range columns. 
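To illustrate the new alter-table path in catalog_manager.cc above: when an ADD_RANGE_PARTITION step carries a custom hash schema, the hash schema extracted from the request is handed, together with the decoded range bounds, to the new CreatePartitionsForRange() method. The sketch below is a simplified, illustrative version of that branch; the helper name and the 'lower'/'upper' parameters stand in for the rows decoded by RowOperationsPBDecoder and are not code from the patch.

  // Sketch: simplified ADD_RANGE_PARTITION handling for a range with a
  // custom hash schema.
  #include <utility>
  #include <vector>

  #include "kudu/common/partial_row.h"
  #include "kudu/common/partition.h"
  #include "kudu/common/schema.h"
  #include "kudu/util/status.h"

  using kudu::KuduPartialRow;
  using kudu::Partition;
  using kudu::PartitionSchema;
  using kudu::Schema;
  using kudu::Status;

  Status AddRangeWithCustomHashSchema(const PartitionSchema& partition_schema,
                                      const PartitionSchema::HashSchema& hash_schema,
                                      const KuduPartialRow& lower,
                                      const KuduPartialRow& upper,
                                      const Schema& schema,
                                      std::vector<Partition>* partitions) {
    // The per-range hash schema must have the same number of hash dimensions
    // as the table-wide one; otherwise the request is rejected (see above).
    if (partition_schema.hash_schema().size() != hash_schema.size()) {
      return Status::NotSupported(
          "varying number of hash dimensions per range is not yet supported");
    }
    const std::pair<KuduPartialRow, KuduPartialRow> range_bound(lower, upper);
    return partition_schema.CreatePartitionsForRange(
        range_bound, hash_schema, schema, partitions);
  }
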
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc index 5b61cd7f8..23cf59e59 100644 --- a/src/kudu/master/sys_catalog.cc +++ b/src/kudu/master/sys_catalog.cc @@ -332,7 +332,7 @@ Status SysCatalogTable::CreateNew(FsManager *fs_manager) { vector<KuduPartialRow> split_rows; vector<Partition> partitions; - RETURN_NOT_OK(partition_schema.CreatePartitions(split_rows, {}, {}, schema, &partitions)); + RETURN_NOT_OK(partition_schema.CreatePartitions(split_rows, {}, schema, &partitions)); DCHECK_EQ(1, partitions.size()); RETURN_NOT_OK(tablet::TabletMetadata::CreateNew(fs_manager, diff --git a/src/kudu/tablet/tablet-harness.h b/src/kudu/tablet/tablet-harness.h index 9b23445e0..b02655447 100644 --- a/src/kudu/tablet/tablet-harness.h +++ b/src/kudu/tablet/tablet-harness.h @@ -53,7 +53,7 @@ static std::pair<PartitionSchema, Partition> CreateDefaultPartition(const Schema // Create the tablet partitions. std::vector<Partition> partitions; - CHECK_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions)); + CHECK_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions)); CHECK_EQ(1, partitions.size()); return std::make_pair(partition_schema, partitions[0]); } diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc index 68fde5aad..15ad33065 100644 --- a/src/kudu/tserver/tablet_server-test.cc +++ b/src/kudu/tserver/tablet_server-test.cc @@ -4122,7 +4122,7 @@ TEST_F(TabletServerTest, TestWriteOutOfBounds) { ASSERT_OK(end_row.SetInt32("key", 20)); vector<Partition> partitions; - ASSERT_OK(partition_schema.CreatePartitions({ start_row, end_row }, {}, {}, schema, &partitions)); + ASSERT_OK(partition_schema.CreatePartitions({ start_row, end_row }, {}, schema, &partitions)); ASSERT_EQ(3, partitions.size());
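For completeness, the most common call form after the refactoring, as used by the updated tests above and by sys_catalog.cc and tablet-harness.h: the separate 'range_hash_schemas' argument is gone, and callers pass only split rows, range bounds, and the schema. The wrapper name below is illustrative and not part of the patch.

  // Sketch: the default (table-wide hash schema) call form after the refactor.
  #include <utility>
  #include <vector>

  #include "kudu/common/partial_row.h"
  #include "kudu/common/partition.h"
  #include "kudu/common/schema.h"
  #include "kudu/util/status.h"

  using kudu::KuduPartialRow;
  using kudu::Partition;
  using kudu::PartitionSchema;
  using kudu::Schema;
  using kudu::Status;

  Status CreateDefaultPartitions(
      const PartitionSchema& partition_schema,
      const std::vector<KuduPartialRow>& split_rows,
      const std::vector<std::pair<KuduPartialRow, KuduPartialRow>>& range_bounds,
      const Schema& schema,
      std::vector<Partition>* partitions) {
    // Ranges here use the table-wide hash schema; ranges with custom hash
    // schemas go through FromPB() + CreatePartitions(schema, partitions) or
    // CreatePartitionsForRange() instead.
    return partition_schema.CreatePartitions(
        split_rows, range_bounds, schema, partitions);
  }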