This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 8dfa03fb587 reserve block in scanner (#44185)
8dfa03fb587 is described below

commit 8dfa03fb587dbc47870b623feb13e030b2252b49
Author: TengJianPing <[email protected]>
AuthorDate: Wed Nov 20 09:57:54 2024 +0800

    reserve block in scanner (#44185)
---
 be/src/pipeline/exec/operator.h                    |  5 --
 .../exec/streaming_aggregation_operator.cpp        |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |  8 +--
 be/src/pipeline/pipeline_task.cpp                  | 43 ++++++++--------
 be/src/runtime/query_context.h                     |  4 +-
 be/src/runtime/runtime_state.h                     | 14 ++---
 be/src/vec/exec/scan/scanner_context.cpp           | 17 +++++--
 be/src/vec/exec/scan/scanner_context.h             |  1 +
 be/src/vec/exec/scan/scanner_scheduler.cpp         | 27 +++++++++-
 be/src/vec/spill/spill_stream.cpp                  |  4 +-
 .../java/org/apache/doris/qe/SessionVariable.java  | 59 +++++++---------------
 gensrc/thrift/PaloInternalService.thrift           |  8 +++
 .../query/test_nested_type_with_resize.groovy      |  8 +--
 .../nereids_rules_p0/mv/variant/variant_mv.groovy  |  2 +-
 .../query_p0/limit/test_group_by_limit.groovy      |  2 +-
 .../suites/spill_p0/aggregate_spill.groovy         |  2 +-
 regression-test/suites/variant_p0/load.groovy      |  2 +-
 regression-test/suites/variant_p0/nested.groovy    |  4 +-
 .../suites/variant_p0/test_sub_path_pruning.groovy |  2 +-
 19 files changed, 111 insertions(+), 103 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index e7c668d2cdb..4c431470435 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -645,11 +645,6 @@ public:
 
     [[nodiscard]] std::string get_name() const override { return _name; }
 
-    [[nodiscard]] virtual bool try_reserve_memory(RuntimeState* state, 
vectorized::Block* block,
-                                                  bool eos) {
-        return true;
-    }
-
     virtual bool should_dry_run(RuntimeState* state) { return false; }
 
     [[nodiscard]] virtual bool count_down_destination() { return true; }
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 6532c5571f6..2fff3d1b7c7 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1154,7 +1154,7 @@ Status StreamingAggOperatorX::init(const TPlanNode& 
tnode, RuntimeState* state)
         _aggregate_evaluators.push_back(evaluator);
     }
 
-    if (state->enable_agg_spill()) {
+    if (state->enable_spill()) {
         // If spill enabled, the streaming agg should not occupy too much 
memory.
         _spill_streaming_agg_mem_limit =
                 state->query_options().__isset.spill_streaming_agg_mem_limit
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 3c275b4b8a2..944374b66e2 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1288,7 +1288,7 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
 
         /// PartitionedAggSourceOperatorX does not support "group by limit 
opt(#29641)" yet.
         /// If `group_by_limit_opt` is true, then it might not need to spill 
at all.
-        const bool enable_spill = _runtime_state->enable_agg_spill() &&
+        const bool enable_spill = _runtime_state->enable_spill() &&
                                   !tnode.agg_node.grouping_exprs.empty() && 
!group_by_limit_opt;
 
         if (tnode.agg_node.aggregate_functions.empty() && !enable_spill &&
@@ -1382,8 +1382,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     case TPlanNodeType::HASH_JOIN_NODE: {
         const auto is_broadcast_join = 
tnode.hash_join_node.__isset.is_broadcast_join &&
                                        tnode.hash_join_node.is_broadcast_join;
-        const auto enable_join_spill = _runtime_state->enable_join_spill();
-        if (enable_join_spill && !is_broadcast_join) {
+        const auto enable_spill = _runtime_state->enable_spill();
+        if (enable_spill && !is_broadcast_join) {
             auto tnode_ = tnode;
             /// TODO: support rf in partitioned hash join
             tnode_.runtime_filters.clear();
@@ -1499,7 +1499,7 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         break;
     }
     case TPlanNodeType::SORT_NODE: {
-        const auto should_spill = _runtime_state->enable_sort_spill() &&
+        const auto should_spill = _runtime_state->enable_spill() &&
                                   tnode.sort_node.algorithm == 
TSortAlgorithm::FULL_SORT;
         if (should_spill) {
             op.reset(new SpillSortSourceOperatorX(pool, tnode, 
next_operator_id(), descs));
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index e6801121358..69a1c490911 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -426,7 +426,7 @@ Status PipelineTask::execute(bool* eos) {
             _root->reset_reserve_mem_size(_state);
 
             auto workload_group = _state->get_query_ctx()->workload_group();
-            if (workload_group && reserve_size > 0) {
+            if (workload_group && _state->enable_reserve_memory() && 
reserve_size > 0) {
                 auto st = thread_context()->try_reserve_memory(reserve_size);
 
                 COUNTER_UPDATE(_memory_reserve_times, 1);
@@ -458,25 +458,28 @@ Status PipelineTask::execute(bool* eos) {
             DEFER_RELEASE_RESERVED();
             COUNTER_UPDATE(_memory_reserve_times, 1);
             const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, 
*eos);
-            status = thread_context()->try_reserve_memory(sink_reserve_size);
-            if (!status.ok()) {
-                COUNTER_UPDATE(_memory_reserve_failed_times, 1);
-                LOG(INFO) << "query: " << print_id(query_id) << ", try to 
reserve: "
-                          << PrettyPrinter::print(sink_reserve_size, 
TUnit::BYTES)
-                          << ", sink name: " << _sink->get_name()
-                          << ", node id: " << _sink->node_id() << ", task id: 
" << _state->task_id()
-                          << ", failed: " << status.to_string()
-                          << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
-                _state->get_query_ctx()->update_paused_reason(status);
-                _state->get_query_ctx()->set_low_memory_mode();
-                ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                        _state->get_query_ctx()->shared_from_this(), 
sink_reserve_size);
-                DCHECK_EQ(_pending_block.get(), nullptr);
-                _pending_block = std::move(_block);
-                _block = 
vectorized::Block::create_unique(_pending_block->clone_empty());
-                _eos = *eos;
-                *eos = false;
-                continue;
+            if (_state->enable_reserve_memory()) {
+                status = 
thread_context()->try_reserve_memory(sink_reserve_size);
+                if (!status.ok()) {
+                    COUNTER_UPDATE(_memory_reserve_failed_times, 1);
+                    LOG(INFO) << "query: " << print_id(query_id) << ", try to 
reserve: "
+                              << PrettyPrinter::print(sink_reserve_size, 
TUnit::BYTES)
+                              << ", sink name: " << _sink->get_name()
+                              << ", node id: " << _sink->node_id()
+                              << ", task id: " << _state->task_id()
+                              << ", failed: " << status.to_string()
+                              << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
+                    _state->get_query_ctx()->update_paused_reason(status);
+                    _state->get_query_ctx()->set_low_memory_mode();
+                    
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                            _state->get_query_ctx()->shared_from_this(), 
sink_reserve_size);
+                    DCHECK_EQ(_pending_block.get(), nullptr);
+                    _pending_block = std::move(_block);
+                    _block = 
vectorized::Block::create_unique(_pending_block->clone_empty());
+                    _eos = *eos;
+                    *eos = false;
+                    continue;
+                }
             }
 
             // Define a lambda function to catch sink exception, because sink 
will check
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index e363fa3ba49..49d7742d03c 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -313,9 +313,7 @@ public:
             }
 
             if (is_low_watermark &&
-                ((_query_options.__isset.enable_join_spill && 
_query_options.enable_join_spill) ||
-                 (_query_options.__isset.enable_sort_spill && 
_query_options.enable_sort_spill) ||
-                 (_query_options.__isset.enable_agg_spill && 
_query_options.enable_agg_spill))) {
+                (_query_options.__isset.enable_spill && 
_query_options.enable_spill)) {
                 LOG(INFO) << "Query " << print_id(_query_id)
                           << " goes to low memory mode due to workload group 
low water mark "
                              "reached and the query enable spill";
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 31e1d378526..5ca361246e4 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -561,19 +561,13 @@ public:
                                             std::shared_ptr<IRuntimeFilter>* 
producer_filter);
     bool is_nereids() const;
 
-    bool enable_join_spill() const {
-        return (_query_options.__isset.enable_force_spill && 
_query_options.enable_force_spill) ||
-               (_query_options.__isset.enable_join_spill && 
_query_options.enable_join_spill);
-    }
-
-    bool enable_sort_spill() const {
-        return (_query_options.__isset.enable_force_spill && 
_query_options.enable_force_spill) ||
-               (_query_options.__isset.enable_sort_spill && 
_query_options.enable_sort_spill);
+    bool enable_reserve_memory() const {
+        return _query_options.__isset.enable_reserve_memory && 
_query_options.enable_reserve_memory;
     }
 
-    bool enable_agg_spill() const {
+    bool enable_spill() const {
         return (_query_options.__isset.enable_force_spill && 
_query_options.enable_force_spill) ||
-               (_query_options.__isset.enable_agg_spill && 
_query_options.enable_agg_spill);
+               (_query_options.__isset.enable_spill && 
_query_options.enable_spill);
     }
 
     bool enable_force_spill() const {
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index af92080c777..64411ecd290 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -240,7 +240,9 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool 
force) {
 }
 
 void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
-    if (block->mem_reuse() && _block_memory_usage < _max_bytes_in_queue) {
+    // If under low memory mode, should not return the freeblock, it will 
occupy too much memory.
+    if (!_local_state->low_memory_mode() && block->mem_reuse() &&
+        _block_memory_usage < _max_bytes_in_queue) {
         size_t block_size_to_reuse = block->allocated_bytes();
         _block_memory_usage += block_size_to_reuse;
         _scanner_memory_used_counter->set(_block_memory_usage);
@@ -251,6 +253,14 @@ void 
ScannerContext::return_free_block(vectorized::BlockUPtr block) {
     }
 }
 
+void ScannerContext::clear_free_blocks() {
+    vectorized::BlockUPtr block;
+    while (_free_blocks.try_dequeue(block)) {
+        // do nothing
+    }
+    block.reset();
+}
+
 Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
     _scanner_sched_counter->update(1);
     _num_scheduled_scanners++;
@@ -321,10 +331,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
             update_peak_memory_usage(-current_block->allocated_bytes());
             // consume current block
             block->swap(*current_block);
-            // If under low memory mode, should not return the freeblock, it 
will occupy too memory.
-            if (!_local_state->low_memory_mode()) {
-                return_free_block(std::move(current_block));
-            }
+            return_free_block(std::move(current_block));
         } else {
             // This scan task do not have any cached blocks.
             _tasks_queue.pop_front();
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index b81834fd7b7..da04a764a55 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -123,6 +123,7 @@ public:
 
     vectorized::BlockUPtr get_free_block(bool force);
     void return_free_block(vectorized::BlockUPtr block);
+    void clear_free_blocks();
     inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }
 
     int64_t block_memory_usage() { return _block_memory_usage; }
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 0dae893b5c6..65883355130 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -277,6 +277,7 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
             bool first_read = true;
             // If the first block is full, then it is true. Or the first block 
+ second block > batch_size
             bool has_first_full_block = false;
+            size_t block_avg_bytes = ctx->batch_size();
 
             // During low memory mode, every scan task will return at most 2 
block to reduce memory usage.
             while (!eos && raw_bytes_read < raw_bytes_threshold &&
@@ -291,7 +292,25 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     config::doris_scanner_max_run_time_ms * 1e6) {
                     break;
                 }
-                BlockUPtr free_block = ctx->get_free_block(first_read);
+                DEFER_RELEASE_RESERVED();
+                BlockUPtr free_block;
+                if (first_read) {
+                    free_block = ctx->get_free_block(first_read);
+                } else {
+                    if (state->enable_reserve_memory()) {
+                        status = 
thread_context()->try_reserve_memory(block_avg_bytes);
+                        if (!status.ok()) {
+                            LOG(INFO) << "query: " << 
print_id(state->query_id())
+                                      << ", scanner try to reserve: "
+                                      << PrettyPrinter::print(block_avg_bytes, 
TUnit::BYTES)
+                                      << ", failed: " << status.to_string() << 
", debug info: "
+                                      << 
GlobalMemoryArbitrator::process_mem_log_str();
+                            ctx->clear_free_blocks();
+                            break;
+                        }
+                    }
+                    free_block = ctx->get_free_block(first_read);
+                }
                 if (free_block == nullptr) {
                     break;
                 }
@@ -338,6 +357,12 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     ctx->inc_block_usage(free_block->allocated_bytes());
                     
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
                 }
+                if (scan_task->cached_blocks.back().first->rows() > 0) {
+                    block_avg_bytes = 
(scan_task->cached_blocks.back().first->allocated_bytes() +
+                                       
scan_task->cached_blocks.back().first->rows() - 1) /
+                                      
scan_task->cached_blocks.back().first->rows() *
+                                      ctx->batch_size();
+                }
             } // end for while
 
             if (UNLIKELY(!status.ok())) {
diff --git a/be/src/vec/spill/spill_stream.cpp 
b/be/src/vec/spill/spill_stream.cpp
index a27916a87a3..fafebb4e62a 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -116,7 +116,9 @@ Status SpillStream::prepare() {
 }
 
 SpillReaderUPtr SpillStream::create_separate_reader() const {
-    return std::make_unique<SpillReader>(stream_id_, writer_->get_file_path());
+    return std::make_unique<SpillReader>(
+            
state_->get_query_ctx()->get_mem_tracker()->get_query_statistics(), stream_id_,
+            writer_->get_file_path());
 }
 
 const TUniqueId& SpillStream::query_id() const {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 52e78618c05..4024fa51ab5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -553,9 +553,8 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String EXTERNAL_AGG_PARTITION_BITS = 
"external_agg_partition_bits";
     public static final String SPILL_STREAMING_AGG_MEM_LIMIT = 
"spill_streaming_agg_mem_limit";
     public static final String MIN_REVOCABLE_MEM = "min_revocable_mem";
-    public static final String ENABLE_JOIN_SPILL = "enable_join_spill";
-    public static final String ENABLE_SORT_SPILL = "enable_sort_spill";
-    public static final String ENABLE_AGG_SPILL = "enable_agg_spill";
+    public static final String ENABLE_SPILL = "enable_spill";
+    public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory";
     public static final String ENABLE_FORCE_SPILL = "enable_force_spill";
     public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
 
@@ -2133,28 +2132,20 @@ public class SessionVariable implements Serializable, 
Writable {
     public int maxFetchRemoteTabletCount = 512;
 
     @VariableMgr.VarAttr(
-            name = ENABLE_JOIN_SPILL,
-            description = {"控制是否启用join算子落盘。默认为 false。",
-                    "Controls whether to enable spill to disk of join 
operation. "
-                            + "The default value is false."},
-            needForward = true, fuzzy = true)
-    public boolean enableJoinSpill = false;
-
-    @VariableMgr.VarAttr(
-            name = ENABLE_SORT_SPILL,
-            description = {"控制是否启用排序算子落盘。默认为 false。",
-                    "Controls whether to enable spill to disk of sort 
operation. "
-                            + "The default value is false."},
+            name = ENABLE_RESERVE_MEMORY,
+            description = {"控制是否启用分配内存前先reverve memory的功能。默认为 true。",
+                    "Controls whether to enable reserve memory before 
allocating memory. "
+                            + "The default value is true."},
             needForward = true, fuzzy = true)
-    public boolean enableSortSpill = false;
+    public boolean enableReserveMemory = true;
 
     @VariableMgr.VarAttr(
-            name = ENABLE_AGG_SPILL,
-            description = {"控制是否启用聚合算子落盘。默认为 false。",
-                    "Controls whether to enable spill to disk of aggregation 
operation. "
+            name = ENABLE_SPILL,
+            description = {"控制是否启用查询算子落盘。默认为 false。",
+                    "Controls whether to enable spill to disk for query. "
                             + "The default value is false."},
             needForward = true, fuzzy = true)
-    public boolean enableAggSpill = false;
+    public boolean enableSpill = false;
 
     @VariableMgr.VarAttr(
             name = ENABLE_FORCE_SPILL,
@@ -2287,18 +2278,6 @@ public class SessionVariable implements Serializable, 
Writable {
         return enableESParallelScroll;
     }
 
-    public boolean isEnableJoinSpill() {
-        return enableJoinSpill;
-    }
-
-    public void setEnableJoinSpill(boolean enableJoinSpill) {
-        this.enableJoinSpill = enableJoinSpill;
-    }
-
-    public boolean isEnableSortSpill() {
-        return enableSortSpill;
-    }
-
     // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to 
generate some variables,
     // not the default value set in the code.
     @SuppressWarnings("checkstyle:Indentation")
@@ -2428,10 +2407,8 @@ public class SessionVariable implements Serializable, 
Writable {
         // for spill to disk
         if (Config.pull_request_id > 10000) {
             if (Config.pull_request_id % 2 == 0) {
-                this.enableJoinSpill = true;
-                this.enableSortSpill = true;
-                this.enableAggSpill = true;
-
+                this.enableSpill = true;
+                this.enableReserveMemory = true;
                 randomInt = random.nextInt(4);
                 switch (randomInt) {
                     case 0:
@@ -2448,9 +2425,8 @@ public class SessionVariable implements Serializable, 
Writable {
                         break;
                 }
             } else {
-                this.enableJoinSpill = false;
-                this.enableSortSpill = false;
-                this.enableAggSpill = false;
+                this.enableSpill = false;
+                this.enableReserveMemory = false;
             }
         }
     }
@@ -3863,9 +3839,8 @@ public class SessionVariable implements Serializable, 
Writable {
         
tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
         tResult.setSkipBadTablet(skipBadTablet);
         tResult.setDisableFileCache(disableFileCache);
-        tResult.setEnableJoinSpill(enableJoinSpill);
-        tResult.setEnableSortSpill(enableSortSpill);
-        tResult.setEnableAggSpill(enableAggSpill);
+        tResult.setEnableReserveMemory(enableReserveMemory);
+        tResult.setEnableSpill(enableSpill);
         tResult.setEnableForceSpill(enableForceSpill);
         tResult.setExternalAggPartitionBits(externalAggPartitionBits);
         tResult.setMinRevocableMem(minRevocableMem);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 37c7da3d702..f6350b7fb32 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -281,10 +281,13 @@ struct TQueryOptions {
 
   100: optional bool enable_distinct_streaming_aggregation = true;
 
+  // deprecated
   101: optional bool enable_join_spill = false
 
+  // deprecated
   102: optional bool enable_sort_spill = false
 
+  // deprecated
   103: optional bool enable_agg_spill = false
 
   104: optional i64 min_revocable_mem = 0
@@ -356,6 +359,11 @@ struct TQueryOptions {
   139: optional i32 query_slot_count = 0;
 
   140: optional bool enable_auto_create_when_overwrite = false;
+
+  141: optional bool enable_spill = false
+
+  142: optional bool enable_reserve_memory = true
+
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.
diff --git 
a/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
 
b/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
index 2a599c1cc1a..eaf8c7e996b 100644
--- 
a/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
+++ 
b/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
@@ -65,9 +65,9 @@ suite("test_nested_type_with_resize") {
         }
     }
 
-    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
-    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
-    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
-    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
+    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
+    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
+    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
+    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
 
 }
diff --git 
a/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy 
b/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy
index 0d0415456f7..6e43620d449 100644
--- a/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy
+++ b/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy
@@ -573,7 +573,7 @@ suite("variant_mv") {
     where g2.actor['id'] > 34259289;
     """
     def query3_6 = """
-    SELECT  
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=0,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=32,parallel_scan_min_rows_per_scanner=64,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=1,enable_parallel_result_sink=false,ena
 [...]
+    SELECT  
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=0,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=32,parallel_scan_min_rows_per_scanner=64,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=1,enable_parallel_result_sink=false,ena
 [...]
     g1.id,
     g2.type,
     floor(cast(g1.actor['id'] as int) + 100.5),
diff --git a/regression-test/suites/query_p0/limit/test_group_by_limit.groovy 
b/regression-test/suites/query_p0/limit/test_group_by_limit.groovy
index 271619c4a93..15844ff7f71 100644
--- a/regression-test/suites/query_p0/limit/test_group_by_limit.groovy
+++ b/regression-test/suites/query_p0/limit/test_group_by_limit.groovy
@@ -17,7 +17,7 @@
 
 suite("test_group_by_limit", "query") {
 
-sql 'set enable_agg_spill=false'
+sql 'set enable_spill=false'
 
 sql 'set enable_force_spill=false'
 
diff --git a/regression-test/suites/spill_p0/aggregate_spill.groovy 
b/regression-test/suites/spill_p0/aggregate_spill.groovy
index 180ab37200f..9a0df219bf9 100644
--- a/regression-test/suites/spill_p0/aggregate_spill.groovy
+++ b/regression-test/suites/spill_p0/aggregate_spill.groovy
@@ -17,7 +17,7 @@
 
 suite("aggregate_spill") {
     sql """
-        set enable_agg_spill = true;
+        set enable_spill = true;
     """
     sql """
         set enable_force_spill = true;
diff --git a/regression-test/suites/variant_p0/load.groovy 
b/regression-test/suites/variant_p0/load.groovy
index e79a58f8a7a..c18134a1dd0 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -290,7 +290,7 @@ suite("regression_test_variant", "p0"){
         sql """insert into ${table_name} values (5, '{"i" : 1}'), (1, '{"a" : 
1}')"""
         sql """insert into ${table_name} values (6, '{"j" : 1}'), (1, '{"a" : 
1}')"""
         sql """insert into ${table_name} values (6, '{"k" : 1}'), (1, '{"a" : 
1}')"""
-        sql "select 
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=1,parallel_pipeline_task_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=
 [...]
+        sql "select 
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=1,parallel_pipeline_task_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=
 [...]
         qt_sql_36_1 "select cast(v['a'] as int), cast(v['b'] as int), 
cast(v['c'] as int) from ${table_name} order by k limit 10"
         sql "DELETE FROM ${table_name} WHERE k=1"
         sql "select * from ${table_name}"
diff --git a/regression-test/suites/variant_p0/nested.groovy 
b/regression-test/suites/variant_p0/nested.groovy
index 90728df2532..ae4988d8a61 100644
--- a/regression-test/suites/variant_p0/nested.groovy
+++ b/regression-test/suites/variant_p0/nested.groovy
@@ -133,7 +133,7 @@ suite("regression_test_variant_nested", "p0"){
 
         qt_sql """select  
/*+SET_VAR(batch_size=1024,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_
 
parallel_pipeline_task_num=7,parallel_fragment_exec_instance_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16
-,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=true,sort_phase_num=0,enable_nereids_planner=true,rewrite_or_to_in_predicate_threshold=2,enable_function_pushdown=true,enable_common_expr_pushdown=false,enable_local_exchange=true,partitioned_hash_join_rows_threshold=8,partitioned_hash_agg_rows_threshold=8,partition_pruning_expand_threshold=10,enable_share_hash_table_for_broadca
 [...]
+,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=true,sort_phase_num=0,enable_nereids_planner=true,rewrite_or_to_in_predicate_threshold=2,enable_function_pushdown=true,enable_common_expr_pushdown=false,enable_local_exchange=true,partitioned_hash_join_rows_threshold=8,partitioned_hash_agg_rows_threshold=8,partition_pruning_expand_threshold=10,enable_share_hash_table_for_broadca
 [...]
         qt_sql """select * from var_nested where v['k2'] = 'some'  and 
array_contains(cast(v['nested1']['nested2']['a'] as array<tinyint>), 10) order 
by k limit 1;"""
 
         sql """INSERT INTO var_nested SELECT *, '{"k1":1, "k2": "some", "k3" : 
[1234], "k4" : 1.10000, "k5" : [[123]], "nested1" : {"nested2" : [{"a" : 10, 
"b" : 1.1, "c" : "1111"}]}}' FROM numbers("number" = "4096") where number > 
1024 limit 1024;"""
@@ -162,7 +162,7 @@ 
parallel_pipeline_task_num=7,parallel_fragment_exec_instance_num=4,profile_level
                 properties("replication_num" = "1", "disable_auto_compaction" 
= "false", "enable_unique_key_merge_on_write" = "true", 
"variant_enable_flatten_nested" = "true");
             """
         sql """insert into var_nested2 select * from var_nested order by k 
limit 1024"""
-        qt_sql """select  
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=5,parallel_pipeline_task_num=1,profile_level=1,enable_pipeline_engine=false,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=12,enable_parallel_res
 [...]
+        qt_sql """select  
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=5,parallel_pipeline_task_num=1,profile_level=1,enable_pipeline_engine=false,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=12,enable_parallel_res
 [...]
         qt_sql """select v['nested'] from var_nested2 where k < 10 order by k 
limit 10;"""
         // explode variant array
         order_qt_explode_sql """select count(),cast(vv['xx'] as int) from 
var_nested lateral view explode_variant_array(v['nested']) tmp as vv where 
vv['xx'] = 10 group by cast(vv['xx'] as int)"""
diff --git a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy 
b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
index f09f4713ad2..4226de14d81 100644
--- a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
+++ b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
@@ -139,7 +139,7 @@ suite("variant_sub_path_pruning", "variant_type"){
 
     // two children
     order_qt_sql """
-        select  
/*+SET_VAR(batch_size=50,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=6,parallel_pipeline_task_num=2,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_nereids_planner=true,rewrite_or_to_in_predicate_thresh
 [...]
+        select  
/*+SET_VAR(batch_size=50,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=6,parallel_pipeline_task_num=2,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_nereids_planner=true,rewrite_or_to_in_predicate_thresh
 [...]
         """
     order_qt_sql """select c1['a'] from (select dt as c1 from pruning_test 
union all select dt as c1 from pruning_test) v1;"""
     order_qt_sql """select c1['b'] from (select dt['a'] as c1 from 
pruning_test union all select dt['a'] as c1 from pruning_test) v1;"""


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to