This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new e572e5967 [catalog_manager] KUDU-2671 fix bug in 
CreatePartitionsForRange()
e572e5967 is described below

commit e572e59673f6dade62f96eb38b6417ef018bc7bf
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Tue Jul 26 16:44:42 2022 -0700

    [catalog_manager] KUDU-2671 fix bug in CreatePartitionsForRange()
    
    This patch fixes a bug in PartitionSchema::CreatePartitionsForRange().
    The manifestation of the bug was the inability to add an unbounded range
    with a custom hash schema (e.g., [0, +inf)) by AlterTable due to a conflict
    with an already existing range (e.g., [-inf, 0)) when in fact there was no
    conflict at all.  The root cause was the assumption that PartitionSchema
    contained information on the range to be added in its internal map
    'hash_schema_idx_by_encoded_range_start_' but that wasn't the case,
    so GetHashSchemaForRange() would return the table-wide hash schema for
    the new range being added instead of proper range-specific hash schema.
    That led to incorrect updating of range boundaries in
    UpdatePartitionBoundaries(), producing wrong results.
    
    This patch also contains a new test scenario that reproduces the issue:
    the new scenario fails without the fix applied.
    
    Change-Id: I33a2bdea2e71bf4b567664c0166e9fbc07c4b882
    Reviewed-on: http://gerrit.cloudera.org:8080/18793
    Tested-by: Kudu Jenkins
    Reviewed-by: Mahesh Reddy <mre...@cloudera.com>
    Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com>
    Reviewed-by: Attila Bukor <abu...@apache.org>
---
 src/kudu/client/flex_partitioning_client-test.cc |  63 ++++++++++++++
 src/kudu/common/partition.cc                     | 103 ++++++++++++++---------
 src/kudu/common/partition.h                      |  11 ++-
 3 files changed, 137 insertions(+), 40 deletions(-)

diff --git a/src/kudu/client/flex_partitioning_client-test.cc 
b/src/kudu/client/flex_partitioning_client-test.cc
index 3a89aea24..6b42eff1b 100644
--- a/src/kudu/client/flex_partitioning_client-test.cc
+++ b/src/kudu/client/flex_partitioning_client-test.cc
@@ -1775,6 +1775,69 @@ TEST_F(FlexPartitioningAlterTableTest, 
ReadAndWriteToCustomRangePartition) {
   }
 }
 
+TEST_F(FlexPartitioningAlterTableTest, 
ReadAndWriteToUnboundedCustomRangePartition) {
+  constexpr const char* const kTableName =
+      "ReadAndWriteToUnboundedCustomRangePartition";
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+  ASSERT_OK(lower->SetInt32(kKeyColumn, -100));
+  unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+  ASSERT_OK(upper->SetInt32(kKeyColumn, 100));
+  table_creator->table_name(kTableName)
+      .schema(&schema_)
+      .num_replicas(1)
+      .add_hash_partitions({ kKeyColumn }, 2)
+      .set_range_partition_columns({ kKeyColumn })
+      .add_range_partition(lower.release(), upper.release());
+  ASSERT_OK(table_creator->Create());
+
+  unique_ptr<KuduTableAlterer> 
table_alterer(client_->NewTableAlterer(kTableName));
+  {
+    auto p = CreateRangePartitionNoLowerBound(-100);
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 1));
+    table_alterer->AddRangePartition(p.release());
+  }
+  {
+    auto p = CreateRangePartitionNoUpperBound(100);
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 4, 2));
+    table_alterer->AddRangePartition(p.release());
+  }
+  ASSERT_OK(table_alterer->Alter());
+  NO_FATALS(CheckTabletCount(kTableName, 9));  // 2 + 3 + 4 = 9
+
+  // Make sure it's possible to insert rows into the table for all the
+  // existing partitions: first check the range of table-wide schema, then
+  // check the ranges with custom hash schemas.
+  ASSERT_OK(InsertTestRows(kTableName, -100, 100));
+  NO_FATALS(CheckTableRowsNum(kTableName, 200));
+  ASSERT_OK(InsertTestRows(kTableName, -200, -100));
+  NO_FATALS(CheckTableRowsNum(kTableName, 300));
+  ASSERT_OK(InsertTestRows(kTableName, 100, 200));
+  NO_FATALS(CheckTableRowsNum(kTableName, 400));
+  ASSERT_OK(InsertTestRows(kTableName, INT32_MIN, INT32_MIN + 100));
+  NO_FATALS(CheckTableRowsNum(kTableName, 500));
+  ASSERT_OK(InsertTestRows(kTableName, INT32_MAX - 100, INT32_MAX));
+  NO_FATALS(CheckTableRowsNum(kTableName, 600));
+
+  NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -300, 300, 400));
+
+  // Drop a partition in the middle and re-scan with various ranges.
+  {
+    unique_ptr<KuduTableAlterer> 
table_alterer_drop(client_->NewTableAlterer(kTableName));
+    unique_ptr<KuduPartialRow> lower_drop(schema_.NewRow());
+    ASSERT_OK(lower_drop->SetInt32(kKeyColumn, -100));
+    unique_ptr<KuduPartialRow> upper_drop(schema_.NewRow());
+    ASSERT_OK(upper_drop->SetInt32(kKeyColumn, 100));
+    table_alterer_drop->DropRangePartition(lower_drop.release(), 
upper_drop.release());
+    ASSERT_OK(table_alterer_drop->Alter());
+  }
+  NO_FATALS(CheckTableRowsNum(kTableName, 400));
+  NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, 0, 300, 100));
+  NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -300, 0, 100));
+  NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -200, 200, 200));
+  NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -500, 500, 200));
+}
+
 // When working with a cluster that doesn't support range-specific hash schemas
 // for tables, the client should receive proper error while trying to add
 // a range with custom hash schema.
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index 85bfbfb96..3d428999e 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -622,7 +622,7 @@ Status PartitionSchema::CreatePartitions(
     }
   }
 
