This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 55256681bac4f59137988e3351f6487f101b1e50 Author: Alexey Serbin <ale...@apache.org> AuthorDate: Tue Nov 22 15:55:48 2022 -0800 [tablet] a small cleanup on DeltaTracker and around I was briefly looking at the compaction-related code and noticed a bit of room for improvement. This patch is the result. It introduces a bunch of minor changes to DeltaTracker and compaction-related code: * updated DeltaTracker::Update() to accept nullptr for the OperationResultPB output parameter * removed per-block heap allocated instance of OperationResultPB in ReupdateMissedDeltas() * switched to std::atomic<bool> instead of AtomicBool for the DeltaTracker::dms_exists_ field * renamed delta_tracker() --> mutable_delta_tracker() for DiskRowSet * introduced DiskRowSet::delta_tracker() to return const reference (this is going to be used in a follow-up patch) * updated RowSetMetadata::Commit{Redo,Undo}DeltaDataBlock() to return 'void' instead of 'Status' * added a few DCHECK macros to spot inconsistencies * updated related code to be more style-compliant Change-Id: I9e42c72664c07e126f8cce5f071db56c0b4fe48c Reviewed-on: http://gerrit.cloudera.org:8080/19269 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Attila Bukor <abu...@apache.org> --- src/kudu/cfile/cfile_writer.cc | 1 + src/kudu/cfile/cfile_writer.h | 2 +- src/kudu/tablet/compaction.cc | 16 +++++----- src/kudu/tablet/delta_store.cc | 7 +++-- src/kudu/tablet/delta_tracker.cc | 62 +++++++++++++++++++------------------- src/kudu/tablet/delta_tracker.h | 14 ++++----- src/kudu/tablet/diskrowset-test.cc | 20 ++++++------ src/kudu/tablet/diskrowset.cc | 9 +++--- src/kudu/tablet/diskrowset.h | 13 ++++++-- src/kudu/tablet/metadata-test.cc | 2 +- src/kudu/tablet/rowset_metadata.cc | 10 +++--- src/kudu/tablet/rowset_metadata.h | 8 ++--- src/kudu/tablet/tablet.cc | 3 +- 13 files changed, 86 insertions(+), 81 deletions(-) diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc index 12ce6baa3..e0abefe6a 100644 --- a/src/kudu/cfile/cfile_writer.cc +++ b/src/kudu/cfile/cfile_writer.cc @@ -366,6 +366,7 @@ Status CFileWriter::FinishCurDataBlock() { return Status::OK(); } + DCHECK_GE(value_count_, num_elems_in_block); rowid_t first_elem_ord = value_count_ - num_elems_in_block; VLOG(1) << "Appending data block for values " << first_elem_ord << "-" << value_count_; diff --git a/src/kudu/cfile/cfile_writer.h b/src/kudu/cfile/cfile_writer.h index d9cca13c2..d0c46f936 100644 --- a/src/kudu/cfile/cfile_writer.h +++ b/src/kudu/cfile/cfile_writer.h @@ -160,7 +160,7 @@ class CFileWriter { // Return the number of values written to the file. // This includes NULL cells, but does not include any "raw" blocks // appended. - int written_value_count() const { + uint32_t written_value_count() const { return value_count_; } diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc index 762c92b08..e38bb7479 100644 --- a/src/kudu/tablet/compaction.cc +++ b/src/kudu/tablet/compaction.cc @@ -54,7 +54,6 @@ #include "kudu/tablet/memrowset.h" #include "kudu/tablet/mutation.h" #include "kudu/tablet/mvcc.h" -#include "kudu/tablet/tablet.pb.h" #include "kudu/util/debug/trace_event.h" #include "kudu/util/faststring.h" #include "kudu/util/fault_injection.h" @@ -1552,17 +1551,16 @@ Status ReupdateMissedDeltas(const IOContext* io_context, RETURN_NOT_OK(cur_drs->CountRows(io_context, &num_rows)); } - DeltaTracker* cur_tracker = cur_drs->delta_tracker(); - unique_ptr<OperationResultPB> result(new OperationResultPB); + DeltaTracker* cur_tracker = cur_drs->mutable_delta_tracker(); DCHECK_LT(idx_in_delta_tracker, num_rows); - Status s = cur_tracker->Update(mut->timestamp(), - idx_in_delta_tracker, - mut->changelist(), - max_op_id, - result.get()); + const auto s = cur_tracker->Update(mut->timestamp(), + idx_in_delta_tracker, + mut->changelist(), + max_op_id, + nullptr); DCHECK(s.ok()) << "Failed update on compaction for row " << output_row_offset << " @" << mut->timestamp() << ": " << mut->changelist().ToString(*schema); - if (s.ok()) { + if (PREDICT_TRUE(s.ok())) { // Update the set of delta trackers with the one we've just updated. InsertIfNotPresent(&updated_trackers, cur_tracker); } diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc index 1687fd649..461f34e22 100644 --- a/src/kudu/tablet/delta_store.cc +++ b/src/kudu/tablet/delta_store.cc @@ -340,7 +340,7 @@ Status DeltaPreparer<Traits>::AddDelta(const DeltaKey& key, Slice val, bool* fin PreparedDelta d; d.key = key; d.val = val; - prepared_deltas_.emplace_back(d); + prepared_deltas_.emplace_back(std::move(d)); } if (finished_row_for_apply_or_collect && @@ -437,9 +437,10 @@ template<class Traits> Status DeltaPreparer<Traits>::CollectMutations(vector<Mutation*>* dst, Arena* arena) { DCHECK(prepared_flags_ & DeltaIterator::PREPARE_FOR_COLLECT); DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, dst->size()); - for (const PreparedDelta& src : prepared_deltas_) { - DeltaKey key = src.key; + for (const auto& src : prepared_deltas_) { + const auto& key = src.key; RowChangeList changelist(src.val); + DCHECK_GE(key.row_idx(), prev_prepared_idx_); uint32_t rel_idx = key.row_idx() - prev_prepared_idx_; Mutation *mutation = Mutation::CreateInArena(arena, key.timestamp(), changelist); diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc index 4f63e80a6..5a1e86bef 100644 --- a/src/kudu/tablet/delta_tracker.cc +++ b/src/kudu/tablet/delta_tracker.cc @@ -26,6 +26,8 @@ #include <type_traits> #include <utility> +#include <boost/iterator/iterator_facade.hpp> +#include <boost/iterator/reverse_iterator.hpp> #include <boost/range/adaptor/reversed.hpp> #include <glog/logging.h> @@ -180,7 +182,7 @@ Status DeltaTracker::CreateAndInitDMSUnlocked(const fs::IOContext* io_context) { RETURN_NOT_OK(dms->Init(io_context)); dms_ = std::move(dms); - dms_exists_.Store(true); + dms_exists_ = true; return Status::OK(); } @@ -189,7 +191,7 @@ Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(const IOContext* io_context const Schema* projection, SharedDeltaStoreVector* target_stores, vector<BlockId> *target_blocks, - std::unique_ptr<DeltaIterator>* out) { + std::unique_ptr<DeltaIterator>* out) const { CHECK(open_); CHECK_LE(start_idx, end_idx); CHECK_LT(end_idx, redo_delta_stores_.size()); @@ -198,12 +200,13 @@ Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(const IOContext* io_context int64_t reinsert_count = 0; int64_t update_count = 0; for (size_t idx = start_idx; idx <= end_idx; ++idx) { - shared_ptr<DeltaStore> &delta_store = redo_delta_stores_[idx]; + const shared_ptr<DeltaStore>& delta_store = redo_delta_stores_[idx]; // In DEBUG mode, the following asserts that the object is of the right type // (using RTTI) ignore_result(down_cast<DeltaFileReader*>(delta_store.get())); - shared_ptr<DeltaFileReader> dfr = std::static_pointer_cast<DeltaFileReader>(delta_store); + shared_ptr<DeltaFileReader> dfr = + std::static_pointer_cast<DeltaFileReader>(delta_store); if (dfr->has_delta_stats()) { delete_count += dfr->delta_stats().delete_count(); @@ -223,8 +226,7 @@ Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(const IOContext* io_context RowIteratorOptions opts; opts.projection = projection; opts.io_context = io_context; - RETURN_NOT_OK(DeltaIteratorMerger::Create(inputs, opts, out)); - return Status::OK(); + return DeltaIteratorMerger::Create(inputs, opts, out); } namespace { @@ -626,7 +628,7 @@ void DeltaTracker::CollectStores(vector<shared_ptr<DeltaStore>>* deltas, } if (which != UNDOS_ONLY) { deltas->insert(deltas->end(), redo_delta_stores_.begin(), redo_delta_stores_.end()); - if (dms_exists_.Load() && !dms_->Empty()) { + if (dms_exists_ && !dms_->Empty()) { deltas->push_back(dms_); } } @@ -681,15 +683,14 @@ Status DeltaTracker::WrapIterator(const shared_ptr<CFileSet::Iterator> &base, Status DeltaTracker::Update(Timestamp timestamp, rowid_t row_idx, - const RowChangeList &update, + const RowChangeList& update, const consensus::OpId& op_id, OperationResultPB* result) { - Status s; while (true) { - if (!dms_exists_.Load()) { + if (!dms_exists_) { std::lock_guard<rw_spinlock> lock(component_lock_); // Should check dms_exists_ here in case multiple threads are blocked. - if (!dms_exists_.Load()) { + if (!dms_exists_) { RETURN_NOT_OK(CreateAndInitDMSUnlocked(nullptr)); } } @@ -699,27 +700,27 @@ Status DeltaTracker::Update(Timestamp timestamp, // Should check dms_exists_ here again since there is a gap // between the two critical sections defined by component_lock_. - if (!dms_exists_.Load()) continue; + if (!dms_exists_) { + continue; + } - s = dms_->Update(timestamp, row_idx, update, op_id); - if (s.ok()) { + auto s = dms_->Update(timestamp, row_idx, update, op_id); + if (s.ok() && result != nullptr) { MemStoreTargetPB* target = result->add_mutated_stores(); target->set_rs_id(rowset_metadata_->id()); target->set_dms_id(dms_->id()); } - break; + return s; } - - return s; } Status DeltaTracker::CheckRowDeleted(rowid_t row_idx, const IOContext* io_context, - bool *deleted, ProbeStats* stats) const { + bool* deleted, ProbeStats* stats) const { shared_lock<rw_spinlock> lock(component_lock_); *deleted = false; // Check if the row has a deletion in DeltaMemStore. - if (dms_exists_.Load()) { + if (dms_exists_) { RETURN_NOT_OK(dms_->CheckRowDeleted(row_idx, io_context, deleted)); if (*deleted) { return Status::OK(); @@ -727,8 +728,8 @@ Status DeltaTracker::CheckRowDeleted(rowid_t row_idx, const IOContext* io_contex } // Then check backwards through the list of trackers. - for (auto ds = redo_delta_stores_.crbegin(); ds != redo_delta_stores_.crend(); ds++) { - stats->deltas_consulted++; + for (auto ds = redo_delta_stores_.crbegin(); ds != redo_delta_stores_.crend(); ++ds) { + ++stats->deltas_consulted; RETURN_NOT_OK((*ds)->CheckRowDeleted(row_idx, io_context, deleted)); if (*deleted) { return Status::OK(); @@ -787,9 +788,8 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms, // and reset deleted_row_count_ should be atomic, so we lock the // component_lock_ in exclusive mode. std::lock_guard<rw_spinlock> lock(component_lock_); - RETURN_NOT_OK(rowset_metadata_->CommitRedoDeltaDataBlock(dms->id(), - deleted_row_count_, - block_id)); + rowset_metadata_->CommitRedoDeltaDataBlock( + dms->id(), deleted_row_count_, block_id); deleted_row_count_ = 0; } if (flush_type == FLUSH_METADATA) { @@ -814,11 +814,11 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_ // This shuts out any concurrent readers or writers. std::lock_guard<rw_spinlock> lock(component_lock_); - count = dms_exists_.Load() ? dms_->Count() : 0; + count = dms_exists_ ? dms_->Count() : 0; // Swap the DeltaMemStore and dms_ is null now. old_dms = std::move(dms_); - dms_exists_.Store(false); + dms_exists_ = false; if (count == 0) { // No need to flush if there are no deltas. @@ -867,9 +867,9 @@ bool DeltaTracker::GetDeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_t // Check dms_exists_ first to avoid unnecessary contention on // component_lock_. We need to check again after taking the lock in case we // raced with a DMS flush. - if (dms_exists_.Load()) { + if (dms_exists_) { shared_lock<rw_spinlock> lock(component_lock_); - if (dms_exists_.Load()) { + if (dms_exists_) { *size_bytes = dms_->EstimateSize(); *creation_time = dms_->creation_time(); return true; @@ -880,12 +880,12 @@ bool DeltaTracker::GetDeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_t size_t DeltaTracker::DeltaMemStoreSize() const { shared_lock<rw_spinlock> lock(component_lock_); - return dms_exists_.Load() ? dms_->EstimateSize() : 0; + return dms_exists_ ? dms_->EstimateSize() : 0; } int64_t DeltaTracker::MinUnflushedLogIndex() const { shared_lock<rw_spinlock> lock(component_lock_); - return dms_exists_.Load() ? dms_->MinLogIndex() : 0; + return dms_exists_ ? dms_->MinLogIndex() : 0; } size_t DeltaTracker::CountUndoDeltaStores() const { @@ -949,7 +949,7 @@ Status DeltaTracker::InitAllDeltaStoresForTests(WhichStores stores) { int64_t DeltaTracker::CountDeletedRows() const { shared_lock<rw_spinlock> lock(component_lock_); DCHECK_GE(deleted_row_count_, 0); - return deleted_row_count_ + (dms_exists_.Load() ? dms_->deleted_row_count() : 0); + return deleted_row_count_ + (dms_exists_ ? dms_->deleted_row_count() : 0); } string DeltaTracker::LogPrefix() const { diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h index e021ec829..bb3b8277a 100644 --- a/src/kudu/tablet/delta_tracker.h +++ b/src/kudu/tablet/delta_tracker.h @@ -16,6 +16,7 @@ // under the License. #pragma once +#include <atomic> #include <cstddef> #include <cstdint> #include <memory> @@ -33,7 +34,6 @@ #include "kudu/tablet/delta_stats.h" #include "kudu/tablet/delta_store.h" #include "kudu/tablet/tablet_mem_trackers.h" -#include "kudu/util/atomic.h" #include "kudu/util/locks.h" #include "kudu/util/mutex.h" #include "kudu/util/status.h" @@ -139,10 +139,10 @@ class DeltaTracker { // Update the given row in the database. // Copies the data, as well as any referenced values into a local arena. // "result" tracks the status of the update as well as which data - // structure(s) it ended up at. + // structure(s) it ended up at (optional, might be passed as nullptr). Status Update(Timestamp timestamp, rowid_t row_idx, - const RowChangeList &update, + const RowChangeList& update, const consensus::OpId& op_id, OperationResultPB* result); @@ -151,7 +151,7 @@ class DeltaTracker { // // Sets *deleted to true if so; otherwise sets it to false. Status CheckRowDeleted(rowid_t row_idx, const fs::IOContext* io_context, - bool *deleted, ProbeStats* stats) const; + bool* deleted, ProbeStats* stats) const; // Compacts all REDO delta files. Status Compact(const fs::IOContext* io_context); @@ -248,7 +248,7 @@ class DeltaTracker { // Returns true if the DMS doesn't exist. This doesn't rely on the size. bool DeltaMemStoreEmpty() const { - return !dms_exists_.Load(); + return !dms_exists_; } // Get the minimum log index for this tracker's DMS, -1 if it wasn't set. @@ -336,7 +336,7 @@ class DeltaTracker { size_t start_idx, size_t end_idx, const Schema* projection, std::vector<std::shared_ptr<DeltaStore>>* target_stores, std::vector<BlockId>* target_blocks, - std::unique_ptr<DeltaIterator>* out); + std::unique_ptr<DeltaIterator>* out) const; std::string LogPrefix() const; @@ -369,7 +369,7 @@ class DeltaTracker { // The maintenance scheduler calls DeltaMemStoreEmpty() a lot. // We use an atomic variable to indicate whether DMS exists or not and // to avoid having to take component_lock_ in order to satisfy this call. - AtomicBool dms_exists_; + std::atomic<bool> dms_exists_; // read-write lock protecting dms_ and {redo,undo}_delta_stores_. // - Readers take this lock in shared mode. diff --git a/src/kudu/tablet/diskrowset-test.cc b/src/kudu/tablet/diskrowset-test.cc index a813f0395..37529d363 100644 --- a/src/kudu/tablet/diskrowset-test.cc +++ b/src/kudu/tablet/diskrowset-test.cc @@ -543,18 +543,18 @@ TEST_F(TestRowSet, TestMakeDeltaIteratorMergerUnlocked) { ASSERT_OK(OpenTestRowSet(&rs)); UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr); ASSERT_OK(rs->FlushDeltas(nullptr)); - DeltaTracker *dt = rs->delta_tracker(); - int num_stores = dt->redo_delta_stores_.size(); + const DeltaTracker& dt = rs->delta_tracker(); + size_t num_stores = dt.redo_delta_stores_.size(); + ASSERT_GT(num_stores, 0); vector<shared_ptr<DeltaStore> > compacted_stores; vector<BlockId> compacted_blocks; unique_ptr<DeltaIterator> merge_iter; - ASSERT_OK(dt->MakeDeltaIteratorMergerUnlocked(nullptr, 0, num_stores - 1, &schema_, - &compacted_stores, - &compacted_blocks, &merge_iter)); + ASSERT_OK(dt.MakeDeltaIteratorMergerUnlocked(nullptr, 0, num_stores - 1, &schema_, + &compacted_stores, + &compacted_blocks, &merge_iter)); vector<string> results; ASSERT_OK(DebugDumpDeltaIterator(REDO, merge_iter.get(), schema_, - ITERATE_OVER_ALL_ROWS, - &results)); + ITERATE_OVER_ALL_ROWS, &results)); for (const string &str : results) { VLOG(1) << str; } @@ -611,10 +611,10 @@ TEST_F(TestRowSet, TestCompactStores) { RowSet::MAJOR_DELTA_COMPACTION))); // Compact the deltafiles - DeltaTracker *dt = rs->delta_tracker(); + auto* dt = rs->mutable_delta_tracker(); int num_stores = dt->redo_delta_stores_.size(); VLOG(1) << "Number of stores before compaction: " << num_stores; - ASSERT_EQ(num_stores, 3); + ASSERT_EQ(3, num_stores); ASSERT_OK(dt->CompactStores(nullptr, 0, num_stores - 1)); num_stores = dt->redo_delta_stores_.size(); VLOG(1) << "Number of stores after compaction: " << num_stores; @@ -650,7 +650,7 @@ TEST_F(TestRowSet, TestGCAncientStores) { WriteTestRowSet(); shared_ptr<DiskRowSet> rs; ASSERT_OK(OpenTestRowSet(&rs)); - DeltaTracker *dt = rs->delta_tracker(); + DeltaTracker* dt = rs->mutable_delta_tracker(); ASSERT_EQ(0, dt->CountUndoDeltaStores()); ASSERT_EQ(0, dt->CountRedoDeltaStores()); diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc index 8d9ca542a..59831a135 100644 --- a/src/kudu/tablet/diskrowset.cc +++ b/src/kudu/tablet/diskrowset.cc @@ -50,6 +50,7 @@ #include "kudu/tablet/multi_column_writer.h" #include "kudu/tablet/mutation.h" #include "kudu/tablet/mvcc.h" +#include "kudu/tablet/rowset_metadata.h" #include "kudu/util/compression/compression.pb.h" #include "kudu/util/debug/trace_event.h" #include "kudu/util/flag_tags.h" @@ -573,8 +574,8 @@ Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>& HistoryGcOpts history_gc_opts) { VLOG_WITH_PREFIX(1) << "Major compacting REDO delta stores (cols: " << col_ids << ")"; TRACE_EVENT0("tablet", "DiskRowSet::MajorCompactDeltaStoresWithColumnIds"); - std::lock_guard<Mutex> l(*delta_tracker()->compact_flush_lock()); - RETURN_NOT_OK(delta_tracker()->CheckWritableUnlocked()); + std::lock_guard<Mutex> l(*mutable_delta_tracker()->compact_flush_lock()); + RETURN_NOT_OK(mutable_delta_tracker()->CheckWritableUnlocked()); // TODO(todd): do we need to lock schema or anything here? unique_ptr<MajorDeltaCompaction> compaction; @@ -704,9 +705,7 @@ Status DiskRowSet::MutateRow(Timestamp timestamp, return Status::NotFound("row not found"); } - RETURN_NOT_OK(delta_tracker_->Update(timestamp, *row_idx, update, op_id, result)); - - return Status::OK(); + return delta_tracker_->Update(timestamp, *row_idx, update, op_id, result); } Status DiskRowSet::CheckRowPresent(const RowSetKeyProbe &probe, diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h index a6b076f0e..4ebb848b8 100644 --- a/src/kudu/tablet/diskrowset.h +++ b/src/kudu/tablet/diskrowset.h @@ -26,6 +26,7 @@ #include <memory> #include <mutex> #include <string> +#include <type_traits> #include <vector> #include <glog/logging.h> @@ -40,7 +41,6 @@ #include "kudu/tablet/delta_key.h" #include "kudu/tablet/delta_tracker.h" #include "kudu/tablet/rowset.h" -#include "kudu/tablet/rowset_metadata.h" #include "kudu/tablet/tablet_mem_trackers.h" #include "kudu/tablet/tablet_metadata.h" #include "kudu/util/bloom_filter.h" @@ -86,6 +86,7 @@ class MultiColumnWriter; class Mutation; class MvccSnapshot; class OperationResultPB; +class RowSetMetadata; class DiskRowSetWriter { public: @@ -430,8 +431,14 @@ class DiskRowSet : has_been_compacted_.store(true); } - DeltaTracker *delta_tracker() { - return DCHECK_NOTNULL(delta_tracker_.get()); + DeltaTracker* mutable_delta_tracker() { + DCHECK(delta_tracker_); + return delta_tracker_.get(); + } + + const DeltaTracker& delta_tracker() const { + DCHECK(delta_tracker_); + return *delta_tracker_; } std::shared_ptr<RowSetMetadata> metadata() override { diff --git a/src/kudu/tablet/metadata-test.cc b/src/kudu/tablet/metadata-test.cc index ad538b88d..4d271ebb2 100644 --- a/src/kudu/tablet/metadata-test.cc +++ b/src/kudu/tablet/metadata-test.cc @@ -46,7 +46,7 @@ class MetadataTest : public KuduTest { tablet_meta_ = new TabletMetadata(nullptr, "fake-tablet"); CHECK_OK(RowSetMetadata::CreateNew(tablet_meta_.get(), 0, &meta_)); for (int i = 0; i < all_blocks_.size(); i++) { - CHECK_OK(meta_->CommitRedoDeltaDataBlock(i, 0, all_blocks_[i])); + meta_->CommitRedoDeltaDataBlock(i, 0, all_blocks_[i]); } CHECK_EQ(4, meta_->redo_delta_blocks().size()); } diff --git a/src/kudu/tablet/rowset_metadata.cc b/src/kudu/tablet/rowset_metadata.cc index 718710e31..56746bc84 100644 --- a/src/kudu/tablet/rowset_metadata.cc +++ b/src/kudu/tablet/rowset_metadata.cc @@ -186,20 +186,18 @@ void RowSetMetadata::SetColumnDataBlocks(const std::map<ColumnId, BlockId>& bloc blocks_by_col_id_ = std::move(new_map); } -Status RowSetMetadata::CommitRedoDeltaDataBlock(int64_t dms_id, - int64_t num_deleted_rows, - const BlockId& block_id) { +void RowSetMetadata::CommitRedoDeltaDataBlock(int64_t dms_id, + int64_t num_deleted_rows, + const BlockId& block_id) { std::lock_guard<LockType> l(lock_); last_durable_redo_dms_id_ = dms_id; redo_delta_blocks_.push_back(block_id); IncrementLiveRowsUnlocked(-num_deleted_rows); - return Status::OK(); } -Status RowSetMetadata::CommitUndoDeltaDataBlock(const BlockId& block_id) { +void RowSetMetadata::CommitUndoDeltaDataBlock(const BlockId& block_id) { std::lock_guard<LockType> l(lock_); undo_delta_blocks_.push_back(block_id); - return Status::OK(); } void RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update, diff --git a/src/kudu/tablet/rowset_metadata.h b/src/kudu/tablet/rowset_metadata.h index 85efd9d6b..c6f240a4a 100644 --- a/src/kudu/tablet/rowset_metadata.h +++ b/src/kudu/tablet/rowset_metadata.h @@ -120,11 +120,11 @@ class RowSetMetadata { // Atomically commit the new redo delta block to RowSetMetadata. // This atomic operation includes updates to last_durable_redo_dms_id_ and live_row_count_. - Status CommitRedoDeltaDataBlock(int64_t dms_id, - int64_t num_deleted_rows, - const BlockId& block_id); + void CommitRedoDeltaDataBlock(int64_t dms_id, + int64_t num_deleted_rows, + const BlockId& block_id); - Status CommitUndoDeltaDataBlock(const BlockId& block_id); + void CommitUndoDeltaDataBlock(const BlockId& block_id); bool has_encoded_keys_unlocked() const { return min_encoded_key_ && max_encoded_key_; diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc index 18ca96089..364eaa4ac 100644 --- a/src/kudu/tablet/tablet.cc +++ b/src/kudu/tablet/tablet.cc @@ -2600,7 +2600,8 @@ Status Tablet::MajorCompactAllDeltaStoresForTests() { for (const auto& rs : comps->rowsets->all_rowsets()) { if (!rs->IsAvailableForCompaction()) continue; DiskRowSet* drs = down_cast<DiskRowSet*>(rs.get()); - RETURN_NOT_OK(drs->delta_tracker()->InitAllDeltaStoresForTests(DeltaTracker::REDOS_ONLY)); + RETURN_NOT_OK(drs->mutable_delta_tracker()->InitAllDeltaStoresForTests( + DeltaTracker::REDOS_ONLY)); RETURN_NOT_OK_PREPEND(drs->MajorCompactDeltaStores(&io_context, GetHistoryGcOpts()), "Failed major delta compaction on " + rs->ToString()); }