This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7621008ad4a4e522aa754cd8e50213e809228770 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Wed Apr 17 15:50:29 2024 +0800 [improvement](spill) avoid occupying too much memory while spilling the build block during the hash join build phase (#33747) --- .../exec/partitioned_hash_join_probe_operator.cpp | 3 +- .../exec/partitioned_hash_join_probe_operator.h | 2 +- .../exec/partitioned_hash_join_sink_operator.cpp | 166 ++++++++++++++++----- .../exec/partitioned_hash_join_sink_operator.h | 4 +- 4 files changed, 135 insertions(+), 40 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 35750ee5a55..78dcaf1e6c5 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -535,7 +535,7 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized:: } std::vector<uint32_t> partition_indexes[_partition_count]; - auto* channel_ids = reinterpret_cast<uint64_t*>(local_state._partitioner->get_channel_ids()); + auto* channel_ids = reinterpret_cast<uint32_t*>(local_state._partitioner->get_channel_ids()); for (uint32_t i = 0; i != rows; ++i) { partition_indexes[channel_ids[i]].emplace_back(i); } @@ -862,6 +862,7 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori RETURN_IF_ERROR( _inner_probe_operator->pull(local_state._runtime_state.get(), block, eos)); if (*eos) { + _update_profile_from_internal_states(local_state); local_state._runtime_state.reset(); } } diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 96a5cf96e34..5bdc5278ffc 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -33,7 +33,7 @@ class RuntimeState; namespace pipeline { -using PartitionerType = 
vectorized::XXHashPartitioner<LocalExchangeChannelIds>; +using PartitionerType = vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>; class PartitionedHashJoinProbeOperatorX; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index a0adf0505f8..c9d61757461 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -45,8 +45,21 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state)); + auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); + for (uint32_t i = 0; i != p._partition_count; ++i) { + auto& spilling_stream = _shared_state->spilled_streams[i]; + RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( + state, spilling_stream, print_id(state->query_id()), + fmt::format("hash_build_sink_{}", i), _parent->id(), + std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), _profile)); + RETURN_IF_ERROR(spilling_stream->prepare_spill()); + spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count, + _spill_data_size, _spill_write_disk_timer, + _spill_write_wait_io_timer); + } return _partitioner->open(state); } + Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec_status) { SCOPED_TIMER(PipelineXSpillSinkLocalState::exec_time_counter()); SCOPED_TIMER(PipelineXSpillSinkLocalState::_close_timer); @@ -87,39 +100,127 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state return mem_size; } +Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) { + _shared_state->need_to_spill = true; + auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); + 
_shared_state->inner_shared_state->hash_table_variants.reset(); + auto row_desc = p._child_x->row_desc(); + auto build_block = std::move(_shared_state->inner_shared_state->build_block); + if (!build_block) { + build_block = vectorized::Block::create_shared(); + auto inner_sink_state = _shared_state->inner_runtime_state->get_sink_local_state(); + if (inner_sink_state) { + auto& mutable_block = reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state) + ->_build_side_mutable_block; + *build_block = mutable_block.to_block(); + LOG(INFO) << "hash join sink will revoke build mutable block: " + << build_block->allocated_bytes(); + } + } + + /// Here need to skip the first row in build block. + /// The first row in build block is generated by `HashJoinBuildSinkOperatorX::sink`. + if (build_block->rows() <= 1) { + return Status::OK(); + } + + if (build_block->columns() > row_desc.num_slots()) { + build_block->erase(row_desc.num_slots()); + } + + { + /// TODO: DO NOT execute build exprs twice(when partition and building hash table) + SCOPED_TIMER(_partition_timer); + RETURN_IF_ERROR( + _partitioner->do_partitioning(state, build_block.get(), _mem_tracker.get())); + } + + auto execution_context = state->get_task_execution_context(); + _dependency->block(); + auto spill_func = [execution_context, build_block, state, this]() { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "execution_context released, maybe query was cancelled."; + return; + } + auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); + SCOPED_TIMER(_partition_shuffle_timer); + auto* channel_ids = reinterpret_cast<uint32_t*>(_partitioner->get_channel_ids()); + + auto& partitioned_blocks = _shared_state->partitioned_build_blocks; + std::vector<uint32_t> partition_indices; + const auto reserved_size = 4096; + partition_indices.reserve(reserved_size); + + auto flush_rows = [&partition_indices, &build_block, &state, this]( + 
std::unique_ptr<vectorized::MutableBlock>& partition_block, + vectorized::SpillStreamSPtr& spilling_stream) { + auto* begin = &(partition_indices[0]); + const auto count = partition_indices.size(); + if (!partition_block) { + partition_block = + vectorized::MutableBlock::create_unique(build_block->clone_empty()); + } + partition_block->add_rows(build_block.get(), begin, begin + count); + partition_indices.clear(); + + if (partition_block->allocated_bytes() >= + vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { + auto block = partition_block->to_block(); + partition_block = + vectorized::MutableBlock::create_unique(build_block->clone_empty()); + auto status = spilling_stream->spill_block(state, block, false); + + if (!status.ok()) { + std::unique_lock<std::mutex> lock(_spill_lock); + _spill_status = status; + _spill_status_ok = false; + _dependency->set_ready(); + return false; + } + } + return true; + }; + + for (uint32_t i = 0; i != p._partition_count; ++i) { + vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i]; + DCHECK(spilling_stream != nullptr); + + const auto rows = build_block->rows(); + for (size_t idx = 1; idx != rows; ++idx) { + if (channel_ids[idx] == i) { + partition_indices.emplace_back(idx); + } else { + continue; + } + + const auto count = partition_indices.size(); + if (UNLIKELY(count >= reserved_size)) { + if (!flush_rows(partitioned_blocks[i], spilling_stream)) { + break; + } + } + } + + if (!partition_indices.empty()) { + flush_rows(partitioned_blocks[i], spilling_stream); + } + } + + _dependency->set_ready(); + }; + auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool(); + return thread_pool->submit_func(spill_func); +} + Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { LOG(INFO) << "hash join sink " << _parent->id() << " revoke_memory" << ", eos: " << _child_eos; DCHECK_EQ(_spilling_streams_count, 0); if (!_shared_state->need_to_spill) { - 
profile()->add_info_string("Spilled", "true"); _shared_state->need_to_spill = true; - auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); - _shared_state->inner_shared_state->hash_table_variants.reset(); - auto row_desc = p._child_x->row_desc(); - auto build_block = std::move(_shared_state->inner_shared_state->build_block); - if (!build_block) { - build_block = vectorized::Block::create_shared(); - auto inner_sink_state = _shared_state->inner_runtime_state->get_sink_local_state(); - if (inner_sink_state) { - auto& mutable_block = - reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state) - ->_build_side_mutable_block; - *build_block = mutable_block.to_block(); - LOG(INFO) << "hash join sink will revoke build mutable block: " - << build_block->allocated_bytes(); - } - } - - /// Here need to skip the first row in build block. - /// The first row in build block is generated by `HashJoinBuildSinkOperatorX::sink`. - if (build_block->rows() > 1) { - if (build_block->columns() > row_desc.num_slots()) { - build_block->erase(row_desc.num_slots()); - } - RETURN_IF_ERROR(_partition_block(state, build_block.get(), 1, build_block->rows())); - } + return _revoke_unpartitioned_block(state); } _spilling_streams_count = _shared_state->partitioned_build_blocks.size(); @@ -133,16 +234,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { continue; } - if (!spilling_stream) { - RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( - state, spilling_stream, print_id(state->query_id()), "hash_build_sink", - _parent->id(), std::numeric_limits<int32_t>::max(), - std::numeric_limits<size_t>::max(), _profile)); - RETURN_IF_ERROR(spilling_stream->prepare_spill()); - spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count, - _spill_data_size, _spill_write_disk_timer, - _spill_write_wait_io_timer); - } + DCHECK(spilling_stream != nullptr); auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool(); @@ -201,7 +293,7 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state, auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); SCOPED_TIMER(_partition_shuffle_timer); - auto* channel_ids = reinterpret_cast<uint64_t*>(_partitioner->get_channel_ids()); + auto* channel_ids = reinterpret_cast<uint32_t*>(_partitioner->get_channel_ids()); std::vector<uint32_t> partition_indexes[p._partition_count]; DCHECK_LT(begin, end); for (size_t i = begin; i != end; ++i) { diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 3a03c2fc724..3f29e3093b6 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -34,7 +34,7 @@ class RuntimeState; namespace pipeline { -using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>; +using PartitionerType = vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>; class PartitionedHashJoinSinkOperatorX; @@ -60,6 +60,8 @@ protected: Status _partition_block(RuntimeState* state, vectorized::Block* in_block, size_t begin, size_t end); + Status _revoke_unpartitioned_block(RuntimeState* state); + friend class PartitionedHashJoinSinkOperatorX; std::atomic_int _spilling_streams_count {0}; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org