This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1d2dbe7898 [Bug][Pipeline] Run clickbench dead lock in pipeline exec
engine (#18211)
1d2dbe7898 is described below
commit 1d2dbe78989e43976cce5327538b30d5303b9fe9
Author: HappenLee <[email protected]>
AuthorDate: Thu Mar 30 21:41:57 2023 +0800
[Bug][Pipeline] Run clickbench dead lock in pipeline exec engine (#18211)
In pipeline exec engine run clickbench may dead lock in some query
---
be/src/vec/runtime/vdata_stream_recvr.cpp | 5 +++++
be/src/vec/runtime/vdata_stream_recvr.h | 4 ++++
be/src/vec/sink/vdata_stream_sender.h | 12 ++++++------
3 files changed, 15 insertions(+), 6 deletions(-)
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 79cb7b5cfd..c3bb910d70 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -373,6 +373,11 @@ void VDataStreamRecvr::add_block(Block* block, int
sender_id, bool use_move) {
_sender_queues[use_sender_id]->add_block(block, use_move);
}
+bool VDataStreamRecvr::sender_queue_empty(int sender_id) {
+ int use_sender_id = _is_merging ? sender_id : 0;
+ return _sender_queues[use_sender_id]->queue_empty();
+}
+
bool VDataStreamRecvr::ready_to_read() {
for (size_t i = 0; i < _sender_queues.size(); i++) {
if (_sender_queues[i]->should_wait()) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 1fc635a7f7..66941dae48 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -68,6 +68,8 @@ public:
void add_block(Block* block, int sender_id, bool use_move);
+ bool sender_queue_empty(int sender_id);
+
bool ready_to_read();
Status get_next(Block* block, bool* eos);
@@ -174,6 +176,8 @@ public:
void close();
+ bool queue_empty() { return _block_queue_empty; }
+
protected:
virtual void _update_block_queue_empty() {}
Status _inner_get_batch(Block* block, bool* eos);
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 5bf116af12..d0357cdbcc 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -233,8 +233,7 @@ public:
_need_close(false),
_brpc_dest_addr(brpc_dest),
_is_transfer_chain(is_transfer_chain),
-
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
- _capacity(std::max(1, buffer_size /
std::max(_row_desc.get_row_size(), 1))) {
+
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch)
{
std::string localhost = BackendOptions::get_localhost();
_is_local = (_brpc_dest_addr.hostname == localhost) &&
(_brpc_dest_addr.port == config::brpc_port);
@@ -292,8 +291,6 @@ public:
return uid.to_string();
}
- TUniqueId get_fragment_instance_id() const { return _fragment_instance_id;
}
-
bool is_local() const { return _is_local; }
virtual void ch_roll_pb_block();
@@ -302,7 +299,11 @@ public:
if (!is_local()) {
return true;
}
- return !_local_recvr || _local_recvr->is_closed() ||
!_local_recvr->exceeds_limit(0);
+
+ // if local recvr queue mem over the exchange node mem limit, we must
ensure each queue
+ // has one block to do merge sort in exchange node to prevent the
logic dead lock
+ return !_local_recvr || _local_recvr->is_closed() ||
!_local_recvr->exceeds_limit(0) ||
+ _local_recvr->sender_queue_empty(_parent->_sender_id);
}
protected:
@@ -363,7 +364,6 @@ protected:
bool _send_query_statistics_with_every_batch;
RuntimeState* _state;
- size_t _capacity;
bool _is_local;
std::shared_ptr<VDataStreamRecvr> _local_recvr;
// serialized blocks for broadcasting; we need two so we can write
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]