This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit d0243afe2ed93aeb18e318068df1bc02de72ad1a Author: Alexey Serbin <ale...@apache.org> AuthorDate: Thu Dec 9 19:49:23 2021 -0800 [consensus] minor clean-up on LogCache Since I was looking a bit into the code of the LogCache cache, I went ahead and did a minor clean-up here, such as * removing unused code * fixing code style * simplifying the going-over-max_size_bytes condition in ReadOps(), making sure the regression test for KUDU-1586 passes * fixing signed/unsigned comparison warning for a Raft op's index and the index of the corresponding entry in the cache * other unsorted minor updates Change-Id: I48f60c44209e269eb6b00278c6e32d4398ef9a55 Reviewed-on: http://gerrit.cloudera.org:8080/18081 Reviewed-by: Andrew Wong <aw...@cloudera.com> Tested-by: Alexey Serbin <aser...@cloudera.com> --- src/kudu/consensus/log_cache.cc | 84 ++++++++++++++++++----------------------- src/kudu/consensus/log_cache.h | 20 +++++----- 2 files changed, 46 insertions(+), 58 deletions(-) diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc index 67cf6f4..83b06f0 100644 --- a/src/kudu/consensus/log_cache.cc +++ b/src/kudu/consensus/log_cache.cc @@ -83,13 +83,12 @@ LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity, scoped_refptr<log::Log> log, string local_uuid, string tablet_id) - : log_(std::move(log)), - local_uuid_(std::move(local_uuid)), - tablet_id_(std::move(tablet_id)), - next_sequential_op_index_(0), - min_pinned_op_index_(0), - metrics_(metric_entity) { - + : log_(std::move(log)), + local_uuid_(std::move(local_uuid)), + tablet_id_(std::move(tablet_id)), + next_sequential_op_index_(0), + min_pinned_op_index_(0), + metrics_(metric_entity) { const int64_t max_ops_size_bytes = FLAGS_log_cache_size_limit_mb * 1024L * 1024L; const int64_t global_max_ops_size_bytes = FLAGS_global_log_cache_size_limit_mb * 1024L * 1024L; @@ -107,9 +106,12 @@ LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity, // Put a fake message at index 0, since this simplifies a lot of our // code paths elsewhere. - auto zero_op = new ReplicateMsg(); + auto zero_op = new ReplicateMsg; *zero_op->mutable_id() = MinimumOpId(); - InsertOrDie(&cache_, 0, { make_scoped_refptr_replicate(zero_op), zero_op->SpaceUsedLong() }); + DCHECK_EQ(kZeroOpIdx, zero_op->id().index()); + InsertOrDie(&cache_, + kZeroOpIdx, + { make_scoped_refptr_replicate(zero_op), zero_op->SpaceUsedLong() }); } LogCache::~LogCache() { @@ -119,8 +121,7 @@ LogCache::~LogCache() { void LogCache::Init(const OpId& preceding_op) { std::lock_guard<simple_spinlock> l(lock_); - CHECK_EQ(cache_.size(), 1) - << "Cache should have only our special '0' op"; + CHECK_EQ(1, cache_.size()) << "cache should have only special '0' op"; next_sequential_op_index_ = preceding_op.index() + 1; min_pinned_op_index_ = next_sequential_op_index_; } @@ -162,8 +163,8 @@ Status LogCache::AppendOperations(vector<ReplicateRefPtr> msgs, entries_to_insert.emplace_back(std::move(e)); } - int64_t first_idx_in_batch = msgs.front()->get()->id().index(); - int64_t last_idx_in_batch = msgs.back()->get()->id().index(); + const int64_t first_idx_in_batch = msgs.front()->get()->id().index(); + const int64_t last_idx_in_batch = msgs.back()->get()->id().index(); std::unique_lock<simple_spinlock> l(lock_); // If we're not appending a consecutive op we're likely overwriting and @@ -175,8 +176,8 @@ Status LogCache::AppendOperations(vector<ReplicateRefPtr> msgs, // Try to consume the memory. If it can't be consumed, we may need to evict. bool borrowed_memory = false; if (!tracker_->TryConsume(mem_required)) { - int spare = tracker_->SpareCapacity(); - int need_to_free = mem_required - spare; + auto spare = tracker_->SpareCapacity(); + auto need_to_free = mem_required - spare; VLOG_WITH_PREFIX_UNLOCKED(1) << "Memory limit would be exceeded trying to append " << HumanReadableNumBytes::ToString(mem_required) << " to log cache (available=" @@ -188,8 +189,8 @@ Status LogCache::AppendOperations(vector<ReplicateRefPtr> msgs, EvictSomeUnlocked(min_pinned_op_index_, need_to_free); // Force consuming, so that we don't refuse appending data. We might - // blow past our limit a little bit (as much as the number of tablets times - // the amount of in-flight data in the log), but until implementing the above TODO, + // blow past our limit as much as the number of tablets times the amount + // of in-flight data in the log, but until implementing the above TODO, // it's difficult to solve this issue. tracker_->Consume(mem_required); @@ -283,9 +284,9 @@ namespace { // length delimiting and tagging of the message. int64_t TotalByteSizeForMessage(const ReplicateMsg& msg) { int64_t msg_size = google::protobuf::internal::WireFormatLite::LengthDelimitedSize( - msg.ByteSizeLong()); - msg_size += 1; // for the type tag - return msg_size; + msg.ByteSizeLong()); + // Add an extra byte for the type tag. + return msg_size + 1; } } // anonymous namespace @@ -296,13 +297,12 @@ Status LogCache::ReadOps(int64_t after_op_index, DCHECK_GE(after_op_index, 0); RETURN_NOT_OK(LookupOpId(after_op_index, preceding_op)); - std::unique_lock<simple_spinlock> l(lock_); - int64_t next_index = after_op_index + 1; - // Return as many operations as we can, up to the limit int64_t remaining_space = max_size_bytes; - while (remaining_space > 0 && next_index < next_sequential_op_index_) { + int64_t next_index = after_op_index + 1; + std::unique_lock<simple_spinlock> l(lock_); + while (remaining_space > 0 && next_index < next_sequential_op_index_) { // If the messages the peer needs haven't been loaded into the queue yet, // load them. MessageCache::const_iterator iter = cache_.lower_bound(next_index); @@ -332,10 +332,10 @@ Status LogCache::ReadOps(int64_t after_op_index, for (ReplicateMsg* msg : raw_replicate_ptrs) { CHECK_EQ(next_index, msg->id().index()); - remaining_space -= TotalByteSizeForMessage(*msg); - if (remaining_space > 0 || messages->empty()) { + if (remaining_space > 0) { messages->push_back(make_scoped_refptr_replicate(msg)); - next_index++; + remaining_space -= TotalByteSizeForMessage(*msg); + ++next_index; } else { delete msg; } @@ -383,7 +383,7 @@ void LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evic const ReplicateRefPtr& msg = entry.msg; VLOG_WITH_PREFIX_UNLOCKED(2) << "considering for eviction: " << msg->get()->id(); int64_t msg_index = msg->get()->id().index(); - if (msg_index == 0) { + if (msg_index == kZeroOpIdx) { // Always keep our special '0' op. ++iter; continue; @@ -445,31 +445,21 @@ std::string LogCache::ToStringUnlocked() const { } std::string LogCache::LogPrefixUnlocked() const { - return Substitute("T $0 P $1: ", - tablet_id_, - local_uuid_); -} - -void LogCache::DumpToLog() const { - vector<string> strings; - DumpToStrings(&strings); - for (const string& s : strings) { - LOG_WITH_PREFIX_UNLOCKED(INFO) << s; - } + return Substitute("T $0 P $1: ", tablet_id_, local_uuid_); } void LogCache::DumpToStrings(vector<string>* lines) const { std::lock_guard<simple_spinlock> lock(lock_); - int counter = 0; + lines->reserve(cache_.size() + 2); lines->push_back(ToStringUnlocked()); lines->push_back("Messages:"); + size_t counter = 0; for (const auto& entry : cache_) { - const ReplicateMsg* msg = entry.second.msg->get(); + const auto* msg = entry.second.msg->get(); lines->push_back( - Substitute("Message[$0] $1.$2 : REPLICATE. Type: $3, Size: $4", - counter++, msg->id().term(), msg->id().index(), - OperationType_Name(msg->op_type()), - msg->ByteSizeLong())); + Substitute("Message[$0] $1.$2 : REPLICATE. Type: $3, Size: $4", + counter++, msg->id().term(), msg->id().index(), + OperationType_Name(msg->op_type()), msg->ByteSizeLong())); } } @@ -496,8 +486,8 @@ void LogCache::DumpToHtml(std::ostream& out) const { #define INSTANTIATE_METRIC(x) \ x.Instantiate(metric_entity, 0) LogCache::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_entity) - : log_cache_num_ops(INSTANTIATE_METRIC(METRIC_log_cache_num_ops)), - log_cache_size(INSTANTIATE_METRIC(METRIC_log_cache_size)) { + : log_cache_num_ops(INSTANTIATE_METRIC(METRIC_log_cache_num_ops)), + log_cache_size(INSTANTIATE_METRIC(METRIC_log_cache_size)) { } #undef INSTANTIATE_METRIC diff --git a/src/kudu/consensus/log_cache.h b/src/kudu/consensus/log_cache.h index 9091308..812f50e 100644 --- a/src/kudu/consensus/log_cache.h +++ b/src/kudu/consensus/log_cache.h @@ -14,8 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#ifndef KUDU_CONSENSUS_LOG_CACHE_H -#define KUDU_CONSENSUS_LOG_CACHE_H +#pragma once #include <cstddef> #include <cstdint> @@ -80,7 +79,7 @@ class LogCache { // The OpId which precedes the returned ops is returned in *preceding_op. // The index of this OpId will match 'after_op_index'. // - // If the ops being requested are not available in the log, this will synchronously + // If the ops being requested are not available in the cache, this will synchronously // read these ops from disk. Therefore, this function may take a substantial amount // of time and should not be called with important locks held, etc. Status ReadOps(int64_t after_op_index, @@ -103,7 +102,7 @@ class LogCache { // Following this, reads of truncated indexes using ReadOps(), LookupOpId(), // HasOpBeenWritten(), etc, will return as if the operations were never appended. // - // NOTE: unless a new operation is appended followig 'index', this truncation does + // NOTE: unless a new operation is appended following 'index', this truncation does // not persist across server restarts. void TruncateOpsAfter(int64_t index); @@ -122,9 +121,6 @@ class LogCache { return metrics_.log_cache_num_ops->value(); } - // Dump the current contents of the cache to the log. - void DumpToLog() const; - // Dumps the contents of the cache to the provided string vector. void DumpToStrings(std::vector<std::string>* lines) const; @@ -150,6 +146,9 @@ class LogCache { FRIEND_TEST(LogCacheTest, TestTruncation); friend class LogCacheTest; + // Index of the special 'zero-op' entry in the cache. + static constexpr const int64_t kZeroOpIdx = 0; + // An entry in the cache. struct CacheEntry { ReplicateRefPtr msg; @@ -193,7 +192,7 @@ class LogCache { // An ordered map that serves as the buffer for the cached messages. // Maps from log index -> ReplicateMsg - typedef std::map<uint64_t, CacheEntry> MessageCache; + typedef std::map<int64_t, CacheEntry> MessageCache; MessageCache cache_; // The next log index to append. Each append operation must either @@ -223,10 +222,10 @@ class LogCache { explicit Metrics(const scoped_refptr<MetricEntity>& metric_entity); // Keeps track of the total number of operations in the cache. - scoped_refptr<AtomicGauge<int64_t> > log_cache_num_ops; + scoped_refptr<AtomicGauge<int64_t>> log_cache_num_ops; // Keeps track of the memory consumed by the cache, in bytes. - scoped_refptr<AtomicGauge<int64_t> > log_cache_size; + scoped_refptr<AtomicGauge<int64_t>> log_cache_size; }; Metrics metrics_; @@ -235,4 +234,3 @@ class LogCache { } // namespace consensus } // namespace kudu -#endif /* KUDU_CONSENSUS_LOG_CACHE_H */