This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch fe_local_shuffle in repository https://gitbox.apache.org/repos/asf/doris.git
commit d867c7e0adf49d77ff60e23d44fc3863304af0af Author: Gabriel <[email protected]> AuthorDate: Fri Mar 20 21:27:31 2026 +0800 [refactor](local shuffle) Plan local exchanger in FE (BE part) Extend the BE pipeline local exchange infrastructure to support FE-planned local exchanges via a new LOCAL_EXCHANGE_NODE plan node type: - Add LOCAL_EXCHANGE_NODE handling in pipeline_fragment_context: create LocalExchangeSharedState, wire LOCAL_EXCHANGE_NODE into pipeline splits similar to how join build/probe sides are split - LocalExchangeSinkOperatorX: set _name with exchange type string for correct profile naming (LOCAL_EXCHANGE_SINK_OPERATOR(PASSTHROUGH) etc.) - Add update_operator() virtual method to OperatorXBase/StatefulOperatorX so FE-planned exchanges can propagate followed_by_shuffled_operator and require_bucket_distribution flags to BE operators at plan-time - Expose required_data_distribution() and is_shuffled_operator() on all sink/source operator types to enable FE/BE consistency verification - pipeline.h: add need_to_local_exchange() and data_distribution tracking for single-instance fragment early-return (skip local exchange when num_instances <= 1) - Thrift: add LOCAL_EXCHANGE_NODE to TPlanNodeType; add TLocalExchangeNode and TLocalPartitionType for PASSTHROUGH/HASH/BUCKET_HASH/PASS_TO_ONE etc. 
--- .../exec/exchange/local_exchange_sink_operator.cpp | 73 +++++++--- .../exec/exchange/local_exchange_sink_operator.h | 28 +++- .../exchange/local_exchange_source_operator.cpp | 5 +- .../exec/exchange/local_exchange_source_operator.h | 38 ++++- be/src/exec/exchange/local_exchanger.h | 40 ++--- be/src/exec/operator/aggregation_sink_operator.h | 8 +- be/src/exec/operator/analytic_sink_operator.h | 8 +- be/src/exec/operator/assert_num_rows_operator.h | 2 +- .../distinct_streaming_aggregation_operator.h | 10 +- be/src/exec/operator/exchange_source_operator.h | 8 +- be/src/exec/operator/hashjoin_build_sink.h | 12 +- be/src/exec/operator/hashjoin_probe_operator.h | 14 +- .../operator/nested_loop_join_build_operator.h | 6 +- .../operator/nested_loop_join_probe_operator.h | 4 +- be/src/exec/operator/operator.cpp | 4 +- be/src/exec/operator/operator.h | 6 +- .../exec/operator/partition_sort_sink_operator.h | 4 +- .../partitioned_hash_join_probe_operator.h | 12 +- .../operator/partitioned_hash_join_sink_operator.h | 6 +- .../exec/operator/rec_cte_anchor_sink_operator.h | 2 +- be/src/exec/operator/rec_cte_sink_operator.h | 2 +- be/src/exec/operator/rec_cte_source_operator.h | 2 +- be/src/exec/operator/scan_operator.h | 4 +- be/src/exec/operator/set_probe_sink_operator.h | 6 +- be/src/exec/operator/set_sink_operator.h | 4 +- be/src/exec/operator/set_source_operator.h | 4 +- be/src/exec/operator/sort_sink_operator.h | 10 +- .../exec/operator/streaming_aggregation_operator.h | 7 +- be/src/exec/operator/table_function_operator.h | 2 +- be/src/exec/operator/union_sink_operator.h | 4 +- be/src/exec/operator/union_source_operator.h | 4 +- be/src/exec/pipeline/dependency.h | 50 +++---- be/src/exec/pipeline/pipeline.cpp | 3 +- be/src/exec/pipeline/pipeline.h | 11 +- be/src/exec/pipeline/pipeline_fragment_context.cpp | 162 +++++++++++++++++++-- be/src/runtime/runtime_state.h | 7 +- be/test/exec/pipeline/local_exchanger_test.cpp | 10 +- be/test/exec/pipeline/pipeline_test.cpp | 21 +-- 
gensrc/thrift/PaloInternalService.thrift | 2 + gensrc/thrift/Partitions.thrift | 36 +++++ gensrc/thrift/PlanNodes.thrift | 22 ++- 41 files changed, 462 insertions(+), 201 deletions(-) diff --git a/be/src/exec/exchange/local_exchange_sink_operator.cpp b/be/src/exec/exchange/local_exchange_sink_operator.cpp index 0a11596cfee..7da4c025f60 100644 --- a/be/src/exec/exchange/local_exchange_sink_operator.cpp +++ b/be/src/exec/exchange/local_exchange_sink_operator.cpp @@ -37,24 +37,31 @@ std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const { return deps; } -Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type, - const int num_buckets, const bool use_global_hash_shuffle, +Status LocalExchangeSinkOperatorX::init(RuntimeState* state, TLocalPartitionType::type type, + const int num_buckets, const std::map<int, int>& shuffle_idx_to_instance_idx) { + DCHECK(!_planned_by_fe); _name = "LOCAL_EXCHANGE_SINK_OPERATOR(" + get_exchange_type_name(type) + ")"; _type = type; - if (_type == ExchangeType::HASH_SHUFFLE) { - _shuffle_idx_to_instance_idx.clear(); - _use_global_shuffle = use_global_hash_shuffle; + if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) { // For shuffle join, if data distribution has been broken by previous operator, we // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned, // we should use map shuffle idx to instance idx because all instances will be // distributed to all BEs. Otherwise, we should use shuffle idx directly. 
- if (use_global_hash_shuffle) { - _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx; + _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx; + if (state->query_options().__isset.enable_new_shuffle_hash_method && + state->query_options().enable_new_shuffle_hash_method) { + _partitioner = std::make_unique<vectorized::Crc32CHashPartitioner>(_num_partitions); } else { - for (int i = 0; i < _num_partitions; i++) { - _shuffle_idx_to_instance_idx[i] = i; - } + _partitioner = std::make_unique< + vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( + _num_partitions); + } + RETURN_IF_ERROR(_partitioner->init(_texprs)); + } else if (_type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE) { + _shuffle_idx_to_instance_idx.clear(); + for (int i = 0; i < _num_partitions; i++) { + _shuffle_idx_to_instance_idx[i] = i; } if (state->query_options().__isset.enable_new_shuffle_hash_method && state->query_options().enable_new_shuffle_hash_method) { @@ -64,7 +71,7 @@ Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type, std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(_num_partitions); } RETURN_IF_ERROR(_partitioner->init(_texprs)); - } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) { + } else if (_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) { DCHECK_GT(num_buckets, 0); _partitioner = std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(num_buckets); RETURN_IF_ERROR(_partitioner->init(_texprs)); @@ -72,9 +79,36 @@ Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type, return Status::OK(); } +Status LocalExchangeSinkOperatorX::init_partitioner(RuntimeState* state) { + DCHECK(_planned_by_fe); + // Set operator name to include exchange type (base class init(tnode) only sets generic name). 
+ _name = "LOCAL_EXCHANGE_SINK_OPERATOR(" + get_exchange_type_name(_type) + ")"; + if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE || + _type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE) { + if (state->query_options().__isset.enable_new_shuffle_hash_method && + state->query_options().enable_new_shuffle_hash_method) { + _partitioner = std::make_unique<vectorized::Crc32CHashPartitioner>(_num_partitions); + } else { + _partitioner = std::make_unique< + vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( + _num_partitions); + } + RETURN_IF_ERROR(_partitioner->init(_texprs)); + } else if (_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) { + DCHECK_GT(_num_partitions, 0); + _partitioner = + std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( + _num_partitions); + RETURN_IF_ERROR(_partitioner->init(_texprs)); + } + return Status::OK(); +} + Status LocalExchangeSinkOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX<LocalExchangeSinkLocalState>::prepare(state)); - if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) { + if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE || + _type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE || + _type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) { RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc())); RETURN_IF_ERROR(_partitioner->open(state)); } @@ -88,11 +122,6 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo SCOPED_TIMER(_init_timer); _compute_hash_value_timer = ADD_TIMER(custom_profile(), "ComputeHashValueTime"); _distribute_timer = ADD_TIMER(custom_profile(), "DistributeDataTime"); - if (_parent->cast<LocalExchangeSinkOperatorX>()._type == ExchangeType::HASH_SHUFFLE) { - custom_profile()->add_info_string( - "UseGlobalShuffle", - std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle)); - } 
custom_profile()->add_info_string( "PartitionExprsSize", std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._partitioned_exprs_num)); @@ -107,8 +136,7 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) { _exchanger = _shared_state->exchanger.get(); DCHECK(_exchanger != nullptr); - if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE || - _exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) { + if (is_shuffled_exchange(_exchanger->get_type())) { auto& p = _parent->cast<LocalExchangeSinkOperatorX>(); RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner)); } @@ -131,12 +159,11 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_statu std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, - "{}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, " + "{}, _channel_id: {}, _num_partitions: {}, " "_num_senders: {}, _num_sources: {}, " "_running_sink_operators: {}, _running_source_operators: {}", - Base::debug_string(indentation_level), - _parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id, - _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, + Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, + _exchanger->_num_senders, _exchanger->_num_sources, _exchanger->_running_sink_operators, _exchanger->_running_source_operators); return fmt::to_string(debug_string_buffer); } diff --git a/be/src/exec/exchange/local_exchange_sink_operator.h b/be/src/exec/exchange/local_exchange_sink_operator.h index 01b958645d3..3ff2b70e588 100644 --- a/be/src/exec/exchange/local_exchange_sink_operator.h +++ b/be/src/exec/exchange/local_exchange_sink_operator.h @@ -79,6 +79,17 @@ public: _texprs(texprs), _partitioned_exprs_num(texprs.size()), _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {} + + LocalExchangeSinkOperatorX(int 
operator_id, int dest_id, const TPlanNode& tnode, + int num_partitions, + const std::map<int, int>& shuffle_id_to_instance_idx) + : Base(operator_id, tnode, dest_id), + _type(tnode.local_exchange_node.partition_type), + _num_partitions(num_partitions), + _texprs(tnode.local_exchange_node.distribute_expr_lists), + _partitioned_exprs_num(tnode.local_exchange_node.distribute_expr_lists.size()), + _shuffle_idx_to_instance_idx(shuffle_id_to_instance_idx), + _planned_by_fe(true) {} #ifdef BE_TEST LocalExchangeSinkOperatorX(const std::vector<TExpr>& texprs, const std::map<int, int>& bucket_seq_to_instance_idx) @@ -89,18 +100,19 @@ public: _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {} #endif - Status init(const TPlanNode& tnode, RuntimeState* state) override { - return Status::InternalError("{} should not init with TPlanNode", Base::_name); - } - Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", Base::_name); } - Status init(RuntimeState* state, ExchangeType type, const int num_buckets, - const bool use_global_hash_shuffle, + Status init(RuntimeState* state, TLocalPartitionType::type type, const int num_buckets, const std::map<int, int>& shuffle_idx_to_instance_idx) override; + // Initialize partitioner for FE-planned local exchange nodes. The FE-planned constructor + // already sets _type, _num_partitions, _texprs, and _shuffle_idx_to_instance_idx from the + // TPlanNode, but does not create the partitioner. This method creates the partitioner so + // that prepare() can call _partitioner->prepare() without null dereference. 
+ Status init_partitioner(RuntimeState* state); + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, Block* in_block, bool eos) override; @@ -115,13 +127,13 @@ public: private: friend class LocalExchangeSinkLocalState; friend class ShuffleExchanger; - ExchangeType _type; + TLocalPartitionType::type _type; const int _num_partitions; const std::vector<TExpr>& _texprs; const size_t _partitioned_exprs_num; std::unique_ptr<PartitionerBase> _partitioner; std::map<int, int> _shuffle_idx_to_instance_idx; - bool _use_global_shuffle = false; + const bool _planned_by_fe = false; }; } // namespace doris diff --git a/be/src/exec/exchange/local_exchange_source_operator.cpp b/be/src/exec/exchange/local_exchange_source_operator.cpp index ad092656f21..d6bf8ac7e21 100644 --- a/be/src/exec/exchange/local_exchange_source_operator.cpp +++ b/be/src/exec/exchange/local_exchange_source_operator.cpp @@ -31,8 +31,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& DCHECK(_exchanger != nullptr); _get_block_failed_counter = ADD_COUNTER_WITH_LEVEL(custom_profile(), "GetBlockFailedTime", TUnit::UNIT, 1); - if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE || - _exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) { + if (is_shuffled_exchange(_exchanger->get_type())) { _copy_data_timer = ADD_TIMER(custom_profile(), "CopyDataTime"); } @@ -60,7 +59,7 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* state) { } std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const { - if ((_exchanger->get_type() == ExchangeType::PASS_TO_ONE) && _channel_id != 0) { + if ((_exchanger->get_type() == TLocalPartitionType::PASS_TO_ONE) && _channel_id != 0) { // If this is a PASS_TO_ONE exchange and is not the first task, source operators always // return empty result so no dependencies here. 
return {}; diff --git a/be/src/exec/exchange/local_exchange_source_operator.h b/be/src/exec/exchange/local_exchange_source_operator.h index 58252b24ec2..2f3fc2d5294 100644 --- a/be/src/exec/exchange/local_exchange_source_operator.h +++ b/be/src/exec/exchange/local_exchange_source_operator.h @@ -63,20 +63,47 @@ class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceL public: using Base = OperatorX<LocalExchangeSourceLocalState>; LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, id, id) {} + LocalExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : Base(pool, tnode, operator_id, descs), + _exchange_type(tnode.local_exchange_node.partition_type), + _planned_by_fe(true) {} #ifdef BE_TEST LocalExchangeSourceOperatorX() = default; #endif - Status init(ExchangeType type) override { + Status init(TLocalPartitionType::type type) override { + DCHECK(!_planned_by_fe); _op_name = "LOCAL_EXCHANGE_OPERATOR(" + get_exchange_type_name(type) + ")"; _exchange_type = type; return Status::OK(); } - Status prepare(RuntimeState* state) override { return Status::OK(); } + Status prepare(RuntimeState* state) override { + if (_planned_by_fe) { + RETURN_IF_ERROR(Base::prepare(state)); + // Base::prepare() resets _op_name from tnode node_type; restore the type-qualified name. 
+ _op_name = "LOCAL_EXCHANGE_OPERATOR(" + get_exchange_type_name(_exchange_type) + ")"; + return Status::OK(); + } + return Status::OK(); + } const RowDescriptor& intermediate_row_desc() const override { + if (_planned_by_fe) { + return Base::intermediate_row_desc(); + } return _child->intermediate_row_desc(); } - RowDescriptor& row_descriptor() override { return _child->row_descriptor(); } - const RowDescriptor& row_desc() const override { return _child->row_desc(); } + RowDescriptor& row_descriptor() override { + if (_planned_by_fe) { + return Base::row_descriptor(); + } + return _child->row_descriptor(); + } + const RowDescriptor& row_desc() const override { + if (_planned_by_fe) { + return Base::row_desc(); + } + return _child->row_desc(); + } Status get_block(RuntimeState* state, Block* block, bool* eos) override; @@ -85,7 +112,8 @@ public: private: friend class LocalExchangeSourceLocalState; - ExchangeType _exchange_type; + TLocalPartitionType::type _exchange_type; + const bool _planned_by_fe = false; }; } // namespace doris diff --git a/be/src/exec/exchange/local_exchanger.h b/be/src/exec/exchange/local_exchanger.h index f1c784a9be6..a04eb23ec24 100644 --- a/be/src/exec/exchange/local_exchanger.h +++ b/be/src/exec/exchange/local_exchanger.h @@ -227,16 +227,19 @@ using BlockWrapperSPtr = std::shared_ptr<ExchangerBase::BlockWrapper>; template <typename BlockType> class Exchanger : public ExchangerBase { public: - Exchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) { + Exchanger(int running_sink_operators, int num_partitions, int free_block_limit, + TLocalPartitionType::type type) + : ExchangerBase(running_sink_operators, num_partitions, free_block_limit), _type(type) { _data_queue.resize(num_partitions); _m.resize(num_partitions); for (size_t i = 0; i < num_partitions; i++) { _m[i] = std::make_unique<std::mutex>(); } } - Exchanger(int running_sink_operators, 
int num_sources, int num_partitions, int free_block_limit) - : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) { + Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit, + TLocalPartitionType::type type) + : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit), + _type(type) { _data_queue.resize(num_sources); _m.resize(num_sources); for (size_t i = 0; i < num_sources; i++) { @@ -244,6 +247,7 @@ public: } } ~Exchanger() override = default; + TLocalPartitionType::type get_type() const override { return _type; } std::string data_queue_debug_string(int i) override { return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i, _data_queue[i].data_queue.size_approx(), _data_queue[i].eos); @@ -260,6 +264,7 @@ protected: bool _dequeue_data(BlockType& block, bool* eos, Block* data_block, int channel_id); std::vector<BlockQueue<BlockType>> _data_queue; std::vector<std::unique_ptr<std::mutex>> _m; + const TLocalPartitionType::type _type; }; class LocalExchangeSourceLocalState; @@ -269,9 +274,9 @@ class ShuffleExchanger : public Exchanger<PartitionedBlock> { public: ENABLE_FACTORY_CREATOR(ShuffleExchanger); ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, - int free_block_limit) + int free_block_limit, TLocalPartitionType::type type) : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions, - free_block_limit) { + free_block_limit, type) { DCHECK_GT(num_partitions, 0); DCHECK_GT(num_sources, 0); _partition_rows_histogram.resize(running_sink_operators); @@ -283,7 +288,6 @@ public: Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, SourceInfo&& source_info) override; void close(SourceInfo&& source_info) override; - ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; } protected: Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& 
channel_ids, Block* block, @@ -299,24 +303,22 @@ class BucketShuffleExchanger final : public ShuffleExchanger { BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) : ShuffleExchanger(running_sink_operators, num_sources, num_partitions, - free_block_limit) {} + free_block_limit, TLocalPartitionType::BUCKET_HASH_SHUFFLE) {} ~BucketShuffleExchanger() override = default; - ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; } }; class PassthroughExchanger final : public Exchanger<BlockWrapperSPtr> { public: ENABLE_FACTORY_CREATOR(PassthroughExchanger); PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, - free_block_limit) {} + : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, free_block_limit, + TLocalPartitionType::PASSTHROUGH) {} ~PassthroughExchanger() override = default; Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, SinkInfo& sink_info) override; Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, SourceInfo&& source_info) override; - ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; } void close(SourceInfo&& source_info) override; }; @@ -324,29 +326,28 @@ class PassToOneExchanger final : public Exchanger<BlockWrapperSPtr> { public: ENABLE_FACTORY_CREATOR(PassToOneExchanger); PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, - free_block_limit) {} + : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, free_block_limit, + TLocalPartitionType::PASS_TO_ONE) {} ~PassToOneExchanger() override = default; Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, SinkInfo& sink_info) override; Status get_block(RuntimeState* 
state, Block* block, bool* eos, Profile&& profile, SourceInfo&& source_info) override; - ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; } void close(SourceInfo&& source_info) override; }; class BroadcastExchanger final : public Exchanger<BroadcastBlock> { public: ENABLE_FACTORY_CREATOR(BroadcastExchanger); BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger<BroadcastBlock>(running_sink_operators, num_partitions, free_block_limit) {} + : Exchanger<BroadcastBlock>(running_sink_operators, num_partitions, free_block_limit, + TLocalPartitionType::BROADCAST) {} ~BroadcastExchanger() override = default; Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, SinkInfo& sink_info) override; Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, SourceInfo&& source_info) override; - ExchangeType get_type() const override { return ExchangeType::BROADCAST; } void close(SourceInfo&& source_info) override; }; @@ -357,8 +358,8 @@ public: ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger); AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, - free_block_limit) { + : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, free_block_limit, + TLocalPartitionType::ADAPTIVE_PASSTHROUGH) { _partition_rows_histogram.resize(running_sink_operators); } Status sink(RuntimeState* state, Block* in_block, bool eos, Profile&& profile, @@ -366,7 +367,6 @@ public: Status get_block(RuntimeState* state, Block* block, bool* eos, Profile&& profile, SourceInfo&& source_info) override; - ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; } void close(SourceInfo&& source_info) override; diff --git a/be/src/exec/operator/aggregation_sink_operator.h b/be/src/exec/operator/aggregation_sink_operator.h index 
0a7067ecb41..81fa9b736a4 100644 --- a/be/src/exec/operator/aggregation_sink_operator.h +++ b/be/src/exec/operator/aggregation_sink_operator.h @@ -160,13 +160,15 @@ public: DataDistribution required_data_distribution(RuntimeState* state) const override { if (_partition_exprs.empty()) { return _needs_finalize - ? DataDistribution(ExchangeType::NOOP) + ? DataDistribution(TLocalPartitionType::NOOP) : DataSinkOperatorX<AggSinkLocalState>::required_data_distribution( state); } return _is_colocate && _require_bucket_distribution - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + ? DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } bool is_colocated_operator() const override { return _is_colocate; } bool is_shuffled_operator() const override { diff --git a/be/src/exec/operator/analytic_sink_operator.h b/be/src/exec/operator/analytic_sink_operator.h index 7b6975e5e68..0750af230c7 100644 --- a/be/src/exec/operator/analytic_sink_operator.h +++ b/be/src/exec/operator/analytic_sink_operator.h @@ -213,11 +213,13 @@ public: Status sink(RuntimeState* state, Block* in_block, bool eos) override; DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (_partition_by_eq_expr_ctxs.empty()) { - return {ExchangeType::PASSTHROUGH}; + return {TLocalPartitionType::PASSTHROUGH}; } else { return _is_colocate && _require_bucket_distribution - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } } diff --git a/be/src/exec/operator/assert_num_rows_operator.h b/be/src/exec/operator/assert_num_rows_operator.h index 2639dae29d1..87758d8c470 100644 --- a/be/src/exec/operator/assert_num_rows_operator.h +++ b/be/src/exec/operator/assert_num_rows_operator.h @@ -48,7 +48,7 @@ public: [[nodiscard]] bool is_source() const override { return false; } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return {ExchangeType::PASSTHROUGH}; + return {TLocalPartitionType::PASSTHROUGH}; } private: diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.h b/be/src/exec/operator/distinct_streaming_aggregation_operator.h index 07e015af86e..19246dab447 100644 --- a/be/src/exec/operator/distinct_streaming_aggregation_operator.h +++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.h @@ -117,15 +117,17 @@ public: DataDistribution required_data_distribution(RuntimeState* state) const override { if (_needs_finalize && _probe_expr_ctxs.empty()) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) { return _is_colocate && _require_bucket_distribution - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } if (state->enable_distinct_streaming_agg_force_passthrough()) { - return {ExchangeType::PASSTHROUGH}; + return {TLocalPartitionType::PASSTHROUGH}; } else { return StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution( state); diff --git a/be/src/exec/operator/exchange_source_operator.h b/be/src/exec/operator/exchange_source_operator.h index b973e4923bd..3b53ad3732e 100644 --- a/be/src/exec/operator/exchange_source_operator.h +++ b/be/src/exec/operator/exchange_source_operator.h @@ -112,13 +112,13 @@ public: DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (OperatorX<ExchangeLocalState>::is_serial_operator()) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } return _partition_type == TPartitionType::HASH_PARTITIONED - ? DataDistribution(ExchangeType::HASH_SHUFFLE) + ? DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) : _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE) - : DataDistribution(ExchangeType::NOOP); + ? DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE) + : DataDistribution(TLocalPartitionType::NOOP); } private: diff --git a/be/src/exec/operator/hashjoin_build_sink.h b/be/src/exec/operator/hashjoin_build_sink.h index c6f492e8df7..499a3903258 100644 --- a/be/src/exec/operator/hashjoin_build_sink.h +++ b/be/src/exec/operator/hashjoin_build_sink.h @@ -134,15 +134,17 @@ public: DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } else if (_is_broadcast_join) { - return _child->is_serial_operator() ? 
DataDistribution(ExchangeType::PASS_TO_ONE) - : DataDistribution(ExchangeType::NOOP); + return _child->is_serial_operator() ? DataDistribution(TLocalPartitionType::PASS_TO_ONE) + : DataDistribution(TLocalPartitionType::NOOP); } return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + ? DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } bool is_shuffled_operator() const override { diff --git a/be/src/exec/operator/hashjoin_probe_operator.h b/be/src/exec/operator/hashjoin_probe_operator.h index 5a771b80331..f53c683aa61 100644 --- a/be/src/exec/operator/hashjoin_probe_operator.h +++ b/be/src/exec/operator/hashjoin_probe_operator.h @@ -136,21 +136,23 @@ public: bool need_more_input_data(RuntimeState* state) const override; DataDistribution required_data_distribution(RuntimeState* state) const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } else if (_is_broadcast_join) { if (state->enable_broadcast_join_force_passthrough()) { - return DataDistribution(ExchangeType::PASSTHROUGH); + return DataDistribution(TLocalPartitionType::PASSTHROUGH); } else { return _child && _child->is_serial_operator() - ? DataDistribution(ExchangeType::PASSTHROUGH) - : DataDistribution(ExchangeType::NOOP); + ? DataDistribution(TLocalPartitionType::PASSTHROUGH) + : DataDistribution(TLocalPartitionType::NOOP); } } return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs)); + ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs)); } bool is_broadcast_join() const { return _is_broadcast_join; } diff --git a/be/src/exec/operator/nested_loop_join_build_operator.h b/be/src/exec/operator/nested_loop_join_build_operator.h index 17a22a54019..f16e8590e28 100644 --- a/be/src/exec/operator/nested_loop_join_build_operator.h +++ b/be/src/exec/operator/nested_loop_join_build_operator.h @@ -69,10 +69,10 @@ public: DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } - return _child->is_serial_operator() ? DataDistribution(ExchangeType::BROADCAST) - : DataDistribution(ExchangeType::NOOP); + return _child->is_serial_operator() ? DataDistribution(TLocalPartitionType::BROADCAST) + : DataDistribution(TLocalPartitionType::NOOP); } private: diff --git a/be/src/exec/operator/nested_loop_join_probe_operator.h b/be/src/exec/operator/nested_loop_join_probe_operator.h index 793f0129953..0e4882bf7c9 100644 --- a/be/src/exec/operator/nested_loop_join_probe_operator.h +++ b/be/src/exec/operator/nested_loop_join_probe_operator.h @@ -240,9 +240,9 @@ public: if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || _join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::RIGHT_ANTI_JOIN || _join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } - return {ExchangeType::ADAPTIVE_PASSTHROUGH}; + return {TLocalPartitionType::ADAPTIVE_PASSTHROUGH}; } const RowDescriptor& row_desc() const override { diff --git a/be/src/exec/operator/operator.cpp b/be/src/exec/operator/operator.cpp index 5940d3bd7da..2d032286fcc 100644 --- a/be/src/exec/operator/operator.cpp +++ b/be/src/exec/operator/operator.cpp @@ -143,8 
+143,8 @@ Status PipelineXSinkLocalState<SharedStateArg>::terminate(RuntimeState* state) { DataDistribution OperatorBase::required_data_distribution(RuntimeState* /*state*/) const { return _child && _child->is_serial_operator() && !is_source() - ? DataDistribution(ExchangeType::PASSTHROUGH) - : DataDistribution(ExchangeType::NOOP); + ? DataDistribution(TLocalPartitionType::PASSTHROUGH) + : DataDistribution(TLocalPartitionType::NOOP); } const RowDescriptor& OperatorBase::row_desc() const { diff --git a/be/src/exec/operator/operator.h b/be/src/exec/operator/operator.h index a89cd30770e..9821dc067d2 100644 --- a/be/src/exec/operator/operator.h +++ b/be/src/exec/operator/operator.h @@ -575,8 +575,8 @@ public: virtual bool reset_to_rerun(RuntimeState* state, OperatorXBase* root) const { return false; } Status init(const TDataSink& tsink) override; - [[nodiscard]] virtual Status init(RuntimeState* state, ExchangeType type, const int num_buckets, - const bool use_global_hash_shuffle, + [[nodiscard]] virtual Status init(RuntimeState* state, TLocalPartitionType::type type, + const int num_buckets, const std::map<int, int>& shuffle_idx_to_instance_idx) { return Status::InternalError("init() is only implemented in local exchange!"); } @@ -825,7 +825,7 @@ public: Status init(const TDataSink& tsink) override { throw Exception(Status::FatalError("should not reach here!")); } - virtual Status init(ExchangeType type) { + virtual Status init(TLocalPartitionType::type type) { throw Exception(Status::FatalError("should not reach here!")); } [[noreturn]] virtual const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { diff --git a/be/src/exec/operator/partition_sort_sink_operator.h b/be/src/exec/operator/partition_sort_sink_operator.h index 989e9de6d3a..f274d05d7ed 100644 --- a/be/src/exec/operator/partition_sort_sink_operator.h +++ b/be/src/exec/operator/partition_sort_sink_operator.h @@ -95,9 +95,9 @@ public: Status sink(RuntimeState* state, Block* in_block, bool eos) 
override; DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) { - return DataDistribution(ExchangeType::HASH_SHUFFLE, _distribute_exprs); + return DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, _distribute_exprs); } - return {ExchangeType::PASSTHROUGH}; + return {TLocalPartitionType::PASSTHROUGH}; } size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.h b/be/src/exec/operator/partitioned_hash_join_probe_operator.h index 2a53458e129..f5b1a272e2b 100644 --- a/be/src/exec/operator/partitioned_hash_join_probe_operator.h +++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.h @@ -229,16 +229,8 @@ public: Status pull(doris::RuntimeState* state, Block* output_block, bool* eos) const override; bool need_more_input_data(RuntimeState* state) const override; - DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return {ExchangeType::NOOP}; - } - return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || - _join_distribution == TJoinDistributionType::COLOCATE - ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, - _distribution_partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, - _distribution_partition_exprs)); + DataDistribution required_data_distribution(RuntimeState* state) const override { + return _inner_probe_operator->required_data_distribution(state); } size_t revocable_mem_size(RuntimeState* state) const override; diff --git a/be/src/exec/operator/partitioned_hash_join_sink_operator.h b/be/src/exec/operator/partitioned_hash_join_sink_operator.h index c4ffeb0ce44..068a030156f 100644 --- a/be/src/exec/operator/partitioned_hash_join_sink_operator.h +++ b/be/src/exec/operator/partitioned_hash_join_sink_operator.h @@ -128,14 +128,14 @@ public: DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, + ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, _distribution_partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, _distribution_partition_exprs); } diff --git a/be/src/exec/operator/rec_cte_anchor_sink_operator.h b/be/src/exec/operator/rec_cte_anchor_sink_operator.h index fb93a89c317..f8ccdffe7f9 100644 --- a/be/src/exec/operator/rec_cte_anchor_sink_operator.h +++ b/be/src/exec/operator/rec_cte_anchor_sink_operator.h @@ -67,7 +67,7 @@ public: bool is_serial_operator() const override { return true; } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } Status terminate(RuntimeState* state) override { diff --git a/be/src/exec/operator/rec_cte_sink_operator.h b/be/src/exec/operator/rec_cte_sink_operator.h index 44d42346f19..f1932625a97 100644 --- a/be/src/exec/operator/rec_cte_sink_operator.h +++ b/be/src/exec/operator/rec_cte_sink_operator.h @@ -78,7 +78,7 @@ public: bool is_serial_operator() const override { return true; } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } Status sink(RuntimeState* state, Block* input_block, bool eos) override { diff --git a/be/src/exec/operator/rec_cte_source_operator.h b/be/src/exec/operator/rec_cte_source_operator.h index 84a1f81f6b2..9dd54f81125 100644 --- a/be/src/exec/operator/rec_cte_source_operator.h +++ b/be/src/exec/operator/rec_cte_source_operator.h @@ -94,7 +94,7 @@ public: bool is_serial_operator() const override { return true; } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } Status get_block(RuntimeState* state, Block* block, bool* eos) override { diff --git a/be/src/exec/operator/scan_operator.h 
b/be/src/exec/operator/scan_operator.h index f9b162be62d..8ab5879ce10 100644 --- a/be/src/exec/operator/scan_operator.h +++ b/be/src/exec/operator/scan_operator.h @@ -370,9 +370,9 @@ public: DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (OperatorX<LocalStateType>::is_serial_operator()) { // `is_serial_operator()` returns true means we ignore the distribution. - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } - return {ExchangeType::BUCKET_HASH_SHUFFLE}; + return {TLocalPartitionType::BUCKET_HASH_SHUFFLE}; } void set_low_memory_mode(RuntimeState* state) override { diff --git a/be/src/exec/operator/set_probe_sink_operator.h b/be/src/exec/operator/set_probe_sink_operator.h index 78af00cc1d0..9d98df71681 100644 --- a/be/src/exec/operator/set_probe_sink_operator.h +++ b/be/src/exec/operator/set_probe_sink_operator.h @@ -104,8 +104,10 @@ public: Status sink(RuntimeState* state, Block* in_block, bool eos) override; DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + return _is_colocate ? DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; } diff --git a/be/src/exec/operator/set_sink_operator.h b/be/src/exec/operator/set_sink_operator.h index b0a0868566e..3d2d11137d2 100644 --- a/be/src/exec/operator/set_sink_operator.h +++ b/be/src/exec/operator/set_sink_operator.h @@ -110,8 +110,8 @@ public: Status sink(RuntimeState* state, Block* in_block, bool eos) override; DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return _is_colocate ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + return _is_colocate ? DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, _partition_exprs); } size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; diff --git a/be/src/exec/operator/set_source_operator.h b/be/src/exec/operator/set_source_operator.h index f2f245f1edc..fa203ab71c5 100644 --- a/be/src/exec/operator/set_source_operator.h +++ b/be/src/exec/operator/set_source_operator.h @@ -82,8 +82,8 @@ public: bool is_shuffled_operator() const override { return true; } bool is_colocated_operator() const override { return _is_colocate; } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE) - : DataDistribution(ExchangeType::HASH_SHUFFLE); + return _is_colocate ? DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); } Status get_block(RuntimeState* state, Block* block, bool* eos) override; diff --git a/be/src/exec/operator/sort_sink_operator.h b/be/src/exec/operator/sort_sink_operator.h index d70fba0866f..4bc9d0f7f37 100644 --- a/be/src/exec/operator/sort_sink_operator.h +++ b/be/src/exec/operator/sort_sink_operator.h @@ -80,13 +80,15 @@ public: DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (_is_analytic_sort) { return _is_colocate && _require_bucket_distribution - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } else if (_merge_by_exchange) { // The current sort node is used for the ORDER BY - return {ExchangeType::PASSTHROUGH}; + return {TLocalPartitionType::PASSTHROUGH}; } else { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } } bool is_colocated_operator() const override { return _is_colocate; } diff --git a/be/src/exec/operator/streaming_aggregation_operator.h b/be/src/exec/operator/streaming_aggregation_operator.h index cf1100f8dc1..ed1db0dae9c 100644 --- a/be/src/exec/operator/streaming_aggregation_operator.h +++ b/be/src/exec/operator/streaming_aggregation_operator.h @@ -225,7 +225,7 @@ public: DataDistribution required_data_distribution(RuntimeState* state) const override { if (_child && _child->is_hash_join_probe() && state->enable_streaming_agg_hash_join_force_passthrough()) { - return DataDistribution(ExchangeType::PASSTHROUGH); + return DataDistribution(TLocalPartitionType::PASSTHROUGH); } if (!state->get_query_ctx()->should_be_shuffled_agg( StatefulOperatorX<StreamingAggLocalState>::node_id())) { @@ -233,11 +233,12 @@ public: } if (_partition_exprs.empty()) { return _needs_finalize - ? DataDistribution(ExchangeType::NOOP) + ? 
DataDistribution(TLocalPartitionType::NOOP) : StatefulOperatorX<StreamingAggLocalState>::required_data_distribution( state); } - return DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + return DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } private: diff --git a/be/src/exec/operator/table_function_operator.h b/be/src/exec/operator/table_function_operator.h index c6c3ddac577..c9d4d58bb85 100644 --- a/be/src/exec/operator/table_function_operator.h +++ b/be/src/exec/operator/table_function_operator.h @@ -105,7 +105,7 @@ public: } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return {ExchangeType::PASSTHROUGH}; + return {TLocalPartitionType::PASSTHROUGH}; } Status push(RuntimeState* state, Block* input_block, bool eos) const override { diff --git a/be/src/exec/operator/union_sink_operator.h b/be/src/exec/operator/union_sink_operator.h index 9bcf52a10a6..294cc6106d6 100644 --- a/be/src/exec/operator/union_sink_operator.h +++ b/be/src/exec/operator/union_sink_operator.h @@ -117,10 +117,10 @@ public: DataDistribution required_data_distribution(RuntimeState* state) const override { if (_require_bucket_distribution) { - return DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _distribute_exprs); + return DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, _distribute_exprs); } if (_followed_by_shuffled_operator) { - return DataDistribution(ExchangeType::HASH_SHUFFLE, _distribute_exprs); + return DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, _distribute_exprs); } return Base::required_data_distribution(state); } diff --git a/be/src/exec/operator/union_source_operator.h b/be/src/exec/operator/union_source_operator.h index 78c1eb2e99b..3e3ec8cc47d 100644 --- a/be/src/exec/operator/union_source_operator.h +++ b/be/src/exec/operator/union_source_operator.h @@ -102,10 +102,10 @@ public: DataDistribution required_data_distribution(RuntimeState* 
state) const override { if (_require_bucket_distribution) { - return DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE); + return DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE); } if (_followed_by_shuffled_operator) { - return DataDistribution(ExchangeType::HASH_SHUFFLE); + return DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); } return Base::required_data_distribution(state); } diff --git a/be/src/exec/pipeline/dependency.h b/be/src/exec/pipeline/dependency.h index 2f82d4f18e2..74cb7e30b18 100644 --- a/be/src/exec/pipeline/dependency.h +++ b/be/src/exec/pipeline/dependency.h @@ -653,50 +653,44 @@ public: Status hash_table_init(); }; -enum class ExchangeType : uint8_t { - NOOP = 0, - // Shuffle data by Crc32CHashPartitioner - HASH_SHUFFLE = 1, - // Round-robin passthrough data blocks. - PASSTHROUGH = 2, - // Shuffle data by Crc32HashPartitioner<ShuffleChannelIds> (e.g. same as storage engine). - BUCKET_HASH_SHUFFLE = 3, - // Passthrough data blocks to all channels. - BROADCAST = 4, - // Passthrough data to channels evenly in an adaptive way. - ADAPTIVE_PASSTHROUGH = 5, - // Send all data to the first channel. 
- PASS_TO_ONE = 6, -}; +inline bool is_shuffled_exchange(TLocalPartitionType::type idx) { + return idx == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE || + idx == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE || + idx == TLocalPartitionType::BUCKET_HASH_SHUFFLE; +} -inline std::string get_exchange_type_name(ExchangeType idx) { +inline std::string get_exchange_type_name(TLocalPartitionType::type idx) { switch (idx) { - case ExchangeType::NOOP: + case TLocalPartitionType::NOOP: return "NOOP"; - case ExchangeType::HASH_SHUFFLE: - return "HASH_SHUFFLE"; - case ExchangeType::PASSTHROUGH: + case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE: + return "GLOBAL_HASH_SHUFFLE"; + case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE: + return "LOCAL_HASH_SHUFFLE"; + case TLocalPartitionType::PASSTHROUGH: return "PASSTHROUGH"; - case ExchangeType::BUCKET_HASH_SHUFFLE: + case TLocalPartitionType::BUCKET_HASH_SHUFFLE: return "BUCKET_HASH_SHUFFLE"; - case ExchangeType::BROADCAST: + case TLocalPartitionType::BROADCAST: return "BROADCAST"; - case ExchangeType::ADAPTIVE_PASSTHROUGH: + case TLocalPartitionType::ADAPTIVE_PASSTHROUGH: return "ADAPTIVE_PASSTHROUGH"; - case ExchangeType::PASS_TO_ONE: + case TLocalPartitionType::PASS_TO_ONE: return "PASS_TO_ONE"; + case TLocalPartitionType::LOCAL_MERGE_SORT: + return "LOCAL_MERGE_SORT"; } throw Exception(Status::FatalError("__builtin_unreachable")); } struct DataDistribution { - DataDistribution(ExchangeType type) : distribution_type(type) {} - DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_) + DataDistribution(TLocalPartitionType::type type) : distribution_type(type) {} + DataDistribution(TLocalPartitionType::type type, const std::vector<TExpr>& partition_exprs_) : distribution_type(type), partition_exprs(partition_exprs_) {} DataDistribution(const DataDistribution& other) = default; - bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; } + bool 
need_local_exchange() const { return distribution_type != TLocalPartitionType::NOOP; } DataDistribution& operator=(const DataDistribution& other) = default; - ExchangeType distribution_type; + TLocalPartitionType::type distribution_type; std::vector<TExpr> partition_exprs; }; diff --git a/be/src/exec/pipeline/pipeline.cpp b/be/src/exec/pipeline/pipeline.cpp index 78a2cffafad..fecb80b4d1c 100644 --- a/be/src/exec/pipeline/pipeline.cpp +++ b/be/src/exec/pipeline/pipeline.cpp @@ -54,8 +54,7 @@ bool Pipeline::need_to_local_exchange(const DataDistribution target_data_distrib return true; } - if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE && - target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) { + if (!is_shuffled_exchange(target_data_distribution.distribution_type)) { // Always do local exchange if non-hash-partition exchanger is required. // For example, `PASSTHROUGH` exchanger is always required to distribute data evenly. return true; diff --git a/be/src/exec/pipeline/pipeline.h b/be/src/exec/pipeline/pipeline.h index 75e801ddc82..b7421dbb3ea 100644 --- a/be/src/exec/pipeline/pipeline.h +++ b/be/src/exec/pipeline/pipeline.h @@ -69,16 +69,15 @@ public: [[nodiscard]] PipelineId id() const { return _pipeline_id; } - static bool is_hash_exchange(ExchangeType idx) { - return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE; + static bool is_hash_exchange(TLocalPartitionType::type idx) { + return is_shuffled_exchange(idx); } // For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH, // data is processed and shuffled on the sink. // Compared to PASSTHROUGH, this is a relatively heavy operation. 
- static bool heavy_operations_on_the_sink(ExchangeType idx) { - return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE || - idx == ExchangeType::ADAPTIVE_PASSTHROUGH; + static bool heavy_operations_on_the_sink(TLocalPartitionType::type idx) { + return is_shuffled_exchange(idx) || idx == TLocalPartitionType::ADAPTIVE_PASSTHROUGH; } bool need_to_local_exchange(const DataDistribution target_data_distribution, @@ -166,7 +165,7 @@ private: // Input data distribution of this pipeline. We do local exchange when input data distribution // does not match the target data distribution. - DataDistribution _data_distribution {ExchangeType::NOOP}; + DataDistribution _data_distribution {TLocalPartitionType::NOOP}; // How many tasks should be created ? int _num_tasks = 1; diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index ebb56df51d0..2343dc2c695 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -261,6 +261,9 @@ Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thr RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl, &_root_op, root_pipeline)); + // Create deferred local exchangers now that all pipelines have final num_tasks. + RETURN_IF_ERROR(_create_deferred_local_exchangers()); + // 3. Create sink operator if (!_params.fragment.__isset.output_sink) { return Status::InternalError("No output sink in this fragment!"); @@ -278,7 +281,7 @@ Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thr } } // 4. 
Build local exchanger - if (_runtime_state->enable_local_shuffle()) { + if (_runtime_state->plan_local_shuffle()) { SCOPED_TIMER(_plan_local_exchanger_timer); RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets, _params.bucket_seq_to_instance_idx, @@ -663,6 +666,50 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip return Status::OK(); } +Status PipelineFragmentContext::_create_deferred_local_exchangers() { + for (auto& info : _deferred_exchangers) { + const int sender_count = info.upstream_pipe->num_tasks(); + switch (info.partition_type) { + case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE: + case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE: + info.shared_state->exchanger = ShuffleExchanger::create_unique( + sender_count, _num_instances, info.num_partitions, info.free_blocks_limit, + info.partition_type); + break; + case TLocalPartitionType::BUCKET_HASH_SHUFFLE: + info.shared_state->exchanger = BucketShuffleExchanger::create_unique( + sender_count, _num_instances, info.num_partitions, info.free_blocks_limit); + break; + case TLocalPartitionType::PASSTHROUGH: + info.shared_state->exchanger = PassthroughExchanger::create_unique( + sender_count, _num_instances, info.free_blocks_limit); + break; + case TLocalPartitionType::BROADCAST: + info.shared_state->exchanger = BroadcastExchanger::create_unique( + sender_count, _num_instances, info.free_blocks_limit); + break; + case TLocalPartitionType::PASS_TO_ONE: + if (_runtime_state->enable_share_hash_table_for_broadcast_join()) { + info.shared_state->exchanger = PassToOneExchanger::create_unique( + sender_count, _num_instances, info.free_blocks_limit); + } else { + info.shared_state->exchanger = BroadcastExchanger::create_unique( + sender_count, _num_instances, info.free_blocks_limit); + } + break; + case TLocalPartitionType::ADAPTIVE_PASSTHROUGH: + info.shared_state->exchanger = AdaptivePassthroughExchanger::create_unique( + sender_count, _num_instances, 
info.free_blocks_limit); + break; + default: + return Status::InternalError("Unsupported FE-planned local exchange type: " + + std::to_string(static_cast<int>(info.partition_type))); + } + } + _deferred_exchangers.clear(); + return Status::OK(); +} + Status PipelineFragmentContext::_create_tree_helper( ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs, OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx, @@ -783,28 +830,35 @@ Status PipelineFragmentContext::_add_local_exchange_impl( sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances, data_distribution.partition_exprs, bucket_seq_to_instance_idx); if (bucket_seq_to_instance_idx.empty() && - data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) { - data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE; + data_distribution.distribution_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) { + data_distribution.distribution_type = + use_global_hash_shuffle ? TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE + : TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE; + } + if (!use_global_hash_shuffle && + data_distribution.distribution_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) { + data_distribution.distribution_type = TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE; } RETURN_IF_ERROR(new_pip->set_sink(sink)); RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type, - num_buckets, use_global_hash_shuffle, - shuffle_idx_to_instance_idx)); + num_buckets, shuffle_idx_to_instance_idx)); // 2. Create and initialize LocalExchangeSharedState. 
std::shared_ptr<LocalExchangeSharedState> shared_state = LocalExchangeSharedState::create_shared(_num_instances); switch (data_distribution.distribution_type) { - case ExchangeType::HASH_SHUFFLE: + case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE: + case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique( std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, use_global_hash_shuffle ? _total_instances : _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit ? cast_set<int>( _runtime_state->query_options().local_exchange_free_blocks_limit) - : 0); + : 0, + data_distribution.distribution_type); break; - case ExchangeType::BUCKET_HASH_SHUFFLE: + case TLocalPartitionType::BUCKET_HASH_SHUFFLE: shared_state->exchanger = BucketShuffleExchanger::create_unique( std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit @@ -812,7 +866,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( _runtime_state->query_options().local_exchange_free_blocks_limit) : 0); break; - case ExchangeType::PASSTHROUGH: + case TLocalPartitionType::PASSTHROUGH: shared_state->exchanger = PassthroughExchanger::create_unique( cur_pipe->num_tasks(), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit @@ -820,7 +874,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( _runtime_state->query_options().local_exchange_free_blocks_limit) : 0); break; - case ExchangeType::BROADCAST: + case TLocalPartitionType::BROADCAST: shared_state->exchanger = BroadcastExchanger::create_unique( cur_pipe->num_tasks(), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit @@ -828,7 +882,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( _runtime_state->query_options().local_exchange_free_blocks_limit) : 0); break; - case 
ExchangeType::PASS_TO_ONE: + case TLocalPartitionType::PASS_TO_ONE: if (_runtime_state->enable_share_hash_table_for_broadcast_join()) { // If shared hash table is enabled for BJ, hash table will be built by only one task shared_state->exchanger = PassToOneExchanger::create_unique( @@ -846,7 +900,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( : 0); } break; - case ExchangeType::ADAPTIVE_PASSTHROUGH: + case TLocalPartitionType::ADAPTIVE_PASSTHROUGH: shared_state->exchanger = AdaptivePassthroughExchanger::create_unique( std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit @@ -957,9 +1011,9 @@ Status PipelineFragmentContext::_add_local_exchange( Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) { RETURN_IF_ERROR(_add_local_exchange_impl( cast_set<int>(new_pip->operators().size()), pool, new_pip, - add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH), - do_local_exchange, num_buckets, bucket_seq_to_instance_idx, - shuffle_idx_to_instance_idx)); + add_pipeline(new_pip, pip_idx + 2), + DataDistribution(TLocalPartitionType::PASSTHROUGH), do_local_exchange, num_buckets, + bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx)); } return Status::OK(); } @@ -1725,6 +1779,84 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); break; } + case TPlanNodeType::LOCAL_EXCHANGE_NODE: { + op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs); + // Save downstream pipeline's num_tasks before add_operator potentially reduces it + // (is_serial_operator on the LocalExchangeSourceOperatorX would set num_tasks=1, + // but the downstream pipeline needs _num_instances tasks — the serial semantics + // should only apply to the upstream scan pipeline). 
+ auto downstream_num_tasks = cur_pipe->num_tasks(); + RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); + // Restore downstream pipeline's num_tasks (mirroring _inherit_pipeline_properties: + // downstream keeps _num_instances, upstream gets the serial/reduced count) + cur_pipe->set_num_tasks(downstream_num_tasks); + + const auto downstream_pipeline_id = cur_pipe->id(); + if (!_dag.contains(downstream_pipeline_id)) { + _dag.insert({downstream_pipeline_id, {}}); + } + cur_pipe = add_pipeline(cur_pipe); + // If this local exchange was inserted because of a serial scan (is_serial_operator), + // the upstream pipeline (cur_pipe) should have num_tasks=1 (only 1 scan task). + // We set this now so the exchanger is created with the correct sender count. + // Child operators added later (serial scan) will also set num_tasks=1, which is + // consistent with this. + if (op->is_serial_operator() && _parallel_instances > 0) { + cur_pipe->set_num_tasks(_parallel_instances); + } + _dag[downstream_pipeline_id].push_back(cur_pipe->id()); + int num_partitions = 0; + std::map<int, int> shuffle_id_to_instance_idx; + auto partition_type = tnode.local_exchange_node.partition_type; + switch (partition_type) { + case TLocalPartitionType::BUCKET_HASH_SHUFFLE: + num_partitions = _params.num_buckets; + shuffle_id_to_instance_idx = _params.bucket_seq_to_instance_idx; + break; + case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE: + for (int i = 0; i < _num_instances; i++) { + shuffle_id_to_instance_idx[i] = i; + } + num_partitions = _num_instances; + break; + case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE: + num_partitions = _total_instances; + shuffle_id_to_instance_idx = _params.shuffle_idx_to_instance_idx; + break; + default: + break; + } + auto local_exchange_id = op->operator_id(); + auto sink_id = next_sink_operator_id(); + DataSinkOperatorPtr sink = std::make_shared<LocalExchangeSinkOperatorX>( + sink_id, local_exchange_id, tnode, num_partitions, 
shuffle_id_to_instance_idx); + sink_ops.push_back(sink); + RETURN_IF_ERROR(cur_pipe->set_sink(sink)); + RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get())); + + // For FE-planned local exchange, we need to: + // 1. Initialize the partitioner for hash shuffle types + // 2. Defer exchanger creation until after the full plan tree is built + // (child operators like serial ExchangeNode may change cur_pipe->num_tasks()) + // 3. Register shared state so pipeline tasks can find it + RETURN_IF_ERROR(static_cast<LocalExchangeSinkOperatorX*>(cur_pipe->sink()) + ->init_partitioner(_runtime_state.get())); + + int free_blocks_limit = + _runtime_state->query_options().__isset.local_exchange_free_blocks_limit + ? cast_set<int>( + _runtime_state->query_options().local_exchange_free_blocks_limit) + : 0; + auto shared_state = LocalExchangeSharedState::create_shared(_num_instances); + shared_state->create_source_dependencies(_num_instances, local_exchange_id, + local_exchange_id, "LOCAL_EXCHANGE_OPERATOR"); + shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK"); + _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}}); + // Defer exchanger creation: sender count depends on final upstream num_tasks + _deferred_exchangers.push_back({shared_state, cur_pipe, partition_type, num_partitions, + free_blocks_limit, local_exchange_id, sink_id}); + break; + } default: return Status::InternalError("Unsupported exec type in pipeline: {}", print_plan_node_type(tnode.node_type)); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index e44062b6888..f26ee42d1d7 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -393,8 +393,11 @@ public: BeExecVersionManager::check_be_exec_version(_query_options.be_exec_version)); return _query_options.be_exec_version; } - bool enable_local_shuffle() const { - return _query_options.__isset.enable_local_shuffle && 
_query_options.enable_local_shuffle; + bool plan_local_shuffle() const { + // If local shuffle is enabled and not planned by local shuffle planner, we should plan local shuffle in BE. + return _query_options.__isset.enable_local_shuffle && _query_options.enable_local_shuffle && + (!_query_options.__isset.enable_local_shuffle_planner || + !_query_options.enable_local_shuffle_planner); } MOCK_FUNCTION bool enable_local_exchange() const { diff --git a/be/test/exec/pipeline/local_exchanger_test.cpp b/be/test/exec/pipeline/local_exchanger_test.cpp index 2a1bb3ddfc2..7412c0f0e9c 100644 --- a/be/test/exec/pipeline/local_exchanger_test.cpp +++ b/be/test/exec/pipeline/local_exchanger_test.cpp @@ -89,8 +89,9 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) { _local_states.resize(num_sources); auto profile = std::make_shared<RuntimeProfile>(""); auto shared_state = LocalExchangeSharedState::create_shared(num_partitions); - shared_state->exchanger = ShuffleExchanger::create_unique(num_sink, num_sources, num_partitions, - free_block_limit); + shared_state->exchanger = + ShuffleExchanger::create_unique(num_sink, num_sources, num_partitions, free_block_limit, + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); sink_dep->set_shared_state(shared_state.get()); shared_state->sink_deps.push_back(sink_dep); @@ -1162,8 +1163,9 @@ TEST_F(LocalExchangerTest, TestShuffleExchangerWrongMap) { _local_states.resize(num_sources); auto profile = std::make_shared<RuntimeProfile>(""); auto shared_state = LocalExchangeSharedState::create_shared(num_partitions); - shared_state->exchanger = ShuffleExchanger::create_unique(num_sink, num_sources, num_partitions, - free_block_limit); + shared_state->exchanger = + ShuffleExchanger::create_unique(num_sink, num_sources, num_partitions, free_block_limit, + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); auto sink_dep = std::make_shared<Dependency>(0, 0, 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true); sink_dep->set_shared_state(shared_state.get()); shared_state->sink_deps.push_back(sink_dep); diff --git a/be/test/exec/pipeline/pipeline_test.cpp b/be/test/exec/pipeline/pipeline_test.cpp index 083679406b6..156058428ea 100644 --- a/be/test/exec/pipeline/pipeline_test.cpp +++ b/be/test/exec/pipeline/pipeline_test.cpp @@ -479,7 +479,7 @@ TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) { DescriptorTbl* desc; OperatorPtr op; _build_fragment_context(); - EXPECT_EQ(_runtime_state.front()->enable_local_shuffle(), true); + EXPECT_EQ(_runtime_state.front()->plan_local_shuffle(), true); auto cur_pipe = _build_pipeline(parallelism); { auto tnode = TPlanNodeBuilder(_next_node_id(), TPlanNodeType::EXCHANGE_NODE) @@ -554,11 +554,12 @@ TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) { } { cur_pipe->init_data_distribution(_runtime_state.back().get()); - EXPECT_EQ(cur_pipe->data_distribution().distribution_type, ExchangeType::HASH_SHUFFLE); + EXPECT_EQ(cur_pipe->data_distribution().distribution_type, + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); EXPECT_EQ(cur_pipe->sink() ->required_data_distribution(_runtime_state.back().get()) .distribution_type, - ExchangeType::NOOP); + TLocalPartitionType::NOOP); EXPECT_EQ(cur_pipe->need_to_local_exchange( cur_pipe->sink()->required_data_distribution(_runtime_state.back().get()), 1), @@ -567,11 +568,11 @@ TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) { { cur_pipe->operators().front()->set_serial_operator(); cur_pipe->init_data_distribution(_runtime_state.back().get()); - EXPECT_EQ(cur_pipe->data_distribution().distribution_type, ExchangeType::NOOP); + EXPECT_EQ(cur_pipe->data_distribution().distribution_type, TLocalPartitionType::NOOP); EXPECT_EQ(cur_pipe->sink() ->required_data_distribution(_runtime_state.back().get()) .distribution_type, - ExchangeType::PASSTHROUGH); + TLocalPartitionType::PASSTHROUGH); EXPECT_EQ(cur_pipe->need_to_local_exchange( 
cur_pipe->sink()->required_data_distribution(_runtime_state.back().get()), 1), @@ -590,7 +591,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { // Build pipeline DescriptorTbl* desc; _build_fragment_context(); - EXPECT_EQ(_runtime_state.front()->enable_local_shuffle(), true); + EXPECT_EQ(_runtime_state.front()->plan_local_shuffle(), true); { TTupleDescriptor tuple0 = TTupleDescriptorBuilder().set_id(0).build(); TSlotDescriptor slot0 = @@ -873,12 +874,12 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { if (pip_idx == 1) { // Pipeline(ExchangeOperator(id=1, HASH_PARTITIONED) -> HashJoinBuildOperator(id=0)) EXPECT_EQ(_pipelines[pip_idx]->data_distribution().distribution_type, - ExchangeType::HASH_SHUFFLE); + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); EXPECT_EQ(_pipelines[pip_idx] ->sink() ->required_data_distribution(_runtime_state.back().get()) .distribution_type, - ExchangeType::HASH_SHUFFLE); + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); EXPECT_EQ(_pipelines[pip_idx]->need_to_local_exchange( _pipelines[pip_idx]->sink()->required_data_distribution( _runtime_state.back().get()), @@ -889,7 +890,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { _pipelines[pip_idx]->set_data_distribution( _pipelines[pip_idx]->children().front()->data_distribution()); EXPECT_EQ(_pipelines[pip_idx]->data_distribution().distribution_type, - ExchangeType::HASH_SHUFFLE); + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); EXPECT_EQ(_pipelines[pip_idx]->need_to_local_exchange( _pipelines[pip_idx]->sink()->required_data_distribution( _runtime_state.back().get()), @@ -900,7 +901,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { .back() ->required_data_distribution(_runtime_state.back().get()) .distribution_type, - ExchangeType::HASH_SHUFFLE); + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); EXPECT_EQ(_pipelines[pip_idx]->need_to_local_exchange( _pipelines[pip_idx]->operators().back()->required_data_distribution( _runtime_state.back().get()), diff --git 
a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index f2da2243ff5..c045f568333 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -472,6 +472,8 @@ struct TQueryOptions { // session variable `spill_repartition_max_depth` in FE. Default is 8. 209: optional i32 spill_repartition_max_depth = 8 + // enable planning local exchange nodes in FE + 210: optional bool enable_local_shuffle_planner; // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift index de6ba8127af..bf23f418eaf 100644 --- a/gensrc/thrift/Partitions.thrift +++ b/gensrc/thrift/Partitions.thrift @@ -55,6 +55,40 @@ enum TPartitionType { MERGE_PARTITIONED = 9 } +enum TLocalPartitionType { + NOOP = 0, + // used to resume the global hash distribution because other distributions break the global hash distribution, + // such as PASSTHROUGH, and then JoinNode can shuffle data by the same hash distribution. 
 + // + // for example: look here, need to resume to GLOBAL_EXECUTION_HASH_SHUFFLE + // ↓ + // Node -> LocalExchangeNode(PASSTHROUGH) → JoinNode → LocalExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE) → JoinNode + // ExchangeNode(BROADCAST) ↗ ↑ + // ExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE) + GLOBAL_EXECUTION_HASH_SHUFFLE = 1, + // used to rebalance data and add parallelism + // + // for example: look here, need to use LOCAL_EXECUTION_HASH_SHUFFLE to rebalance data + // ↓ + // Scan(hash(id)) -> LocalExchangeNode(LOCAL_EXECUTION_HASH_SHUFFLE(id, name)) → AggregationNode(group by(id,name)) + // + // the LOCAL_EXECUTION_HASH_SHUFFLE is necessary because the hash distribution of the scan node is based on id, + // but the hash distribution of the aggregation node is based on id and name, so we need to rebalance data by both + // id and name to make sure the data with the same id and name can be sent to the same instance of the aggregation node. + // and we can not use GLOBAL_EXECUTION_HASH_SHUFFLE(id, name) here, because + // `TPipelineFragmentParams.shuffle_idx_to_instance_idx` is used to map a partial global instance index to a local + // instance index, and discards the other backend's instance index; the data not belonging to the local instance would be + // discarded, which causes data loss. 
+ LOCAL_EXECUTION_HASH_SHUFFLE = 2, + BUCKET_HASH_SHUFFLE = 3, + // round-robin partition, used to rebalance data and add parallelism + PASSTHROUGH = 4, + ADAPTIVE_PASSTHROUGH = 5, + BROADCAST = 6, + PASS_TO_ONE = 7, + LOCAL_MERGE_SORT = 8 +} + enum TDistributionType { UNPARTITIONED = 0, @@ -119,3 +153,5 @@ struct TDataPartition { 3: optional list<TRangePartition> partition_infos 4: optional TMergePartitionInfo merge_partition_info } + + diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 4f029d4aa26..f987deab244 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -62,7 +62,8 @@ enum TPlanNodeType { GROUP_COMMIT_SCAN_NODE = 33, MATERIALIZATION_NODE = 34, REC_CTE_NODE = 35, - REC_CTE_SCAN_NODE = 36 + REC_CTE_SCAN_NODE = 36, + LOCAL_EXCHANGE_NODE = 37 } struct TKeyRange { @@ -1309,6 +1310,24 @@ struct TExchangeNode { 4: optional Partitions.TPartitionType partition_type } +struct TLocalExchangeNode { + 1: required Partitions.TLocalPartitionType partition_type + // when partition_type in (GLOBAL_EXECUTION_HASH_SHUFFLE, LOCAL_EXECUTION_HASH_SHUFFLE, BUCKET_HASH_SHUFFLE), + // the distribute_expr_lists is not null, and the legacy `TPlanNode.distribute_expr_lists` is deprecated + // + // the hash computation: + // 1. for BUCKET_HASH_SHUFFLE, use distribution_exprs to compute hash value and mod by + // `TPipelineFragmentParams.num_buckets`, and map the bucket index to a local instance id by + // `TPipelineFragmentParams.bucket_seq_to_instance_idx` + // 2. for LOCAL_EXECUTION_HASH_SHUFFLE, use distribution_exprs to compute hash value and mod by + // `TPipelineFragmentParams.local_params.size`, and the backend will map the instance index to a local instance + // by `i -> i`, for example: 1 -> 1, 2 -> 2, ... + // 3. 
for GLOBAL_EXECUTION_HASH_SHUFFLE, use distribution_exprs to compute hash value and mod by + // `TPipelineFragmentParams.total_instances`, and mapping global instance index to local instance by + // `TPipelineFragmentParams.shuffle_idx_to_instance_idx` + 2: optional list<Exprs.TExpr> distribute_expr_lists +} + struct TOlapRewriteNode { 1: required list<Exprs.TExpr> columns 2: required list<Types.TColumnType> column_types @@ -1525,6 +1544,7 @@ struct TPlanNode { 50: optional list<list<Exprs.TExpr>> distribute_expr_lists 51: optional bool is_serial_operator 52: optional TRecCTEScanNode rec_cte_scan_node + 53: optional TLocalExchangeNode local_exchange_node // projections is final projections, which means projecting into results and materializing them into the output block. 101: optional list<Exprs.TExpr> projections --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
