This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7296a21dbec [pipelineX](fix) Fix incorrect partition number (#29963)
7296a21dbec is described below
commit 7296a21dbecb67e92d0f07ff81a11e6aa7b6835e
Author: Gabriel <[email protected]>
AuthorDate: Mon Jan 15 11:49:45 2024 +0800
[pipelineX](fix) Fix incorrect partition number (#29963)
---
.../pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index b479e1d9334..e2f1d9742b4 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -728,21 +728,24 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
// 1. Create a new pipeline with local exchange sink.
DataSinkOperatorXPtr sink;
auto sink_id = next_sink_operator_id();
+ const bool is_shuffled_hash_join = operator_xs.size() > idx
+ ?
operator_xs[idx]->is_shuffled_hash_join()
+ :
cur_pipe->sink_x()->is_shuffled_hash_join();
sink.reset(new LocalExchangeSinkOperatorX(
- sink_id, local_exchange_id, _total_instances,
data_distribution.partition_exprs,
- bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
+ sink_id, local_exchange_id, is_shuffled_hash_join ?
_total_instances : _num_instances,
+ data_distribution.partition_exprs, bucket_seq_to_instance_idx,
+ shuffle_idx_to_instance_idx));
RETURN_IF_ERROR(new_pip->set_sink(sink));
RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type,
num_buckets,
- operator_xs.size() > idx
- ?
operator_xs[idx]->is_shuffled_hash_join()
- :
cur_pipe->sink_x()->is_shuffled_hash_join()));
+ is_shuffled_hash_join));
// 2. Create and initialize LocalExchangeSharedState.
auto shared_state = LocalExchangeSharedState::create_shared();
switch (data_distribution.distribution_type) {
case ExchangeType::HASH_SHUFFLE:
shared_state->exchanger = ShuffleExchanger::create_unique(
- std::max(cur_pipe->num_tasks(), _num_instances),
_total_instances);
+ std::max(cur_pipe->num_tasks(), _num_instances),
+ is_shuffled_hash_join ? _total_instances : _num_instances);
break;
case ExchangeType::BUCKET_HASH_SHUFFLE:
shared_state->exchanger = BucketShuffleExchanger::create_unique(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]