This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a771037442f [Improvement](streaming-agg) adjust single streaming threshold (#61104)
a771037442f is described below

commit a771037442fc378eb9f29e5275265689eb6f00bd
Author: Pxl <[email protected]>
AuthorDate: Fri Mar 13 12:37:41 2026 +0800

    [Improvement](streaming-agg) adjust single streaming threshold (#61104)
    
    This pull request introduces an optimization for streaming aggregation
    operators by detecting when a query is assigned to a single backend and
    applying stricter (higher minimum-reduction) hash table expansion
    thresholds in that case.
    The change propagates a new `single_backend_query` flag from the
    frontend to the backend, and both the `streaming_aggregation_operator`
    and `distinct_streaming_aggregation_operator` use this flag to select
    different hash table reduction thresholds, potentially improving
    performance for single-backend queries.
    
    **Single-backend query detection and propagation:**
    - Added a `single_backend_query` field to `TQueryOptions` and set it in
    the FE (`CoordinatorContext`, `Coordinator`, and `ThriftPlansBuilder`)
    when all fragments are assigned to a single backend.
    
[[1]](diffhunk://#diff-e23e53cda5dd2228558b7c3d07ed85b76911e3962c1b6eb46108340966179a5eR443-R447)
    
[[2]](diffhunk://#diff-1e3445c79eb4d715c96a128cb34afc75943f324edf3a28eeffe791c5f72db7a3R106)
    
[[3]](diffhunk://#diff-1e3445c79eb4d715c96a128cb34afc75943f324edf3a28eeffe791c5f72db7a3R451-R454)
    
[[4]](diffhunk://#diff-45871b12ae79fb05592420c1f6399b3e5a5836873690278ec917403451605b94R109-R113)
    
[[5]](diffhunk://#diff-1065d3f88b9a3b4bb8f151cac8514ebd39085c9dc2f0081b0eec43fd67ad7b74R838)
    
[[6]](diffhunk://#diff-1065d3f88b9a3b4bb8f151cac8514ebd39085c9dc2f0081b0eec43fd67ad7b74R875)
    - The BE receives and stores this flag in `QueryContext`, with new
    getter and setter methods and a backing field.
    
[[1]](diffhunk://#diff-3be0bcdb0c3c5fe6415556cf49c7270d4fc2e2f071c0240145ce175a32a484e2R926-R928)
    
[[2]](diffhunk://#diff-b6e358e9c765ec5d14aa33dfeab3f20288e26bf636e4da72161aac4a42c82923R112-R117)
    
[[3]](diffhunk://#diff-b6e358e9c765ec5d14aa33dfeab3f20288e26bf636e4da72161aac4a42c82923R397-R398)
    
    **Streaming aggregation operator optimization:**
    - Defined a separate table of hash table expansion thresholds with
    higher minimum reduction factors for single-backend queries
    (`SINGLE_BE_STREAMING_HT_MIN_REDUCTION`) in a new shared header,
    `streaming_agg_min_reduction.h`, which is included by both
    `streaming_aggregation_operator.cpp` and
    `distinct_streaming_aggregation_operator.cpp`.
    
[[1]](diffhunk://#diff-5f2882c1f711fc0954459c6b98a1dcde9b688bb5634be71ce6f585332d8b6497L69-R86)
    
[[2]](diffhunk://#diff-e78adff1d9b4effff7657f3966e776bf9e8acd9dee58f69df3aa2692a450501aR56-R66)
    - The local state for both streaming aggregation operators now checks
    the `single_backend_query` flag and uses the appropriate threshold table
    to decide when to expand hash tables.
    
[[1]](diffhunk://#diff-5f2882c1f711fc0954459c6b98a1dcde9b688bb5634be71ce6f585332d8b6497R95)
    
[[2]](diffhunk://#diff-81893b8013f0ade85a4aca0bcf807655fc08a57e7a6cc8531278cdc5330fae9eR119)
    
[[3]](diffhunk://#diff-e78adff1d9b4effff7657f3966e776bf9e8acd9dee58f69df3aa2692a450501aL65-R77)
    
[[4]](diffhunk://#diff-0098f1d5348adf3114a99bf8e273b95e68e137e3d45a1c89e5db06134a63adb2R90-R91)
    - Logic in the hash table expansion decision method selects the correct
    reduction table based on the flag, ensuring the optimization is applied
    only for single-backend queries.
    
[[1]](diffhunk://#diff-5f2882c1f711fc0954459c6b98a1dcde9b688bb5634be71ce6f585332d8b6497R251-R258)
    
[[2]](diffhunk://#diff-5f2882c1f711fc0954459c6b98a1dcde9b688bb5634be71ce6f585332d8b6497L271-R287)
    
[[3]](diffhunk://#diff-e78adff1d9b4effff7657f3966e776bf9e8acd9dee58f69df3aa2692a450501aR128-R135)
    
[[4]](diffhunk://#diff-e78adff1d9b4effff7657f3966e776bf9e8acd9dee58f69df3aa2692a450501aL148-R164)
---
 .../distinct_streaming_aggregation_operator.cpp    | 36 +++-------
 .../distinct_streaming_aggregation_operator.h      |  2 +
 be/src/exec/operator/streaming_agg_min_reduction.h | 76 ++++++++++++++++++++++
 .../operator/streaming_aggregation_operator.cpp    | 55 +++-------------
 .../exec/operator/streaming_aggregation_operator.h |  2 +
 be/src/runtime/fragment_mgr.cpp                    |  5 ++
 be/src/runtime/query_context.h                     |  8 +++
 .../main/java/org/apache/doris/qe/Coordinator.java |  2 +
 .../org/apache/doris/qe/CoordinatorContext.java    |  5 ++
 .../doris/qe/runtime/ThriftPlansBuilder.java       |  5 ++
 gensrc/thrift/PaloInternalService.thrift           |  5 ++
 11 files changed, 128 insertions(+), 73 deletions(-)

diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp 
b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
index 48a43ca4c39..f785f056a73 100644
--- a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
@@ -23,6 +23,7 @@
 #include <utility>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "exec/operator/streaming_agg_min_reduction.h"
 #include "exprs/vectorized_agg_fn.h"
 
 namespace doris {
@@ -32,29 +33,6 @@ class RuntimeState;
 
 namespace doris {
 #include "common/compile_check_begin.h"
-struct StreamingHtMinReductionEntry {
-    // Use 'streaming_ht_min_reduction' if the total size of hash table bucket 
directories in
-    // bytes is greater than this threshold.
-    int min_ht_mem;
-    // The minimum reduction factor to expand the hash tables.
-    double streaming_ht_min_reduction;
-};
-
-// TODO: experimentally tune these values and also programmatically get the 
cache size
-// of the machine that we're running on.
-static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
-        // Expand up to L2 cache always.
-        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
-        // Expand into L3 cache if we look like we're getting some reduction.
-        // At present, The L2 cache is generally 1024k or more
-        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
-        // Expand into main memory if we're getting a significant reduction.
-        // The L3 cache is generally 16MB or more
-        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
-};
-
-static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
-        sizeof(STREAMING_HT_MIN_REDUCTION) / 
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
 DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* 
state,
                                                                OperatorXBase* 
parent)
@@ -62,7 +40,8 @@ 
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
           batch_size(state->batch_size()),
           _agg_data(std::make_unique<DistinctDataVariants>()),
           _child_block(Block::create_unique()),
-          _aggregated_block(Block::create_unique()) {}
+          _aggregated_block(Block::create_unique()),
+          
_is_single_backend(state->get_query_ctx()->is_single_backend_query()) {}
 
 Status DistinctStreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
@@ -113,10 +92,14 @@ bool 
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
                             return true;
                         }
 
+                        const auto* reduction = _is_single_backend
+                                                        ? 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+                                                        : 
STREAMING_HT_MIN_REDUCTION;
+
                         // Find the appropriate reduction factor in our table 
for the current hash table sizes.
                         int cache_level = 0;
                         while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
-                               ht_mem >= 
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+                               ht_mem >= reduction[cache_level + 
1].min_ht_mem) {
                             ++cache_level;
                         }
 
@@ -145,8 +128,7 @@ bool 
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
                         //  double estimated_reduction = aggregated_input_rows 
>= expected_input_rows
                         //      ? current_reduction
                         //      : 1 + (expected_input_rows / 
aggregated_input_rows) * (current_reduction - 1);
-                        double min_reduction =
-                                
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+                        double min_reduction = 
reduction[cache_level].streaming_ht_min_reduction;
 
                         //  COUNTER_SET(preagg_estimated_reduction_, 
estimated_reduction);
                         //    COUNTER_SET(preagg_streaming_ht_min_reduction_, 
min_reduction);
diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.h 
b/be/src/exec/operator/distinct_streaming_aggregation_operator.h
index 392da56e89b..07e015af86e 100644
--- a/be/src/exec/operator/distinct_streaming_aggregation_operator.h
+++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.h
@@ -84,6 +84,8 @@ private:
     RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
     RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
     RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr;
+
+    bool _is_single_backend = false;
 };
 
 class DistinctStreamingAggOperatorX final
diff --git a/be/src/exec/operator/streaming_agg_min_reduction.h 
b/be/src/exec/operator/streaming_agg_min_reduction.h
new file mode 100644
index 00000000000..df2acc66ec8
--- /dev/null
+++ b/be/src/exec/operator/streaming_agg_min_reduction.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+namespace doris {
+
+/// The minimum reduction factor (input rows divided by output rows) to grow 
hash tables
+/// in a streaming preaggregation, given that the hash tables are currently 
the given
+/// size or above. The sizes roughly correspond to hash table sizes where the 
bucket
+/// arrays will fit in  a cache level. Intuitively, we don't want the working 
set of the
+/// aggregation to expand to the next level of cache unless we're reducing the 
input
+/// enough to outweigh the increased memory latency we'll incur for each hash 
table
+/// lookup.
+///
+/// Note that the current reduction achieved is not always a good estimate of 
the
+/// final reduction. It may be biased either way depending on the ordering of 
the
+/// input. If the input order is random, we will underestimate the final 
reduction
+/// factor because the probability of a row having the same key as a previous 
row
+/// increases as more input is processed.  If the input order is correlated 
with the
+/// key, skew may bias the estimate. If high cardinality keys appear first, we
+/// may overestimate and if low cardinality keys appear first, we 
underestimate.
+/// To estimate the eventual reduction achieved, we estimate the final 
reduction
+/// using the planner's estimated input cardinality and the assumption that 
input
+/// is in a random order. This means that we assume that the reduction factor 
will
+/// increase over time.
+struct StreamingHtMinReductionEntry {
+    // Use 'streaming_ht_min_reduction' if the total size of hash table bucket 
directories in
+    // bytes is greater than this threshold.
+    int min_ht_mem;
+    // The minimum reduction factor to expand the hash tables.
+    double streaming_ht_min_reduction;
+};
+
+// TODO: experimentally tune these values and also programmatically get the 
cache size
+// of the machine that we're running on.
+static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
+        // Expand up to L2 cache always.
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+        // Expand into L3 cache if we look like we're getting some reduction.
+        // At present, The L2 cache is generally 256k or more
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
+        // Expand into main memory if we're getting a significant reduction.
+        // The L3 cache is generally 16MB or more
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
+};
+
+static constexpr StreamingHtMinReductionEntry 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+        // Expand up to L2 cache always.
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+        // Expand into L3 cache if we look like we're getting some reduction.
+        // At present, The L2 cache is generally 256k or more
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 4.0},
+        // Expand into main memory if we're getting a significant reduction.
+        // The L3 cache is generally 16MB or more
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 5.0},
+};
+
+static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
+        sizeof(STREAMING_HT_MIN_REDUCTION) / 
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
+
+} // namespace doris
diff --git a/be/src/exec/operator/streaming_aggregation_operator.cpp 
b/be/src/exec/operator/streaming_aggregation_operator.cpp
index 4725e8ffb10..5ea488cf7e7 100644
--- a/be/src/exec/operator/streaming_aggregation_operator.cpp
+++ b/be/src/exec/operator/streaming_aggregation_operator.cpp
@@ -25,6 +25,7 @@
 #include "common/cast_set.h"
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "exec/operator/operator.h"
+#include "exec/operator/streaming_agg_min_reduction.h"
 #include "exprs/aggregate/aggregate_function_simple_factory.h"
 #include "exprs/vectorized_agg_fn.h"
 #include "exprs/vslot_ref.h"
@@ -35,54 +36,13 @@ class RuntimeState;
 } // namespace doris
 
 namespace doris {
-/// The minimum reduction factor (input rows divided by output rows) to grow 
hash tables
-/// in a streaming preaggregation, given that the hash tables are currently 
the given
-/// size or above. The sizes roughly correspond to hash table sizes where the 
bucket
-/// arrays will fit in  a cache level. Intuitively, we don't want the working 
set of the
-/// aggregation to expand to the next level of cache unless we're reducing the 
input
-/// enough to outweigh the increased memory latency we'll incur for each hash 
table
-/// lookup.
-///
-/// Note that the current reduction achieved is not always a good estimate of 
the
-/// final reduction. It may be biased either way depending on the ordering of 
the
-/// input. If the input order is random, we will underestimate the final 
reduction
-/// factor because the probability of a row having the same key as a previous 
row
-/// increases as more input is processed.  If the input order is correlated 
with the
-/// key, skew may bias the estimate. If high cardinality keys appear first, we
-/// may overestimate and if low cardinality keys appear first, we 
underestimate.
-/// To estimate the eventual reduction achieved, we estimate the final 
reduction
-/// using the planner's estimated input cardinality and the assumption that 
input
-/// is in a random order. This means that we assume that the reduction factor 
will
-/// increase over time.
-struct StreamingHtMinReductionEntry {
-    // Use 'streaming_ht_min_reduction' if the total size of hash table bucket 
directories in
-    // bytes is greater than this threshold.
-    int min_ht_mem;
-    // The minimum reduction factor to expand the hash tables.
-    double streaming_ht_min_reduction;
-};
-
-// TODO: experimentally tune these values and also programmatically get the 
cache size
-// of the machine that we're running on.
-static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
-        // Expand up to L2 cache always.
-        {0, 0.0},
-        // Expand into L3 cache if we look like we're getting some reduction.
-        // At present, The L2 cache is generally 1024k or more
-        {1024 * 1024, 1.1},
-        // Expand into main memory if we're getting a significant reduction.
-        // The L3 cache is generally 16MB or more
-        {16 * 1024 * 1024, 2.0},
-};
-
-static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
-        sizeof(STREAMING_HT_MIN_REDUCTION) / 
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
 StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, 
OperatorXBase* parent)
         : Base(state, parent),
           _agg_data(std::make_unique<AggregatedDataVariants>()),
           _child_block(Block::create_unique()),
-          _pre_aggregated_block(Block::create_unique()) {}
+          _pre_aggregated_block(Block::create_unique()),
+          
_is_single_backend(state->get_query_ctx()->is_single_backend_query()) {}
 
 Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) 
{
     RETURN_IF_ERROR(Base::init(state, info));
@@ -235,10 +195,14 @@ bool 
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
                             return true;
                         }
 
+                        const auto* reduction = _is_single_backend
+                                                        ? 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+                                                        : 
STREAMING_HT_MIN_REDUCTION;
+
                         // Find the appropriate reduction factor in our table 
for the current hash table sizes.
                         int cache_level = 0;
                         while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
-                               ht_mem >= 
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+                               ht_mem >= reduction[cache_level + 
1].min_ht_mem) {
                             ++cache_level;
                         }
 
@@ -267,8 +231,7 @@ bool 
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
                         //  double estimated_reduction = aggregated_input_rows 
>= expected_input_rows
                         //      ? current_reduction
                         //      : 1 + (expected_input_rows / 
aggregated_input_rows) * (current_reduction - 1);
-                        double min_reduction =
-                                
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+                        double min_reduction = 
reduction[cache_level].streaming_ht_min_reduction;
 
                         //  COUNTER_SET(preagg_estimated_reduction_, 
estimated_reduction);
                         //    COUNTER_SET(preagg_streaming_ht_min_reduction_, 
min_reduction);
diff --git a/be/src/exec/operator/streaming_aggregation_operator.h 
b/be/src/exec/operator/streaming_aggregation_operator.h
index f72d6dce24a..cd4ab29b068 100644
--- a/be/src/exec/operator/streaming_aggregation_operator.h
+++ b/be/src/exec/operator/streaming_aggregation_operator.h
@@ -192,6 +192,8 @@ private:
                              }},
                    _agg_data->method_variant);
     }
