yiguolei commented on a change in pull request #1576: Change cumulative 
compaction for decoupling storage from computation
URL: https://github.com/apache/incubator-doris/pull/1576#discussion_r310462451
 
 

 ##########
 File path: be/src/olap/cumulative_compaction.cpp
 ##########
 @@ -32,356 +32,121 @@ using std::vector;
 
 namespace doris {
 
-OLAPStatus CumulativeCompaction::init(TabletSharedPtr tablet) {
-    LOG(INFO) << "init cumulative compaction handler. tablet=" << 
tablet->full_name();
+static bool rowset_comparator(const RowsetSharedPtr& left, const 
RowsetSharedPtr& right) {
+    return left->end_version() < right->end_version();
+}
+
+CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet)
+    : _tablet(tablet),
+      
_cumulative_rowset_size_threshold(config::cumulative_compaction_budgeted_bytes),
+      _input_rowsets_size(0),
+      _input_row_num(0),
+      _cumulative_state(CumulativeState::FAILED)
+    { }
 
-    if (_is_init) {
-        LOG(WARNING) << "cumulative handler has been inited. tablet=" << 
tablet->full_name();
-        return OLAP_ERR_CUMULATIVE_REPEAT_INIT;
+CumulativeCompaction::~CumulativeCompaction() {
+    if (_cumulative_locked) {
+        _tablet->release_cumulative_lock();
     }
+}
 
-    if (!tablet->init_succeeded()) {
+OLAPStatus CumulativeCompaction::compact() {
+    if (!_tablet->init_succeeded()) {
         return OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS;
     }
 
-    _tablet = tablet;
-    _max_delta_file_size = config::cumulative_compaction_budgeted_bytes;
-
     if (!_tablet->try_cumulative_lock()) {
-        LOG(INFO) << "skip compaction, because another cumulative is running. 
tablet=" << _tablet->full_name();
+        LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << 
_tablet->full_name();
         return OLAP_ERR_CE_TRY_CE_LOCK_ERROR;
     }
 
-    _tablet->obtain_header_rdlock();
-    _old_cumulative_layer_point = _tablet->cumulative_layer_point();
-    _tablet->release_header_lock();
-    // 如果为-1,则该table之前没有设置过cumulative layer point
-    // 我们在这里设置一下
-    if (_old_cumulative_layer_point == -1) {
-        LOG(INFO) << "tablet has an unreasonable cumulative layer point. 
tablet=" << _tablet->full_name()
-                  << ", cumulative_layer_point=" << 
_old_cumulative_layer_point;
-        _tablet->release_cumulative_lock();
-        return OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS;
-    }
+    _cumulative_locked = true;
 
-    _tablet->obtain_header_wrlock();
-    OLAPStatus res = _calculate_need_merged_versions();
-    _tablet->release_header_lock();
-    if (res != OLAP_SUCCESS) {
-        _tablet->release_cumulative_lock();
-        LOG(INFO) << "no suitable delta versions. don't do cumulative 
compaction now.";
-        return res;
-    }
+    // 1. pick rowsets to compact
+    RETURN_NOT_OK(pick_rowsets_to_compact());
 
-    if (!_validate_need_merged_versions()) {
-        _tablet->release_cumulative_lock();
-        LOG(FATAL) << "error! invalid need merged versions.";
-        return OLAP_ERR_CUMULATIVE_INVALID_NEED_MERGED_VERSIONS;
-    }
+    // 2. do cumulative compaction, merge rowsets
+    RETURN_NOT_OK(do_cumulative_compaction());
+
+    // 33. set cumulative state to success
+    _cumulative_state = CumulativeState::SUCCESS;
+    
+    // 4. garbage collect input rowsets after cumulative compaction 
+    RETURN_NOT_OK(gc_unused_rowsets());
 
-    _is_init = true;
-    _cumulative_version = Version(_need_merged_versions.begin()->first,
-                                  _need_merged_versions.rbegin()->first);
-    _rs_writer.reset(new (std::nothrow)AlphaRowsetWriter());
     return OLAP_SUCCESS;
 }
 
-OLAPStatus CumulativeCompaction::run() {
-    if (!_is_init) {
-        _tablet->release_cumulative_lock();
-        LOG(WARNING) << "cumulative handler is not inited.";
-        return OLAP_ERR_NOT_INITED;
-    }
-
-    // 0. 准备工作
-    LOG(INFO) << "start cumulative compaction. tablet=" << _tablet->full_name()
-              << ", cumulative_version=" << _cumulative_version.first << "-"
-              << _cumulative_version.second;
-    OlapStopWatch watch;
-
-    // 1. 计算新的cumulative文件的version hash
-    OLAPStatus res = OLAP_SUCCESS;
-    res = _tablet->compute_all_versions_hash(_need_merged_versions, 
&_cumulative_version_hash);
-    if (res != OLAP_SUCCESS) {
-        _tablet->release_cumulative_lock();
-        LOG(WARNING) << "failed to computer cumulative version hash."
-                     << " tablet=" << _tablet->full_name()
-                     << ", cumulative_version=" << _cumulative_version.first
-                     << "-" << _cumulative_version.second;
-        return res;
-    }
-
-    // 2. 获取待合并的delta文件对应的data文件
-    res = _tablet->capture_consistent_rowsets(_need_merged_versions, 
&_rowsets);
-    if (res != OLAP_SUCCESS) {
-        _tablet->release_cumulative_lock();
-        LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << 
_tablet->full_name()
-                     << ", version=" << _cumulative_version.first
-                     << "-" << _cumulative_version.second;
-        return res;
-    }
-
-    {
-        
DorisMetrics::cumulative_compaction_deltas_total.increment(_need_merged_versions.size());
-        int64_t merge_bytes = 0;
-        for (auto& rowset : _rowsets) {
-            merge_bytes += rowset->data_disk_size();
-        }
-        DorisMetrics::cumulative_compaction_bytes_total.increment(merge_bytes);
-    }
-
-    do {
-        // 3. 生成新cumulative文件对应的olap index
-        RowsetId rowset_id = 0;
-        RETURN_NOT_OK(_tablet->next_rowset_id(&rowset_id));
-        RowsetWriterContext context;
-        context.rowset_id = rowset_id;
-        context.tablet_uid = _tablet->tablet_uid();
-        context.tablet_id = _tablet->tablet_id();
-        context.partition_id = _tablet->partition_id();
-        context.tablet_schema_hash = _tablet->schema_hash();
-        context.rowset_type = ALPHA_ROWSET;
-        context.rowset_path_prefix = _tablet->tablet_path();
-        context.tablet_schema = &(_tablet->tablet_schema());
-        context.rowset_state = VISIBLE;
-        context.data_dir = _tablet->data_dir();
-        context.version = _cumulative_version;
-        context.version_hash = _cumulative_version_hash;
-        _rs_writer->init(context);
-
-        // 4. 执行cumulative compaction合并过程
-        for (auto& rowset : _rowsets) {
-            RowsetReaderSharedPtr rs_reader(rowset->create_reader());
-            if (rs_reader == nullptr) {
-                LOG(WARNING) << "rowset create reader failed. rowset:" <<  
rowset->rowset_id();
-                _tablet->release_cumulative_lock();
-                return OLAP_ERR_ROWSET_CREATE_READER;
-            }
-            _rs_readers.push_back(rs_reader);
-        }
-        res = _do_cumulative_compaction();
-        _tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + 
std::to_string(_rs_writer->rowset_id()));
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "failed to do cumulative compaction."
-                         << ", tablet=" << _tablet->full_name()
-                         << ", cumulative_version=" << 
_cumulative_version.first
-                         << "-" << _cumulative_version.second;
-            break;
-        }
-    } while (0);
-
-    // 5. 如果出现错误,执行清理工作
-    if (res != OLAP_SUCCESS) {
-        StorageEngine::instance()->add_unused_rowset(_rowset);
-    }
+OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+    
_tablet->pick_candicate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
-    _tablet->release_cumulative_lock();
-
-    LOG(INFO) << "succeed to do cumulative compaction. tablet=" << 
_tablet->full_name()
-              << ", cumulative_version=" << _cumulative_version.first
-              << "-" << _cumulative_version.second
-              << ". elapsed time of doing cumulative compaction"
-              << ", time=" << watch.get_elapse_second() << "s";
-    return res;
-}
-
-OLAPStatus CumulativeCompaction::_calculate_need_merged_versions() {
-    Versions delta_versions;
-    OLAPStatus res = _get_delta_versions(&delta_versions);
-    if (res != OLAP_SUCCESS) {
-        LOG(INFO) << "failed to get delta versions. res=" << res;
-        return res;
+    if (candidate_rowsets.size() == 0 || candidate_rowsets.size() == 1) {
+        LOG(INFO) << "There is no enough rowsets to cumulative compaction."
+                  << ", the size of rowsets to compact=" << 
candidate_rowsets.size()
+                  << ", cumulative_point=" << 
_tablet->cumulative_layer_point();
+        return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
     }
 
-    // 此处减1,是为了确保最新版本的delta不会合入到cumulative里
-    // 因为push可能会重复导入最新版本的delta
-    uint32_t delta_number = delta_versions.size() - 1;
-    uint32_t index = 0;
-    // 在delta文件中寻找可合并的delta文件
-    // 这些delta文件可能被delete或较大的delta文件(>= max_delta_file_size)分割为多个区间, 比如:
-    // v1, v2, v3, D, v4, v5, D, v6, v7
-    // 我们分区间进行查找直至找到合适的可合并delta文件
-    while (index < delta_number) {
-        Versions need_merged_versions;
-        size_t total_size = 0;
-
-        // 在其中1个区间里查找可以合并的delta文件
-        for (; index < delta_number; ++index) {
-            // 如果已找到的可合并delta文件大小大于等于_max_delta_file_size,我们认为可以执行合并了
-            // 停止查找过程
-            if (total_size >= _max_delta_file_size) {
-                break;
-            }
+    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), 
rowset_comparator);
+    RETURN_NOT_OK(check_version_continuity(candidate_rowsets));
 
-            Version delta = delta_versions[index];
-            size_t delta_size = _tablet->get_rowset_size_by_version(delta);
-            // 如果遇到大的delta文件,或delete版本文件,则:
-            if (delta_size >= _max_delta_file_size
-                    || _tablet->version_for_delete_predicate(delta)
-                    || _tablet->version_for_load_deletion(delta)) {
-                // 1) 如果need_merged_versions为空,表示这2类文件在区间的开头,直接跳过
-                if (need_merged_versions.empty()) {
-                    continue;
-                } else {
-                    // 2) 如果need_merged_versions不为空,则已经找到区间的末尾,跳出循环
-                    break;
-                }
+    std::vector<RowsetSharedPtr> transient_rowsets;
+    for (size_t i = 0; i < candidate_rowsets.size(); ++i) {
+        RowsetSharedPtr rowset = candidate_rowsets[i];
+        if (_tablet->version_for_delete_predicate(rowset->version())) {
+            if (transient_rowsets.size() > _input_rowsets.size()) {
+                _input_rowsets = transient_rowsets;
             }
-
-            need_merged_versions.push_back(delta);
-            total_size += delta_size;
-        }
-
-        // 该区间没有可以合并的delta文件,进行下一轮循环,继续查找下一个区间
-        if (need_merged_versions.empty()) {
+            transient_rowsets.clear();
             continue;
         }
 
-        // 如果该区间中只有一个delta,或者该区间的delta都是空的delta,则我们查看能否与区间末尾的
-        // 大delta合并,或者与区间的开头的前一个版本合并
-        if (need_merged_versions.size() == 1 || total_size == 0) {
-            // 如果区间末尾是较大的delta版, 则与它合并
-            if (index < delta_number
-                    && 
_tablet->get_rowset_size_by_version(delta_versions[index]) >=
-                           _max_delta_file_size) {
-                need_merged_versions.push_back(delta_versions[index]);
-                ++index;
-            }
-            // 如果区间前一个版本可以合并, 则将其加入到可合并版本中
-            Version delta_before_interval;
-            if (_find_previous_version(need_merged_versions[0], 
&delta_before_interval)) {
-                need_merged_versions.insert(need_merged_versions.begin(),
-                                            delta_before_interval); 
-            }
-
-            // 如果还是只有1个待合并的delta,则跳过,不进行合并
-            if (need_merged_versions.size() == 1) {
-                continue;
-            }
-
-            _need_merged_versions.swap(need_merged_versions);
-            _new_cumulative_layer_point = delta_versions[index].first;
-            return OLAP_SUCCESS;
-        }
-
-        // 如果有多个可合并文件,则可以进行cumulative compaction的合并过程
-        // 如果只有只有一个可合并的文件,为了效率,不触发cumulative compaction的合并过程
-        if (need_merged_versions.size() != 1) {
-            // 如果在可合并区间开头之前的一个版本的大小没有达到delta文件的最大值,
-            // 则将可合并区间的文件合并到之前那个版本上
-            Version delta;
-            if (_find_previous_version(need_merged_versions[0], &delta)) {
-                need_merged_versions.insert(need_merged_versions.begin(), 
delta); 
-            }
-
-            _need_merged_versions.swap(need_merged_versions);
-            _new_cumulative_layer_point = delta_versions[index].first;
-            return OLAP_SUCCESS;
-        }
+        transient_rowsets.push_back(rowset); 
     }
-    
-    // 没有找到可以合并的delta文件,无法执行合并过程,但我们仍然需要设置新的cumulative_layer_point
-    // 如果不设置新的cumulative_layer_point, 则下次执行cumulative compaction时,扫描的文件和这次
-    // 扫描的文件相同,依然找不到可以合并的delta文件, 无法执行合并过程。
-    // 依此类推,就进入了死循环状态,永远不会进行cumulative compaction
-    _tablet->set_cumulative_layer_point(delta_versions[index].first);
-    _tablet->save_meta();
-    return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
-}
-
-static bool version_comparator(const Version& lhs, const Version& rhs) {
-    return lhs.second < rhs.second;
-}
-
-OLAPStatus CumulativeCompaction::_get_delta_versions(Versions* delta_versions) 
{
-    delta_versions->clear();
-    
-    Versions all_versions;
-    _tablet->list_versions(&all_versions);
 
-    for (Versions::const_iterator version = all_versions.begin();
-            version != all_versions.end(); ++version) {
-        if (version->first == version->second && version->first >= 
_old_cumulative_layer_point) {
-            delta_versions->push_back(*version);
-        }
+    if (transient_rowsets.size() > _input_rowsets.size()) {
+        _input_rowsets = transient_rowsets;
     }
-
-    if (delta_versions->size() == 0) {
-        LOG(INFO) << "no delta versions. cumulative_point=" << 
_old_cumulative_layer_point;
-        return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
+               
+    if (_input_rowsets.empty()) {
+        
_tablet->set_cumulative_layer_point(candidate_rowsets.back()->end_version() + 
1);
+    } else {
+        
_tablet->set_cumulative_layer_point(_input_rowsets.back()->end_version() + 1);
 
 Review comment:
   Must not update the cumulative point here; it should be updated only after the cumulative compaction has succeeded or failed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to