This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bd5844ea0d3 [Improvement](local exchange) optimize broadcast local exchanger (#39402)
bd5844ea0d3 is described below
commit bd5844ea0d37ddc8d31673e06d2e983a58f7429f
Author: Gabriel <[email protected]>
AuthorDate: Mon Aug 19 11:50:37 2024 +0800
[Improvement](local exchange) optimize broadcast local exchanger (#39402)
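Currently, the broadcast local exchanger copies each data block in the
sink operator, once for every downstream source operator. This PR
changes it to a copy-when-pull mode: the sink enqueues a single shared,
reference-counted block to every channel, and each source operator
copies the rows only when it pulls that block. It also adds
per-dependency wait-time counters to the local merge-sort exchange
source.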
---
.../local_exchange_source_operator.cpp | 21 +++++++-
.../local_exchange_source_operator.h | 2 +
be/src/pipeline/local_exchange/local_exchanger.cpp | 56 +++++++++++++++-------
be/src/pipeline/local_exchange/local_exchanger.h | 11 +++--
4 files changed, 69 insertions(+), 21 deletions(-)
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 0cffe125a1f..2d20b8f365c 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -36,6 +36,18 @@ Status LocalExchangeSourceLocalState::init(RuntimeState*
state, LocalStateInfo&
_copy_data_timer = ADD_TIMER(profile(), "CopyDataTime");
}
+ if (_exchanger->get_type() == ExchangeType::LOCAL_MERGE_SORT &&
_channel_id == 0) {
+ _local_merge_deps = _shared_state->get_dep_by_channel_id(_channel_id);
+ DCHECK_GT(_local_merge_deps.size(), 1);
+ _deps_counter.resize(_local_merge_deps.size());
+ static const std::string timer_name = "WaitForDependencyTime";
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile,
timer_name, 1);
+ for (size_t i = 0; i < _deps_counter.size(); i++) {
+ _deps_counter[i] = _runtime_profile->add_nonzero_counter(
+ fmt::format("WaitForData{}", i), TUnit ::TIME_NS,
timer_name, 1);
+ }
+ }
+
return Status::OK();
}
@@ -44,6 +56,10 @@ Status LocalExchangeSourceLocalState::close(RuntimeState*
state) {
return Status::OK();
}
+ for (size_t i = 0; i < _local_merge_deps.size(); i++) {
+ COUNTER_SET(_deps_counter[i],
_local_merge_deps[i]->watcher_elapse_time());
+ }
+
if (_exchanger) {
_exchanger->close(*this);
}
@@ -51,6 +67,7 @@ Status LocalExchangeSourceLocalState::close(RuntimeState*
state) {
_shared_state->sub_running_source_operators(*this);
}
+ std::vector<DependencySPtr> {}.swap(_local_merge_deps);
return Base::close(state);
}
@@ -60,9 +77,9 @@ std::vector<Dependency*>
LocalExchangeSourceLocalState::dependencies() const {
// set dependencies ready
std::vector<Dependency*> deps;
auto le_deps = _shared_state->get_dep_by_channel_id(_channel_id);
- DCHECK_GT(le_deps.size(), 1);
+ DCHECK_GT(_local_merge_deps.size(), 1);
// If this is a local merge exchange, we should use all dependencies
here.
- for (auto& dep : le_deps) {
+ for (auto& dep : _local_merge_deps) {
deps.push_back(dep.get());
}
return deps;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index f6c043d44e4..7bf92add63d 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -57,6 +57,8 @@ private:
int _channel_id;
RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
RuntimeProfile::Counter* _copy_data_timer = nullptr;
+ std::vector<RuntimeProfile::Counter*> _deps_counter;
+ std::vector<DependencySPtr> _local_merge_deps;
};
class LocalExchangeSourceOperatorX final : public
OperatorX<LocalExchangeSourceLocalState> {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index e256419688e..e10da2beb72 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -35,7 +35,8 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int
channel_id,
// PartitionedBlock will be push into multiple queues with different row
ranges, so it will be
// referenced multiple times. Otherwise, we only ref the block once
because it is only push into
// one queue.
- if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+ if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
+ std::is_same_v<BroadcastBlock, BlockType>) {
allocated_bytes = block.first->data_block.allocated_bytes();
} else {
block->ref(1);
@@ -50,7 +51,8 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int
channel_id,
local_state._shared_state->sub_mem_usage(channel_id, allocated_bytes);
// `enqueue(block)` return false iff this queue's source operator is
already closed so we
// just unref the block.
- if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+ if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
+ std::is_same_v<BroadcastBlock, BlockType>) {
block.first->unref(local_state._shared_state, allocated_bytes);
} else {
block->unref(local_state._shared_state, allocated_bytes);
@@ -71,7 +73,8 @@ bool
Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
int channel_id) {
bool all_finished = _running_sink_operators == 0;
if (_data_queue[channel_id].try_dequeue(block)) {
- if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+ if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
+ std::is_same_v<BroadcastBlock, BlockType>) {
local_state._shared_state->sub_mem_usage(channel_id,
block.first->data_block.allocated_bytes());
} else {
@@ -86,7 +89,8 @@ bool
Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
} else {
std::unique_lock l(_m);
if (_data_queue[channel_id].try_dequeue(block)) {
- if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+ if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
+ std::is_same_v<BroadcastBlock, BlockType>) {
local_state._shared_state->sub_mem_usage(channel_id,
block.first->data_block.allocated_bytes());
} else {
@@ -135,7 +139,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state,
vectorized::Block* block
PartitionedBlock partitioned_block;
vectorized::MutableBlock mutable_block;
- auto get_data = [&](vectorized::Block* result_block) -> Status {
+ auto get_data = [&]() -> Status {
do {
const auto* offset_start =
partitioned_block.second.row_idxs->data() +
partitioned_block.second.offset_start;
@@ -152,7 +156,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state,
vectorized::Block* block
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
block, partitioned_block.first->data_block);
- RETURN_IF_ERROR(get_data(block));
+ RETURN_IF_ERROR(get_data());
}
return Status::OK();
}
@@ -374,30 +378,50 @@ Status LocalMergeSortExchanger::get_block(RuntimeState*
state, vectorized::Block
Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block*
in_block, bool eos,
LocalExchangeSinkLocalState& local_state) {
+ if (in_block->empty()) {
+ return Status::OK();
+ }
+ vectorized::Block new_block;
+ if (!_free_blocks.try_dequeue(new_block)) {
+ new_block = {in_block->clone_empty()};
+ }
+ new_block.swap(*in_block);
+ auto wrapper = BlockWrapper::create_shared(std::move(new_block));
+
local_state._shared_state->add_total_mem_usage(wrapper->data_block.allocated_bytes());
+ wrapper->ref(_num_partitions);
for (size_t i = 0; i < _num_partitions; i++) {
- auto mutable_block =
vectorized::MutableBlock::create_unique(in_block->clone_empty());
- RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0,
in_block->rows()));
- _enqueue_data_and_set_ready(i, local_state,
-
BlockWrapper::create_shared(mutable_block->to_block()));
+ _enqueue_data_and_set_ready(i, local_state, {wrapper, {0,
wrapper->data_block.rows()}});
}
return Status::OK();
}
void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
- vectorized::Block next_block;
+ BroadcastBlock partitioned_block;
bool eos;
- BlockWrapperSPtr wrapper;
+ vectorized::Block block;
_data_queue[local_state._channel_id].set_eos();
- while (_dequeue_data(local_state, wrapper, &eos, &next_block)) {
- next_block = vectorized::Block();
+ while (_dequeue_data(local_state, partitioned_block, &eos, &block)) {
+ partitioned_block.first->unref(local_state._shared_state);
}
}
Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
- BlockWrapperSPtr next_block;
- _dequeue_data(local_state, next_block, eos, block);
+ BroadcastBlock partitioned_block;
+
+ if (_dequeue_data(local_state, partitioned_block, eos, block)) {
+ SCOPED_TIMER(local_state._copy_data_timer);
+ vectorized::MutableBlock mutable_block =
+ vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
+ block, partitioned_block.first->data_block);
+ auto block_wrapper = partitioned_block.first;
+ RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block,
+
partitioned_block.second.offset_start,
+
partitioned_block.second.length));
+ block_wrapper->unref(local_state._shared_state);
+ }
+
return Status::OK();
}
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index dfb5c31fff8..71c388b2323 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -94,6 +94,12 @@ struct PartitionedRowIdxs {
using PartitionedBlock = std::pair<std::shared_ptr<BlockWrapper>,
PartitionedRowIdxs>;
+struct RowRange {
+ uint32_t offset_start;
+ size_t length;
+};
+using BroadcastBlock = std::pair<std::shared_ptr<BlockWrapper>, RowRange>;
+
template <typename BlockType>
struct BlockQueue {
std::atomic<bool> eos = false;
@@ -304,12 +310,11 @@ private:
std::vector<std::atomic_int64_t> _queues_mem_usege;
};
-class BroadcastExchanger final : public Exchanger<BlockWrapperSPtr> {
+class BroadcastExchanger final : public Exchanger<BroadcastBlock> {
public:
ENABLE_FACTORY_CREATOR(BroadcastExchanger);
BroadcastExchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
- : Exchanger<BlockWrapperSPtr>(running_sink_operators,
num_partitions,
- free_block_limit) {
+ : Exchanger<BroadcastBlock>(running_sink_operators,
num_partitions, free_block_limit) {
_data_queue.resize(num_partitions);
}
~BroadcastExchanger() override = default;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]