This is an automated email from the ASF dual-hosted git repository.
BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 895102b3d04 [Chore](be) Stop spill hash join repartition on cancel (#63456)
895102b3d04 is described below
commit 895102b3d0418d078f97ef135ae18bc7a900b05c
Author: Pxl <[email protected]>
AuthorDate: Fri May 22 15:56:07 2026 +0800
[Chore](be) Stop spill hash join repartition on cancel (#63456)
Problem Summary: Partitioned hash join spill recovery could continue
normal repartition progress after cancellation because some loops
stopped on `state->is_cancelled()` but then fell through to completion
handling. This could mark partially recovered or repartitioned spill
data as complete. This PR returns the cancellation status before
advancing partition state, clears recovered build data during close, and
replaces a debug-only child EOS assertion with a runtime error.
---
.../partitioned_hash_join_probe_operator.cpp | 19 ++++++++---
.../partitioned_hash_join_probe_operator_test.cpp | 39 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 4 deletions(-)
diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
index 56e80cee3f8..4810aca89fc 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
@@ -36,6 +36,7 @@
#include "exec/spill/spill_repartitioner.h"
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_profile.h"
+#include "runtime/runtime_state.h"
namespace doris {
@@ -215,6 +216,7 @@ Status
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
}
_current_probe_reader.reset();
}
+ _recovered_build_block.reset();
// Clean up any remaining spill partition queue entries
for (auto& entry : _spill_partition_queue) {
@@ -347,7 +349,7 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
RETURN_IF_ERROR(_current_build_reader->open());
}
bool eos = false;
- while (!eos) {
+ while (!eos && !state->is_cancelled()) {
Block block;
RETURN_IF_ERROR(_current_build_reader->read(&block, &eos));
COUNTER_UPDATE(_recovery_build_rows, block.rows());
@@ -371,6 +373,7 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
return Status::OK(); // yield — buffer full, more data may remain
}
}
+ RETURN_IF_CANCELLED(state);
// Build file fully consumed.
RETURN_IF_ERROR(_current_build_reader->close());
_current_build_reader.reset();
@@ -407,6 +410,7 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
return Status::OK(); // yield — enough data read
}
}
+ RETURN_IF_CANCELLED(state);
// Probe file fully consumed.
RETURN_IF_ERROR(_current_probe_reader->close());
_current_probe_reader.reset();
@@ -414,6 +418,7 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
return Status::OK();
}
+// NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity): existing spill repartition state machine handles build/probe phases together.
Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
RuntimeState* state, JoinSpillPartitionInfo& partition) {
auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
@@ -472,6 +477,7 @@ Status
PartitionedHashJoinProbeLocalState::repartition_current_partition(
}
}
RETURN_IF_ERROR(_repartitioner.finalize());
+ RETURN_IF_CANCELLED(state);
_recovered_build_block.reset();
_current_build_reader.reset(); // clear any leftover reader state
partition.build_file.reset();
@@ -495,9 +501,9 @@ Status
PartitionedHashJoinProbeLocalState::repartition_current_partition(
while (!done && !state->is_cancelled()) {
RETURN_IF_ERROR(_repartitioner.repartition(state,
partition.probe_file, &done));
}
- partition.probe_file.reset();
-
RETURN_IF_ERROR(_repartitioner.finalize());
+ RETURN_IF_CANCELLED(state);
+ partition.probe_file.reset();
_current_probe_reader.reset();
}
@@ -696,7 +702,11 @@ Status
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block
// per-partition build and probe spill streams. After this point every partition
// (including the original "level-0" ones) is accessed uniformly via the queue.
if (!local_state._spill_queue_initialized) {
- DCHECK(local_state._child_eos) << "pull() with is_spilled=true called
before child EOS";
+ if (UNLIKELY(!local_state._child_eos)) {
+ return Status::InternalError(
+ "query:{}, node:{}, pull() with is_spilled=true called
before child EOS",
+ print_id(state->query_id()), node_id());
+ }
// There maybe some blocks still in partitioned block or probe blocks.
Flush them to disk.
RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true));
// Close all probe writers so that SpillFile metadata (part_count,
etc.)
@@ -729,6 +739,7 @@ Status
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block
return _pull_from_spill_queue(local_state, state, output_block, eos);
}
+// NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity): existing spill queue pull handles setup, recovery, and probing phases.
Status PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state,
Block* output_block,
bool* eos) const {
diff --git
a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
index e50ccb1ec35..7e69aed26a9 100644
--- a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -1139,6 +1139,45 @@ TEST_F(PartitionedHashJoinProbeOperatorTest,
RecoverBuildBlocksFromDiskEmpty) {
ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
}
+TEST_F(PartitionedHashJoinProbeOperatorTest,
RecoverBuildBlocksFromDiskCancelledBeforeEmptyEos) {
+ auto [probe_operator, sink_operator] = _helper.create_operators();
+
+ std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+ auto local_state =
_helper.create_probe_local_state(_helper.runtime_state.get(),
+ probe_operator.get(),
shared_state);
+
+ SpillFileSPtr spill_file;
+ auto relative_path = fmt::format(
+ "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()),
+ probe_operator->node_id(),
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+ ASSERT_TRUE(ExecEnv::GetInstance()
+ ->spill_file_mgr()
+ ->create_spill_file(relative_path, spill_file)
+ .ok());
+
+ {
+ SpillFileWriterSPtr writer;
+ ASSERT_TRUE(spill_file
+ ->create_writer(_helper.runtime_state.get(),
+ local_state->operator_profile(),
writer)
+ .ok());
+ ASSERT_TRUE(writer->close().ok());
+ }
+
+ _helper.runtime_state->cancel(Status::Cancelled("test cancel"));
+
+ JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0);
+ auto status =
local_state->recover_build_blocks_from_partition(_helper.runtime_state.get(),
+
partition_info);
+ ASSERT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+ ASSERT_NE(partition_info.build_file, nullptr);
+ ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
+
+ ASSERT_TRUE(local_state->close(_helper.runtime_state.get()).ok());
+
ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(partition_info.build_file);
+ partition_info.build_file.reset();
+}
+
TEST_F(PartitionedHashJoinProbeOperatorTest,
RecoverBuildBlocksFromDiskLargeData) {
// Similar setup as above...
auto [probe_operator, sink_operator] = _helper.create_operators();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]