This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 0741fd3a3b6db561465840192888072762a0a926
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Fri Dec 10 22:25:59 2021 -0800

    [consensus] reduce critical section in LogCache::ReadOps()
    
    This patch reduces the section protected by the lock in
    LogCache::ReadOps(), moving out of the critical section several
    operations which do not need such a protection.
    
    I didn't find an existing performance test which would exercise the
    updated code path, but reducing the time spent under a lock looks like
    an improvement.
    
    Change-Id: Ia7de32eb8adde242c8047720e062535cbdaf3e39
    Reviewed-on: http://gerrit.cloudera.org:8080/18090
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/consensus/consensus_queue-test.cc | 12 ++++++++----
 src/kudu/consensus/log_cache.cc            | 30 ++++++++++++------------------
 2 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/src/kudu/consensus/consensus_queue-test.cc 
b/src/kudu/consensus/consensus_queue-test.cc
index 5b6f359..139a2f2 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -57,6 +57,7 @@
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -496,6 +497,11 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
   // result in async log reads instead of cache hits.
   AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 100);
 
+  SCOPED_CLEANUP({
+    // Extract the ops from the request to avoid double free.
+    request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr);
+  });
+
   OpId last;
   for (int i = 0; i < 11; i++) {
     VLOG(1) << "Making request " << i;
@@ -504,7 +510,8 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
     ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, 
&needs_tablet_copy));
     ASSERT_FALSE(needs_tablet_copy);
     ASSERT_EQ(kOpsPerRequest, request.ops_size());
-    last = request.ops(request.ops_size() -1).id();
+    ASSERT_GE(request.ops_size(), 1);
+    last = request.ops(request.ops_size() - 1).id();
     SetLastReceivedAndLastCommitted(&response, last);
     VLOG(1) << "Faking received up through " << last;
     send_more_immediately = 
queue_->ResponseFromPeer(response.responder_uuid(), response);
@@ -519,9 +526,6 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
   SetLastReceivedAndLastCommitted(&response, last);
   send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), 
response);
   ASSERT_FALSE(send_more_immediately);
-
-  // extract the ops from the request to avoid double free
-  request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr);
 }
 
 TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc
index 83b06f0..24281cf 100644
--- a/src/kudu/consensus/log_cache.cc
+++ b/src/kudu/consensus/log_cache.cc
@@ -77,8 +77,6 @@ METRIC_DEFINE_gauge_int64(tablet, log_cache_size, "Log Cache 
Memory Usage",
 
 static const char kParentMemTrackerId[] = "log_cache";
 
-typedef vector<const ReplicateMsg*>::const_iterator MsgIter;
-
 LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity,
                    scoped_refptr<log::Log> log,
                    string local_uuid,
@@ -315,23 +313,18 @@ Status LogCache::ReadOps(int64_t after_op_index,
         // Read up to the next entry that's in the cache
         up_to = iter->first - 1;
       }
-
       l.unlock();
 
       vector<ReplicateMsg*> raw_replicate_ptrs;
       RETURN_NOT_OK_PREPEND(
-        log_->reader()->ReadReplicatesInRange(
-          next_index, up_to, remaining_space, &raw_replicate_ptrs),
-        Substitute("Failed to read ops $0..$1", next_index, up_to));
-      l.lock();
-      VLOG_WITH_PREFIX_UNLOCKED(2)
-          << "Successfully read " << raw_replicate_ptrs.size() << " ops "
-          << "from disk (" << next_index << ".."
-          << (next_index + raw_replicate_ptrs.size() - 1) << ")";
-
-      for (ReplicateMsg* msg : raw_replicate_ptrs) {
+          log_->reader()->ReadReplicatesInRange(
+              next_index, up_to, remaining_space, &raw_replicate_ptrs),
+          Substitute("failed to read ops $0..$1", next_index, up_to));
+      VLOG_WITH_PREFIX_UNLOCKED(2) <<
+          Substitute("read $0 ops from log ($1..$2)", 
raw_replicate_ptrs.size(),
+          next_index, next_index + raw_replicate_ptrs.size() - 1);
+      for (auto* msg : raw_replicate_ptrs) {
         CHECK_EQ(next_index, msg->id().index());
-
         if (remaining_space > 0) {
           messages->push_back(make_scoped_refptr_replicate(msg));
           remaining_space -= TotalByteSizeForMessage(*msg);
@@ -341,12 +334,13 @@ Status LogCache::ReadOps(int64_t after_op_index,
         }
       }
 
+      // Acquire the lock again before going to the next iteration.
+      l.lock();
     } else {
       // Pull contiguous messages from the cache until the size limit is 
achieved.
       for (; iter != cache_.end(); ++iter) {
         const ReplicateRefPtr& msg = iter->second.msg;
-        int64_t index = msg->get()->id().index();
-        if (index != next_index) {
+        if (next_index != msg->get()->id().index()) {
           continue;
         }
 
@@ -356,7 +350,7 @@ Status LogCache::ReadOps(int64_t after_op_index,
         }
 
         messages->push_back(msg);
-        next_index++;
+        ++next_index;
       }
     }
   }
@@ -379,7 +373,7 @@ void LogCache::EvictSomeUnlocked(int64_t stop_after_index, 
int64_t bytes_to_evic
 
   int64_t bytes_evicted = 0;
   for (auto iter = cache_.begin(); iter != cache_.end();) {
-    const CacheEntry& entry = (*iter).second;
+    const CacheEntry& entry = iter->second;
     const ReplicateRefPtr& msg = entry.msg;
     VLOG_WITH_PREFIX_UNLOCKED(2) << "considering for eviction: " << 
msg->get()->id();
     int64_t msg_index = msg->get()->id().index();

Reply via email to