This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch branch-1.13.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.13.x by this push:
     new 7c8dca6  KUDU-3254 fix bug in meta-cache exposed by KUDU-1802
7c8dca6 is described below

commit 7c8dca60d15b560017ef7e726a379788727502ba
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Wed Mar 3 21:49:56 2021 -0800

    KUDU-3254 fix bug in meta-cache exposed by KUDU-1802
    
    This patch fixes an issue resulting in a SIGABRT crash in Kudu client
    when working with stale scan tokens which contain information about
    tablet locations for a table (see KUDU-1802) whose range partition
    was dropped.  The patch also adds a test scenario reproducing the crash;
    now it passes and can catch future regressions.
    
    This patch is a follow-up to d23ee5d38ddc4317f431dd65df0c825c00cc968a.
    
    Before the change in src/kudu/client/meta_cache.cc was back-ported from
    Kudu 1.14 as part of this fix, the scenario crashed with SIGABRT, with
    a stack trace similar to the following (this one was captured on
    macOS):
    
      * frame #0: 0x00007fff7035833a libsystem_kernel.dylib`__pthread_kill + 10
        frame #1: 0x00007fff70414e60 libsystem_pthread.dylib`pthread_kill + 430
        frame #2: 0x00007fff702df808 libsystem_c.dylib`abort + 120
        frame #3: 0x000000010ca1a259 libglog.0.dylib`google::logging_fail() at 
logging.cc:1474:3
        frame #4: 0x000000010ca19121 
libglog.0.dylib`google::LogMessage::SendToLog() [inlined] 
google::LogMessage::Fail() at logging.cc:1488:3
        frame #5: 0x000000010ca1911b 
libglog.0.dylib`google::LogMessage::SendToLog() at logging.cc:1442
        frame #6: 0x000000010ca19815 
libglog.0.dylib`google::LogMessage::Flush() at logging.cc:1311:5
        frame #7: 0x000000010ca1d76f 
libglog.0.dylib`google::LogMessageFatal::~LogMessageFatal() at logging.cc:2023:5
        frame #8: 0x000000010ca1a5f9 
libglog.0.dylib`google::LogMessageFatal::~LogMessageFatal() at 
logging.cc:2022:37
        frame #9: 0x0000000103e365e3 
libkudu_client.dylib`std::__1::map<std::__1::basic_string<char, 
std::__1::char_traits<char>, std::__1::allocator<char> >, 
kudu::client::internal::MetaCacheEntry, 
std::__1::less<std::__1::basic_string<char, std::__1::char_traits<char>, 
std::__1::allocator<char> > >, 
std::__1::allocator<std::__1::pair<std::__1::basic_string<char, 
std::__1::char_traits<char>, std::__1::allocator<char> > const, 
kudu::client::internal::MetaCacheEntry> > >::mapped_type& Find [...]
        frame #10: 0x0000000103e34cbb 
libkudu_client.dylib`kudu::client::internal::MetaCache::ProcessGetTableLocationsResponse()
 at meta_cache.cc:943:23
        frame #11: 0x0000000103e86166 
libkudu_client.dylib`kudu::client::KuduScanToken::Data::PBIntoScanner() at 
scan_token-internal.cc:192:35
        frame #12: 0x0000000103e88051 
libkudu_client.dylib`kudu::client::KuduScanToken::Data::DeserializeIntoScanner()
 at scan_token-internal.cc:111:10
        frame #13: 0x0000000103d55d3c 
libkudu_client.dylib`kudu::client::KuduScanToken::DeserializeIntoScanner() at 
client.cc:1879:10
    
    Change-Id: I5b8370290c13b1e496f461ed5bc2e0193bdf4b19
    Reviewed-on: http://gerrit.cloudera.org:8080/17152
    Tested-by: Alexey Serbin <aser...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/meta_cache.cc      |  19 ++-
 src/kudu/client/scan_token-test.cc | 263 +++++++++++++++++++++++++++++++++++--
 2 files changed, 264 insertions(+), 18 deletions(-)

diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 5a680e4..427d8a7 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -940,10 +940,21 @@ Status MetaCache::ProcessGetTableLocationsResponse(const 
KuduTable* table,
                               Substitute("failed to refresh locations for 
tablet $0",
                                          tablet_id));
         // Update the entry TTL.
-        auto& entry = FindOrDie(tablets_by_key, tablet_lower_bound);
-        DCHECK(!entry.is_non_covered_range() &&
-               entry.upper_bound_partition_key() == tablet_upper_bound);
-        entry.refresh_expiration_time(expiration_time);
+        auto* entry = FindOrNull(tablets_by_key, tablet_lower_bound);
+        if (entry) {
+          DCHECK(!entry->is_non_covered_range() &&
+                 entry->upper_bound_partition_key() == tablet_upper_bound);
+          entry->refresh_expiration_time(expiration_time);
+        } else {
+          // A remote tablet exists, but isn't indexed for key-based lookups.
+          // This might happen if the entry was removed after tablet range
+          // was dropped, but then a scan token with stale information on 
tablet
+          // locations was provided to start a scan.
+          MetaCacheEntry entry(expiration_time, remote);
+          VLOG(3) << Substitute("Caching '$0' entry $1",
+              table->name(), entry.DebugString(table));
+          EmplaceOrDie(&tablets_by_key, tablet_lower_bound, std::move(entry));
+        }
         continue;
       }
 
diff --git a/src/kudu/client/scan_token-test.cc 
b/src/kudu/client/scan_token-test.cc
index da1753c..3435fc9 100644
--- a/src/kudu/client/scan_token-test.cc
+++ b/src/kudu/client/scan_token-test.cc
@@ -43,8 +43,10 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/master.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
@@ -63,19 +65,20 @@ DECLARE_bool(tserver_enforce_access_control);
 
METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableSchema);
 
METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableLocations);
 