-  UpdatePartitionBoundaries(&new_partitions);
+  UpdatePartitionBoundaries(hash_encoder, &new_partitions);
   *partitions = std::move(new_partitions);
 
   return Status::OK();
@@ -655,7 +655,7 @@ Status PartitionSchema::CreatePartitions(
                              
std::make_move_iterator(current_bound_hash_partitions.end()));
   }
 
-  UpdatePartitionBoundaries(&result_partitions);
+  UpdatePartitionBoundaries(hash_encoder, &result_partitions);
   *partitions = std::move(result_partitions);
 
   return Status::OK();
@@ -666,11 +666,28 @@ Status PartitionSchema::CreatePartitionsForRange(
     const HashSchema& range_hash_schema,
     const Schema& schema,
     std::vector<Partition>* partitions) const {
+  RETURN_NOT_OK(CheckRangeSchema(schema));
+
   RangesWithHashSchemas ranges_with_hash_schemas;
   RETURN_NOT_OK(EncodeRangeBounds(
       {range_bound}, {range_hash_schema}, schema, &ranges_with_hash_schemas));
+  DCHECK_EQ(1, ranges_with_hash_schemas.size());
 
-  return CreatePartitions(ranges_with_hash_schemas, schema, partitions);
+  const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
+  const auto& range = ranges_with_hash_schemas.front();
+  vector<Partition> result_partitions = GenerateHashPartitions(
+      range.hash_schema, hash_encoder);
+  // Add range information to the partition key.
+  for (Partition& p : result_partitions) {
+    DCHECK(p.begin_.range_key().empty()) << p.begin_.DebugString();
+    p.begin_.mutable_range_key()->assign(range.lower);
+    DCHECK(p.end().range_key().empty());
+    p.end_.mutable_range_key()->assign(range.upper);
+    UpdatePartitionBoundaries(hash_encoder, range_hash_schema, &p);
+  }
+  *partitions = std::move(result_partitions);
+
+  return Status::OK();
 }
 
 template<typename Row>
@@ -1455,7 +1472,20 @@ Status PartitionSchema::CheckRangeSchema(const Schema& 
schema) const {
   return Status::OK();
 }
 
