This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 2ae4c86c81e [fix](spill) Thread conflicts caused by non-sink operators spilling (#45796)
2ae4c86c81e is described below
commit 2ae4c86c81eef96b3c035f9fd076921b563b87f1
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Dec 23 17:40:09 2024 +0800
[fix](spill) Thread conflicts caused by non-sink operators spilling (#45796)
---
be/src/pipeline/exec/operator.h | 8 ----
.../exec/partitioned_hash_join_probe_operator.cpp | 55 +++++++---------------
.../exec/partitioned_hash_join_probe_operator.h | 6 +--
be/src/pipeline/exec/spill_utils.h | 39 +++------------
be/src/pipeline/pipeline_task.cpp | 4 +-
5 files changed, 25 insertions(+), 87 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index af13ded196e..3267416e4b2 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -857,14 +857,6 @@ public:
return (_child and !is_source()) ? _child->revocable_mem_size(state) :
0;
}
- Status revoke_memory(RuntimeState* state,
- const std::shared_ptr<SpillContext>& spill_context)
override {
- if (_child and !is_source()) {
- return _child->revoke_memory(state, spill_context);
- }
- return Status::OK();
- }
-
// If this method is not overwrite by child, its default value is 1MB
[[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) {
return state->minimum_operator_memory_required_bytes();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f6cea157cd5..f5568ba1022 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -172,8 +172,7 @@ Status
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
return Status::OK();
}
-Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
- RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
+Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState*
state) {
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
auto query_id = state->query_id();
@@ -208,7 +207,9 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
std::numeric_limits<size_t>::max(),
_runtime_profile.get()));
}
- auto merged_block =
vectorized::MutableBlock::create_unique(blocks[0].clone_empty());
+ auto merged_block =
vectorized::MutableBlock::create_unique(std::move(blocks.back()));
+ blocks.pop_back();
+
while (!blocks.empty() && !state->is_cancelled()) {
auto block = std::move(blocks.back());
blocks.pop_back();
@@ -218,17 +219,9 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
return Status::Error<INTERNAL_ERROR>(
"fault_inject partitioned_hash_join_probe
spill_probe_blocks failed");
});
-
- if (merged_block->allocated_bytes() >=
- vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
- COUNTER_UPDATE(_spill_probe_rows, merged_block->rows());
- RETURN_IF_ERROR(
- spilling_stream->spill_block(state,
merged_block->to_block(), false));
- COUNTER_UPDATE(_spill_probe_blocks, 1);
- }
}
- if (!merged_block->empty()) {
+ if (!merged_block->empty()) [[likely]] {
COUNTER_UPDATE(_spill_probe_rows, merged_block->rows());
RETURN_IF_ERROR(
spilling_stream->spill_block(state,
merged_block->to_block(), false));
@@ -256,9 +249,6 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
return status;
};
- if (spill_context) {
- spill_context->on_non_sink_task_started();
- }
_spill_dependency->block();
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func",
{
return Status::Error<INTERNAL_ERROR>(
@@ -266,8 +256,8 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
});
auto spill_runnable = std::make_shared<SpillNonSinkRunnable>(
- state, spill_context, _spill_dependency, _runtime_profile.get(),
- _shared_state->shared_from_this(), exception_catch_func);
+ state, _spill_dependency, _runtime_profile.get(),
_shared_state->shared_from_this(),
+ exception_catch_func);
return spill_io_pool->submit(std::move(spill_runnable));
}
@@ -856,27 +846,6 @@ size_t
PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta
return size_to_reserve;
}
-Status PartitionedHashJoinProbeOperatorX::revoke_memory(
- RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
- auto& local_state = get_local_state(state);
- VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe
node: " << node_id()
- << ", task: " << state->task_id() << ", child eos: " <<
local_state._child_eos;
-
- if (local_state._child_eos) {
- VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash
probe node: " << node_id()
- << ", task: " << state->task_id() << ", child eos: " <<
local_state._child_eos
- << ", will not revoke size: " << revocable_mem_size(state);
- return Status::OK();
- }
-
- RETURN_IF_ERROR(local_state.spill_probe_blocks(state, spill_context));
-
- if (_child) {
- return _child->revoke_memory(state, spill_context);
- }
- return Status::OK();
-}
-
Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) {
auto& local_state = get_local_state(state);
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe
node: " << node_id()
@@ -891,7 +860,15 @@ bool
PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* stat
if (local_state._shared_state->need_to_spill) {
const auto revocable_size = _revocable_mem_size(state);
const auto min_revocable_size = state->min_revocable_mem();
- return revocable_size > min_revocable_size;
+
+ if (state->get_query_ctx()->low_memory_mode()) {
+ return revocable_size >
+ std::min<int64_t>(min_revocable_size,
+ static_cast<int64_t>(
+
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM));
+ } else {
+ return vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
+ }
}
return false;
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 7b77e1e6e3f..40c6b0fcef5 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -47,8 +47,7 @@ public:
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
- Status spill_probe_blocks(RuntimeState* state,
- const std::shared_ptr<SpillContext>&
spill_context = nullptr);
+ Status spill_probe_blocks(RuntimeState* state);
Status recover_build_blocks_from_disk(RuntimeState* state, uint32_t
partition_index,
bool& has_data);
@@ -181,9 +180,6 @@ public:
return _inner_probe_operator->require_data_distribution();
}
- Status revoke_memory(RuntimeState* state,
- const std::shared_ptr<SpillContext>& spill_context)
override;
-
private:
Status _revoke_memory(RuntimeState* state);
diff --git a/be/src/pipeline/exec/spill_utils.h
b/be/src/pipeline/exec/spill_utils.h
index 84a3f8c2e29..c0bb47960c5 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -50,36 +50,18 @@ struct SpillContext {
~SpillContext() {
if (running_tasks_count.load() != 0) {
- LOG_EVERY_T(WARNING, 60) << "Query: " << print_id(query_id)
- << " not all spill tasks finished,
remaining tasks: "
- << running_tasks_count.load();
- }
-
- if (_running_non_sink_tasks_count.load() != 0) {
- LOG_EVERY_T(WARNING, 60)
- << "Query: " << print_id(query_id)
- << " not all spill tasks(non sink tasks) finished,
remaining tasks: "
- << _running_non_sink_tasks_count.load();
+ LOG(WARNING) << "Query: " << print_id(query_id)
+ << " not all spill tasks finished, remaining tasks: "
+ << running_tasks_count.load();
}
}
void on_task_finished() {
auto count = running_tasks_count.fetch_sub(1);
- if (count == 1 && _running_non_sink_tasks_count.load() == 0) {
- all_tasks_finished_callback(this);
- }
- }
-
- void on_non_sink_task_started() {
_running_non_sink_tasks_count.fetch_add(1); }
- void on_non_sink_task_finished() {
- const auto count = _running_non_sink_tasks_count.fetch_sub(1);
- if (count == 1 && running_tasks_count.load() == 0) {
+ if (count == 1) {
all_tasks_finished_callback(this);
}
}
-
-private:
- std::atomic_int _running_non_sink_tasks_count {0};
};
class SpillRunnable : public Runnable {
@@ -233,20 +215,13 @@ public:
class SpillNonSinkRunnable : public SpillRunnable {
public:
- SpillNonSinkRunnable(RuntimeState* state, std::shared_ptr<SpillContext>
spill_context,
- std::shared_ptr<Dependency> spill_dependency,
RuntimeProfile* profile,
+ SpillNonSinkRunnable(RuntimeState* state, std::shared_ptr<Dependency>
spill_dependency,
+ RuntimeProfile* profile,
const std::shared_ptr<BasicSpillSharedState>&
shared_state,
std::function<Status()> spill_exec_func,
std::function<Status()> spill_fin_cb = {})
- : SpillRunnable(state, spill_context, spill_dependency, profile,
shared_state, true,
+ : SpillRunnable(state, nullptr, spill_dependency, profile,
shared_state, true,
spill_exec_func, spill_fin_cb) {}
-
-protected:
- void _on_task_finished() override {
- if (_spill_context) {
- _spill_context->on_non_sink_task_finished();
- }
- }
};
class SpillRecoverRunnable : public SpillRunnable {
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 9d284b31861..0ddc329da3b 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -619,7 +619,7 @@ size_t PipelineTask::get_revocable_size() const {
return 0;
}
- return _sink->revocable_mem_size(_state) +
_root->revocable_mem_size(_state);
+ return _sink->revocable_mem_size(_state);
}
Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>&
spill_context) {
@@ -632,8 +632,6 @@ Status PipelineTask::revoke_memory(const
std::shared_ptr<SpillContext>& spill_co
return Status::OK();
}
- RETURN_IF_ERROR(_root->revoke_memory(_state, spill_context));
-
const auto revocable_size = _sink->revocable_mem_size(_state);
if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
RETURN_IF_ERROR(_sink->revoke_memory(_state, spill_context));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]