This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new e191478b079 branch-4.1: [Chore](be) Stop spill hash join repartition on cancel #63456 (#63532)
e191478b079 is described below
commit e191478b0796b95aa052b7bd1e275c6723369362
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 22 22:41:54 2026 +0800
branch-4.1: [Chore](be) Stop spill hash join repartition on cancel #63456 (#63532)
Cherry-picked from #63456
Co-authored-by: Pxl <[email protected]>
---
.../partitioned_hash_join_probe_operator.cpp | 19 ++++++++---
.../partitioned_hash_join_probe_operator_test.cpp | 39 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 4 deletions(-)
diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
index 1bbc1972bec..25161399c3a 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
@@ -36,6 +36,7 @@
#include "exec/spill/spill_repartitioner.h"
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_profile.h"
+#include "runtime/runtime_state.h"
namespace doris {
@@ -217,6 +218,7 @@ Status
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
}
_current_probe_reader.reset();
}
+ _recovered_build_block.reset();
// Clean up any remaining spill partition queue entries
for (auto& entry : _spill_partition_queue) {
@@ -349,7 +351,7 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
RETURN_IF_ERROR(_current_build_reader->open());
}
bool eos = false;
- while (!eos) {
+ while (!eos && !state->is_cancelled()) {
Block block;
RETURN_IF_ERROR(_current_build_reader->read(&block, &eos));
COUNTER_UPDATE(_recovery_build_rows, block.rows());
@@ -373,6 +375,7 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
return Status::OK(); // yield — buffer full, more data may remain
}
}
+ RETURN_IF_CANCELLED(state);
// Build file fully consumed.
RETURN_IF_ERROR(_current_build_reader->close());
_current_build_reader.reset();
@@ -409,6 +412,7 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
return Status::OK(); // yield — enough data read
}
}
+ RETURN_IF_CANCELLED(state);
// Probe file fully consumed.
RETURN_IF_ERROR(_current_probe_reader->close());
_current_probe_reader.reset();
@@ -416,6 +420,7 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
return Status::OK();
}
+// NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity): existing spill repartition state machine handles build/probe phases together.
Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
RuntimeState* state, JoinSpillPartitionInfo& partition) {
auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
@@ -474,6 +479,7 @@ Status
PartitionedHashJoinProbeLocalState::repartition_current_partition(
}
}
RETURN_IF_ERROR(_repartitioner.finalize());
+ RETURN_IF_CANCELLED(state);
_recovered_build_block.reset();
_current_build_reader.reset(); // clear any leftover reader state
partition.build_file.reset();
@@ -497,9 +503,9 @@ Status
PartitionedHashJoinProbeLocalState::repartition_current_partition(
while (!done && !state->is_cancelled()) {
RETURN_IF_ERROR(_repartitioner.repartition(state,
partition.probe_file, &done));
}
- partition.probe_file.reset();
-
RETURN_IF_ERROR(_repartitioner.finalize());
+ RETURN_IF_CANCELLED(state);
+ partition.probe_file.reset();
_current_probe_reader.reset();
}
@@ -698,7 +704,11 @@ Status
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block
// per-partition build and probe spill streams. After this point every
partition
// (including the original "level-0" ones) is accessed uniformly via the
queue.
if (!local_state._spill_queue_initialized) {
- DCHECK(local_state._child_eos) << "pull() with is_spilled=true called
before child EOS";
+ if (UNLIKELY(!local_state._child_eos)) {
+ return Status::InternalError(
+ "query:{}, node:{}, pull() with is_spilled=true called
before child EOS",
+ print_id(state->query_id()), node_id());
+ }
// There maybe some blocks still in partitioned block or probe blocks.
Flush them to disk.
RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true));
// Close all probe writers so that SpillFile metadata (part_count,
etc.)
@@ -731,6 +741,7 @@ Status
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block
return _pull_from_spill_queue(local_state, state, output_block, eos);
}
+// NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity): existing spill queue pull handles setup, recovery, and probing phases.
Status PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state,
Block* output_block,
bool* eos) const {
diff --git a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
index 0bcb609438e..2ce5f1921b0 100644
--- a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -1139,6 +1139,45 @@ TEST_F(PartitionedHashJoinProbeOperatorTest,
RecoverBuildBlocksFromDiskEmpty) {
ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
}
+TEST_F(PartitionedHashJoinProbeOperatorTest,
RecoverBuildBlocksFromDiskCancelledBeforeEmptyEos) {
+ auto [probe_operator, sink_operator] = _helper.create_operators();
+
+ std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+ auto local_state =
_helper.create_probe_local_state(_helper.runtime_state.get(),
+ probe_operator.get(),
shared_state);
+
+ SpillFileSPtr spill_file;
+ auto relative_path = fmt::format(
+ "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()),
+ probe_operator->node_id(),
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+ ASSERT_TRUE(ExecEnv::GetInstance()
+ ->spill_file_mgr()
+ ->create_spill_file(relative_path, spill_file)
+ .ok());
+
+ {
+ SpillFileWriterSPtr writer;
+ ASSERT_TRUE(spill_file
+ ->create_writer(_helper.runtime_state.get(),
+ local_state->operator_profile(),
writer)
+ .ok());
+ ASSERT_TRUE(writer->close().ok());
+ }
+
+ _helper.runtime_state->cancel(Status::Cancelled("test cancel"));
+
+ JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0);
+ auto status =
local_state->recover_build_blocks_from_partition(_helper.runtime_state.get(),
+
partition_info);
+ ASSERT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+ ASSERT_NE(partition_info.build_file, nullptr);
+ ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
+
+ ASSERT_TRUE(local_state->close(_helper.runtime_state.get()).ok());
+
ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(partition_info.build_file);
+ partition_info.build_file.reset();
+}
+
TEST_F(PartitionedHashJoinProbeOperatorTest,
RecoverBuildBlocksFromDiskLargeData) {
// Similar setup as above...
auto [probe_operator, sink_operator] = _helper.create_operators();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]