This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 82c681595e2 [fix](local exchange) Fix local exchange blocked by a huge
data block… (#38693)
82c681595e2 is described below
commit 82c681595e2b592b58942d6e332ea5fb2fee4ac1
Author: Gabriel <[email protected]>
AuthorDate: Thu Aug 1 18:04:19 2024 +0800
[fix](local exchange) Fix local exchange blocked by a huge data block…
(#38693)
… (#38657)
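If a huge block is pushed into the local exchanger, the source can stay blocked due to
a concurrency problem: a race between the sink enqueueing data and marking the channel ready, and the source failing to dequeue and blocking its dependency. This PR uses a unique lock to make these steps atomic with respect to each other.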
---
.../local_exchange/local_exchange_sink_operator.h | 2 +
.../local_exchange_source_operator.cpp | 6 +-
.../local_exchange_source_operator.h | 2 +
.../pipeline_x/local_exchange/local_exchanger.cpp | 111 +++++++++------------
.../pipeline_x/local_exchange/local_exchanger.h | 14 +++
5 files changed, 71 insertions(+), 64 deletions(-)
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 99b88747a98..a32ecc21e00 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -49,6 +49,8 @@ private:
friend class BroadcastExchanger;
friend class PassToOneExchanger;
friend class AdaptivePassthroughExchanger;
+ template <typename BlockType>
+ friend class Exchanger;
ExchangerBase* _exchanger = nullptr;
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 086a3b551fd..b3a28a6404f 100644
---
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -66,11 +66,13 @@ std::string LocalExchangeSourceLocalState::debug_string(int
indentation_level) c
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}, _channel_id: {}, _num_partitions: {}, _num_senders:
{}, _num_sources: {}, "
- "_running_sink_operators: {}, _running_source_operators:
{}, mem_usage: {}",
+ "_running_sink_operators: {}, _running_source_operators:
{}, mem_usage: {}, "
+ "data queue info: {}",
Base::debug_string(indentation_level), _channel_id,
_exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators,
_exchanger->_running_source_operators,
- _shared_state->mem_usage.load());
+ _shared_state->mem_usage.load(),
+ _exchanger->data_queue_debug_string(_channel_id));
size_t i = 0;
fmt::format_to(debug_string_buffer, ", MemTrackers: ");
for (auto* mem_tracker : _shared_state->mem_trackers) {
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index 7cefc1ca900..193b1c553f9 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -47,6 +47,8 @@ private:
friend class BroadcastExchanger;
friend class PassToOneExchanger;
friend class AdaptivePassthroughExchanger;
+ template <typename BlockType>
+ friend class Exchanger;
ExchangerBase* _exchanger = nullptr;
int _channel_id;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 7a044aaa77f..eb3875dcf7c 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -24,6 +24,37 @@
namespace doris::pipeline {
+template <typename BlockType>
+bool Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
+
LocalExchangeSinkLocalState& local_state,
+ BlockType&& block) {
+ std::unique_lock l(_m);
+ if (_data_queue[channel_id].enqueue(std::move(block))) {
+ local_state._shared_state->set_ready_to_read(channel_id);
+ return true;
+ }
+ return false;
+}
+
+template <typename BlockType>
+bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState&
local_state,
+ BlockType& block, bool* eos) {
+ bool all_finished = _running_sink_operators == 0;
+ if (_data_queue[local_state._channel_id].try_dequeue(block)) {
+ return true;
+ } else if (all_finished) {
+ *eos = true;
+ } else {
+ std::unique_lock l(_m);
+ if (_data_queue[local_state._channel_id].try_dequeue(block)) {
+ return true;
+ }
+ COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+ local_state._dependency->block();
+ }
+ return false;
+}
+
Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block*
in_block, bool eos,
LocalExchangeSinkLocalState& local_state) {
{
@@ -72,17 +103,11 @@ Status ShuffleExchanger::get_block(RuntimeState* state,
vectorized::Block* block
return Status::OK();
};
- bool all_finished = _running_sink_operators == 0;
- if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+ if (_dequeue_data(local_state, partitioned_block, eos)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
block, partitioned_block.first->data_block);
RETURN_IF_ERROR(get_data(block));
- } else if (all_finished) {
- *eos = true;
- } else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
- local_state._dependency->block();
}
return Status::OK();
}
@@ -90,7 +115,6 @@ Status ShuffleExchanger::get_block(RuntimeState* state,
vectorized::Block* block
Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t*
__restrict channel_ids,
vectorized::Block* block, bool eos,
LocalExchangeSinkLocalState& local_state)
{
- auto& data_queue = _data_queue;
const auto rows = block->rows();
auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows);
{
@@ -133,9 +157,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
it.second,
new_block_wrapper->data_block.allocated_bytes(), false);
- if (data_queue[it.second].enqueue({new_block_wrapper,
{row_idx, start, size}})) {
- local_state._shared_state->set_ready_to_read(it.second);
- } else {
+
+ if (!_enqueue_data_and_set_ready(it.second, local_state,
+ {new_block_wrapper, {row_idx,
start, size}})) {
local_state._shared_state->sub_mem_usage(
it.second,
new_block_wrapper->data_block.allocated_bytes(), false);
new_block_wrapper->unref(local_state._shared_state);
@@ -152,10 +176,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
i % _num_sources,
new_block_wrapper->data_block.allocated_bytes(), false);
- if (data_queue[i % _num_sources].enqueue(
- {new_block_wrapper, {row_idx, start, size}})) {
- local_state._shared_state->set_ready_to_read(i %
_num_sources);
- } else {
+ if (!_enqueue_data_and_set_ready(i % _num_sources, local_state,
+ {new_block_wrapper, {row_idx,
start, size}})) {
local_state._shared_state->sub_mem_usage(
i % _num_sources,
new_block_wrapper->data_block.allocated_bytes(),
false);
@@ -175,9 +197,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
map[i],
new_block_wrapper->data_block.allocated_bytes(), false);
- if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx,
start, size}})) {
- local_state._shared_state->set_ready_to_read(map[i]);
- } else {
+ if (!_enqueue_data_and_set_ready(map[i], local_state,
+ {new_block_wrapper, {row_idx,
start, size}})) {
local_state._shared_state->sub_mem_usage(
map[i],
new_block_wrapper->data_block.allocated_bytes(), false);
new_block_wrapper->unref(local_state._shared_state);
@@ -201,9 +222,7 @@ Status PassthroughExchanger::sink(RuntimeState* state,
vectorized::Block* in_blo
auto channel_id = (local_state._channel_id++) % _num_partitions;
size_t memory_usage = new_block.allocated_bytes();
local_state._shared_state->add_mem_usage(channel_id, memory_usage);
- if (_data_queue[channel_id].enqueue(std::move(new_block))) {
- local_state._shared_state->set_ready_to_read(channel_id);
- } else {
+ if (!_enqueue_data_and_set_ready(channel_id, local_state,
std::move(new_block))) {
local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
}
@@ -222,19 +241,13 @@ void
PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
- bool all_finished = _running_sink_operators == 0;
- if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ if (_dequeue_data(local_state, next_block, eos)) {
block->swap(next_block);
local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
- } else if (all_finished) {
- *eos = true;
- } else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
- local_state._dependency->block();
}
return Status::OK();
}
@@ -243,9 +256,7 @@ Status PassToOneExchanger::sink(RuntimeState* state,
vectorized::Block* in_block
LocalExchangeSinkLocalState& local_state) {
vectorized::Block new_block(in_block->clone_empty());
new_block.swap(*in_block);
- if (_data_queue[0].enqueue(std::move(new_block))) {
- local_state._shared_state->set_ready_to_read(0);
- }
+ _enqueue_data_and_set_ready(0, local_state, std::move(new_block));
return Status::OK();
}
@@ -257,14 +268,8 @@ Status PassToOneExchanger::get_block(RuntimeState* state,
vectorized::Block* blo
return Status::OK();
}
vectorized::Block next_block;
- bool all_finished = _running_sink_operators == 0;
- if (_data_queue[0].try_dequeue(next_block)) {
+ if (_dequeue_data(local_state, next_block, eos)) {
*block = std::move(next_block);
- } else if (all_finished) {
- *eos = true;
- } else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
- local_state._dependency->block();
}
return Status::OK();
}
@@ -274,9 +279,7 @@ Status BroadcastExchanger::sink(RuntimeState* state,
vectorized::Block* in_block
for (size_t i = 0; i < _num_partitions; i++) {
auto mutable_block =
vectorized::MutableBlock::create_unique(in_block->clone_empty());
RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0,
in_block->rows()));
- if (_data_queue[i].enqueue(mutable_block->to_block())) {
- local_state._shared_state->set_ready_to_read(i);
- }
+ _enqueue_data_and_set_ready(i, local_state, mutable_block->to_block());
}
return Status::OK();
@@ -293,14 +296,8 @@ void
BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
- bool all_finished = _running_sink_operators == 0;
- if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ if (_dequeue_data(local_state, next_block, eos)) {
*block = std::move(next_block);
- } else if (all_finished) {
- *eos = true;
- } else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
- local_state._dependency->block();
}
return Status::OK();
}
@@ -316,9 +313,8 @@ Status
AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
auto channel_id = (local_state._channel_id++) % _num_partitions;
size_t memory_usage = new_block.allocated_bytes();
local_state._shared_state->add_mem_usage(channel_id, memory_usage);
- if (_data_queue[channel_id].enqueue(std::move(new_block))) {
- local_state._shared_state->set_ready_to_read(channel_id);
- } else {
+
+ if (!_enqueue_data_and_set_ready(channel_id, local_state,
std::move(new_block))) {
local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
}
@@ -349,7 +345,6 @@ Status
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
const uint32_t* __restrict
channel_ids,
vectorized::Block* block,
bool eos,
LocalExchangeSinkLocalState&
local_state) {
- auto& data_queue = _data_queue;
const auto rows = block->rows();
auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
{
@@ -378,9 +373,7 @@ Status
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
size_t memory_usage = new_block.allocated_bytes();
local_state._shared_state->add_mem_usage(i, memory_usage);
- if (data_queue[i].enqueue(std::move(new_block))) {
- local_state._shared_state->set_ready_to_read(i);
- } else {
+ if (!_enqueue_data_and_set_ready(i, local_state,
std::move(new_block))) {
local_state._shared_state->sub_mem_usage(i, memory_usage);
}
}
@@ -404,19 +397,13 @@ Status
AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
- bool all_finished = _running_sink_operators == 0;
- if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ if (_dequeue_data(local_state, next_block, eos)) {
block->swap(next_block);
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
- } else if (all_finished) {
- *eos = true;
- } else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
- local_state._dependency->block();
}
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index ee0b5e286de..b1d1d1f2668 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -54,6 +54,8 @@ public:
virtual DependencySPtr get_local_state_dependency(int _channel_id) {
return nullptr; }
+ virtual std::string data_queue_debug_string(int i) = 0;
+
protected:
friend struct LocalExchangeSharedState;
friend struct ShuffleBlockWrapper;
@@ -114,9 +116,21 @@ public:
: ExchangerBase(running_sink_operators, num_sources,
num_partitions, free_block_limit) {
}
~Exchanger() override = default;
+ std::string data_queue_debug_string(int i) override {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer, "Data Queue {}: [size approx = {},
eos = {}]",
+ _data_queue[i].data_queue.size_approx(),
_data_queue[i].eos);
+ return fmt::to_string(debug_string_buffer);
+ }
protected:
+ bool _enqueue_data_and_set_ready(int channel_id,
LocalExchangeSinkLocalState& local_state,
+ BlockType&& block);
+ bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType&
block, bool* eos);
std::vector<BlockQueue<BlockType>> _data_queue;
+
+private:
+ std::mutex _m;
};
class LocalExchangeSourceLocalState;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]