+
+    bool _is_single_backend = false;
 };
 
 class StreamingAggOperatorX MOCK_REMOVE(final) : public 
StatefulOperatorX<StreamingAggLocalState> {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b8298174f3e..10c4a7a96b6 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -877,6 +877,11 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
     std::shared_ptr<QueryContext> query_ctx;
     RETURN_IF_ERROR(_get_or_create_query_ctx(params, parent, query_source, 
query_ctx));
     SCOPED_ATTACH_TASK(query_ctx.get()->resource_ctx());
+    // Set single_backend_query before prepare() so that pipeline local states
+    // (e.g. StreamingAggLocalState) can read the correct value in their 
constructors.
+    query_ctx->set_single_backend_query(params.__isset.query_options &&
+                                        
params.query_options.__isset.single_backend_query &&
+                                        
params.query_options.single_backend_query);
     int64_t duration_ns = 0;
     std::shared_ptr<PipelineFragmentContext> context = 
std::make_shared<PipelineFragmentContext>(
             query_ctx->query_id(), params, query_ctx, _exec_env, cb,
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 1326d0bdfbe..aa1746ed8b0 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -107,6 +107,12 @@ public:
         return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
     }
 
+    bool is_single_backend_query() const { return _is_single_backend_query; }
+
+    void set_single_backend_query(bool is_single_backend_query) {
+        _is_single_backend_query = is_single_backend_query;
+    }
+
     int64_t get_remaining_query_time_seconds() const {
         timespec now;
         clock_gettime(CLOCK_MONOTONIC, &now);
@@ -381,6 +387,8 @@ private:
     std::string _load_error_url;
     std::string _first_error_msg;
 
+    bool _is_single_backend_query = false;
+
     // file cache context holders
     std::vector<io::BlockFileCache::QueryFileCacheContextHolderPtr> 
_query_context_holders;
     // instance id + node id -> cte scan
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index d0f002a3f15..f0705f8e195 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -835,6 +835,7 @@ public class Coordinator implements CoordInterface {
             // TableValuedFunctionScanNode, we should ensure 
TableValuedFunctionScanNode does not
             // send data until ExchangeNode is ready to receive.
             boolean twoPhaseExecution = fragments.size() > 1;
+            boolean isSingleBackend = addressToBackendID.size() == 1;
             for (PlanFragment fragment : fragments) {
                 FragmentExecParams params = 
fragmentExecParamsMap.get(fragment.getFragmentId());
 
@@ -871,6 +872,7 @@ public class Coordinator implements CoordInterface {
                     
entry.getValue().setFragmentNumOnHost(hostCounter.count(pipelineExecContext.address));
                     
entry.getValue().setBackendId(pipelineExecContext.backend.getId());
                     
entry.getValue().setNeedWaitExecutionTrigger(twoPhaseExecution);
+                    
entry.getValue().getQueryOptions().setSingleBackendQuery(isSingleBackend);
                     
entry.getValue().setFragmentId(fragment.getFragmentId().asInt());
 
                     
pipelineExecContexts.put(Pair.of(fragment.getFragmentId().asInt(), backendId), 
pipelineExecContext);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
index f3c825cf9d8..f1a124f487c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
@@ -103,6 +103,7 @@ public class CoordinatorContext {
     public final Supplier<Set<TUniqueId>> instanceIds = 
Suppliers.memoize(this::getInstanceIds);
     public final Supplier<Map<TNetworkAddress, Long>> backends = 
Suppliers.memoize(this::getBackends);
     public final Supplier<Integer> scanRangeNum = 
Suppliers.memoize(this::getScanRangeNum);
+    public final Supplier<Boolean> isSingleBackendQuery = 
Suppliers.memoize(this::computeIsSingleBackendQuery);
     public final Supplier<TNetworkAddress> directConnectFrontendAddress
             = Suppliers.memoize(this::computeDirectConnectCoordinator);
 
@@ -447,6 +448,10 @@ public class CoordinatorContext {
         return scanRangeNum;
     }
 
+    private boolean computeIsSingleBackendQuery() {
+        return backends.get().size() == 1;
+    }
+
     private int computeScanRangeNumByScanRange(TScanRangeParams param) {
         int scanRangeNum = 0;
         TScanRange scanRange = param.getScanRange();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index a026f066eaf..41b4ac9e765 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -107,6 +107,11 @@ public class ThriftPlansBuilder {
         Set<Integer> fragmentToNotifyClose = 
setParamsForRecursiveCteNode(distributedPlans,
                 coordinatorContext.runtimeFilters);
 
+        // Determine whether this query is assigned to a single backend and 
propagate it to
+        // TQueryOptions so that BE can apply more appropriate optimization 
strategies (e.g.
+        // streaming aggregation hash table thresholds).
+        
coordinatorContext.queryOptions.setSingleBackendQuery(coordinatorContext.isSingleBackendQuery.get());
+
         // we should set runtime predicate first, then we can use heap sort 
and to thrift
         setRuntimePredicateIfNeed(coordinatorContext.scanNodes);
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index e495d529ef6..495c1477647 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -440,6 +440,11 @@ struct TQueryOptions {
   // Use paimon-cpp to read Paimon splits on BE
   201: optional bool enable_paimon_cpp_reader = false;
 
+  // Whether all fragments of this query are assigned to a single backend.
+  // When true, the streaming aggregation operator can use more aggressive
+  // hash table expansion thresholds since all data is local.
+  202: optional bool single_backend_query = false;
+
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to