This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new bb17194077b improve spill FE variables (#45934)
bb17194077b is described below
commit bb17194077bdd00ccbc59862b97781f3bab3b466
Author: TengJianPing <[email protected]>
AuthorDate: Wed Dec 25 16:46:33 2024 +0800
improve spill FE variables (#45934)
---
be/src/pipeline/dependency.cpp | 5 +-
be/src/pipeline/dependency.h | 14 +-
.../exec/partitioned_aggregation_sink_operator.cpp | 6 +-
.../exec/partitioned_aggregation_sink_operator.h | 2 +-
.../exec/partitioned_hash_join_sink_operator.cpp | 30 ++---
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 5 +-
.../pipeline/exec/spill_sort_source_operator.cpp | 17 +--
be/src/pipeline/exec/spill_sort_source_operator.h | 3 +-
be/src/pipeline/pipeline_fragment_context.cpp | 2 +-
be/src/runtime/runtime_state.h | 51 ++++---
.../java/org/apache/doris/qe/SessionVariable.java | 146 +++++++++------------
gensrc/thrift/PaloInternalService.thrift | 10 +-
.../query/test_nested_type_with_resize.groovy | 8 +-
.../nereids_rules_p0/mv/variant/variant_mv.groovy | 2 +-
regression-test/suites/variant_p0/load.groovy | 2 +-
regression-test/suites/variant_p0/nested.groovy | 4 +-
.../suites/variant_p0/test_sub_path_pruning.groovy | 2 +-
17 files changed, 151 insertions(+), 158 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index a7198a97da4..aee19ff58df 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -308,9 +308,8 @@ Status AggSharedState::reset_hash_table() {
agg_data->method_variant);
}
-void PartitionedAggSharedState::init_spill_params(size_t
spill_partition_count_bits) {
- partition_count_bits = spill_partition_count_bits;
- partition_count = (1 << spill_partition_count_bits);
+void PartitionedAggSharedState::init_spill_params(size_t
spill_partition_count) {
+ partition_count = spill_partition_count;
max_partition_index = partition_count - 1;
for (int i = 0; i < partition_count; ++i) {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 0f1539cadf2..13f983db3dd 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -453,24 +453,20 @@ struct PartitionedAggSharedState : public
BasicSharedState,
void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
- void init_spill_params(size_t spill_partition_count_bits);
+ void init_spill_params(size_t spill_partition_count);
void close();
AggSharedState* in_mem_shared_state = nullptr;
std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
- size_t partition_count_bits;
size_t partition_count;
size_t max_partition_index;
bool is_spilled = false;
std::atomic_bool is_closed = false;
std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
- size_t get_partition_index(size_t hash_value) const {
- // return (hash_value >> (32 - partition_count_bits)) &
max_partition_index;
- return hash_value % partition_count;
- }
+ size_t get_partition_index(size_t hash_value) const { return hash_value %
partition_count; }
};
struct AggSpillPartition {
@@ -523,14 +519,12 @@ struct SpillSortSharedState : public BasicSharedState,
SpillSortSharedState() = default;
~SpillSortSharedState() override = default;
- // This number specifies the maximum size of sub blocks
- static constexpr size_t SORT_BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
- void update_spill_block_batch_row_count(const vectorized::Block* block) {
+ void update_spill_block_batch_row_count(RuntimeState* state, const
vectorized::Block* block) {
auto rows = block->rows();
if (rows > 0 && 0 == avg_row_bytes) {
avg_row_bytes = std::max((std::size_t)1, block->bytes() / rows);
spill_block_batch_row_count =
- (SORT_BLOCK_SPILL_BATCH_BYTES + avg_row_bytes - 1) /
avg_row_bytes;
+ (state->spill_sort_batch_bytes() + avg_row_bytes - 1) /
avg_row_bytes;
LOG(INFO) << "spill sort block batch row count: " <<
spill_block_batch_row_count;
}
}
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 58b272b3ac8..8cc6ae58a4f 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -47,7 +47,7 @@ Status
PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
_init_counters();
auto& parent = Base::_parent->template cast<Parent>();
- Base::_shared_state->init_spill_params(parent._spill_partition_count_bits);
+ Base::_shared_state->init_spill_params(parent._spill_partition_count);
RETURN_IF_ERROR(setup_in_memory_agg_op(state));
@@ -155,9 +155,7 @@
PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int o
Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::init(tnode,
state));
_name = "PARTITIONED_AGGREGATION_SINK_OPERATOR";
- if (state->query_options().__isset.external_agg_partition_bits) {
- _spill_partition_count_bits =
state->query_options().external_agg_partition_bits;
- }
+ _spill_partition_count = state->spill_aggregation_partition_count();
_agg_sink_operator->set_dests_id(DataSinkOperatorX<PartitionedAggSinkLocalState>::dests_id());
RETURN_IF_ERROR(
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 499db4919e7..dae3ee4f4b3 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -337,6 +337,6 @@ private:
friend class PartitionedAggSinkLocalState;
std::unique_ptr<AggSinkOperatorX> _agg_sink_operator;
- size_t _spill_partition_count_bits = 5;
+ size_t _spill_partition_count = 32;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 95675004c70..2e2c38f04c3 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -534,6 +534,19 @@ Status
PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState*
return Status::OK();
}
+// After building hash table it will not be able to spill later
+// even if memory is low, and will cause cancel of queries.
+// So make a check here, if build blocks mem usage is too high,
+// then trigger revoke memory.
+static bool is_revocable_mem_high_watermark(RuntimeState* state, size_t
revocable_size,
+ int64_t query_mem_limit) {
+ auto revocable_memory_high_watermark_percent =
+ state->spill_revocable_memory_high_watermark_percent();
+ return revocable_memory_high_watermark_percent > 0 &&
+ revocable_size >=
+ (double)query_mem_limit / 100.0 *
revocable_memory_high_watermark_percent;
+}
+
Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
@@ -575,16 +588,7 @@ Status
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
"sink_eos failed");
});
- // TODO: consider parallel?
- // After building hash table it will not be able to spill later
- // even if memory is low, and will cause cancel of queries.
- // So make a check here, if build blocks mem usage is too high,
- // then trigger revoke memory.
- auto revocable_memory_high_watermark_percent =
- state->revocable_memory_high_watermark_percent();
- if (revocable_memory_high_watermark_percent > 0 &&
- revocable_size >= (double)query_mem_limit / 100.0 *
-
revocable_memory_high_watermark_percent) {
+ if (is_revocable_mem_high_watermark(state, revocable_size,
query_mem_limit)) {
LOG(INFO) << fmt::format(
"Query: {}, task {}, hash join sink {} eos,
revoke_memory "
"because revocable memory is high",
@@ -636,11 +640,7 @@ Status
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
});
if (eos) {
- auto revocable_memory_high_watermark_percent =
- state->revocable_memory_high_watermark_percent();
- if (revocable_memory_high_watermark_percent > 0 &&
- revocable_size >=
- (double)query_mem_limit / 100.0 *
revocable_memory_high_watermark_percent) {
+ if (is_revocable_mem_high_watermark(state, revocable_size,
query_mem_limit)) {
LOG(INFO) << fmt::format(
"Query: {}, task {}, hash join sink {} eos,
revoke_memory "
"because revocable memory is high",
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 2fa0c0ce8e1..03c4072f7de 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -155,7 +155,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Bloc
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
if (in_block->rows() > 0) {
-
local_state._shared_state->update_spill_block_batch_row_count(in_block);
+ local_state._shared_state->update_spill_block_batch_row_count(state,
in_block);
}
local_state._eos = eos;
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::sink",
@@ -201,8 +201,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState*
state,
auto status =
ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
state, _spilling_stream, print_id(state->query_id()), "sort",
_parent->node_id(),
- _shared_state->spill_block_batch_row_count,
- SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile());
+ _shared_state->spill_block_batch_row_count,
state->spill_sort_batch_bytes(), profile());
RETURN_IF_ERROR(status);
_shared_state->sorted_streams.emplace_back(_spilling_stream);
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 3464ecd847f..8a58d0b1504 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -29,11 +29,8 @@
namespace doris::pipeline {
SpillSortLocalState::SpillSortLocalState(RuntimeState* state, OperatorXBase*
parent)
- : Base(state, parent) {
- if (state->external_sort_bytes_threshold() > 0) {
- _external_sort_bytes_threshold =
state->external_sort_bytes_threshold();
- }
-}
+ : Base(state, parent) {}
+
Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
init_spill_write_counters();
@@ -67,8 +64,8 @@ Status SpillSortLocalState::close(RuntimeState* state) {
dec_running_big_mem_op_num(state);
return Base::close(state);
}
-int SpillSortLocalState::_calc_spill_blocks_to_merge() const {
- int count = _external_sort_bytes_threshold /
SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES;
+int SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state)
const {
+ int count = state->spill_sort_mem_limit() /
state->spill_sort_batch_bytes();
return std::max(2, count);
}
Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState*
state) {
@@ -101,7 +98,7 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
vectorized::Block merge_sorted_block;
vectorized::SpillStreamSPtr tmp_stream;
while (!state->is_cancelled()) {
- int max_stream_count = _calc_spill_blocks_to_merge();
+ int max_stream_count = _calc_spill_blocks_to_merge(state);
VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " <<
_parent->node_id()
<< " merge spill streams, streams count: "
<< _shared_state->sorted_streams.size()
@@ -122,8 +119,8 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
{
status =
ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
state, tmp_stream, print_id(state->query_id()),
"sort", _parent->node_id(),
- _shared_state->spill_block_batch_row_count,
- SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES,
profile());
+ _shared_state->spill_block_batch_row_count,
state->spill_sort_batch_bytes(),
+ profile());
RETURN_IF_ERROR(status);
_shared_state->sorted_streams.emplace_back(tmp_stream);
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h
b/be/src/pipeline/exec/spill_sort_source_operator.h
index a7b8e8efde8..fae64e051f4 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -47,7 +47,7 @@ public:
Status initiate_merge_sort_spill_streams(RuntimeState* state);
protected:
- int _calc_spill_blocks_to_merge() const;
+ int _calc_spill_blocks_to_merge(RuntimeState* state) const;
Status _create_intermediate_merger(int num_blocks,
const vectorized::SortDescription&
sort_description);
friend class SpillSortSourceOperatorX;
@@ -55,7 +55,6 @@ protected:
bool _opened = false;
- int64_t _external_sort_bytes_threshold = 134217728; // 128M
std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
std::unique_ptr<vectorized::VSortedRunMerger> _merger;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index f86fb491d71..9acd38af7b0 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1365,7 +1365,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
auto tnode_ = tnode;
/// TODO: support rf in partitioned hash join
tnode_.runtime_filters.clear();
- const uint32_t partition_count = 32;
+ uint32_t partition_count =
_runtime_state->spill_hash_join_partition_count();
auto inner_probe_operator =
std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0,
descs);
auto inner_sink_operator =
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 879bd647d96..2220c1fc41e 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -504,13 +504,6 @@ public:
: 0;
}
- int64_t external_sort_bytes_threshold() const {
- if (_query_options.__isset.external_sort_bytes_threshold) {
- return _query_options.external_sort_bytes_threshold;
- }
- return 0;
- }
-
void set_be_exec_version(int32_t version) noexcept {
_query_options.be_exec_version = version; }
inline bool enable_delete_sub_pred_v2() const {
@@ -558,10 +551,6 @@ public:
std::shared_ptr<IRuntimeFilter>*
producer_filter);
bool is_nereids() const;
- bool enable_reserve_memory() const {
- return _query_options.__isset.enable_reserve_memory &&
_query_options.enable_reserve_memory;
- }
-
bool enable_spill() const {
return (_query_options.__isset.enable_force_spill &&
_query_options.enable_force_spill) ||
(_query_options.__isset.enable_spill &&
_query_options.enable_spill);
@@ -571,9 +560,8 @@ public:
return _query_options.__isset.enable_force_spill &&
_query_options.enable_force_spill;
}
- bool enable_local_merge_sort() const {
- return _query_options.__isset.enable_local_merge_sort &&
- _query_options.enable_local_merge_sort;
+ bool enable_reserve_memory() const {
+ return _query_options.__isset.enable_reserve_memory &&
_query_options.enable_reserve_memory;
}
int64_t min_revocable_mem() const {
@@ -583,13 +571,46 @@ public:
return 1;
}
- int revocable_memory_high_watermark_percent() const {
+ int64_t spill_sort_mem_limit() const {
+ if (_query_options.__isset.spill_sort_mem_limit) {
+ return std::max(_query_options.spill_sort_mem_limit,
(int64_t)16777216);
+ }
+ return 134217728;
+ }
+
+ int64_t spill_sort_batch_bytes() const {
+ if (_query_options.__isset.spill_sort_batch_bytes) {
+ return std::max(_query_options.spill_sort_batch_bytes,
(int64_t)8388608);
+ }
+ return 8388608;
+ }
+
+ int spill_aggregation_partition_count() const {
+ if (_query_options.__isset.spill_aggregation_partition_count) {
+ return
std::min(std::max(_query_options.spill_aggregation_partition_count, 16), 8192);
+ }
+ return 32;
+ }
+
+ int spill_hash_join_partition_count() const {
+ if (_query_options.__isset.spill_hash_join_partition_count) {
+ return
std::min(std::max(_query_options.spill_hash_join_partition_count, 16), 8192);
+ }
+ return 32;
+ }
+
+ int spill_revocable_memory_high_watermark_percent() const {
if (_query_options.__isset.revocable_memory_high_watermark_percent) {
return _query_options.revocable_memory_high_watermark_percent;
}
return -1;
}
+ bool enable_local_merge_sort() const {
+ return _query_options.__isset.enable_local_merge_sort &&
+ _query_options.enable_local_merge_sort;
+ }
+
size_t minimum_operator_memory_required_bytes() const {
if (_query_options.__isset.minimum_operator_memory_required_kb) {
return _query_options.minimum_operator_memory_required_kb * 1024;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8de6f379ecd..bab6a34528e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -397,8 +397,6 @@ public class SessionVariable implements Serializable,
Writable {
public static final String INTERNAL_SESSION = "internal_session";
- public static final String PARTITIONED_HASH_AGG_ROWS_THRESHOLD =
"partitioned_hash_agg_rows_threshold";
-
public static final String PARTITION_PRUNING_EXPAND_THRESHOLD =
"partition_pruning_expand_threshold";
public static final String ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN
@@ -560,14 +558,17 @@ public class SessionVariable implements Serializable,
Writable {
public static final String HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES =
"huge_table_lower_bound_size_in_bytes";
// for spill to disk
- public static final String EXTERNAL_SORT_BYTES_THRESHOLD =
"external_sort_bytes_threshold";
- public static final String EXTERNAL_AGG_PARTITION_BITS =
"external_agg_partition_bits";
- public static final String SPILL_STREAMING_AGG_MEM_LIMIT =
"spill_streaming_agg_mem_limit";
- public static final String MIN_REVOCABLE_MEM = "min_revocable_mem";
public static final String ENABLE_SPILL = "enable_spill";
- public static final String REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT =
"revocable_memory_high_watermark_percent";
- public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory";
public static final String ENABLE_FORCE_SPILL = "enable_force_spill";
+ public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory";
+ public static final String SPILL_MIN_REVOCABLE_MEM =
"spill_min_revocable_mem";
+ public static final String SPILL_SORT_MEM_LIMIT = "spill_sort_mem_limit";
+ // spill_sort_batch_bytes controls the memory size of a single block data
of spill sort.
+ public static final String SPILL_SORT_BATCH_BYTES =
"spill_sort_batch_bytes";
+ public static final String SPILL_AGGREGATION_PARTITION_COUNT =
"spill_aggregation_partition_count";
+ public static final String SPILL_STREAMING_AGG_MEM_LIMIT =
"spill_streaming_agg_mem_limit";
+ public static final String SPILL_HASH_JOIN_PARTITION_COUNT =
"spill_hash_join_partition_count";
+ public static final String SPILL_REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT =
"spill_revocable_memory_high_watermark_percent";
public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
public static final String GENERATE_STATS_FACTOR = "generate_stats_factor";
@@ -1586,10 +1587,6 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = INTERNAL_SESSION)
public boolean internalSession = false;
- // Use partitioned hash join if build side row count >= the threshold . 0
- the threshold is not set.
- @VariableMgr.VarAttr(name = PARTITIONED_HASH_AGG_ROWS_THRESHOLD, fuzzy =
true)
- public int partitionedHashAggRowsThreshold = 0;
-
@VariableMgr.VarAttr(name = PARTITION_PRUNING_EXPAND_THRESHOLD, fuzzy =
true)
public int partitionPruningExpandThreshold = 10;
@@ -2187,10 +2184,6 @@ public class SessionVariable implements Serializable,
Writable {
public boolean disableEmptyPartitionPrune = false;
// CLOUD_VARIABLES_END
- // for spill to disk
- @VariableMgr.VarAttr(name = MIN_REVOCABLE_MEM, fuzzy = true)
- public long minRevocableMem = 32 * 1024 * 1024;
-
// fetch remote schema rpc timeout
@VariableMgr.VarAttr(name = FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS, fuzzy =
true)
public long fetchRemoteSchemaTimeoutSeconds = 120;
@@ -2198,14 +2191,6 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = MAX_FETCH_REMOTE_TABLET_COUNT, fuzzy = true)
public int maxFetchRemoteTabletCount = 512;
- @VariableMgr.VarAttr(
- name = ENABLE_RESERVE_MEMORY,
- description = {"控制是否启用分配内存前先reverve memory的功能。默认为 true。",
- "Controls whether to enable reserve memory before
allocating memory. "
- + "The default value is true."},
- needForward = true, fuzzy = true)
- public boolean enableReserveMemory = true;
-
@VariableMgr.VarAttr(
name = "ENABLE_COMPRESS_MATERIALIZE",
description = {"控制是否启用compress materialize。",
@@ -2215,6 +2200,14 @@ public class SessionVariable implements Serializable,
Writable {
)
public boolean enableCompressMaterialize = false;
+ @VariableMgr.VarAttr(
+ name = DATA_QUEUE_MAX_BLOCKS,
+ description = {"DataQueue 中每个子队列允许最大的 block 个数",
+ "Max blocks in DataQueue."},
+ needForward = true, fuzzy = true)
+ public long dataQueueMaxBlocks = 1;
+
+ // for spill to disk
@VariableMgr.VarAttr(
name = ENABLE_SPILL,
description = {"控制是否启用查询算子落盘。默认为 false。",
@@ -2233,32 +2226,39 @@ public class SessionVariable implements Serializable,
Writable {
public boolean enableForceSpill = false;
@VariableMgr.VarAttr(
- name = DATA_QUEUE_MAX_BLOCKS,
- description = {"DataQueue 中每个子队列允许最大的 block 个数",
- "Max blocks in DataQueue."},
+ name = ENABLE_RESERVE_MEMORY,
+ description = {"控制是否启用分配内存前先reverve memory的功能。默认为 true。",
+ "Controls whether to enable reserve memory before
allocating memory. "
+ + "The default value is true."},
needForward = true, fuzzy = true)
- public long dataQueueMaxBlocks = 1;
+ public boolean enableReserveMemory = true;
- @VariableMgr.VarAttr(name = REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT, fuzzy
= true)
- public int revocableMemoryHighWatermarkPercent = -1;
+ @VariableMgr.VarAttr(name = SPILL_MIN_REVOCABLE_MEM, fuzzy = true)
+ public long spillMinRevocableMem = 32 * 1024 * 1024;
- // If the memory consumption of sort node exceed this limit, will trigger
spill to disk;
- // Set to 0 to disable; min: 128M
- public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
- @VariableMgr.VarAttr(name = EXTERNAL_SORT_BYTES_THRESHOLD,
- checker = "checkExternalSortBytesThreshold", varType =
VariableAnnotation.DEPRECATED)
- public long externalSortBytesThreshold = 0;
+ // spill_sort_mem_limit controls the memory usage during merge sort phase
of spill sort.
+ // During merge sort phase, multiple sorted blocks will be read into memory
and do merge sort,
+ // the count of blocks should be controlled or else will cause OOM, it's
calculated as
+ // std::max(spill_sort_mem_limit / spill_sort_batch_bytes, 2)
+ @VariableMgr.VarAttr(name = SPILL_SORT_MEM_LIMIT)
+ public long spillSortMemLimit = 134217728; // 128M
+
+ @VariableMgr.VarAttr(name = SPILL_SORT_BATCH_BYTES)
+ public long spillSortBatchBytes = 8388608; // 8M
+
+ @VariableMgr.VarAttr(name = SPILL_AGGREGATION_PARTITION_COUNT, fuzzy =
true)
+ public int spillAggregationPartitionCount = 32;
// The memory limit of streaming agg when spilling is enabled
// NOTE: streaming agg operator will not spill to disk.
@VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT, fuzzy = true)
public long spillStreamingAggMemLimit = 268435456; //256MB
- public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4;
- public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 20;
- @VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS,
- checker = "checkExternalAggPartitionBits", fuzzy = true)
- public int externalAggPartitionBits = 5; // means that the hash table will
be partitioned into 32 blocks.
+ @VariableMgr.VarAttr(name = SPILL_HASH_JOIN_PARTITION_COUNT, fuzzy = true)
+ public int spillHashJoinPartitionCount = 32;
+
+ @VariableMgr.VarAttr(name = SPILL_REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT,
fuzzy = true)
+ public int spillRevocableMemoryHighWatermarkPercent = -1;
@VariableMgr.VarAttr(name = USE_MAX_LENGTH_OF_VARCHAR_IN_CTAS, needForward
= true, description = {
"在CTAS中,如果 CHAR / VARCHAR 列不来自于源表,是否是将这一列的长度设置为 MAX,即65533。默认为
true。",
@@ -2372,7 +2372,6 @@ public class SessionVariable implements Serializable,
Writable {
// this.disableJoinReorder = random.nextBoolean();
this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
this.disableStreamPreaggregations = random.nextBoolean();
- this.partitionedHashAggRowsThreshold = random.nextBoolean() ? 8 :
1048576;
this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
// this.enableHashJoinEarlyStartProbe = random.nextBoolean();
this.enableParallelResultSink = random.nextBoolean();
@@ -2394,23 +2393,23 @@ public class SessionVariable implements Serializable,
Writable {
/*
switch (randomInt) {
case 0:
- this.externalSortBytesThreshold = 0;
+ this.spillSortBytesThreshold = 0;
this.externalAggBytesThreshold = 0;
break;
case 1:
- this.externalSortBytesThreshold = 1;
+ this.spillSortBytesThreshold = 1;
this.externalAggBytesThreshold = 1;
- this.externalAggPartitionBits = 6;
+ this.spillAggregationPartitionCount = 6;
break;
case 2:
- this.externalSortBytesThreshold = 1024 * 1024;
+ this.spillSortBytesThreshold = 1024 * 1024;
this.externalAggBytesThreshold = 1024 * 1024;
- this.externalAggPartitionBits = 8;
+ this.spillAggregationPartitionCount = 8;
break;
default:
- this.externalSortBytesThreshold = 100 * 1024 * 1024 * 1024;
+ this.spillSortBytesThreshold = 100 * 1024 * 1024 * 1024;
this.externalAggBytesThreshold = 100 * 1024 * 1024 * 1024;
- this.externalAggPartitionBits = 4;
+ this.spillAggregationPartitionCount = 4;
break;
}
*/
@@ -2493,16 +2492,16 @@ public class SessionVariable implements Serializable,
Writable {
randomInt = random.nextInt(4);
switch (randomInt) {
case 0:
- this.minRevocableMem = 0;
+ this.spillMinRevocableMem = 0;
break;
case 1:
- this.minRevocableMem = 1;
+ this.spillMinRevocableMem = 1;
break;
case 2:
- this.minRevocableMem = 1024 * 1024;
+ this.spillMinRevocableMem = 1024 * 1024;
break;
default:
- this.minRevocableMem = 100L * 1024 * 1024 * 1024;
+ this.spillMinRevocableMem = 100L * 1024 * 1024 * 1024;
break;
}
} else {
@@ -3659,24 +3658,6 @@ public class SessionVariable implements Serializable,
Writable {
return dropTableIfCtasFailed;
}
- public void checkExternalSortBytesThreshold(String
externalSortBytesThreshold) {
- long value = Long.valueOf(externalSortBytesThreshold);
- if (value > 0 && value < MIN_EXTERNAL_SORT_BYTES_THRESHOLD) {
- LOG.warn("external sort bytes threshold: {}, min: {}", value,
MIN_EXTERNAL_SORT_BYTES_THRESHOLD);
- throw new UnsupportedOperationException("minimum value is " +
MIN_EXTERNAL_SORT_BYTES_THRESHOLD);
- }
- }
-
- public void checkExternalAggPartitionBits(String externalAggPartitionBits)
{
- int value = Integer.valueOf(externalAggPartitionBits);
- if (value < MIN_EXTERNAL_AGG_PARTITION_BITS || value >
MAX_EXTERNAL_AGG_PARTITION_BITS) {
- LOG.warn("external agg bytes threshold: {}, min: {}, max: {}",
- value, MIN_EXTERNAL_AGG_PARTITION_BITS,
MAX_EXTERNAL_AGG_PARTITION_BITS);
- throw new UnsupportedOperationException("min value is " +
MIN_EXTERNAL_AGG_PARTITION_BITS + " max value is "
- + MAX_EXTERNAL_AGG_PARTITION_BITS);
- }
- }
-
public void checkQueryTimeoutValid(String newQueryTimeout) {
int value = Integer.valueOf(newQueryTimeout);
if (value <= 0) {
@@ -3896,14 +3877,6 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setSkipDeleteBitmap(skipDeleteBitmap);
-
tResult.setPartitionedHashAggRowsThreshold(partitionedHashAggRowsThreshold);
-
- tResult.setExternalSortBytesThreshold(externalSortBytesThreshold);
-
- tResult.setExternalAggBytesThreshold(0); // disable for now
-
- tResult.setSpillStreamingAggMemLimit(spillStreamingAggMemLimit);
-
tResult.setEnableFileCache(enableFileCache);
tResult.setEnablePageCache(enablePageCache);
@@ -3941,12 +3914,19 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
tResult.setSkipBadTablet(skipBadTablet);
tResult.setDisableFileCache(disableFileCache);
- tResult.setEnableReserveMemory(enableReserveMemory);
+
+ // for spill
tResult.setEnableSpill(enableSpill);
tResult.setEnableForceSpill(enableForceSpill);
- tResult.setExternalAggPartitionBits(externalAggPartitionBits);
- tResult.setMinRevocableMem(minRevocableMem);
-
tResult.setRevocableMemoryHighWatermarkPercent(revocableMemoryHighWatermarkPercent);
+ tResult.setEnableReserveMemory(enableReserveMemory);
+ tResult.setMinRevocableMem(spillMinRevocableMem);
+ tResult.setSpillSortMemLimit(spillSortMemLimit);
+ tResult.setSpillSortBatchBytes(spillSortBatchBytes);
+
tResult.setSpillAggregationPartitionCount(spillAggregationPartitionCount);
+ tResult.setSpillStreamingAggMemLimit(spillStreamingAggMemLimit);
+ tResult.setSpillHashJoinPartitionCount(spillHashJoinPartitionCount);
+
tResult.setRevocableMemoryHighWatermarkPercent(spillRevocableMemoryHighWatermarkPercent);
+
tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
tResult.setEnableLocalMergeSort(enableLocalMergeSort);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 8cf33a6218b..f99a88e55b6 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -196,9 +196,10 @@ struct TQueryOptions {
58: optional i32 repeat_max_num = 0 // Deprecated
+ // deprecated, use spill_sort_mem_limit
59: optional i64 external_sort_bytes_threshold = 0
- // deprecated
+ // Not used anymore
60: optional i32 partitioned_hash_agg_rows_threshold = 0
61: optional bool enable_file_cache = false
@@ -214,9 +215,10 @@ struct TQueryOptions {
66: optional i32 parallel_instance = 1
// Indicate where useServerPrepStmts enabled
67: optional bool mysql_row_binary_format = false;
+ // Not used anymore
68: optional i64 external_agg_bytes_threshold = 0
- // partition count(1 << external_agg_partition_bits) when spill aggregation
data into disk
+ // Not used anymore, use spill_aggregation_partition_count
69: optional i32 external_agg_partition_bits = 4
// Specify base path for file cache
@@ -369,6 +371,10 @@ struct TQueryOptions {
145: optional bool enable_spill = false
146: optional bool enable_reserve_memory = true
147: optional i32 revocable_memory_high_watermark_percent = -1
+ 148: optional i64 spill_sort_mem_limit = 134217728
+ 149: optional i64 spill_sort_batch_bytes = 8388608
+ 150: optional i32 spill_aggregation_partition_count = 32
+ 151: optional i32 spill_hash_join_partition_count = 32
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
diff --git
a/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
b/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
index eaf8c7e996b..2918d2ffbcd 100644
---
a/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
+++
b/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
@@ -65,9 +65,9 @@ suite("test_nested_type_with_resize") {
}
}
- order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
- order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
- order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
- order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
+ order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
+ order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
+ order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
+ order_qt_sql """ /*set
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
[...]
}
diff --git a/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy b/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy
index 493873fd5c8..d81f01aeeaa 100644
--- a/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy
+++ b/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy
@@ -574,7 +574,7 @@ suite("variant_mv") {
where g2.actor['id'] > 34259289;
"""
def query3_6 = """
- SELECT
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=0,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=32,parallel_scan_min_rows_per_scanner=64,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=1,enable_parallel_result_sink=false,ena
[...]
+ SELECT
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=0,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=32,parallel_scan_min_rows_per_scanner=64,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=1,enable_parallel_result_sink=false,ena
[...]
g1.id,
g2.type,
floor(cast(g1.actor['id'] as int) + 100.5),
diff --git a/regression-test/suites/variant_p0/load.groovy b/regression-test/suites/variant_p0/load.groovy
index 5abc3346f4d..1b93ecc9747 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -290,7 +290,7 @@ suite("regression_test_variant", "p0"){
sql """insert into ${table_name} values (5, '{"i" : 1}'), (1, '{"a" : 1}')"""
sql """insert into ${table_name} values (6, '{"j" : 1}'), (1, '{"a" : 1}')"""
sql """insert into ${table_name} values (6, '{"k" : 1}'), (1, '{"a" : 1}')"""
- sql "select
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=1,parallel_pipeline_task_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=
[...]
+ sql "select
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=1,parallel_pipeline_task_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=
[...]
qt_sql_36_1 "select cast(v['a'] as int), cast(v['b'] as int), cast(v['c'] as int) from ${table_name} order by k limit 10"
sql "DELETE FROM ${table_name} WHERE k=1"
sql "select * from ${table_name}"
diff --git a/regression-test/suites/variant_p0/nested.groovy b/regression-test/suites/variant_p0/nested.groovy
index 584ba0e336d..6ae7ee2c3a7 100644
--- a/regression-test/suites/variant_p0/nested.groovy
+++ b/regression-test/suites/variant_p0/nested.groovy
@@ -133,7 +133,7 @@ suite("regression_test_variant_nested", "p0"){
qt_sql """select
/*+SET_VAR(batch_size=1024,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_
parallel_pipeline_task_num=7,parallel_fragment_exec_instance_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16
-,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=true,sort_phase_num=0,enable_nereids_planner=true,rewrite_or_to_in_predicate_threshold=2,enable_function_pushdown=true,enable_common_expr_pushdown=false,enable_local_exchange=true,partitioned_hash_join_rows_threshold=8,partitioned_hash_agg_rows_threshold=8,partition_pruning_expand_threshold=10,enable_share_hash_table_for_broadca
[...]
+,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=true,sort_phase_num=0,enable_nereids_planner=true,rewrite_or_to_in_predicate_threshold=2,enable_function_pushdown=true,enable_common_expr_pushdown=false,enable_local_exchange=true,partitioned_hash_join_rows_threshold=8,partitioned_hash_agg_rows_threshold=8,partition_pruning_expand_threshold=10,enable_share_hash_table_for_broadca
[...]
qt_sql """select * from var_nested where v['k2'] = 'some' and array_contains(cast(v['nested1']['nested2']['a'] as array<tinyint>), 10) order by k limit 1;"""
sql """INSERT INTO var_nested SELECT *, '{"k1":1, "k2": "some", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]], "nested1" : {"nested2" : [{"a" : 10, "b" : 1.1, "c" : "1111"}]}}' FROM numbers("number" = "4096") where number > 1024 limit 1024;"""
@@ -162,7 +162,7 @@ parallel_pipeline_task_num=7,parallel_fragment_exec_instance_num=4,profile_level
properties("replication_num" = "1", "disable_auto_compaction" = "false", "enable_unique_key_merge_on_write" = "true", "variant_enable_flatten_nested" = "true");
"""
sql """insert into var_nested2 select * from var_nested order by k limit 1024"""
- qt_sql """select
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=5,parallel_pipeline_task_num=1,profile_level=1,enable_pipeline_engine=false,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=12,enable_parallel_res
[...]
+ qt_sql """select
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=5,parallel_pipeline_task_num=1,profile_level=1,enable_pipeline_engine=false,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=12,enable_parallel_res
[...]
qt_sql """select v['nested'] from var_nested2 where k < 10 order by k limit 10;"""
// 0. nomal explode variant array
order_qt_explode_sql """select count(),cast(vv['xx'] as int) from var_nested lateral view explode_variant_array(v['nested']) tmp as vv where vv['xx'] = 10 group by cast(vv['xx'] as int)"""
diff --git a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
index 4226de14d81..4b9cc317318 100644
--- a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
+++ b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
@@ -139,7 +139,7 @@ suite("variant_sub_path_pruning", "variant_type"){
// two children
order_qt_sql """
- select
/*+SET_VAR(batch_size=50,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=6,parallel_pipeline_task_num=2,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_nereids_planner=true,rewrite_or_to_in_predicate_thresh
[...]
+ select
/*+SET_VAR(batch_size=50,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=6,parallel_pipeline_task_num=2,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_nereids_planner=true,rewrite_or_to_in_predicate_thresh
[...]
"""
order_qt_sql """select c1['a'] from (select dt as c1 from pruning_test union all select dt as c1 from pruning_test) v1;"""
order_qt_sql """select c1['b'] from (select dt['a'] as c1 from pruning_test union all select dt['a'] as c1 from pruning_test) v1;"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]