This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 2d099802744 [fix] std::sort coredump and low mem mode buffer limit (#46006)
2d099802744 is described below
commit 2d09980274434df222a25a7243302502e075776d
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Dec 26 14:54:10 2024 +0800
[fix] std::sort coredump and low mem mode buffer limit (#46006)
---
be/src/pipeline/dependency.h | 10 +++---
be/src/pipeline/exec/multi_cast_data_streamer.h | 6 +---
.../local_exchange_sink_operator.cpp | 2 +-
be/src/runtime/runtime_state.h | 8 +++++
be/src/vec/spill/spill_stream_manager.cpp | 40 +++++-----------------
.../java/org/apache/doris/qe/SessionVariable.java | 10 ++++--
gensrc/thrift/PaloInternalService.thrift | 2 ++
7 files changed, 35 insertions(+), 43 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 13f983db3dd..a023e5661e2 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -840,8 +840,9 @@ public:
}
}
- virtual void set_low_memory_mode() {
- _buffer_mem_limit =
std::min<int64_t>(config::local_exchange_buffer_mem_limit, 512 * 1024);
+ virtual void set_low_memory_mode(RuntimeState* state) {
+ _buffer_mem_limit =
std::min<int64_t>(config::local_exchange_buffer_mem_limit,
+
state->low_memory_mode_buffer_limit());
}
};
@@ -888,8 +889,9 @@ struct LocalMergeExchangeSharedState : public
LocalExchangeSharedState {
source_deps[channel_id]->set_ready();
}
- void set_low_memory_mode() override {
- _buffer_mem_limit =
std::min<int64_t>(config::local_exchange_buffer_mem_limit, 512 * 1024);
+ void set_low_memory_mode(RuntimeState* state) override {
+ _buffer_mem_limit =
std::min<int64_t>(config::local_exchange_buffer_mem_limit,
+
state->low_memory_mode_buffer_limit());
DCHECK(!_queues_mem_usage.empty());
_each_queue_limit =
std::max<int64_t>(64 * 1024, _buffer_mem_limit /
_queues_mem_usage.size());
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h
b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 0b925f7a5fe..4669a0389c7 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -81,11 +81,7 @@ public:
_process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT);
};
- ~MultiCastDataStreamer() {
- for (auto& item : _spill_readers) {
- DCHECK(item.empty());
- }
- }
+ ~MultiCastDataStreamer() = default;
Status pull(RuntimeState* state, int sender_idx, vectorized::Block* block,
bool* eos);
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 968812e594e..7f9228e246d 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -146,7 +146,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
if (state->get_query_ctx()->low_memory_mode()) {
- local_state._shared_state->set_low_memory_mode();
+ local_state._shared_state->set_low_memory_mode(state);
local_state._exchanger->set_low_memory_mode();
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 2220c1fc41e..a47c5d6f202 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -26,6 +26,7 @@
#include <stdint.h>
#include <atomic>
+#include <cstdint>
#include <fstream>
#include <functional>
#include <memory>
@@ -599,6 +600,13 @@ public:
return 32;
}
+ int64_t low_memory_mode_buffer_limit() const {
+ if (_query_options.__isset.low_memory_mode_buffer_limit) {
+ return std::max(_query_options.low_memory_mode_buffer_limit,
(int64_t)1);
+ }
+ return 32L * 1024 * 1024;
+ }
+
int spill_revocable_memory_high_watermark_percent() const {
if (_query_options.__isset.revocable_memory_high_watermark_percent) {
return _query_options.revocable_memory_high_watermark_percent;
diff --git a/be/src/vec/spill/spill_stream_manager.cpp
b/be/src/vec/spill/spill_stream_manager.cpp
index b323e1ab2e8..07a947b5ef3 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -108,45 +108,23 @@ Status SpillStreamManager::_init_spill_store_map() {
std::vector<SpillDataDir*> SpillStreamManager::_get_stores_for_spill(
TStorageMedium::type storage_medium) {
- std::vector<SpillDataDir*> stores;
+ std::vector<std::pair<SpillDataDir*, double>> stores_with_usage;
for (auto& [_, store] : _spill_store_map) {
if (store->storage_medium() == storage_medium &&
!store->reach_capacity_limit(0)) {
- stores.push_back(store.get());
+ stores_with_usage.emplace_back(store.get(),
store->_get_disk_usage(0));
}
}
- if (stores.empty()) {
- return stores;
+ if (stores_with_usage.empty()) {
+ return {};
}
- std::sort(stores.begin(), stores.end(), [](SpillDataDir* a, SpillDataDir*
b) {
- return a->_get_disk_usage(0) < b->_get_disk_usage(0);
- });
+ std::sort(stores_with_usage.begin(), stores_with_usage.end(),
+ [](auto&& a, auto&& b) { return a.second < b.second; });
- size_t seventy_percent_index = stores.size();
- size_t eighty_five_percent_index = stores.size();
- for (size_t index = 0; index < stores.size(); index++) {
- // If the usage of the store is less than 70%, we choose disk randomly.
- if (stores[index]->_get_disk_usage(0) > 0.7 && seventy_percent_index
== stores.size()) {
- seventy_percent_index = index;
- }
- if (stores[index]->_get_disk_usage(0) > 0.85 &&
- eighty_five_percent_index == stores.size()) {
- eighty_five_percent_index = index;
- break;
- }
- }
-
- std::random_device rd;
- std::mt19937 g(rd());
- std::shuffle(stores.begin(), stores.begin() + seventy_percent_index, g);
- if (seventy_percent_index != stores.size()) {
- std::shuffle(stores.begin() + seventy_percent_index,
- stores.begin() + eighty_five_percent_index, g);
- }
- if (eighty_five_percent_index != stores.size()) {
- std::shuffle(stores.begin() + eighty_five_percent_index, stores.end(),
g);
+ std::vector<SpillDataDir*> stores;
+ for (const auto& [store, _] : stores_with_usage) {
+ stores.emplace_back(store);
}
-
return stores;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index bab6a34528e..7ce2c922caf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -568,8 +568,10 @@ public class SessionVariable implements Serializable,
Writable {
public static final String SPILL_AGGREGATION_PARTITION_COUNT =
"spill_aggregation_partition_count";
public static final String SPILL_STREAMING_AGG_MEM_LIMIT =
"spill_streaming_agg_mem_limit";
public static final String SPILL_HASH_JOIN_PARTITION_COUNT =
"spill_hash_join_partition_count";
- public static final String SPILL_REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT =
"spill_revocable_memory_high_watermark_percent";
+ public static final String SPILL_REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT =
+ "spill_revocable_memory_high_watermark_percent";
public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
+ public static final String LOW_MEMORY_MODE_BUFFER_LIMIT =
"low_memory_mode_buffer_limit";
public static final String GENERATE_STATS_FACTOR = "generate_stats_factor";
@@ -2249,6 +2251,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = SPILL_AGGREGATION_PARTITION_COUNT, fuzzy =
true)
public int spillAggregationPartitionCount = 32;
+ @VariableMgr.VarAttr(name = LOW_MEMORY_MODE_BUFFER_LIMIT, fuzzy = false)
+ public long lowMemoryModeBufferLimit = 33554432;
+
// The memory limit of streaming agg when spilling is enabled
// NOTE: streaming agg operator will not spill to disk.
@VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT, fuzzy = true)
@@ -3914,7 +3919,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
tResult.setSkipBadTablet(skipBadTablet);
tResult.setDisableFileCache(disableFileCache);
-
+
// for spill
tResult.setEnableSpill(enableSpill);
tResult.setEnableForceSpill(enableForceSpill);
@@ -3928,6 +3933,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setRevocableMemoryHighWatermarkPercent(spillRevocableMemoryHighWatermarkPercent);
tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
+ tResult.setLowMemoryModeBufferLimit(lowMemoryModeBufferLimit);
tResult.setEnableLocalMergeSort(enableLocalMergeSort);
tResult.setEnableParallelResultSink(enableParallelResultSink);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index f99a88e55b6..52388284a73 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -371,10 +371,12 @@ struct TQueryOptions {
145: optional bool enable_spill = false
146: optional bool enable_reserve_memory = true
147: optional i32 revocable_memory_high_watermark_percent = -1
+
148: optional i64 spill_sort_mem_limit = 134217728
149: optional i64 spill_sort_batch_bytes = 8388608
150: optional i32 spill_aggregation_partition_count = 32
151: optional i32 spill_hash_join_partition_count = 32
+ 152: optional i64 low_memory_mode_buffer_limit = 33554432
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]