This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 3c415ea2f KUDU-2671 introduce PartitionPruner::PrepareRangeSet() 3c415ea2f is described below commit 3c415ea2f4418d60d94ff870154b3128100a7d77 Author: Alexey Serbin <ale...@apache.org> AuthorDate: Sat Jun 25 09:19:26 2022 -0700 KUDU-2671 introduce PartitionPruner::PrepareRangeSet() PartitionSchema now provides and persists information only on ranges with custom hash schemas [1], but the logic of PartitionPruner::Init() assumed it would receive information on all the existing ranges for tables with range-specific hash schemas, so it needed an update. This patch does exactly that, adding a new PrepareRangeSet() method to the PartitionPruner class. The new method produces the preliminary set of scanner ranges, assigning the proper hash schema to each range based on the provided table-wide hash schema and range-specific hash schemas. It splits the predicate-based range into sub-ranges and assigns the corresponding hash schema to each of them. In essence, the hash schemas for the ranges with custom hash schemas are known, and the rest of the sub-ranges have the table-wide hash schema. This patch also contains a unit test for the newly introduced method. I updated the TestHashSchemasPerRangeWithPartialPrimaryKeyRangePruning and TestInListHashPruningPerRange scenarios of the PartitionPrunerTest accordingly, since the number of initial ranges for pruning has changed even though the number of non-pruned ranges to scan stayed the same. This is a follow-up to [1]. 
[1] https://gerrit.cloudera.org/#/c/18642/ Change-Id: I7f1903a444d47d30bbd7e119977cbb87bf1aa458 Reviewed-on: http://gerrit.cloudera.org:8080/18672 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Attila Bukor <abu...@apache.org> --- src/kudu/common/partition.cc | 11 + src/kudu/common/partition.h | 5 +- src/kudu/common/partition_pruner-test.cc | 606 ++++++++++++++++++++++++++++++- src/kudu/common/partition_pruner.cc | 176 ++++++--- src/kudu/common/partition_pruner.h | 24 ++ 5 files changed, 765 insertions(+), 57 deletions(-) diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc index 852f0fe15..309071ef4 100644 --- a/src/kudu/common/partition.cc +++ b/src/kudu/common/partition.cc @@ -301,6 +301,17 @@ Status PartitionSchema::FromPB( } } + // Sort the ranges. + constexpr struct { + bool operator()(const PartitionSchema::RangeWithHashSchema& lhs, + const PartitionSchema::RangeWithHashSchema& rhs) const { + return lhs.lower < rhs.lower; + } + } rangeLess; + sort(ranges_with_custom_hash_schemas.begin(), + ranges_with_custom_hash_schemas.end(), + rangeLess); + auto& dict = partition_schema->hash_schema_idx_by_encoded_range_start_; for (auto it = ranges_with_custom_hash_schemas.cbegin(); it != ranges_with_custom_hash_schemas.cend(); ++it) { diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h index b74ce566b..ef4b48d3a 100644 --- a/src/kudu/common/partition.h +++ b/src/kudu/common/partition.h @@ -462,6 +462,8 @@ class PartitionSchema { return hash_schema_; } + // Return all the known ranges that have custom hash schemas. The ranges are + // sorted by the lower bound in ascending order; the ranges do not intersect. const RangesWithHashSchemas& ranges_with_custom_hash_schemas() const { return ranges_with_custom_hash_schemas_; } @@ -666,7 +668,8 @@ class PartitionSchema { HashSchema hash_schema_; // This contains only ranges with range-specific (i.e. different from - // the table-wide) hash schemas. 
+ // the table-wide) hash schemas. This array is sorted by a range's lower bound + // in ascending order; the ranges do not intersect. RangesWithHashSchemas ranges_with_custom_hash_schemas_; // Encoded start of the range --> index of the hash bucket schemas for the diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc index 85c3cef47..40c0ed398 100644 --- a/src/kudu/common/partition_pruner-test.cc +++ b/src/kudu/common/partition_pruner-test.cc @@ -20,6 +20,7 @@ #include <algorithm> #include <cstddef> #include <cstdint> +#include <iostream> #include <optional> #include <string> #include <tuple> @@ -49,10 +50,12 @@ using std::get; using std::make_tuple; using std::nullopt; using std::optional; +using std::ostream; using std::pair; using std::string; using std::tuple; using std::vector; +using strings::Substitute; namespace kudu { @@ -1283,7 +1286,7 @@ TEST_F(PartitionPrunerTest, TestHashSchemasPerRangeWithPartialPrimaryKeyRangePru 3); PartitionSchemaPB pb; - CreatePartitionSchemaPB({"a", "b"}, {}, &pb); + CreatePartitionSchemaPB({"a", "b"}, { {{"c"}, 2, 10} }, &pb); // [(0, 0, _), (2, 2, _)) AddRangePartitionWithSchema(schema, {}, {}, {{"a", 0}, {"b", 0}}, {{"a", 2}, {"b", 2}}, @@ -1339,16 +1342,16 @@ TEST_F(PartitionPrunerTest, TestHashSchemasPerRangeWithPartialPrimaryKeyRangePru }; // No bounds - NO_FATALS(check(nullopt, nullopt, 9, 9)); + NO_FATALS(check(nullopt, nullopt, 9, 13)); // PK < (2, 2, min) - NO_FATALS(check(nullopt, make_tuple<int8_t, int8_t, int8_t>(2, 2, INT8_MIN), 2, 2)); + NO_FATALS(check(nullopt, make_tuple<int8_t, int8_t, int8_t>(2, 2, INT8_MIN), 2, 4)); // PK < (2, 2, 0) - NO_FATALS(check(nullopt, make_tuple<int8_t, int8_t, int8_t>(2, 2, 0), 5, 5)); + NO_FATALS(check(nullopt, make_tuple<int8_t, int8_t, int8_t>(2, 2, 0), 5, 7)); // PK >= (2, 2, 0) - NO_FATALS(check(make_tuple<int8_t, int8_t, int8_t>(2, 2, 0), nullopt, 7, 7)); + NO_FATALS(check(make_tuple<int8_t, int8_t, int8_t>(2, 2, 0), nullopt, 7, 9)); 
// PK >= (2, 2, min) // PK < (4, 4, min) @@ -1366,7 +1369,7 @@ TEST_F(PartitionPrunerTest, TestHashSchemasPerRangeWithPartialPrimaryKeyRangePru make_tuple<int8_t, int8_t, int8_t>(4, 2, INT8_MIN), 5, 5)); // PK >= (6, 6, min) - NO_FATALS(check(make_tuple<int8_t, int8_t, int8_t>(6, 6, INT8_MIN), nullopt, 0, 0)); + NO_FATALS(check(make_tuple<int8_t, int8_t, int8_t>(6, 6, INT8_MIN), nullopt, 0, 2)); // PK >= (4, 4, min) // PK < (2, 2, min) @@ -1436,17 +1439,17 @@ TEST_F(PartitionPrunerTest, TestInListHashPruningPerRange) { // B in [0, 1, 8]; B_values = { &zero, &one, &eight }; NO_FATALS(check({ ColumnPredicate::InList(schema.column(1), &B_values) }, - 7, 7)); + 7, 13)); // B in [0, 1]; B_values = { &zero, &one }; NO_FATALS(check({ ColumnPredicate::InList(schema.column(1), &B_values) }, - 6, 6)); + 6, 12)); // C in [0, 1]; C_values = { &zero, &one }; NO_FATALS(check({ ColumnPredicate::InList(schema.column(2), &C_values) }, - 6, 6)); + 6, 12)); // B in [0, 1], C in [0, 1] // (0, 0) in bucket 2 @@ -1457,19 +1460,19 @@ TEST_F(PartitionPrunerTest, TestInListHashPruningPerRange) { C_values = { &zero, &one }; NO_FATALS(check({ ColumnPredicate::InList(schema.column(1), &B_values), ColumnPredicate::InList(schema.column(2), &C_values) }, - 5, 5)); + 5, 11)); // B = 0, C in [0, 1] C_values = { &zero, &one }; NO_FATALS(check({ ColumnPredicate::Equality(schema.column(1), &zero), ColumnPredicate::InList(schema.column(2), &C_values) }, - 4, 4)); + 4, 6)); // B = 1, C in [0, 1] C_values = { &zero, &one }; NO_FATALS(check({ ColumnPredicate::Equality(schema.column(1), &one), ColumnPredicate::InList(schema.column(2), &C_values) }, - 4, 4)); + 4, 8)); } // TODO(aserbin): re-enable this scenario once varying hash dimensions per range @@ -1595,4 +1598,583 @@ TEST_F(PartitionPrunerTest, DISABLED_TestSingleRangeElementAndBoundaryCase) { NO_FATALS(check({ ColumnPredicate::Range(schema.column(0), nullptr, &zero), ColumnPredicate::Equality(schema.column(1), &one)}, 1, 1)); } + +// Test for the 
functionality of PartitionPruner::PrepareRangeSet() method. +class PartitionPrunerRangeSetTest : public PartitionPrunerTest { + public: + struct TestStruct { + const string description; + const string scan_lower_bound; + const string scan_upper_bound; + const PartitionSchema::HashSchema& table_wide_hash_schema; + const PartitionSchema::RangesWithHashSchemas ranges_with_custom_hash_schemas; + const PartitionSchema::RangesWithHashSchemas expected_result; + }; + + static void DoCheck(const TestStruct& t) { + PartitionSchema::RangesWithHashSchemas result_ranges; + PartitionPruner::PrepareRangeSet( + t.scan_lower_bound, t.scan_upper_bound, t.table_wide_hash_schema, + t.ranges_with_custom_hash_schemas, + &result_ranges); + SCOPED_TRACE(t.description); + ASSERT_EQ(t.expected_result.size(), result_ranges.size()) + << result_ranges; + for (auto i = 0; i < result_ranges.size(); ++i) { + SCOPED_TRACE(Substitute("range $0", i)); + const auto& lhs = t.expected_result[i]; + const auto& rhs = result_ranges[i]; + ASSERT_EQ(lhs.lower, rhs.lower); + ASSERT_EQ(lhs.upper, rhs.upper); + ASSERT_EQ(lhs.hash_schema.size(), rhs.hash_schema.size()); + for (auto j = 0; j < lhs.hash_schema.size(); ++j) { + SCOPED_TRACE(Substitute("hash dimension $0", j)); + ASSERT_EQ(lhs.hash_schema[j].num_buckets, + rhs.hash_schema[j].num_buckets); + } + } + } +}; + +ostream& operator<<(ostream& os, + const PartitionSchema::RangeWithHashSchema& range) { + os << "(" << (range.lower.empty() ? "*" : range.lower) + << " " << (range.upper.empty() ? 
"*" : range.upper) << ")"; + return os; +} + +ostream& operator<<(ostream& os, + const PartitionSchema::RangesWithHashSchemas& ranges) { + for (const auto& range : ranges) { + os << range << " "; + } + return os; +} + +TEST_F(PartitionPrunerRangeSetTest, PrepareRangeSetX) { + const PartitionSchema::HashSchema _2 = { { {ColumnId(0)}, 2, 0 } }; + const PartitionSchema::HashSchema _3 = { { {ColumnId(0)}, 3, 0 } }; + const PartitionSchema::HashSchema _4 = { { {ColumnId(0)}, 4, 0 } }; + const PartitionSchema::HashSchema _5 = { { {ColumnId(0)}, 5, 0 } }; + + // For each element, there is a representation of the corresponding scenario + // in the 'description' field: the first line is for the scan boundaries, the + // second line is for the set of ranges with custom hash schemas. Two lines + // are properly aligned to express the disposition of the ranges with custom + // hash schemas vs the scan range. For the range bounds, the asterisk symbol + // means "unlimited", i.e. no bound. Square [] and regular () parentheses + // don't have the inclusivity/exclusivity semantics: they are rather to + // distinguish scan bounds from range bounds. 
+ const vector<TestStruct> test_inputs_and_results = { + { +R"*( +"[a b]" +"" +)*", + "a", "b", _2, + {}, + { {"a", "b", _2} } + }, + { +R"*( +"[a *]" +"(a *)" +)*", + "a", "", _2, + { {"a", "", _3} }, + { {"a", "", _3} } + }, + { +R"*( +"[* b]" +"(* b)" +)*", + "", "b", _2, + { {"", "b", _3} }, + { {"", "b", _3} } + }, + { +R"*( +"[* c]" +" (b c)" +)*", + "", "c", _2, + { {"b", "c", _3} }, + { {"", "b", _2}, {"b", "c", _3} } + }, + { +R"*( +"[* *]" +"(* b)" +)*", + "", "", _2, + { {"", "b", _3} }, + { {"", "b", _3}, {"b", "", _2} } + }, + { +R"*( +"[* *]" +" (b *)" +)*", + "", "", _2, + { {"b", "", _3} }, + { { "", "b", _2}, {"b", "", _3} } + }, + { +R"*( +" [c d]" +"(a b)" +)*", + "c", "d", _2, + { {"a", "b", _3} }, + { {"c", "d", _2} } + }, + { +R"*( +" [c d]" +"(* b)" +)*", + "c", "d", _2, + { { "", "b", _3} }, + { {"c", "d", _2} } + }, + { +R"*( +" [c d]" +"(a b)(b c)" +)*", + "c", "d", _2, + { {"a", "b", _3}, {"b", "c", _4} }, + { {"c", "d", _2} } + }, + { +R"*( +" [d e]" +"(a b) (c d)" +)*", + "d", "e", _2, + { {"a", "b", _3}, {"c", "d", _4} }, + { {"d", "e", _2} } + }, + { +R"*( +"[a b]" +" (c d)" +)*", + "a", "b", _2, + { {"c", "d", _3} }, + { {"a", "b", _2} } + }, + { +R"*( +"[a b]" +" (c *)" +)*", + "a", "b", _2, + { {"c", "", _3} }, + { {"a", "b", _2} } + }, + { +R"*( +"[* b]" +" (c d)" +)*", + "", "b", _2, + { {"c", "d", _3} }, + { { "", "b", _2} } + }, + { +R"*( +"[* b]" +" (c *)" +)*", + "", "b", _2, + { {"c", "", _3} }, + { { "", "b", _2} } + }, + { +R"*( +" [c d]" +"(a b) (e f)" +)*", + "c", "d", _2, + { { "a", "b", _3 }, { "e", "f", _4 } }, + { { "c", "d", _2 } } + }, + { +R"*( +" [c d]" +"(* b) (e f)" +)*", + "c", "d", _2, + { { "", "b", _3}, {"e", "f", _4} }, + { {"c", "d", _2} } + }, + { +R"*( +" [c d]" +"(a b) (e *)" +)*", + "c", "d", _2, + { {"a", "b", _3}, {"e", "", _4} }, + { {"c", "d", _2} } + }, + { +R"*( +" [c d]" +"(* b) (e *)" +)*", + "c", "d", _2, + { { "", "b", _3}, {"e", "", _4} }, + { {"c", "d", _2} } + }, + { +R"*( +" [b c]" +"(a 
b) (c d)" +)*", + "b", "c", _2, + { {"a", "b", _3}, {"c", "d", _4} }, + { {"b", "c", _2} } + }, + { +R"*( +" [b c]" +"(* b) (c d)" +)*", + "b", "c", _2, + { { "", "b", _3}, {"c", "d", _4} }, + { {"b", "c", _2} } + }, + { +R"*( +" [b c]" +"(a b) (c *)" +)*", + "b", "c", _2, + { {"a", "b", _3}, {"c", "", _4} }, + { {"b", "c", _2} } + }, + { +R"*( +" [b c]" +"(* b) (c *)" +)*", + "b", "c", _2, + { { "", "b", _3}, {"c", "", _4} }, + { {"b", "c", _2} } + }, + { +R"*( +" [b c]" +"(* b)" +)*", + "b", "c", _2, + { { "", "b", _3} }, + { {"b", "c", _2} } + }, + { +R"*( +" [b c]" +"(a b)" +)*", + "b", "c", _2, + { {"a", "b", _3} }, + { {"b", "c", _2} } + }, + { +R"*( +"[a b]" +" (b c)" +)*", + "a", "b", _2, + { {"b", "c", _3} }, + { {"a", "b", _2} } + }, + { +R"*( +"[a b]" +" (b *)" +)*", + "a", "b", _2, + { {"b", "", _3} }, + { {"a", "b", _2} } + }, + { +R"*( +"[a c]" +"(a b)" +)*", + "a", "c", _2, + { {"a", "b", _3} }, + { {"a", "b", _3}, {"b", "c", _2} } + }, + { +R"*( +"[a c]" +" (b c)" +)*", + "a", "c", _2, + { {"b", "c", _3} }, + { {"a", "b", _2}, {"b", "c", _3} } + }, + { +R"*( +"[a *]" +" (b *)" +)*", + "a", "", _2, + { {"b", "", _3} }, + { {"a", "b", _2}, {"b", "", _3} } + }, + { +R"*( +"[* b]" +"(* a)" +)*", + "", "b", _2, + { { "", "a", _3} }, + { { "", "a", _3}, {"a", "b", _2} } + }, + { +R"*( +"[a d]" +" (b c)" +)*", + "a", "d", _2, + { {"b", "c", _3} }, + { {"a", "b", _2}, {"b", "c", _3}, {"c", "d", _2} } + }, + { +R"*( +"[* d]" +" (b c)" +)*", + "", "d", _2, + { {"b", "c", _3} }, + { { "", "b", _2}, {"b", "c", _3}, {"c", "d", _2} } + }, + { +R"*( +"[* *]" +" (b c)" +)*", + "", "", _2, + { {"b", "c", _3} }, + { { "", "b", _2}, {"b", "c", _3}, {"c", "", _2} } + }, + { +R"*( +" [b c]" +"(a d)" +)*", + "b", "c", _2, + { {"a", "d", _3} }, + { {"b", "c", _3} } + }, + { +R"*( +" [b c]" +"(a *)" +)*", + "b", "c", _2, + { {"a", "", _3} }, + { {"b", "c", _3} } + }, + { +R"*( +" [b c]" +"(* d)" +)*", + "b", "c", _2, + { { "", "d", _3} }, + { {"b", "c", _3} } + }, + { 
+R"*( +" [b c]" +"(* *)" +)*", + "b", "c", _2, + { { "", "", _3} }, + { {"b", "c", _3} } + }, + { +R"*( +"[a c]" +" (b *)" +)*", + "a", "c", _2, + { {"b", "", _3} }, + { {"a", "b", _2}, {"b", "c", _3} } + }, + { +R"*( +"[a c]" +"(a b)(b c)" +)*", + "a", "c", _2, + { {"a", "b", _3}, {"b", "c", _4} }, + { {"a", "b", _3}, {"b", "c", _4} } + }, + { +R"*( +"[a c]" +"(a b)(b *)" +)*", + "a", "c", _2, + { {"a", "b", _3}, {"b", "", _4} }, + { {"a", "b", _3}, {"b", "c", _4} } + }, + { +R"*( +"[a e]" +" (b c)(c d)" +)*", + "a", "e", _2, + { {"b", "c", _3}, {"c", "d", _4} }, + { {"a", "b", _2}, {"b", "c", _3}, {"c", "d", _4}, {"d", "e", _2} } + }, + { +R"*( +"[a *]" +" (b c) (d *)" +)*", + "a", "", _2, + { {"b", "c", _3}, {"d", "", _4} }, + { {"a", "b", _2}, {"b", "c", _3}, {"c", "d", _2}, {"d", "", _4} } + }, + { +R"*( +"[a f]" +" (b c) (d *)" +)*", + "a", "f", _2, + { {"b", "c", _3}, {"d", "", _4} }, + { {"a", "b", _2}, {"b", "c", _3}, {"c", "d", _2}, {"d", "f", _4} } + }, + { +R"*( +"[a f]" +" (b c) (d e)" +)*", + "a", "f", _2, + { {"b", "c", _3}, {"d", "e", _4} }, + { + {"a", "b", _2}, + {"b", "c", _3}, + {"c", "d", _2}, + {"d", "e", _4}, + {"e", "f", _2}, + } + }, + { +R"*( +"[a *]" +" (b c) (d e)" +)*", + "a", "", _2, + { {"b", "c", _3}, {"d", "e", _4} }, + { + {"a", "b", _2}, + {"b", "c", _3}, + {"c", "d", _2}, + {"d", "e", _4}, + {"e", "", _2}, + } + }, + { +R"*( +"[* f]" +" (b c) (d e)" +)*", + "", "f", _2, + { {"b", "c", _3}, {"d", "e", _4} }, + { + { "", "b", _2}, + {"b", "c", _3}, + {"c", "d", _2}, + {"d", "e", _4}, + {"e", "f", _2}, + } + }, + { +R"*( +"[a d]" +" (b c) (e f)" +)*", + "a", "d", _2, + { {"b", "c", _3}, {"e", "f", _4} }, + { {"a", "b", _2}, {"b", "c", _3}, {"c", "d", _2} } + }, + { +R"*( +" [b e]" +"(a c) (d *)" +)*", + "b", "e", _2, + { {"a", "c", _3}, {"d", "", _4} }, + { {"b", "c", _3}, {"c", "d", _2}, {"d", "e", _4} } + }, + { +R"*( +"[* e]" +" (b c) (d *)" +)*", + "", "e", _2, + { {"b", "c", _3}, {"d", "", _4} }, + { { "", "b", _2}, {"b", "c", 
_3}, {"c", "d", _2}, {"d", "e", _4} } + }, + { +R"*( +" [b e]" +"(a b) (c d) (e f)" +)*", + "b", "e", _2, + { {"a", "b", _3}, {"c", "d", _4}, {"e", "f", _5}, }, + { {"b", "c", _2}, {"c", "d", _4}, {"d", "e", _2}, } + }, + { +R"*( +" [b e]" +"(* b) (c d) (e f)" +)*", + "b", "e", _2, + { { "", "b", _3}, {"c", "d", _4}, {"e", "f", _5}, }, + { {"b", "c", _2}, {"c", "d", _4}, {"d", "e", _2}, } + }, + { +R"*( +" [b e]" +"(a b) (c d) (e *)" +)*", + "b", "e", _2, + { {"a", "b", _3}, {"c", "d", _4}, {"e", "", _5}, }, + { {"b", "c", _2}, {"c", "d", _4}, {"d", "e", _2}, } + }, + { +R"*( +" [b e]" +"(* b) (c d) (e *)" +)*", + "b", "e", _2, + { { "", "b", _3}, {"c", "d", _4}, {"e", "", _5}, }, + { {"b", "c", _2}, {"c", "d", _4}, {"d", "e", _2}, } + }, + { +R"*( +"[* e]" +"(* b) (c d) (e *)" +)*", + "", "e", _2, + { { "", "b", _3}, {"c", "d", _4}, {"e", "", _5}, }, + { { "", "b", _3}, {"b", "c", _2}, {"c", "d", _4}, {"d", "e", _2}, } + }, + }; + + for (const auto& scenario : test_inputs_and_results) { + NO_FATALS(DoCheck(scenario)); + } +} + } // namespace kudu diff --git a/src/kudu/common/partition_pruner.cc b/src/kudu/common/partition_pruner.cc index f7240c60d..567e91bc4 100644 --- a/src/kudu/common/partition_pruner.cc +++ b/src/kudu/common/partition_pruner.cc @@ -329,6 +329,118 @@ vector<PartitionPruner::PartitionKeyRange> PartitionPruner::ConstructPartitionKe return partition_key_ranges; } +// NOTE: the lower ranges are inclusive, the upper ranges are exclusive. 
+void PartitionPruner::PrepareRangeSet( + const string& scan_lower_bound, + const string& scan_upper_bound, + const PartitionSchema::HashSchema& table_wide_hash_schema, + const PartitionSchema::RangesWithHashSchemas& ranges, + PartitionSchema::RangesWithHashSchemas* result_ranges) { + DCHECK(result_ranges); + CHECK(scan_upper_bound.empty() || scan_lower_bound < scan_upper_bound); + + // If there aren't any ranges with custom hash schemas or there isn't an + // intersection between the set of ranges with custom hash schemas and the + // scan range, the result is trivial: the whole scan range is attributed + // to the table-wide hash schema. + if (ranges.empty() || + (!scan_upper_bound.empty() && scan_upper_bound < ranges.front().lower) || + (!scan_lower_bound.empty() && !ranges.back().upper.empty() && + ranges.back().upper <= scan_lower_bound)) { + *result_ranges = + { { scan_lower_bound, scan_upper_bound, table_wide_hash_schema } }; + return; + } + + // Find the first range that isn't entirely below the scan's lower bound. + const auto range_it = std::lower_bound(ranges.cbegin(), ranges.cend(), + RangeBounds{scan_lower_bound, scan_upper_bound}, + [] (const PartitionSchema::RangeWithHashSchema& range, + const RangeBounds& bounds) { + // return true if range < bounds + return !range.upper.empty() && range.upper <= bounds.lower; + }); + CHECK(range_it != ranges.cend()); + + // Current position of the iterator. + string cur_point = scan_lower_bound; + // Index of the known range with custom hash schema that the iterator is + // currently pointing at, or is about to point at if the iterator is + // currently at the scan boundary. + size_t cur_idx = distance(ranges.begin(), range_it); + + CHECK_LT(cur_idx, ranges.size()); + + // Iterate over the scan range from one known boundary to the next one, + // enumerating the resulting consecutive sub-ranges and attributing each + // sub-range to a proper hash schema. 
If that's a known range with custom hash + // schema, it's attributed to its range-specific hash schema; otherwise, + // a sub-range is attributed to the table-wide hash schema. + PartitionSchema::RangesWithHashSchemas result; + while (cur_idx < ranges.size() && + (cur_point < scan_upper_bound || scan_upper_bound.empty())) { + // Check the disposition of cur_point relative to the lower boundary + // of the range pointed to by 'cur_idx'. + const auto& cur_range = ranges[cur_idx]; + if (cur_point < cur_range.lower) { + // The iterator is before the current range: + // |---| + // ^ + // The next known bound is either the lower bound of the current range + // or the upper bound of the scan, whichever comes first. + auto upper_bound = scan_upper_bound.empty() + ? cur_range.lower : std::min(cur_range.lower, scan_upper_bound); + result.emplace_back(PartitionSchema::RangeWithHashSchema{ + cur_point, upper_bound, table_wide_hash_schema}); + // 'cur_idx' is not advanced: cur_point is now either at the lower bound + // of the current range or at the upper bound of the scan, before it. + } else if (cur_point == cur_range.lower) { + // The iterator is at the lower boundary of the current range: + // |---| + // ^ + if ((!cur_range.upper.empty() && cur_range.upper <= scan_upper_bound) || + scan_upper_bound.empty()) { + // The current range is within the scan boundaries. + result.emplace_back(cur_range); + } else { + // The current range spans over the upper bound of the scan. + result.emplace_back(PartitionSchema::RangeWithHashSchema{ + cur_point, scan_upper_bound, cur_range.hash_schema}); + } + // Done with the current range, advance to the next one, if any. 
+ ++cur_idx; + } else { + // The iterator is ahead of the current range's lower boundary: + // |---| + // ^ + if ((!scan_upper_bound.empty() && scan_upper_bound <= cur_range.upper) || + cur_range.upper.empty()) { + result.emplace_back(PartitionSchema::RangeWithHashSchema{ + cur_point, scan_upper_bound, cur_range.hash_schema}); + } else { + result.emplace_back(PartitionSchema::RangeWithHashSchema{ + cur_point, cur_range.upper, cur_range.hash_schema}); + } + // Done with the current range, advance to the next one, if any. + ++cur_idx; + } + // Advance the iterator. + cur_point = result.back().upper; + } + + // If the loop above has exited because of the 'cur_idx < ranges.size()' + // condition, check if the upper bound of the scan is beyond the upper bound + // of the last range with custom hash schema. If so, add an extra range that + // spans from the upper bound of the last range to the upper bound of the scan. + if (result.back().upper != scan_upper_bound) { + DCHECK_EQ(cur_point, result.back().upper); + result.emplace_back(PartitionSchema::RangeWithHashSchema{ + cur_point, scan_upper_bound, table_wide_hash_schema}); + } + + *result_ranges = std::move(result); +} + void PartitionPruner::Init(const Schema& schema, const PartitionSchema& partition_schema, const ScanSpec& scan_spec) { @@ -440,52 +552,28 @@ void PartitionPruner::Init(const Schema& schema, move(partition_key_ranges.rbegin(), partition_key_ranges.rend(), first_range.partition_key_ranges.begin()); } else { - vector<RangeBounds> range_bounds; - vector<PartitionSchema::HashSchema> hash_schemas_per_range; - for (const auto& range : partition_schema.ranges_with_custom_hash_schemas()) { - const auto& hash_schema = range.hash_schema; - // Both lower and upper bounds of the scan are unbounded. 
- if (scan_range_lower_bound.empty() && scan_range_upper_bound.empty()) { - range_bounds.emplace_back(RangeBounds{range.lower, range.upper}); - hash_schemas_per_range.emplace_back(hash_schema); - continue; - } - // Only one of the lower/upper bounds of the scan is unbounded. - if (scan_range_lower_bound.empty()) { - if (scan_range_upper_bound > range.lower) { - range_bounds.emplace_back(RangeBounds{range.lower, range.upper}); - hash_schemas_per_range.emplace_back(hash_schema); - } - continue; - } - if (scan_range_upper_bound.empty()) { - if (range.upper.empty() || scan_range_lower_bound < range.upper) { - range_bounds.emplace_back(RangeBounds{range.lower, range.upper}); - hash_schemas_per_range.emplace_back(hash_schema); - } - continue; - } - // Both lower and upper ranges of the scan are bounded. - if ((range.upper.empty() || scan_range_lower_bound < range.upper) && - scan_range_upper_bound > range.lower) { - range_bounds.emplace_back(RangeBounds{range.lower, range.upper}); - hash_schemas_per_range.emplace_back(hash_schema); - } - } - DCHECK_EQ(range_bounds.size(), hash_schemas_per_range.size()); - range_bounds_to_partition_key_ranges_.resize(hash_schemas_per_range.size()); - // Construct partition key ranges from the ranges and their respective hash schemas - // that falls within the scan's bounds. - for (size_t i = 0; i < hash_schemas_per_range.size(); ++i) { - const auto& hash_schema = hash_schemas_per_range[i]; - const auto bounds = - scan_range_lower_bound.empty() && scan_range_upper_bound.empty() - ? RangeBounds{range_bounds[i].lower, range_bounds[i].upper} - : RangeBounds{scan_range_lower_bound, scan_range_upper_bound}; + // Build the preliminary set of ranges: that's to convey information on + // range-specific hash schemas since some ranges in the table can have + // custom (i.e. different from the table-wide) hash schemas. 
+ PartitionSchema::RangesWithHashSchemas preliminary_ranges; + PartitionPruner::PrepareRangeSet( + scan_range_lower_bound, + scan_range_upper_bound, + partition_schema.hash_schema(), + partition_schema.ranges_with_custom_hash_schemas(), + &preliminary_ranges); + + range_bounds_to_partition_key_ranges_.resize(preliminary_ranges.size()); + // Construct partition key ranges from the ranges and their respective hash + // schemas that fall within the scan's bounds. + for (size_t i = 0; i < preliminary_ranges.size(); ++i) { + const auto& hash_schema = preliminary_ranges[i].hash_schema; + RangeBounds range_bounds{ + preliminary_ranges[i].lower, preliminary_ranges[i].upper}; auto partition_key_ranges = ConstructPartitionKeyRanges( - schema, scan_spec, hash_schema, bounds); + schema, scan_spec, hash_schema, range_bounds); auto& current_range = range_bounds_to_partition_key_ranges_[i]; - current_range.range_bounds = range_bounds[i]; + current_range.range_bounds = std::move(range_bounds); current_range.partition_key_ranges.resize(partition_key_ranges.size()); move(partition_key_ranges.rbegin(), partition_key_ranges.rend(), current_range.partition_key_ranges.begin()); diff --git a/src/kudu/common/partition_pruner.h b/src/kudu/common/partition_pruner.h index e3c3aa33a..fb17e2ee1 100644 --- a/src/kudu/common/partition_pruner.h +++ b/src/kudu/common/partition_pruner.h @@ -21,6 +21,8 @@ #include <string> #include <vector> +#include <gtest/gtest_prod.h> + #include "kudu/common/partition.h" #include "kudu/gutil/macros.h" @@ -72,6 +74,9 @@ class PartitionPruner { std::string ToString(const Schema& schema, const PartitionSchema& partition_schema) const; private: + friend class PartitionPrunerRangeSetTest; + FRIEND_TEST(PartitionPrunerRangeSetTest, PrepareRangeSet); + struct RangeBounds { std::string lower; std::string upper; @@ -102,6 +107,25 @@ class PartitionPruner { const PartitionSchema::HashSchema& hash_schema, const RangeBounds& range_bounds); + // Produce the preliminary set
of scanner ranges, each with its proper hash + // schema, for the scan range ['scan_lower_bound', 'scan_upper_bound'). The + // table-wide hash schema and the range-specific hash schemas are provided + // with 'table_wide_hash_schema' and 'ranges_with_custom_hash_schemas' + // respectively; an empty bound means the corresponding side is unbounded. + // The resulting sub-ranges are consecutive and cover the whole scan range. + // + // The predicate-based range is split into sub-ranges, each of which is + // assigned its hash schema. In essence, the hash schemas for the + // ranges with custom hash schemas are known from the + // 'ranges_with_custom_hash_schemas' parameter, and the rest of the sub-ranges + // have the table-wide hash schema. + static void PrepareRangeSet( + const std::string& scan_lower_bound, + const std::string& scan_upper_bound, + const PartitionSchema::HashSchema& table_wide_hash_schema, + const PartitionSchema::RangesWithHashSchemas& ranges_with_custom_hash_schemas, + PartitionSchema::RangesWithHashSchemas* ranges); + // A vector of a pair of lower and upper range bounds mapped to a reverse // sorted set of partition key ranges. Each partition key range within the set // has an inclusive lower bound and an exclusive upper bound.