This is an automated email from the ASF dual-hosted git repository.
mrhhsg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a8fd0a117ab [fix](be) Move partitioned agg shared cleanup to shared
state (#63253)
a8fd0a117ab is described below
commit a8fd0a117abf74670d7e57feda42f054aa56b9b9
Author: Jerry Hu <[email protected]>
AuthorDate: Fri May 15 15:05:15 2026 +0800
[fix](be) Move partitioned agg shared cleanup to shared state (#63253)
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: Partitioned aggregation source `close()` cleaned up a shared
state owned by both the sink and source pipelines. This could make shared
spill cleanup depend on the lifecycle of a local source operator, even though
the state is shared. This PR moves leftover spill file cleanup into
`PartitionedAggSharedState` itself (its destructor now calls `close()`) and makes `close()` idempotent.
### Release note
None
### Check List (For Author)
- Test: Manual test
- `build-support/clang-format.sh`
- `build-support/check-format.sh`
- `git diff --check`
- Behavior changed: No
- Does this need documentation: No
---
be/src/exec/operator/partitioned_aggregation_source_operator.cpp | 7 -------
be/src/exec/pipeline/dependency.cpp | 5 +++++
be/src/exec/pipeline/dependency.h | 6 +++++-
be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp | 4 +++-
4 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
index 6425cb3b928..4d69c49bb7a 100644
--- a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
@@ -151,13 +151,6 @@ Status
PartitionedAggSourceOperatorX::prepare(RuntimeState* state) {
Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
RETURN_IF_ERROR(OperatorXBase::close(state));
- // Centralize shared_state cleanup here so resources are released when
- // the pipeline task finishes, matching the Sort operator pattern.
- auto& local_state = get_local_state(state);
- if (local_state._shared_state) {
- local_state._shared_state->close();
- }
-
return _agg_source_operator->close(state);
}
diff --git a/be/src/exec/pipeline/dependency.cpp
b/be/src/exec/pipeline/dependency.cpp
index 8ef37de1d49..97fd6d037d4 100644
--- a/be/src/exec/pipeline/dependency.cpp
+++ b/be/src/exec/pipeline/dependency.cpp
@@ -313,6 +313,11 @@ Status AggSharedState::reset_hash_table() {
}
void PartitionedAggSharedState::close() {
+ bool false_close = false;
+ if (!is_closed.compare_exchange_strong(false_close, true)) {
+ return;
+ }
+
for (auto& partition : _spill_partitions) {
if (partition) {
ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(partition);
diff --git a/be/src/exec/pipeline/dependency.h
b/be/src/exec/pipeline/dependency.h
index dbb4b938ad4..9ffbb2c5c7a 100644
--- a/be/src/exec/pipeline/dependency.h
+++ b/be/src/exec/pipeline/dependency.h
@@ -606,7 +606,7 @@ struct PartitionedAggSharedState : public BasicSharedState,
ENABLE_FACTORY_CREATOR(PartitionedAggSharedState)
PartitionedAggSharedState() = default;
- ~PartitionedAggSharedState() override = default;
+ ~PartitionedAggSharedState() override { close(); }
void close();
@@ -615,6 +615,10 @@ struct PartitionedAggSharedState : public BasicSharedState,
// partition count is no longer stored in shared state; operators maintain
their own
std::atomic<bool> _is_spilled = false;
+ // This state is shared by the partitioned agg sink and source pipelines.
Spill files left
+ // here are owned by the shared state until the source moves them into its
local queue, so the
+ // cleanup must be tied to the shared state's lifetime and must be
idempotent.
+ std::atomic_bool is_closed = false;
std::deque<SpillFileSPtr> _spill_partitions;
};
diff --git a/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
b/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
index 4118d923e57..b07b5527637 100644
--- a/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
+++ b/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
@@ -130,7 +130,9 @@ TEST_F(PartitionedAggSharedStateTest,
CloseCalledMultipleTimes) {
for (int round = 0; round < 5; ++round) {
state._spill_partitions.emplace_back(nullptr);
state.close();
- ASSERT_TRUE(state._spill_partitions.empty());
+
+ // Repeatedly calling close should not cause issues, but it also should not
do anything after the first call.
+ ASSERT_EQ(state._spill_partitions.empty(), round == 0) << "After round
" << round;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org