This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new b746978c7 KUDU-2671: Follow up pruning patch.
b746978c7 is described below

commit b746978c71ce4a95b69d49c43d0ac852909a8b4e
Author: Mahesh Reddy <mre...@cloudera.com>
AuthorDate: Fri Apr 1 01:37:52 2022 -0700

    KUDU-2671: Follow up pruning patch.
    
    This patch flattens the pruner's result set into a one-dimensional
    container. The new container holds only the partition key ranges and
    no longer stores the range bounds.
    
    Currently, full scans using KuduScanner with no predicates are
    functional. Scans with range predicates are also functional, both on
    tables with covering ranges and on tables with non-covering ranges.
    
    There are a few commented-out test cases within
    flex_partitioning_client-test. These test cases involve a scan with
    range predicates that are both out of bounds. They fail because the
    non-covering range case is triggered in scanner_internal and we
    return early from that function before the proxy_ is set. The
    Check(proxy_) in KuduScanner::NextBatch within client.cc is where
    the tests fail.
    
    Using KuduScanTokens to scan tables with range-specific hash schemas
    is not yet supported. A follow-up patch should address this
    deficiency.
    
    The scan token tests with custom hash schemas fail when verifying
    the tablet info. It seems that the data_ field of the KuduTablets
    isn't set.
    
    Change-Id: I3a1bf5344c0ef856072d3ed102714dce5ba21060
    Reviewed-on: http://gerrit.cloudera.org:8080/17879
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Alexey Serbin <ale...@apache.org>
---
 src/kudu/client/client.h                         |   5 +
 src/kudu/client/flex_partitioning_client-test.cc | 356 +++++++++++++++++++++--
 src/kudu/client/scan_token-internal.cc           |   1 -
 src/kudu/client/scan_token-test.cc               | 203 ++++++++++++-
 src/kudu/client/scanner-internal.cc              |   2 +-
 src/kudu/common/partition_pruner-test.cc         |   1 -
 src/kudu/common/partition_pruner.cc              | 120 ++++----
 src/kudu/common/partition_pruner.h               |  16 +-
 8 files changed, 582 insertions(+), 122 deletions(-)
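
For orientation, here is a condensed sketch of the pruner state this patch
converges on, distilled from the partition_pruner.h hunk further below. The
snippet is illustrative only and not part of the commit; PartitionKey and the
elided members come from the Kudu sources.

    #include <vector>

    class PartitionPruner {
     public:
      // Number of partition key ranges remaining in the scan.
      size_t NumRangesRemaining() const {
        return partition_key_ranges_.size();
      }

     private:
      struct PartitionKeyRange {
        PartitionKey start;  // inclusive lower bound
        PartitionKey end;    // exclusive upper bound
      };

      // The flattened, reverse-sorted set of partition key ranges; the
      // former mapping from RangeBounds to per-range key ranges is gone.
      std::vector<PartitionKeyRange> partition_key_ranges_;
    };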

diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 296a0daf7..418dfe261 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -3078,6 +3078,7 @@ class KUDU_EXPORT KuduScanner {
   Status NextBatch(internal::ScanBatchDataInterface* batch);
 
   friend class KuduScanToken;
+  friend class FlexPartitioningTest;
   FRIEND_TEST(ClientTest, TestBlockScannerHijackingAttempts);
   FRIEND_TEST(ClientTest, TestScanCloseProxy);
   FRIEND_TEST(ClientTest, TestScanFaultTolerance);
@@ -3113,6 +3114,10 @@ class KUDU_EXPORT KuduScanner {
 ///
 /// Scan token locality information can be inspected using the
 /// KuduScanToken::tablet() function.
+///
+/// Scan tokens are not yet compatible for tables that contain range-specific
+/// hash schemas. To be clear, the existing use case of tables with all ranges
+/// using the table wide hash schema is functional as expected.
 class KUDU_EXPORT KuduScanToken {
  public:
 
diff --git a/src/kudu/client/flex_partitioning_client-test.cc b/src/kudu/client/flex_partitioning_client-test.cc
index 97c4a5375..04013900c 100644
--- a/src/kudu/client/flex_partitioning_client-test.cc
+++ b/src/kudu/client/flex_partitioning_client-test.cc
@@ -17,7 +17,6 @@
 
 #include <cstdint>
 #include <cstdlib>
-#include <ios>
 #include <iostream>
 #include <memory>
 #include <string>
@@ -43,10 +42,13 @@
 #include "kudu/master/mini_master.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/tablet.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -54,13 +56,13 @@
 DECLARE_bool(enable_per_range_hash_schemas);
 DECLARE_int32(heartbeat_interval_ms);
 
+METRIC_DECLARE_counter(scans_started);
+
 using kudu::client::sp::shared_ptr;
 using kudu::client::KuduValue;
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
 using kudu::master::CatalogManager;
-using kudu::master::TabletInfo;
-using kudu::tablet::TabletReplica;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -234,12 +236,38 @@ class FlexPartitioningTest : public KuduTest {
     ASSERT_EQ(expected_count, count);
   }
 
-  void CheckLiveRowCount(const char* table_name,
-                         int64_t expected_count) {
+  void CheckScanWithColumnPredicate(const string& table_name, Slice col_name,
+                                    int expected_count, int tablets, int lower, int upper) {
     shared_ptr<KuduTable> table;
     ASSERT_OK(client_->OpenTable(table_name, &table));
 
-    vector<scoped_refptr<TabletInfo>> all_tablets_info;
+    KuduScanner scanner(table.get());
+    ASSERT_OK(scanner.SetTimeoutMillis(60000));
+    if (lower != INT8_MIN) {
+      ASSERT_OK(scanner.AddConjunctPredicate(table->NewComparisonPredicate(
+          col_name, KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(lower))));
+    }
+    if (upper != INT8_MAX) {
+      ASSERT_OK(scanner.AddConjunctPredicate(table->NewComparisonPredicate(
+          col_name, KuduPredicate::LESS, KuduValue::FromInt(upper))));
+    }
+    ASSERT_OK(scanner.Open());
+
+    KuduScanBatch batch;
+    int64_t count = 0;
+    while (scanner.HasMoreRows()) {
+      ASSERT_OK(scanner.NextBatch(&batch));
+      count += batch.NumRows();
+    }
+    ASSERT_EQ(count, expected_count);
+
+    NO_FATALS(CheckTabletsScanned(table_name, tablets));
+  }
+
+  void CheckTabletsScanned(const string& table_name, int tablets) {
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client_->OpenTable(table_name, &table));
+    vector<scoped_refptr<master::TabletInfo>> all_tablets_info;
     {
       auto* cm = cluster_->mini_master(0)->master()->catalog_manager();
       CatalogManager::ScopedLeaderSharedLock l(cm);
@@ -247,21 +275,20 @@ class FlexPartitioningTest : public KuduTest {
       ASSERT_OK(cm->GetTableInfo(table->id(), &table_info));
       table_info->GetAllTablets(&all_tablets_info);
     }
-    vector<scoped_refptr<TabletReplica>> replicas;
+
+    int64_t tablets_scanned = 0;
     for (const auto& tablet_info : all_tablets_info) {
       for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
-        scoped_refptr<TabletReplica> r;
+        scoped_refptr<tablet::TabletReplica> replica;
         ASSERT_TRUE(cluster_->mini_tablet_server(i)->server()->
-                        tablet_manager()->LookupTablet(tablet_info->id(), &r));
-        replicas.emplace_back(std::move(r));
+        tablet_manager()->LookupTablet(tablet_info->id(), &replica));
+        ASSERT_TRUE(replica->tablet()->GetMetricEntity());
+        auto scans_started = METRIC_scans_started.Instantiate(replica->tablet()->GetMetricEntity());
+        tablets_scanned += scans_started->value();
+        scans_started->Reset();
       }
     }
-
-    int64_t count = 0;
-    for (const auto& r : replicas) {
-      count += r->CountLiveRowsNoFail();
-    }
-    ASSERT_EQ(expected_count, count);
+    ASSERT_EQ(tablets, tablets_scanned);
   }
 
   void CheckTableRowsNum(const char* table_name, const char* col_name,
@@ -354,10 +381,7 @@ TEST_F(FlexPartitioningCreateTableTest, EmptyTableWideHashSchema) {
   // There should be 2 tablets total: one per each range created.
   NO_FATALS(CheckTabletCount(kTableName, 2));
   ASSERT_OK(InsertTestRows(kTableName, -111, 222, KuduSession::MANUAL_FLUSH));
-  NO_FATALS(CheckLiveRowCount(kTableName, 333));
-  // TODO(aserbin): uncomment the line below once PartitionPruner handles such
-  //                cases properly
-  //NO_FATALS(CheckTableRowsNum(kTableName, 333));
+  NO_FATALS(CheckTableRowsNum(kTableName, 333));
 }
 
 // Create tables with range partitions using custom hash bucket schemas only.
@@ -420,9 +444,8 @@ TEST_F(FlexPartitioningCreateTableTest, DISABLED_SingleCustomRangeEmptyHashSchem
   // the partitions: first check the range of table-wide schema, then check
   // the ranges with custom hash schemas.
   ASSERT_OK(InsertTestRows(kTableName, -111, 0));
-  NO_FATALS(CheckLiveRowCount(kTableName, 111));
+  //NO_FATALS(CheckTableRowsNum(kTableName, 111));
   ASSERT_OK(InsertTestRows(kTableName, 111, 222));
-  NO_FATALS(CheckLiveRowCount(kTableName, 222));
   // TODO(aserbin): uncomment the line below once PartitionPruner handles such
   //                cases properly
   //NO_FATALS(CheckTableRowsNum(kTableName, 222));
@@ -493,9 +516,9 @@ TEST_F(FlexPartitioningCreateTableTest, DefaultAndCustomHashSchemas) {
   // the partitions: first check the range of table-wide schema, then check
   // the ranges with custom hash schemas.
   ASSERT_OK(InsertTestRows(kTableName, -111, 0));
-  NO_FATALS(CheckLiveRowCount(kTableName, 111));
+  NO_FATALS(CheckTableRowsNum(kTableName, 111));
   ASSERT_OK(InsertTestRows(kTableName, 111, 444));
-  NO_FATALS(CheckLiveRowCount(kTableName, 444));
+  NO_FATALS(CheckTableRowsNum(kTableName, 444));
 
   // Meanwhile, inserting into non-covered ranges should result in a proper
   // error status return to the client attempting such an operation.
@@ -513,7 +536,7 @@ TEST_F(FlexPartitioningCreateTableTest, DefaultAndCustomHashSchemas) {
                         "No tablet covering the requested range partition");
   }
   // Try same as in the scope above, but do so for multiple rows to make sure
-  // custom hash bucketing isn't inducing any unexpected side-effects.
+  // custom hash bucketing isn't inducing any unexpected side effects.
   {
     constexpr int kNumRows = 10;
     vector<KuduError*> errors;
@@ -884,7 +907,7 @@ TEST_F(FlexPartitioningCreateTableTest, DISABLED_NoUpperBoundRangeCustomHashSche
   }
 
   // Add a range partition with custom hash sub-partitioning rules:
-  // 3 buckets with hash based on the "key" column with hash seed 1.
+  // 2 buckets with hash based on the "key" column with hash seed 1.
   {
     auto p = CreateRangePartition(111, 222);
     ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 2, 0));
@@ -901,9 +924,282 @@ TEST_F(FlexPartitioningCreateTableTest, DISABLED_NoUpperBoundRangeCustomHashSche
   NO_FATALS(CheckTabletCount(kTableName, 6));
 
   // Make sure it's possible to insert rows into the table for all the existing
-  // paritions.
+  // partitions.
   ASSERT_OK(InsertTestRows(kTableName, 0, 555));
-  NO_FATALS(CheckLiveRowCount(kTableName, 555));
+  NO_FATALS(CheckTableRowsNum(kTableName, 555));
+}
+
+TEST_F(FlexPartitioningCreateTableTest, ScansWithRangePredicates) {
+
+  constexpr const char* const kTableName = "ScansWithRangePredicates";
+
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  table_creator->table_name(kTableName)
+    .schema(&schema_)
+    .num_replicas(1)
+    .add_hash_partitions({ kKeyColumn }, 2)
+    .set_range_partition_columns({ kKeyColumn });
+
+  // Add a range partition with the table-wide hash partitioning rules.
+  {
+    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+    ASSERT_OK(lower->SetInt32(kKeyColumn, 0));
+    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+    ASSERT_OK(upper->SetInt32(kKeyColumn, 111));
+    table_creator->add_range_partition(lower.release(), upper.release());
+  }
+
+  // Add a range partition with custom hash sub-partitioning rules:
+  // 4 buckets with hash based on the "key" column with hash seed 1.
+  {
+    auto p = CreateRangePartition(111, 222);
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 4, 1));
+    table_creator->add_custom_range_partition(p.release());
+  }
+
+  // Add a range partition with custom hash sub-partitioning rules:
+  // 3 buckets with hash based on the "key" column with hash seed 1.
+  {
+    auto p = CreateRangePartition(222, 333);
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 1));
+    table_creator->add_custom_range_partition(p.release());
+  }
+
+  ASSERT_OK(table_creator->Create());
+  NO_FATALS(CheckTabletCount(kTableName, 9));
+
+  ASSERT_OK(InsertTestRows(kTableName, 0, 111));
+  NO_FATALS(CheckTableRowsNum(kTableName, 111));
+  NO_FATALS(CheckTabletsScanned(kTableName, 9));
+
+  ASSERT_OK(InsertTestRows(kTableName, 111, 222));
+  NO_FATALS(CheckTableRowsNum(kTableName, 222));
+  NO_FATALS(CheckTabletsScanned(kTableName, 9));
+
+  ASSERT_OK(InsertTestRows(kTableName, 222, 333));
+  NO_FATALS(CheckTableRowsNum(kTableName, 333));
+  NO_FATALS(CheckTabletsScanned(kTableName, 9));
+
+  // Filler values for lower or upper bound when it's not meant to be set.
+  constexpr int lower_filler = INT8_MIN;
+  constexpr int upper_filler = INT8_MAX;
+
+  CheckScanWithColumnPredicate(kTableName, "key", 150, 6,  0, 150);
+  CheckScanWithColumnPredicate(kTableName, "key", 183, 7, 150, 333);
+  CheckScanWithColumnPredicate(kTableName, "key", 75, 2, 0, 75);
+  CheckScanWithColumnPredicate(kTableName, "key", 175, 9, 75, 250);
+  CheckScanWithColumnPredicate(kTableName, "key", 83, 3, 250, 333);
+  CheckScanWithColumnPredicate(kTableName, "key", 1, 1, 0, 1);
+  CheckScanWithColumnPredicate(kTableName, "key", 2, 6, 110, 112);
+  //CheckScanWithColumnPredicate(kTableName, "key", 0, 0, -10, -5);
+  //CheckScanWithColumnPredicate(kTableName, "key", 0, 0, 350, 400);
+  CheckScanWithColumnPredicate(kTableName, "key", 1, 1, 332, 333);
+  CheckScanWithColumnPredicate(kTableName, "key", 0, 0, 333, 334);
+  CheckScanWithColumnPredicate(kTableName, "key", 100, 2, lower_filler, 100);
+  CheckScanWithColumnPredicate(kTableName, "key", 250, 9, lower_filler, 250);
+  CheckScanWithColumnPredicate(kTableName, "key", 333, 9, lower_filler, 333);
+  CheckScanWithColumnPredicate(kTableName, "key", 333, 9, lower_filler, 334);
+  CheckScanWithColumnPredicate(kTableName, "key", 333, 9, -10, upper_filler);
+  CheckScanWithColumnPredicate(kTableName, "key", 233, 9, 100, upper_filler);
+  CheckScanWithColumnPredicate(kTableName, "key", 133, 7, 200, upper_filler);
+  CheckScanWithColumnPredicate(kTableName, "key", 83, 3, 250, upper_filler);
+  //CheckScanWithColumnPredicate(kTableName, "key", 0, 0, 333, upper_filler);
+
+  // Meanwhile, inserting into non-covered ranges should result in a proper
+  // error status return to the client attempting such an operation.
+  {
+    constexpr int kNumRows = 10;
+    vector<KuduError*> errors;
+    ElementDeleter drop(&errors);
+    auto s = InsertTestRows(
+        kTableName, 334, 334 + kNumRows, KuduSession::MANUAL_FLUSH, &errors);
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
+    ASSERT_EQ(kNumRows, errors.size());
+    for (const auto& e : errors) {
+      const auto& err = e->status();
+      EXPECT_TRUE(err.IsNotFound()) << err.ToString();
+      ASSERT_STR_CONTAINS(err.ToString(),
+                          "No tablet covering the requested range partition");
+    }
+  }
+}
+
+TEST_F(FlexPartitioningCreateTableTest, ScansWithRangePredicatesWithSameHashBuckets) {
+
+  constexpr const char* const kTableName = "ScansWithRangePredicatesWithSameHashBuckets";
+
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  table_creator->table_name(kTableName)
+  .schema(&schema_)
+  .num_replicas(1)
+  .add_hash_partitions({ kKeyColumn }, 3)
+  .set_range_partition_columns({ kKeyColumn });
+
+  // Add a range partition with the table-wide hash partitioning rules.
+  {
+    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+    ASSERT_OK(lower->SetInt32(kKeyColumn, 0));
+    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+    ASSERT_OK(upper->SetInt32(kKeyColumn, 111));
+    table_creator->add_range_partition(lower.release(), upper.release());
+  }
+
+  // Add a range partition with the table-wide hash partitioning rules.
+  {
+    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+    ASSERT_OK(lower->SetInt32(kKeyColumn, 111));
+    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+    ASSERT_OK(upper->SetInt32(kKeyColumn, 222));
+    table_creator->add_range_partition(lower.release(), upper.release());
+  }
+
+  // Add a range partition with the table-wide hash partitioning rules.
+  {
+    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+    ASSERT_OK(lower->SetInt32(kKeyColumn, 222));
+    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+    ASSERT_OK(upper->SetInt32(kKeyColumn, 333));
+    table_creator->add_range_partition(lower.release(), upper.release());
+  }
+
+
+  ASSERT_OK(table_creator->Create());
+  NO_FATALS(CheckTabletCount(kTableName, 9));
+
+  ASSERT_OK(InsertTestRows(kTableName, 0, 111));
+  NO_FATALS(CheckTableRowsNum(kTableName, 111));
+  NO_FATALS(CheckTabletsScanned(kTableName, 9));
+
+  ASSERT_OK(InsertTestRows(kTableName, 111, 222));
+  NO_FATALS(CheckTableRowsNum(kTableName, 222));
+  NO_FATALS(CheckTabletsScanned(kTableName, 9));
+
+  ASSERT_OK(InsertTestRows(kTableName, 222, 333));
+  NO_FATALS(CheckTableRowsNum(kTableName, 333));
+  NO_FATALS(CheckTabletsScanned(kTableName, 9));
+
+  // Filler values for lower or upper bound when it's not meant to be set.
+  constexpr int lower_filler = INT8_MIN;
+  constexpr int upper_filler = INT8_MAX;
+
+  // The test cases commented out below fail. Initially, it was thought to be a regression but these
+  // failures are not reproducible in a real cluster.
+  CheckScanWithColumnPredicate(kTableName, "key", 150, 6, 0, 150);
+  CheckScanWithColumnPredicate(kTableName, "key", 183, 6, 150, 333);
+  CheckScanWithColumnPredicate(kTableName, "key", 75, 3, 0, 75);
+  CheckScanWithColumnPredicate(kTableName, "key", 175, 9, 75, 250);
+  CheckScanWithColumnPredicate(kTableName, "key", 83, 3, 250, 333);
+  CheckScanWithColumnPredicate(kTableName, "key", 1, 1, 0, 1);
+  CheckScanWithColumnPredicate(kTableName, "key", 1, 1, 110, 111);
+  CheckScanWithColumnPredicate(kTableName, "key", 3, 6, 110, 113);
+  CheckScanWithColumnPredicate(kTableName, "key", 1, 1, 332, 333);
+  CheckScanWithColumnPredicate(kTableName, "key", 0, 0, 333, 334);
+  //CheckScanWithColumnPredicate(kTableName, "key", 0, 0, -10, -5);
+  //CheckScanWithColumnPredicate(kTableName, "key", 0, 0, 350, 400);
+  CheckScanWithColumnPredicate(kTableName, "key", 100, 3, lower_filler, 100);
+  CheckScanWithColumnPredicate(kTableName, "key", 250, 9, lower_filler, 250);
+  CheckScanWithColumnPredicate(kTableName, "key", 333, 9, lower_filler, 333);
+  CheckScanWithColumnPredicate(kTableName, "key", 333, 9, lower_filler, 334);
+  CheckScanWithColumnPredicate(kTableName, "key", 333, 9, -10, upper_filler);
+  CheckScanWithColumnPredicate(kTableName, "key", 233, 9, 100, upper_filler);
+  CheckScanWithColumnPredicate(kTableName, "key", 133, 6, 200, upper_filler);
+  CheckScanWithColumnPredicate(kTableName, "key", 83, 3, 250, upper_filler);
+  //CheckScanWithColumnPredicate(kTableName, "key", 0, 0, 333, upper_filler);
+
+  // Meanwhile, inserting into non-covered ranges should result in a proper
+  // error status return to the client attempting such an operation.
+  {
+    constexpr int kNumRows = 10;
+    vector<KuduError*> errors;
+    ElementDeleter drop(&errors);
+    auto s = InsertTestRows(
+        kTableName, 334, 334 + kNumRows, KuduSession::MANUAL_FLUSH, &errors);
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
+    ASSERT_EQ(kNumRows, errors.size());
+    for (const auto& e : errors) {
+      const auto& err = e->status();
+      EXPECT_TRUE(err.IsNotFound()) << err.ToString();
+      ASSERT_STR_CONTAINS(err.ToString(),
+                          "No tablet covering the requested range partition");
+    }
+  }
+}
+
+TEST_F(FlexPartitioningCreateTableTest, ScansWithNonCoveringRanges) {
+
+  constexpr const char* const kTableName = "ScansWithNonCoveringRanges";
+
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  table_creator->table_name(kTableName)
+  .schema(&schema_)
+  .num_replicas(1)
+  .add_hash_partitions({ kKeyColumn }, 2)
+  .set_range_partition_columns({ kKeyColumn });
+
+  // Add a range partition with custom hash sub-partitioning rules:
+  // 4 buckets with hash based on the "key" column with hash seed 1.
+  {
+    auto p = CreateRangePartition(0, 111);
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 4, 1));
+    table_creator->add_custom_range_partition(p.release());
+  }
+
+  // Add a range partition with the table-wide hash partitioning rules.
+  {
+    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+    ASSERT_OK(lower->SetInt32(kKeyColumn, 222));
+    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+    ASSERT_OK(upper->SetInt32(kKeyColumn, 333));
+    table_creator->add_range_partition(lower.release(), upper.release());
+  }
+
+  // Add a range partition with custom hash sub-partitioning rules:
+  // 3 buckets with hash based on the "key" column with hash seed 1.
+  {
+    auto p = CreateRangePartition(444, 555);
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 1));
+    table_creator->add_custom_range_partition(p.release());
+  }
+
+  ASSERT_OK(table_creator->Create());
+  NO_FATALS(CheckTabletCount(kTableName, 9));
+
+  ASSERT_OK(InsertTestRows(kTableName, 0, 111));
+  NO_FATALS(CheckTableRowsNum(kTableName, 111));
+  NO_FATALS(CheckTabletsScanned(kTableName, 9));
+
+  ASSERT_OK(InsertTestRows(kTableName, 222, 333));
+  NO_FATALS(CheckTableRowsNum(kTableName, 222));
+  NO_FATALS(CheckTabletsScanned(kTableName, 9));
+
+  ASSERT_OK(InsertTestRows(kTableName, 444, 555));
+  NO_FATALS(CheckTableRowsNum(kTableName, 333));
+  NO_FATALS(CheckTabletsScanned(kTableName, 9));
+
+  // Filler values for lower or upper bound when it's not meant to be set.
+  constexpr int lower_filler = INT8_MIN;
+  constexpr int upper_filler = INT8_MAX;
+
+  CheckScanWithColumnPredicate(kTableName, "key", 111, 4, -10, 111);
+  CheckScanWithColumnPredicate(kTableName, "key", 111, 4, -10, 150);
+  CheckScanWithColumnPredicate(kTableName, "key", 111, 2, 150, 333);
+  CheckScanWithColumnPredicate(kTableName, "key", 33, 6, 100, 244);
+  CheckScanWithColumnPredicate(kTableName, "key", 111, 2, 144, 344);
+  CheckScanWithColumnPredicate(kTableName, "key", 333, 9, -50, 600);
+  CheckScanWithColumnPredicate(kTableName, "key", 222, 5, 150, 600);
+  CheckScanWithColumnPredicate(kTableName, "key", 1, 1, 0, 1);
+  //CheckScanWithColumnPredicate(kTableName, "key", 0, 0, -10, -5);
+  //CheckScanWithColumnPredicate(kTableName, "key", 0, 0, 600, 650);
+  CheckScanWithColumnPredicate(kTableName, "key", 1, 1, 554, 555);
+  CheckScanWithColumnPredicate(kTableName, "key", 0, 0, 555, 556);
+  CheckScanWithColumnPredicate(kTableName, "key", 111, 4, lower_filler, 150);
+  CheckScanWithColumnPredicate(kTableName, "key", 222, 6, lower_filler, 350);
+  CheckScanWithColumnPredicate(kTableName, "key", 333, 9, lower_filler, 560);
+  CheckScanWithColumnPredicate(kTableName, "key", 333, 9, -10, upper_filler);
+  CheckScanWithColumnPredicate(kTableName, "key", 222, 5, 150, upper_filler);
+  CheckScanWithColumnPredicate(kTableName, "key", 111, 3, 350, upper_filler);
 }
 
 // Negative tests scenarios to cover non-OK status codes for various operations
@@ -1238,9 +1534,9 @@ TEST_F(FlexPartitioningAlterTableTest, ReadAndWriteToCustomRangePartition) {
   // the partitions: first check the range of table-wide schema, then check
   // the ranges with custom hash schemas.
   ASSERT_OK(InsertTestRows(kTableName, 0, 111));
-  NO_FATALS(CheckLiveRowCount(kTableName, 111));
+  NO_FATALS(CheckTableRowsNum(kTableName, 111));
   ASSERT_OK(InsertTestRows(kTableName, 111, 444));
-  NO_FATALS(CheckLiveRowCount(kTableName, 444));
+  NO_FATALS(CheckTableRowsNum(kTableName, 444));
 
   // WIP: Scan the data present after rebasing on pruning patches
   /*
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index eec4bf91c..434ce5803 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -462,7 +462,6 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
       pruner.RemovePartitionKeyRange(tablet->partition().end());
       continue;
     }
-
     vector<internal::RemoteReplica> replicas;
     tablet->GetRemoteReplicas(&replicas);
 
diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc
index 77c00b99b..11e9697ee 100644
--- a/src/kudu/client/scan_token-test.cc
+++ b/src/kudu/client/scan_token-test.cc
@@ -46,11 +46,14 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/catalog_manager.h"
 #include "kudu/master/master.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -59,6 +62,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_bool(enable_per_range_hash_schemas);
 DECLARE_bool(tserver_enforce_access_control);
 
 METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableSchema);
@@ -68,6 +72,9 @@ using kudu::client::KuduTableCreator;
 using kudu::client::sp::shared_ptr;
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
+using kudu::master::CatalogManager;
+using kudu::master::TabletInfo;
+using kudu::tablet::TabletReplica;
 using kudu::tserver::MiniTabletServer;
 using std::atomic;
 using std::string;
@@ -129,15 +136,14 @@ class ScanTokenTest : public KuduTest {
 
   // Similar to CountRows() above, but use the specified client handle
   // and run all the scanners sequentially, one by one.
-  Status CountRowsSeq(KuduClient* client,
-                      vector<KuduScanToken*> tokens,
+  static Status CountRowsSeq(KuduClient* client,
+                      const vector<KuduScanToken*>& tokens,
                       int64_t* row_count) {
     int64_t count = 0;
     for (auto* t : tokens) {
       unique_ptr<KuduScanToken> token(t);
       unique_ptr<KuduScanner> scanner;
       RETURN_NOT_OK(IntoUniqueScanner(client, *token, &scanner));
-
       RETURN_NOT_OK(scanner->Open());
       while (scanner->HasMoreRows()) {
         KuduScanBatch batch;
@@ -185,7 +191,7 @@ class ScanTokenTest : public KuduTest {
     return Status::OK();
   }
 
-  // Create a table with the specified name and schema with replicaction factor
+  // Create a table with the specified name and schema with replication factor
   // of one and empty list of range partitions.
   Status CreateAndOpenTable(const string& table_name,
                             const KuduSchema& schema,
@@ -211,6 +217,37 @@ class ScanTokenTest : public KuduTest {
         .Instantiate(ent)->TotalCount();
   }
 
+  Status CheckLiveRowCount(const char* table_name,
+                           uint64_t* expected_count) {
+    shared_ptr<KuduTable> table;
+    RETURN_NOT_OK(client_->OpenTable(table_name, &table));
+
+    vector<scoped_refptr<TabletInfo>> all_tablets_info;
+    {
+      auto* cm = cluster_->mini_master(0)->master()->catalog_manager();
+      CatalogManager::ScopedLeaderSharedLock l(cm);
+      scoped_refptr<master::TableInfo> table_info;
+      RETURN_NOT_OK(cm->GetTableInfo(table->id(), &table_info));
+      table_info->GetAllTablets(&all_tablets_info);
+    }
+    vector<scoped_refptr<TabletReplica>> replicas;
+    for (const auto& tablet_info : all_tablets_info) {
+      for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+        scoped_refptr<TabletReplica> r;
+        EXPECT_TRUE(cluster_->mini_tablet_server(i)->server()->
+        tablet_manager()->LookupTablet(tablet_info->id(), &r));
+        replicas.emplace_back(std::move(r));
+      }
+    }
+
+    uint64_t count = 0;
+    for (const auto& r : replicas) {
+      count += r->CountLiveRowsNoFail();
+    }
+    *expected_count = count;
+    return Status::OK();
+  }
+
   shared_ptr<KuduClient> client_;
   unique_ptr<InternalMiniCluster> cluster_;
 };
@@ -546,6 +583,156 @@ TEST_F(ScanTokenTest, TestScanTokensWithNonCoveringRange) {
   }
 }
 
+// TODO(mreddy) : Enable test once there is support for scan tokens with
+// custom hash schema per range.
+TEST_F(ScanTokenTest, DISABLED_TestScanTokensWithCustomHashSchemasPerRange) {
+  FLAGS_enable_per_range_hash_schemas = true;
+  KuduSchema schema;
+  // Create schema
+  {
+    KuduSchemaBuilder builder;
+    builder.AddColumn("col")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();
+    ASSERT_OK(builder.Build(&schema));
+  }
+
+  // Create table
+  shared_ptr<KuduTable> table;
+  {
+    unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+    unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    table_creator->table_name("table");
+    table_creator->num_replicas(1);
+    table_creator->schema(&schema);
+
+    table_creator->add_hash_partitions({ "col" }, 2);
+
+    {
+      ASSERT_OK(lower_bound->SetInt64("col", 0));
+      ASSERT_OK(upper_bound->SetInt64("col", 100));
+      unique_ptr<KuduTableCreator::KuduRangePartition> range_partition
+        (new KuduTableCreator::KuduRangePartition(lower_bound.release(), upper_bound.release()));
+      range_partition->add_hash_partitions({ "col" }, 4);
+      table_creator->add_custom_range_partition(range_partition.release());
+    }
+
+    {
+      lower_bound.reset(schema.NewRow());
+      upper_bound.reset(schema.NewRow());
+      ASSERT_OK(lower_bound->SetInt64("col", 100));
+      ASSERT_OK(upper_bound->SetInt64("col", 200));
+      unique_ptr<KuduTableCreator::KuduRangePartition> range_partition
+        (new KuduTableCreator::KuduRangePartition(lower_bound.release(), upper_bound.release()));
+      range_partition->add_hash_partitions({ "col"}, 2);
+      table_creator->add_custom_range_partition(range_partition.release());
+    }
+
+    ASSERT_OK(table_creator->Create());
+    ASSERT_OK(client_->OpenTable("table", &table));
+  }
+
+  // Create session
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(10000);
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+
+  // Insert rows
+  for (int i = 0; i < 200; i++) {
+    unique_ptr<KuduInsert> insert(table->NewInsert());
+    ASSERT_OK(insert->mutable_row()->SetInt64("col", i));
+    ASSERT_OK(session->Apply(insert.release()));
+  }
+  ASSERT_OK(session->Flush());
+
+  uint64_t expected_count = 0;
+  CheckLiveRowCount("table", &expected_count);
+  ASSERT_EQ(expected_count, 200);
+
+  { // no predicates
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
+
+    ASSERT_EQ(6, tokens.size());
+    ASSERT_EQ(200, CountRows(tokens));
+    shared_ptr<KuduClient> new_client;
+    ASSERT_OK(cluster_->CreateClient(nullptr, &new_client));
+    int64_t row_count = 0;
+    ASSERT_OK(CountRowsSeq(new_client.get(), tokens, &row_count));
+    ASSERT_EQ(200, row_count);
+    NO_FATALS(VerifyTabletInfo(tokens));
+  }
+
+  { // range predicate
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    unique_ptr<KuduPredicate> predicate(table->NewComparisonPredicate(
+        "col", KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(100)));
+    ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
+    ASSERT_OK(builder.Build(&tokens));
+
+    ASSERT_EQ(2, tokens.size());
+    ASSERT_EQ(100, CountRows(tokens));
+    NO_FATALS(VerifyTabletInfo(tokens));
+  }
+
+  { // equality predicate
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    unique_ptr<KuduPredicate> predicate(table->NewComparisonPredicate(
+        "col", KuduPredicate::EQUAL, KuduValue::FromInt(42)));
+    ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
+    ASSERT_OK(builder.Build(&tokens));
+
+    ASSERT_EQ(1, tokens.size());
+    ASSERT_EQ(1, CountRows(tokens));
+    NO_FATALS(VerifyTabletInfo(tokens));
+  }
+
+  { // IS NOT NULL predicate
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    unique_ptr<KuduPredicate> predicate(table->NewIsNotNullPredicate("col"));
+    ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
+    ASSERT_OK(builder.Build(&tokens));
+
+    ASSERT_EQ(6, tokens.size());
+    ASSERT_EQ(200, CountRows(tokens));
+    NO_FATALS(VerifyTabletInfo(tokens));
+  }
+
+  { // IS NULL predicate
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    unique_ptr<KuduPredicate> predicate(table->NewIsNullPredicate("col"));
+    ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
+    ASSERT_OK(builder.Build(&tokens));
+
+    ASSERT_GE(0, tokens.size());
+    ASSERT_EQ(0, CountRows(tokens));
+    NO_FATALS(VerifyTabletInfo(tokens));
+  }
+
+  { // primary key bound
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+    ASSERT_OK(upper_bound->SetInt64("col", 40));
+
+    ASSERT_OK(builder.AddUpperBound(*upper_bound));
+    ASSERT_OK(builder.Build(&tokens));
+
+    ASSERT_EQ(4, tokens.size());
+    ASSERT_EQ(40, CountRows(tokens));
+    NO_FATALS(VerifyTabletInfo(tokens));
+  }
+}
+
 class TimestampPropagationParamTest :
     public ScanTokenTest,
     public ::testing::WithParamInterface<kudu::ReadMode> {
@@ -870,7 +1057,7 @@ class StaleScanTokensParamTest :
 
 // Create scan tokens for one state of the table and store it for future use.
 // Use the tokens to scan the table. Alter the table dropping first range
-// partition, optionally replacing it according with FirstRangeChangeMode
+// partition, optionally replacing it according to FirstRangeChangeMode
 // enum. Open the altered table via the client handle which was used to run
 // the token-based scan prior. Now, attempt to scan the table using stale
 // tokens generated with the original state of the table.
@@ -962,7 +1149,7 @@ TEST_P(StaleScanTokensParamTest, DroppingFirstRange) {
   ASSERT_OK(cluster_->CreateClient(nullptr, &new_client));
 
   int64_t row_count_a = 0;
-  ASSERT_OK(CountRowsSeq(new_client.get(), std::move(tokens_a), &row_count_a));
+  ASSERT_OK(CountRowsSeq(new_client.get(), tokens_a, &row_count_a));
   ASSERT_EQ(50, row_count_a);
 
   // Open the test table via 'new_client' handle to populate the metadata
@@ -1030,7 +1217,7 @@ TEST_P(StaleScanTokensParamTest, DroppingFirstRange) {
         ASSERT_OK(insert->mutable_row()->SetInt64("key", i));
         ASSERT_OK(session->Apply(insert.release()));
       }
-      // The rows in the preceeding range should not be read if using the
+      // The rows in the preceding range should not be read if using the
       // token for the [-100, 0) original range.
       [[fallthrough]];
     case RANGE_DROPPED_AND_SMALLER_ONE_ADDED:
@@ -1053,7 +1240,7 @@ TEST_P(StaleScanTokensParamTest, DroppingFirstRange) {
   // within the range of the new partitions which correspond to the originally
   // supplied range.
   int64_t row_count_b = -1;
-  ASSERT_OK(CountRowsSeq(new_client.get(), std::move(tokens_b), &row_count_b));
+  ASSERT_OK(CountRowsSeq(new_client.get(), tokens_b, &row_count_b));
   ASSERT_EQ(expected_row_count, row_count_b);
 }
 
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 1acb1cf3f..18a436fba 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -540,7 +540,7 @@ Status KuduScanner::Data::OpenTablet(const PartitionKey& partition_key,
     // it's likely that the tablet is undergoing a leader election and will
     // soon have one.
     if (lookup_status.IsServiceUnavailable() && MonoTime::Now() < deadline) {
-      // ServiceUnavailable means that we have already blacklisted all of the candidate
+      // ServiceUnavailable means that we have already blacklisted all the candidate
       // tablet servers. So, we clear the list so that we will cycle through them all
       // another time.
       blacklist->clear();
diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc
index 40c0ed398..af3326669 100644
--- a/src/kudu/common/partition_pruner-test.cc
+++ b/src/kudu/common/partition_pruner-test.cc
@@ -1161,7 +1161,6 @@ TEST_F(PartitionPrunerTest, DISABLED_TestHashSchemasPerRangePruning) {
 
   // No Bounds
   NO_FATALS(check({}, {}, {}, 12, 12));
-
   // A = 1
   NO_FATALS(check({ ColumnPredicate::Equality(schema.column(0), &one) },
             {}, {}, 7, 7));
diff --git a/src/kudu/common/partition_pruner.cc b/src/kudu/common/partition_pruner.cc
index 567e91bc4..3b228926b 100644
--- a/src/kudu/common/partition_pruner.cc
+++ b/src/kudu/common/partition_pruner.cc
@@ -310,18 +310,18 @@ vector<PartitionPruner::PartitionKeyRange> PartitionPruner::ConstructPartitionKe
   }
 
   // Remove all partition key ranges past the scan spec's upper bound partition key.
-  if (!scan_spec.exclusive_upper_bound_partition_key().empty()) {
+  const auto& upper_bound_partition_key = scan_spec.exclusive_upper_bound_partition_key();
+  if (!upper_bound_partition_key.empty()) {
     for (auto range = partition_key_ranges.rbegin();
          range != partition_key_ranges.rend();
          ++range) {
-      if (!(*range).end.empty() &&
-          scan_spec.exclusive_upper_bound_partition_key() >= (*range).end) {
+      if (!(*range).end.empty() && upper_bound_partition_key >= (*range).end) {
         break;
       }
-      if (scan_spec.exclusive_upper_bound_partition_key() <= (*range).start) {
+      if (upper_bound_partition_key <= (*range).start) {
         partition_key_ranges.pop_back();
       } else {
-        (*range).end = scan_spec.exclusive_upper_bound_partition_key();
+        (*range).end = upper_bound_partition_key;
       }
     }
   }
@@ -538,21 +538,15 @@ void PartitionPruner::Init(const Schema& schema,
     }
   }
 
-  // Store ranges and their corresponding hash schemas if they fall within
-  // the range bounds specified by the scan.
   if (partition_schema.ranges_with_custom_hash_schemas().empty()) {
     auto partition_key_ranges = ConstructPartitionKeyRanges(
         schema, scan_spec, partition_schema.hash_schema_,
         {scan_range_lower_bound, scan_range_upper_bound});
-    // Reverse the order of the partition key ranges, so that it is efficient
-    // to remove the partition key ranges from the vector in ascending order.
-    range_bounds_to_partition_key_ranges_.resize(1);
-    auto& first_range = range_bounds_to_partition_key_ranges_[0];
-    first_range.partition_key_ranges.resize(partition_key_ranges.size());
+    partition_key_ranges_.resize(partition_key_ranges.size());
     move(partition_key_ranges.rbegin(), partition_key_ranges.rend(),
-         first_range.partition_key_ranges.begin());
+         partition_key_ranges_.begin());
   } else {
-    // Build the preliminary set or ranges: that's to convey information on
+    // Build the preliminary set of ranges: that's to convey information on
     // range-specific hash schemas since some ranges in the table can have
     // custom (i.e. different from the table-wide) hash schemas.
     PartitionSchema::RangesWithHashSchemas preliminary_ranges;
@@ -563,21 +557,27 @@ void PartitionPruner::Init(const Schema& schema,
         partition_schema.ranges_with_custom_hash_schemas(),
         &preliminary_ranges);
 
-    range_bounds_to_partition_key_ranges_.resize(preliminary_ranges.size());
     // Construct partition key ranges from the ranges and their respective hash
     // schemas that falls within the scan's bounds.
     for (size_t i = 0; i < preliminary_ranges.size(); ++i) {
       const auto& hash_schema = preliminary_ranges[i].hash_schema;
-      RangeBounds range_bounds{
-          preliminary_ranges[i].lower, preliminary_ranges[i].upper};
+      RangeBounds range_bounds {preliminary_ranges[i].lower, preliminary_ranges[i].upper};
       auto partition_key_ranges = ConstructPartitionKeyRanges(
           schema, scan_spec, hash_schema, range_bounds);
-      auto& current_range = range_bounds_to_partition_key_ranges_[i];
-      current_range.range_bounds = std::move(range_bounds);
-      current_range.partition_key_ranges.resize(partition_key_ranges.size());
-      move(partition_key_ranges.rbegin(), partition_key_ranges.rend(),
-           current_range.partition_key_ranges.begin());
+      partition_key_ranges_.resize(partition_key_ranges_.size() + partition_key_ranges.size());
+      move(partition_key_ranges.begin(), partition_key_ranges.end(),
+           partition_key_ranges_.rbegin());
     }
+    // Reverse the order of the partition key ranges, so that it is efficient
+    // to remove the partition key ranges from the vector in ascending order.
+    constexpr struct {
+      bool operator()(const PartitionKeyRange& lhs, const PartitionKeyRange& rhs) const {
+        return lhs.start > rhs.start;
+      }
+    } PartitionKeyRangeLess;
+    sort(partition_key_ranges_.begin(),
+         partition_key_ranges_.end(),
+         PartitionKeyRangeLess);
   }
 
   // Remove all partition key ranges before the scan spec's lower bound partition key.
@@ -592,71 +592,55 @@ bool PartitionPruner::HasMorePartitionKeyRanges() const {
 
 const PartitionKey& PartitionPruner::NextPartitionKey() const {
   CHECK(HasMorePartitionKeyRanges());
-  return range_bounds_to_partition_key_ranges_.back().partition_key_ranges.back().start;
+  return partition_key_ranges_.back().start;
 }
 
 void PartitionPruner::RemovePartitionKeyRange(const PartitionKey& upper_bound) {
   if (upper_bound.empty()) {
-    range_bounds_to_partition_key_ranges_.clear();
+    partition_key_ranges_.clear();
     return;
   }
 
-  for (auto& range_bounds_and_partition_key_range : range_bounds_to_partition_key_ranges_) {
-    auto& partition_key_range = range_bounds_and_partition_key_range.partition_key_ranges;
-    for (auto range_it = partition_key_range.rbegin();
-         range_it != partition_key_range.rend();
-         ++range_it) {
-      if (upper_bound <= (*range_it).start) { break; }
-      if ((*range_it).end.empty() || upper_bound < (*range_it).end) {
-        (*range_it).start = upper_bound;
-      } else {
-        partition_key_range.pop_back();
-      }
+  for (auto range_it = partition_key_ranges_.rbegin();
+       range_it != partition_key_ranges_.rend();
+       ++range_it) {
+    if (upper_bound <= (*range_it).start) { break; }
+    // Condition met if upper_bound lies in the middle of current partition key range
+    if ((*range_it).end.empty() || upper_bound < (*range_it).end) {
+       (*range_it).start = upper_bound;
+    } else {
+      partition_key_ranges_.pop_back();
     }
   }
 }
 
 bool PartitionPruner::ShouldPrune(const Partition& partition) const {
-  for (const auto& [range_bounds, partition_key_ranges] : range_bounds_to_partition_key_ranges_) {
-    // Check if the partition belongs to the same range as the partition key range.
-    if (!range_bounds.lower.empty() && partition.begin().range_key() != range_bounds.lower &&
-        !range_bounds.upper.empty() && partition.end().range_key() != range_bounds.upper) {
-      continue;
-    }
-    // range is an iterator that points to the first partition key range which
-    // overlaps or is greater than the partition.
-    auto range = lower_bound(partition_key_ranges.rbegin(), partition_key_ranges.rend(),
-                             partition, [] (const PartitionKeyRange& scan_range,
-                                 const Partition& partition) {
-                               // return true if scan_range < partition
-                               const auto& scan_upper = scan_range.end;
-                               return !scan_upper.empty()
-                               && scan_upper <= partition.begin();
-                             });
-    if (range == partition_key_ranges.rend()) {
-      continue;
-    }
-    if (partition.end().empty() || (*range).start < partition.end()) {
-      return false;
-    }
+  // range is an iterator that points to the first partition key range which
+  // overlaps or is greater than the partition.
+  auto range = lower_bound(partition_key_ranges_.rbegin(), partition_key_ranges_.rend(), partition,
+                           [] (const PartitionKeyRange& scan_range, const Partition& partition) {
+    // return true if scan_range < partition
+    const auto& scan_upper = scan_range.end;
+    return !scan_upper.empty() && scan_upper <= partition.begin();
+  });
+  if (range == partition_key_ranges_.rend()) {
+    return true;
   }
-  return true;
+  return !(partition.end().empty() || partition.end() > (*range).start);
 }
 
 string PartitionPruner::ToString(const Schema& schema,
                                  const PartitionSchema& partition_schema) const {
   vector<string> strings;
-  for (const auto& partition_key_range : range_bounds_to_partition_key_ranges_) {
-    for (auto range = partition_key_range.partition_key_ranges.rbegin();
-         range != partition_key_range.partition_key_ranges.rend();
-         ++range) {
-      strings.push_back(strings::Substitute(
-          "[($0), ($1))",
-          (*range).start.empty() ? "<start>" :
-            partition_schema.PartitionKeyDebugString((*range).start, schema),
+  for (auto range = partition_key_ranges_.rbegin();
+       range != partition_key_ranges_.rend();
+       ++range) {
+    strings.push_back(strings::Substitute(
+        "[($0), ($1))",
+        (*range).start.empty() ? "<start>" :
+          partition_schema.PartitionKeyDebugString((*range).start, schema),
           (*range).end.empty() ? "<end>" :
-            partition_schema.PartitionKeyDebugString((*range).end, schema)));
-    }
+          partition_schema.PartitionKeyDebugString((*range).end, schema)));
   }
 
   return JoinStrings(strings, ", ");
diff --git a/src/kudu/common/partition_pruner.h b/src/kudu/common/partition_pruner.h
index fb17e2ee1..145854356 100644
--- a/src/kudu/common/partition_pruner.h
+++ b/src/kudu/common/partition_pruner.h
@@ -62,11 +62,7 @@ class PartitionPruner {
 
   // Returns the number of partition key ranges remaining in the scan.
   size_t NumRangesRemaining() const {
-    size_t num_ranges = 0;
-    for (const auto& range: range_bounds_to_partition_key_ranges_) {
-      num_ranges += range.partition_key_ranges.size();
-    }
-    return num_ranges;
+    return partition_key_ranges_.size();
   }
 
   // Returns a text description of this partition pruner suitable for debug
@@ -87,11 +83,6 @@ class PartitionPruner {
     PartitionKey end;
   };
 
-  struct RangeBoundsAndPartitionKeyRanges {
-    RangeBounds range_bounds;
-    std::vector<PartitionKeyRange> partition_key_ranges;
-  };
-
   // Search all combinations of in-list and equality predicates.
   // Return hash values bitset of these combinations.
   static std::vector<bool> PruneHashComponent(
@@ -126,10 +117,9 @@ class PartitionPruner {
     const PartitionSchema::RangesWithHashSchemas& ranges_with_custom_hash_schemas,
     PartitionSchema::RangesWithHashSchemas* ranges);
 
-  // A vector of a pair of lower and upper range bounds mapped to a reverse
-  // sorted set of partition key ranges. Each partition key range within the set
+  // The reverse sorted set of partition key ranges. Each range
   // has an inclusive lower bound and an exclusive upper bound.
-  std::vector<RangeBoundsAndPartitionKeyRanges> range_bounds_to_partition_key_ranges_;
+  std::vector<PartitionKeyRange> partition_key_ranges_;
 
   DISALLOW_COPY_AND_ASSIGN(PartitionPruner);
 };
