This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 8d7f09a0c53 [fix](spill) runtime filter and add some counters (#46999)
8d7f09a0c53 is described below
commit 8d7f09a0c5348a663de8d2b0ffbd5f078ec8d916
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Jan 16 21:17:55 2025 +0800
[fix](spill) runtime filter and add some counters (#46999)
---
.../exec/partitioned_hash_join_probe_operator.cpp | 3 ++-
.../exec/partitioned_hash_join_sink_operator.cpp | 29 +++++++++++++++-------
.../exec/partitioned_hash_join_sink_operator.h | 2 ++
3 files changed, 24 insertions(+), 10 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 120c6bcbd06..565aeaa5fee 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -534,7 +534,9 @@ Status
PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(_inner_probe_operator->set_child(child));
DCHECK(_build_side_child != nullptr);
_inner_probe_operator->set_build_side_child(_build_side_child);
+ RETURN_IF_ERROR(_inner_sink_operator->set_child(_build_side_child));
RETURN_IF_ERROR(_inner_probe_operator->open(state));
+ RETURN_IF_ERROR(_inner_sink_operator->open(state));
_child = std::move(child);
RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
RETURN_IF_ERROR(_partitioner->open(state));
@@ -948,7 +950,6 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
local_state._shared_state->inner_runtime_state.get(),
block, eos));
if (*eos) {
_update_profile_from_internal_states(local_state);
- local_state._shared_state->inner_runtime_state.reset();
}
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 4da7abec23e..3546818a1a9 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -24,6 +24,7 @@
#include <mutex>
#include "common/logging.h"
+#include "common/status.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/spill_utils.h"
#include "pipeline/pipeline_task.h"
@@ -52,7 +53,7 @@ Status
PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
"HashJoinBuildSpillDependency", true);
state->get_task()->add_spill_dependency(_spill_dependency.get());
- _internal_runtime_profile.reset(new RuntimeProfile("internal_profile"));
+ _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile");
_partition_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillPartitionTime",
1);
_partition_shuffle_timer = ADD_TIMER_WITH_LEVEL(profile(),
"SpillPartitionShuffleTime", 1);
@@ -60,7 +61,6 @@ Status
PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
_in_mem_rows_counter = ADD_COUNTER_WITH_LEVEL(profile(), "SpillInMemRow",
TUnit::UNIT, 1);
_memory_usage_reserved =
ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved",
TUnit::BYTES, 1);
-
return Status::OK();
}
@@ -70,6 +70,7 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState*
state) {
_shared_state->setup_shared_profile(_profile);
RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+ RETURN_IF_ERROR(p._setup_internal_operator(state));
for (uint32_t i = 0; i != p._partition_count; ++i) {
auto& spilling_stream = _shared_state->spilled_streams[i];
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
@@ -87,6 +88,11 @@ Status
PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec
return Status::OK();
}
dec_running_big_mem_op_num(state);
+ auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+ if (!_shared_state->need_to_spill && _shared_state->inner_runtime_state) {
+ RETURN_IF_ERROR(p._inner_sink_operator->close(_shared_state->inner_runtime_state.get(),
+ exec_status));
+ }
return PipelineXSpillSinkLocalState::close(state, exec_status);
}
@@ -156,6 +162,15 @@ size_t
PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta
return size_to_reserve;
}
+Dependency* PartitionedHashJoinSinkLocalState::finishdependency() {
+ if (auto* tmp_sink_state = _shared_state->inner_runtime_state->get_sink_local_state()) {
+ auto* inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(tmp_sink_state);
+ return inner_sink_state->finishdependency();
+ }
+ DCHECK(false) << "Should not reach here!";
+ return nullptr;
+}
+
Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
@@ -176,6 +191,8 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
if (inner_sink_state) {
build_block = inner_sink_state->_build_side_mutable_block.to_block();
block_old_mem = build_block.allocated_bytes();
+ // If spilling was triggered, constructing runtime filters is meaningless,
+ // therefore, all runtime filters are temporarily disabled.
RETURN_IF_ERROR(inner_sink_state->disable_runtime_filters(
_shared_state->inner_runtime_state.get()));
}
@@ -503,6 +520,7 @@ Status
PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState*
local_state._shared_state->inner_runtime_state =
RuntimeState::create_unique(
state->fragment_instance_id(), state->query_id(),
state->fragment_id(),
state->query_options(), TQueryGlobals {}, state->exec_env(),
state->get_query_ctx());
+ local_state._shared_state->inner_runtime_state->set_task(state->get_task());
local_state._shared_state->inner_runtime_state->set_task_execution_context(
state->get_task_execution_context().lock());
local_state._shared_state->inner_runtime_state->set_be_number(state->be_number());
@@ -582,10 +600,6 @@ Status
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
if (need_to_spill) {
return revoke_memory(state, nullptr);
} else {
- if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
- RETURN_IF_ERROR(_setup_internal_operator(state));
- }
-
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink_eos", {
return Status::Error<INTERNAL_ERROR>(
"fault_inject partitioned_hash_join_sink "
@@ -633,9 +647,6 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState*
state, vectorized::B
return revoke_memory(state, nullptr);
}
} else {
- if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
- RETURN_IF_ERROR(_setup_internal_operator(state));
- }
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink", {
return Status::Error<INTERNAL_ERROR>(
"fault_inject partitioned_hash_join_sink "
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index e1a76fa17de..73955932427 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -52,6 +52,8 @@ public:
[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
void update_memory_usage();
+ Dependency* finishdependency() override;
+
protected:
PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
:
PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]