yiguolei commented on a change in pull request #1576: Change cumulative compaction for decoupling storage from computation URL: https://github.com/apache/incubator-doris/pull/1576#discussion_r310428885
########## File path: be/src/olap/base_compaction.cpp ########## @@ -40,487 +39,264 @@ using std::vector; namespace doris { -OLAPStatus BaseCompaction::init(TabletSharedPtr tablet, bool is_manual_trigger) { - // 表在首次查询或PUSH等操作时,会被加载到内存 - // 如果表没有被加载,表明该表上目前没有任何操作,所以不进行BE操作 - if (!tablet->init_succeeded()) { - return OLAP_ERR_INPUT_PARAMETER_ERROR; - } - - LOG(INFO) << "init base compaction handler. [tablet=" << tablet->full_name() << "]"; +static bool rowset_comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) { + return left->end_version() < right->end_version(); +} - _tablet = tablet; +BaseCompaction::BaseCompaction(TabletSharedPtr tablet) + : _tablet(tablet), + _base_locked(false), + _input_rowsets_size(0), + _input_row_num(0) + { } - // 1. 尝试取得base compaction的锁 - if (!_try_base_compaction_lock()) { - LOG(WARNING) << "another base compaction is running. tablet=" << tablet->full_name(); - return OLAP_ERR_BE_TRY_BE_LOCK_ERROR; +BaseCompaction::~BaseCompaction() { + if (_base_locked) { + _tablet->release_base_compaction_lock(); } +} - // 2. 检查是否满足base compaction触发策略 - VLOG(3) << "check whether satisfy base compaction policy."; - bool is_policy_satisfied = false; - vector<Version> candidate_versions; - is_policy_satisfied = _check_whether_satisfy_policy(is_manual_trigger, &candidate_versions); - - // 2.1 如果不满足触发策略,则直接释放base compaction锁, 返回错误码 - if (!is_policy_satisfied) { - _release_base_compaction_lock(); - return OLAP_ERR_BE_NO_SUITABLE_VERSION; +OLAPStatus BaseCompaction::compact() { + if (!_tablet->init_succeeded()) { + return OLAP_ERR_INPUT_PARAMETER_ERROR; } - // 2.2 如果满足触发策略,触发base compaction - // 不释放base compaction锁, 在run()完成之后再释放 - if (!_validate_need_merged_versions(candidate_versions)) { - LOG(FATAL) << "error! invalid need merged versions"; - _release_base_compaction_lock(); - return OLAP_ERR_BE_INVALID_NEED_MERGED_VERSIONS; + if (!_tablet->try_base_compaction_lock()) { + LOG(WARNING) << "another base compaction is running. 
tablet=" << _tablet->full_name(); + return OLAP_ERR_BE_TRY_BE_LOCK_ERROR; } - _need_merged_versions = candidate_versions; + _base_locked = true; - return OLAP_SUCCESS; -} + // 1. pick rowsets to compact + RETURN_NOT_OK(pick_rowsets_to_compact()); -OLAPStatus BaseCompaction::run() { - LOG(INFO) << "start base compaction. tablet=" << _tablet->full_name() - << ", old_base_version=" << _old_base_version.second - << ", new_base_version=" << _new_base_version.second; + // 2. do base compaction, merge rowsets + RETURN_NOT_OK(do_base_compaction()); - OLAPStatus res = OLAP_SUCCESS; - OlapStopWatch stage_watch; - - // 1. 计算新base的version hash - VersionHash new_base_version_hash; - res = _tablet->compute_all_versions_hash(_need_merged_versions, &new_base_version_hash); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to calculate new base version hash. tablet=" << _tablet->full_name() - << ", new_base_version=" << _new_base_version.second; - _garbage_collection(); - return res; - } + // 3. set base state to success + _base_state = BaseCompactionState::SUCCESS; - VLOG(10) << "new_base_version_hash:" << new_base_version_hash; + // 4. garbage collect input rowsets after base compaction + RETURN_NOT_OK(gc_unused_rowsets()); - // 2. 获取生成新base需要的data sources - vector<RowsetSharedPtr> rowsets; - res = _tablet->capture_consistent_rowsets(_need_merged_versions, &rowsets); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to acquire need data sources. 
tablet=" << _tablet->full_name() - << ", version=" << _new_base_version.second; - _garbage_collection(); - return res; - } + return OLAP_SUCCESS; +} - { - DorisMetrics::base_compaction_deltas_total.increment(_need_merged_versions.size()); - int64_t merge_bytes = 0; - for (auto& rowset : rowsets) { - merge_bytes += rowset->data_disk_size(); - } - DorisMetrics::base_compaction_bytes_total.increment(merge_bytes); +OLAPStatus BaseCompaction::pick_rowsets_to_compact() { + _input_rowsets.clear(); + _tablet->pick_candicate_rowsets_to_base_compaction(&_input_rowsets); Review comment: should use capture rs readers to check that the data is valid ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org For additional commands, e-mail: dev-h...@doris.apache.org