This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 758f8c6143e branch-4.1: [fix](be) Move partitioned agg shared cleanup
to shared state #63253 (#63287)
758f8c6143e is described below
commit 758f8c6143e87b42c588a4cc9e0372f27d78bd40
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 20 15:58:30 2026 +0800
branch-4.1: [fix](be) Move partitioned agg shared cleanup to shared state
#63253 (#63287)
Cherry-picked from #63253
Co-authored-by: Jerry Hu <[email protected]>
---
be/src/exec/operator/partitioned_aggregation_source_operator.cpp | 7 -------
be/src/exec/pipeline/dependency.cpp | 5 +++++
be/src/exec/pipeline/dependency.h | 6 +++++-
be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp | 4 +++-
4 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
index ea57cc0091d..057915cac93 100644
--- a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
@@ -153,13 +153,6 @@ Status
PartitionedAggSourceOperatorX::prepare(RuntimeState* state) {
Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
RETURN_IF_ERROR(OperatorXBase::close(state));
- // Centralize shared_state cleanup here so resources are released when
- // the pipeline task finishes, matching the Sort operator pattern.
- auto& local_state = get_local_state(state);
- if (local_state._shared_state) {
- local_state._shared_state->close();
- }
-
return _agg_source_operator->close(state);
}
diff --git a/be/src/exec/pipeline/dependency.cpp
b/be/src/exec/pipeline/dependency.cpp
index 87d53c20991..014f3182183 100644
--- a/be/src/exec/pipeline/dependency.cpp
+++ b/be/src/exec/pipeline/dependency.cpp
@@ -312,6 +312,11 @@ Status AggSharedState::reset_hash_table() {
}
void PartitionedAggSharedState::close() {
+ bool false_close = false;
+ if (!is_closed.compare_exchange_strong(false_close, true)) {
+ return;
+ }
+
for (auto& partition : _spill_partitions) {
if (partition) {
ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(partition);
diff --git a/be/src/exec/pipeline/dependency.h
b/be/src/exec/pipeline/dependency.h
index 2c6e3b9aef2..dec1d3aaeea 100644
--- a/be/src/exec/pipeline/dependency.h
+++ b/be/src/exec/pipeline/dependency.h
@@ -428,7 +428,7 @@ struct PartitionedAggSharedState : public BasicSharedState,
ENABLE_FACTORY_CREATOR(PartitionedAggSharedState)
PartitionedAggSharedState() = default;
- ~PartitionedAggSharedState() override = default;
+ ~PartitionedAggSharedState() override { close(); }
void close();
@@ -437,6 +437,10 @@ struct PartitionedAggSharedState : public BasicSharedState,
// partition count is no longer stored in shared state; operators maintain
their own
std::atomic<bool> _is_spilled = false;
+ // This state is shared by the partitioned agg sink and source pipelines.
Spill files left
+ // here are owned by the shared state until the source moves them into its
local queue, so the
+ // cleanup must be tied to the shared state's lifetime and must be
idempotent.
+ std::atomic_bool is_closed = false;
std::deque<SpillFileSPtr> _spill_partitions;
};
diff --git a/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
b/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
index 4118d923e57..b07b5527637 100644
--- a/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
+++ b/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
@@ -130,7 +130,9 @@ TEST_F(PartitionedAggSharedStateTest,
CloseCalledMultipleTimes) {
for (int round = 0; round < 5; ++round) {
state._spill_partitions.emplace_back(nullptr);
state.close();
- ASSERT_TRUE(state._spill_partitions.empty());
+
+ // repeatly calling close should not cause issues but also should not
do anything after the first call.
+ ASSERT_EQ(state._spill_partitions.empty(), round == 0) << "After round
" << round;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]