-namespace kudu {
-namespace client {
-
-using cluster::InternalMiniCluster;
-using cluster::InternalMiniClusterOptions;
-using sp::shared_ptr;
+using kudu::client::KuduTableCreator;
+using kudu::client::sp::shared_ptr;
+using kudu::cluster::InternalMiniCluster;
+using kudu::cluster::InternalMiniClusterOptions;
+using kudu::tserver::MiniTabletServer;
 using std::atomic;
 using std::string;
 using std::thread;
 using std::unique_ptr;
 using std::unordered_set;
 using std::vector;
-using tserver::MiniTabletServer;
+
+namespace kudu {
+namespace client {
 
 class ScanTokenTest : public KuduTest {
 
@@ -128,6 +131,28 @@ class ScanTokenTest : public KuduTest {
     return rows;
   }
 
+  // Similar to CountRows() above, but use the specified client handle
+  // and run all the scanners sequentially, one by one.
+  Status CountRowsSeq(KuduClient* client,
+                      vector<KuduScanToken*> tokens,
+                      int64_t* row_count) {
+    int64_t count = 0;
+    for (auto* t : tokens) {
+      unique_ptr<KuduScanToken> token(t);
+      unique_ptr<KuduScanner> scanner;
+      RETURN_NOT_OK(IntoUniqueScanner(client, *token, &scanner));
+
+      RETURN_NOT_OK(scanner->Open());
+      while (scanner->HasMoreRows()) {
+        KuduScanBatch batch;
+        RETURN_NOT_OK(scanner->NextBatch(&batch));
+        count += batch.NumRows();
+      }
+    }
+    *row_count = count;
+    return Status::OK();
+  }
+
   void VerifyTabletInfo(const vector<KuduScanToken*>& tokens) {
     unordered_set<string> tablet_ids;
     for (auto t : tokens) {
@@ -196,7 +221,7 @@ TEST_F(ScanTokenTest, TestScanTokens) {
   {
     unique_ptr<KuduPartialRow> split(schema.NewRow());
     ASSERT_OK(split->SetInt64("col", 0));
-    unique_ptr<client::KuduTableCreator> 
table_creator(client_->NewTableCreator());
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
 #pragma GCC diagnostic push
 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
     ASSERT_OK(table_creator->table_name("table")
@@ -391,7 +416,7 @@ TEST_F(ScanTokenTest, TestScanTokensWithNonCoveringRange) {
   {
     unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
     unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
-    unique_ptr<client::KuduTableCreator> 
table_creator(client_->NewTableCreator());
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
     table_creator->table_name("table");
     table_creator->num_replicas(1);
     table_creator->schema(&schema);
@@ -549,7 +574,7 @@ TEST_P(TimestampPropagationParamTest, Test) {
     {
       unique_ptr<KuduPartialRow> split(schema.NewRow());
       ASSERT_OK(split->SetInt64(kKeyColumnName, 0));
-      unique_ptr<client::KuduTableCreator> creator(client_->NewTableCreator());
+      unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
 #pragma GCC diagnostic push
 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
       ASSERT_OK(creator->table_name(kTableName)
@@ -625,7 +650,7 @@ TEST_F(ScanTokenTest, TestConcurrentAlterTable) {
   // Create table
   shared_ptr<KuduTable> table;
   {
-    unique_ptr<client::KuduTableCreator> 
table_creator(client_->NewTableCreator());
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
     ASSERT_OK(table_creator->table_name(kTableName)
                             .schema(&schema)
                             .set_range_partition_columns({})
@@ -716,7 +741,7 @@ TEST_F(ScanTokenTest, TestConcurrentRenameTable) {
   // Create table
   shared_ptr<KuduTable> table;
   {
-    unique_ptr<client::KuduTableCreator> 
table_creator(client_->NewTableCreator());
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
     ASSERT_OK(table_creator->table_name(kTableName)
                             .schema(&schema)
                             .set_range_partition_columns({})
@@ -759,7 +784,7 @@ TEST_F(ScanTokenTest, TestMasterRequestsWithMetadata) {
   // Create table
   shared_ptr<KuduTable> table;
   {
-    unique_ptr<client::KuduTableCreator> 
table_creator(client_->NewTableCreator());
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
     ASSERT_OK(table_creator->table_name(kTableName)
                             .schema(&schema)
                             .set_range_partition_columns({})
@@ -815,7 +840,7 @@ TEST_F(ScanTokenTest, TestMasterRequestsNoMetadata) {
   // Create table
   shared_ptr<KuduTable> table;
   {
-    unique_ptr<client::KuduTableCreator> 
table_creator(client_->NewTableCreator());
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
     ASSERT_OK(table_creator->table_name(kTableName)
                   .schema(&schema)
                   .set_range_partition_columns({})
@@ -853,5 +878,215 @@ TEST_F(ScanTokenTest, TestMasterRequestsNoMetadata) {
   ASSERT_EQ(init_location_requests + 1, NumGetTableLocationsRequests());
 }
 
+enum FirstRangeChangeMode {
+  BEGIN = 0,
+  RANGE_DROPPED = 0,
+  RANGE_DROPPED_AND_PRECEDING_RANGE_ADDED = 1,
+  RANGE_DROPPED_AND_LARGER_ONE_ADDED = 2,
+  RANGE_DROPPED_AND_SMALLER_ONE_ADDED = 3,
+  RANGE_REPLACED_WITH_SAME = 4,
+  RANGE_REPLACED_WITH_SAME_AND_PRECEDING_RANGE_ADDED = 5,
+  RANGE_REPLACED_WITH_TWO_SMALLER_ONES = 6,
+  END = 7,
+};
+
+class StaleScanTokensParamTest :
+    public ScanTokenTest,
+    public ::testing::WithParamInterface<FirstRangeChangeMode> {
+};
+
+// Create scan tokens for one state of the table and store them for future
+// use. Use the tokens to scan the table. Alter the table by dropping the
+// first range partition, optionally replacing it according to the
+// FirstRangeChangeMode enum. Open the altered table via the client handle
+// which was used to run the token-based scan earlier. Now, attempt to scan
+// the table using stale tokens generated with the original table state.
+TEST_P(StaleScanTokensParamTest, DroppingFirstRange) {
+  constexpr const char* const kTableName = "stale-scan-tokens-dfr";
+  KuduSchema schema;
+  {
+    KuduSchemaBuilder builder;
+    builder.AddColumn("key")->
+        NotNull()->
+        Type(KuduColumnSchema::INT64)->
+        PrimaryKey();
+    ASSERT_OK(builder.Build(&schema));
+  }
+
+  shared_ptr<KuduTable> table;
+  {
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    {
+      unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+      ASSERT_OK(lower_bound->SetInt64("key", -100));
+      unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+      ASSERT_OK(upper_bound->SetInt64("key", 0));
+      table_creator->add_range_partition(
+          lower_bound.release(), upper_bound.release());
+    }
+    {
+      unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+      ASSERT_OK(lower_bound->SetInt64("key", 0));
+      unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+      ASSERT_OK(upper_bound->SetInt64("key", 100));
+      table_creator->add_range_partition(
+          lower_bound.release(), upper_bound.release());
+    }
+
+    ASSERT_OK(table_creator->table_name(kTableName)
+        .schema(&schema)
+        .set_range_partition_columns({ "key" })
+        .num_replicas(1)
+        .Create());
+    ASSERT_OK(client_->OpenTable(kTableName, &table));
+  }
+
+  // Populate the table with data.
+  {
+    shared_ptr<KuduSession> session = client_->NewSession();
+    session->SetTimeoutMillis(10000);
+    ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+    for (int i = -50; i < 50; ++i) {
+      unique_ptr<KuduInsert> insert(table->NewInsert());
+      ASSERT_OK(insert->mutable_row()->SetInt64("key", i));
+      ASSERT_OK(session->Apply(insert.release()));
+    }
+  }
+
+  // Prepare two sets of scan tokens.
+  vector<KuduScanToken*> tokens_a;
+  {
+    KuduScanTokenBuilder builder(table.get());
+    ASSERT_OK(builder.IncludeTableMetadata(true));
+    ASSERT_OK(builder.IncludeTabletMetadata(true));
+    ASSERT_OK(builder.Build(&tokens_a));
+  }
+  ASSERT_EQ(2, tokens_a.size());
+
+  vector<KuduScanToken*> tokens_b;
+  {
+    KuduScanTokenBuilder builder(table.get());
+    ASSERT_OK(builder.IncludeTableMetadata(true));
+    ASSERT_OK(builder.IncludeTabletMetadata(true));
+    ASSERT_OK(builder.Build(&tokens_b));
+  }
+  ASSERT_EQ(2, tokens_b.size());
+
+  // Drop the first range partition, running the operation via the 'client_'
+  // handle.
+  {
+    unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+    unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+    ASSERT_OK(lower_bound->SetInt64("key", -100));
+    ASSERT_OK(upper_bound->SetInt64("key", 0));
+    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
+    ASSERT_OK(alterer->DropRangePartition(
+        lower_bound.release(), upper_bound.release())->Alter());
+  }
+
+  shared_ptr<KuduClient> new_client;
+  ASSERT_OK(cluster_->CreateClient(nullptr, &new_client));
+
+  int64_t row_count_a = 0;
+  ASSERT_OK(CountRowsSeq(new_client.get(), std::move(tokens_a), &row_count_a));
+  ASSERT_EQ(50, row_count_a);
+
+  // Open the test table via 'new_client' handle to populate the metadata
+  // with actual table metadata, including non-covered ranges. This purges
+  // an entry for the [-100, 0) range from the 'tablet_by_key_' map, still
+  // keeping corresponding RemoteTable entry in the 'tablets_by_id_' map.
+  {
+    shared_ptr<KuduTable> t;
+    ASSERT_OK(new_client->OpenTable(kTableName, &t));
+  }
+
+  const auto range_adder = [&schema](
+      KuduClient* c, int64_t range_beg, int64_t range_end) {
+    unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+    unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+    RETURN_NOT_OK(lower_bound->SetInt64("key", range_beg));
+    RETURN_NOT_OK(upper_bound->SetInt64("key", range_end));
+    unique_ptr<KuduTableAlterer> alterer(c->NewTableAlterer(kTableName));
+    return alterer->AddRangePartition(lower_bound.release(),
+                                      upper_bound.release())->Alter();
+   };
+
+  // The bifurcation point.
+  const auto mode = GetParam();
+  switch (mode) {
+    case RANGE_DROPPED:
+      break;
+    case RANGE_DROPPED_AND_PRECEDING_RANGE_ADDED:
+      ASSERT_OK(range_adder(client_.get(), -200, -100));
+      break;
+    case RANGE_DROPPED_AND_LARGER_ONE_ADDED:
+      ASSERT_OK(range_adder(client_.get(), -200, 0));
+      break;
+    case RANGE_DROPPED_AND_SMALLER_ONE_ADDED:
+      ASSERT_OK(range_adder(client_.get(), -50, 0));
+      break;
+    case RANGE_REPLACED_WITH_SAME:
+      ASSERT_OK(range_adder(client_.get(), -100, 0));
+      break;
+    case RANGE_REPLACED_WITH_SAME_AND_PRECEDING_RANGE_ADDED:
+      ASSERT_OK(range_adder(client_.get(), -100, 0));
+      ASSERT_OK(range_adder(client_.get(), -200, -100));
+      break;
+    case RANGE_REPLACED_WITH_TWO_SMALLER_ONES:
+      ASSERT_OK(range_adder(client_.get(), -100, -50));
+      ASSERT_OK(range_adder(client_.get(), -50, 0));
+      break;
+    default:
+      FAIL() << strings::Substitute("$0: unsupported partition change mode",
+                                    static_cast<uint16_t>(mode));
+  }
+
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(10000);
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+  int64_t expected_row_count = 50;
+  switch (mode) {
+    case RANGE_DROPPED_AND_LARGER_ONE_ADDED:
+      expected_row_count += 100;
+      FALLTHROUGH_INTENDED;
+    case RANGE_REPLACED_WITH_SAME_AND_PRECEDING_RANGE_ADDED:
+      for (int i = -200; i < -100; ++i) {
+        unique_ptr<KuduInsert> insert(table->NewInsert());
+        ASSERT_OK(insert->mutable_row()->SetInt64("key", i));
+        ASSERT_OK(session->Apply(insert.release()));
+      }
+      // The rows in the preceding range should not be read if using the
+      // token for the [-100, 0) original range.
+      FALLTHROUGH_INTENDED;
+    case RANGE_DROPPED_AND_SMALLER_ONE_ADDED:
+    case RANGE_REPLACED_WITH_SAME:
+    case RANGE_REPLACED_WITH_TWO_SMALLER_ONES:
+      for (int i = -25; i < 0; ++i) {
+        unique_ptr<KuduInsert> insert(table->NewInsert());
+        ASSERT_OK(insert->mutable_row()->SetInt64("key", i));
+        ASSERT_OK(session->Apply(insert.release()));
+      }
+      expected_row_count += 25;
+      break;
+    default:
+      break;
+  }
+
+  // Start another tablet scan using the other identical set of scan tokens.
+  // The client metacache should not produce any errors: it should re-fetch
+  // the information about the current partitioning scheme and scan the table
+  // within the range of the new partitions which correspond to the originally
+  // supplied range.
+  int64_t row_count_b = -1;
+  ASSERT_OK(CountRowsSeq(new_client.get(), std::move(tokens_b), &row_count_b));
+  ASSERT_EQ(expected_row_count, row_count_b);
+}
+
+INSTANTIATE_TEST_CASE_P(FirstRangeDropped, StaleScanTokensParamTest,
+                        testing::Range(FirstRangeChangeMode::BEGIN,
+                                       FirstRangeChangeMode::END));
+
 } // namespace client
 } // namespace kudu

Reply via email to