This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a8fd0a117ab [fix](be) Move partitioned agg shared cleanup to shared state (#63253)
a8fd0a117ab is described below

commit a8fd0a117abf74670d7e57feda42f054aa56b9b9
Author: Jerry Hu <[email protected]>
AuthorDate: Fri May 15 15:05:15 2026 +0800

    [fix](be) Move partitioned agg shared cleanup to shared state (#63253)
    
    ### What problem does this PR solve?
    
    Issue Number: None
    
    Related PR: None
    
    Problem Summary: `PartitionedAggSourceOperatorX::close()` cleaned up a
    shared state that is owned by both the sink and source pipelines. This
    tied the cleanup of shared spill files to the lifecycle of the local
    source operator, even though the state is shared. This PR moves the
    cleanup of leftover spill files into `PartitionedAggSharedState` itself
    (its destructor now calls `close()`) and makes `close()` idempotent.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test: Manual test
        - `build-support/clang-format.sh`
        - `build-support/check-format.sh`
        - `git diff --check`
    - Behavior changed: No
    - Does this need documentation: No
---
 be/src/exec/operator/partitioned_aggregation_source_operator.cpp | 7 -------
 be/src/exec/pipeline/dependency.cpp                              | 5 +++++
 be/src/exec/pipeline/dependency.h                                | 6 +++++-
 be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp      | 4 +++-
 4 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp 
b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
index 6425cb3b928..4d69c49bb7a 100644
--- a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
@@ -151,13 +151,6 @@ Status PartitionedAggSourceOperatorX::prepare(RuntimeState* state) {
 Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
     RETURN_IF_ERROR(OperatorXBase::close(state));
 
-    // Centralize shared_state cleanup here so resources are released when
-    // the pipeline task finishes, matching the Sort operator pattern.
-    auto& local_state = get_local_state(state);
-    if (local_state._shared_state) {
-        local_state._shared_state->close();
-    }
-
     return _agg_source_operator->close(state);
 }
 
diff --git a/be/src/exec/pipeline/dependency.cpp 
b/be/src/exec/pipeline/dependency.cpp
index 8ef37de1d49..97fd6d037d4 100644
--- a/be/src/exec/pipeline/dependency.cpp
+++ b/be/src/exec/pipeline/dependency.cpp
@@ -313,6 +313,11 @@ Status AggSharedState::reset_hash_table() {
 }
 
 void PartitionedAggSharedState::close() {
+    bool false_close = false;
+    if (!is_closed.compare_exchange_strong(false_close, true)) {
+        return;
+    }
+
     for (auto& partition : _spill_partitions) {
         if (partition) {
             ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(partition);
diff --git a/be/src/exec/pipeline/dependency.h 
b/be/src/exec/pipeline/dependency.h
index dbb4b938ad4..9ffbb2c5c7a 100644
--- a/be/src/exec/pipeline/dependency.h
+++ b/be/src/exec/pipeline/dependency.h
@@ -606,7 +606,7 @@ struct PartitionedAggSharedState : public BasicSharedState,
     ENABLE_FACTORY_CREATOR(PartitionedAggSharedState)
 
     PartitionedAggSharedState() = default;
-    ~PartitionedAggSharedState() override = default;
+    ~PartitionedAggSharedState() override { close(); }
 
     void close();
 
@@ -615,6 +615,10 @@ struct PartitionedAggSharedState : public BasicSharedState,
 
     // partition count is no longer stored in shared state; operators maintain their own
     std::atomic<bool> _is_spilled = false;
+    // This state is shared by the partitioned agg sink and source pipelines. Spill files left
+    // here are owned by the shared state until the source moves them into its local queue, so the
+    // cleanup must be tied to the shared state's lifetime and must be idempotent.
+    std::atomic_bool is_closed = false;
     std::deque<SpillFileSPtr> _spill_partitions;
 };
 
diff --git a/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp 
b/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
index 4118d923e57..b07b5527637 100644
--- a/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
+++ b/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
@@ -130,7 +130,9 @@ TEST_F(PartitionedAggSharedStateTest, CloseCalledMultipleTimes) {
     for (int round = 0; round < 5; ++round) {
         state._spill_partitions.emplace_back(nullptr);
         state.close();
-        ASSERT_TRUE(state._spill_partitions.empty());
+
+        // Repeatedly calling close() should not cause issues, but it also should not do anything after the first call.
+        ASSERT_EQ(state._spill_partitions.empty(), round == 0) << "After round " << round;
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to