This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 55256681bac4f59137988e3351f6487f101b1e50
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Tue Nov 22 15:55:48 2022 -0800

    [tablet] a small cleanup on DeltaTracker and around
    
    I was briefly looking at the compaction-related code and noticed a bit
    of room for improvement.  This patch is the result.  It introduces
    a bunch of minor changes to DeltaTracker and compaction-related code:
    
      * updated DeltaTracker::Update() to accept nullptr for the
        OperationResultPB output parameter
      * removed per-block heap allocated instance of OperationResultPB
        in ReupdateMissedDeltas()
      * switched to std::atomic<bool> instead of AtomicBool for the
        DeltaTracker::dms_exists_ field
      * renamed delta_tracker() --> mutable_delta_tracker() for DiskRowSet
      * introduced DiskRowSet::delta_tracker() to return const reference
        (this is going to be used in a follow-up patch)
      * updated RowSetMetadata::Commit{Redo,Undo}DeltaDataBlock() to
        return 'void' instead of 'Status'
      * added a few DCHECK macros to spot inconsistencies
      * updated related code to be more style-compliant
    
    Change-Id: I9e42c72664c07e126f8cce5f071db56c0b4fe48c
    Reviewed-on: http://gerrit.cloudera.org:8080/19269
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Attila Bukor <abu...@apache.org>
---
 src/kudu/cfile/cfile_writer.cc     |  1 +
 src/kudu/cfile/cfile_writer.h      |  2 +-
 src/kudu/tablet/compaction.cc      | 16 +++++-----
 src/kudu/tablet/delta_store.cc     |  7 +++--
 src/kudu/tablet/delta_tracker.cc   | 62 +++++++++++++++++++-------------------
 src/kudu/tablet/delta_tracker.h    | 14 ++++-----
 src/kudu/tablet/diskrowset-test.cc | 20 ++++++------
 src/kudu/tablet/diskrowset.cc      |  9 +++---
 src/kudu/tablet/diskrowset.h       | 13 ++++++--
 src/kudu/tablet/metadata-test.cc   |  2 +-
 src/kudu/tablet/rowset_metadata.cc | 10 +++---
 src/kudu/tablet/rowset_metadata.h  |  8 ++---
 src/kudu/tablet/tablet.cc          |  3 +-
 13 files changed, 86 insertions(+), 81 deletions(-)

diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc
index 12ce6baa3..e0abefe6a 100644
--- a/src/kudu/cfile/cfile_writer.cc
+++ b/src/kudu/cfile/cfile_writer.cc
@@ -366,6 +366,7 @@ Status CFileWriter::FinishCurDataBlock() {
     return Status::OK();
   }
 
+  DCHECK_GE(value_count_, num_elems_in_block);
   rowid_t first_elem_ord = value_count_ - num_elems_in_block;
   VLOG(1) << "Appending data block for values " <<
     first_elem_ord << "-" << value_count_;
