This is an automated email from the ASF dual-hosted git repository.

BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 895102b3d04 [Chore](be) Stop spill hash join repartition on cancel (#63456)
895102b3d04 is described below

commit 895102b3d0418d078f97ef135ae18bc7a900b05c
Author: Pxl <[email protected]>
AuthorDate: Fri May 22 15:56:07 2026 +0800

    [Chore](be) Stop spill hash join repartition on cancel (#63456)
    
    Problem Summary: Partitioned hash join spill recovery could continue
    normal repartition progress after cancellation because some loops
    stopped on `state->is_cancelled()` but then fell through to completion
    handling. This could mark partially recovered or repartitioned spill
    data as complete. This PR returns the cancellation status before
    advancing partition state, clears recovered build data during close, and
    replaces a debug-only child EOS assertion with a runtime error.
---
 .../partitioned_hash_join_probe_operator.cpp       | 19 ++++++++---
 .../partitioned_hash_join_probe_operator_test.cpp  | 39 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
index 56e80cee3f8..4810aca89fc 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
@@ -36,6 +36,7 @@
 #include "exec/spill/spill_repartitioner.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/runtime_profile.h"
+#include "runtime/runtime_state.h"
 
 namespace doris {
 
@@ -215,6 +216,7 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
         }
         _current_probe_reader.reset();
     }
+    _recovered_build_block.reset();
 
     // Clean up any remaining spill partition queue entries
     for (auto& entry : _spill_partition_queue) {
@@ -347,7 +349,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
         RETURN_IF_ERROR(_current_build_reader->open());
     }
     bool eos = false;
-    while (!eos) {
+    while (!eos && !state->is_cancelled()) {
         Block block;
         RETURN_IF_ERROR(_current_build_reader->read(&block, &eos));
         COUNTER_UPDATE(_recovery_build_rows, block.rows());
@@ -371,6 +373,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
             return Status::OK(); // yield — buffer full, more data may remain
         }
     }
+    RETURN_IF_CANCELLED(state);
     // Build file fully consumed.
     RETURN_IF_ERROR(_current_build_reader->close());
     _current_build_reader.reset();
@@ -407,6 +410,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
             return Status::OK(); // yield — enough data read
         }
     }
+    RETURN_IF_CANCELLED(state);
     // Probe file fully consumed.
     RETURN_IF_ERROR(_current_probe_reader->close());
     _current_probe_reader.reset();
@@ -414,6 +418,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
     return Status::OK();
 }
 
+// NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity): existing spill repartition state machine handles build/probe phases together.
 Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
         RuntimeState* state, JoinSpillPartitionInfo& partition) {
     auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
@@ -472,6 +477,7 @@ Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
         }
     }
     RETURN_IF_ERROR(_repartitioner.finalize());
+    RETURN_IF_CANCELLED(state);
     _recovered_build_block.reset();
     _current_build_reader.reset(); // clear any leftover reader state
     partition.build_file.reset();
@@ -495,9 +501,9 @@ Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
         while (!done && !state->is_cancelled()) {
             RETURN_IF_ERROR(_repartitioner.repartition(state, partition.probe_file, &done));
         }
-        partition.probe_file.reset();
-
         RETURN_IF_ERROR(_repartitioner.finalize());
+        RETURN_IF_CANCELLED(state);
+        partition.probe_file.reset();
         _current_probe_reader.reset();
     }
 
@@ -696,7 +702,11 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block
     // per-partition build and probe spill streams. After this point every partition
     // (including the original "level-0" ones) is accessed uniformly via the queue.
     if (!local_state._spill_queue_initialized) {
-        DCHECK(local_state._child_eos) << "pull() with is_spilled=true called before child EOS";
+        if (UNLIKELY(!local_state._child_eos)) {
+            return Status::InternalError(
+                    "query:{}, node:{}, pull() with is_spilled=true called before child EOS",
+                    print_id(state->query_id()), node_id());
+        }
         // There maybe some blocks still in partitioned block or probe blocks. Flush them to disk.
         RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true));
         // Close all probe writers so that SpillFile metadata (part_count, etc.)
@@ -729,6 +739,7 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block
     return _pull_from_spill_queue(local_state, state, output_block, eos);
 }
 
+// NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity): existing spill queue pull handles setup, recovery, and probing phases.
 Status PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
         PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state, Block* output_block,
         bool* eos) const {
diff --git a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
index e50ccb1ec35..7e69aed26a9 100644
--- a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -1139,6 +1139,45 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskEmpty) {
     ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
 }
 
+TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskCancelledBeforeEmptyEos) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(),
+                                                        probe_operator.get(), shared_state);
+
+    SpillFileSPtr spill_file;
+    auto relative_path = fmt::format(
+            "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+    ASSERT_TRUE(ExecEnv::GetInstance()
+                        ->spill_file_mgr()
+                        ->create_spill_file(relative_path, spill_file)
+                        .ok());
+
+    {
+        SpillFileWriterSPtr writer;
+        ASSERT_TRUE(spill_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), writer)
+                            .ok());
+        ASSERT_TRUE(writer->close().ok());
+    }
+
+    _helper.runtime_state->cancel(Status::Cancelled("test cancel"));
+
+    JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0);
+    auto status = local_state->recover_build_blocks_from_partition(_helper.runtime_state.get(),
+                                                                   partition_info);
+    ASSERT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    ASSERT_NE(partition_info.build_file, nullptr);
+    ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
+
+    ASSERT_TRUE(local_state->close(_helper.runtime_state.get()).ok());
+    ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(partition_info.build_file);
+    partition_info.build_file.reset();
+}
+
 TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskLargeData) {
     // Similar setup as above...
     auto [probe_operator, sink_operator] = _helper.create_operators();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to