This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5571b1f93fb [improvement](spill) optimize the spilling logic of hash join operator (#32202)
5571b1f93fb is described below
commit 5571b1f93fbd342bc8fd3accda3f18048efaf4d2
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Mar 20 08:20:40 2024 +0800
[improvement](spill) optimize the spilling logic of hash join operator (#32202)
---
.../exec/partitioned_hash_join_probe_operator.cpp | 86 +++++++++++++++++++---
.../exec/partitioned_hash_join_probe_operator.h | 8 ++
.../exec/partitioned_hash_join_sink_operator.cpp | 33 ++++++++-
.../exec/partitioned_hash_join_sink_operator.h | 6 ++
be/src/pipeline/pipeline_x/dependency.h | 1 +
.../pipeline_x/pipeline_x_fragment_context.cpp | 2 +-
6 files changed, 121 insertions(+), 15 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f37f28d59c7..93ce24e8e72 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -58,6 +58,21 @@ Status
PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
_recovery_probe_blocks =
ADD_CHILD_COUNTER(profile(), "RecoveryProbeBlocks", TUnit::UNIT,
"SpillAndPartition");
+ _spill_serialize_block_timer = ADD_CHILD_TIMER_WITH_LEVEL(
+ Base::profile(), "SpillSerializeBlockTime", "SpillAndPartition",
1);
+ _spill_write_disk_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteDiskTime",
+ "SpillAndPartition",
1);
+ _spill_data_size = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteDataSize",
+ TUnit::BYTES,
"SpillAndPartition", 1);
+ _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockCount",
+ TUnit::UNIT,
"SpillAndPartition", 1);
+ _spill_read_data_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillReadDataTime",
+ "SpillAndPartition", 1);
+ _spill_deserialize_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillDeserializeTime",
+ "SpillAndPartition",
1);
+ _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillReadDataSize",
+ TUnit::BYTES,
"SpillAndPartition", 1);
+
// Build phase
_build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase");
_build_rows_counter = ADD_CHILD_COUNTER(profile(), "BuildRows",
TUnit::UNIT, "BuildPhase");
@@ -141,7 +156,7 @@ Status
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
uint32_t
partition_index) {
auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
auto& mutable_block = partitioned_build_blocks[partition_index];
- if (!mutable_block || mutable_block->rows() == 0) {
+ if (!mutable_block || mutable_block->rows() < state->batch_size()) {
--_spilling_task_count;
return Status::OK();
}
@@ -153,6 +168,8 @@ Status
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
_parent->id(), std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(), _runtime_profile.get()));
RETURN_IF_ERROR(build_spilling_stream->prepare_spill());
+
build_spilling_stream->set_write_counters(_spill_serialize_block_timer,
_spill_block_count,
+ _spill_data_size,
_spill_write_disk_timer);
}
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
@@ -191,18 +208,28 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(),
_runtime_profile.get()));
RETURN_IF_ERROR(spilling_stream->prepare_spill());
+ spilling_stream->set_write_counters(_spill_serialize_block_timer,
_spill_block_count,
+ _spill_data_size,
_spill_write_disk_timer);
}
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
spilling_stream->get_spill_root_dir());
auto& blocks = _probe_blocks[partition_index];
+ auto& partitioned_block = _partitioned_blocks[partition_index];
+ if (partitioned_block && partitioned_block->rows() >= state->batch_size())
{
+ blocks.emplace_back(partitioned_block->to_block());
+ partitioned_block.reset();
+ }
if (!blocks.empty()) {
return spill_io_pool->submit_func([state, &blocks, &spilling_stream,
this] {
(void)state; // avoid ut compile error
SCOPED_ATTACH_TASK(state);
- for (auto& block : blocks) {
+ COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
+ while (!blocks.empty()) {
+ auto block = std::move(blocks.back());
+ blocks.pop_back();
if (_spill_status_ok) {
auto st = spilling_stream->spill_block(block, false);
if (!st.ok()) {
@@ -217,8 +244,6 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
}
}
- COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
- blocks.clear();
--_spilling_task_count;
if (_spilling_task_count == 0) {
@@ -241,6 +266,8 @@ Status
PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
if (build_spilling_stream) {
build_spilling_stream->end_spill(Status::OK());
RETURN_IF_ERROR(build_spilling_stream->spill_eof());
+ build_spilling_stream->set_read_counters(_spill_read_data_time,
_spill_deserialize_time,
+ _spill_read_bytes);
}
auto& probe_spilling_stream = _probe_spilling_streams[partition_index];
@@ -248,6 +275,8 @@ Status
PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
if (probe_spilling_stream) {
probe_spilling_stream->end_spill(Status::OK());
RETURN_IF_ERROR(probe_spilling_stream->spill_eof());
+ probe_spilling_stream->set_read_counters(_spill_read_data_time,
_spill_deserialize_time,
+ _spill_read_bytes);
}
return Status::OK();
@@ -259,6 +288,8 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
auto& spilled_stream = _shared_state->spilled_streams[partition_index];
has_data = false;
if (!spilled_stream) {
+ LOG(INFO) << "no data need to recovery for partition: " <<
partition_index
+ << ", node id: " << _parent->id() << ", task id: " <<
state->task_id();
return Status::OK();
}
@@ -288,6 +319,7 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
continue;
}
+ DCHECK_EQ(mutable_block->columns(), block.columns());
if (mutable_block->empty()) {
*mutable_block = std::move(block);
} else {
@@ -301,6 +333,7 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
}
}
+ LOG(INFO) << "recovery data done for partition: " <<
spilled_stream->get_spill_dir();
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
spilled_stream.reset();
_dependency->set_ready();
@@ -350,6 +383,7 @@ Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
}
if (eos) {
+ LOG(INFO) << "recovery probe data done: " <<
spilled_stream->get_spill_dir();
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
spilled_stream.reset();
}
@@ -401,10 +435,17 @@ Status PartitionedHashJoinProbeOperatorX::init(const
TPlanNode& tnode, RuntimeSt
return _probe_operator->init(tnode_, state);
}
Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) {
- RETURN_IF_ERROR(OperatorXBase::prepare(state));
+ // here do NOT call `OperatorXBase::prepare(state)`
+ // RETURN_IF_ERROR(OperatorXBase::prepare(state));
+ for (auto& conjunct : _conjuncts) {
+ RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
+ }
+
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state,
intermediate_row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state,
*_intermediate_row_desc));
RETURN_IF_ERROR(_probe_operator->set_child(_child_x));
- RETURN_IF_ERROR(_probe_operator->set_child(_build_side_child));
+ DCHECK(_build_side_child != nullptr);
+ _probe_operator->set_build_side_child(_build_side_child);
RETURN_IF_ERROR(_sink_operator->set_child(_build_side_child));
RETURN_IF_ERROR(_probe_operator->prepare(state));
RETURN_IF_ERROR(_sink_operator->prepare(state));
@@ -524,6 +565,9 @@ Status
PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
partitioned_block.reset();
}
RETURN_IF_ERROR(_sink_operator->sink(local_state._runtime_state.get(),
&block, true));
+ LOG(INFO) << "internal build operator finished, node id: " << id()
+ << ", task id: " << state->task_id()
+ << ", partition: " << local_state._partition_cursor;
return Status::OK();
}
@@ -615,18 +659,25 @@ bool
PartitionedHashJoinProbeOperatorX::need_data_from_children(RuntimeState* st
size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState*
state) const {
auto& local_state = get_local_state(state);
size_t mem_size = 0;
+ uint32_t spilling_start = local_state._child_eos ?
local_state._partition_cursor + 1 : 0;
+ DCHECK_GE(spilling_start, local_state._partition_cursor);
auto& partitioned_build_blocks =
local_state._shared_state->partitioned_build_blocks;
auto& probe_blocks = local_state._probe_blocks;
- for (uint32_t i = local_state._partition_cursor + 1; i < _partition_count;
++i) {
+ for (uint32_t i = spilling_start; i < _partition_count; ++i) {
auto& build_block = partitioned_build_blocks[i];
- if (build_block && build_block->rows() > 0) {
+ if (build_block && build_block->rows() >= state->batch_size()) {
mem_size += build_block->allocated_bytes();
}
for (auto& block : probe_blocks[i]) {
mem_size += block.allocated_bytes();
}
+
+ auto& partitioned_block = local_state._partitioned_blocks[i];
+ if (partitioned_block && partitioned_block->rows() >=
state->batch_size()) {
+ mem_size += partitioned_block->allocated_bytes();
+ }
}
return mem_size;
}
@@ -634,14 +685,16 @@ size_t
PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state,
bool& wait_for_io) {
auto& local_state = get_local_state(state);
wait_for_io = false;
- if (_partition_count > (local_state._partition_cursor + 1)) {
- local_state._spilling_task_count =
- (_partition_count - local_state._partition_cursor - 1) * 2;
+ uint32_t spilling_start = local_state._child_eos ?
local_state._partition_cursor + 1 : 0;
+ DCHECK_GE(spilling_start, local_state._partition_cursor);
+
+ if (_partition_count > spilling_start) {
+ local_state._spilling_task_count = (_partition_count - spilling_start)
* 2;
} else {
return Status::OK();
}
- for (uint32_t i = local_state._partition_cursor + 1; i < _partition_count;
++i) {
+ for (uint32_t i = spilling_start; i < _partition_count; ++i) {
RETURN_IF_ERROR(local_state.spill_build_block(state, i));
RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i));
}
@@ -657,6 +710,14 @@ Status
PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo
}
bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState*
state) const {
+ auto& local_state = get_local_state(state);
+
+ if (local_state._shared_state->need_to_spill) {
+ const auto revocable_size = revocable_mem_size(state);
+ const auto min_revocable_size = state->min_revocable_mem();
+ return revocable_size > min_revocable_size;
+ }
+
auto sys_mem_available = MemInfo::sys_mem_available();
auto sys_mem_warning_water_mark =
doris::MemInfo::sys_mem_available_warning_water_mark();
@@ -692,6 +753,7 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
bool wait_for_io = false;
RETURN_IF_ERROR(_revoke_memory(state, wait_for_io));
if (wait_for_io) {
+ local_state._shared_state->need_to_spill = true;
return Status::OK();
}
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 27c4a9e0bb2..adbaf19314f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -100,6 +100,14 @@ private:
RuntimeProfile::Counter* _recovery_probe_rows = nullptr;
RuntimeProfile::Counter* _recovery_probe_blocks = nullptr;
+ RuntimeProfile::Counter* _spill_read_data_time = nullptr;
+ RuntimeProfile::Counter* _spill_deserialize_time = nullptr;
+ RuntimeProfile::Counter* _spill_read_bytes = nullptr;
+ RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
+ RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
+ RuntimeProfile::Counter* _spill_data_size = nullptr;
+ RuntimeProfile::Counter* _spill_block_count = nullptr;
+
RuntimeProfile::Counter* _build_phase_label = nullptr;
RuntimeProfile::Counter* _build_rows_counter = nullptr;
RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 5cbf2b15ec1..20b3531c0cd 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -36,6 +36,11 @@ Status
PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
_partition_timer = ADD_TIMER(profile(), "PartitionTime");
_partition_shuffle_timer = ADD_TIMER(profile(), "PartitionShuffleTime");
+ _spill_serialize_block_timer = ADD_TIMER_WITH_LEVEL(profile(),
"SpillSerializeBlockTime", 1);
+ _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(profile(),
"SpillWriteDiskTime", 1);
+ _spill_data_size = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteDataSize",
TUnit::BYTES, 1);
+ _spill_block_count = ADD_COUNTER_WITH_LEVEL(profile(),
"SpillWriteBlockCount", TUnit::UNIT, 1);
+
return _partitioner->prepare(state, p._child_x->row_desc());
}
@@ -51,7 +56,8 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
vectorized::SpillStreamSPtr& spilling_stream =
_shared_state->spilled_streams[i];
auto& mutable_block = _shared_state->partitioned_build_blocks[i];
- if (!mutable_block || mutable_block->rows() == 0) {
+ if (!mutable_block || mutable_block->rows() < state->batch_size()) {
+ --_spilling_streams_count;
continue;
}
@@ -61,6 +67,8 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
_parent->id(), std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(), _profile));
RETURN_IF_ERROR(spilling_stream->prepare_spill());
+ spilling_stream->set_write_counters(_spill_serialize_block_timer,
_spill_block_count,
+ _spill_data_size,
_spill_write_disk_timer);
}
auto* spill_io_pool =
@@ -79,9 +87,14 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
}
if (_spilling_streams_count > 0) {
+ _shared_state->need_to_spill = true;
std::unique_lock<std::mutex> lock(_spill_lock);
if (_spilling_streams_count > 0) {
_dependency->block();
+ } else if (_child_eos) {
+ LOG(INFO) << "sink eos, set_ready_to_read, node id: " <<
_parent->id()
+ << ", task id: " << state->task_id();
+ _dependency->set_ready_to_read();
}
}
return Status::OK();
@@ -108,6 +121,11 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
if (_spilling_streams_count == 0) {
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
+ if (_child_eos) {
+ LOG(INFO) << "sink eos, set_ready_to_read, node id: " <<
_parent->id()
+ << ", task id: " << state()->task_id();
+ _dependency->set_ready_to_read();
+ }
}
}
@@ -157,6 +175,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState*
state, vectorized::B
return local_state._spill_status;
}
+ local_state._child_eos = eos;
+
const auto rows = in_block->rows();
if (rows > 0) {
@@ -190,9 +210,18 @@ Status
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
partitioned_blocks[i]->add_rows(in_block,
&(partition_indexes[i][0]),
&(partition_indexes[i][count]));
}
+
+ if (local_state._shared_state->need_to_spill) {
+ const auto revocable_size = revocable_mem_size(state);
+ if (revocable_size > state->min_revocable_mem()) {
+ return local_state.revoke_memory(state);
+ }
+ }
}
if (eos) {
+ LOG(INFO) << "sink eos, set_ready_to_read, node id: " << id()
+ << ", task id: " << state->task_id();
local_state._dependency->set_ready_to_read();
}
@@ -207,7 +236,7 @@ size_t
PartitionedHashJoinSinkOperatorX::revocable_mem_size(RuntimeState* state)
size_t mem_size = 0;
for (uint32_t i = 0; i != _partition_count; ++i) {
auto& block = partitioned_blocks[i];
- if (block) {
+ if (block && block->rows() >= state->batch_size()) {
mem_size += block->allocated_bytes();
}
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 152b36459b3..f2d5ca3e140 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -61,6 +61,8 @@ protected:
std::atomic<bool> _spill_status_ok {true};
std::mutex _spill_lock;
+ bool _child_eos {false};
+
Status _spill_status;
std::mutex _spill_status_lock;
@@ -68,6 +70,10 @@ protected:
RuntimeProfile::Counter* _partition_timer = nullptr;
RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
+ RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
+ RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
+ RuntimeProfile::Counter* _spill_data_size = nullptr;
+ RuntimeProfile::Counter* _spill_block_count = nullptr;
};
class PartitionedHashJoinSinkOperatorX
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index db2b0341fe4..7815d4a9ce0 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -578,6 +578,7 @@ struct HashJoinSharedState : public JoinSharedState {
struct PartitionedHashJoinSharedState : public HashJoinSharedState {
std::vector<std::unique_ptr<vectorized::MutableBlock>>
partitioned_build_blocks;
std::vector<vectorized::SpillStreamSPtr> spilled_streams;
+ bool need_to_spill = false;
};
struct NestedLoopJoinSharedState : public JoinSharedState {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1df715176ac..ecb7023be87 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1031,7 +1031,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
tnode.hash_join_node.is_broadcast_join;
const auto enable_join_spill = _runtime_state->enable_join_spill();
if (enable_join_spill && !is_broadcast_join) {
- const uint32_t partition_count = 16;
+ const uint32_t partition_count = 32;
op.reset(new PartitionedHashJoinProbeOperatorX(pool, tnode,
next_operator_id(), descs,
partition_count));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]