This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 21e21f5e3b4 [opt](exec) Use PASSTHROUGH to improve the concurrency of the ADAPTIV… (#44971) 21e21f5e3b4 is described below commit 21e21f5e3b48fe8cbd4a2453e6cf6cbfd9b6edd5 Author: Mryange <yanxuech...@selectdb.com> AuthorDate: Fri Dec 6 09:57:31 2024 +0800 [opt](exec) Use PASSTHROUGH to improve the concurrency of the ADAPTIV… (#44971) …E_PASSTHROUGH SINK. (#44925) https://github.com/apache/doris/pull/44925 before ``` op -> local sink(1) -> local source (n) ``` now ``` op -> local passthrough(1) -> local passthrough(n) -> local sink(n) -> local source (n) ``` profile ``` Pipeline : 1(instance_num=3): AGGREGATION_SINK_OPERATOR (id=4 , nereids_id=255): CROSS_JOIN_OPERATOR (id=3 , nereids_id=245): LOCAL_EXCHANGE_OPERATOR (ADAPTIVE_PASSTHROUGH) (id=-5): Pipeline : 2(instance_num=3): LOCAL_EXCHANGE_SINK_OPERATOR (ADAPTIVE_PASSTHROUGH) (id=-5): LOCAL_EXCHANGE_OPERATOR (PASSTHROUGH) (id=-6): Pipeline : 3(instance_num=1): LOCAL_EXCHANGE_SINK_OPERATOR (PASSTHROUGH) (id=-6): OLAP_SCAN_OPERATOR (id=2. nereids_id=234. table name = nums1(nums1)): ``` --- be/src/pipeline/pipeline.h | 8 ++++++++ be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 10 +++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index db93e8dfe35..8743cdfb57e 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -122,6 +122,14 @@ public: return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE; } + // For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH, + // data is processed and shuffled on the sink. + // Compared to PASSTHROUGH, this is a relatively heavy operation. + static bool heavy_operations_on_the_sink(ExchangeType idx) { + return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE || + idx == ExchangeType::ADAPTIVE_PASSTHROUGH; + } + bool need_to_local_exchange(const DataDistribution target_data_distribution) const { if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE && target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 860ba31097a..8c2075497a9 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -945,7 +945,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( break; case ExchangeType::ADAPTIVE_PASSTHROUGH: shared_state->exchanger = AdaptivePassthroughExchanger::create_unique( - cur_pipe->num_tasks(), _num_instances, + std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit ? _runtime_state->query_options().local_exchange_free_blocks_limit : 0); @@ -1047,9 +1047,13 @@ Status PipelineXFragmentContext::_add_local_exchange( << " cur_pipe->operator_xs().size(): " << cur_pipe->operator_xs().size() << " new_pip->operator_xs().size(): " << new_pip->operator_xs().size(); - // Add passthrough local exchanger if necessary + // There are some local shuffles with relatively heavy operations on the sink. + // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck. + // Therefore, local passthrough is used to increase the concurrency of the sink. + // op -> local sink(1) -> local source (n) + // op -> local passthrough(1) -> local passthrough(n) -> local sink(n) -> local source (n) if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 && - Pipeline::is_hash_exchange(data_distribution.distribution_type)) { + Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) { RETURN_IF_ERROR(_add_local_exchange_impl( new_pip->operator_xs().size(), pool, new_pip, add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH), do_local_exchange, num_buckets, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org