This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5063e80e1ca26c5f1b763a6ce3e4708cd8196a26
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Tue Jun 11 18:44:39 2024 -0700

    KUDU-3584 fix flakiness in TableKeyRangeTest
    
    When running client-test in TSAN/ASAN builds, the
    TableKeyRangeTest.TestGetTableKeyRange scenario would sometimes fail
    on busy nodes.  This patch addresses the issue by increasing the
    timeout for scanners and for the write session, and making the related
    code more robust overall.
    
    I also took the liberty of cleaning up the related code.
    
    Change-Id: I1efc2fe6ee7f2dfe94b52b14f1316ffbafd39d52
    Reviewed-on: http://gerrit.cloudera.org:8080/21506
    Reviewed-by: KeDeng <kdeng...@gmail.com>
    Reviewed-by: Marton Greber <greber...@gmail.com>
    Tested-by: Marton Greber <greber...@gmail.com>
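
    The gist of the fix is bumping the client-side timeouts: the write
    session's timeout and the timeout of every scanner built from a scan
    token.  Below is a minimal sketch of the scanner-side pattern, assuming
    an already-connected KuduClient and a serialized scan token produced
    elsewhere; the helper's name and signature are illustrative, not part
    of the patch or of the Kudu client API.

        #include <cstddef>
        #include <memory>
        #include <string>

        #include "kudu/client/client.h"
        #include "kudu/client/scan_batch.h"
        #include "kudu/util/status.h"

        using kudu::Status;
        using kudu::client::KuduClient;
        using kudu::client::KuduScanBatch;
        using kudu::client::KuduScanner;
        using kudu::client::KuduScanToken;

        // Counts the rows covered by one serialized scan token, applying the
        // same timeout and replica-selection settings as the patch below.
        Status CountRowsForToken(KuduClient* client,
                                 const std::string& serialized_token,
                                 size_t* row_count) {
          KuduScanner* scanner_ptr = nullptr;
          RETURN_NOT_OK(KuduScanToken::DeserializeIntoScanner(
              client, serialized_token, &scanner_ptr));
          std::unique_ptr<KuduScanner> scanner(scanner_ptr);

          // A generous operation timeout to avoid flakiness on busy nodes.
          scanner->SetTimeoutMillis(60 * 1000);
          // Read from leader replicas to see the most recently written data.
          RETURN_NOT_OK(scanner->SetSelection(KuduClient::LEADER_ONLY));
          RETURN_NOT_OK(scanner->Open());

          size_t rows = 0;
          while (scanner->HasMoreRows()) {
            KuduScanBatch batch;
            RETURN_NOT_OK(scanner->NextBatch(&batch));
            rows += batch.NumRows();
          }
          *row_count = rows;
          return Status::OK();
        }

    On the write path the patch makes the same kind of adjustment: it calls
    session->SetTimeoutMillis(60 * 1000) right after creating the session
    and batches all inserts into a single session->Flush() call instead of
    flushing after every row.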
---
 src/kudu/client/client-test.cc | 105 ++++++++++++++++++++++++++++-------------
 1 file changed, 71 insertions(+), 34 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index c3b46baa5..c89fc3855 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -412,6 +412,7 @@ class ClientTest : public KuduTest {
 
   void CheckTokensInfo(const vector<KuduScanToken*>& tokens,
                        int replica_num = 1) {
+    ASSERT_GE(replica_num, 1);
     for (const auto* t : tokens) {
       const KuduTablet& tablet = t->tablet();
       ASSERT_EQ(replica_num, tablet.replicas().size());
@@ -430,7 +431,7 @@ class ClientTest : public KuduTest {
         tablet_copy.reset(ptr);
       }
       ASSERT_EQ(tablet.id(), tablet_copy->id());
-      ASSERT_EQ(1, tablet_copy->replicas().size());
+      ASSERT_EQ(replica_num, tablet_copy->replicas().size());
       const KuduReplica* replica_copy = tablet_copy->replicas()[0];
 
       ASSERT_EQ(replica->is_leader(), replica_copy->is_leader());
@@ -440,36 +441,60 @@ class ClientTest : public KuduTest {
     }
   }
 
-  int CountRows(const vector<KuduScanToken*>& tokens) {
-    atomic<uint32_t> rows(0);
+  Status CountRows(const vector<KuduScanToken*>& tokens, size_t* count) {
+    #define THR_RET_NOT_OK(s, res_status) do { \
+        if (const Status& _s = (s); !_s.ok()) { \
+          *(res_status) = _s; \
+          return; \
+        } \
+      } while (false)
+
+    vector<Status> thread_status(tokens.size(), Status::OK());
     vector<thread> threads;
-    for (KuduScanToken* token : tokens) {
-      string buf;
-      CHECK_OK(token->Serialize(&buf));
+    atomic<size_t> rows(0);
+    for (size_t i = 0; i < tokens.size(); ++i) {
+      const size_t thread_idx = i;
+      const auto* token = tokens[thread_idx];
+
+      threads.emplace_back([this, &rows] (const KuduScanToken* token, Status* thread_status) {
+        string serialized_token;
+        THR_RET_NOT_OK(token->Serialize(&serialized_token), thread_status);
 
-      threads.emplace_back([this, &rows] (const string& serialized_token) {
         shared_ptr<KuduClient> client;
-        ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+        THR_RET_NOT_OK(cluster_->CreateClient(nullptr, &client), thread_status);
+
         KuduScanner* scanner_ptr;
-        ASSERT_OK(KuduScanToken::DeserializeIntoScanner(
-            client.get(), serialized_token, &scanner_ptr));
+        THR_RET_NOT_OK(KuduScanToken::DeserializeIntoScanner(
+            client.get(), serialized_token, &scanner_ptr), thread_status);
+
         unique_ptr<KuduScanner> scanner(scanner_ptr);
-        ASSERT_OK(scanner->Open());
+        // Try to avoid flakiness when running at busy nodes.
+        scanner->SetTimeoutMillis(60 * 1000);
+        // Make sure to read the most recent data if the test table has
+        // multiple replicas.
+        THR_RET_NOT_OK(scanner->SetSelection(KuduClient::LEADER_ONLY), thread_status);
+        THR_RET_NOT_OK(scanner->Open(), thread_status);
 
         while (scanner->HasMoreRows()) {
           KuduScanBatch batch;
-          ASSERT_OK(scanner->NextBatch(&batch));
+          THR_RET_NOT_OK(scanner->NextBatch(&batch), thread_status);
           rows += batch.NumRows();
         }
-        scanner->Close();
-      }, std::move(buf));
+      }, token, &thread_status[thread_idx]);
     }
 
     for (thread& thread : threads) {
       thread.join();
     }
+    for (const auto& s : thread_status) {
+      if (!s.ok()) {
+        return s;
+      }
+    }
 
-    return rows;
+    *count = rows;
+    return Status::OK();
+    #undef THR_RET_NOT_OK
   }
 
   // Return the number of lookup-related RPCs which have been serviced by the master.
@@ -8970,14 +8995,13 @@ class TableKeyRangeTest : public ClientTest {
   }
 
   static void InsertTestRowsWithStrings(KuduTable* table, KuduSession* session, int num_rows) {
-    string str_val = "*";
+    static const string kStringVal = "*";
     for (int i = 0; i < num_rows; i++) {
       unique_ptr<KuduInsert> insert(table->NewInsert());
       ASSERT_OK(insert->mutable_row()->SetInt32("key", i));
       ASSERT_OK(insert->mutable_row()->SetInt32("int_val", i * 2));
-      ASSERT_OK(insert->mutable_row()->SetString("string_val", str_val));
+      ASSERT_OK(insert->mutable_row()->SetStringNoCopy("string_val", kStringVal));
       ASSERT_OK(session->Apply(insert.release()));
-      ASSERT_OK(session->Flush());
     }
   }
 
@@ -8988,18 +9012,22 @@ class TableKeyRangeTest : public ClientTest {
 };
 
 TEST_F(TableKeyRangeTest, TestGetTableKeyRange) {
-  client::sp::shared_ptr<KuduTable> table;
+  constexpr const size_t kRowsNum = 1000;
+  shared_ptr<KuduTable> table;
   ASSERT_OK(client_->OpenTable(kTableName, &table));
   {
+    // Should have no rows to begin with.
+    ASSERT_EQ(0, CountRowsFromClient(table.get()));
+
     // Create session
     shared_ptr<KuduSession> session = client_->NewSession();
-    session->SetTimeoutMillis(10000);
     ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+    // Try to avoid flakiness when running at busy nodes.
+    session->SetTimeoutMillis(60 * 1000);
 
-    // Should have no rows to begin with.
-    ASSERT_EQ(0, CountRowsFromClient(table.get()));
     // Insert rows
-    NO_FATALS(InsertTestRowsWithStrings(client_table_.get(), session.get(), 1000));
+    NO_FATALS(InsertTestRowsWithStrings(client_table_.get(), session.get(), kRowsNum));
+    ASSERT_OK(session->Flush());
     NO_FATALS(CheckNoRpcOverflow());
   }
 
@@ -9015,7 +9043,9 @@ TEST_F(TableKeyRangeTest, TestGetTableKeyRange) {
     ASSERT_EQ(4, tokens.size());
 
     NO_FATALS(CheckTokensInfo(tokens));
-    ASSERT_EQ(1000, CountRows(tokens));
+    size_t count = 0;
+    ASSERT_OK(CountRows(tokens, &count));
+    ASSERT_EQ(kRowsNum, count);
   }
 
   {
@@ -9031,7 +9061,9 @@ TEST_F(TableKeyRangeTest, TestGetTableKeyRange) {
     ASSERT_EQ(4, tokens.size());
 
     NO_FATALS(CheckTokensInfo(tokens));
-    ASSERT_EQ(1000, CountRows(tokens));
+    size_t count = 0;
+    ASSERT_OK(CountRows(tokens, &count));
+    ASSERT_EQ(kRowsNum, count);
   }
 
   uint32_t token_size_a = 0;
@@ -9047,7 +9079,9 @@ TEST_F(TableKeyRangeTest, TestGetTableKeyRange) {
     ASSERT_LE(4, token_size_a);
 
     NO_FATALS(CheckTokensInfo(tokens));
-    ASSERT_EQ(1000, CountRows(tokens));
+    size_t count = 0;
+    ASSERT_OK(CountRows(tokens, &count));
+    ASSERT_EQ(kRowsNum, count);
   }
 
   uint32_t token_size_b = 0;
@@ -9063,14 +9097,15 @@ TEST_F(TableKeyRangeTest, TestGetTableKeyRange) {
     ASSERT_LE(4, token_size_b);
 
     NO_FATALS(CheckTokensInfo(tokens));
-    ASSERT_EQ(1000, CountRows(tokens));
+    size_t count = 0;
+    ASSERT_OK(CountRows(tokens, &count));
+    ASSERT_EQ(kRowsNum, count);
   }
 
-  // Different "splitSizeBytes" values typically leads to
-  // different numbers of tokens, although they may also
-  // leads to the same number of tokens.
-  // However, a smaller "splitSizeBytes" value will definitely
-  // not generate fewer tokens than a larger "splitSizeBytes" value.
+  // Different "splitSizeBytes" values typically result in different number
+  // of scan tokens, but relatively close "splitSizeBytes" values may result
+  // in the same number of tokens. However, a much smaller "splitSizeBytes"
+  // value definitely results in many more scan tokens than a much larger one.
   ASSERT_LE(token_size_a, token_size_b);
 
   {
@@ -9081,10 +9116,12 @@ TEST_F(TableKeyRangeTest, TestGetTableKeyRange) {
     // set splitSizeBytes > tablet's size
     builder.SetSplitSizeBytes(1024 * 1024 * 1024);
     ASSERT_OK(builder.Build(&tokens));
-    ASSERT_EQ(tokens.size(), 4);
+    ASSERT_EQ(4, tokens.size());
 
     NO_FATALS(CheckTokensInfo(tokens));
-    ASSERT_EQ(1000, CountRows(tokens));
+    size_t count = 0;
+    ASSERT_OK(CountRows(tokens, &count));
+    ASSERT_EQ(kRowsNum, count);
   }
 }
 

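As an aside on the "splitSizeBytes" knob exercised by the test above: a
smaller split size can yield the same number of scan tokens or more, but
never fewer. A minimal sketch of the token-builder pattern, assuming an
already-opened KuduTable handle (the helper's name is illustrative, not
part of the Kudu client API):

    #include <cstdint>
    #include <vector>

    #include "kudu/client/client.h"
    #include "kudu/util/status.h"

    using kudu::Status;
    using kudu::client::KuduScanToken;
    using kudu::client::KuduScanTokenBuilder;
    using kudu::client::KuduTable;

    // Builds scan tokens for 'table', targeting roughly 'split_size_bytes'
    // of data per token. The caller takes ownership of the raw pointers
    // appended to 'tokens'.
    Status BuildTokensWithSplitSize(KuduTable* table,
                                    uint64_t split_size_bytes,
                                    std::vector<KuduScanToken*>* tokens) {
      KuduScanTokenBuilder builder(table);
      builder.SetSplitSizeBytes(split_size_bytes);
      return builder.Build(tokens);
    }

Comparing the sizes of the token vectors produced for two different split
sizes is what the ASSERT_LE(token_size_a, token_size_b) check in the test
above exercises.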