This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1aa60f5186d [opt](exec) Use PASSTHROUGH to improve the concurrency of the ADAPTIVE_PASSTHROUGH SINK. (#44925)
1aa60f5186d is described below

commit 1aa60f5186d19c3cb01a6d9732306ecf6df94606
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Wed Dec 4 10:15:54 2024 +0800

    [opt](exec) Use PASSTHROUGH to improve the concurrency of the ADAPTIVE_PASSTHROUGH SINK. (#44925)
    
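    Before:
    ```
    op -> local sink(1) -> local source(n)
    ```
    Now:
    ```
    op -> local passthrough sink(1) -> local passthrough source(n) -> local sink(n) -> local source(n)
    ```
    
    Like HASH_SHUFFLE and BUCKET_HASH_SHUFFLE, ADAPTIVE_PASSTHROUGH does relatively heavy
    per-block work on the sink side, so a single sink task feeding n source tasks becomes a
    bottleneck. A cheap PASSTHROUGH exchange is now inserted first to fan the data out to n
    tasks, so the heavy local exchange sink runs with n-way concurrency.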
    
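    Profile (the ADAPTIVE_PASSTHROUGH local exchange sink in pipeline 2 now runs with 3 instances, fed by a single-instance PASSTHROUGH exchange in pipeline 3):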
    ```
                      Pipeline  :  1(instance_num=3):
                          AGGREGATION_SINK_OPERATOR  (id=4  ,  nereids_id=255):
                              CROSS_JOIN_OPERATOR  (id=3  ,  nereids_id=245):
                                  LOCAL_EXCHANGE_OPERATOR  (ADAPTIVE_PASSTHROUGH)  (id=-5):
                      Pipeline  :  2(instance_num=3):
                          LOCAL_EXCHANGE_SINK_OPERATOR  (ADAPTIVE_PASSTHROUGH)  (id=-5):
                              LOCAL_EXCHANGE_OPERATOR  (PASSTHROUGH)  (id=-6):
                      Pipeline  :  3(instance_num=1):
                          LOCAL_EXCHANGE_SINK_OPERATOR  (PASSTHROUGH)  (id=-6):
                              OLAP_SCAN_OPERATOR  (id=2.  nereids_id=234.  table  name  =  nums1(nums1)):
    ```
---
 be/src/pipeline/pipeline.h                    |  8 ++++++++
 be/src/pipeline/pipeline_fragment_context.cpp | 10 +++++++---
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index b969186b178..afbe6c77596 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -73,6 +73,14 @@ public:
         return idx == ExchangeType::HASH_SHUFFLE || idx == 
ExchangeType::BUCKET_HASH_SHUFFLE;
     }
 
+    // For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH,
+    // data is processed and shuffled on the sink.
+    // Compared to PASSTHROUGH, this is a relatively heavy operation.
+    static bool heavy_operations_on_the_sink(ExchangeType idx) {
+        return idx == ExchangeType::HASH_SHUFFLE || idx == 
ExchangeType::BUCKET_HASH_SHUFFLE ||
+               idx == ExchangeType::ADAPTIVE_PASSTHROUGH;
+    }
+
     bool need_to_local_exchange(const DataDistribution 
target_data_distribution,
                                 const int idx) const;
     void init_data_distribution() {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 0775ef3fb19..e6f257f9da7 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -814,7 +814,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     }
     case ExchangeType::ADAPTIVE_PASSTHROUGH:
         shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
-                cur_pipe->num_tasks(), _num_instances,
+                std::max(cur_pipe->num_tasks(), _num_instances), 
_num_instances,
                 
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                         ? cast_set<int>(
                                   
_runtime_state->query_options().local_exchange_free_blocks_limit)
@@ -915,9 +915,13 @@ Status PipelineFragmentContext::_add_local_exchange(
             << " cur_pipe->operators().size(): " << 
cur_pipe->operators().size()
             << " new_pip->operators().size(): " << new_pip->operators().size();
 
-    // Add passthrough local exchanger if necessary
+    // There are some local shuffles with relatively heavy operations on the 
sink.
+    // If the local sink concurrency is 1 and the local source concurrency is 
n, the sink becomes a bottleneck.
+    // Therefore, local passthrough is used to increase the concurrency of the 
sink.
+    // op -> local sink(1) -> local source (n)
+    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> 
local source (n)
     if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
-        Pipeline::is_hash_exchange(data_distribution.distribution_type)) {
+        
Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
         RETURN_IF_ERROR(_add_local_exchange_impl(
                 cast_set<int>(new_pip->operators().size()), pool, new_pip,
                 add_pipeline(new_pip, pip_idx + 2), 
DataDistribution(ExchangeType::PASSTHROUGH),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to