This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new f4a47fe04 Add a benchmark for CBTree concurrent writes.
f4a47fe04 is described below

commit f4a47fe041b7f547fd4816706347523e06f94f6d
Author: Zoltan Martonka <zmarto...@gmail.com>
AuthorDate: Wed May 29 13:23:17 2024 +0000

    Add a benchmark for CBTree concurrent writes.
    
    Before updating CBTree for ARM (where it is misbehaving currently),
    we should have a proper test for two scenarios:
    
    + Writing on multiple threads.
    + Reading on multiple threads while there are also active writes.
    
    If read threads wait for values to be inserted, it defeats the purpose
    of benchmarking. Therefore, we should first populate a tree with
    values for the read threads. The read threads will then read values
    that are already in the tree, while the write threads continue to insert
    new values.
    
    Setting up the tree for the second scenario essentially involves
    performing the first scenario. This is why both scenarios are combined
    into a single test.
    
    The new test provides the following new features (compared to just
    running DoTestConcurrentInsert with higher parameters):
    
    + Different threads read the value that inserted it
    + Reader threads can't be assigned to a certain writer thread.
    + Keys are better distributed than the previous shuffle method.
    + Allows measuring read-heavy performance (with a flag).
    
    Reading threads start concurrently with writing threads, not at the
    end of each write thread (unlike DoTestConcurrentInsert).
    
    Note that running only concurrent reads should not differ from
    TestScanPerformance, since no locking takes place and they do not
    sabotage each other. So no new test is required for that scenario.
    
    Change-Id: I1b0b16e269c70716962fc5ebb4ddca1e2cbe68a4
    Reviewed-on: http://gerrit.cloudera.org:8080/21447
    Reviewed-by: Zoltan Chovan <zcho...@cloudera.com>
    Reviewed-by: Ashwani Raina <ara...@cloudera.com>
    Reviewed-by: Alexey Serbin <ale...@apache.org>
    Tested-by: Alexey Serbin <ale...@apache.org>
---
 src/kudu/tablet/cbtree-test.cc | 197 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 197 insertions(+)

diff --git a/src/kudu/tablet/cbtree-test.cc b/src/kudu/tablet/cbtree-test.cc
index 32cd7b75c..1f342d8cf 100644
--- a/src/kudu/tablet/cbtree-test.cc
+++ b/src/kudu/tablet/cbtree-test.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
+#include <array>
 #include <cstdint>
 #include <cstdio>
 #include <cstdlib>
@@ -25,6 +27,7 @@
 #include <unordered_set>
 #include <vector>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -36,7 +39,9 @@
 #include "kudu/util/debug/sanitizer_scopes.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/hexdump.h"
+#include "kudu/util/mem_tracker.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/memory/memory.h"
 #include "kudu/util/memory/overwrite.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/stopwatch.h"
