This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 48ee1e8b5 [compaction] Code cleanup and readability improvement 48ee1e8b5 is described below commit 48ee1e8b5d7792384178c75840a998e413aaa512 Author: Ashwani Raina <ara...@cloudera.com> AuthorDate: Tue Jan 23 17:50:11 2024 +0530 [compaction] Code cleanup and readability improvement This is a base patch that does not change any functionality. Goal is to break the compaction memory usage improvement change into small ones to make it easy to review. Change-Id: I54709b5e27751581c889854911323fbddab1c4ab Reviewed-on: http://gerrit.cloudera.org:8080/21098 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Alexey Serbin <ale...@apache.org> --- src/kudu/tablet/compaction.cc | 269 ++++++++++++++++++++++-------------- src/kudu/tablet/compaction.h | 17 ++- src/kudu/tablet/delta_compaction.cc | 16 +-- 3 files changed, 178 insertions(+), 124 deletions(-) diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc index d56b79b78..9ad3a1576 100644 --- a/src/kudu/tablet/compaction.cc +++ b/src/kudu/tablet/compaction.cc @@ -990,11 +990,11 @@ Mutation* MergeUndoHistories(Mutation* left, Mutation* right) { // and adds them to 'new_undo_head'. Status MergeDuplicatedRowHistory(const string& tablet_id, const scoped_refptr<FsErrorManager>& error_manager, - CompactionInputRow* old_row, - Mutation** new_undo_head, + const CompactionInputRow& old_row, Arena* arena, - const HistoryGcOpts& history_gc_opts) { - if (PREDICT_TRUE(old_row->previous_ghost == nullptr)) return Status::OK(); + const HistoryGcOpts& history_gc_opts, + Mutation** new_undo_head) { + if (PREDICT_TRUE(old_row.previous_ghost == nullptr)) return Status::OK(); // Use an all inclusive snapshot as all of the previous version's undos and redos // are guaranteed to be committed, otherwise the compaction wouldn't be able to @@ -1003,7 +1003,7 @@ Status MergeDuplicatedRowHistory(const string& tablet_id, faststring dst; - CompactionInputRow* previous_ghost = old_row->previous_ghost; + CompactionInputRow* previous_ghost = old_row.previous_ghost; while (previous_ghost != nullptr) { // First step is to transform the old rows REDO's into UNDOs, if there are any. @@ -1016,11 +1016,11 @@ Status MergeDuplicatedRowHistory(const string& tablet_id, RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(all_snap, *previous_ghost, - &pv_new_undos_head, - &pv_delete_redo, arena, + history_gc_opts, &previous_ghost->row, - history_gc_opts)); + &pv_new_undos_head, + &pv_delete_redo)); // We should be left with only one redo, the delete. #ifndef NDEBUG @@ -1179,13 +1179,11 @@ void RowSetsInCompactionOrFlush::DumpToLog() const { } } -void RemoveAncientUndos(const HistoryGcOpts& history_gc_opts, - Mutation** undo_head, +bool RemoveAncientUndos(const HistoryGcOpts& history_gc_opts, const Mutation* redo_head, - bool* is_garbage_collected) { - *is_garbage_collected = false; + Mutation** undo_head) { if (!history_gc_opts.gc_enabled()) { - return; + return false; } // Make sure there is at most one REDO in the redo_head and that, if present, it's a DELETE. @@ -1195,8 +1193,7 @@ void RemoveAncientUndos(const HistoryGcOpts& history_gc_opts, // Garbage collect rows that are deleted before the AHM. if (history_gc_opts.IsAncientHistory(redo_head->timestamp())) { - *is_garbage_collected = true; - return; + return true; } } @@ -1218,6 +1215,7 @@ void RemoveAncientUndos(const HistoryGcOpts& history_gc_opts, prev_undo = undo_mut; undo_mut = undo_mut->next(); } + return false; } // Applies the REDOs of 'src_row' in accordance with the input snapshot, @@ -1228,11 +1226,11 @@ void RemoveAncientUndos(const HistoryGcOpts& history_gc_opts, // NOTE: input REDOs are expected to be in increasing timestamp order. Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& snap, const CompactionInputRow& src_row, - Mutation** new_undo_head, - Mutation** new_redo_head, Arena* arena, + const HistoryGcOpts& history_gc_opts, RowBlockRow* dst_row, - const HistoryGcOpts& history_gc_opts) { + Mutation** new_undo_head, + Mutation** new_redo_head) { bool is_deleted = false; #define ERROR_LOG_CONTEXT \ @@ -1390,6 +1388,135 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& snap, #undef ERROR_LOG_CONTEXT } +// Append REDO and UNDO deltas to DiskRowSetWriter output. +static Status AppendDeltasToDRS(RollingDiskRowSetWriter* out, + Mutation* new_undos_head, + Mutation* new_redos_head, + RowBlockRow* dst_row) { + rowid_t index_in_current_drs; + + if (new_undos_head != nullptr) { + // Append UNDO deltas to DiskRowSetWriter output. + RETURN_NOT_OK(out->AppendUndoDeltas(dst_row->row_index(), + new_undos_head, + &index_in_current_drs)); + } + + if (new_redos_head != nullptr) { + // Append REDO deltas to DiskRowSetWriter output. + RETURN_NOT_OK(out->AppendRedoDeltas(dst_row->row_index(), + new_redos_head, + &index_in_current_drs)); + } + + DVLOG(4) << "Output Row: " << dst_row->schema()->DebugRow(*dst_row) + << "; RowId: " << index_in_current_drs; + + return Status::OK(); +} + +#ifndef NDEBUG +// Sanity check for UNDO list. +static void UndoListSanityCheck(Mutation* new_undos_head) { + auto* u = new_undos_head; + bool is_deleted = false; + // The resulting list should have the following invariants: + // - deletes can only be observed if not already deleted + // - reinserts can only be observed if deleted + // - UNDO mutations are in decreasing order + while (u != nullptr) { + if (u->changelist().is_delete()) { + CHECK(!is_deleted); + is_deleted = true; + } else if (u->changelist().is_reinsert()) { + CHECK(is_deleted); + is_deleted = false; + } + if (!u->next()) break; + CHECK_GE(u->timestamp(), u->next()->timestamp()); + u = u->next(); + } +} +#endif // NDEBUG + +// For each input row, go through all the REDO mutations and apply those to base row. +// Generate corresponding UNDO deltas for applied mutations. +// For a row with 'ghost' entries, merge their histories of mutations. +// Remove ancient UNDO mutations and check if row is required to be garbage collected. +// Append REDO and UNDO deltas to DRS output. +// Do sanity check for final UNDO list. +static Status ApplyMutationsAndMergeDuplicateHistory(const MvccSnapshot& snap, + const CompactionInputRow& input_row, + size_t cur_row_idx, + RowBlock* block, + Arena* arena, + const string& tablet_id, + const scoped_refptr<FsErrorManager>& err_mgr, + const HistoryGcOpts& history_gc_opts, + RollingDiskRowSetWriter* out, + int* live_row_count, + bool* is_garbage_collected) { + RETURN_NOT_OK(out->RollIfNecessary()); + + const Schema* schema = input_row.row.schema(); + DCHECK_SCHEMA_EQ(*schema, out->schema()); + DCHECK(schema->has_column_ids()); + + DVLOG(4) << "Input Row: " << CompactionInputRowToString(input_row); + + RowBlockRow dst_row = block->row(cur_row_idx); + RETURN_NOT_OK(CopyRow(input_row.row, &dst_row, static_cast<Arena*>(nullptr))); + + // Collect the new UNDO/REDO mutations. + Mutation* new_undos_head = nullptr; + Mutation* new_redos_head = nullptr; + + // Apply all REDOs to the base row, generating UNDOs for it. This does + // not take into account any 'previous_ghost' members. + RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap, + input_row, + arena, + history_gc_opts, + &dst_row, + &new_undos_head, + &new_redos_head)); + + // Merge the histories of 'input_row' with previous ghosts, if there are any. + RETURN_NOT_OK(MergeDuplicatedRowHistory(tablet_id, + err_mgr, + input_row, + arena, + history_gc_opts, + &new_undos_head)); + + // Remove ancient UNDOS and check whether the row should be garbage collected. + *is_garbage_collected = RemoveAncientUndos(history_gc_opts, + new_redos_head, + &new_undos_head); + + DVLOG(4) << "Output Row: " << RowToString(dst_row, new_redos_head, new_undos_head) << + "; Was garbage collected? " << *is_garbage_collected; + + // Skip further processing if this row was garbage collected + if (!*is_garbage_collected) { + RETURN_NOT_OK(AppendDeltasToDRS(out, + new_undos_head, + new_redos_head, + &dst_row)); + + // If the REDO is empty, it should not be a DELETE. + if (new_redos_head == nullptr) { + (*live_row_count)++; + } + +#ifndef NDEBUG + UndoListSanityCheck(new_undos_head); +#endif // NDEBUG + } + + return Status::OK(); +} + // Following method processes the compaction input by reading input rows in // blocks and for each row inside the block: // - Apply all REDO mutations collected for the row at hand. @@ -1414,110 +1541,38 @@ Status FlushCompactionInput(const string& tablet_id, while (input->HasMoreBlocks()) { RETURN_NOT_OK(input->PrepareBlock(&rows)); - int n = 0; + size_t cur_row_idx = 0; int live_row_count = 0; - for (int i = 0; i < rows.size(); i++) { - CompactionInputRow* input_row = &rows[i]; - RETURN_NOT_OK(out->RollIfNecessary()); - - const Schema* schema = input_row->row.schema(); - DCHECK_SCHEMA_EQ(*schema, out->schema()); - DCHECK(schema->has_column_ids()); - - RowBlockRow dst_row = block.row(n); - RETURN_NOT_OK(CopyRow(input_row->row, &dst_row, static_cast<Arena*>(nullptr))); - - DVLOG(4) << "Input Row: " << CompactionInputRowToString(*input_row); - - // Collect the new UNDO/REDO mutations. - Mutation* new_undos_head = nullptr; - Mutation* new_redos_head = nullptr; - - // Apply all REDOs to the base row, generating UNDOs for it. This does - // not take into account any 'previous_ghost' members. - RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap, - *input_row, - &new_undos_head, - &new_redos_head, - input->PreparedBlockArena(), - &dst_row, - history_gc_opts)); - - // Merge the histories of 'input_row' with previous ghosts, if there are any. - RETURN_NOT_OK(MergeDuplicatedRowHistory(tablet_id, - error_manager, - input_row, - &new_undos_head, - input->PreparedBlockArena(), - history_gc_opts)); - - // Remove ancient UNDOS and check whether the row should be garbage collected. + for (const auto& row : rows) { bool is_garbage_collected = false; - RemoveAncientUndos(history_gc_opts, - &new_undos_head, - new_redos_head, - &is_garbage_collected); - - DVLOG(4) << "Output Row: " << RowToString(dst_row, new_redos_head, new_undos_head) << - "; Was garbage collected? " << is_garbage_collected; + RETURN_NOT_OK(ApplyMutationsAndMergeDuplicateHistory(snap, + row, + cur_row_idx, + &block, + input->PreparedBlockArena(), + tablet_id, + error_manager, + history_gc_opts, + out, + &live_row_count, + &is_garbage_collected)); // Whether this row was garbage collected if (is_garbage_collected) { // Don't flush the row. continue; } - rowid_t index_in_current_drs; - - if (new_undos_head != nullptr) { - // Append UNDO deltas to DiskRowSetWriter output. - out->AppendUndoDeltas(dst_row.row_index(), new_undos_head, &index_in_current_drs); - } - - if (new_redos_head != nullptr) { - // Append REDO deltas to DiskRowSetWriter output. - out->AppendRedoDeltas(dst_row.row_index(), new_redos_head, &index_in_current_drs); - } - - // If the REDO is empty, it should not be a DELETE. - if (new_redos_head == nullptr) { - live_row_count++; - } - - DVLOG(4) << "Output Row: " << dst_row.schema()->DebugRow(dst_row) - << "; RowId: " << index_in_current_drs; -#ifndef NDEBUG - auto* u = new_undos_head; - bool is_deleted = false; - // The resulting list should have the following invariants: - // - deletes can only be observed if not already deleted - // - reinserts can only be observed if deleted - // - UNDO mutations are in decreasing order - while (u != nullptr) { - if (u->changelist().is_delete()) { - CHECK(!is_deleted); - is_deleted = true; - } else if (u->changelist().is_reinsert()) { - CHECK(is_deleted); - is_deleted = false; - } - if (!u->next()) break; - CHECK_GE(u->timestamp(), u->next()->timestamp()); - u = u->next(); - } -#endif // NDEBUG - - n++; - if (n == block.nrows()) { + if (++cur_row_idx == block.nrows()) { // Append fully processed rowblock to DRS writer output. RETURN_NOT_OK(out->AppendBlock(block, live_row_count)); live_row_count = 0; - n = 0; + cur_row_idx = 0; } } - if (n > 0) { - block.Resize(n); + if (cur_row_idx > 0) { + block.Resize(cur_row_idx); // Append partially (resized) processed rowblock to DRS writer output. RETURN_NOT_OK(out->AppendBlock(block, live_row_count)); block.Resize(block.row_capacity()); diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h index 0404e06ca..67c42587c 100644 --- a/src/kudu/tablet/compaction.h +++ b/src/kudu/tablet/compaction.h @@ -203,16 +203,15 @@ struct CompactionInputRow { // Function shared by flushes and compactions. Removes UNDO Mutations // considered "ancient" from the given CompactionInputRow, modifying the undo // mutation list in-place. -// 'is_garbage_collected': Set to true if the row was marked as deleted prior -// to the ancient history mark, with no reinsertions after that. In such a -// case, all traces of the row should be removed from disk by the caller. +// Return true if the row was marked as deleted prior to the ancient history mark, +// with no reinsertions after that. In such a case, all traces of the row should +// be removed from disk by the caller. // // This is supposed to be called after ApplyMutationsAndGenerateUndos() where REDOS // are transformed in UNDOs. There can be at most one REDO in 'redo_head', a DELETE. -void RemoveAncientUndos(const HistoryGcOpts& history_gc_opts, - Mutation** undo_head, +bool RemoveAncientUndos(const HistoryGcOpts& history_gc_opts, const Mutation* redo_head, - bool* is_garbage_collected); + Mutation** undo_head); // Function shared by flushes, compactions and major delta compactions. Applies all the REDO // mutations from 'src_row' to the 'dst_row', and generates the related UNDO mutations. Some @@ -224,11 +223,11 @@ void RemoveAncientUndos(const HistoryGcOpts& history_gc_opts, // ignored. Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& snap, const CompactionInputRow& src_row, - Mutation** new_undo_head, - Mutation** new_redo_head, Arena* arena, + const HistoryGcOpts& history_gc_opts, RowBlockRow* dst_row, - const HistoryGcOpts& history_gc_opts); + Mutation** new_undo_head, + Mutation** new_redo_head); // Iterate through this compaction input, flushing all rows to the given RollingDiskRowSetWriter. // The 'snap' argument should match the MvccSnapshot used to create the compaction input. diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc index 6a582530d..dc612ef1c 100644 --- a/src/kudu/tablet/delta_compaction.cc +++ b/src/kudu/tablet/delta_compaction.cc @@ -179,16 +179,16 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) { DVLOG(3) << "MDC Input Row - RowId: " << row_id << " " << CompactionInputRowToString(*input_row); - // NOTE: This is presently ignored. - bool is_garbage_collected; - - RETURN_NOT_OK(ApplyMutationsAndGenerateUndos( - snap, *input_row, &new_undos_head, &new_redos_head, &mem.arena, - &dst_row, history_gc_opts_)); + RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap, + *input_row, + &mem.arena, + history_gc_opts_, + &dst_row, + &new_undos_head, + &new_redos_head)); RemoveAncientUndos(history_gc_opts_, - &new_undos_head, new_redos_head, - &is_garbage_collected); + &new_undos_head); DVLOG(3) << "MDC Output Row - RowId: " << row_id << " " << RowToString(dst_row, new_undos_head, new_redos_head);