diff --git a/src/kudu/cfile/cfile_writer.h b/src/kudu/cfile/cfile_writer.h
index d9cca13c2..d0c46f936 100644
--- a/src/kudu/cfile/cfile_writer.h
+++ b/src/kudu/cfile/cfile_writer.h
@@ -160,7 +160,7 @@ class CFileWriter {
   // Return the number of values written to the file.
   // This includes NULL cells, but does not include any "raw" blocks
   // appended.
-  int written_value_count() const {
+  uint32_t written_value_count() const {
     return value_count_;
   }
 
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 762c92b08..e38bb7479 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -54,7 +54,6 @@
 #include "kudu/tablet/memrowset.h"
 #include "kudu/tablet/mutation.h"
 #include "kudu/tablet/mvcc.h"
-#include "kudu/tablet/tablet.pb.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/fault_injection.h"
@@ -1552,17 +1551,16 @@ Status ReupdateMissedDeltas(const IOContext* io_context,
           RETURN_NOT_OK(cur_drs->CountRows(io_context, &num_rows));
         }
 
-        DeltaTracker* cur_tracker = cur_drs->delta_tracker();
-        unique_ptr<OperationResultPB> result(new OperationResultPB);
+        DeltaTracker* cur_tracker = cur_drs->mutable_delta_tracker();
         DCHECK_LT(idx_in_delta_tracker, num_rows);
-        Status s = cur_tracker->Update(mut->timestamp(),
-                                       idx_in_delta_tracker,
-                                       mut->changelist(),
-                                       max_op_id,
-                                       result.get());
+        const auto s = cur_tracker->Update(mut->timestamp(),
+                                           idx_in_delta_tracker,
+                                           mut->changelist(),
+                                           max_op_id,
+                                           nullptr);
         DCHECK(s.ok()) << "Failed update on compaction for row " << 
output_row_offset
             << " @" << mut->timestamp() << ": " << 
mut->changelist().ToString(*schema);
-        if (s.ok()) {
+        if (PREDICT_TRUE(s.ok())) {
           // Update the set of delta trackers with the one we've just updated.
           InsertIfNotPresent(&updated_trackers, cur_tracker);
         }
diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc
index 1687fd649..461f34e22 100644
--- a/src/kudu/tablet/delta_store.cc
+++ b/src/kudu/tablet/delta_store.cc
@@ -340,7 +340,7 @@ Status DeltaPreparer<Traits>::AddDelta(const DeltaKey& key, 
Slice val, bool* fin
     PreparedDelta d;
     d.key = key;
     d.val = val;
-    prepared_deltas_.emplace_back(d);
+    prepared_deltas_.emplace_back(std::move(d));
   }
 
   if (finished_row_for_apply_or_collect &&
@@ -437,9 +437,10 @@ template<class Traits>
 Status DeltaPreparer<Traits>::CollectMutations(vector<Mutation*>* dst, Arena* 
arena) {
   DCHECK(prepared_flags_ & DeltaIterator::PREPARE_FOR_COLLECT);
   DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, dst->size());
-  for (const PreparedDelta& src : prepared_deltas_) {
-    DeltaKey key = src.key;
+  for (const auto& src : prepared_deltas_) {
+    const auto& key = src.key;
     RowChangeList changelist(src.val);
+    DCHECK_GE(key.row_idx(), prev_prepared_idx_);
     uint32_t rel_idx = key.row_idx() - prev_prepared_idx_;
 
     Mutation *mutation = Mutation::CreateInArena(arena, key.timestamp(), 
changelist);
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 4f63e80a6..5a1e86bef 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -26,6 +26,8 @@
 #include <type_traits>
 #include <utility>
 
+#include <boost/iterator/iterator_facade.hpp>
+#include <boost/iterator/reverse_iterator.hpp>
 #include <boost/range/adaptor/reversed.hpp>
 #include <glog/logging.h>
 
@@ -180,7 +182,7 @@ Status DeltaTracker::CreateAndInitDMSUnlocked(const 
fs::IOContext* io_context) {
   RETURN_NOT_OK(dms->Init(io_context));
 
   dms_ = std::move(dms);
-  dms_exists_.Store(true);
+  dms_exists_ = true;
   return Status::OK();
 }
 
@@ -189,7 +191,7 @@ Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(const 
IOContext* io_context
                                                      const Schema* projection,
                                                      SharedDeltaStoreVector* 
target_stores,
                                                      vector<BlockId> 
*target_blocks,
-                                                     
std::unique_ptr<DeltaIterator>* out) {
+                                                     
std::unique_ptr<DeltaIterator>* out) const {
   CHECK(open_);
   CHECK_LE(start_idx, end_idx);
   CHECK_LT(end_idx, redo_delta_stores_.size());
@@ -198,12 +200,13 @@ Status 
DeltaTracker::MakeDeltaIteratorMergerUnlocked(const IOContext* io_context
   int64_t reinsert_count = 0;
   int64_t update_count = 0;
   for (size_t idx = start_idx; idx <= end_idx; ++idx) {
-    shared_ptr<DeltaStore> &delta_store = redo_delta_stores_[idx];
+    const shared_ptr<DeltaStore>& delta_store = redo_delta_stores_[idx];
 
     // In DEBUG mode, the following asserts that the object is of the right 
type
     // (using RTTI)
     ignore_result(down_cast<DeltaFileReader*>(delta_store.get()));
-    shared_ptr<DeltaFileReader> dfr = 
std::static_pointer_cast<DeltaFileReader>(delta_store);
+    shared_ptr<DeltaFileReader> dfr =
+        std::static_pointer_cast<DeltaFileReader>(delta_store);
 
     if (dfr->has_delta_stats()) {
       delete_count += dfr->delta_stats().delete_count();
@@ -223,8 +226,7 @@ Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(const 
IOContext* io_context
   RowIteratorOptions opts;
   opts.projection = projection;
   opts.io_context = io_context;
-  RETURN_NOT_OK(DeltaIteratorMerger::Create(inputs, opts, out));
-  return Status::OK();
+  return DeltaIteratorMerger::Create(inputs, opts, out);
 }
 
 namespace {
@@ -626,7 +628,7 @@ void 
DeltaTracker::CollectStores(vector<shared_ptr<DeltaStore>>* deltas,
   }
   if (which != UNDOS_ONLY) {
     deltas->insert(deltas->end(), redo_delta_stores_.begin(), 
redo_delta_stores_.end());
-    if (dms_exists_.Load() && !dms_->Empty()) {
+    if (dms_exists_ && !dms_->Empty()) {
       deltas->push_back(dms_);
     }
   }
@@ -681,15 +683,14 @@ Status DeltaTracker::WrapIterator(const 
shared_ptr<CFileSet::Iterator> &base,
 
 Status DeltaTracker::Update(Timestamp timestamp,
                             rowid_t row_idx,
-                            const RowChangeList &update,
+                            const RowChangeList& update,
                             const consensus::OpId& op_id,
                             OperationResultPB* result) {
-  Status s;
   while (true) {
-    if (!dms_exists_.Load()) {
+    if (!dms_exists_) {
       std::lock_guard<rw_spinlock> lock(component_lock_);
       // Should check dms_exists_ here in case multiple threads are blocked.
-      if (!dms_exists_.Load()) {
+      if (!dms_exists_) {
         RETURN_NOT_OK(CreateAndInitDMSUnlocked(nullptr));
       }
     }
@@ -699,27 +700,27 @@ Status DeltaTracker::Update(Timestamp timestamp,
 
     // Should check dms_exists_ here again since there is a gap
     // between the two critical sections defined by component_lock_.
-    if (!dms_exists_.Load()) continue;
+    if (!dms_exists_) {
+      continue;
+    }
 
-    s = dms_->Update(timestamp, row_idx, update, op_id);
-    if (s.ok()) {
+    auto s = dms_->Update(timestamp, row_idx, update, op_id);
+    if (s.ok() && result != nullptr) {
       MemStoreTargetPB* target = result->add_mutated_stores();
       target->set_rs_id(rowset_metadata_->id());
       target->set_dms_id(dms_->id());
     }
-    break;
+    return s;
   }
-
-  return s;
 }
 
 Status DeltaTracker::CheckRowDeleted(rowid_t row_idx, const IOContext* 
io_context,
-                                     bool *deleted, ProbeStats* stats) const {
+                                     bool* deleted, ProbeStats* stats) const {
   shared_lock<rw_spinlock> lock(component_lock_);
 
   *deleted = false;
   // Check if the row has a deletion in DeltaMemStore.
-  if (dms_exists_.Load()) {
+  if (dms_exists_) {
     RETURN_NOT_OK(dms_->CheckRowDeleted(row_idx, io_context, deleted));
     if (*deleted) {
       return Status::OK();
@@ -727,8 +728,8 @@ Status DeltaTracker::CheckRowDeleted(rowid_t row_idx, const 
IOContext* io_contex
   }
 
   // Then check backwards through the list of trackers.
-  for (auto ds = redo_delta_stores_.crbegin(); ds != 
redo_delta_stores_.crend(); ds++) {
-    stats->deltas_consulted++;
+  for (auto ds = redo_delta_stores_.crbegin(); ds != 
redo_delta_stores_.crend(); ++ds) {
+    ++stats->deltas_consulted;
     RETURN_NOT_OK((*ds)->CheckRowDeleted(row_idx, io_context, deleted));
     if (*deleted) {
       return Status::OK();
@@ -787,9 +788,8 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
     // and reset deleted_row_count_ should be atomic, so we lock the
     // component_lock_ in exclusive mode.
     std::lock_guard<rw_spinlock> lock(component_lock_);
-    RETURN_NOT_OK(rowset_metadata_->CommitRedoDeltaDataBlock(dms->id(),
-                                                             
deleted_row_count_,
-                                                             block_id));
+    rowset_metadata_->CommitRedoDeltaDataBlock(
+        dms->id(), deleted_row_count_, block_id);
     deleted_row_count_ = 0;
   }
   if (flush_type == FLUSH_METADATA) {
@@ -814,11 +814,11 @@ Status DeltaTracker::Flush(const IOContext* io_context, 
MetadataFlushType flush_
     // This shuts out any concurrent readers or writers.
     std::lock_guard<rw_spinlock> lock(component_lock_);
 
-    count = dms_exists_.Load() ? dms_->Count() : 0;
+    count = dms_exists_ ? dms_->Count() : 0;
 
     // Swap the DeltaMemStore and dms_ is null now.
     old_dms = std::move(dms_);
-    dms_exists_.Store(false);
+    dms_exists_ = false;
 
     if (count == 0) {
       // No need to flush if there are no deltas.
@@ -867,9 +867,9 @@ bool DeltaTracker::GetDeltaMemStoreInfo(size_t* size_bytes, 
MonoTime* creation_t
   // Check dms_exists_ first to avoid unnecessary contention on
   // component_lock_. We need to check again after taking the lock in case we
   // raced with a DMS flush.
-  if (dms_exists_.Load()) {
+  if (dms_exists_) {
     shared_lock<rw_spinlock> lock(component_lock_);
-    if (dms_exists_.Load()) {
+    if (dms_exists_) {
       *size_bytes = dms_->EstimateSize();
       *creation_time = dms_->creation_time();
       return true;
@@ -880,12 +880,12 @@ bool DeltaTracker::GetDeltaMemStoreInfo(size_t* 
size_bytes, MonoTime* creation_t
 
 size_t DeltaTracker::DeltaMemStoreSize() const {
   shared_lock<rw_spinlock> lock(component_lock_);
-  return dms_exists_.Load() ? dms_->EstimateSize() : 0;
+  return dms_exists_ ? dms_->EstimateSize() : 0;
 }
 
 int64_t DeltaTracker::MinUnflushedLogIndex() const {
   shared_lock<rw_spinlock> lock(component_lock_);
-  return dms_exists_.Load() ? dms_->MinLogIndex() : 0;
+  return dms_exists_ ? dms_->MinLogIndex() : 0;
 }
 
 size_t DeltaTracker::CountUndoDeltaStores() const {
@@ -949,7 +949,7 @@ Status DeltaTracker::InitAllDeltaStoresForTests(WhichStores 
stores) {
 int64_t DeltaTracker::CountDeletedRows() const {
   shared_lock<rw_spinlock> lock(component_lock_);
   DCHECK_GE(deleted_row_count_, 0);
-  return deleted_row_count_ + (dms_exists_.Load() ? dms_->deleted_row_count() 
: 0);
+  return deleted_row_count_ + (dms_exists_ ? dms_->deleted_row_count() : 0);
 }
 
 string DeltaTracker::LogPrefix() const {
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index e021ec829..bb3b8277a 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <atomic>
 #include <cstddef>
 #include <cstdint>
 #include <memory>
@@ -33,7 +34,6 @@
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/delta_store.h"
 #include "kudu/tablet/tablet_mem_trackers.h"
-#include "kudu/util/atomic.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/mutex.h"
 #include "kudu/util/status.h"
@@ -139,10 +139,10 @@ class DeltaTracker {
   // Update the given row in the database.
   // Copies the data, as well as any referenced values into a local arena.
   // "result" tracks the status of the update as well as which data
-  // structure(s) it ended up at.
+  // structure(s) it ended up at (optional, might be passed as nullptr).
   Status Update(Timestamp timestamp,
                 rowid_t row_idx,
-                const RowChangeList &update,
+                const RowChangeList& update,
                 const consensus::OpId& op_id,
                 OperationResultPB* result);
 
@@ -151,7 +151,7 @@ class DeltaTracker {
   //
   // Sets *deleted to true if so; otherwise sets it to false.
   Status CheckRowDeleted(rowid_t row_idx, const fs::IOContext* io_context,
-                         bool *deleted, ProbeStats* stats) const;
+                         bool* deleted, ProbeStats* stats) const;
 
   // Compacts all REDO delta files.
   Status Compact(const fs::IOContext* io_context);
@@ -248,7 +248,7 @@ class DeltaTracker {
 
   // Returns true if the DMS doesn't exist. This doesn't rely on the size.
   bool DeltaMemStoreEmpty() const {
-    return !dms_exists_.Load();
+    return !dms_exists_;
   }
 
   // Get the minimum log index for this tracker's DMS, -1 if it wasn't set.
@@ -336,7 +336,7 @@ class DeltaTracker {
                                          size_t start_idx, size_t end_idx, 
const Schema* projection,
                                          
std::vector<std::shared_ptr<DeltaStore>>* target_stores,
                                          std::vector<BlockId>* target_blocks,
-                                         std::unique_ptr<DeltaIterator>* out);
+                                         std::unique_ptr<DeltaIterator>* out) 
const;
 
   std::string LogPrefix() const;
 
@@ -369,7 +369,7 @@ class DeltaTracker {
   // The maintenance scheduler calls DeltaMemStoreEmpty() a lot.
   // We use an atomic variable to indicate whether DMS exists or not and
   // to avoid having to take component_lock_ in order to satisfy this call.
-  AtomicBool dms_exists_;
+  std::atomic<bool> dms_exists_;
 
   // read-write lock protecting dms_ and {redo,undo}_delta_stores_.
   // - Readers take this lock in shared mode.
diff --git a/src/kudu/tablet/diskrowset-test.cc 
b/src/kudu/tablet/diskrowset-test.cc
index a813f0395..37529d363 100644
--- a/src/kudu/tablet/diskrowset-test.cc
+++ b/src/kudu/tablet/diskrowset-test.cc
@@ -543,18 +543,18 @@ TEST_F(TestRowSet, TestMakeDeltaIteratorMergerUnlocked) {
   ASSERT_OK(OpenTestRowSet(&rs));
   UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
   ASSERT_OK(rs->FlushDeltas(nullptr));
-  DeltaTracker *dt = rs->delta_tracker();
-  int num_stores = dt->redo_delta_stores_.size();
+  const DeltaTracker& dt = rs->delta_tracker();
+  size_t num_stores = dt.redo_delta_stores_.size();
+  ASSERT_GT(num_stores, 0);
   vector<shared_ptr<DeltaStore> > compacted_stores;
   vector<BlockId> compacted_blocks;
   unique_ptr<DeltaIterator> merge_iter;
-  ASSERT_OK(dt->MakeDeltaIteratorMergerUnlocked(nullptr, 0, num_stores - 1, 
&schema_,
-                                                &compacted_stores,
-                                                &compacted_blocks, 
&merge_iter));
+  ASSERT_OK(dt.MakeDeltaIteratorMergerUnlocked(nullptr, 0, num_stores - 1, 
&schema_,
+                                               &compacted_stores,
+                                               &compacted_blocks, 
&merge_iter));
   vector<string> results;
   ASSERT_OK(DebugDumpDeltaIterator(REDO, merge_iter.get(), schema_,
-                                          ITERATE_OVER_ALL_ROWS,
-                                          &results));
+                                   ITERATE_OVER_ALL_ROWS, &results));
   for (const string &str : results) {
     VLOG(1) << str;
   }
@@ -611,10 +611,10 @@ TEST_F(TestRowSet, TestCompactStores) {
       RowSet::MAJOR_DELTA_COMPACTION)));
 
   // Compact the deltafiles
-  DeltaTracker *dt = rs->delta_tracker();
+  auto* dt = rs->mutable_delta_tracker();
   int num_stores = dt->redo_delta_stores_.size();
   VLOG(1) << "Number of stores before compaction: " << num_stores;
-  ASSERT_EQ(num_stores, 3);
+  ASSERT_EQ(3, num_stores);
   ASSERT_OK(dt->CompactStores(nullptr, 0, num_stores - 1));
   num_stores = dt->redo_delta_stores_.size();
   VLOG(1) << "Number of stores after compaction: " << num_stores;
@@ -650,7 +650,7 @@ TEST_F(TestRowSet, TestGCAncientStores) {
   WriteTestRowSet();
   shared_ptr<DiskRowSet> rs;
   ASSERT_OK(OpenTestRowSet(&rs));
-  DeltaTracker *dt = rs->delta_tracker();
+  DeltaTracker* dt = rs->mutable_delta_tracker();
   ASSERT_EQ(0, dt->CountUndoDeltaStores());
   ASSERT_EQ(0, dt->CountRedoDeltaStores());
 
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 8d9ca542a..59831a135 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -50,6 +50,7 @@
 #include "kudu/tablet/multi_column_writer.h"
 #include "kudu/tablet/mutation.h"
 #include "kudu/tablet/mvcc.h"
+#include "kudu/tablet/rowset_metadata.h"
 #include "kudu/util/compression/compression.pb.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/flag_tags.h"
@@ -573,8 +574,8 @@ Status 
DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>&
                                                         HistoryGcOpts 
history_gc_opts) {
   VLOG_WITH_PREFIX(1) << "Major compacting REDO delta stores (cols: " << 
col_ids << ")";
   TRACE_EVENT0("tablet", "DiskRowSet::MajorCompactDeltaStoresWithColumnIds");
-  std::lock_guard<Mutex> l(*delta_tracker()->compact_flush_lock());
-  RETURN_NOT_OK(delta_tracker()->CheckWritableUnlocked());
+  std::lock_guard<Mutex> l(*mutable_delta_tracker()->compact_flush_lock());
+  RETURN_NOT_OK(mutable_delta_tracker()->CheckWritableUnlocked());
 
   // TODO(todd): do we need to lock schema or anything here?
   unique_ptr<MajorDeltaCompaction> compaction;
@@ -704,9 +705,7 @@ Status DiskRowSet::MutateRow(Timestamp timestamp,
     return Status::NotFound("row not found");
   }
 
-  RETURN_NOT_OK(delta_tracker_->Update(timestamp, *row_idx, update, op_id, 
result));
-
-  return Status::OK();
+  return delta_tracker_->Update(timestamp, *row_idx, update, op_id, result);
 }
 
 Status DiskRowSet::CheckRowPresent(const RowSetKeyProbe &probe,
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index a6b076f0e..4ebb848b8 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -26,6 +26,7 @@
 #include <memory>
 #include <mutex>
 #include <string>
+#include <type_traits>
 #include <vector>
 
 #include <glog/logging.h>
@@ -40,7 +41,6 @@
 #include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/delta_tracker.h"
 #include "kudu/tablet/rowset.h"
-#include "kudu/tablet/rowset_metadata.h"
 #include "kudu/tablet/tablet_mem_trackers.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/util/bloom_filter.h"
@@ -86,6 +86,7 @@ class MultiColumnWriter;
 class Mutation;
 class MvccSnapshot;
 class OperationResultPB;
+class RowSetMetadata;
 
 class DiskRowSetWriter {
  public:
@@ -430,8 +431,14 @@ class DiskRowSet :
     has_been_compacted_.store(true);
   }
 
-  DeltaTracker *delta_tracker() {
-    return DCHECK_NOTNULL(delta_tracker_.get());
+  DeltaTracker* mutable_delta_tracker() {
+    DCHECK(delta_tracker_);
+    return delta_tracker_.get();
+  }
+
+  const DeltaTracker& delta_tracker() const {
+    DCHECK(delta_tracker_);
+    return *delta_tracker_;
   }
 
   std::shared_ptr<RowSetMetadata> metadata() override {
diff --git a/src/kudu/tablet/metadata-test.cc b/src/kudu/tablet/metadata-test.cc
index ad538b88d..4d271ebb2 100644
--- a/src/kudu/tablet/metadata-test.cc
+++ b/src/kudu/tablet/metadata-test.cc
@@ -46,7 +46,7 @@ class MetadataTest : public KuduTest {
     tablet_meta_ = new TabletMetadata(nullptr, "fake-tablet");
     CHECK_OK(RowSetMetadata::CreateNew(tablet_meta_.get(), 0, &meta_));
     for (int i = 0; i < all_blocks_.size(); i++) {
-      CHECK_OK(meta_->CommitRedoDeltaDataBlock(i, 0, all_blocks_[i]));
+      meta_->CommitRedoDeltaDataBlock(i, 0, all_blocks_[i]);
     }
     CHECK_EQ(4, meta_->redo_delta_blocks().size());
   }
diff --git a/src/kudu/tablet/rowset_metadata.cc 
b/src/kudu/tablet/rowset_metadata.cc
index 718710e31..56746bc84 100644
--- a/src/kudu/tablet/rowset_metadata.cc
+++ b/src/kudu/tablet/rowset_metadata.cc
@@ -186,20 +186,18 @@ void RowSetMetadata::SetColumnDataBlocks(const 
std::map<ColumnId, BlockId>& bloc
   blocks_by_col_id_ = std::move(new_map);
 }
 
-Status RowSetMetadata::CommitRedoDeltaDataBlock(int64_t dms_id,
-                                                int64_t num_deleted_rows,
-                                                const BlockId& block_id) {
+void RowSetMetadata::CommitRedoDeltaDataBlock(int64_t dms_id,
+                                              int64_t num_deleted_rows,
+                                              const BlockId& block_id) {
   std::lock_guard<LockType> l(lock_);
   last_durable_redo_dms_id_ = dms_id;
   redo_delta_blocks_.push_back(block_id);
   IncrementLiveRowsUnlocked(-num_deleted_rows);
-  return Status::OK();
 }
 
-Status RowSetMetadata::CommitUndoDeltaDataBlock(const BlockId& block_id) {
+void RowSetMetadata::CommitUndoDeltaDataBlock(const BlockId& block_id) {
   std::lock_guard<LockType> l(lock_);
   undo_delta_blocks_.push_back(block_id);
-  return Status::OK();
 }
 
 void RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update,
diff --git a/src/kudu/tablet/rowset_metadata.h 
b/src/kudu/tablet/rowset_metadata.h
index 85efd9d6b..c6f240a4a 100644
--- a/src/kudu/tablet/rowset_metadata.h
+++ b/src/kudu/tablet/rowset_metadata.h
@@ -120,11 +120,11 @@ class RowSetMetadata {
 
   // Atomically commit the new redo delta block to RowSetMetadata.
   // This atomic operation includes updates to last_durable_redo_dms_id_ and 
live_row_count_.
-  Status CommitRedoDeltaDataBlock(int64_t dms_id,
-                                  int64_t num_deleted_rows,
-                                  const BlockId& block_id);
+  void CommitRedoDeltaDataBlock(int64_t dms_id,
+                                int64_t num_deleted_rows,
+                                const BlockId& block_id);
 
-  Status CommitUndoDeltaDataBlock(const BlockId& block_id);
+  void CommitUndoDeltaDataBlock(const BlockId& block_id);
 
   bool has_encoded_keys_unlocked() const {
     return min_encoded_key_ && max_encoded_key_;
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 18ca96089..364eaa4ac 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -2600,7 +2600,8 @@ Status Tablet::MajorCompactAllDeltaStoresForTests() {
   for (const auto& rs : comps->rowsets->all_rowsets()) {
     if (!rs->IsAvailableForCompaction()) continue;
     DiskRowSet* drs = down_cast<DiskRowSet*>(rs.get());
-    
RETURN_NOT_OK(drs->delta_tracker()->InitAllDeltaStoresForTests(DeltaTracker::REDOS_ONLY));
+    RETURN_NOT_OK(drs->mutable_delta_tracker()->InitAllDeltaStoresForTests(
+        DeltaTracker::REDOS_ONLY));
     RETURN_NOT_OK_PREPEND(drs->MajorCompactDeltaStores(&io_context, 
GetHistoryGcOpts()),
                           "Failed major delta compaction on " + 
rs->ToString());
   }

Reply via email to