This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7621008ad4a4e522aa754cd8e50213e809228770
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Wed Apr 17 15:50:29 2024 +0800

    [improvement](spill) avoid occupying too much memory while spilling the build block 
during the hash join build phase (#33747)
---
 .../exec/partitioned_hash_join_probe_operator.cpp  |   3 +-
 .../exec/partitioned_hash_join_probe_operator.h    |   2 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   | 166 ++++++++++++++++-----
 .../exec/partitioned_hash_join_sink_operator.h     |   4 +-
 4 files changed, 135 insertions(+), 40 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 35750ee5a55..78dcaf1e6c5 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -535,7 +535,7 @@ Status 
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
     }
 
     std::vector<uint32_t> partition_indexes[_partition_count];
-    auto* channel_ids = 
reinterpret_cast<uint64_t*>(local_state._partitioner->get_channel_ids());
+    auto* channel_ids = 
reinterpret_cast<uint32_t*>(local_state._partitioner->get_channel_ids());
     for (uint32_t i = 0; i != rows; ++i) {
         partition_indexes[channel_ids[i]].emplace_back(i);
     }
@@ -862,6 +862,7 @@ Status 
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
             RETURN_IF_ERROR(
                     
_inner_probe_operator->pull(local_state._runtime_state.get(), block, eos));
             if (*eos) {
+                _update_profile_from_internal_states(local_state);
                 local_state._runtime_state.reset();
             }
         }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 96a5cf96e34..5bdc5278ffc 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -33,7 +33,7 @@ class RuntimeState;
 
 namespace pipeline {
 
-using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
+using PartitionerType = 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>;
 
 class PartitionedHashJoinProbeOperatorX;
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index a0adf0505f8..c9d61757461 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -45,8 +45,21 @@ Status 
PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
 
 Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
     RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));
+    auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+    for (uint32_t i = 0; i != p._partition_count; ++i) {
+        auto& spilling_stream = _shared_state->spilled_streams[i];
+        
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+                state, spilling_stream, print_id(state->query_id()),
+                fmt::format("hash_build_sink_{}", i), _parent->id(),
+                std::numeric_limits<int32_t>::max(), 
std::numeric_limits<size_t>::max(), _profile));
+        RETURN_IF_ERROR(spilling_stream->prepare_spill());
+        spilling_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
+                                            _spill_data_size, 
_spill_write_disk_timer,
+                                            _spill_write_wait_io_timer);
+    }
     return _partitioner->open(state);
 }
+
 Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status 
exec_status) {
     SCOPED_TIMER(PipelineXSpillSinkLocalState::exec_time_counter());
     SCOPED_TIMER(PipelineXSpillSinkLocalState::_close_timer);
@@ -87,39 +100,127 @@ size_t 
PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
     return mem_size;
 }
 
+Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* 
state) {
+    _shared_state->need_to_spill = true;
+    auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+    _shared_state->inner_shared_state->hash_table_variants.reset();
+    auto row_desc = p._child_x->row_desc();
+    auto build_block = 
std::move(_shared_state->inner_shared_state->build_block);
+    if (!build_block) {
+        build_block = vectorized::Block::create_shared();
+        auto inner_sink_state = 
_shared_state->inner_runtime_state->get_sink_local_state();
+        if (inner_sink_state) {
+            auto& mutable_block = 
reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state)
+                                          ->_build_side_mutable_block;
+            *build_block = mutable_block.to_block();
+            LOG(INFO) << "hash join sink will revoke build mutable block: "
+                      << build_block->allocated_bytes();
+        }
+    }
+
+    /// Here need to skip the first row in build block.
+    /// The first row in build block is generated by 
`HashJoinBuildSinkOperatorX::sink`.
+    if (build_block->rows() <= 1) {
+        return Status::OK();
+    }
+
+    if (build_block->columns() > row_desc.num_slots()) {
+        build_block->erase(row_desc.num_slots());
+    }
+
+    {
+        /// TODO: DO NOT execute build exprs twice(when partition and building 
hash table)
+        SCOPED_TIMER(_partition_timer);
+        RETURN_IF_ERROR(
+                _partitioner->do_partitioning(state, build_block.get(), 
_mem_tracker.get()));
+    }
+
+    auto execution_context = state->get_task_execution_context();
+    _dependency->block();
+    auto spill_func = [execution_context, build_block, state, this]() {
+        auto execution_context_lock = execution_context.lock();
+        if (!execution_context_lock) {
+            LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
+            return;
+        }
+        auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+        SCOPED_TIMER(_partition_shuffle_timer);
+        auto* channel_ids = 
reinterpret_cast<uint32_t*>(_partitioner->get_channel_ids());
+
+        auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+        std::vector<uint32_t> partition_indices;
+        const auto reserved_size = 4096;
+        partition_indices.reserve(reserved_size);
+
+        auto flush_rows = [&partition_indices, &build_block, &state, this](
+                                  std::unique_ptr<vectorized::MutableBlock>& 
partition_block,
+                                  vectorized::SpillStreamSPtr& 
spilling_stream) {
+            auto* begin = &(partition_indices[0]);
+            const auto count = partition_indices.size();
+            if (!partition_block) {
+                partition_block =
+                        
vectorized::MutableBlock::create_unique(build_block->clone_empty());
+            }
+            partition_block->add_rows(build_block.get(), begin, begin + count);
+            partition_indices.clear();
+
+            if (partition_block->allocated_bytes() >=
+                vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+                auto block = partition_block->to_block();
+                partition_block =
+                        
vectorized::MutableBlock::create_unique(build_block->clone_empty());
+                auto status = spilling_stream->spill_block(state, block, 
false);
+
+                if (!status.ok()) {
+                    std::unique_lock<std::mutex> lock(_spill_lock);
+                    _spill_status = status;
+                    _spill_status_ok = false;
+                    _dependency->set_ready();
+                    return false;
+                }
+            }
+            return true;
+        };
+
+        for (uint32_t i = 0; i != p._partition_count; ++i) {
+            vectorized::SpillStreamSPtr& spilling_stream = 
_shared_state->spilled_streams[i];
+            DCHECK(spilling_stream != nullptr);
+
+            const auto rows = build_block->rows();
+            for (size_t idx = 1; idx != rows; ++idx) {
+                if (channel_ids[idx] == i) {
+                    partition_indices.emplace_back(idx);
+                } else {
+                    continue;
+                }
+
+                const auto count = partition_indices.size();
+                if (UNLIKELY(count >= reserved_size)) {
+                    if (!flush_rows(partitioned_blocks[i], spilling_stream)) {
+                        break;
+                    }
+                }
+            }
+
+            if (!partition_indices.empty()) {
+                flush_rows(partitioned_blocks[i], spilling_stream);
+            }
+        }
+
+        _dependency->set_ready();
+    };
+    auto* thread_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
+    return thread_pool->submit_func(spill_func);
+}
+
 Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
     LOG(INFO) << "hash join sink " << _parent->id() << " revoke_memory"
               << ", eos: " << _child_eos;
     DCHECK_EQ(_spilling_streams_count, 0);
 
     if (!_shared_state->need_to_spill) {
-        profile()->add_info_string("Spilled", "true");
         _shared_state->need_to_spill = true;
-        auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
-        _shared_state->inner_shared_state->hash_table_variants.reset();
-        auto row_desc = p._child_x->row_desc();
-        auto build_block = 
std::move(_shared_state->inner_shared_state->build_block);
-        if (!build_block) {
-            build_block = vectorized::Block::create_shared();
-            auto inner_sink_state = 
_shared_state->inner_runtime_state->get_sink_local_state();
-            if (inner_sink_state) {
-                auto& mutable_block =
-                        
reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state)
-                                ->_build_side_mutable_block;
-                *build_block = mutable_block.to_block();
-                LOG(INFO) << "hash join sink will revoke build mutable block: "
-                          << build_block->allocated_bytes();
-            }
-        }
-
-        /// Here need to skip the first row in build block.
-        /// The first row in build block is generated by 
`HashJoinBuildSinkOperatorX::sink`.
-        if (build_block->rows() > 1) {
-            if (build_block->columns() > row_desc.num_slots()) {
-                build_block->erase(row_desc.num_slots());
-            }
-            RETURN_IF_ERROR(_partition_block(state, build_block.get(), 1, 
build_block->rows()));
-        }
+        return _revoke_unpartitioned_block(state);
     }
 
     _spilling_streams_count = _shared_state->partitioned_build_blocks.size();
@@ -133,16 +234,7 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
             continue;
         }
 
-        if (!spilling_stream) {
-            
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
-                    state, spilling_stream, print_id(state->query_id()), 
"hash_build_sink",
-                    _parent->id(), std::numeric_limits<int32_t>::max(),
-                    std::numeric_limits<size_t>::max(), _profile));
-            RETURN_IF_ERROR(spilling_stream->prepare_spill());
-            spilling_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
-                                                _spill_data_size, 
_spill_write_disk_timer,
-                                                _spill_write_wait_io_timer);
-        }
+        DCHECK(spilling_stream != nullptr);
 
         auto* spill_io_pool =
                 
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
@@ -201,7 +293,7 @@ Status 
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
 
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
     SCOPED_TIMER(_partition_shuffle_timer);
-    auto* channel_ids = 
reinterpret_cast<uint64_t*>(_partitioner->get_channel_ids());
+    auto* channel_ids = 
reinterpret_cast<uint32_t*>(_partitioner->get_channel_ids());
     std::vector<uint32_t> partition_indexes[p._partition_count];
     DCHECK_LT(begin, end);
     for (size_t i = begin; i != end; ++i) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 3a03c2fc724..3f29e3093b6 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -34,7 +34,7 @@ class RuntimeState;
 
 namespace pipeline {
 
-using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
+using PartitionerType = 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>;
 
 class PartitionedHashJoinSinkOperatorX;
 
@@ -60,6 +60,8 @@ protected:
     Status _partition_block(RuntimeState* state, vectorized::Block* in_block, 
size_t begin,
                             size_t end);
 
+    Status _revoke_unpartitioned_block(RuntimeState* state);
+
     friend class PartitionedHashJoinSinkOperatorX;
 
     std::atomic_int _spilling_streams_count {0};


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to