This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch branch-1.17.x in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/branch-1.17.x by this push: new 597d2bf15 KUDU-3564: Fix IN list predicate pruning 597d2bf15 is described below commit 597d2bf156df097e7b04c7040323a55b291d0f3f Author: zhangyifan27 <chinazhangyi...@163.com> AuthorDate: Fri Apr 5 09:35:46 2024 +0800 KUDU-3564: Fix IN list predicate pruning This patch fixes IN list predicate pruning with a range-specific hash schema by modifying the 'PartitionMayContainRow' method. We now get the right hash schema based on the specific partition's lower bound key. This is a follow-up to 607d9d0. Change-Id: I964b1ccfb85602741843ab555cdee53343217033 Reviewed-on: http://gerrit.cloudera.org:8080/21243 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Alexey Serbin <ale...@apache.org> (cherry picked from commit 5a2c776dfb894310fc286f3ebe60d53c8a5e9341) Reviewed-on: http://gerrit.cloudera.org:8080/21253 Reviewed-by: Yifan Zhang <chinazhangyi...@163.com> --- src/kudu/common/column_predicate.cc | 2 +- src/kudu/common/partition.cc | 4 +- src/kudu/common/scan_spec-test.cc | 101 ++++++++++++++++++++++++++++++++---- 3 files changed, 93 insertions(+), 14 deletions(-) diff --git a/src/kudu/common/column_predicate.cc b/src/kudu/common/column_predicate.cc index 4b163da9e..08d582715 100644 --- a/src/kudu/common/column_predicate.cc +++ b/src/kudu/common/column_predicate.cc @@ -464,7 +464,7 @@ void ColumnPredicate::MergeIntoIsNull(const ColumnPredicate &other) { void ColumnPredicate::MergeIntoInList(const ColumnPredicate &other) { CHECK(predicate_type_ == PredicateType::InList); - DCHECK(values_.size() > 1); + DCHECK(values_.size() >= 1); switch (other.predicate_type()) { case PredicateType::None: { diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc index cd8599ca3..0d3c150a9 100644 --- a/src/kudu/common/partition.cc +++ b/src/kudu/common/partition.cc @@ -808,9 +808,7 @@ bool PartitionSchema::PartitionMayContainRow(const Partition& partition, return false; } - string 
range_key; - EncodeColumns(row, range_schema_.column_ids, &range_key); - const auto& hash_schema = GetHashSchemaForRange(range_key); + const auto& hash_schema = GetHashSchemaForRange(partition.begin_.range_key()); for (size_t i = 0; i < hash_schema.size(); ++i) { const auto& hash_dimension = hash_schema[i]; if (hash_dimension.column_ids.size() == 1 && diff --git a/src/kudu/common/scan_spec-test.cc b/src/kudu/common/scan_spec-test.cc index 6c8e68ac9..90549a32a 100644 --- a/src/kudu/common/scan_spec-test.cc +++ b/src/kudu/common/scan_spec-test.cc @@ -35,6 +35,8 @@ #include "kudu/common/partial_row.h" #include "kudu/common/partition.h" #include "kudu/common/row.h" +#include "kudu/common/row_operations.h" +#include "kudu/common/row_operations.pb.h" #include "kudu/common/schema.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/strings/stringpiece.h" @@ -51,16 +53,13 @@ using std::vector; namespace kudu { namespace { -// Generate partition schema of a table with given hash_partitions and range partition keys. -// E.g. GeneratePartitionSchema(schema, {make_pair({a, b}, 3), make_pair({c}, 5) }) -// Returns 'partition by hash(a, b) partitions 3, hash(c) partitions 5'. 
-void GeneratePartitionSchema(const Schema& schema, - const vector<pair<vector<string>, int>>& hash_partitions, - const vector<string>& range_partition_columns, - PartitionSchema* partition_schema) { - PartitionSchemaPB partition_schema_pb; + +void GeneratePartitionSchemaPB(const Schema& schema, + const vector<pair<vector<string>, int>>& hash_partitions, + const vector<string>& range_partition_columns, + PartitionSchemaPB* partition_schema_pb) { for (const auto& col_names_and_num_buckets : hash_partitions) { - auto* hash_dimension_pb = partition_schema_pb.add_hash_schema(); + auto* hash_dimension_pb = partition_schema_pb->add_hash_schema(); hash_dimension_pb->set_num_buckets(col_names_and_num_buckets.second); hash_dimension_pb->set_seed(0); for (const auto& col_name : col_names_and_num_buckets.first) { @@ -71,14 +70,53 @@ void GeneratePartitionSchema(const Schema& schema, } } if (!range_partition_columns.empty()) { - auto* range_schema = partition_schema_pb.mutable_range_schema(); + auto* range_schema = partition_schema_pb->mutable_range_schema(); for (const auto& range_column : range_partition_columns) { range_schema->add_columns()->set_name(range_column); } } +} + +// Generate partition schema of a table with given hash_partitions and range partition keys. +// E.g. GeneratePartitionSchema(schema, {make_pair({a, b}, 3), make_pair({c}, 5) }) +// Returns 'partition by hash(a, b) partitions 3, hash(c) partitions 5'. 
+void GeneratePartitionSchema(const Schema& schema, + const vector<pair<vector<string>, int>>& hash_partitions, + const vector<string>& range_partition_columns, + PartitionSchema* partition_schema) { + PartitionSchemaPB partition_schema_pb; + GeneratePartitionSchemaPB( + schema, hash_partitions, range_partition_columns, &partition_schema_pb); CHECK_OK(PartitionSchema::FromPB(partition_schema_pb, schema, partition_schema)); } +void AddRangePartitionWithSchema(const Schema& schema, + const vector<pair<vector<string>, int>>& hash_partitions, + const vector<pair<string, int8_t>>& lower_int_cols, + const vector<pair<string, int8_t>>& upper_int_cols, + PartitionSchemaPB* pb) { + auto* range = pb->add_custom_hash_schema_ranges(); + RowOperationsPBEncoder encoder(range->mutable_range_bounds()); + KuduPartialRow lower(&schema); + KuduPartialRow upper(&schema); + for (const auto& [column_name, value] : lower_int_cols) { + ASSERT_OK(lower.SetInt8(column_name, value)); + } + for (const auto& [column_name, value] : upper_int_cols) { + ASSERT_OK(upper.SetInt8(column_name, value)); + } + encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower); + encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper); + for (const auto& [col_names, num_buckets] : hash_partitions) { + auto* hash_dimension_pb = range->add_hash_schema(); + for (const auto& column_name : col_names) { + hash_dimension_pb->add_columns()->set_name(column_name); + } + hash_dimension_pb->set_num_buckets(num_buckets); + hash_dimension_pb->set_seed(0); + } +} + // Copy a spec and return the pruned spec string. string PruneInlistValuesAndGetSchemaString(const ScanSpec& spec, const Schema& schema, @@ -938,6 +976,49 @@ TEST_F(CompositeIntKeysTest, NonKeyValuesInListHashPruning) { spec, schema, partitions[2], partition_schema, &arena_)); } +// Test that IN list pruning works with a per range hash schema. 
+TEST_F(CompositeIntKeysTest, CustomRangePartitionInListPruning) { + ScanSpec spec; + AddInPredicate<int8_t>(&spec, "a", { 1, 2}); + AddInPredicate<int8_t>(&spec, "b", { 1, 2}); + + const auto schema = schema_.CopyWithColumnIds(); + PartitionSchemaPB partition_schema_pb; + GeneratePartitionSchemaPB( + schema, { { { "a" }, 3 } }, {"b"}, &partition_schema_pb); + AddRangePartitionWithSchema( + schema, { { { "a" }, 2 } }, {{"b", 1}}, {{"b", 2}}, &partition_schema_pb); + PartitionSchema partition_schema; + PartitionSchema::RangesWithHashSchemas ranges; + ASSERT_OK(PartitionSchema::FromPB(partition_schema_pb, schema, &partition_schema, &ranges)); + + vector<Partition> partitions; + ASSERT_OK(partition_schema.CreatePartitions(ranges, schema, &partitions)); + ASSERT_EQ(2, partitions.size()); + + // Set lower and upper bound for 'spec' to imitate a scan request from client that + // has start_primary_key and stop_primary_key. + KuduPartialRow lower_bound(&schema_); + CHECK_OK(lower_bound.SetInt8("a", -128)); + CHECK_OK(lower_bound.SetInt8("b", -128)); + CHECK_OK(lower_bound.SetInt8("c", -128)); + SetLowerBound(&spec, lower_bound); + KuduPartialRow upper_bound(&schema_); + CHECK_OK(upper_bound.SetInt8("a", 127)); + CHECK_OK(upper_bound.SetInt8("b", 127)); + CHECK_OK(upper_bound.SetInt8("c", 127)); + SetExclusiveUpperBound(&spec, upper_bound); + + ASSERT_EQ("PK >= (int8 a=2, int8 b=1, int8 c=-128) AND " + "PK < (int8 a=2, int8 b=2, int8 c=-128) AND a IN (2) AND b IN (1)", + PruneInlistValuesAndGetSchemaString( + spec, schema, partitions[0], partition_schema, &arena_)); + ASSERT_EQ("PK >= (int8 a=1, int8 b=1, int8 c=-128) AND " + "PK < (int8 a=1, int8 b=2, int8 c=-128) AND a IN (1) AND b IN (1)", + PruneInlistValuesAndGetSchemaString( + spec, schema, partitions[1], partition_schema, &arena_)); +} + // Test that IN list mixed with range predicates get pushed into the primary key // bounds. TEST_F(CompositeIntKeysTest, TestInListPushdownWithRange) {