Copilot commented on code in PR #61130:
URL: https://github.com/apache/doris/pull/61130#discussion_r2902875823
##########
be/src/runtime/query_context.h:
##########
@@ -394,6 +395,10 @@ class QueryContext : public
std::enable_shared_from_this<QueryContext> {
std::map<std::pair<TUniqueId, int>, pipeline::RecCTEScanLocalState*>
_cte_scan;
std::mutex _cte_scan_lock;
+ // for represent the rf's fragment execution round number of recursive cte
+ std::unordered_map<int, uint32_t> _filter_id_to_stage; // filter id ->
stage number
+ std::mutex __filter_id_to_stage_mtx;
+
Review Comment:
`__filter_id_to_stage_mtx` begins with a double underscore; identifiers
containing a double underscore are reserved to the implementation in C++, so
using one can lead to undefined behavior. Rename it to a non-reserved
identifier (e.g., `_filter_id_to_stage_mtx`) and update all uses.
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -179,9 +179,16 @@ void PipelineFragmentContext::cancel(const Status reason) {
{
std::lock_guard<std::mutex> l(_task_mutex);
if (_closed_tasks >= _total_tasks) {
+ if (_need_notify_close) {
+ // if fragment cancelled and waiting for notify to close, need
to remove from fragment mgr
+ _exec_env->fragment_mgr()->remove_pipeline_context({_query_id,
_fragment_id});
+ _need_notify_close = false;
+ }
// All tasks in this PipelineXFragmentContext already closed.
return;
}
+ // make fragment release by self after cancel
+ _need_notify_close = false;
Review Comment:
In `cancel()`, calling `fragment_mgr()->remove_pipeline_context()` when
`_closed_tasks >= _total_tasks` can cause the fragment to be removed multiple
times (e.g. if a rerun path already removed it). `remove_pipeline_context()`
unconditionally decrements `g_fragment_executing_count`, so double-removal will
corrupt metrics. Consider making removal idempotent (only update metrics if the
context existed) or ensuring this path cannot run after an earlier removal.
##########
be/src/exec/pipeline/pipeline_fragment_context.h:
##########
@@ -128,11 +130,24 @@ class PipelineFragmentContext : public
TaskExecutionContext {
std::string get_load_error_url();
std::string get_first_error_msg();
- Status wait_close(bool close);
- Status rebuild(ThreadPool* thread_pool);
- Status set_to_rerun();
+ std::set<int> get_deregister_runtime_filter() const;
- bool need_notify_close() const { return _need_notify_close; }
+ Status listen_wait_close(const std::shared_ptr<brpc::ClosureGuard>& guard,
+ bool need_send_report_on_destruction) {
+ if (_wait_close_guard) {
+ return Status::InternalError("Already listening wait close");
+ }
+ if (!_need_notify_close) {
+ return Status::InternalError("Not need to listen wait close");
+ }
+ if (need_send_report_on_destruction) {
+ _need_notify_close = false;
+ return send_report(true);
+ } else {
+ _wait_close_guard = guard;
+ }
+ return Status::OK();
Review Comment:
`listen_wait_close(..., /*need_send_report_on_destruction=*/true)`
immediately calls `send_report(true)` and does not retain the `ClosureGuard`,
so the rerun_fragment RPC can return before tasks actually close. This breaks
the intended "wait for close" semantics (and can lead to double report / double
remove when the fragment later closes). Consider storing the guard and
deferring report/cleanup until `_close_fragment_instance()` (or otherwise
ensuring the RPC only completes after close).
##########
be/src/runtime/query_context.h:
##########
@@ -410,7 +415,28 @@ class QueryContext : public
std::enable_shared_from_this<QueryContext> {
timespec get_query_arrival_timestamp() const { return
this->_query_arrival_timestamp; }
QuerySource get_query_source() const { return this->_query_source; }
- const TQueryOptions get_query_options() const { return _query_options; }
+ TQueryOptions get_query_options() const { return _query_options; }
+
+ uint32_t get_stage(int filter_id) {
+ std::lock_guard<std::mutex> lock(__filter_id_to_stage_mtx);
+ return _filter_id_to_stage[filter_id];
Review Comment:
`get_stage()` uses `_filter_id_to_stage[filter_id]`, which inserts a new
entry on read. Since this is called on RPC paths (e.g. apply/merge/sync), it
can grow the map due to unexpected/invalid filter_ids and also mutate state in
what looks like a getter. Prefer `find()` and return a default stage (e.g. 0)
without insertion unless the filter_id is explicitly registered.
```suggestion
auto it = _filter_id_to_stage.find(filter_id);
if (it == _filter_id_to_stage.end()) {
return 0;
}
return it->second;
```
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1460,35 +1503,96 @@ Status FragmentMgr::transmit_rec_cte_block(
}
}
-Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
+Status FragmentMgr::rerun_fragment(const std::shared_ptr<brpc::ClosureGuard>&
guard,
+ const TUniqueId& query_id, int fragment,
PRerunFragmentParams_Opcode stage) {
- if (auto q_ctx = get_query_ctx(query_id)) {
- SCOPED_ATTACH_TASK(q_ctx.get());
+ if (stage == PRerunFragmentParams::wait_for_destroy ||
+ stage == PRerunFragmentParams::final_close) {
auto fragment_ctx = _pipeline_map.find({query_id, fragment});
if (!fragment_ctx) {
return Status::NotFound("Fragment context (query-id: {},
fragment-id: {}) not found",
print_id(query_id), fragment);
}
- if (stage == PRerunFragmentParams::wait) {
- return fragment_ctx->wait_close(false);
- } else if (stage == PRerunFragmentParams::release) {
- return fragment_ctx->set_to_rerun();
- } else if (stage == PRerunFragmentParams::rebuild) {
- return fragment_ctx->rebuild(_thread_pool.get());
- } else if (stage == PRerunFragmentParams::submit) {
- return fragment_ctx->submit();
- } else if (stage == PRerunFragmentParams::close) {
- return fragment_ctx->wait_close(true);
- } else {
- return Status::InvalidArgument("Unknown rerun fragment opcode:
{}", stage);
+ if (stage == PRerunFragmentParams::wait_for_destroy) {
+ std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+ auto it = _rerunnable_params_map.find({query_id, fragment});
+ if (it == _rerunnable_params_map.end()) {
+ auto st = fragment_ctx->listen_wait_close(guard, true);
+ if (!st.ok()) {
+ LOG(WARNING) << fmt::format(
+ "wait_for_destroy fragment context (query-id: {},
fragment-id: "
Review Comment:
In the `wait_for_destroy` path, `_rerunnable_params_lock` is held while
calling `fragment_ctx->listen_wait_close(...)` (which can call into
`send_report()` and other subsystems). This increases the risk of lock
inversion / long critical sections. Capture the NotFound decision under the
lock, then release the lock before invoking methods on `fragment_ctx`.
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1460,35 +1503,96 @@ Status FragmentMgr::transmit_rec_cte_block(
}
}
-Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
+Status FragmentMgr::rerun_fragment(const std::shared_ptr<brpc::ClosureGuard>&
guard,
+ const TUniqueId& query_id, int fragment,
PRerunFragmentParams_Opcode stage) {
- if (auto q_ctx = get_query_ctx(query_id)) {
- SCOPED_ATTACH_TASK(q_ctx.get());
+ if (stage == PRerunFragmentParams::wait_for_destroy ||
+ stage == PRerunFragmentParams::final_close) {
auto fragment_ctx = _pipeline_map.find({query_id, fragment});
if (!fragment_ctx) {
return Status::NotFound("Fragment context (query-id: {},
fragment-id: {}) not found",
print_id(query_id), fragment);
}
- if (stage == PRerunFragmentParams::wait) {
- return fragment_ctx->wait_close(false);
- } else if (stage == PRerunFragmentParams::release) {
- return fragment_ctx->set_to_rerun();
- } else if (stage == PRerunFragmentParams::rebuild) {
- return fragment_ctx->rebuild(_thread_pool.get());
- } else if (stage == PRerunFragmentParams::submit) {
- return fragment_ctx->submit();
- } else if (stage == PRerunFragmentParams::close) {
- return fragment_ctx->wait_close(true);
- } else {
- return Status::InvalidArgument("Unknown rerun fragment opcode:
{}", stage);
+ if (stage == PRerunFragmentParams::wait_for_destroy) {
+ std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+ auto it = _rerunnable_params_map.find({query_id, fragment});
+ if (it == _rerunnable_params_map.end()) {
+ auto st = fragment_ctx->listen_wait_close(guard, true);
+ if (!st.ok()) {
+ LOG(WARNING) << fmt::format(
+ "wait_for_destroy fragment context (query-id: {},
fragment-id: "
+ "{}) failed: {}",
+ print_id(query_id), fragment, st.to_string());
+ }
+ return Status::NotFound(
+ "Rerunnable params (query-id: {}, fragment-id: {}) not
found",
+ print_id(query_id), fragment);
+ }
+
+ it->second.deregister_runtime_filter_ids.merge(
+ fragment_ctx->get_deregister_runtime_filter());
+ }
+
+ auto* query_ctx = fragment_ctx->get_query_ctx();
+ SCOPED_ATTACH_TASK(query_ctx);
+ RETURN_IF_ERROR(
+ fragment_ctx->listen_wait_close(guard, stage ==
PRerunFragmentParams::final_close));
+ remove_pipeline_context({query_id, fragment});
Review Comment:
`final_close` currently calls `listen_wait_close(..., true)` and then
immediately `remove_pipeline_context(...)`. Since `listen_wait_close` (with
`true`) does not keep the `ClosureGuard`, the RPC may return before tasks are
closed; additionally, the fragment may later remove itself again in
`_close_fragment_instance()` after `_need_notify_close` is set false, causing a
second `remove_pipeline_context()` and incorrect metrics. Consider making
`final_close` actually wait for close via the guard (like `wait_for_destroy`)
and avoid removing the context until close is complete (or make removal
idempotent).
```suggestion
// For both wait_for_destroy and final_close, we need to actually
wait for close
// via the guard, so do not pass 'true' here (which would release
the ClosureGuard
// early). The 'true' case is handled only in the special-cased
branch above.
RETURN_IF_ERROR(fragment_ctx->listen_wait_close(guard, false));
// Only remove the pipeline context for wait_for_destroy; for
final_close, the
// fragment will remove itself on close completion to avoid double
removal and
// incorrect metrics.
if (stage == PRerunFragmentParams::wait_for_destroy) {
remove_pipeline_context({query_id, fragment});
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]