This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 92ee09e0fd5 fix
92ee09e0fd5 is described below
commit 92ee09e0fd5f97ae1f8b757bf7bb6176ef71e6b0
Author: jacktengg <[email protected]>
AuthorDate: Mon Dec 16 16:02:03 2024 +0800
fix
---
.../exec/partitioned_hash_join_sink_operator.cpp | 34 ++++++++++++++++----
be/src/pipeline/pipeline_task.cpp | 37 ----------------------
be/src/runtime/runtime_state.h | 7 ++++
.../java/org/apache/doris/qe/SessionVariable.java | 5 +++
gensrc/thrift/PaloInternalService.thrift | 1 +
5 files changed, 41 insertions(+), 43 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 672eb36a907..dad82b6cc8a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -260,7 +260,22 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
             }
         }
-        return Status::OK();
+        Status status;
+        if (_child_eos) {
+            std::for_each(_shared_state->partitioned_build_blocks.begin(),
+                          _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
+                              if (block) {
+                                  COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
+                              }
+                          });
+            status = _finish_spilling();
+            VLOG_DEBUG << fmt::format(
+                    "Query: {}, task {}, sink {} _revoke_unpartitioned_block set_ready_to_read",
+                    print_id(state->query_id()), state->task_id(), _parent->node_id());
+            _dependency->set_ready_to_read();
+        }
+
+        return status;
     };

     auto exception_catch_func = [spill_func]() mutable {
@@ -547,12 +562,19 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
             return revoke_memory(state, nullptr);
         } else {
             const auto revocable_size = revocable_mem_size(state);
-            if (revocable_size >= config::revocable_memory_bytes_high_watermark) {
-                LOG(INFO) << fmt::format(
-                        "Query: {}, sink name: {}, node id: {}, task id: {}, "
-                        "revoke_memory "
+            // TODO: consider parallel?
+            // After building hash table it will not be able to spill later
+            // even if memory is low, and will cause cancel of queries.
+            // So make a check here, if build blocks mem usage is too high,
+            // then trigger revoke memory.
+            auto query_mem_limit = state->get_query_ctx()->mem_limit();
+            if (revocable_size >= (double)query_mem_limit / 100.0 *
+                                          state->revocable_memory_high_watermark_percent()) {
+                VLOG_DEBUG << fmt::format(
+                        "Query: {}, task {}, sink {}, query mem limit: {}, revoke_memory "
                         "because revocable memory is high: {}",
-                        print_id(state->query_id()), get_name(), node_id(), state->task_id(),
+                        print_id(state->query_id()), state->task_id(), node_id(),
+                        PrettyPrinter::print_bytes(query_mem_limit),
                         PrettyPrinter::print_bytes(revocable_size));
                 return revoke_memory(state, nullptr);
             }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index cd822ef15e2..d4ed0790942 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -494,43 +494,6 @@ Status PipelineTask::execute(bool* eos) {
                     *eos = false;
                     continue;
                 }
-                if (workload_group) {
-                    bool is_low_watermark = false;
-                    bool is_high_watermark = false;
-                    workload_group->check_mem_used(&is_low_watermark, &is_high_watermark);
-                    // for hash join build sink, if it's eos at this reserve, it will build hash table and
-                    // it will not be able to spill later even if memory is low, and will cause cancel of queries.
-                    // So make a check here, if it's low watermark after reserve and if reserved memory is too many,
-                    // then trigger revoke memory.
-
-                    // debug
-                    if (sink_reserve_size > 64 * 1024 * 1024) {
-                        LOG(INFO) << fmt::format(
-                                "Query: {}, sink name: {}, node id: {}, task id: {}, "
-                                "is_low_watermark: {}, sink_reserve_size: {}, wg mem limit: {}, "
-                                "reserve/wg_limit: {}",
-                                print_id(query_id), _sink->get_name(), _sink->node_id(),
-                                _state->task_id(), is_low_watermark,
-                                PrettyPrinter::print_bytes(sink_reserve_size),
-                                PrettyPrinter::print_bytes(workload_group->memory_limit()),
-                                ((double)sink_reserve_size) / workload_group->memory_limit());
-                    }
-                    if (is_low_watermark) {
-                        const auto revocable_size = _sink->revocable_mem_size(_state);
-                        if (revocable_size >= config::revocable_memory_bytes_high_watermark) {
-                            LOG(INFO) << fmt::format(
-                                    "Query: {}, sink name: {}, node id: {}, task id: {}, "
-                                    "sink_reserve_size: {}, revoke_memory "
-                                    "because revocable memory is high: {}",
-                                    print_id(query_id), _sink->get_name(), _sink->node_id(),
-                                    _state->task_id(),
-                                    PrettyPrinter::print_bytes(sink_reserve_size),
-                                    PrettyPrinter::print_bytes(revocable_size));
-                            RETURN_IF_ERROR(_sink->revoke_memory(_state, nullptr));
-                            continue;
-                        }
-                    }
-                }
             }

             // Define a lambda function to catch sink exception, because sink will check
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 4da3384cb60..7318c93f15a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -597,6 +597,13 @@ public:
         return 1;
     }

+    int revocable_memory_high_watermark_percent() const {
+        if (_query_options.__isset.revocable_memory_high_watermark_percent) {
+            return _query_options.revocable_memory_high_watermark_percent;
+        }
+        return 10;
+    }
+
     size_t minimum_operator_memory_required_bytes() const {
         if (_query_options.__isset.minimum_operator_memory_required_kb) {
             return _query_options.minimum_operator_memory_required_kb * 1024;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index af379bc4135..16b214b1536 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -566,6 +566,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String SPILL_STREAMING_AGG_MEM_LIMIT = "spill_streaming_agg_mem_limit";
     public static final String MIN_REVOCABLE_MEM = "min_revocable_mem";
     public static final String ENABLE_SPILL = "enable_spill";
+    public static final String REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT = "revocable_memory_high_watermark_percent";
     public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory";
     public static final String ENABLE_FORCE_SPILL = "enable_force_spill";
     public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
@@ -2243,6 +2244,9 @@ public class SessionVariable implements Serializable, Writable {
             needForward = true, fuzzy = true)
     public long dataQueueMaxBlocks = 1;

+    @VariableMgr.VarAttr(name = REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT, fuzzy = true)
+    public int revocableMemoryHighWatermarkPercent = 10;
+
     // If the memory consumption of sort node exceed this limit, will trigger spill to disk;
     // Set to 0 to disable; min: 128M
     public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
@@ -3957,6 +3961,7 @@ public class SessionVariable implements Serializable, Writable {
         tResult.setEnableForceSpill(enableForceSpill);
         tResult.setExternalAggPartitionBits(externalAggPartitionBits);
         tResult.setMinRevocableMem(minRevocableMem);
+        tResult.setRevocableMemoryHighWatermarkPercent(revocableMemoryHighWatermarkPercent);
         tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
         tResult.setEnableLocalMergeSort(enableLocalMergeSort);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index d3a2e4521f9..7ebe16583d8 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -367,6 +367,7 @@ struct TQueryOptions {
   144: optional i32 query_slot_count = 0;
   145: optional bool enable_spill = false
   146: optional bool enable_reserve_memory = true
+  147: optional i32 revocable_memory_high_watermark_percent = 10
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]