This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new e572e5967 [catalog_manager] KUDU-2671 fix bug in CreatePartitionsForRange() e572e5967 is described below commit e572e59673f6dade62f96eb38b6417ef018bc7bf Author: Alexey Serbin <ale...@apache.org> AuthorDate: Tue Jul 26 16:44:42 2022 -0700 [catalog_manager] KUDU-2671 fix bug in CreatePartitionsForRange() This patch fixes a bug in PartitionSchema::CreatePartitionsForRange(). The manifestation of the bug was the inability to add an unbounded range with custom hash schema (e.g., [0, +inf)) by AlterTable due to a conflict with an already existing range (e.g., [-inf, 0)) when in fact there was no conflict at all. The root cause was the assumption that PartitionSchema contained information on the range to be added in its internal map 'hash_schema_idx_by_encoded_range_start_' but that wasn't the case, so GetHashSchemaForRange() would return the table-wide hash schema for the new range being added instead of the proper range-specific hash schema. That led to incorrect updating of range boundaries in UpdatePartitionBoundaries(), producing wrong results. This patch also contains a new test scenario that reproduces the issue: the new scenario fails without the fix applied. 
Change-Id: I33a2bdea2e71bf4b567664c0166e9fbc07c4b882 Reviewed-on: http://gerrit.cloudera.org:8080/18793 Tested-by: Kudu Jenkins Reviewed-by: Mahesh Reddy <mre...@cloudera.com> Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com> Reviewed-by: Attila Bukor <abu...@apache.org> --- src/kudu/client/flex_partitioning_client-test.cc | 63 ++++++++++++++ src/kudu/common/partition.cc | 103 ++++++++++++++--------- src/kudu/common/partition.h | 11 ++- 3 files changed, 137 insertions(+), 40 deletions(-) diff --git a/src/kudu/client/flex_partitioning_client-test.cc b/src/kudu/client/flex_partitioning_client-test.cc index 3a89aea24..6b42eff1b 100644 --- a/src/kudu/client/flex_partitioning_client-test.cc +++ b/src/kudu/client/flex_partitioning_client-test.cc @@ -1775,6 +1775,69 @@ TEST_F(FlexPartitioningAlterTableTest, ReadAndWriteToCustomRangePartition) { } } +TEST_F(FlexPartitioningAlterTableTest, ReadAndWriteToUnboundedCustomRangePartition) { + constexpr const char* const kTableName = + "ReadAndWriteToUnboundedCustomRangePartition"; + unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); + unique_ptr<KuduPartialRow> lower(schema_.NewRow()); + ASSERT_OK(lower->SetInt32(kKeyColumn, -100)); + unique_ptr<KuduPartialRow> upper(schema_.NewRow()); + ASSERT_OK(upper->SetInt32(kKeyColumn, 100)); + table_creator->table_name(kTableName) + .schema(&schema_) + .num_replicas(1) + .add_hash_partitions({ kKeyColumn }, 2) + .set_range_partition_columns({ kKeyColumn }) + .add_range_partition(lower.release(), upper.release()); + ASSERT_OK(table_creator->Create()); + + unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); + { + auto p = CreateRangePartitionNoLowerBound(-100); + ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 1)); + table_alterer->AddRangePartition(p.release()); + } + { + auto p = CreateRangePartitionNoUpperBound(100); + ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 4, 2)); + table_alterer->AddRangePartition(p.release()); + } 
+ ASSERT_OK(table_alterer->Alter()); + NO_FATALS(CheckTabletCount(kTableName, 9)); // 2 + 3 + 4 = 9 + + // Make sure it's possible to insert rows into the table for all the existing + // the partitions: first check the range of table-wide schema, then check + // the ranges with custom hash schemas. + ASSERT_OK(InsertTestRows(kTableName, -100, 100)); + NO_FATALS(CheckTableRowsNum(kTableName, 200)); + ASSERT_OK(InsertTestRows(kTableName, -200, -100)); + NO_FATALS(CheckTableRowsNum(kTableName, 300)); + ASSERT_OK(InsertTestRows(kTableName, 100, 200)); + NO_FATALS(CheckTableRowsNum(kTableName, 400)); + ASSERT_OK(InsertTestRows(kTableName, INT32_MIN, INT32_MIN + 100)); + NO_FATALS(CheckTableRowsNum(kTableName, 500)); + ASSERT_OK(InsertTestRows(kTableName, INT32_MAX - 100, INT32_MAX)); + NO_FATALS(CheckTableRowsNum(kTableName, 600)); + + NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -300, 300, 400)); + + // Drop a partition in the middle and re-scan with various ranges. + { + unique_ptr<KuduTableAlterer> table_alterer_drop(client_->NewTableAlterer(kTableName)); + unique_ptr<KuduPartialRow> lower_drop(schema_.NewRow()); + ASSERT_OK(lower_drop->SetInt32(kKeyColumn, -100)); + unique_ptr<KuduPartialRow> upper_drop(schema_.NewRow()); + ASSERT_OK(upper_drop->SetInt32(kKeyColumn, 100)); + table_alterer_drop->DropRangePartition(lower_drop.release(), upper_drop.release()); + ASSERT_OK(table_alterer_drop->Alter()); + } + NO_FATALS(CheckTableRowsNum(kTableName, 400)); + NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, 0, 300, 100)); + NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -300, 0, 100)); + NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -200, 200, 200)); + NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -500, 500, 200)); +} + // When working with a cluster that doesn't support range-specific hash schemas // for tables, the client should receive proper error while trying to add // a range with custom hash schema. 
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc index 85bfbfb96..3d428999e 100644 --- a/src/kudu/common/partition.cc +++ b/src/kudu/common/partition.cc @@ -622,7 +622,7 @@ Status PartitionSchema::CreatePartitions( } } - UpdatePartitionBoundaries(&new_partitions); + UpdatePartitionBoundaries(hash_encoder, &new_partitions); *partitions = std::move(new_partitions); return Status::OK(); @@ -655,7 +655,7 @@ Status PartitionSchema::CreatePartitions( std::make_move_iterator(current_bound_hash_partitions.end())); } - UpdatePartitionBoundaries(&result_partitions); + UpdatePartitionBoundaries(hash_encoder, &result_partitions); *partitions = std::move(result_partitions); return Status::OK(); @@ -666,11 +666,28 @@ Status PartitionSchema::CreatePartitionsForRange( const HashSchema& range_hash_schema, const Schema& schema, std::vector<Partition>* partitions) const { + RETURN_NOT_OK(CheckRangeSchema(schema)); + RangesWithHashSchemas ranges_with_hash_schemas; RETURN_NOT_OK(EncodeRangeBounds( {range_bound}, {range_hash_schema}, schema, &ranges_with_hash_schemas)); + DCHECK_EQ(1, ranges_with_hash_schemas.size()); - return CreatePartitions(ranges_with_hash_schemas, schema, partitions); + const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32)); + const auto& range = ranges_with_hash_schemas.front(); + vector<Partition> result_partitions = GenerateHashPartitions( + range.hash_schema, hash_encoder); + // Add range information to the partition key. 
+ for (Partition& p : result_partitions) { + DCHECK(p.begin_.range_key().empty()) << p.begin_.DebugString(); + p.begin_.mutable_range_key()->assign(range.lower); + DCHECK(p.end().range_key().empty()); + p.end_.mutable_range_key()->assign(range.upper); + UpdatePartitionBoundaries(hash_encoder, range_hash_schema, &p); + } + *partitions = std::move(result_partitions); + + return Status::OK(); } template<typename Row> @@ -1455,7 +1472,20 @@ Status PartitionSchema::CheckRangeSchema(const Schema& schema) const { return Status::OK(); } -void PartitionSchema::UpdatePartitionBoundaries(vector<Partition>* partitions) const { +void PartitionSchema::UpdatePartitionBoundaries( + const KeyEncoder<string>& hash_encoder, + vector<Partition>* partitions) const { + for (size_t idx = 0; idx < partitions->size(); ++idx) { + auto& p = (*partitions)[idx]; + UpdatePartitionBoundaries( + hash_encoder, GetHashSchemaForRange(p.begin().range_key()), &p); + } +} + +void PartitionSchema::UpdatePartitionBoundaries( + const KeyEncoder<string>& hash_encoder, + const HashSchema& partition_hash_schema, + Partition* partition) { // Note: the following discussion and logic only takes effect when the table's // partition schema includes at least one hash bucket component, and the // absolute upper and/or absolute lower range bound is unbounded. @@ -1474,43 +1504,38 @@ void PartitionSchema::UpdatePartitionBoundaries(vector<Partition>* partitions) c // the absolute start and end case, these holes are filled by clearing the // partition key beginning at the hash component. For a concrete example, // see PartitionTest::TestCreatePartitions. 
- const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32)); - for (size_t partition_idx = 0; partition_idx < partitions->size(); ++partition_idx) { - auto& p = (*partitions)[partition_idx]; - const auto& hash_schema = GetHashSchemaForRange(p.begin().range_key()); - CHECK_EQ(hash_schema.size(), p.hash_buckets().size()); - const auto hash_buckets_num = static_cast<int>(p.hash_buckets().size()); - // Find the first nonzero-valued bucket from the end and truncate the - // partition key starting from that bucket onwards for zero-valued buckets. - // - // TODO(aserbin): is this really necessary -- zeros in hash key are the - // minimum possible values, and the range part is already - // empty? - if (p.begin().range_key().empty()) { - for (int i = hash_buckets_num - 1; i >= 0; --i) { - if (p.hash_buckets()[i] != 0) { - break; - } - p.begin_.mutable_hash_key()->erase(kEncodedBucketSize * i); + DCHECK(partition); + auto& p = *partition; // just a handy shortcut + CHECK_EQ(partition_hash_schema.size(), p.hash_buckets().size()); + const auto hash_buckets_num = static_cast<int>(p.hash_buckets().size()); + // Find the first nonzero-valued bucket from the end and truncate the + // partition key starting from that bucket onwards for zero-valued buckets. + // This is necessary to complement the truncation performed below for the + // hash part of the partition's end key below. + if (p.begin().range_key().empty()) { + for (int i = hash_buckets_num - 1; i >= 0; --i) { + if (p.hash_buckets()[i] != 0) { + break; } + p.begin_.mutable_hash_key()->erase(kEncodedBucketSize * i); } - // Starting from the last hash bucket, truncate the partition key until we hit the first - // non-max-valued bucket, at which point, replace the encoding with the next-incremented - // bucket value. 
For example, the following range end partition keys should be transformed, - // where the key is (hash_comp1, hash_comp2, range_comp): - // - // [ (0, 0, "") -> (0, 1, "") ] - // [ (0, 1, "") -> (1, _, "") ] - // [ (1, 0, "") -> (1, 1, "") ] - // [ (1, 1, "") -> (_, _, "") ] - if (p.end().range_key().empty()) { - for (int i = hash_buckets_num - 1; i >= 0; --i) { - p.end_.mutable_hash_key()->erase(kEncodedBucketSize * i); - const int32_t hash_bucket = p.hash_buckets()[i] + 1; - if (hash_bucket != hash_schema[i].num_buckets) { - hash_encoder.Encode(&hash_bucket, p.end_.mutable_hash_key()); - break; - } + } + // Starting from the last hash bucket, truncate the partition key until we hit the first + // non-max-valued bucket, at which point, replace the encoding with the next-incremented + // bucket value. For example, the following range end partition keys should be transformed, + // where the key is (hash_comp1, hash_comp2, range_comp): + // + // [ (0, 0, "") -> (0, 1, "") ] + // [ (0, 1, "") -> (1, _, "") ] + // [ (1, 0, "") -> (1, 1, "") ] + // [ (1, 1, "") -> (_, _, "") ] + if (p.end().range_key().empty()) { + for (int i = hash_buckets_num - 1; i >= 0; --i) { + p.end_.mutable_hash_key()->erase(kEncodedBucketSize * i); + const int32_t hash_bucket = p.hash_buckets()[i] + 1; + if (hash_bucket != partition_hash_schema[i].num_buckets) { + hash_encoder.Encode(&hash_bucket, p.end_.mutable_hash_key()); + break; } } } diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h index 90e67ef6b..9fb886196 100644 --- a/src/kudu/common/partition.h +++ b/src/kudu/common/partition.h @@ -24,6 +24,8 @@ #include <utility> #include <vector> +#include <gtest/gtest_prod.h> + #include "kudu/common/schema.h" #include "kudu/gutil/port.h" #include "kudu/util/slice.h" @@ -634,7 +636,14 @@ class PartitionSchema { // method fills in the address space to have the proper ordering of the // serialized partition keys -- that's important for partition pruning and // overall ordering of 
the serialized partition keys. - void UpdatePartitionBoundaries(std::vector<Partition>* partitions) const; + void UpdatePartitionBoundaries(const KeyEncoder<std::string>& hash_encoder, + std::vector<Partition>* partitions) const; + // Similar to the above, but update the boundaries for just a single partition + // specified along with its hash schema. + static void UpdatePartitionBoundaries( + const KeyEncoder<std::string>& hash_encoder, + const HashSchema& partition_hash_schema, + Partition* partition); // Validates the split rows, converts them to partition key form, and inserts // them into splits in sorted order.