This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a771037442f [Improvement](streaming-agg) adjust single streaming
threshold (#61104)
a771037442f is described below
commit a771037442fc378eb9f29e5275265689eb6f00bd
Author: Pxl <[email protected]>
AuthorDate: Fri Mar 13 12:37:41 2026 +0800
[Improvement](streaming-agg) adjust single streaming threshold (#61104)
This pull request introduces an optimization for streaming aggregation
operators by detecting when a query is assigned to a single backend and
applying more aggressive hash table expansion thresholds in that case.
The change propagates a new `single_backend_query` flag from the
frontend to the backend, and both the `streaming_aggregation_operator`
and `distinct_streaming_aggregation_operator` use this flag to select
different hash table reduction thresholds, potentially improving
performance for single-backend queries.
**Single-backend query detection and propagation:**
- Added a `single_backend_query` field to `TQueryOptions` and set it in
the FE (`CoordinatorContext`, `Coordinator`, and `ThriftPlansBuilder`)
when all fragments are assigned to a single backend.
[[1]](diffhunk://#diff-e23e53cda5dd2228558b7c3d07ed85b76911e3962c1b6eb46108340966179a5eR443-R447)
[[2]](diffhunk://#diff-1e3445c79eb4d715c96a128cb34afc75943f324edf3a28eeffe791c5f72db7a3R106)
[[3]](diffhunk://#diff-1e3445c79eb4d715c96a128cb34afc75943f324edf3a28eeffe791c5f72db7a3R451-R454)
[[4]](diffhunk://#diff-45871b12ae79fb05592420c1f6399b3e5a5836873690278ec917403451605b94R109-R113)
[[5]](diffhunk://#diff-1065d3f88b9a3b4bb8f151cac8514ebd39085c9dc2f0081b0eec43fd67ad7b74R838)
[[6]](diffhunk://#diff-1065d3f88b9a3b4bb8f151cac8514ebd39085c9dc2f0081b0eec43fd67ad7b74R875)
- The BE receives and stores this flag in `QueryContext`, with new
getter and setter methods and a backing field.
[[1]](diffhunk://#diff-3be0bcdb0c3c5fe6415556cf49c7270d4fc2e2f071c0240145ce175a32a484e2R926-R928)
[[2]](diffhunk://#diff-b6e358e9c765ec5d14aa33dfeab3f20288e26bf636e4da72161aac4a42c82923R112-R117)
[[3]](diffhunk://#diff-b6e358e9c765ec5d14aa33dfeab3f20288e26bf636e4da72161aac4a42c82923R397-R398)
**Streaming aggregation operator optimization:**
- Defined new, more aggressive hash table expansion thresholds for
single-backend queries (`SINGLE_BE_STREAMING_HT_MIN_REDUCTION`) in a new
shared header, `streaming_agg_min_reduction.h`, which is included by both
`streaming_aggregation_operator.cpp` and
`distinct_streaming_aggregation_operator.cpp`.
[[1]](diffhunk://#diff-5f2882c1f711fc0954459c6b98a1dcde9b688bb5634be71ce6f585332d8b6497L69-R86)
[[2]](diffhunk://#diff-e78adff1d9b4effff7657f3966e776bf9e8acd9dee58f69df3aa2692a450501aR56-R66)
- The local state for both streaming aggregation operators now checks
the `single_backend_query` flag and uses the appropriate threshold table
to decide when to expand hash tables.
[[1]](diffhunk://#diff-5f2882c1f711fc0954459c6b98a1dcde9b688bb5634be71ce6f585332d8b6497R95)
[[2]](diffhunk://#diff-81893b8013f0ade85a4aca0bcf807655fc08a57e7a6cc8531278cdc5330fae9eR119)
[[3]](diffhunk://#diff-e78adff1d9b4effff7657f3966e776bf9e8acd9dee58f69df3aa2692a450501aL65-R77)
[[4]](diffhunk://#diff-0098f1d5348adf3114a99bf8e273b95e68e137e3d45a1c89e5db06134a63adb2R90-R91)
- Logic in the hash table expansion decision method selects the correct
reduction table based on the flag, ensuring the optimization is applied
only for single-backend queries.
[[1]](diffhunk://#diff-5f2882c1f711fc0954459c6b98a1dcde9b688bb5634be71ce6f585332d8b6497R251-R258)
[[2]](diffhunk://#diff-5f2882c1f711fc0954459c6b98a1dcde9b688bb5634be71ce6f585332d8b6497L271-R287)
[[3]](diffhunk://#diff-e78adff1d9b4effff7657f3966e776bf9e8acd9dee58f69df3aa2692a450501aR128-R135)
[[4]](diffhunk://#diff-e78adff1d9b4effff7657f3966e776bf9e8acd9dee58f69df3aa2692a450501aL148-R164)
---
.../distinct_streaming_aggregation_operator.cpp | 36 +++-------
.../distinct_streaming_aggregation_operator.h | 2 +
be/src/exec/operator/streaming_agg_min_reduction.h | 76 ++++++++++++++++++++++
.../operator/streaming_aggregation_operator.cpp | 55 +++-------------
.../exec/operator/streaming_aggregation_operator.h | 2 +
be/src/runtime/fragment_mgr.cpp | 5 ++
be/src/runtime/query_context.h | 8 +++
.../main/java/org/apache/doris/qe/Coordinator.java | 2 +
.../org/apache/doris/qe/CoordinatorContext.java | 5 ++
.../doris/qe/runtime/ThriftPlansBuilder.java | 5 ++
gensrc/thrift/PaloInternalService.thrift | 5 ++
11 files changed, 128 insertions(+), 73 deletions(-)
diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
index 48a43ca4c39..f785f056a73 100644
--- a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
@@ -23,6 +23,7 @@
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
+#include "exec/operator/streaming_agg_min_reduction.h"
#include "exprs/vectorized_agg_fn.h"
namespace doris {
@@ -32,29 +33,6 @@ class RuntimeState;
namespace doris {
#include "common/compile_check_begin.h"
-struct StreamingHtMinReductionEntry {
- // Use 'streaming_ht_min_reduction' if the total size of hash table bucket
directories in
- // bytes is greater than this threshold.
- int min_ht_mem;
- // The minimum reduction factor to expand the hash tables.
- double streaming_ht_min_reduction;
-};
-
-// TODO: experimentally tune these values and also programmatically get the
cache size
-// of the machine that we're running on.
-static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
- // Expand up to L2 cache always.
- {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
- // Expand into L3 cache if we look like we're getting some reduction.
- // At present, The L2 cache is generally 1024k or more
- {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
- // Expand into main memory if we're getting a significant reduction.
- // The L3 cache is generally 16MB or more
- {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
-};
-
-static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
- sizeof(STREAMING_HT_MIN_REDUCTION) /
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState*
state,
OperatorXBase*
parent)
@@ -62,7 +40,8 @@
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
batch_size(state->batch_size()),
_agg_data(std::make_unique<DistinctDataVariants>()),
_child_block(Block::create_unique()),
- _aggregated_block(Block::create_unique()) {}
+ _aggregated_block(Block::create_unique()),
+
_is_single_backend(state->get_query_ctx()->is_single_backend_query()) {}
Status DistinctStreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
@@ -113,10 +92,14 @@ bool
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
return true;
}
+ const auto* reduction = _is_single_backend
+ ?
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+ :
STREAMING_HT_MIN_REDUCTION;
+
// Find the appropriate reduction factor in our table
for the current hash table sizes.
int cache_level = 0;
while (cache_level + 1 <
STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >=
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+ ht_mem >= reduction[cache_level +
1].min_ht_mem) {
++cache_level;
}
@@ -145,8 +128,7 @@ bool
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
// double estimated_reduction = aggregated_input_rows
>= expected_input_rows
// ? current_reduction
// : 1 + (expected_input_rows /
aggregated_input_rows) * (current_reduction - 1);
- double min_reduction =
-
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+ double min_reduction =
reduction[cache_level].streaming_ht_min_reduction;
// COUNTER_SET(preagg_estimated_reduction_,
estimated_reduction);
// COUNTER_SET(preagg_streaming_ht_min_reduction_,
min_reduction);
diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.h
b/be/src/exec/operator/distinct_streaming_aggregation_operator.h
index 392da56e89b..07e015af86e 100644
--- a/be/src/exec/operator/distinct_streaming_aggregation_operator.h
+++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.h
@@ -84,6 +84,8 @@ private:
RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr;
+
+ bool _is_single_backend = false;
};
class DistinctStreamingAggOperatorX final
diff --git a/be/src/exec/operator/streaming_agg_min_reduction.h
b/be/src/exec/operator/streaming_agg_min_reduction.h
new file mode 100644
index 00000000000..df2acc66ec8
--- /dev/null
+++ b/be/src/exec/operator/streaming_agg_min_reduction.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+namespace doris {
+
+/// The minimum reduction factor (input rows divided by output rows) to grow
hash tables
+/// in a streaming preaggregation, given that the hash tables are currently
the given
+/// size or above. The sizes roughly correspond to hash table sizes where the
bucket
+/// arrays will fit in a cache level. Intuitively, we don't want the working
set of the
+/// aggregation to expand to the next level of cache unless we're reducing the
input
+/// enough to outweigh the increased memory latency we'll incur for each hash
table
+/// lookup.
+///
+/// Note that the current reduction achieved is not always a good estimate of
the
+/// final reduction. It may be biased either way depending on the ordering of
the
+/// input. If the input order is random, we will underestimate the final
reduction
+/// factor because the probability of a row having the same key as a previous
row
+/// increases as more input is processed. If the input order is correlated
with the
+/// key, skew may bias the estimate. If high cardinality keys appear first, we
+/// may overestimate and if low cardinality keys appear first, we
underestimate.
+/// To estimate the eventual reduction achieved, we estimate the final
reduction
+/// using the planner's estimated input cardinality and the assumption that
input
+/// is in a random order. This means that we assume that the reduction factor
will
+/// increase over time.
+struct StreamingHtMinReductionEntry {
+ // Use 'streaming_ht_min_reduction' if the total size of hash table bucket
directories in
+ // bytes is greater than this threshold.
+ int min_ht_mem;
+ // The minimum reduction factor to expand the hash tables.
+ double streaming_ht_min_reduction;
+};
+
+// TODO: experimentally tune these values and also programmatically get the
cache size
+// of the machine that we're running on.
+static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
+ // Expand up to L2 cache always.
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+ // Expand into L3 cache if we look like we're getting some reduction.
+ // At present, The L2 cache is generally 256k or more
+ {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
+ // Expand into main memory if we're getting a significant reduction.
+ // The L3 cache is generally 16MB or more
+ {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
+};
+
+static constexpr StreamingHtMinReductionEntry
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+ // Expand up to L2 cache always.
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+ // Expand into L3 cache if we look like we're getting some reduction.
+ // At present, The L2 cache is generally 256k or more
+ {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 4.0},
+ // Expand into main memory if we're getting a significant reduction.
+ // The L3 cache is generally 16MB or more
+ {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 5.0},
+};
+
+static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
+ sizeof(STREAMING_HT_MIN_REDUCTION) /
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
+
+} // namespace doris
diff --git a/be/src/exec/operator/streaming_aggregation_operator.cpp
b/be/src/exec/operator/streaming_aggregation_operator.cpp
index 4725e8ffb10..5ea488cf7e7 100644
--- a/be/src/exec/operator/streaming_aggregation_operator.cpp
+++ b/be/src/exec/operator/streaming_aggregation_operator.cpp
@@ -25,6 +25,7 @@
#include "common/cast_set.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "exec/operator/operator.h"
+#include "exec/operator/streaming_agg_min_reduction.h"
#include "exprs/aggregate/aggregate_function_simple_factory.h"
#include "exprs/vectorized_agg_fn.h"
#include "exprs/vslot_ref.h"
@@ -35,54 +36,13 @@ class RuntimeState;
} // namespace doris
namespace doris {
-/// The minimum reduction factor (input rows divided by output rows) to grow
hash tables
-/// in a streaming preaggregation, given that the hash tables are currently
the given
-/// size or above. The sizes roughly correspond to hash table sizes where the
bucket
-/// arrays will fit in a cache level. Intuitively, we don't want the working
set of the
-/// aggregation to expand to the next level of cache unless we're reducing the
input
-/// enough to outweigh the increased memory latency we'll incur for each hash
table
-/// lookup.
-///
-/// Note that the current reduction achieved is not always a good estimate of
the
-/// final reduction. It may be biased either way depending on the ordering of
the
-/// input. If the input order is random, we will underestimate the final
reduction
-/// factor because the probability of a row having the same key as a previous
row
-/// increases as more input is processed. If the input order is correlated
with the
-/// key, skew may bias the estimate. If high cardinality keys appear first, we
-/// may overestimate and if low cardinality keys appear first, we
underestimate.
-/// To estimate the eventual reduction achieved, we estimate the final
reduction
-/// using the planner's estimated input cardinality and the assumption that
input
-/// is in a random order. This means that we assume that the reduction factor
will
-/// increase over time.
-struct StreamingHtMinReductionEntry {
- // Use 'streaming_ht_min_reduction' if the total size of hash table bucket
directories in
- // bytes is greater than this threshold.
- int min_ht_mem;
- // The minimum reduction factor to expand the hash tables.
- double streaming_ht_min_reduction;
-};
-
-// TODO: experimentally tune these values and also programmatically get the
cache size
-// of the machine that we're running on.
-static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
- // Expand up to L2 cache always.
- {0, 0.0},
- // Expand into L3 cache if we look like we're getting some reduction.
- // At present, The L2 cache is generally 1024k or more
- {1024 * 1024, 1.1},
- // Expand into main memory if we're getting a significant reduction.
- // The L3 cache is generally 16MB or more
- {16 * 1024 * 1024, 2.0},
-};
-
-static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
- sizeof(STREAMING_HT_MIN_REDUCTION) /
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state,
OperatorXBase* parent)
: Base(state, parent),
_agg_data(std::make_unique<AggregatedDataVariants>()),
_child_block(Block::create_unique()),
- _pre_aggregated_block(Block::create_unique()) {}
+ _pre_aggregated_block(Block::create_unique()),
+
_is_single_backend(state->get_query_ctx()->is_single_backend_query()) {}
Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info)
{
RETURN_IF_ERROR(Base::init(state, info));
@@ -235,10 +195,14 @@ bool
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
return true;
}
+ const auto* reduction = _is_single_backend
+ ?
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+ :
STREAMING_HT_MIN_REDUCTION;
+
// Find the appropriate reduction factor in our table
for the current hash table sizes.
int cache_level = 0;
while (cache_level + 1 <
STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >=
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+ ht_mem >= reduction[cache_level +
1].min_ht_mem) {
++cache_level;
}
@@ -267,8 +231,7 @@ bool
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
// double estimated_reduction = aggregated_input_rows
>= expected_input_rows
// ? current_reduction
// : 1 + (expected_input_rows /
aggregated_input_rows) * (current_reduction - 1);
- double min_reduction =
-
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+ double min_reduction =
reduction[cache_level].streaming_ht_min_reduction;
// COUNTER_SET(preagg_estimated_reduction_,
estimated_reduction);
// COUNTER_SET(preagg_streaming_ht_min_reduction_,
min_reduction);
diff --git a/be/src/exec/operator/streaming_aggregation_operator.h
b/be/src/exec/operator/streaming_aggregation_operator.h
index f72d6dce24a..cd4ab29b068 100644
--- a/be/src/exec/operator/streaming_aggregation_operator.h
+++ b/be/src/exec/operator/streaming_aggregation_operator.h
@@ -192,6 +192,8 @@ private:
}},
_agg_data->method_variant);
}
+
+ bool _is_single_backend = false;
};
class StreamingAggOperatorX MOCK_REMOVE(final) : public
StatefulOperatorX<StreamingAggLocalState> {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b8298174f3e..10c4a7a96b6 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -877,6 +877,11 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
std::shared_ptr<QueryContext> query_ctx;
RETURN_IF_ERROR(_get_or_create_query_ctx(params, parent, query_source,
query_ctx));
SCOPED_ATTACH_TASK(query_ctx.get()->resource_ctx());
+ // Set single_backend_query before prepare() so that pipeline local states
+ // (e.g. StreamingAggLocalState) can read the correct value in their
constructors.
+ query_ctx->set_single_backend_query(params.__isset.query_options &&
+
params.query_options.__isset.single_backend_query &&
+
params.query_options.single_backend_query);
int64_t duration_ns = 0;
std::shared_ptr<PipelineFragmentContext> context =
std::make_shared<PipelineFragmentContext>(
query_ctx->query_id(), params, query_ctx, _exec_env, cb,
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 1326d0bdfbe..aa1746ed8b0 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -107,6 +107,12 @@ public:
return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
}
+ bool is_single_backend_query() const { return _is_single_backend_query; }
+
+ void set_single_backend_query(bool is_single_backend_query) {
+ _is_single_backend_query = is_single_backend_query;
+ }
+
int64_t get_remaining_query_time_seconds() const {
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
@@ -381,6 +387,8 @@ private:
std::string _load_error_url;
std::string _first_error_msg;
+ bool _is_single_backend_query = false;
+
// file cache context holders
std::vector<io::BlockFileCache::QueryFileCacheContextHolderPtr>
_query_context_holders;
// instance id + node id -> cte scan
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index d0f002a3f15..f0705f8e195 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -835,6 +835,7 @@ public class Coordinator implements CoordInterface {
// TableValuedFunctionScanNode, we should ensure
TableValuedFunctionScanNode does not
// send data until ExchangeNode is ready to receive.
boolean twoPhaseExecution = fragments.size() > 1;
+ boolean isSingleBackend = addressToBackendID.size() == 1;
for (PlanFragment fragment : fragments) {
FragmentExecParams params =
fragmentExecParamsMap.get(fragment.getFragmentId());
@@ -871,6 +872,7 @@ public class Coordinator implements CoordInterface {
entry.getValue().setFragmentNumOnHost(hostCounter.count(pipelineExecContext.address));
entry.getValue().setBackendId(pipelineExecContext.backend.getId());
entry.getValue().setNeedWaitExecutionTrigger(twoPhaseExecution);
+
entry.getValue().getQueryOptions().setSingleBackendQuery(isSingleBackend);
entry.getValue().setFragmentId(fragment.getFragmentId().asInt());
pipelineExecContexts.put(Pair.of(fragment.getFragmentId().asInt(), backendId),
pipelineExecContext);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
index f3c825cf9d8..f1a124f487c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
@@ -103,6 +103,7 @@ public class CoordinatorContext {
public final Supplier<Set<TUniqueId>> instanceIds =
Suppliers.memoize(this::getInstanceIds);
public final Supplier<Map<TNetworkAddress, Long>> backends =
Suppliers.memoize(this::getBackends);
public final Supplier<Integer> scanRangeNum =
Suppliers.memoize(this::getScanRangeNum);
+ public final Supplier<Boolean> isSingleBackendQuery =
Suppliers.memoize(this::computeIsSingleBackendQuery);
public final Supplier<TNetworkAddress> directConnectFrontendAddress
= Suppliers.memoize(this::computeDirectConnectCoordinator);
@@ -447,6 +448,10 @@ public class CoordinatorContext {
return scanRangeNum;
}
+ private boolean computeIsSingleBackendQuery() {
+ return backends.get().size() == 1;
+ }
+
private int computeScanRangeNumByScanRange(TScanRangeParams param) {
int scanRangeNum = 0;
TScanRange scanRange = param.getScanRange();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index a026f066eaf..41b4ac9e765 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -107,6 +107,11 @@ public class ThriftPlansBuilder {
Set<Integer> fragmentToNotifyClose =
setParamsForRecursiveCteNode(distributedPlans,
coordinatorContext.runtimeFilters);
+ // Determine whether this query is assigned to a single backend and
propagate it to
+ // TQueryOptions so that BE can apply more appropriate optimization
strategies (e.g.
+ // streaming aggregation hash table thresholds).
+
coordinatorContext.queryOptions.setSingleBackendQuery(coordinatorContext.isSingleBackendQuery.get());
+
// we should set runtime predicate first, then we can use heap sort
and to thrift
setRuntimePredicateIfNeed(coordinatorContext.scanNodes);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index e495d529ef6..495c1477647 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -440,6 +440,11 @@ struct TQueryOptions {
// Use paimon-cpp to read Paimon splits on BE
201: optional bool enable_paimon_cpp_reader = false;
+ // Whether all fragments of this query are assigned to a single backend.
+ // When true, the streaming aggregation operator can use more aggressive
+ // hash table expansion thresholds since all data is local.
+ 202: optional bool single_backend_query = false;
+
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]