This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 35f965532d1 [bugfix](becore) local exchange should check receiver's
local state to avoid core (#45470)
35f965532d1 is described below
commit 35f965532d1a1a538c1ea92ca5f241e6580d5461
Author: yiguolei <[email protected]>
AuthorDate: Tue Dec 17 18:47:29 2024 +0800
[bugfix](becore) local exchange should check receiver's local state to
avoid core (#45470)
_local_recvr depends on pipeline::ExchangeLocalState* _parent to do some
memory counter settings,
but it only holds a raw pointer, so the ExchangeLocalState object
may already have been destructed.
Therefore I lock the local state to prevent it from being destructed.
---
be/src/vec/sink/vdata_stream_sender.cpp | 24 +++++++++++++++++++++---
1 file changed, 21 insertions(+), 3 deletions(-)
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index ceb7be95e40..a4a9e6cccf8 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -97,8 +97,14 @@ Status Channel::open(RuntimeState* state) {
auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
_fragment_instance_id, _dest_node_id, &_local_recvr);
if (!st.ok()) {
- // Recvr not found. Maybe downstream task is finished already.
- LOG(INFO) << "Recvr is not found : " << st.to_string();
+ // If could not find local receiver, then it means the channel is
EOF.
+ // Maybe downstream task is finished already.
+ //if (_receiver_status.ok()) {
+ // _receiver_status = Status::EndOfFile("local data stream
receiver is deconstructed");
+ //}
+ LOG(INFO) << "Query: " << print_id(state->query_id())
+ << " recvr is not found, maybe downstream task is
finished. error st is: "
+ << st.to_string();
}
}
_be_number = state->be_number();
@@ -191,8 +197,20 @@ Status Channel::send_local_block(Block* block, bool eos,
bool can_be_moved) {
if (is_receiver_eof()) {
return _receiver_status;
}
-
auto receiver_status = _recvr_status();
+ // _local_recvr depdend on pipeline::ExchangeLocalState* _parent to do
some memory counter settings
+ // but it only owns a raw pointer, so that the ExchangeLocalState object
may be deconstructed.
+ // Lock the fragment context to ensure the runtime state and other objects
are not deconstructed
+ TaskExecutionContextSPtr ctx_lock = nullptr;
+ if (receiver_status.ok() && _local_recvr != nullptr) {
+ ctx_lock = _local_recvr->task_exec_ctx();
+ // Do not return internal error, because when query finished, the
downstream node
+ // may finish before upstream node. And the object maybe
deconstructed. If return error
+ // then the upstream node may report error status to FE, the query is
failed.
+ if (ctx_lock == nullptr) {
+ receiver_status = Status::EndOfFile("local data stream receiver is
deconstructed");
+ }
+ }
if (receiver_status.ok()) {
COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]