This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2f39ab1f8fc8658e280d5b0d3298494aa21abfc9
Author: 924060929 <[email protected]>
AuthorDate: Thu Mar 26 17:20:58 2026 +0800

    [fix](build) resolve BE rebase conflicts after rebasing on apache/master
    
    - local_exchanger.h: use TLocalPartitionType::type instead of undefined ExchangeType
    - local_exchange_sink_operator.cpp: remove vectorized:: namespace prefixes (removed upstream)
    - pipeline_fragment_context.cpp: replace ExchangeType::NOOP/HASH_SHUFFLE with TLocalPartitionType equivalents
    - partitioned_hash_join_probe_operator: remove unused _join_distribution field (required_data_distribution now delegates to inner operator)
---
 be/src/exec/exchange/local_exchange_sink_operator.cpp         | 10 +++++-----
 be/src/exec/exchange/local_exchanger.h                        |  2 +-
 be/src/exec/operator/partitioned_hash_join_probe_operator.cpp |  2 --
 be/src/exec/operator/partitioned_hash_join_probe_operator.h   |  2 --
 be/src/exec/pipeline/pipeline_fragment_context.cpp            |  6 +++---
 5 files changed, 9 insertions(+), 13 deletions(-)

diff --git a/be/src/exec/exchange/local_exchange_sink_operator.cpp 
b/be/src/exec/exchange/local_exchange_sink_operator.cpp
index 7da4c025f60..4745a6ccdb6 100644
--- a/be/src/exec/exchange/local_exchange_sink_operator.cpp
+++ b/be/src/exec/exchange/local_exchange_sink_operator.cpp
@@ -51,10 +51,10 @@ Status LocalExchangeSinkOperatorX::init(RuntimeState* 
state, TLocalPartitionType
         _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
         if (state->query_options().__isset.enable_new_shuffle_hash_method &&
             state->query_options().enable_new_shuffle_hash_method) {
-            _partitioner = 
std::make_unique<vectorized::Crc32CHashPartitioner>(_num_partitions);
+            _partitioner = 
std::make_unique<Crc32CHashPartitioner>(_num_partitions);
         } else {
             _partitioner = std::make_unique<
-                    
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                    Crc32HashPartitioner<ShuffleChannelIds>>(
                     _num_partitions);
         }
         RETURN_IF_ERROR(_partitioner->init(_texprs));
@@ -87,17 +87,17 @@ Status 
LocalExchangeSinkOperatorX::init_partitioner(RuntimeState* state) {
         _type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE) {
         if (state->query_options().__isset.enable_new_shuffle_hash_method &&
             state->query_options().enable_new_shuffle_hash_method) {
-            _partitioner = 
std::make_unique<vectorized::Crc32CHashPartitioner>(_num_partitions);
+            _partitioner = 
std::make_unique<Crc32CHashPartitioner>(_num_partitions);
         } else {
             _partitioner = std::make_unique<
-                    
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                    Crc32HashPartitioner<ShuffleChannelIds>>(
                     _num_partitions);
         }
         RETURN_IF_ERROR(_partitioner->init(_texprs));
     } else if (_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
         DCHECK_GT(_num_partitions, 0);
         _partitioner =
-                
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(
                         _num_partitions);
         RETURN_IF_ERROR(_partitioner->init(_texprs));
     }
diff --git a/be/src/exec/exchange/local_exchanger.h 
b/be/src/exec/exchange/local_exchanger.h
index a04eb23ec24..3df5bb2c685 100644
--- a/be/src/exec/exchange/local_exchanger.h
+++ b/be/src/exec/exchange/local_exchanger.h
@@ -142,7 +142,7 @@ public:
                              SourceInfo&& source_info) = 0;
     virtual Status sink(RuntimeState* state, Block* in_block, bool eos, 
Profile&& profile,
                         SinkInfo& sink_info) = 0;
-    virtual ExchangeType get_type() const = 0;
+    virtual TLocalPartitionType::type get_type() const = 0;
     // Called if a local exchanger source operator are closed. Free the unused 
data block in data_queue.
     virtual void close(SourceInfo&& source_info) = 0;
     // Called if all local exchanger source operators are closed. We free the 
memory in
diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp 
b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
index d5267969cd4..5d553f5dbc3 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
@@ -525,8 +525,6 @@ 
PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool*
                                                                      int 
operator_id,
                                                                      const 
DescriptorTbl& descs)
         : JoinProbeOperatorX<PartitionedHashJoinProbeLocalState>(pool, tnode, 
operator_id, descs),
-          _join_distribution(tnode.hash_join_node.__isset.dist_type ? 
tnode.hash_join_node.dist_type
-                                                                    : 
TJoinDistributionType::NONE),
           _distribution_partition_exprs(tnode.__isset.distribute_expr_lists
                                                 ? 
tnode.distribute_expr_lists[0]
                                                 : std::vector<TExpr> {}),
diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.h 
b/be/src/exec/operator/partitioned_hash_join_probe_operator.h
index f5b1a272e2b..3ed531962a4 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.h
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.h
@@ -278,8 +278,6 @@ private:
                                                 RuntimeState* state, Block* 
output_block,
                                                 bool* eos) const;
 
-    const TJoinDistributionType::type _join_distribution;
-
     std::shared_ptr<HashJoinBuildSinkOperatorX> _inner_sink_operator;
     std::shared_ptr<HashJoinProbeOperatorX> _inner_probe_operator;
 
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp 
b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index 61c6e093257..c935e157058 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -751,7 +751,7 @@ Status PipelineFragmentContext::_create_tree_helper(
         *root = op;
     }
     /**
-     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed 
by a shuffled operator (shuffled hash join, union operator followed by 
co-located operators).
+     * `TLocalPartitionType::HASH_SHUFFLE` should be used if an operator is 
followed by a shuffled operator (shuffled hash join, union operator followed by 
co-located operators).
      *
      * For plan:
      * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
@@ -771,7 +771,7 @@ Status PipelineFragmentContext::_create_tree_helper(
                                              : op->is_shuffled_operator())) &&
              
Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
             (followed_by_shuffled_operator &&
-             required_data_distribution.distribution_type == 
ExchangeType::NOOP);
+             required_data_distribution.distribution_type == 
TLocalPartitionType::NOOP);
 
     current_require_bucket_distribution =
             ((require_bucket_distribution ||
@@ -779,7 +779,7 @@ Status PipelineFragmentContext::_create_tree_helper(
                                              : op->is_colocated_operator())) &&
              
Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
             (require_bucket_distribution &&
-             required_data_distribution.distribution_type == 
ExchangeType::NOOP);
+             required_data_distribution.distribution_type == 
TLocalPartitionType::NOOP);
 
     if (num_children == 0) {
         _use_serial_source = op->is_serial_operator();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to