This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 0741fd3a3b6db561465840192888072762a0a926 Author: Alexey Serbin <ale...@apache.org> AuthorDate: Fri Dec 10 22:25:59 2021 -0800 [consensus] reduce critical section in LogCache::ReadOps() This patch reduces the section protected by the lock in LogCache::ReadOps(), moving out of the critical section several operations which do not need such a protection. I didn't find an existing performance test which would exercise the updated code path, but reducing the time spent under a lock looks like an improvement. Change-Id: Ia7de32eb8adde242c8047720e062535cbdaf3e39 Reviewed-on: http://gerrit.cloudera.org:8080/18090 Reviewed-by: Andrew Wong <aw...@cloudera.com> Tested-by: Kudu Jenkins --- src/kudu/consensus/consensus_queue-test.cc | 12 ++++++++---- src/kudu/consensus/log_cache.cc | 30 ++++++++++++------------------ 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc index 5b6f359..139a2f2 100644 --- a/src/kudu/consensus/consensus_queue-test.cc +++ b/src/kudu/consensus/consensus_queue-test.cc @@ -57,6 +57,7 @@ #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/pb_util.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" @@ -496,6 +497,11 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) { // result in async log reads instead of cache hits. AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 100); + SCOPED_CLEANUP({ + // Extract the ops from the request to avoid double free. + request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr); + }); + OpId last; for (int i = 0; i < 11; i++) { VLOG(1) << "Making request " << i; @@ -504,7 +510,8 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) { ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy)); ASSERT_FALSE(needs_tablet_copy); ASSERT_EQ(kOpsPerRequest, request.ops_size()); - last = request.ops(request.ops_size() -1).id(); + ASSERT_GE(request.ops_size(), 1); + last = request.ops(request.ops_size() - 1).id(); SetLastReceivedAndLastCommitted(&response, last); VLOG(1) << "Faking received up through " << last; send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response); @@ -519,9 +526,6 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) { SetLastReceivedAndLastCommitted(&response, last); send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response); ASSERT_FALSE(send_more_immediately); - - // extract the ops from the request to avoid double free - request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr); } TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) { diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc index 83b06f0..24281cf 100644 --- a/src/kudu/consensus/log_cache.cc +++ b/src/kudu/consensus/log_cache.cc @@ -77,8 +77,6 @@ METRIC_DEFINE_gauge_int64(tablet, log_cache_size, "Log Cache Memory Usage", static const char kParentMemTrackerId[] = "log_cache"; -typedef vector<const ReplicateMsg*>::const_iterator MsgIter; - LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity, scoped_refptr<log::Log> log, string local_uuid, @@ -315,23 +313,18 @@ Status LogCache::ReadOps(int64_t after_op_index, // Read up to the next entry that's in the cache up_to = iter->first - 1; } - l.unlock(); vector<ReplicateMsg*> raw_replicate_ptrs; RETURN_NOT_OK_PREPEND( - log_->reader()->ReadReplicatesInRange( - next_index, up_to, remaining_space, &raw_replicate_ptrs), - Substitute("Failed to read ops $0..$1", next_index, up_to)); - l.lock(); - VLOG_WITH_PREFIX_UNLOCKED(2) - << "Successfully read " << raw_replicate_ptrs.size() << " ops " - << "from disk (" << next_index << ".." - << (next_index + raw_replicate_ptrs.size() - 1) << ")"; - - for (ReplicateMsg* msg : raw_replicate_ptrs) { + log_->reader()->ReadReplicatesInRange( + next_index, up_to, remaining_space, &raw_replicate_ptrs), + Substitute("failed to read ops $0..$1", next_index, up_to)); + VLOG_WITH_PREFIX_UNLOCKED(2) << + Substitute("read $0 ops from log ($1..$2)", raw_replicate_ptrs.size(), + next_index, next_index + raw_replicate_ptrs.size() - 1); + for (auto* msg : raw_replicate_ptrs) { CHECK_EQ(next_index, msg->id().index()); - if (remaining_space > 0) { messages->push_back(make_scoped_refptr_replicate(msg)); remaining_space -= TotalByteSizeForMessage(*msg); @@ -341,12 +334,13 @@ Status LogCache::ReadOps(int64_t after_op_index, } } + // Acquire the lock again before going to the next iteration. + l.lock(); } else { // Pull contiguous messages from the cache until the size limit is achieved. for (; iter != cache_.end(); ++iter) { const ReplicateRefPtr& msg = iter->second.msg; - int64_t index = msg->get()->id().index(); - if (index != next_index) { + if (next_index != msg->get()->id().index()) { continue; } @@ -356,7 +350,7 @@ Status LogCache::ReadOps(int64_t after_op_index, } messages->push_back(msg); - next_index++; + ++next_index; } } } @@ -379,7 +373,7 @@ void LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evic int64_t bytes_evicted = 0; for (auto iter = cache_.begin(); iter != cache_.end();) { - const CacheEntry& entry = (*iter).second; + const CacheEntry& entry = iter->second; const ReplicateRefPtr& msg = entry.msg; VLOG_WITH_PREFIX_UNLOCKED(2) << "considering for eviction: " << msg->get()->id(); int64_t msg_index = msg->get()->id().index();