This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch fe_local_shuffle in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2f39ab1f8fc8658e280d5b0d3298494aa21abfc9 Author: 924060929 <[email protected]> AuthorDate: Thu Mar 26 17:20:58 2026 +0800 [fix](build) resolve BE rebase conflicts after rebasing on apache/master - local_exchanger.h: use TLocalPartitionType::type instead of undefined ExchangeType - local_exchange_sink_operator.cpp: remove vectorized:: namespace prefixes (removed upstream) - pipeline_fragment_context.cpp: replace ExchangeType::NOOP/HASH_SHUFFLE with TLocalPartitionType equivalents - partitioned_hash_join_probe_operator: remove unused _join_distribution field (required_data_distribution now delegates to inner operator) --- be/src/exec/exchange/local_exchange_sink_operator.cpp | 10 +++++----- be/src/exec/exchange/local_exchanger.h | 2 +- be/src/exec/operator/partitioned_hash_join_probe_operator.cpp | 2 -- be/src/exec/operator/partitioned_hash_join_probe_operator.h | 2 -- be/src/exec/pipeline/pipeline_fragment_context.cpp | 6 +++--- 5 files changed, 9 insertions(+), 13 deletions(-) diff --git a/be/src/exec/exchange/local_exchange_sink_operator.cpp b/be/src/exec/exchange/local_exchange_sink_operator.cpp index 7da4c025f60..4745a6ccdb6 100644 --- a/be/src/exec/exchange/local_exchange_sink_operator.cpp +++ b/be/src/exec/exchange/local_exchange_sink_operator.cpp @@ -51,10 +51,10 @@ Status LocalExchangeSinkOperatorX::init(RuntimeState* state, TLocalPartitionType _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx; if (state->query_options().__isset.enable_new_shuffle_hash_method && state->query_options().enable_new_shuffle_hash_method) { - _partitioner = std::make_unique<vectorized::Crc32CHashPartitioner>(_num_partitions); + _partitioner = std::make_unique<Crc32CHashPartitioner>(_num_partitions); } else { _partitioner = std::make_unique< - vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( + Crc32HashPartitioner<ShuffleChannelIds>>( _num_partitions); } RETURN_IF_ERROR(_partitioner->init(_texprs)); @@ -87,17 +87,17 @@ Status LocalExchangeSinkOperatorX::init_partitioner(RuntimeState* state) { _type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE) { if (state->query_options().__isset.enable_new_shuffle_hash_method && state->query_options().enable_new_shuffle_hash_method) { - _partitioner = std::make_unique<vectorized::Crc32CHashPartitioner>(_num_partitions); + _partitioner = std::make_unique<Crc32CHashPartitioner>(_num_partitions); } else { _partitioner = std::make_unique< - vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( + Crc32HashPartitioner<ShuffleChannelIds>>( _num_partitions); } RETURN_IF_ERROR(_partitioner->init(_texprs)); } else if (_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) { DCHECK_GT(_num_partitions, 0); _partitioner = - std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( + std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>( _num_partitions); RETURN_IF_ERROR(_partitioner->init(_texprs)); } diff --git a/be/src/exec/exchange/local_exchanger.h b/be/src/exec/exchange/local_exchanger.h index a04eb23ec24..3df5bb2c685 100644 --- a/be/src/exec/exchange/local_exchanger.h +++ b/be/src/exec/exchange/local_exchanger.h @@ -142,7 +142,7 @@ public: SourceInfo&& source_info) = 0; virtual Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, SinkInfo& sink_info) = 0; - virtual ExchangeType get_type() const = 0; + virtual TLocalPartitionType::type get_type() const = 0; // Called if a local exchanger source operator are closed. Free the unused data block in data_queue. virtual void close(SourceInfo&& source_info) = 0; // Called if all local exchanger source operators are closed. We free the memory in diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp index d5267969cd4..5d553f5dbc3 100644 --- a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp +++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp @@ -525,8 +525,6 @@ PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool* int operator_id, const DescriptorTbl& descs) : JoinProbeOperatorX<PartitionedHashJoinProbeLocalState>(pool, tnode, operator_id, descs), - _join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type - : TJoinDistributionType::NONE), _distribution_partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] : std::vector<TExpr> {}), diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.h b/be/src/exec/operator/partitioned_hash_join_probe_operator.h index f5b1a272e2b..3ed531962a4 100644 --- a/be/src/exec/operator/partitioned_hash_join_probe_operator.h +++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.h @@ -278,8 +278,6 @@ private: RuntimeState* state, Block* output_block, bool* eos) const; - const TJoinDistributionType::type _join_distribution; - std::shared_ptr<HashJoinBuildSinkOperatorX> _inner_sink_operator; std::shared_ptr<HashJoinProbeOperatorX> _inner_probe_operator; diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index 61c6e093257..c935e157058 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -751,7 +751,7 @@ Status PipelineFragmentContext::_create_tree_helper( *root = op; } /** - * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators). + * `TLocalPartitionType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators). * * For plan: * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2) @@ -771,7 +771,7 @@ Status PipelineFragmentContext::_create_tree_helper( : op->is_shuffled_operator())) && Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) || (followed_by_shuffled_operator && - required_data_distribution.distribution_type == ExchangeType::NOOP); + required_data_distribution.distribution_type == TLocalPartitionType::NOOP); current_require_bucket_distribution = ((require_bucket_distribution || @@ -779,7 +779,7 @@ Status PipelineFragmentContext::_create_tree_helper( : op->is_colocated_operator())) && Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) || (require_bucket_distribution && - required_data_distribution.distribution_type == ExchangeType::NOOP); + required_data_distribution.distribution_type == TLocalPartitionType::NOOP); if (num_children == 0) { _use_serial_source = op->is_serial_operator(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
