This is an automated email from the ASF dual-hosted git repository. zhangchen pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 798d9d6fc60 [pick21][opt](mow) reduce memory usage for mow table compaction (#36865) (#36968) 798d9d6fc60 is described below commit 798d9d6fc60a7cca8bb3f975041fe5064d62b739 Author: camby <camby...@tencent.com> AuthorDate: Mon Jul 1 15:33:18 2024 +0800 [pick21][opt](mow) reduce memory usage for mow table compaction (#36865) (#36968) cherry-pick https://github.com/apache/doris/pull/36865 to branch-2.1 --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 2 ++ be/src/olap/compaction.cpp | 49 +++++++++++++++----------- be/src/olap/tablet.cpp | 8 +++-- be/src/olap/utils.h | 2 ++ regression-test/pipeline/external/conf/be.conf | 1 + regression-test/pipeline/p0/conf/be.conf | 1 + regression-test/pipeline/p1/conf/be.conf | 1 + 8 files changed, 43 insertions(+), 23 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 7a8c63db748..910bf69609e 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1098,6 +1098,8 @@ DEFINE_mInt64(LZ4_HC_compression_level, "9"); DEFINE_mBool(enable_merge_on_write_correctness_check, "true"); // rowid conversion correctness check when compaction for mow table DEFINE_mBool(enable_rowid_conversion_correctness_check, "false"); +// missing rows correctness check when compaction for mow table +DEFINE_mBool(enable_missing_rows_correctness_check, "false"); // When the number of missing versions is more than this value, do not directly // retry the publish and handle it through async publish. DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 177bb03e02b..2d0dc128a2a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1159,6 +1159,8 @@ DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column); DECLARE_mBool(enable_merge_on_write_correctness_check); // rowid conversion correctness check when compaction for mow table DECLARE_mBool(enable_rowid_conversion_correctness_check); +// missing rows correctness check when compaction for mow table +DECLARE_mBool(enable_missing_rows_correctness_check); // When the number of missing versions is more than this value, do not directly // retry the publish and handle it through async publish. DECLARE_mInt32(mow_publish_max_discontinuous_version_num); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 05c4cbcb56a..677681f712f 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -895,8 +895,17 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) { _tablet->tablet_schema()->cluster_key_idxes().empty()) { Version version = _tablet->max_version(); DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id()); - std::set<RowLocation> missed_rows; - std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map; + std::unique_ptr<RowLocationSet> missed_rows; + if (config::enable_missing_rows_correctness_check && !allow_delete_in_cumu_compaction() && + compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { + missed_rows = std::make_unique<RowLocationSet>(); + LOG(INFO) << "RowLocation Set inited succ for tablet:" << _tablet->tablet_id(); + } + std::unique_ptr<std::map<RowsetSharedPtr, RowLocationPairList>> location_map; + if (config::enable_rowid_conversion_correctness_check) { + location_map = std::make_unique<std::map<RowsetSharedPtr, RowLocationPairList>>(); + LOG(INFO) << "Location Map inited succ for tablet:" << _tablet->tablet_id(); + } // Convert the delete bitmap of the input rowsets to output rowset. // New loads are not blocked, so some keys of input rowsets might // be deleted during the time. We need to deal with delete bitmap @@ -904,13 +913,12 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) { // TODO(LiaoXin): check if there are duplicate keys std::size_t missed_rows_size = 0; _tablet->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows, - &location_map, _tablet->tablet_meta()->delete_bitmap(), + _input_rowsets, _rowid_conversion, 0, version.second + 1, missed_rows.get(), + location_map.get(), _tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); - if (!allow_delete_in_cumu_compaction()) { - missed_rows_size = missed_rows.size(); - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && - _tablet->tablet_state() == TABLET_RUNNING && stats != nullptr && + if (missed_rows) { + missed_rows_size = missed_rows->size(); + if (_tablet->tablet_state() == TABLET_RUNNING && stats != nullptr && stats->merged_rows != missed_rows_size) { std::stringstream ss; ss << "cumulative compaction: the merged rows(" << stats->merged_rows @@ -936,9 +944,9 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) { } if (config::enable_rowid_conversion_correctness_check) { - RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map)); + RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, *location_map)); + location_map->clear(); } - location_map.clear(); { std::lock_guard<std::mutex> wrlock_(_tablet->get_rowset_update_lock()); @@ -965,8 +973,8 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) { } DeleteBitmap txn_output_delete_bitmap(_tablet->tablet_id()); _tablet->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, 0, UINT64_MAX, &missed_rows, - &location_map, *it.delete_bitmap.get(), &txn_output_delete_bitmap); + _input_rowsets, _rowid_conversion, 0, UINT64_MAX, missed_rows.get(), + location_map.get(), *it.delete_bitmap.get(), &txn_output_delete_bitmap); if (config::enable_merge_on_write_correctness_check) { RowsetIdUnorderedSet rowsetids; rowsetids.insert(_output_rowset->rowset_id()); @@ -985,21 +993,20 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) { // Convert the delete bitmap of the input rowsets to output rowset for // incremental data. _tablet->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, version.second, UINT64_MAX, &missed_rows, - &location_map, _tablet->tablet_meta()->delete_bitmap(), + _input_rowsets, _rowid_conversion, version.second, UINT64_MAX, + missed_rows.get(), location_map.get(), _tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); - if (!allow_delete_in_cumu_compaction() && - compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { - DCHECK_EQ(missed_rows.size(), missed_rows_size); - if (missed_rows.size() != missed_rows_size) { + if (missed_rows) { + DCHECK_EQ(missed_rows->size(), missed_rows_size); + if (missed_rows->size() != missed_rows_size) { LOG(WARNING) << "missed rows don't match, before: " << missed_rows_size - << " after: " << missed_rows.size(); + << " after: " << missed_rows->size(); } } - if (config::enable_rowid_conversion_correctness_check) { - RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map)); + if (location_map) { + RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, *location_map)); } _tablet->merge_delete_bitmap(output_rowset_delete_bitmap); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 4cf8fae0ded..a9f67b4ad5f 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3617,7 +3617,9 @@ void Tablet::calc_compaction_output_rowset_delete_bitmap( << " src loaction: |" << src.rowset_id << "|" << src.segment_id << "|" << src.row_id << " version: " << cur_version; - missed_rows->insert(src); + if (missed_rows) { + missed_rows->insert(src); + } continue; } VLOG_DEBUG << "calc_compaction_output_rowset_delete_bitmap dst location: |" @@ -3625,7 +3627,9 @@ void Tablet::calc_compaction_output_rowset_delete_bitmap( << " src location: |" << src.rowset_id << "|" << src.segment_id << "|" << src.row_id << " start version: " << start_version << "end version" << end_version; - (*location_map)[rowset].emplace_back(src, dst); + if (location_map) { + (*location_map)[rowset].emplace_back(src, dst); + } output_rowset_delete_bitmap->add({dst.rowset_id, dst.segment_id, cur_version}, dst.row_id); } diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h index 7c0ccc503de..ef090cbf84c 100644 --- a/be/src/olap/utils.h +++ b/be/src/olap/utils.h @@ -296,6 +296,8 @@ struct RowLocation { } } }; +using RowLocationSet = std::set<RowLocation>; +using RowLocationPairList = std::list<std::pair<RowLocation, RowLocation>>; struct GlobalRowLoacation { GlobalRowLoacation(int64_t tid, RowsetId rsid, uint32_t sid, uint32_t rid) diff --git a/regression-test/pipeline/external/conf/be.conf b/regression-test/pipeline/external/conf/be.conf index 40b1b0669c6..78140d80a18 100644 --- a/regression-test/pipeline/external/conf/be.conf +++ b/regression-test/pipeline/external/conf/be.conf @@ -80,6 +80,7 @@ user_files_secure_path=/ enable_debug_points=true # debug scanner context dead loop enable_debug_log_timeout_secs=0 +enable_missing_rows_correctness_check=true #enable_jvm_monitor = true diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf index 40b1b0669c6..78140d80a18 100644 --- a/regression-test/pipeline/p0/conf/be.conf +++ b/regression-test/pipeline/p0/conf/be.conf @@ -80,6 +80,7 @@ user_files_secure_path=/ enable_debug_points=true # debug scanner context dead loop enable_debug_log_timeout_secs=0 +enable_missing_rows_correctness_check=true #enable_jvm_monitor = true diff --git a/regression-test/pipeline/p1/conf/be.conf b/regression-test/pipeline/p1/conf/be.conf index 0c450c9281e..74069688bd8 100644 --- a/regression-test/pipeline/p1/conf/be.conf +++ b/regression-test/pipeline/p1/conf/be.conf @@ -72,6 +72,7 @@ enable_fuzzy_mode=true enable_set_in_bitmap_value=true enable_feature_binlog=true max_sys_mem_available_low_water_mark_bytes=69206016 +enable_missing_rows_correctness_check=true enable_jvm_monitor = true --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org