This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 70da7856f4d [Improvement](local shuffle) Reduce locking scope in local
exchanger … (#46293)
70da7856f4d is described below
commit 70da7856f4db91b21f40053cefa7b9c4e9102e52
Author: Gabriel <[email protected]>
AuthorDate: Thu Jan 2 20:07:14 2025 +0800
[Improvement](local shuffle) Reduce locking scope in local exchanger (#46251) (#46293)
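Reduce the lock scope from a single exchanger-wide mutex to one mutex per data queue (i.e. per channel), so that enqueue/dequeue on different channels no longer contend on the same lock.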
---
be/src/pipeline/local_exchange/local_exchanger.cpp | 4 +--
be/src/pipeline/local_exchange/local_exchanger.h | 42 ++++++++++------------
2 files changed, 21 insertions(+), 25 deletions(-)
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 824843d970c..fa34b6a4040 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -42,7 +42,7 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int
channel_id,
block->ref(1);
allocated_bytes = block->data_block.allocated_bytes();
}
- std::unique_lock l(_m);
+ std::unique_lock l(*_m[channel_id]);
local_state._shared_state->add_mem_usage(channel_id, allocated_bytes,
!std::is_same_v<PartitionedBlock,
BlockType> &&
!std::is_same_v<BroadcastBlock, BlockType>);
@@ -90,7 +90,7 @@ bool
Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
} else if (all_finished) {
*eos = true;
} else {
- std::unique_lock l(_m);
+ std::unique_lock l(*_m[channel_id]);
if (_data_queue[channel_id].try_dequeue(block)) {
if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
std::is_same_v<BroadcastBlock, BlockType>) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index 274e7b404aa..9909161bd26 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -142,9 +142,20 @@ template <typename BlockType>
class Exchanger : public ExchangerBase {
public:
Exchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
- : ExchangerBase(running_sink_operators, num_partitions,
free_block_limit) {}
+ : ExchangerBase(running_sink_operators, num_partitions,
free_block_limit) {
+ _data_queue.resize(num_partitions);
+ _m.resize(num_partitions);
+ for (size_t i = 0; i < num_partitions; i++) {
+ _m[i] = std::make_unique<std::mutex>();
+ }
+ }
Exchanger(int running_sink_operators, int num_sources, int num_partitions,
int free_block_limit)
: ExchangerBase(running_sink_operators, num_sources,
num_partitions, free_block_limit) {
+ _data_queue.resize(num_sources);
+ _m.resize(num_sources);
+ for (size_t i = 0; i < num_sources; i++) {
+ _m[i] = std::make_unique<std::mutex>();
+ }
}
~Exchanger() override = default;
std::string data_queue_debug_string(int i) override {
@@ -161,9 +172,7 @@ protected:
bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType&
block, bool* eos,
vectorized::Block* data_block, int channel_id);
std::vector<BlockQueue<BlockType>> _data_queue;
-
-private:
- std::mutex _m;
+ std::vector<std::unique_ptr<std::mutex>> _m;
};
class LocalExchangeSourceLocalState;
@@ -217,7 +226,6 @@ public:
free_block_limit) {
DCHECK_GT(num_partitions, 0);
DCHECK_GT(num_sources, 0);
- _data_queue.resize(num_sources);
}
~ShuffleExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -238,9 +246,7 @@ class BucketShuffleExchanger final : public
ShuffleExchanger {
BucketShuffleExchanger(int running_sink_operators, int num_sources, int
num_partitions,
int free_block_limit)
: ShuffleExchanger(running_sink_operators, num_sources,
num_partitions,
- free_block_limit) {
- DCHECK_GT(num_partitions, 0);
- }
+ free_block_limit) {}
~BucketShuffleExchanger() override = default;
ExchangeType get_type() const override { return
ExchangeType::BUCKET_HASH_SHUFFLE; }
};
@@ -250,9 +256,7 @@ public:
ENABLE_FACTORY_CREATOR(PassthroughExchanger);
PassthroughExchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
: Exchanger<BlockWrapperSPtr>(running_sink_operators,
num_partitions,
- free_block_limit) {
- _data_queue.resize(num_partitions);
- }
+ free_block_limit) {}
~PassthroughExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
LocalExchangeSinkLocalState& local_state) override;
@@ -268,9 +272,7 @@ public:
ENABLE_FACTORY_CREATOR(PassToOneExchanger);
PassToOneExchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
: Exchanger<BlockWrapperSPtr>(running_sink_operators,
num_partitions,
- free_block_limit) {
- _data_queue.resize(num_partitions);
- }
+ free_block_limit) {}
~PassToOneExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
LocalExchangeSinkLocalState& local_state) override;
@@ -287,9 +289,7 @@ public:
LocalMergeSortExchanger(std::shared_ptr<SortSourceOperatorX> sort_source,
int running_sink_operators, int num_partitions,
int free_block_limit)
: Exchanger<BlockWrapperSPtr>(running_sink_operators,
num_partitions, free_block_limit),
- _sort_source(std::move(sort_source)) {
- _data_queue.resize(num_partitions);
- }
+ _sort_source(std::move(sort_source)) {}
~LocalMergeSortExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
LocalExchangeSinkLocalState& local_state) override;
@@ -313,9 +313,7 @@ class BroadcastExchanger final : public
Exchanger<BroadcastBlock> {
public:
ENABLE_FACTORY_CREATOR(BroadcastExchanger);
BroadcastExchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
- : Exchanger<BroadcastBlock>(running_sink_operators,
num_partitions, free_block_limit) {
- _data_queue.resize(num_partitions);
- }
+ : Exchanger<BroadcastBlock>(running_sink_operators,
num_partitions, free_block_limit) {}
~BroadcastExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
LocalExchangeSinkLocalState& local_state) override;
@@ -334,9 +332,7 @@ public:
AdaptivePassthroughExchanger(int running_sink_operators, int
num_partitions,
int free_block_limit)
: Exchanger<BlockWrapperSPtr>(running_sink_operators,
num_partitions,
- free_block_limit) {
- _data_queue.resize(num_partitions);
- }
+ free_block_limit) {}
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
LocalExchangeSinkLocalState& local_state) override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]