This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 8dfa03fb587 reserve block in scanner (#44185)
8dfa03fb587 is described below
commit 8dfa03fb587dbc47870b623feb13e030b2252b49
Author: TengJianPing <[email protected]>
AuthorDate: Wed Nov 20 09:57:54 2024 +0800
reserve block in scanner (#44185)
---
be/src/pipeline/exec/operator.h | 5 --
.../exec/streaming_aggregation_operator.cpp | 2 +-
be/src/pipeline/pipeline_fragment_context.cpp | 8 +--
be/src/pipeline/pipeline_task.cpp | 43 ++++++++--------
be/src/runtime/query_context.h | 4 +-
be/src/runtime/runtime_state.h | 14 ++---
be/src/vec/exec/scan/scanner_context.cpp | 17 +++++--
be/src/vec/exec/scan/scanner_context.h | 1 +
be/src/vec/exec/scan/scanner_scheduler.cpp | 27 +++++++++-
be/src/vec/spill/spill_stream.cpp | 4 +-
.../java/org/apache/doris/qe/SessionVariable.java | 59 +++++++---------------
gensrc/thrift/PaloInternalService.thrift | 8 +++
.../query/test_nested_type_with_resize.groovy | 8 +--
.../nereids_rules_p0/mv/variant/variant_mv.groovy | 2 +-
.../query_p0/limit/test_group_by_limit.groovy | 2 +-
.../suites/spill_p0/aggregate_spill.groovy | 2 +-
regression-test/suites/variant_p0/load.groovy | 2 +-
regression-test/suites/variant_p0/nested.groovy | 4 +-
.../suites/variant_p0/test_sub_path_pruning.groovy | 2 +-
19 files changed, 111 insertions(+), 103 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index e7c668d2cdb..4c431470435 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -645,11 +645,6 @@ public:
[[nodiscard]] std::string get_name() const override { return _name; }
- [[nodiscard]] virtual bool try_reserve_memory(RuntimeState* state,
vectorized::Block* block,
- bool eos) {
- return true;
- }
-
virtual bool should_dry_run(RuntimeState* state) { return false; }
[[nodiscard]] virtual bool count_down_destination() { return true; }
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 6532c5571f6..2fff3d1b7c7 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1154,7 +1154,7 @@ Status StreamingAggOperatorX::init(const TPlanNode&
tnode, RuntimeState* state)
_aggregate_evaluators.push_back(evaluator);
}
- if (state->enable_agg_spill()) {
+ if (state->enable_spill()) {
// If spill enabled, the streaming agg should not occupy too much
memory.
_spill_streaming_agg_mem_limit =
state->query_options().__isset.spill_streaming_agg_mem_limit
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 3c275b4b8a2..944374b66e2 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1288,7 +1288,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
/// PartitionedAggSourceOperatorX does not support "group by limit
opt(#29641)" yet.
/// If `group_by_limit_opt` is true, then it might not need to spill
at all.
- const bool enable_spill = _runtime_state->enable_agg_spill() &&
+ const bool enable_spill = _runtime_state->enable_spill() &&
!tnode.agg_node.grouping_exprs.empty() &&
!group_by_limit_opt;
if (tnode.agg_node.aggregate_functions.empty() && !enable_spill &&
@@ -1382,8 +1382,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
case TPlanNodeType::HASH_JOIN_NODE: {
const auto is_broadcast_join =
tnode.hash_join_node.__isset.is_broadcast_join &&
tnode.hash_join_node.is_broadcast_join;
- const auto enable_join_spill = _runtime_state->enable_join_spill();
- if (enable_join_spill && !is_broadcast_join) {
+ const auto enable_spill = _runtime_state->enable_spill();
+ if (enable_spill && !is_broadcast_join) {
auto tnode_ = tnode;
/// TODO: support rf in partitioned hash join
tnode_.runtime_filters.clear();
@@ -1499,7 +1499,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
break;
}
case TPlanNodeType::SORT_NODE: {
- const auto should_spill = _runtime_state->enable_sort_spill() &&
+ const auto should_spill = _runtime_state->enable_spill() &&
tnode.sort_node.algorithm ==
TSortAlgorithm::FULL_SORT;
if (should_spill) {
op.reset(new SpillSortSourceOperatorX(pool, tnode,
next_operator_id(), descs));
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index e6801121358..69a1c490911 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -426,7 +426,7 @@ Status PipelineTask::execute(bool* eos) {
_root->reset_reserve_mem_size(_state);
auto workload_group = _state->get_query_ctx()->workload_group();
- if (workload_group && reserve_size > 0) {
+ if (workload_group && _state->enable_reserve_memory() &&
reserve_size > 0) {
auto st = thread_context()->try_reserve_memory(reserve_size);
COUNTER_UPDATE(_memory_reserve_times, 1);
@@ -458,25 +458,28 @@ Status PipelineTask::execute(bool* eos) {
DEFER_RELEASE_RESERVED();
COUNTER_UPDATE(_memory_reserve_times, 1);
const auto sink_reserve_size = _sink->get_reserve_mem_size(_state,
*eos);
- status = thread_context()->try_reserve_memory(sink_reserve_size);
- if (!status.ok()) {
- COUNTER_UPDATE(_memory_reserve_failed_times, 1);
- LOG(INFO) << "query: " << print_id(query_id) << ", try to
reserve: "
- << PrettyPrinter::print(sink_reserve_size,
TUnit::BYTES)
- << ", sink name: " << _sink->get_name()
- << ", node id: " << _sink->node_id() << ", task id:
" << _state->task_id()
- << ", failed: " << status.to_string()
- << ", debug info: " <<
GlobalMemoryArbitrator::process_mem_log_str();
- _state->get_query_ctx()->update_paused_reason(status);
- _state->get_query_ctx()->set_low_memory_mode();
- ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
- _state->get_query_ctx()->shared_from_this(),
sink_reserve_size);
- DCHECK_EQ(_pending_block.get(), nullptr);
- _pending_block = std::move(_block);
- _block =
vectorized::Block::create_unique(_pending_block->clone_empty());
- _eos = *eos;
- *eos = false;
- continue;
+ if (_state->enable_reserve_memory()) {
+ status =
thread_context()->try_reserve_memory(sink_reserve_size);
+ if (!status.ok()) {
+ COUNTER_UPDATE(_memory_reserve_failed_times, 1);
+ LOG(INFO) << "query: " << print_id(query_id) << ", try to
reserve: "
+ << PrettyPrinter::print(sink_reserve_size,
TUnit::BYTES)
+ << ", sink name: " << _sink->get_name()
+ << ", node id: " << _sink->node_id()
+ << ", task id: " << _state->task_id()
+ << ", failed: " << status.to_string()
+ << ", debug info: " <<
GlobalMemoryArbitrator::process_mem_log_str();
+ _state->get_query_ctx()->update_paused_reason(status);
+ _state->get_query_ctx()->set_low_memory_mode();
+
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+ _state->get_query_ctx()->shared_from_this(),
sink_reserve_size);
+ DCHECK_EQ(_pending_block.get(), nullptr);
+ _pending_block = std::move(_block);
+ _block =
vectorized::Block::create_unique(_pending_block->clone_empty());
+ _eos = *eos;
+ *eos = false;
+ continue;
+ }
}
// Define a lambda function to catch sink exception, because sink
will check
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index e363fa3ba49..49d7742d03c 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -313,9 +313,7 @@ public:
}
if (is_low_watermark &&
- ((_query_options.__isset.enable_join_spill &&
_query_options.enable_join_spill) ||
- (_query_options.__isset.enable_sort_spill &&
_query_options.enable_sort_spill) ||
- (_query_options.__isset.enable_agg_spill &&
_query_options.enable_agg_spill))) {
+ (_query_options.__isset.enable_spill &&
_query_options.enable_spill)) {
LOG(INFO) << "Query " << print_id(_query_id)
<< " goes to low memory mode due to workload group
low water mark "
"reached and the query enable spill";
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 31e1d378526..5ca361246e4 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -561,19 +561,13 @@ public:
std::shared_ptr<IRuntimeFilter>*
producer_filter);
bool is_nereids() const;
- bool enable_join_spill() const {
- return (_query_options.__isset.enable_force_spill &&
_query_options.enable_force_spill) ||
- (_query_options.__isset.enable_join_spill &&
_query_options.enable_join_spill);
- }
-
- bool enable_sort_spill() const {
- return (_query_options.__isset.enable_force_spill &&
_query_options.enable_force_spill) ||
- (_query_options.__isset.enable_sort_spill &&
_query_options.enable_sort_spill);
+ bool enable_reserve_memory() const {
+ return _query_options.__isset.enable_reserve_memory &&
_query_options.enable_reserve_memory;
}
- bool enable_agg_spill() const {
+ bool enable_spill() const {
return (_query_options.__isset.enable_force_spill &&
_query_options.enable_force_spill) ||
- (_query_options.__isset.enable_agg_spill &&
_query_options.enable_agg_spill);
+ (_query_options.__isset.enable_spill &&
_query_options.enable_spill);
}
bool enable_force_spill() const {
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index af92080c777..64411ecd290 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -240,7 +240,9 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool
force) {
}
void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
- if (block->mem_reuse() && _block_memory_usage < _max_bytes_in_queue) {
+ // If under low memory mode, should not return the freeblock, it will
occupy too much memory.
+ if (!_local_state->low_memory_mode() && block->mem_reuse() &&
+ _block_memory_usage < _max_bytes_in_queue) {
size_t block_size_to_reuse = block->allocated_bytes();
_block_memory_usage += block_size_to_reuse;
_scanner_memory_used_counter->set(_block_memory_usage);
@@ -251,6 +253,14 @@ void
ScannerContext::return_free_block(vectorized::BlockUPtr block) {
}
}
+void ScannerContext::clear_free_blocks() {
+ vectorized::BlockUPtr block;
+ while (_free_blocks.try_dequeue(block)) {
+ // do nothing
+ }
+ block.reset();
+}
+
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
_scanner_sched_counter->update(1);
_num_scheduled_scanners++;
@@ -321,10 +331,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
update_peak_memory_usage(-current_block->allocated_bytes());
// consume current block
block->swap(*current_block);
- // If under low memory mode, should not return the freeblock, it
will occupy too memory.
- if (!_local_state->low_memory_mode()) {
- return_free_block(std::move(current_block));
- }
+ return_free_block(std::move(current_block));
} else {
// This scan task do not have any cached blocks.
_tasks_queue.pop_front();
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index b81834fd7b7..da04a764a55 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -123,6 +123,7 @@ public:
vectorized::BlockUPtr get_free_block(bool force);
void return_free_block(vectorized::BlockUPtr block);
+ void clear_free_blocks();
inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }
int64_t block_memory_usage() { return _block_memory_usage; }
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 0dae893b5c6..65883355130 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -277,6 +277,7 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
bool first_read = true;
// If the first block is full, then it is true. Or the first block
+ second block > batch_size
bool has_first_full_block = false;
+ size_t block_avg_bytes = ctx->batch_size();
// During low memory mode, every scan task will return at most 2
block to reduce memory usage.
while (!eos && raw_bytes_read < raw_bytes_threshold &&
@@ -291,7 +292,25 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
config::doris_scanner_max_run_time_ms * 1e6) {
break;
}
- BlockUPtr free_block = ctx->get_free_block(first_read);
+ DEFER_RELEASE_RESERVED();
+ BlockUPtr free_block;
+ if (first_read) {
+ free_block = ctx->get_free_block(first_read);
+ } else {
+ if (state->enable_reserve_memory()) {
+ status =
thread_context()->try_reserve_memory(block_avg_bytes);
+ if (!status.ok()) {
+ LOG(INFO) << "query: " <<
print_id(state->query_id())
+ << ", scanner try to reserve: "
+ << PrettyPrinter::print(block_avg_bytes,
TUnit::BYTES)
+ << ", failed: " << status.to_string() <<
", debug info: "
+ <<
GlobalMemoryArbitrator::process_mem_log_str();
+ ctx->clear_free_blocks();
+ break;
+ }
+ }
+ free_block = ctx->get_free_block(first_read);
+ }
if (free_block == nullptr) {
break;
}
@@ -338,6 +357,12 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
ctx->inc_block_usage(free_block->allocated_bytes());
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
}
+ if (scan_task->cached_blocks.back().first->rows() > 0) {
+ block_avg_bytes =
(scan_task->cached_blocks.back().first->allocated_bytes() +
+
scan_task->cached_blocks.back().first->rows() - 1) /
+
scan_task->cached_blocks.back().first->rows() *
+ ctx->batch_size();
+ }
} // end for while
if (UNLIKELY(!status.ok())) {
diff --git a/be/src/vec/spill/spill_stream.cpp
b/be/src/vec/spill/spill_stream.cpp
index a27916a87a3..fafebb4e62a 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -116,7 +116,9 @@ Status SpillStream::prepare() {
}
SpillReaderUPtr SpillStream::create_separate_reader() const {
- return std::make_unique<SpillReader>(stream_id_, writer_->get_file_path());
+ return std::make_unique<SpillReader>(
+
state_->get_query_ctx()->get_mem_tracker()->get_query_statistics(), stream_id_,
+ writer_->get_file_path());
}
const TUniqueId& SpillStream::query_id() const {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 52e78618c05..4024fa51ab5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -553,9 +553,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String EXTERNAL_AGG_PARTITION_BITS =
"external_agg_partition_bits";
public static final String SPILL_STREAMING_AGG_MEM_LIMIT =
"spill_streaming_agg_mem_limit";
public static final String MIN_REVOCABLE_MEM = "min_revocable_mem";
- public static final String ENABLE_JOIN_SPILL = "enable_join_spill";
- public static final String ENABLE_SORT_SPILL = "enable_sort_spill";
- public static final String ENABLE_AGG_SPILL = "enable_agg_spill";
+ public static final String ENABLE_SPILL = "enable_spill";
+ public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory";
public static final String ENABLE_FORCE_SPILL = "enable_force_spill";
public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
@@ -2133,28 +2132,20 @@ public class SessionVariable implements Serializable,
Writable {
public int maxFetchRemoteTabletCount = 512;
@VariableMgr.VarAttr(
- name = ENABLE_JOIN_SPILL,
- description = {"控制是否启用join算子落盘。默认为 false。",
- "Controls whether to enable spill to disk of join
operation. "
- + "The default value is false."},
- needForward = true, fuzzy = true)
- public boolean enableJoinSpill = false;
-
- @VariableMgr.VarAttr(
- name = ENABLE_SORT_SPILL,
- description = {"控制是否启用排序算子落盘。默认为 false。",
- "Controls whether to enable spill to disk of sort
operation. "
- + "The default value is false."},
+ name = ENABLE_RESERVE_MEMORY,
+      description = {"控制是否启用分配内存前先reserve memory的功能。默认为 true。",
+ "Controls whether to enable reserve memory before
allocating memory. "
+ + "The default value is true."},
needForward = true, fuzzy = true)
- public boolean enableSortSpill = false;
+ public boolean enableReserveMemory = true;
@VariableMgr.VarAttr(
- name = ENABLE_AGG_SPILL,
- description = {"控制是否启用聚合算子落盘。默认为 false。",
- "Controls whether to enable spill to disk of aggregation
operation. "
+ name = ENABLE_SPILL,
+ description = {"控制是否启用查询算子落盘。默认为 false。",
+ "Controls whether to enable spill to disk for query. "
+ "The default value is false."},
needForward = true, fuzzy = true)
- public boolean enableAggSpill = false;
+ public boolean enableSpill = false;
@VariableMgr.VarAttr(
name = ENABLE_FORCE_SPILL,
@@ -2287,18 +2278,6 @@ public class SessionVariable implements Serializable,
Writable {
return enableESParallelScroll;
}
- public boolean isEnableJoinSpill() {
- return enableJoinSpill;
- }
-
- public void setEnableJoinSpill(boolean enableJoinSpill) {
- this.enableJoinSpill = enableJoinSpill;
- }
-
- public boolean isEnableSortSpill() {
- return enableSortSpill;
- }
-
// If this fe is in fuzzy mode, then will use initFuzzyModeVariables to
generate some variables,
// not the default value set in the code.
@SuppressWarnings("checkstyle:Indentation")
@@ -2428,10 +2407,8 @@ public class SessionVariable implements Serializable,
Writable {
// for spill to disk
if (Config.pull_request_id > 10000) {
if (Config.pull_request_id % 2 == 0) {
- this.enableJoinSpill = true;
- this.enableSortSpill = true;
- this.enableAggSpill = true;
-
+ this.enableSpill = true;
+ this.enableReserveMemory = true;
randomInt = random.nextInt(4);
switch (randomInt) {
case 0:
@@ -2448,9 +2425,8 @@ public class SessionVariable implements Serializable,
Writable {
break;
}
} else {
- this.enableJoinSpill = false;
- this.enableSortSpill = false;
- this.enableAggSpill = false;
+ this.enableSpill = false;
+ this.enableReserveMemory = false;
}
}
}
@@ -3863,9 +3839,8 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
tResult.setSkipBadTablet(skipBadTablet);
tResult.setDisableFileCache(disableFileCache);
- tResult.setEnableJoinSpill(enableJoinSpill);
- tResult.setEnableSortSpill(enableSortSpill);
- tResult.setEnableAggSpill(enableAggSpill);
+ tResult.setEnableReserveMemory(enableReserveMemory);
+ tResult.setEnableSpill(enableSpill);
tResult.setEnableForceSpill(enableForceSpill);
tResult.setExternalAggPartitionBits(externalAggPartitionBits);
tResult.setMinRevocableMem(minRevocableMem);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 37c7da3d702..f6350b7fb32 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -281,10 +281,13 @@ struct TQueryOptions {
100: optional bool enable_distinct_streaming_aggregation = true;
+ // deprecated
101: optional bool enable_join_spill = false
+ // deprecated
102: optional bool enable_sort_spill = false
+ // deprecated
103: optional bool enable_agg_spill = false
104: optional i64 min_revocable_mem = 0
@@ -356,6 +359,11 @@ struct TQueryOptions {
139: optional i32 query_slot_count = 0;
140: optional bool enable_auto_create_when_overwrite = false;
+
+ 141: optional bool enable_spill = false
+
+ 142: optional bool enable_reserve_memory = true
+
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
diff --git
a/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
b/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
index 2a599c1cc1a..eaf8c7e996b 100644
---
a/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
+++
b/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
@@ -65,9 +65,9 @@ suite("test_nested_type_with_resize") {
}
}
- order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
- order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
- order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
- order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
+ order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
+ order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
+ order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
+ order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
}
diff --git
a/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy
b/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy
index 0d0415456f7..6e43620d449 100644
--- a/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy
+++ b/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy
@@ -573,7 +573,7 @@ suite("variant_mv") {
where g2.actor['id'] > 34259289;
"""
def query3_6 = """
- SELECT
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=0,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=32,parallel_scan_min_rows_per_scanner=64,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=1,enable_parallel_result_sink=false,ena
[...]
+ SELECT
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=0,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=32,parallel_scan_min_rows_per_scanner=64,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=1,enable_parallel_result_sink=false,ena
[...]
g1.id,
g2.type,
floor(cast(g1.actor['id'] as int) + 100.5),
diff --git a/regression-test/suites/query_p0/limit/test_group_by_limit.groovy
b/regression-test/suites/query_p0/limit/test_group_by_limit.groovy
index 271619c4a93..15844ff7f71 100644
--- a/regression-test/suites/query_p0/limit/test_group_by_limit.groovy
+++ b/regression-test/suites/query_p0/limit/test_group_by_limit.groovy
@@ -17,7 +17,7 @@
suite("test_group_by_limit", "query") {
-sql 'set enable_agg_spill=false'
+sql 'set enable_spill=false'
sql 'set enable_force_spill=false'
diff --git a/regression-test/suites/spill_p0/aggregate_spill.groovy
b/regression-test/suites/spill_p0/aggregate_spill.groovy
index 180ab37200f..9a0df219bf9 100644
--- a/regression-test/suites/spill_p0/aggregate_spill.groovy
+++ b/regression-test/suites/spill_p0/aggregate_spill.groovy
@@ -17,7 +17,7 @@
suite("aggregate_spill") {
sql """
- set enable_agg_spill = true;
+ set enable_spill = true;
"""
sql """
set enable_force_spill = true;
diff --git a/regression-test/suites/variant_p0/load.groovy
b/regression-test/suites/variant_p0/load.groovy
index e79a58f8a7a..c18134a1dd0 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -290,7 +290,7 @@ suite("regression_test_variant", "p0"){
sql """insert into ${table_name} values (5, '{"i" : 1}'), (1, '{"a" :
1}')"""
sql """insert into ${table_name} values (6, '{"j" : 1}'), (1, '{"a" :
1}')"""
sql """insert into ${table_name} values (6, '{"k" : 1}'), (1, '{"a" :
1}')"""
- sql "select
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=1,parallel_pipeline_task_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=
[...]
+ sql "select
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=1,parallel_pipeline_task_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=
[...]
qt_sql_36_1 "select cast(v['a'] as int), cast(v['b'] as int),
cast(v['c'] as int) from ${table_name} order by k limit 10"
sql "DELETE FROM ${table_name} WHERE k=1"
sql "select * from ${table_name}"
diff --git a/regression-test/suites/variant_p0/nested.groovy
b/regression-test/suites/variant_p0/nested.groovy
index 90728df2532..ae4988d8a61 100644
--- a/regression-test/suites/variant_p0/nested.groovy
+++ b/regression-test/suites/variant_p0/nested.groovy
@@ -133,7 +133,7 @@ suite("regression_test_variant_nested", "p0"){
qt_sql """select
/*+SET_VAR(batch_size=1024,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_
parallel_pipeline_task_num=7,parallel_fragment_exec_instance_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16
-,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=true,sort_phase_num=0,enable_nereids_planner=true,rewrite_or_to_in_predicate_threshold=2,enable_function_pushdown=true,enable_common_expr_pushdown=false,enable_local_exchange=true,partitioned_hash_join_rows_threshold=8,partitioned_hash_agg_rows_threshold=8,partition_pruning_expand_threshold=10,enable_share_hash_table_for_broadca
[...]
+,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=true,sort_phase_num=0,enable_nereids_planner=true,rewrite_or_to_in_predicate_threshold=2,enable_function_pushdown=true,enable_common_expr_pushdown=false,enable_local_exchange=true,partitioned_hash_join_rows_threshold=8,partitioned_hash_agg_rows_threshold=8,partition_pruning_expand_threshold=10,enable_share_hash_table_for_broadca
[...]
qt_sql """select * from var_nested where v['k2'] = 'some' and
array_contains(cast(v['nested1']['nested2']['a'] as array<tinyint>), 10) order
by k limit 1;"""
sql """INSERT INTO var_nested SELECT *, '{"k1":1, "k2": "some", "k3" :
[1234], "k4" : 1.10000, "k5" : [[123]], "nested1" : {"nested2" : [{"a" : 10,
"b" : 1.1, "c" : "1111"}]}}' FROM numbers("number" = "4096") where number >
1024 limit 1024;"""
@@ -162,7 +162,7 @@
parallel_pipeline_task_num=7,parallel_fragment_exec_instance_num=4,profile_level
properties("replication_num" = "1", "disable_auto_compaction"
= "false", "enable_unique_key_merge_on_write" = "true",
"variant_enable_flatten_nested" = "true");
"""
sql """insert into var_nested2 select * from var_nested order by k
limit 1024"""
- qt_sql """select
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=5,parallel_pipeline_task_num=1,profile_level=1,enable_pipeline_engine=false,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=12,enable_parallel_res
[...]
+ qt_sql """select
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=5,parallel_pipeline_task_num=1,profile_level=1,enable_pipeline_engine=false,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=12,enable_parallel_res
[...]
qt_sql """select v['nested'] from var_nested2 where k < 10 order by k
limit 10;"""
// explode variant array
order_qt_explode_sql """select count(),cast(vv['xx'] as int) from
var_nested lateral view explode_variant_array(v['nested']) tmp as vv where
vv['xx'] = 10 group by cast(vv['xx'] as int)"""
diff --git a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
index f09f4713ad2..4226de14d81 100644
--- a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
+++ b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
@@ -139,7 +139,7 @@ suite("variant_sub_path_pruning", "variant_type"){
// two children
order_qt_sql """
- select
/*+SET_VAR(batch_size=50,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=6,parallel_pipeline_task_num=2,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_nereids_planner=true,rewrite_or_to_in_predicate_thresh
[...]
+ select
/*+SET_VAR(batch_size=50,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=6,parallel_pipeline_task_num=2,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_nereids_planner=true,rewrite_or_to_in_predicate_thresh
[...]
"""
order_qt_sql """select c1['a'] from (select dt as c1 from pruning_test
union all select dt as c1 from pruning_test) v1;"""
order_qt_sql """select c1['b'] from (select dt['a'] as c1 from
pruning_test union all select dt['a'] as c1 from pruning_test) v1;"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]