-void PartitionSchema::UpdatePartitionBoundaries(vector<Partition>* partitions) 
const {
+void PartitionSchema::UpdatePartitionBoundaries(
+    const KeyEncoder<string>& hash_encoder,
+    vector<Partition>* partitions) const {
+  for (size_t idx = 0; idx < partitions->size(); ++idx) {
+    auto& p = (*partitions)[idx];
+    UpdatePartitionBoundaries(
+        hash_encoder, GetHashSchemaForRange(p.begin().range_key()), &p);
+  }
+}
+
+void PartitionSchema::UpdatePartitionBoundaries(
+    const KeyEncoder<string>& hash_encoder,
+    const HashSchema& partition_hash_schema,
+    Partition* partition) {
   // Note: the following discussion and logic only takes effect when the 
table's
   // partition schema includes at least one hash bucket component, and the
   // absolute upper and/or absolute lower range bound is unbounded.
@@ -1474,43 +1504,38 @@ void 
PartitionSchema::UpdatePartitionBoundaries(vector<Partition>* partitions) c
   // the absolute start and end case, these holes are filled by clearing the
   // partition key beginning at the hash component. For a concrete example,
   // see PartitionTest::TestCreatePartitions.
-  const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
-  for (size_t partition_idx = 0; partition_idx < partitions->size(); 
++partition_idx) {
-    auto& p = (*partitions)[partition_idx];
-    const auto& hash_schema = GetHashSchemaForRange(p.begin().range_key());
-    CHECK_EQ(hash_schema.size(), p.hash_buckets().size());
-    const auto hash_buckets_num = static_cast<int>(p.hash_buckets().size());
-    // Find the first nonzero-valued bucket from the end and truncate the
-    // partition key starting from that bucket onwards for zero-valued buckets.
-    //
-    // TODO(aserbin): is this really necessary -- zeros in hash key are the
-    //                minimum possible values, and the range part is already
-    //                empty?
-    if (p.begin().range_key().empty()) {
-      for (int i = hash_buckets_num - 1; i >= 0; --i) {
-        if (p.hash_buckets()[i] != 0) {
-          break;
-        }
-        p.begin_.mutable_hash_key()->erase(kEncodedBucketSize * i);
+  DCHECK(partition);
+  auto& p = *partition; // just a handy shortcut
+  CHECK_EQ(partition_hash_schema.size(), p.hash_buckets().size());
+  const auto hash_buckets_num = static_cast<int>(p.hash_buckets().size());
+  // Find the first nonzero-valued bucket from the end and truncate the
+  // partition key starting from that bucket onwards for zero-valued buckets.
+  // This is necessary to complement the truncation performed below for the
+  // hash part of the partition's end key.
+  if (p.begin().range_key().empty()) {
+    for (int i = hash_buckets_num - 1; i >= 0; --i) {
+      if (p.hash_buckets()[i] != 0) {
+        break;
       }
+      p.begin_.mutable_hash_key()->erase(kEncodedBucketSize * i);
     }
-    // Starting from the last hash bucket, truncate the partition key until we 
hit the first
-    // non-max-valued bucket, at which point, replace the encoding with the 
next-incremented
-    // bucket value. For example, the following range end partition keys 
should be transformed,
-    // where the key is (hash_comp1, hash_comp2, range_comp):
-    //
-    // [ (0, 0, "") -> (0, 1, "") ]
-    // [ (0, 1, "") -> (1, _, "") ]
-    // [ (1, 0, "") -> (1, 1, "") ]
-    // [ (1, 1, "") -> (_, _, "") ]
-    if (p.end().range_key().empty()) {
-      for (int i = hash_buckets_num - 1; i >= 0; --i) {
-        p.end_.mutable_hash_key()->erase(kEncodedBucketSize * i);
-        const int32_t hash_bucket = p.hash_buckets()[i] + 1;
-        if (hash_bucket != hash_schema[i].num_buckets) {
-          hash_encoder.Encode(&hash_bucket, p.end_.mutable_hash_key());
-          break;
-        }
+  }
+  // Starting from the last hash bucket, truncate the partition key until we 
hit the first
+  // non-max-valued bucket, at which point, replace the encoding with the 
next-incremented
+  // bucket value. For example, the following range end partition keys should 
be transformed,
+  // where the key is (hash_comp1, hash_comp2, range_comp):
+  //
+  // [ (0, 0, "") -> (0, 1, "") ]
+  // [ (0, 1, "") -> (1, _, "") ]
+  // [ (1, 0, "") -> (1, 1, "") ]
+  // [ (1, 1, "") -> (_, _, "") ]
+  if (p.end().range_key().empty()) {
+    for (int i = hash_buckets_num - 1; i >= 0; --i) {
+      p.end_.mutable_hash_key()->erase(kEncodedBucketSize * i);
+      const int32_t hash_bucket = p.hash_buckets()[i] + 1;
+      if (hash_bucket != partition_hash_schema[i].num_buckets) {
+        hash_encoder.Encode(&hash_bucket, p.end_.mutable_hash_key());
+        break;
       }
     }
   }
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index 90e67ef6b..9fb886196 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -24,6 +24,8 @@
 #include <utility>
 #include <vector>
 
+#include <gtest/gtest_prod.h>
+
 #include "kudu/common/schema.h"
 #include "kudu/gutil/port.h"
 #include "kudu/util/slice.h"
@@ -634,7 +636,14 @@ class PartitionSchema {
   // method fills in the address space to have the proper ordering of the
   // serialized partition keys -- that's important for partition pruning and
   // overall ordering of the serialized partition keys.
-  void UpdatePartitionBoundaries(std::vector<Partition>* partitions) const;
+  void UpdatePartitionBoundaries(const KeyEncoder<std::string>& hash_encoder,
+                                 std::vector<Partition>* partitions) const;
+  // Similar to the above, but update the boundaries for just a single 
partition
+  // specified along with its hash schema.
+  static void UpdatePartitionBoundaries(
+      const KeyEncoder<std::string>& hash_encoder,
+      const HashSchema& partition_hash_schema,
+      Partition* partition);
 
   // Validates the split rows, converts them to partition key form, and inserts
   // them into splits in sorted order.

Reply via email to