Copilot commented on code in PR #60812:
URL: https://github.com/apache/doris/pull/60812#discussion_r2847233869
##########
be/src/runtime/task_execution_context.cpp:
##########
@@ -19,22 +19,45 @@
#include <glog/logging.h>
-#include <condition_variable>
-
namespace doris {
-void TaskExecutionContext::ref_task_execution_ctx() {
- ++_has_task_execution_ctx_ref_count;
+
+void TaskExecutionContext::init_sentinel() {
+ _rerun_wait_ctx = std::make_shared<RerunWaitContext>();
+ auto ctx = _rerun_wait_ctx; // shared_ptr copy captured by deleter
+ _sentinel = std::shared_ptr<void>(
+ reinterpret_cast<void*>(1), // dummy pointer, does not manage
actual resource
+ [ctx](void*) {
+ std::lock_guard<std::mutex> lk(ctx->mtx);
+ ctx->sentinel_destroyed = true;
+ ctx->cv.notify_all();
+ });
}
-void TaskExecutionContext::unref_task_execution_ctx() {
- --_has_task_execution_ctx_ref_count;
- if (_has_task_execution_ctx_ref_count == 0) {
- _notify_cv.notify_all();
+bool TaskExecutionContext::wait_for_sentinel_destruction(std::function<bool()>
is_cancelled) {
+ if (!_rerun_wait_ctx) {
+ return true;
+ }
+ std::unique_lock<std::mutex> lk(_rerun_wait_ctx->mtx);
+ while (!_rerun_wait_ctx->sentinel_destroyed) {
+ LOG(WARNING) << "Waiting for sentinel destruction";
Review Comment:
This LOG(WARNING) will be emitted on every wait iteration (roughly once per
second) while waiting for sentinel destruction. Consider downgrading it to
LOG(INFO) and logging only periodically — e.g., via glog's
LOG_EVERY_N(INFO, 10) or a manual counter — to reduce log spam during long waits.
```suggestion
int wait_iterations = 0;
while (!_rerun_wait_ctx->sentinel_destroyed) {
if ((wait_iterations++ % 10) == 0) {
LOG(INFO) << "Waiting for sentinel destruction";
}
```
##########
gensrc/proto/internal_service.proto:
##########
@@ -75,11 +75,10 @@ message PTransmitRecCTEBlockResult {
message PRerunFragmentParams {
enum Opcode {
- wait = 1; // wait fragment execute done
- release = 2; // release current round's resource
- rebuild = 3; // rebuild next round pipeline tasks
- submit = 4; // submit tasks to execute
- close = 5; // close fragment
+ wait_for_close = 1; // wait for PFC close
+ wait_for_destroy = 2; // wait for external thread finished and destroy
PFC
+ recreate_and_submit = 3; // recreate PFC from saved params + submit
+ final_close = 4; // close fragment (final round)
Review Comment:
This is a breaking protocol change. The enum values have been renumbered
(wait=1 is now wait_for_close=1, but they have completely different semantics).
This means BE nodes running the new code cannot communicate with BE nodes
running the old code for recursive CTE operations. During a rolling upgrade, if
the FE sends rerun_fragment requests to a mix of old and new BE nodes, the old
BE nodes will misinterpret the commands and execute incorrect operations.
Consider either:
1. Adding new enum values (6, 7, 8, 9) for the new protocol while keeping
old values deprecated
2. Adding version checking to ensure all nodes are upgraded before enabling
this feature
3. Documenting that this requires a coordinated upgrade of all BE nodes
```suggestion
        // Legacy opcodes (old protocol). These numeric values and names are
        // kept for backward compatibility and must not be reused with new
        // semantics. New code should use the new opcodes below.
        wait = 1 [deprecated = true];    // legacy: wait fragment execute done
        release = 2 [deprecated = true]; // legacy: release current round's resource
        rebuild = 3 [deprecated = true]; // legacy: rebuild next round pipeline tasks
        submit = 4 [deprecated = true];  // legacy: submit tasks to execute
        close = 5 [deprecated = true];   // legacy: close fragment
        // New opcodes (new protocol). These use distinct numeric values to
        // avoid collisions with the legacy protocol during rolling upgrades.
        wait_for_close = 6;      // wait for PFC close
        wait_for_destroy = 7;    // wait for external thread finished and
                                 // destroy PFC
        recreate_and_submit = 8; // recreate PFC from saved params + submit
        final_close = 9;         // close fragment (final round)
```
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1450,33 +1457,104 @@ Status FragmentMgr::transmit_rec_cte_block(
Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
PRerunFragmentParams_Opcode stage) {
- if (auto q_ctx = get_query_ctx(query_id)) {
+ if (stage == PRerunFragmentParams::wait_for_close) {
+ auto fragment_ctx = _pipeline_map.find({query_id, fragment});
+ if (!fragment_ctx) {
+ return Status::NotFound("Fragment context (query-id: {},
fragment-id: {}) not found",
+ print_id(query_id), fragment);
+ }
+ SCOPED_ATTACH_TASK(fragment_ctx->get_query_ctx());
+ RETURN_IF_ERROR(fragment_ctx->wait_close(false));
+ fragment_ctx->release_resource();
+ return Status::OK();
+ } else if (stage == PRerunFragmentParams::wait_for_destroy) {
+ auto fragment_ctx = _pipeline_map.find({query_id, fragment});
+ if (!fragment_ctx) {
+ return Status::NotFound("Fragment context (query-id: {},
fragment-id: {}) not found",
+ print_id(query_id), fragment);
+ }
+ SCOPED_ATTACH_TASK(fragment_ctx->get_query_ctx());
+ // Wait for all external sentinel copies to be released
+ bool ok = fragment_ctx->wait_for_sentinel_destruction(
+ [&fragment_ctx]() { return
fragment_ctx->get_query_ctx()->is_cancelled(); });
+ if (!ok) {
+ return Status::Cancelled("Query has been cancelled while waiting
for sentinel");
+ }
+ remove_pipeline_context({query_id, fragment});
+ return Status::OK();
+ } else if (stage == PRerunFragmentParams::recreate_and_submit) {
+ auto q_ctx = get_query_ctx(query_id);
+ if (!q_ctx) {
+ return Status::NotFound(
+ "rerun_fragment: Query context (query-id: {}) not found,
maybe finished",
+ print_id(query_id));
+ }
SCOPED_ATTACH_TASK(q_ctx.get());
+ RerunableFragmentInfo info;
+ {
+ std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+ auto it = _rerunnable_params_map.find({query_id, fragment});
+ if (it == _rerunnable_params_map.end()) {
+ return Status::NotFound(
+ "Rerunnable params (query-id: {}, fragment-id: {}) not
found",
+ print_id(query_id), fragment);
+ }
+ info = it->second;
+ }
+
+ auto context = std::make_shared<pipeline::PipelineFragmentContext>(
+ q_ctx->query_id(), info.params, q_ctx, _exec_env,
info.finish_callback,
+ [this](const ReportStatusRequest& req, auto&& ctx) {
+ return this->trigger_pipeline_context_report(req,
std::move(ctx));
+ });
+
+ Status prepare_st = Status::OK();
+ ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st =
context->prepare(_thread_pool.get()),
+ prepare_st);
+ if (!prepare_st.ok()) {
+ q_ctx->cancel(prepare_st, info.params.fragment_id);
+ return prepare_st;
+ }
+
+ // Update fragment executing count (remove_pipeline_context
decremented it)
+ {
+ int64_t now = duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ g_fragment_executing_count << 1;
+ g_fragment_last_active_time.set_value(now);
+ }
+
+ // Insert new PFC into _pipeline_map (old one was removed)
+ _pipeline_map.insert({info.params.query_id, info.params.fragment_id},
context);
+
+ // Update QueryContext mapping (must support overwrite)
+ q_ctx->set_pipeline_context(info.params.fragment_id, context);
+
+ RETURN_IF_ERROR(context->submit());
+ return Status::OK();
+
+ } else if (stage == PRerunFragmentParams::final_close) {
auto fragment_ctx = _pipeline_map.find({query_id, fragment});
if (!fragment_ctx) {
return Status::NotFound("Fragment context (query-id: {},
fragment-id: {}) not found",
print_id(query_id), fragment);
}
+ SCOPED_ATTACH_TASK(fragment_ctx->get_query_ctx());
- if (stage == PRerunFragmentParams::wait) {
- return fragment_ctx->wait_close(false);
- } else if (stage == PRerunFragmentParams::release) {
- return fragment_ctx->set_to_rerun();
- } else if (stage == PRerunFragmentParams::rebuild) {
- return fragment_ctx->rebuild(_thread_pool.get());
- } else if (stage == PRerunFragmentParams::submit) {
- return fragment_ctx->submit();
- } else if (stage == PRerunFragmentParams::close) {
- return fragment_ctx->wait_close(true);
- } else {
- return Status::InvalidArgument("Unknown rerun fragment opcode:
{}", stage);
+ // wait_close(true) internally does send_report +
remove_pipeline_context
+ RETURN_IF_ERROR(fragment_ctx->wait_close(true));
+
+ // Clean up saved params
+ {
+ std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+ _rerunnable_params_map.erase({query_id, fragment});
Review Comment:
Memory leak: If a recursive CTE query is cancelled or errors out before
reaching the final_close stage, the entry in _rerunnable_params_map will never
be cleaned up. This map stores a full copy of TPipelineFragmentParams,
TPipelineFragmentParamsList, the callback, and a shared_ptr to QueryContext,
which can consume significant memory.
Add cleanup logic in cancel_query or remove_query_context to iterate through
_rerunnable_params_map and erase entries matching the cancelled query_id.
Alternatively, consider using query_id as part of the map key and cleaning up
all fragments for a query when the query is cancelled.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]