This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 3020eb00be6 [feat] reserve memory separately for sink and source
operators (#41706)
3020eb00be6 is described below
commit 3020eb00be6ad2e8766625813afd9a32dd430491
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Oct 11 15:59:38 2024 +0800
[feat] reserve memory separately for sink and source operators (#41706)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 6 +-
be/src/pipeline/exec/aggregation_sink_operator.h | 4 +-
be/src/pipeline/exec/analytic_sink_operator.cpp | 2 +-
be/src/pipeline/exec/analytic_sink_operator.h | 2 +-
be/src/pipeline/exec/exchange_sink_buffer.h | 4 +-
be/src/pipeline/exec/exchange_source_operator.cpp | 4 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 80 ++++++++++++++++++----
be/src/pipeline/exec/hashjoin_build_sink.h | 4 +-
be/src/pipeline/exec/operator.h | 14 ++--
.../pipeline/exec/partition_sort_sink_operator.cpp | 2 +-
.../pipeline/exec/partition_sort_sink_operator.h | 2 +-
.../exec/partitioned_aggregation_sink_operator.cpp | 4 +-
.../exec/partitioned_aggregation_sink_operator.h | 2 +-
.../exec/partitioned_hash_join_probe_operator.cpp | 42 ++++++++++--
.../exec/partitioned_hash_join_probe_operator.h | 4 ++
.../exec/partitioned_hash_join_sink_operator.cpp | 8 +--
.../exec/partitioned_hash_join_sink_operator.h | 7 +-
be/src/pipeline/exec/set_probe_sink_operator.cpp | 2 +-
be/src/pipeline/exec/set_probe_sink_operator.h | 2 +-
be/src/pipeline/exec/set_sink_operator.cpp | 2 +-
be/src/pipeline/exec/set_sink_operator.h | 2 +-
be/src/pipeline/pipeline_task.cpp | 55 +++++++++++----
be/src/pipeline/pipeline_task.h | 4 +-
be/src/runtime/query_context.cpp | 10 +--
be/src/vec/common/hash_table/hash_map_context.h | 56 +++++++++++++++
be/src/vec/runtime/vdata_stream_recvr.cpp | 3 +-
26 files changed, 255 insertions(+), 72 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index bcbf83f6290..3dd43c3c4d8 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -718,7 +718,7 @@ Status AggSinkLocalState::_init_hash_method(const
vectorized::VExprContextSPtrs&
return Status::OK();
}
-size_t AggSinkLocalState::get_reserve_mem_size(RuntimeState* state) const {
+size_t AggSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos)
const {
size_t size_to_reserve = std::visit(
[&](auto&& arg) -> size_t {
using HashTableCtxType = std::decay_t<decltype(arg)>;
@@ -891,9 +891,9 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState*
state) {
return Status::OK();
}
-size_t AggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+size_t AggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) {
auto& local_state = get_local_state(state);
- return local_state.get_reserve_mem_size(state);
+ return local_state.get_reserve_mem_size(state, eos);
}
Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 477df890b4f..8bf3a8493fc 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -97,7 +97,7 @@ protected:
Status _create_agg_status(vectorized::AggregateDataPtr data);
size_t _memory_usage() const;
- size_t get_reserve_mem_size(RuntimeState* state) const;
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) const;
RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
@@ -170,7 +170,7 @@ public:
Status reset_hash_table(RuntimeState* state);
- size_t get_reserve_mem_size(RuntimeState* state) override;
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
using DataSinkOperatorX<AggSinkLocalState>::node_id;
using DataSinkOperatorX<AggSinkLocalState>::operator_id;
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index e11ffe0c48a..63b93ad4a59 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -332,7 +332,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Block
return Status::OK();
}
-size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool
eos) {
auto& local_state = get_local_state(state);
return local_state._reserve_mem_size;
}
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h
b/be/src/pipeline/exec/analytic_sink_operator.h
index dee7ee27c30..f5b53314450 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -95,7 +95,7 @@ public:
return !_partition_by_eq_expr_ctxs.empty() &&
_order_by_eq_expr_ctxs.empty();
}
- size_t get_reserve_mem_size(RuntimeState* state) override;
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
private:
Status _insert_range_column(vectorized::Block* block, const
vectorized::VExprContextSPtr& expr,
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 23ba2220bec..60193b1d7e6 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -91,7 +91,7 @@ public:
void set_low_memory_mode() {
_total_queue_buffer_size_limit = 1024 * 1024;
- _total_queue_blocks_count_limit = 1;
+ _total_queue_blocks_count_limit = 8;
}
void acquire(BroadcastPBlockHolder& holder);
@@ -207,7 +207,7 @@ public:
_set_ready_to_finish(_busy_channels == 0);
}
- void set_low_memory_mode() { _queue_capacity = 1; }
+ void set_low_memory_mode() { _queue_capacity = 8; }
private:
friend class ExchangeSinkLocalState;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index ff25a252cca..dd7b62bcadf 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -17,6 +17,8 @@
#include "exchange_source_operator.h"
+#include <fmt/core.h>
+
#include <memory>
#include "pipeline/exec/operator.h"
@@ -71,7 +73,7 @@ Status ExchangeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile,
timer_name, 1);
for (size_t i = 0; i < queues.size(); i++) {
deps[i] = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
- "SHUFFLE_DATA_DEPENDENCY");
+
fmt::format("SHUFFLE_DATA_DEPENDENCY_{}", i));
queues[i]->set_dependency(deps[i]);
metrics[i] =
_runtime_profile->add_nonzero_counter(fmt::format("WaitForData{}", i),
TUnit ::TIME_NS,
timer_name, 1);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 7f8f2da35cb..4cdf2a35d9e 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -19,9 +19,11 @@
#include <string>
+#include "common/exception.h"
#include "exprs/bloom_filter_func.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/operator.h"
+#include "vec/core/block.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/utils/template_helpers.hpp"
@@ -110,7 +112,7 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState*
state) {
return Status::OK();
}
-size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
+size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state,
bool eos) {
if (!_should_build_hash_table) {
return 0;
}
@@ -121,26 +123,74 @@ size_t
HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
size_t size_to_reserve = 0;
- if (!_build_side_mutable_block.empty()) {
+ const size_t build_block_rows = _build_side_mutable_block.rows();
+ if (build_block_rows != 0) {
const auto bytes = _build_side_mutable_block.bytes();
const auto allocated_bytes =
_build_side_mutable_block.allocated_bytes();
- if (allocated_bytes != 0 && ((bytes * 100) / allocated_bytes) >= 85) {
- size_to_reserve += bytes;
+ const auto bytes_per_row = bytes / build_block_rows;
+ const auto estimated_size_of_next_block = bytes_per_row *
state->batch_size();
+
+ // If the new size is greater than 95% of allocated bytes, it may
need to realloc.
+ if (((estimated_size_of_next_block + bytes) * 100 / allocated_bytes)
>= 95) {
+ size_to_reserve += bytes + estimated_size_of_next_block;
}
}
- const size_t rows = _build_side_mutable_block.rows() + state->batch_size();
- size_t bucket_size = JoinHashTable<StringRef>::calc_bucket_size(rows);
+ if (eos) {
+ const size_t rows = build_block_rows + state->batch_size();
+ size_t bucket_size = JoinHashTable<StringRef>::calc_bucket_size(rows);
- size_to_reserve += bucket_size * sizeof(uint32_t); // JoinHashTable::first
- size_to_reserve += rows * sizeof(uint32_t); // JoinHashTable::next
+ size_to_reserve += bucket_size * sizeof(uint32_t); //
JoinHashTable::first
+ size_to_reserve += rows * sizeof(uint32_t); //
JoinHashTable::next
- auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
- if (p._join_op == TJoinOp::FULL_OUTER_JOIN || p._join_op ==
TJoinOp::RIGHT_OUTER_JOIN ||
- p._join_op == TJoinOp::RIGHT_ANTI_JOIN || p._join_op ==
TJoinOp::RIGHT_SEMI_JOIN) {
- size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited
+ auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
+ if (p._join_op == TJoinOp::FULL_OUTER_JOIN || p._join_op ==
TJoinOp::RIGHT_OUTER_JOIN ||
+ p._join_op == TJoinOp::RIGHT_ANTI_JOIN || p._join_op ==
TJoinOp::RIGHT_SEMI_JOIN) {
+ size_to_reserve += rows * sizeof(uint8_t); //
JoinHashTable::visited
+ }
+ size_to_reserve += _evaluate_mem_usage;
+
+ vectorized::ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
+
+ if (build_block_rows > 0) {
+ auto block = _build_side_mutable_block.to_block();
+ Defer defer([&]() {
+ _build_side_mutable_block =
vectorized::MutableBlock(std::move(block));
+ });
+ vectorized::ColumnUInt8::MutablePtr null_map_val;
+ if (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op ==
TJoinOp::FULL_OUTER_JOIN) {
+ _convert_block_to_null(block);
+ // first row is mocked
+ for (int i = 0; i < block.columns(); i++) {
+ auto [column, is_const] =
unpack_if_const(block.safe_get_by_position(i).column);
+
assert_cast<vectorized::ColumnNullable*>(column->assume_mutable().get())
+ ->get_null_map_column()
+ .get_data()
+ .data()[0] = 1;
+ }
+ }
+
+ null_map_val = vectorized::ColumnUInt8::create();
+ null_map_val->get_data().assign(build_block_rows, (uint8_t)0);
+
+ // Get the key column that needs to be built
+ Status st = _extract_join_column(block, null_map_val, raw_ptrs,
_build_col_ids);
+ if (!st.ok()) {
+ throw Exception(st);
+ }
+
+ std::visit(vectorized::Overload {[&](std::monostate& arg) {
+ LOG(FATAL) << "FATAL:
uninited hash table";
+ __builtin_unreachable();
+ },
+ [&](auto&& hash_map_context) {
+ size_to_reserve +=
hash_map_context.estimated_size(
+ raw_ptrs,
block.rows(), true, true,
+ bucket_size);
+ }},
+ *_shared_state->hash_table_variants);
+ }
}
- size_to_reserve += _evaluate_mem_usage;
return size_to_reserve;
}
@@ -695,9 +745,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
return Status::OK();
}
-size_t HashJoinBuildSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+size_t HashJoinBuildSinkOperatorX::get_reserve_mem_size(RuntimeState* state,
bool eos) {
auto& local_state = get_local_state(state);
- return local_state.get_reserve_mem_size(state);
+ return local_state.get_reserve_mem_size(state, eos);
}
size_t HashJoinBuildSinkOperatorX::get_memory_usage(RuntimeState* state) const
{
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 0ced9a77207..b4a60fa362d 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -54,7 +54,7 @@ public:
Status close(RuntimeState* state, Status exec_status) override;
- [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state);
+ [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
protected:
void _hash_table_init(RuntimeState* state);
@@ -124,7 +124,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
- size_t get_reserve_mem_size(RuntimeState* state) override;
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
[[nodiscard]] size_t get_memory_usage(RuntimeState* state) const;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 639a19d9aac..a6cc75541d3 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -109,11 +109,6 @@ public:
virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }
- // If this method is not overwrite by child, its default value is 1MB
- [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) {
- return state->minimum_operator_memory_required_bytes();
- }
-
virtual Status revoke_memory(RuntimeState* state,
const std::shared_ptr<SpillContext>&
spill_context) {
return Status::OK();
@@ -480,6 +475,10 @@ public:
[[nodiscard]] virtual Status setup_local_state(RuntimeState* state,
LocalSinkStateInfo& info) =
0;
+ [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state,
bool eos) {
+ return state->minimum_operator_memory_required_bytes();
+ }
+
template <class TARGET>
TARGET& cast() {
DCHECK(dynamic_cast<TARGET*>(this))
@@ -747,6 +746,11 @@ public:
return Status::OK();
}
+ // If this method is not overridden by a child class, its default value is 1MB
+ [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) {
+ return state->minimum_operator_memory_required_bytes();
+ }
+
virtual std::string debug_string(int indentation_level = 0) const;
virtual std::string debug_string(RuntimeState* state, int
indentation_level = 0) const;
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index f62d5d13600..129743c3d7d 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -249,7 +249,7 @@ Status
PartitionSortSinkOperatorX::_split_block_by_partition(
return Status::OK();
}
-size_t PartitionSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+size_t PartitionSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state,
bool eos) {
auto& local_state = get_local_state(state);
auto rows = state->batch_size();
size_t reserve_mem_size = std::visit(
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 5eccb6ac9ca..0f1f3cd1b2c 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -274,7 +274,7 @@ public:
return {ExchangeType::PASSTHROUGH};
}
- size_t get_reserve_mem_size(RuntimeState* state) override;
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
private:
friend class PartitionSortSinkLocalState;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index f84a3636800..1a86fdb2a9d 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -254,10 +254,10 @@ Status
PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state)
return sink_local_state->open(state);
}
-size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state,
bool eos) {
auto& local_state = get_local_state(state);
auto* runtime_state = local_state._runtime_state.get();
- auto size = _agg_sink_operator->get_reserve_mem_size(runtime_state);
+ auto size = _agg_sink_operator->get_reserve_mem_size(runtime_state, eos);
COUNTER_SET(local_state._memory_usage_reserved, int64_t(size));
return size;
}
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 1124ba48c35..9b70da54943 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -346,7 +346,7 @@ public:
Status revoke_memory(RuntimeState* state,
const std::shared_ptr<SpillContext>& spill_context)
override;
- size_t get_reserve_mem_size(RuntimeState* state) override;
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
private:
friend class PartitionedAggSinkLocalState;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index bb65f098151..7b60c9a3e2f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -23,6 +23,7 @@
#include <algorithm>
#include <utility>
+#include "common/logging.h"
#include "common/status.h"
#include "pipeline/pipeline_task.h"
#include "runtime/fragment_mgr.h"
@@ -67,6 +68,8 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState*
state, LocalStateI
_spill_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillProbeTime", 1);
_recovery_probe_blocks = ADD_COUNTER(profile(),
"SpillRecoveryProbeBlocks", TUnit::UNIT);
_recovery_probe_timer = ADD_TIMER_WITH_LEVEL(profile(),
"SpillRecoveryProbeTime", 1);
+ _get_child_next_timer = ADD_TIMER_WITH_LEVEL(profile(),
"GetChildNextTime", 1);
+
_memory_usage_reserved =
ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved",
TUnit::UNIT, 1);
@@ -870,6 +873,35 @@ size_t
PartitionedHashJoinProbeOperatorX::_revocable_mem_size(RuntimeState* stat
return mem_size;
}
+size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState*
state) {
+ auto& local_state = get_local_state(state);
+ const auto need_to_spill = local_state._shared_state->need_to_spill;
+ if (!need_to_spill || !local_state._child_eos) {
+ return Base::get_reserve_mem_size(state);
+ }
+
+ size_t size_to_reserve =
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
+
+ if (local_state._need_to_setup_internal_operators) {
+ const size_t rows =
+ (local_state._recovered_build_block ?
local_state._recovered_build_block->rows()
+ : 0) +
+ state->batch_size();
+ size_t bucket_size = JoinHashTable<StringRef>::calc_bucket_size(rows);
+
+ size_to_reserve += bucket_size * sizeof(uint32_t); //
JoinHashTable::first
+ size_to_reserve += rows * sizeof(uint32_t); //
JoinHashTable::next
+
+ if (_join_op == TJoinOp::FULL_OUTER_JOIN || _join_op ==
TJoinOp::RIGHT_OUTER_JOIN ||
+ _join_op == TJoinOp::RIGHT_ANTI_JOIN || _join_op ==
TJoinOp::RIGHT_SEMI_JOIN) {
+ size_to_reserve += rows * sizeof(uint8_t); //
JoinHashTable::visited
+ }
+ }
+
+ COUNTER_SET(local_state._memory_usage_reserved, int64_t(size_to_reserve));
+ return size_to_reserve;
+}
+
Status PartitionedHashJoinProbeOperatorX::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
auto& local_state = get_local_state(state);
@@ -925,7 +957,6 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
bool* eos) {
*eos = false;
auto& local_state = get_local_state(state);
- SCOPED_TIMER(local_state.exec_time_counter());
const auto need_to_spill = local_state._shared_state->need_to_spill;
#ifndef NDEBUG
Defer eos_check_defer([&] {
@@ -944,16 +975,19 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
});
if (need_more_input_data(state)) {
- RETURN_IF_ERROR(_child->get_block_after_projects(state,
local_state._child_block.get(),
-
&local_state._child_eos));
+ {
+ SCOPED_TIMER(local_state._get_child_next_timer);
+ RETURN_IF_ERROR(_child->get_block_after_projects(state,
local_state._child_block.get(),
+
&local_state._child_eos));
+ }
+ SCOPED_TIMER(local_state.exec_time_counter());
if (local_state._child_block->rows() == 0 && !local_state._child_eos) {
return Status::OK();
}
Defer defer([&] { local_state._child_block->clear_column_data(); });
if (need_to_spill) {
- SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(push(state, local_state._child_block.get(),
local_state._child_eos));
if (_should_revoke_memory(state)) {
return _revoke_memory(state);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 0ffa446f181..e66b730685b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -144,6 +144,8 @@ private:
RuntimeProfile::Counter* _join_filter_timer = nullptr;
RuntimeProfile::Counter* _build_output_block_timer = nullptr;
RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
+
+ RuntimeProfile::Counter* _get_child_next_timer = nullptr;
};
class PartitionedHashJoinProbeOperatorX final
@@ -183,6 +185,8 @@ public:
size_t revocable_mem_size(RuntimeState* state) const override;
+ size_t get_reserve_mem_size(RuntimeState* state) override;
+
void set_inner_operators(const
std::shared_ptr<HashJoinBuildSinkOperatorX>& sink_operator,
const std::shared_ptr<HashJoinProbeOperatorX>&
probe_operator) {
_inner_sink_operator = sink_operator;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index b02c7ee0971..f7d38fe9d5d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -117,7 +117,7 @@ size_t
PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
return mem_size;
}
-size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState*
state) {
+size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState*
state, bool eos) {
size_t size_to_reserve = 0;
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
if (_shared_state->need_to_spill) {
@@ -125,7 +125,7 @@ size_t
PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta
} else {
if (_shared_state->inner_runtime_state) {
size_to_reserve = p._inner_sink_operator->get_reserve_mem_size(
- _shared_state->inner_runtime_state.get());
+ _shared_state->inner_runtime_state.get(), eos);
}
}
@@ -680,9 +680,9 @@ Status PartitionedHashJoinSinkOperatorX::revoke_memory(
return local_state.revoke_memory(state, spill_context);
}
-size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState*
state) {
+size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState*
state, bool eos) {
auto& local_state = get_local_state(state);
- return local_state.get_reserve_mem_size(state);
+ return local_state.get_reserve_mem_size(state, eos);
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index d8cb67ca08d..d3725997882 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -25,6 +25,7 @@
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/join_build_sink_operator.h"
#include "pipeline/exec/spill_utils.h"
+#include "vec/core/block.h"
#include "vec/runtime/partitioner.h"
namespace doris {
@@ -45,7 +46,7 @@ public:
Status close(RuntimeState* state, Status exec_status) override;
Status revoke_memory(RuntimeState* state, const
std::shared_ptr<SpillContext>& spill_context);
size_t revocable_mem_size(RuntimeState* state) const;
- [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state);
+ [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
protected:
PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
@@ -69,6 +70,8 @@ protected:
std::atomic<bool> _spill_status_ok {true};
std::mutex _spill_lock;
+ vectorized::Block _pending_block;
+
bool _child_eos {false};
Status _spill_status;
@@ -110,7 +113,7 @@ public:
Status revoke_memory(RuntimeState* state,
const std::shared_ptr<SpillContext>& spill_context)
override;
- size_t get_reserve_mem_size(RuntimeState* state) override;
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
DataDistribution required_data_distribution() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 7e62892f516..1db187ef307 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -196,7 +196,7 @@ void SetProbeSinkOperatorX<is_intersect>::_finalize_probe(
}
template <bool is_intersect>
-size_t SetProbeSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState*
state) {
+size_t SetProbeSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState*
state, bool eos) {
auto& local_state = get_local_state(state);
return local_state._estimate_memory_usage;
}
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 542f6652dd4..5ba248d9fda 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -102,7 +102,7 @@ public:
std::shared_ptr<BasicSharedState> create_shared_state() const override {
return nullptr; }
- size_t get_reserve_mem_size(RuntimeState* state) override;
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
private:
void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp
b/be/src/pipeline/exec/set_sink_operator.cpp
index e15cecd22ed..df5d8d44f0a 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -207,7 +207,7 @@ Status SetSinkOperatorX<is_intersect>::init(const
TPlanNode& tnode, RuntimeState
}
template <bool is_intersect>
-size_t SetSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState*
state) {
+size_t SetSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState*
state, bool eos) {
auto& local_state = get_local_state(state);
size_t size_to_reserve = std::visit(
[&](auto&& arg) -> size_t {
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index be600ff6fd5..ed9aa1afead 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -95,7 +95,7 @@ public:
}
bool require_shuffled_data_distribution() const override { return true; }
- size_t get_reserve_mem_size(RuntimeState* state) override;
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
private:
template <class HashTableContext, bool is_intersected>
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index e3f223eb207..3b3845bc038 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -43,6 +43,7 @@
#include "util/mem_info.h"
#include "util/runtime_profile.h"
#include "util/uid_util.h"
+#include "vec/core/block.h"
#include "vec/spill/spill_stream.h"
namespace doris {
@@ -304,7 +305,7 @@ Status PipelineTask::execute(bool* eos) {
SCOPED_ATTACH_TASK(_state);
_eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
*eos = _eos;
- if (_eos) {
+ if (_eos && !_pending_block) {
// If task is waken up by finish dependency, `_eos` is set to true by
last execution, and we should return here.
return Status::OK();
}
@@ -362,10 +363,8 @@ Status PipelineTask::execute(bool* eos) {
Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task
executing failed");
return status;
});
-
- DEFER_RELEASE_RESERVED();
// Every loop should check if memory is not enough.
- _state->get_query_ctx()->update_low_memory_mode();
+ // _state->get_query_ctx()->update_low_memory_mode();
// `_dry_run` means sink operator need no more data
// `_sink->is_finished(_state)` means sink operator should be finished
@@ -374,13 +373,19 @@ Status PipelineTask::execute(bool* eos) {
if (_dry_run || _sink->is_finished(_state)) {
*eos = true;
_eos = true;
+ } else if (_pending_block) [[unlikely]] {
+ LOG(INFO) << "query: " << print_id(query_id)
+ << " has pending block, size: " <<
_pending_block->allocated_bytes();
+ _block = std::move(_pending_block);
+ block = _block.get();
} else {
SCOPED_TIMER(_get_block_timer);
+ DEFER_RELEASE_RESERVED();
_get_block_counter->update(1);
- size_t sink_reserve_size = _sink->get_reserve_mem_size(_state);
- sink_reserve_size =
- std::max(sink_reserve_size,
_state->minimum_operator_memory_required_bytes());
- reserve_size = _root->get_reserve_mem_size(_state) +
sink_reserve_size;
+ // size_t sink_reserve_size = _sink->get_reserve_mem_size(_state);
+ // sink_reserve_size =
+ // std::max(sink_reserve_size,
_state->minimum_operator_memory_required_bytes());
+ reserve_size = _root->get_reserve_mem_size(_state);
_root->reset_reserve_mem_size(_state);
auto workload_group = _state->get_query_ctx()->workload_group();
@@ -392,14 +397,14 @@ Status PipelineTask::execute(bool* eos) {
COUNTER_UPDATE(_memory_reserve_failed_times, 1);
LOG(INFO) << "query: " << print_id(query_id) << ", try to
reserve: "
<< PrettyPrinter::print(reserve_size,
TUnit::BYTES)
- << "(sink reserve size:("
- << PrettyPrinter::print(sink_reserve_size,
TUnit::BYTES)
- << "), sink name: " << _sink->get_name()
- << ", node id: " << _sink->node_id() << "
failed: " << st.to_string()
+ << ", sink name: " << _sink->get_name()
+ << ", node id: " << _sink->node_id()
+ << ", task id: " << _state->task_id()
+ << ", failed: " << st.to_string()
<< ", debug info: " <<
GlobalMemoryArbitrator::process_mem_log_str();
_state->get_query_ctx()->update_paused_reason(st);
- _state->get_query_ctx()->set_low_memory_mode();
+ // _state->get_query_ctx()->set_low_memory_mode();
bool is_high_wartermark = false;
bool is_low_wartermark = false;
workload_group->check_mem_used(&is_low_wartermark,
&is_high_wartermark);
@@ -418,6 +423,28 @@ Status PipelineTask::execute(bool* eos) {
if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
Status status = Status::OK();
+ DEFER_RELEASE_RESERVED();
+ COUNTER_UPDATE(_memory_reserve_times, 1);
+ size_t sink_reserve_size = _sink->get_reserve_mem_size(_state,
*eos);
+ status = thread_context()->try_reserve_memory(sink_reserve_size);
+ if (!status.ok()) {
+ LOG(INFO) << "query: " << print_id(query_id) << ", try to
reserve: "
+ << PrettyPrinter::print(sink_reserve_size,
TUnit::BYTES)
+ << ", sink name: " << _sink->get_name()
+ << ", node id: " << _sink->node_id() << ", task id:
" << _state->task_id()
+ << ", failed: " << status.to_string()
+ << ", debug info: " <<
GlobalMemoryArbitrator::process_mem_log_str();
+ _state->get_query_ctx()->update_paused_reason(status);
+ _memory_sufficient_dependency->block();
+ ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+ _state->get_query_ctx()->shared_from_this(),
sink_reserve_size);
+ _pending_block = std::move(_block);
+ _block = vectorized::Block::create_unique();
+ _eos = *eos;
+ *eos = false;
+ continue;
+ }
+
// Define a lambda function to catch sink exception, because sink
will check
// return error status with EOF, it is special, could not return
directly.
auto sink_function = [&]() -> Status {
@@ -547,7 +574,7 @@ std::string PipelineTask::debug_string() {
}
size_t PipelineTask::get_revocable_size() const {
- if (_running || _eos) {
+ if (_running || (_eos && !_pending_block)) {
return 0;
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 448b2ec5f6f..44dfdd7832a 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -261,7 +261,9 @@ private:
RuntimeState* _state = nullptr;
int _previous_schedule_id = -1;
uint32_t _schedule_time = 0;
- std::unique_ptr<doris::vectorized::Block> _block;
+ std::unique_ptr<vectorized::Block> _block;
+ std::unique_ptr<vectorized::Block> _pending_block;
+
PipelineFragmentContext* _fragment_context = nullptr;
TaskQueue* _task_queue = nullptr;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 2ea548fd6f5..61d961b6c0e 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -472,7 +472,7 @@ Status QueryContext::revoke_memory() {
// Do not use memlimit, use current memory usage.
// For example, if current limit is 1.6G, but current used is 1G, if
reserve failed
// should free 200MB memory, not 300MB
- //const int64_t target_revoking_size =
(int64_t)(query_mem_tracker->consumption() * 0.2);
+ const auto target_revoking_size =
(int64_t)(query_mem_tracker->consumption() * 0.2);
size_t revoked_size = 0;
std::vector<pipeline::PipelineTask*> chosen_tasks;
@@ -481,10 +481,10 @@ Status QueryContext::revoke_memory() {
revoked_size += revocable_size;
// Only revoke the largest task to ensure memory is used as much as
possible
- break;
- //if (revoked_size >= target_revoking_size) {
- // break;
- //}
+ // break;
+ if (revoked_size >= target_revoking_size) {
+ break;
+ }
}
std::weak_ptr<QueryContext> this_ctx = shared_from_this();
diff --git a/be/src/vec/common/hash_table/hash_map_context.h
b/be/src/vec/common/hash_table/hash_map_context.h
index 2d0b46150b1..bd5bab0be97 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -73,6 +73,10 @@ struct MethodBaseInner {
const uint8_t* null_map = nullptr, bool
is_join = false,
bool is_build = false, uint32_t
bucket_size = 0) = 0;
+ [[nodiscard]] virtual size_t estimated_size(const ColumnRawPtrs& // pure-virtual hook: overrides return an estimated size in bytes
key_columns, size_t num_rows, // key columns and the number of rows the estimate covers
+ bool is_join = false, bool // is_join: overrides count uint32_t bucket numbers instead of size_t hash values
is_build = false,
+ uint32_t bucket_size = 0) = 0; // defaults mirror init_serialized_keys; is_build/bucket_size are unused by the overrides visible in this patch
+
virtual size_t serialized_keys_size(bool is_build) const { return 0; }
void init_join_bucket_num(uint32_t num_rows, uint32_t bucket_size, const
uint8_t* null_map) {
@@ -215,6 +219,22 @@ struct MethodSerialized : public MethodBase<TData> {
return {begin, sum_size};
}
+    /// Estimates, in bytes, the memory involved in serializing the keys of `num_rows` rows.
+    ///
+    /// The estimate is the sum of:
+    ///  - `byte_size()` of every column in `key_columns`,
+    ///  - one `StringRef` per row (stored_keys),
+    ///  - one `uint32_t` per row when `is_join` is true (bucket_nums), otherwise one
+    ///    `size_t` per row (hash_values).
+    ///
+    /// `is_build` and `bucket_size` are accepted for interface compatibility and do not
+    /// influence the result.
+    size_t estimated_size(const ColumnRawPtrs& key_columns, size_t num_rows, bool is_join,
+                          bool is_build, uint32_t bucket_size) override {
+        // Fixed per-row bookkeeping: a stored_keys entry plus either a bucket number
+        // (join) or a hash value (everything else).
+        const size_t per_row_bytes =
+                sizeof(StringRef) + (is_join ? sizeof(uint32_t) : sizeof(size_t));
+        size_t total_bytes = per_row_bytes * num_rows;
+
+        // Add the payload of the key columns themselves.
+        for (const auto& key_column : key_columns) {
+            total_bytes += key_column->byte_size();
+        }
+        return total_bytes;
+    }
+
void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t
num_rows,
DorisVector<StringRef>& input_keys, Arena&
input_arena) {
input_arena.clear();
@@ -299,6 +319,18 @@ struct MethodStringNoCache : public MethodBase<TData> {
: (_stored_keys.size() * sizeof(StringRef));
}
+    /// Estimates, in bytes, the per-row bookkeeping needed for `num_rows` rows:
+    /// one `StringRef` per row (stored_keys) plus one `uint32_t` per row when `is_join`
+    /// is true (bucket_nums), otherwise one `size_t` per row (hash_values).
+    ///
+    /// `key_columns`, `is_build` and `bucket_size` are accepted for interface
+    /// compatibility and do not influence the result.
+    size_t estimated_size(const ColumnRawPtrs& key_columns, size_t num_rows, bool is_join,
+                          bool is_build, uint32_t bucket_size) override {
+        // bucket_nums entries are uint32_t, hash_values entries are size_t.
+        const size_t index_entry_bytes = is_join ? sizeof(uint32_t) : sizeof(size_t);
+        return (sizeof(StringRef) + index_entry_bytes) * num_rows;
+    }
+
void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t
num_rows,
DorisVector<StringRef>& stored_keys) {
const IColumn& column = *key_columns[0];
@@ -354,6 +386,17 @@ struct MethodOneNumber : public MethodBase<TData> {
using State = ColumnsHashing::HashMethodOneNumber<typename Base::Value,
typename Base::Mapped,
FieldType>;
+    /// Estimates, in bytes, the per-row bookkeeping needed for `num_rows` rows:
+    /// one `uint32_t` per row when `is_join` is true (bucket_nums), otherwise one
+    /// `size_t` per row (hash_values). No stored-keys term is included.
+    ///
+    /// `key_columns`, `is_build` and `bucket_size` are accepted for interface
+    /// compatibility and do not influence the result.
+    size_t estimated_size(const ColumnRawPtrs& key_columns, size_t num_rows, bool is_join,
+                          bool is_build, uint32_t bucket_size) override {
+        const size_t index_entry_bytes = is_join ? sizeof(uint32_t) : sizeof(size_t);
+        return index_entry_bytes * num_rows;
+    }
+
void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t
num_rows,
const uint8_t* null_map = nullptr, bool is_join
= false,
bool is_build = false, uint32_t bucket_size = 0)
override {
@@ -468,6 +511,19 @@ struct MethodKeysFixed : public MethodBase<TData> {
return (is_build ? build_stored_keys.size() : stored_keys.size()) *
sizeof(typename Base::Key);
}
+
+    /// Estimates, in bytes, the per-row bookkeeping needed for `num_rows` rows:
+    /// one fixed-width key (`typename Base::Key`) per row for the stored keys, plus one
+    /// `uint32_t` per row when `is_join` is true (bucket_nums), otherwise one `size_t`
+    /// per row (hash_values).
+    ///
+    /// The stored-keys term uses `sizeof(typename Base::Key)` so that it agrees with
+    /// `serialized_keys_size()` in this struct, which sizes the same buffers with the
+    /// key type rather than `StringRef`.
+    ///
+    /// `key_columns`, `is_build` and `bucket_size` are accepted for interface
+    /// compatibility and do not influence the result.
+    size_t estimated_size(const ColumnRawPtrs& key_columns, size_t num_rows, bool is_join,
+                          bool is_build, uint32_t bucket_size) override {
+        size_t size = 0;
+        size += sizeof(typename Base::Key) * num_rows; // stored_keys
+        if (is_join) {
+            size += sizeof(uint32_t) * num_rows; // bucket_nums
+        } else {
+            size += sizeof(size_t) * num_rows; // hash_values
+        }
+        return size;
+    }
+
void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t
num_rows,
const uint8_t* null_map = nullptr, bool is_join
= false,
bool is_build = false, uint32_t bucket_size = 0)
override {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 6ff1fcc0aca..a83f8d485a3 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -347,7 +347,8 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr*
stream_mgr, pipeline::Exchang
_sender_to_local_channel_dependency.resize(num_queues);
for (size_t i = 0; i < num_queues; i++) {
_sender_to_local_channel_dependency[i] =
pipeline::Dependency::create_shared(
- _dest_node_id, _dest_node_id,
"LocalExchangeChannelDependency", true);
+ _dest_node_id, _dest_node_id,
fmt::format("LocalExchangeChannelDependency_{}", i),
+ true);
}
_sender_queues.reserve(num_queues);
int num_sender_per_queue = is_merging ? 1 : num_senders;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]