This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new fbe0e6e80e8 branch-3.0: [fix](local shuffle) Fix unbalanced data distribution #44137 (#44234)
fbe0e6e80e8 is described below
commit fbe0e6e80e8f4b0d046f4fe21fa9ca65f46c4398
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Nov 26 10:47:19 2024 +0800
branch-3.0: [fix](local shuffle) Fix unbalanced data distribution #44137 (#44234)
Cherry-picked from #44137
Co-authored-by: Gabriel <[email protected]>
---
.../local_exchange/local_exchange_sink_operator.cpp | 1 +
be/src/pipeline/local_exchange/local_exchanger.cpp | 13 -------------
be/src/pipeline/local_exchange/local_exchanger.h | 9 +++++----
be/src/pipeline/pipeline_fragment_context.cpp | 6 +++---
4 files changed, 9 insertions(+), 20 deletions(-)
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 00fa7f5ae79..22007a4b220 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -62,6 +62,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type,
const int num_buckets
_num_partitions));
RETURN_IF_ERROR(_partitioner->init(_texprs));
} else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
+ DCHECK_GT(num_buckets, 0);
_partitioner.reset(
new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(num_buckets));
RETURN_IF_ERROR(_partitioner->init(_texprs));
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index c9f98db26a9..824843d970c 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -226,19 +226,6 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
new_block_wrapper->unref(local_state._shared_state,
local_state._channel_id);
}
}
- } else if (_num_senders != _num_sources) {
- // In this branch, data just should be distributed equally into all
instances.
- new_block_wrapper->ref(_num_partitions);
- for (size_t i = 0; i < _num_partitions; i++) {
- uint32_t start = local_state._partition_rows_histogram[i];
- uint32_t size = local_state._partition_rows_histogram[i + 1] -
start;
- if (size > 0) {
- _enqueue_data_and_set_ready(i % _num_sources, local_state,
- {new_block_wrapper, {row_idx,
start, size}});
- } else {
- new_block_wrapper->unref(local_state._shared_state,
local_state._channel_id);
- }
- }
} else {
DCHECK(!bucket_seq_to_instance_idx.empty());
new_block_wrapper->ref(_num_partitions);
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index cc33efbb934..e8aa35c2f7c 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -219,9 +219,7 @@ protected:
ShuffleExchanger(int running_sink_operators, int num_sources, int
num_partitions,
int free_block_limit)
: Exchanger<PartitionedBlock>(running_sink_operators, num_sources,
num_partitions,
- free_block_limit) {
- _data_queue.resize(num_partitions);
- }
+ free_block_limit) {}
Status _split_rows(RuntimeState* state, const uint32_t* __restrict
channel_ids,
vectorized::Block* block, LocalExchangeSinkLocalState&
local_state);
};
@@ -231,7 +229,10 @@ class BucketShuffleExchanger final : public
ShuffleExchanger {
BucketShuffleExchanger(int running_sink_operators, int num_sources, int
num_partitions,
int free_block_limit)
: ShuffleExchanger(running_sink_operators, num_sources,
num_partitions,
- free_block_limit) {}
+ free_block_limit) {
+ DCHECK_GT(num_partitions, 0);
+ _data_queue.resize(std::max(num_partitions, num_sources));
+ }
~BucketShuffleExchanger() override = default;
ExchangeType get_type() const override { return
ExchangeType::BUCKET_HASH_SHUFFLE; }
};
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index b7aa8518422..553e059d1a5 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -938,9 +938,9 @@ Status PipelineFragmentContext::_plan_local_exchange(
// if 'num_buckets == 0' means the fragment is colocated by exchange
node not the
// scan node. so here use `_num_instance` to replace the `num_buckets`
to prevent dividing 0
// still keep colocate plan after local shuffle
- RETURN_IF_ERROR(_plan_local_exchange(
- _use_serial_source || num_buckets == 0 ? _num_instances :
num_buckets, pip_idx,
- _pipelines[pip_idx], bucket_seq_to_instance_idx,
shuffle_idx_to_instance_idx));
+ RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx,
_pipelines[pip_idx],
+ bucket_seq_to_instance_idx,
+ shuffle_idx_to_instance_idx));
}
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]