This is an automated email from the ASF dual-hosted git repository.

zclllyybb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 11a038d87d1 [fix](be) Stop extra operator work after cancellation (#64077)
11a038d87d1 is described below

commit 11a038d87d1cbf2a47ba5b6ca51ee49fc330f3c1
Author: zclllyybb <[email protected]>
AuthorDate: Mon Jun 8 12:14:44 2026 +0800

    [fix](be) Stop extra operator work after cancellation (#64077)
    
    Problem Summary: Some BE operator paths could keep doing unnecessary
    work after query cancellation had already been observed by RuntimeState.
    The problem shows up in three focused cases:
    
    - Spill revoke and recovery paths could still enter spill or repartition
    work after cancellation.
    - Nested loop join build close could still process cross runtime filters
    after cancellation, scanning build blocks and evaluating filter
    expressions.
    - Analytic sink could advance multiple buffered output blocks in one
    sink call without checking cancellation between blocks.
    
    This PR adds cancellation checks at the retained public entry points or
    block boundaries. The analytic sink change is intentionally
    conservative: it stops before advancing to another buffered output
    block, while preserving current-block processing semantics.
---
 be/src/exec/operator/analytic_sink_operator.cpp    |   5 +-
 be/src/exec/operator/analytic_sink_operator.h      |   2 +-
 .../operator/nested_loop_join_build_operator.cpp   |   5 +-
 .../partitioned_aggregation_sink_operator.cpp      |   2 +
 .../partitioned_aggregation_source_operator.cpp    |   4 +-
 .../partitioned_hash_join_probe_operator.cpp       |   3 +
 .../partitioned_hash_join_sink_operator.cpp        |   1 +
 .../operator/spill_iceberg_table_sink_operator.cpp |   1 +
 be/src/exec/operator/spill_sort_sink_operator.cpp  |   4 +-
 .../exec/operator/spill_sort_source_operator.cpp   |   3 +-
 be/src/exec/operator/spill_utils.h                 |   2 +
 .../exec/operator/analytic_sink_operator_test.cpp  |  62 ++++++++-
 .../nested_loop_join_build_operator_test.cpp       | 141 +++++++++++++++++++++
 .../partitioned_aggregation_sink_operator_test.cpp |  28 +++-
 ...artitioned_aggregation_source_operator_test.cpp |  48 +++++++
 .../partitioned_hash_join_probe_operator_test.cpp  |  33 +++++
 .../partitioned_hash_join_sink_operator_test.cpp   |  26 ++++
 .../operator/spill_sort_sink_operator_test.cpp     |  79 ++++++++++++
 .../operator/spill_sort_source_operator_test.cpp   |  20 +++
 19 files changed, 460 insertions(+), 9 deletions(-)

diff --git a/be/src/exec/operator/analytic_sink_operator.cpp 
b/be/src/exec/operator/analytic_sink_operator.cpp
index cd15757a6eb..c7f2e42a57b 100644
--- a/be/src/exec/operator/analytic_sink_operator.cpp
+++ b/be/src/exec/operator/analytic_sink_operator.cpp
@@ -340,8 +340,9 @@ bool 
AnalyticSinkLocalState::_get_next_for_range_between(int64_t current_block_r
     return false;
 }
 
-Status AnalyticSinkLocalState::_execute_impl() {
+Status AnalyticSinkLocalState::_execute_impl(RuntimeState* state) {
     while (_output_block_index < _input_blocks.size()) {
+        RETURN_IF_CANCELLED(state);
         {
             _get_partition_by_end();
             // streaming_mode means no need get all parition data, could 
calculate data when it's arrived
@@ -753,7 +754,7 @@ Status 
AnalyticSinkOperatorX::sink_impl(doris::RuntimeState* state, Block* input
     local_state._reserve_mem_size = 0;
     SCOPED_PEAK_MEM(&local_state._reserve_mem_size);
     RETURN_IF_ERROR(_add_input_block(state, input_block));
-    RETURN_IF_ERROR(local_state._execute_impl());
+    RETURN_IF_ERROR(local_state._execute_impl(state));
     if (local_state._input_eos) {
         LockGuard lc(local_state._shared_state->sink_eos_lock);
         local_state._shared_state->sink_eos = true;
diff --git a/be/src/exec/operator/analytic_sink_operator.h 
b/be/src/exec/operator/analytic_sink_operator.h
index 62d8bcde692..31acbb80edd 100644
--- a/be/src/exec/operator/analytic_sink_operator.h
+++ b/be/src/exec/operator/analytic_sink_operator.h
@@ -73,7 +73,7 @@ public:
 
 private:
     friend class AnalyticSinkOperatorX;
-    Status _execute_impl();
+    Status _execute_impl(RuntimeState* state);
     // over(partition by k1 order by k2 range|rows unbounded preceding and 
unbounded following)
     bool _get_next_for_partition(int64_t current_block_rows, int64_t 
current_block_base_pos);
     // over(partition by k1 order by k2 range between unbounded preceding and 
current row)
diff --git a/be/src/exec/operator/nested_loop_join_build_operator.cpp 
b/be/src/exec/operator/nested_loop_join_build_operator.cpp
index 857ac9318f2..286c7e5e053 100644
--- a/be/src/exec/operator/nested_loop_join_build_operator.cpp
+++ b/be/src/exec/operator/nested_loop_join_build_operator.cpp
@@ -53,7 +53,10 @@ Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* 
state) {
 }
 
 Status NestedLoopJoinBuildSinkLocalState::close(RuntimeState* state, Status 
exec_status) {
-    RETURN_IF_ERROR(_runtime_filter_producer_helper->process(state, 
_shared_state->build_blocks));
+    if (!state->is_cancelled()) {
+        RETURN_IF_ERROR(
+                _runtime_filter_producer_helper->process(state, 
_shared_state->build_blocks));
+    }
     
_runtime_filter_producer_helper->collect_realtime_profile(custom_profile());
     RETURN_IF_ERROR(JoinBuildSinkLocalState::close(state, exec_status));
     return Status::OK();
diff --git a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp 
b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
index 5563eafdc50..d26838345f2 100644
--- a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
@@ -356,6 +356,7 @@ Status 
PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
                                                        HashTableCtxType& 
context,
                                                        HashTableType& 
hash_table,
                                                        const size_t 
size_to_revoke, bool eos) {
+    RETURN_IF_CANCELLED(state);
     Status status;
 
     context.init_iterator();
@@ -427,6 +428,7 @@ Status 
PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
 }
 
 Status PartitionedAggSinkLocalState::_revoke_memory(RuntimeState* state) {
+    RETURN_IF_CANCELLED(state);
     if (_eos) {
         return Status::OK();
     }
diff --git a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp 
b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
index c23d3c83dda..14acb727052 100644
--- a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
@@ -355,8 +355,9 @@ void PartitionedAggLocalState::_init_partition_queue() {
 
 Status PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* 
state,
                                                                 
AggSpillPartitionInfo& partition) {
+    RETURN_IF_CANCELLED(state);
     size_t accumulated_bytes = 0;
-    if (!partition.spill_file || state->is_cancelled()) {
+    if (!partition.spill_file) {
         return Status::OK();
     }
 
@@ -446,6 +447,7 @@ Status 
PartitionedAggLocalState::_flush_hash_table_to_sub_spill_files(RuntimeSta
 }
 
 Status PartitionedAggLocalState::_flush_and_repartition(RuntimeState* state) {
+    RETURN_IF_CANCELLED(state);
     auto& p = _parent->cast<PartitionedAggSourceOperatorX>();
     const int new_level = _current_partition.level + 1;
 
diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp 
b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
index 410556911f0..561f0873adc 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
@@ -333,6 +333,7 @@ bool PartitionedHashJoinProbeLocalState::is_blockable() 
const {
 
 Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
         RuntimeState* state, JoinSpillPartitionInfo& partition_info) {
+    RETURN_IF_CANCELLED(state);
     if (!partition_info.build_file) {
         // Build file is already exhausted for this partition.
         return Status::OK();
@@ -383,6 +384,7 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
 
 Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
         RuntimeState* state, JoinSpillPartitionInfo& partition_info) {
+    RETURN_IF_CANCELLED(state);
     if (!partition_info.probe_file) {
         // Probe file is already exhausted for this partition.
         return Status::OK();
@@ -995,6 +997,7 @@ Status 
PartitionedHashJoinProbeLocalState::revoke_build_data(RuntimeState* state
 //   repartitioned and pushed back to the queue so the hash table build can
 //   proceed later with a smaller footprint.
 Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
+    RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     VLOG_DEBUG << fmt::format("Query:{}, hash join probe:{}, task:{}, 
revoke_memory, child_eos:{}",
                               print_id(state->query_id()), node_id(), 
state->task_id(),
diff --git a/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp 
b/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
index 11c137ba8c2..acd69a2ef24 100644
--- a/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
@@ -299,6 +299,7 @@ Status 
PartitionedHashJoinSinkLocalState::_finish_spilling(RuntimeState* state)
 /// because we use limit 1MB here. So we need to force spill all memory to 
disk to make sure we can make progress.
 Status 
PartitionedHashJoinSinkLocalState::_execute_spill_partitioned_blocks(RuntimeState*
 state,
                                                                             
bool force_spill) {
+    RETURN_IF_CANCELLED(state);
     
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel",
 {
         auto status = Status::InternalError(
                 "fault_inject partitioned_hash_join_sink revoke_memory 
canceled");
diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp 
b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
index 5bc2bab0d14..46abbf5ebe3 100644
--- a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
+++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
@@ -79,6 +79,7 @@ size_t 
SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* sta
 }
 
 Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) {
+    RETURN_IF_CANCELLED(state);
     if (!_writer) {
         return Status::OK();
     }
diff --git a/be/src/exec/operator/spill_sort_sink_operator.cpp 
b/be/src/exec/operator/spill_sort_sink_operator.cpp
index 82a1bea731c..87098451e26 100644
--- a/be/src/exec/operator/spill_sort_sink_operator.cpp
+++ b/be/src/exec/operator/spill_sort_sink_operator.cpp
@@ -187,6 +187,7 @@ size_t 
SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool e
 }
 
 Status SpillSortSinkLocalState::_execute_spill_sort(RuntimeState* state) {
+    RETURN_IF_CANCELLED(state);
     auto& parent = Base::_parent->template cast<Parent>();
     
state->get_query_ctx()->resource_ctx()->task_controller()->increase_revoking_tasks_count();
     Defer defer {[&]() {
@@ -219,6 +220,7 @@ Status 
SpillSortSinkLocalState::_execute_spill_sort(RuntimeState* state) {
 }
 
 Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
+    RETURN_IF_CANCELLED(state);
     auto& parent = Base::_parent->template cast<Parent>();
     if (!_shared_state->is_spilled) {
         _shared_state->is_spilled = true;
@@ -241,4 +243,4 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* 
state) {
     _shared_state->sorted_spill_groups.emplace_back(_spilling_file);
     return _execute_spill_sort(state);
 }
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/operator/spill_sort_source_operator.cpp 
b/be/src/exec/operator/spill_sort_source_operator.cpp
index c40ab3e255a..9d1138553b2 100644
--- a/be/src/exec/operator/spill_sort_source_operator.cpp
+++ b/be/src/exec/operator/spill_sort_source_operator.cpp
@@ -82,6 +82,7 @@ int 
SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) const
 }
 
 Status SpillSortLocalState::execute_merge_sort_spill_files(RuntimeState* 
state) {
+    RETURN_IF_CANCELLED(state);
     auto& parent = Base::_parent->template cast<Parent>();
     SCOPED_TIMER(_spill_merge_sort_timer);
     Status status;
@@ -262,4 +263,4 @@ Status 
SpillSortSourceOperatorX::get_block_impl(RuntimeState* state, Block* bloc
     return Status::OK();
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/operator/spill_utils.h 
b/be/src/exec/operator/spill_utils.h
index 131c1c9b8b7..7d9cb200bd1 100644
--- a/be/src/exec/operator/spill_utils.h
+++ b/be/src/exec/operator/spill_utils.h
@@ -78,8 +78,10 @@ struct SpillContext {
 // small utility to run the provided callbacks and forward cancellation.
 inline Status run_spill_task(RuntimeState* state, std::function<Status()> 
exec_func,
                              std::function<Status()> fin_cb = {}) {
+    RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(exec_func());
     if (fin_cb) {
+        RETURN_IF_CANCELLED(state);
         RETURN_IF_ERROR(fin_cb());
     }
     return Status::OK();
diff --git a/be/test/exec/operator/analytic_sink_operator_test.cpp 
b/be/test/exec/operator/analytic_sink_operator_test.cpp
index bd865b9aca9..b5e5787e8a6 100644
--- a/be/test/exec/operator/analytic_sink_operator_test.cpp
+++ b/be/test/exec/operator/analytic_sink_operator_test.cpp
@@ -21,6 +21,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <string>
 
 #include "core/block/block.h"
 #include "core/data_type/data_type.h"
@@ -53,6 +54,35 @@ private:
     std::unique_ptr<MockRowDescriptor> _mock_row_desc;
 };
 
+namespace {
+
+constexpr auto CANCEL_REASON = "analytic sink cancelled";
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << 
status.to_string();
+}
+
+class CancelAfterChecksRuntimeState : public MockRuntimeState {
+public:
+    void reset_cancel_after(int64_t check_count) {
+        _check_count = 0;
+        _cancel_after = check_count;
+    }
+
+    bool is_cancelled() const override {
+        return _cancel_after >= 0 && ++_check_count > _cancel_after;
+    }
+
+    Status cancel_reason() const override { return 
Status::Cancelled(CANCEL_REASON); }
+
+private:
+    mutable int64_t _check_count = 0;
+    int64_t _cancel_after = -1;
+};
+
+} // namespace
+
 struct AnalyticSinkOperatorTest : public ::testing::Test {
     void Initialize(int batch_size) {
         sink = std::make_unique<AnalyticSinkOperatorX>(&pool);
@@ -169,6 +199,36 @@ struct AnalyticSinkOperatorTest : public ::testing::Test {
     std::vector<int64_t> _data_vals;
 };
 
+TEST_F(AnalyticSinkOperatorTest, SinkReturnsCancelBeforeOutputBlock) {
+    Initialize(10);
+    create_operator(false, 0, "", {}, nullptr);
+    create_local_state();
+
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+    Block block = ColumnHelper::create_block<DataTypeInt64>({2, 3, 1});
+    expect_cancelled(sink->sink(state.get(), &block, true));
+}
+
+TEST_F(AnalyticSinkOperatorTest, 
SinkStopsBeforeNextBufferedBlockWhenCancelled) {
+    const int batch_size = 2;
+    Initialize(batch_size);
+    auto cancel_state = std::make_shared<CancelAfterChecksRuntimeState>();
+    cancel_state->_batch_size = batch_size;
+    state = cancel_state;
+    create_operator(false, 0, "", {}, nullptr);
+    create_local_state();
+
+    {
+        Block block = ColumnHelper::create_block<DataTypeInt64>({0, 1});
+        auto status = sink->sink(state.get(), &block, false);
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+
+    cancel_state->reset_cancel_after(1);
+    Block block = ColumnHelper::create_block<DataTypeInt64>({2, 3});
+    expect_cancelled(sink->sink(state.get(), &block, true));
+}
+
 TEST_F(AnalyticSinkOperatorTest, withoutAggFunction) {
     Initialize(10);
     create_operator(false, 0, "", {}, nullptr);
@@ -819,4 +879,4 @@ TEST_F(AnalyticSinkOperatorTest, AggFunction8) {
     std::cout << "######### AggFunction with row_number test end #########" << 
std::endl;
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/exec/operator/nested_loop_join_build_operator_test.cpp 
b/be/test/exec/operator/nested_loop_join_build_operator_test.cpp
new file mode 100644
index 00000000000..a4c1be9bc65
--- /dev/null
+++ b/be/test/exec/operator/nested_loop_join_build_operator_test.cpp
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/operator/nested_loop_join_build_operator.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gtest/gtest.h>
+
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "exec/runtime_filter/runtime_filter_test_utils.h"
+#include "runtime/descriptor_helper.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_profile.h"
+#include "testutil/mock/mock_operators.h"
+
+namespace doris {
+namespace {
+
+TDescriptorTable create_desc_table() {
+    TDescriptorTableBuilder builder;
+    TTupleDescriptorBuilder()
+            .add_slot(TSlotDescriptorBuilder()
+                              .type(TYPE_INT)
+                              .nullable(false)
+                              .column_name("probe_col")
+                              .column_pos(0)
+                              .build())
+            .build(&builder);
+    TTupleDescriptorBuilder()
+            .add_slot(TSlotDescriptorBuilder()
+                              .type(TYPE_INT)
+                              .nullable(false)
+                              .column_name("build_col")
+                              .column_pos(0)
+                              .build())
+            .build(&builder);
+    return builder.desc_tbl();
+}
+
+TExpr build_slot_ref_expr() {
+    auto expr = TRuntimeFilterDescBuilder::get_default_expr();
+    expr.nodes[0].slot_ref.__set_slot_id(1);
+    expr.nodes[0].slot_ref.__set_tuple_id(1);
+    return expr;
+}
+
+TPlanNode create_nested_loop_join_plan_node() {
+    TPlanNode node;
+    node.node_id = 0;
+    node.node_type = TPlanNodeType::CROSS_JOIN_NODE;
+    node.num_children = 2;
+    node.limit = -1;
+
+    TNestedLoopJoinNode join_node;
+    join_node.__set_join_op(TJoinOp::INNER_JOIN);
+    node.__set_nested_loop_join_node(join_node);
+    node.row_tuples.push_back(0);
+    node.row_tuples.push_back(1);
+
+    auto src_expr = build_slot_ref_expr();
+    TRuntimeFilterDescBuilder runtime_filter_builder(0, src_expr, 0);
+    node.__isset.runtime_filters = true;
+    node.runtime_filters.push_back(runtime_filter_builder.build());
+    return node;
+}
+
+} // namespace
+
+class NestedLoopJoinBuildOperatorTest : public RuntimeFilterTest {};
+
+TEST_F(NestedLoopJoinBuildOperatorTest, 
CloseSkipsRuntimeFilterProcessWhenCancelled) {
+    ObjectPool pool;
+    DescriptorTbl* desc_tbl = nullptr;
+    auto desc_table = create_desc_table();
+    auto status = DescriptorTbl::create(&pool, desc_table, &desc_tbl);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    auto* state = _runtime_states[0].get();
+    state->set_desc_tbl(desc_tbl);
+
+    auto plan_node = create_nested_loop_join_plan_node();
+    auto sink_operator =
+            std::make_shared<NestedLoopJoinBuildSinkOperatorX>(&pool, 0, 1, 
plan_node, *desc_tbl);
+
+    auto child = std::make_shared<MockSourceOperator>();
+    child->_row_descriptor = RowDescriptor(*desc_tbl, {1});
+    ASSERT_TRUE(sink_operator->set_child(child));
+
+    status = sink_operator->init(plan_node, state);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    status = sink_operator->prepare(state);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    auto shared_state = sink_operator->create_shared_state();
+    ASSERT_NE(shared_state, nullptr);
+
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = &_profile,
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .shared_state_map = shared_state_map,
+                             .tsink = TDataSink()};
+    status = sink_operator->setup_local_state(state, info);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    auto* local_state =
+            
dynamic_cast<NestedLoopJoinBuildSinkLocalState*>(state->get_sink_local_state());
+    ASSERT_NE(local_state, nullptr);
+
+    status = local_state->open(state);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    local_state->build_blocks().emplace_back();
+
+    state->cancel(Status::Cancelled("nested loop join build close cancelled"));
+    status = local_state->close(state, Status::OK());
+    ASSERT_TRUE(status.ok()) << status.to_string();
+}
+
+} // namespace doris
diff --git 
a/be/test/exec/operator/partitioned_aggregation_sink_operator_test.cpp 
b/be/test/exec/operator/partitioned_aggregation_sink_operator_test.cpp
index 42cc320f183..15559910494 100644
--- a/be/test/exec/operator/partitioned_aggregation_sink_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_aggregation_sink_operator_test.cpp
@@ -43,6 +43,32 @@ protected:
     PartitionedAggregationTestHelper _helper;
 };
 
+namespace {
+
+constexpr auto CANCEL_REASON = "partitioned aggregation sink cancelled";
+
+void cancel_state(RuntimeState* state) {
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << 
status.to_string();
+}
+
+} // namespace
+
+TEST_F(PartitionedAggregationSinkOperatorTest, 
RevokeMemoryReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    std::shared_ptr<MockPartitionedAggSharedState> shared_state;
+    auto* local_state = 
_helper.create_sink_local_state(_helper.runtime_state.get(),
+                                                        sink_operator.get(), 
shared_state);
+    ASSERT_NE(local_state, nullptr);
+
+    cancel_state(_helper.runtime_state.get());
+    
expect_cancelled(sink_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
 TEST_F(PartitionedAggregationSinkOperatorTest, Init) {
     auto [source_operator, sink_operator] = _helper.create_operators();
     ASSERT_TRUE(source_operator != nullptr);
@@ -709,4 +735,4 @@ TEST_F(PartitionedAggregationNullableKeySinkTest, 
SinkEOSFlushNullKeyOnly) {
     ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
 }
 
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git 
a/be/test/exec/operator/partitioned_aggregation_source_operator_test.cpp 
b/be/test/exec/operator/partitioned_aggregation_source_operator_test.cpp
index 2b13ad97f4c..14b567d7e31 100644
--- a/be/test/exec/operator/partitioned_aggregation_source_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_aggregation_source_operator_test.cpp
@@ -46,6 +46,54 @@ protected:
     PartitionedAggregationTestHelper _helper;
 };
 
+namespace {
+
+constexpr auto CANCEL_REASON = "partitioned aggregation source cancelled";
+
+void cancel_state(RuntimeState* state) {
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << 
status.to_string();
+}
+
+} // namespace
+
+TEST_F(PartitionedAggregationSourceOperatorTest, 
RecoverBlocksFromPartitionReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    std::shared_ptr<MockPartitionedAggSharedState> shared_state;
+    auto* local_state = 
_helper.create_source_local_state(_helper.runtime_state.get(),
+                                                          
source_operator.get(), shared_state);
+    AggSpillPartitionInfo partition;
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(
+            
local_state->_recover_blocks_from_partition(_helper.runtime_state.get(), 
partition));
+}
+
+TEST_F(PartitionedAggregationSourceOperatorTest, 
RevokeMemoryReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    std::shared_ptr<MockPartitionedAggSharedState> shared_state;
+    auto* local_state = 
_helper.create_source_local_state(_helper.runtime_state.get(),
+                                                          
source_operator.get(), shared_state);
+    ASSERT_NE(local_state, nullptr);
+
+    cancel_state(_helper.runtime_state.get());
+    
expect_cancelled(source_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
+TEST_F(PartitionedAggregationSourceOperatorTest, 
FlushAndRepartitionReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    std::shared_ptr<MockPartitionedAggSharedState> shared_state;
+    auto* local_state = 
_helper.create_source_local_state(_helper.runtime_state.get(),
+                                                          
source_operator.get(), shared_state);
+
+    cancel_state(_helper.runtime_state.get());
+    
expect_cancelled(local_state->_flush_and_repartition(_helper.runtime_state.get()));
+}
+
 TEST_F(PartitionedAggregationSourceOperatorTest, Init) {
     auto [source_operator, sink_operator] = _helper.create_operators();
     ASSERT_TRUE(source_operator != nullptr);
diff --git 
a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp 
b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
index 7e69aed26a9..4e782713b91 100644
--- a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -49,6 +49,17 @@ protected:
 
 namespace {
 
+constexpr auto CANCEL_REASON = "partitioned hash join probe cancelled";
+
+void cancel_state(RuntimeState* state) {
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << 
status.to_string();
+}
+
 SpillFileSPtr create_probe_test_spill_file(RuntimeState* state, 
RuntimeProfile* profile,
                                            int node_id, const std::string& 
prefix,
                                            const 
std::vector<std::vector<int32_t>>& batches) {
@@ -124,6 +135,28 @@ Status 
prepare_probe_local_state_for_repartition(PartitionedHashJoinProbeOperato
 
 } // namespace
 
+TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverBuildAndProbeReturnCancelAtEntry) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto* local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(),
+                                                         probe_operator.get(), 
shared_state);
+    JoinSpillPartitionInfo build_partition(nullptr, nullptr, 0);
+    JoinSpillPartitionInfo probe_partition(nullptr, nullptr, 0);
+
+    cancel_state(_helper.runtime_state.get());
+    
expect_cancelled(local_state->recover_build_blocks_from_partition(_helper.runtime_state.get(),
+                                                                      
build_partition));
+    
expect_cancelled(local_state->recover_probe_blocks_from_partition(_helper.runtime_state.get(),
+                                                                      
probe_partition));
+}
+
+TEST_F(PartitionedHashJoinProbeOperatorTest, RevokeMemoryReturnsCancelAtEntry) 
{
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+
+    cancel_state(_helper.runtime_state.get());
+    
expect_cancelled(probe_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
 TEST_F(PartitionedHashJoinProbeOperatorTest, debug_string) {
     auto [probe_operator, sink_operator] = _helper.create_operators();
 
diff --git a/be/test/exec/operator/partitioned_hash_join_sink_operator_test.cpp 
b/be/test/exec/operator/partitioned_hash_join_sink_operator_test.cpp
index 4beced723e0..6bd0653da49 100644
--- a/be/test/exec/operator/partitioned_hash_join_sink_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_hash_join_sink_operator_test.cpp
@@ -60,6 +60,32 @@ protected:
     PartitionedHashJoinTestHelper _helper;
 };
 
+namespace {
+
+constexpr auto CANCEL_REASON = "partitioned hash join sink cancelled";
+
+void cancel_state(RuntimeState* state) {
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << 
status.to_string();
+}
+
+} // namespace
+
+TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemoryReturnsCancelAtEntry) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto* local_state = 
_helper.create_sink_local_state(_helper.runtime_state.get(),
+                                                        sink_operator.get(), 
shared_state);
+    ASSERT_NE(local_state, nullptr);
+
+    cancel_state(_helper.runtime_state.get());
+    
expect_cancelled(sink_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
 TEST_F(PartitionedHashJoinSinkOperatorTest, Init) {
     TPlanNode tnode = _helper.create_test_plan_node();
     const DescriptorTbl& desc_tbl = _helper.runtime_state->desc_tbl();
diff --git a/be/test/exec/operator/spill_sort_sink_operator_test.cpp 
b/be/test/exec/operator/spill_sort_sink_operator_test.cpp
index db47d9565a3..ead792a1e32 100644
--- a/be/test/exec/operator/spill_sort_sink_operator_test.cpp
+++ b/be/test/exec/operator/spill_sort_sink_operator_test.cpp
@@ -25,6 +25,7 @@
 #include "core/block/block.h"
 #include "core/data_type/data_type_number.h"
 #include "exec/operator/spill_sort_test_helper.h"
+#include "exec/operator/spill_utils.h"
 #include "exec/pipeline/dependency.h"
 #include "exec/pipeline/pipeline_task.h"
 #include "testutil/column_helper.h"
@@ -38,6 +39,84 @@ protected:
     SpillSortTestHelper _helper;
 };
 
+namespace {
+
+constexpr auto CANCEL_REASON = "spill sort sink cancelled";
+
+void cancel_state(RuntimeState* state) {
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << status.to_string();
+}
+
+} // namespace
+
+TEST_F(SpillSortSinkOperatorTest, ExecuteSpillSortReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    auto sink_local_state = SpillSortSinkLocalState::create_unique(sink_operator.get(),
+                                                                   _helper.runtime_state.get());
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(sink_local_state->_execute_spill_sort(_helper.runtime_state.get()));
+}
+
+TEST_F(SpillSortSinkOperatorTest, RevokeMemoryReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+
+    auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    auto shared_state = sink_operator->create_shared_state();
+    ASSERT_TRUE(shared_state != nullptr);
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.operator_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .shared_state_map = {},
+                             .tsink = {}};
+
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(sink_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
+TEST_F(SpillSortSinkOperatorTest, RunSpillTaskReturnsCancelAtEntry) {
+    cancel_state(_helper.runtime_state.get());
+
+    bool executed = false;
+    expect_cancelled(run_spill_task(_helper.runtime_state.get(), [&]() {
+        executed = true;
+        return Status::OK();
+    }));
+    EXPECT_FALSE(executed);
+}
+
+TEST_F(SpillSortSinkOperatorTest, RunSpillTaskReturnsCancelAfterCallback) {
+    bool finalized = false;
+
+    auto status = run_spill_task(
+            _helper.runtime_state.get(),
+            [&]() {
+                cancel_state(_helper.runtime_state.get());
+                return Status::OK();
+            },
+            [&]() {
+                finalized = true;
+                return Status::OK();
+            });
+    expect_cancelled(status);
+    EXPECT_FALSE(finalized);
+}
+
 TEST_F(SpillSortSinkOperatorTest, Basic) {
     auto [source_operator, sink_operator] = _helper.create_operators();
     ASSERT_TRUE(source_operator != nullptr);
diff --git a/be/test/exec/operator/spill_sort_source_operator_test.cpp b/be/test/exec/operator/spill_sort_source_operator_test.cpp
index 9f458699fa9..e4eed08b42c 100644
--- a/be/test/exec/operator/spill_sort_source_operator_test.cpp
+++ b/be/test/exec/operator/spill_sort_source_operator_test.cpp
@@ -188,8 +188,28 @@ void delete_spill_files(const std::vector<SpillFileSPtr>& spill_files) {
     }
 }
 
+constexpr auto CANCEL_REASON = "spill sort source cancelled";
+
+void cancel_state(RuntimeState* state) {
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << status.to_string();
+}
+
 } // namespace
 
+TEST_F(SpillSortSourceOperatorTest, ExecuteMergeSortSpillFilesReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    auto local_state = std::make_unique<SpillSortLocalState>(_helper.runtime_state.get(),
+                                                             source_operator.get());
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(local_state->execute_merge_sort_spill_files(_helper.runtime_state.get()));
+}
+
 TEST_F(SpillSortSourceOperatorTest, Basic) {
     auto [source_operator, sink_operator] = _helper.create_operators();
     ASSERT_TRUE(source_operator != nullptr);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to