@@ -49,6 +54,17 @@ using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
+DEFINE_int32(concurrent_rw_benchmark_num_writer_threads, 4,
+             "Number of writer threads in TestConcurrentReadWritePerformance");
+DEFINE_int32(concurrent_rw_benchmark_num_reader_threads, 4,
+             "Number of reader threads in TestConcurrentReadWritePerformance");
+DEFINE_int32(concurrent_rw_benchmark_num_inserts, 1000000,
+             "Number of inserts in TestConcurrentReadWritePerformance");
+// This might be needed, because reads are significantly faster than writes.
+DEFINE_int32(concurrent_rw_benchmark_reader_boost, 1,
+            "Multiply the amount of values each reader thread reads in "
+            "TestConcurrentReadWritePerformance");
+
 namespace kudu {
 namespace tablet {
 namespace btree {
@@ -889,6 +905,187 @@ TEST_F(TestCBTree, TestIteratorSeekAtOrBefore) {
   }
 }
 
+// All applications of CBTree use a threadsafe arena with default node sizes.
+struct ProdTreeTraits : public btree::BTreeTraits {
+  typedef ThreadSafeMemoryTrackingArena ArenaType;
+};
+
+// We benchmark two scenarios:
+// 1. Writing on multiple threads.
+// 2. Reading on multiple threads while there are also active writes.
+//
+// If read threads wait for values to be inserted, it defeats the purpose of 
benchmarking.
+// Therefore, we should first populate a tree with values for the read 
threads. The read threads
+// will then read values that are already in the tree, while the write threads 
continue to insert
+// new values.
+//
+// Setting up the tree for the second scenario essentially involves performing 
the first scenario.
+// This is why both scenarios are combined into a single test.
+TEST_F(TestCBTree, ConcurrentReadWriteBenchmark) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  constexpr int kTrials = 10;
+  // Short names to make some formulas readable
+  const int num_writer_threads = 
FLAGS_concurrent_rw_benchmark_num_writer_threads;
+  const int num_reader_threads = 
FLAGS_concurrent_rw_benchmark_num_reader_threads;
+  const int num_inserts = FLAGS_concurrent_rw_benchmark_num_inserts;
+  const int reader_boost = FLAGS_concurrent_rw_benchmark_reader_boost;
+
+  const int num_threads = num_writer_threads + num_reader_threads;
+  // Number of nodes we write in the 1st phase, and read back in the 2nd, 
while there are still
+  // concurrent writes going on.
+  const int num_inserts_first_phase = num_inserts / 2;
+
+  // We apply a (deterministic) mapping for i that feels random enough (for 
the current purpose).
+  auto generate_shuffled_kv = [](std::array<char, 32>& kbuf, std::array<char, 
32>& vbuf, int i) {
+    // any prime number satisfying p % 4 == 3 and at least 1 order of 
magnitude larger than number
+    // of threads is good. (p < num_inserts is ok).
+    constexpr int p = 10007; // Just picked the first above 10000
+    auto random_shuffle = [](int x) {
+      int32_t r = static_cast<int32_t>((static_cast<uint64_t>(x) * x) % p);
+      if (x <= p / 2)
+        return r;
+      else
+        return p - r;
+    };
+    snprintf(kbuf.data(), kbuf.size(), "key_%d_%d", random_shuffle(i % p),  
i); // max 23 bytes used
+    snprintf(vbuf.data(), vbuf.size(), "val_%d", i);
+  };
+  unique_ptr<CBTree<ProdTreeTraits>> tree;
+  vector<thread> threads;
+  // We need 2 internal barriers to know when to stop the first and start the
+  // second LOG_TIMING(...)
+  Barrier start_write_barrier(num_writer_threads + 1);
+  Barrier finish_write_barrier(num_writer_threads + 1);
+  Barrier start_rw_barrier(num_threads + 1);
+  Barrier finish_rw_barrier(num_threads + 1);
+
+  // Writer threads insert keys from [0, num_inserts_first_phase), then wait 
for the internal
+  // barriers. Then insert keys from [num_inserts_first_phase, 
num_inserts_overall). We want to
+  // insert randomly distributed values without any significant performance 
penalty.
+  // generate_shuffled_kv will apply some smart shuffling.
+  for (int tidx = 0; tidx < num_writer_threads; tidx++) {
+    threads.emplace_back([&, tidx]() {
+      std::array<char, 32> kbuf;
+      std::array<char, 32> vbuf;
+      while (true) {
+        start_write_barrier.Wait();
+        if (!tree) {
+          start_rw_barrier.Wait();  // Allow readers to wake up too.
+          return;
+        }
+        // To prevent the existence of a one-to-one mapping between reader and 
writer threads even
+        // if num_writer_threads == num_reader_threads, in the first phase a 
writing thread writes
+        // a continuous section of the keys, while reader threads distribute 
keys in a round-robin
+        // fashion.
+        int interval_length =
+            (num_inserts_first_phase + num_writer_threads - 1) / 
num_writer_threads;
+        int start = interval_length * tidx;
+        int until = std::min(interval_length * (tidx + 1), 
num_inserts_first_phase);
+        for (int i = start; i < until; ++i) {
+          generate_shuffled_kv(kbuf, vbuf, i);
+          if (!tree->Insert(Slice(kbuf.data()), Slice(vbuf.data()))) {
+            ADD_FAILURE() << "Failed insert at iteration " << i;
+            break;
+          }
+        }
+        finish_write_barrier.Wait();
+        start_rw_barrier.Wait();
+        if (!tree) {
+          return;
+        }
+        for (int i = num_inserts_first_phase + tidx; i < num_inserts;
+             i += num_writer_threads) {
+          generate_shuffled_kv(kbuf, vbuf, i);
+          if (!tree->Insert(Slice(kbuf.data()), Slice(vbuf.data()))) {
+            ADD_FAILURE() << "Failed insert at iteration " << i;
+            break;
+          }
+        }
+        finish_rw_barrier.Wait();
+      }
+    });
+  }
+
+  // We want to read values while writes are also happening. However, waiting 
with ASSERT_EVENTUALLY
+  // would completely screw performance measuring. So we will read values that 
are already
+  // guaranteed to be in the tree.
+  for (int tidx = 0; tidx < num_reader_threads; tidx++) {
+    threads.emplace_back([&, tidx]() {
+      std::array<char, 32> kbuf;
+      std::array<char, 32> vbuf;
+      while (true) {
+        // At this point, the 1st phase is done, and the first half of the 
keys are already in the
+        // tree.
+        start_rw_barrier.Wait();
+        if (!tree) {
+          return;
+        }
+        for (int64_t i = tidx;
+             i < static_cast<int64_t>(num_inserts_first_phase) * reader_boost;
+             i += num_reader_threads) {
+          generate_shuffled_kv(kbuf, vbuf, static_cast<int32_t>(i % 
num_inserts_first_phase));
+          VerifyGet(*tree, Slice(kbuf.data()), Slice(vbuf.data()));
+        }
+        finish_rw_barrier.Wait();
+      }
+    });
+  }
+
+  std::shared_ptr<MemoryTrackingBufferAllocator> mtbf;
+  std::shared_ptr<ThreadSafeMemoryTrackingArena> arena_ptr;
+
+  bool skip_normal_shutdown = false;
+
+  for (int trial = 0; trial < kTrials; trial++) {
+    // shared_ptrs are passed on the interfaces, so at first glance one would 
think it is safe to
+    // reset the ptrs in any order. But it is not.
+    tree.reset();
+    arena_ptr.reset();
+    mtbf = 
std::make_shared<MemoryTrackingBufferAllocator>(HeapBufferAllocator::Get(),
+                                                           
MemTracker::GetRootTracker());
+    arena_ptr = std::make_shared<ThreadSafeMemoryTrackingArena>(16, mtbf);
+    tree.reset(new CBTree<ProdTreeTraits>(arena_ptr));
+
+    LOG_TIMING(
+        INFO,
+        Substitute(
+            "Writing $0 values on $1 threads", num_inserts_first_phase, 
num_writer_threads)) {
+      start_write_barrier.Wait();
+      finish_write_barrier.Wait();
+    }
+    if (::testing::Test::HasFatalFailure()) {
+      tree.reset(nullptr);
+      start_rw_barrier.Wait();
+      skip_normal_shutdown = true;
+      break;
+    }
+    LOG_TIMING(INFO,
+               Substitute("Writing $0 values on $1 threads and reading $2 
values on $3 threads",
+                          num_inserts - num_inserts_first_phase,
+                          num_writer_threads,
+                          static_cast<uint64_t>(num_inserts_first_phase)
+                            * reader_boost,
+                          num_reader_threads)) {
+      start_rw_barrier.Wait();
+      finish_rw_barrier.Wait();
+    }
+    if (::testing::Test::HasFatalFailure()) {
+      // Normal shutdown is fine. Threads are already waiting for the next 
start.
+      break;
+    }
+  }
+
+  if (!skip_normal_shutdown) {
+    tree.reset(nullptr);
+    start_write_barrier.Wait();
+    start_rw_barrier.Wait();
+  }
+
+  for (thread& thr : threads) {
+    thr.join();
+  }
+}
+
 } // namespace btree
 } // namespace tablet
 } // namespace kudu

Reply via email to