gavinchou commented on code in PR #63722:
URL: https://github.com/apache/doris/pull/63722#discussion_r3496766811


##########
be/src/load/group_commit/group_commit_mgr.cpp:
##########
@@ -277,31 +285,131 @@ Status GroupCommitTable::get_first_block_load_queue(
         return Status::OK();
     }
     create_plan_dep->block();
+    _create_plan_be_exe_version = be_exe_version;
+    if (_create_plan_deps.empty()) {
+        _create_plan_start_time_ms = MonotonicMillis();
+    }
     _create_plan_deps.emplace(load_id, std::make_tuple(create_plan_dep, 
put_block_dep,
                                                        base_schema_version, 
index_size));
-    if (!_is_creating_plan_fragment) {
-        _is_creating_plan_fragment = true;
-        RETURN_IF_ERROR(
-                _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep 
= create_plan_dep] {
-                    Defer defer {[&, dep = dep]() {
-                        std::unique_lock l(_lock);
-                        for (auto it : _create_plan_deps) {
-                            std::get<0>(it.second)->set_ready();
+    [[maybe_unused]] auto submit_st = _submit_create_group_commit_load();
+    return try_to_get_matched_queue();
+}
+
+Status GroupCommitTable::submit_create_group_commit_load() {
+    std::unique_lock l(_lock);
+    if (_create_plan_deps.empty()) {
+        return Status::OK();
+    }
+    return _submit_create_group_commit_load();
+}
+
+Status GroupCommitTable::_submit_create_group_commit_load() {
+    if (_is_creating_plan_fragment) {

Review Comment:
   [major] `group_commit_create_plan_timeout_ms` is not enforced while a 
create-plan task is already in flight. When `_is_creating_plan_fragment` is set, 
`_submit_create_group_commit_load()` returns here, before the timeout check 
below, and there is no timed wakeup path, so loads already blocked on 
`create_plan_dep` can wait past the configured timeout. Please release 
timed-out `_create_plan_deps` before this early return, or add a timer/scheduled 
wakeup that enforces the timeout while the create-plan task is running.



##########
be/src/load/group_commit/group_commit_mgr.cpp:
##########
@@ -277,31 +285,131 @@ Status GroupCommitTable::get_first_block_load_queue(
         return Status::OK();
     }
     create_plan_dep->block();
+    _create_plan_be_exe_version = be_exe_version;
+    if (_create_plan_deps.empty()) {
+        _create_plan_start_time_ms = MonotonicMillis();
+    }
     _create_plan_deps.emplace(load_id, std::make_tuple(create_plan_dep, 
put_block_dep,
                                                        base_schema_version, 
index_size));
-    if (!_is_creating_plan_fragment) {
-        _is_creating_plan_fragment = true;
-        RETURN_IF_ERROR(
-                _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep 
= create_plan_dep] {
-                    Defer defer {[&, dep = dep]() {
-                        std::unique_lock l(_lock);
-                        for (auto it : _create_plan_deps) {
-                            std::get<0>(it.second)->set_ready();
+    [[maybe_unused]] auto submit_st = _submit_create_group_commit_load();
+    return try_to_get_matched_queue();
+}
+
+Status GroupCommitTable::submit_create_group_commit_load() {
+    std::unique_lock l(_lock);
+    if (_create_plan_deps.empty()) {
+        return Status::OK();
+    }
+    return _submit_create_group_commit_load();
+}
+
+Status GroupCommitTable::_submit_create_group_commit_load() {
+    if (_is_creating_plan_fragment) {
+        return Status::OK();
+    }
+
+    int64_t timeout_ms = config::group_commit_create_plan_timeout_ms;
+    if (timeout_ms > 0 && !_create_plan_deps.empty()) {
+        int64_t now_ms = MonotonicMillis();
+        if (_create_plan_start_time_ms > 0 && now_ms - 
_create_plan_start_time_ms > timeout_ms) {
+            std::string last_create_plan_failed_reason = 
_create_plan_failed_reason;
+            _create_plan_failed_reason =
+                    ". group commit create plan timeout after " + 
std::to_string(timeout_ms) + "ms";
+            if (!last_create_plan_failed_reason.empty()) {
+                _create_plan_failed_reason +=
+                        ", last create plan error: " + 
last_create_plan_failed_reason;
+            }
+            for (const auto& [id, load_info] : _create_plan_deps) {
+                std::get<0>(load_info)->set_ready();
+            }
+            _create_plan_deps.clear();
+            _create_plan_start_time_ms = 0;
+            return Status::OK();
+        }
+    }
+
+    auto mem_tracker = _group_commit_mgr->group_commit_mem_tracker();
+    int be_exe_version = _create_plan_be_exe_version;
+    _is_creating_plan_fragment = true;
+    auto submit_st = _thread_pool->submit_func([&, be_exe_version, 
mem_tracker] {
+        std::shared_ptr<LoadBlockQueue> created_load_block_queue;
+        Status create_group_commit_st = Status::OK();
+        std::string create_plan_failed_reason;
+        Defer defer {[&]() {
+            bool need_resubmit = !create_group_commit_st.ok();
+            std::unique_lock l(_lock);
+            _is_creating_plan_fragment = false;
+            _create_plan_failed_reason = create_plan_failed_reason;
+            if (created_load_block_queue && create_group_commit_st.ok() &&
+                !created_load_block_queue->need_commit()) {
+                std::vector<UniqueId> success_load_ids;
+                for (const auto& [id, load_info] : _create_plan_deps) {
+                    auto create_dep = std::get<0>(load_info);
+                    auto put_dep = std::get<1>(load_info);
+                    if (created_load_block_queue->schema_version == 
std::get<2>(load_info) &&
+                        created_load_block_queue->index_size == 
std::get<3>(load_info)) {
+                        auto st = created_load_block_queue->add_load_id(id, 
put_dep);
+                        if (!st.ok()) {
+                            LOG(WARNING) << "failed to add pending load_id 
into created "
+                                            "group commit queue, load_id="
+                                         << id << ", label=" << 
created_load_block_queue->label
+                                         << ", status=" << st.to_string();
+                            need_resubmit = true;
+                        } else {
+                            create_dep->set_ready();
+                            success_load_ids.emplace_back(id);
                         }
-                        _create_plan_deps.clear();
-                        _is_creating_plan_fragment = false;
-                    }};
-                    auto st = _create_group_commit_load(be_exe_version, 
mem_tracker);
-                    if (!st.ok()) {
-                        LOG(WARNING) << "create group commit load error: " << 
st.to_string();
-                        _create_plan_failed_reason = ". create group commit 
load error: " +
-                                                     st.to_string().substr(0, 
300);
-                    } else {
-                        _create_plan_failed_reason = "";
+                    } else if (created_load_block_queue->schema_version > 
std::get<2>(load_info) ||
+                               (created_load_block_queue->schema_version ==
+                                        std::get<2>(load_info) &&
+                                created_load_block_queue->index_size != 
std::get<3>(load_info))) {
+                        // schema version mismatch:
+                        //   1. the schema version of created load block queue 
is newer than the load request
+                        //   2. the index size is not equal
+                        // set ready for the load request to let it fail
+                        create_dep->set_ready();
+                        success_load_ids.emplace_back(id);
                     }
-                }));
+                }
+                for (const auto& id : success_load_ids) {
+                    _create_plan_deps.erase(id);
+                }
+                if (_create_plan_deps.empty()) {
+                    _create_plan_start_time_ms = 0;
+                }
+            }
+            if (!_create_plan_deps.empty()) {

Review Comment:
   [minor] This resubmits the create-plan task immediately whenever pending deps 
remain. If `streamLoadPut` fails quickly with a deterministic error, the task 
can retry against the FE in a tight loop until 
`group_commit_create_plan_timeout_ms` expires. Please add a small backoff or a 
scheduled retry interval before resubmitting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to