This is an automated email from the ASF dual-hosted git repository.
mrhhsg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 399d9d8f2bb [refactor](be) Remove redundant spill state (#64083)
399d9d8f2bb is described below
commit 399d9d8f2bb1f1ec085183a5f7bc0dce50531aa0
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Jun 10 15:29:33 2026 +0800
[refactor](be) Remove redundant spill state (#64083)
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary:
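Remove redundant spill-related state that is no longer needed by the
execution path:
- The `run_spill_task` helper in `spill_utils.h` is removed, along with its
  unit tests.
  - Its callers now invoke their spill callbacks directly.
  - `MultiCastDataStreamer` keeps an explicit `RETURN_IF_CANCELLED` check
    before the call.
  - `PartitionedAggSinkLocalState::_revoke_memory` and
    `SpillIcebergTableSinkLocalState::revoke_memory` no longer perform the
    entry cancellation check that `run_spill_task` did.
- Obsolete spill file counters are removed: `SpillWriteFileCurrentCount`,
  `_spill_file_current_count` and `_spill_file_total_size`.
- The unused cumulative `_total_written_bytes` / `_total_parts` fields are
  removed from `SpillFileWriter`.
- The stale `_opened` tracking is removed from `SpillSortLocalState`.

The remaining spill file size accounting that is still used by
readers/writers is kept intact.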
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [x] Regression test
- `./run-regression-test.sh --conf /tmp/doris-spill-regression-oldport.groovy --run -d spill_p0 -s aggregate_spill` (passed)
- [x] Unit Test
- `./run-be-ut.sh --run --filter='*Spill*:*MultiCastDataStreamer*' -j 64` (passed, 82 tests)
- [x] Manual test (add detailed scripts or steps below)
- `./build.sh --be --fe` (passed as part of local regression preparation)
- Targeted BE object compilation for touched spill/operator files (passed)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
be/src/exec/operator/multi_cast_data_streamer.cpp | 9 +++----
be/src/exec/operator/operator.h | 7 ------
.../partitioned_aggregation_sink_operator.cpp | 4 +--
.../operator/spill_iceberg_table_sink_operator.cpp | 4 +--
.../exec/operator/spill_sort_source_operator.cpp | 4 ---
be/src/exec/operator/spill_sort_source_operator.h | 4 +--
be/src/exec/operator/spill_utils.h | 17 -------------
be/src/exec/spill/spill_file_writer.cpp | 5 +---
be/src/exec/spill/spill_file_writer.h | 3 ---
be/src/runtime/runtime_profile_counter_names.h | 1 -
.../operator/spill_sort_sink_operator_test.cpp | 29 ----------------------
.../operator/spillable_operator_test_helper.cpp | 3 +--
.../pipeline/multi_cast_data_streamer_test.cpp | 4 ---
be/test/vec/spill/spill_file_test.cpp | 1 -
be/test/vec/spill/spill_repartitioner_test.cpp | 1 -
15 files changed, 9 insertions(+), 87 deletions(-)
diff --git a/be/src/exec/operator/multi_cast_data_streamer.cpp
b/be/src/exec/operator/multi_cast_data_streamer.cpp
index 403d8111018..34b09bc712d 100644
--- a/be/src/exec/operator/multi_cast_data_streamer.cpp
+++ b/be/src/exec/operator/multi_cast_data_streamer.cpp
@@ -30,7 +30,6 @@
#include "common/status.h"
#include "core/block/block.h"
#include "exec/operator/multi_cast_data_stream_source.h"
-#include "exec/operator/spill_utils.h"
#include "exec/pipeline/dependency.h"
#include "exec/spill/spill_file_manager.h"
#include "exec/spill/spill_file_reader.h"
@@ -121,9 +120,8 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int
sender_idx, Block* b
};
l.unlock();
- // spill is synchronous; the profile passed to the runnable was
only
- // used for counters that are now tracked externally, so call
helper
- return run_spill_task(state, catch_exception_func);
+ RETURN_IF_CANCELLED(state);
+ return catch_exception_func();
}
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
@@ -279,7 +277,8 @@ Status
MultiCastDataStreamer::_start_spill_task(RuntimeState* state, SpillFileSP
return status;
};
- return run_spill_task(state, exception_catch_func);
+ RETURN_IF_CANCELLED(state);
+ return exception_catch_func();
}
Status MultiCastDataStreamer::push(RuntimeState* state, doris::Block* block,
bool eos) {
diff --git a/be/src/exec/operator/operator.h b/be/src/exec/operator/operator.h
index a09638f6d71..8585f0be741 100644
--- a/be/src/exec/operator/operator.h
+++ b/be/src/exec/operator/operator.h
@@ -417,8 +417,6 @@ public:
_spill_file_current_size = ADD_COUNTER_WITH_LEVEL(
Base::custom_profile(),
profile::SPILL_WRITE_FILE_CURRENT_BYTES, TUnit::BYTES, 1);
- _spill_file_current_count = ADD_COUNTER_WITH_LEVEL(
- Base::custom_profile(),
profile::SPILL_WRITE_FILE_CURRENT_COUNT, TUnit::UNIT, 1);
}
// Total time of spill, including spill task scheduling time,
@@ -441,9 +439,6 @@ public:
// Total bytes of spill data written to disk file(after serialized)
RuntimeProfile::Counter* _spill_write_file_total_size = nullptr;
RuntimeProfile::Counter* _spill_file_total_count = nullptr;
- RuntimeProfile::Counter* _spill_file_current_count = nullptr;
- // Spilled file total size
- RuntimeProfile::Counter* _spill_file_total_size = nullptr;
// Current spilled file size
RuntimeProfile::Counter* _spill_file_current_size = nullptr;
@@ -806,8 +801,6 @@ public:
RuntimeProfile::Counter*& _spill_write_rows_count =
_write_counters.spill_write_rows_count;
// Sink-only counters
- // Spilled file total size
- RuntimeProfile::Counter* _spill_file_total_size = nullptr;
// Total bytes written to spill files (required by SpillFileWriter)
RuntimeProfile::Counter* _spill_write_file_total_size = nullptr;
// Total number of spill files created (required by SpillFileWriter)
diff --git a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
index d26838345f2..677077f4766 100644
--- a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
@@ -504,9 +504,7 @@ Status
PartitionedAggSinkLocalState::_revoke_memory(RuntimeState* state) {
return status;
};
- // old code used SpillSinkRunnable, but spills are synchronous and counters
- // are tracked externally. Call the spill function directly.
- return run_spill_task(state, std::move(spill_func));
+ return spill_func();
}
void PartitionedAggSinkLocalState::_reset_tmp_data() {
diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
index 46abbf5ebe3..cf7c8e6e1a1 100644
--- a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
+++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
@@ -19,7 +19,6 @@
#include "common/status.h"
#include "exec/operator/iceberg_table_sink_operator.h"
-#include "exec/operator/spill_utils.h"
#include "exec/sink/writer/iceberg/viceberg_sort_writer.h"
#include "exec/sink/writer/iceberg/viceberg_table_writer.h"
@@ -96,7 +95,7 @@ Status
SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) {
return status;
};
- return run_spill_task(state, exception_catch_func);
+ return exception_catch_func();
}
SpillIcebergTableSinkOperatorX::SpillIcebergTableSinkOperatorX(
@@ -170,7 +169,6 @@ void
SpillIcebergTableSinkLocalState::_init_spill_counters() {
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadRows", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadFileCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentBytes",
TUnit::BYTES, 1);
- ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentCount", TUnit::UNIT,
1);
}
} // namespace doris
diff --git a/be/src/exec/operator/spill_sort_source_operator.cpp
b/be/src/exec/operator/spill_sort_source_operator.cpp
index 9d1138553b2..40f15832cfe 100644
--- a/be/src/exec/operator/spill_sort_source_operator.cpp
+++ b/be/src/exec/operator/spill_sort_source_operator.cpp
@@ -50,10 +50,6 @@ Status SpillSortLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
Status SpillSortLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
- if (_opened) {
- return Status::OK();
- }
-
RETURN_IF_ERROR(setup_in_memory_sort_op(state));
return Base::open(state);
}
diff --git a/be/src/exec/operator/spill_sort_source_operator.h
b/be/src/exec/operator/spill_sort_source_operator.h
index dbc8c3120ea..25fdd306b4b 100644
--- a/be/src/exec/operator/spill_sort_source_operator.h
+++ b/be/src/exec/operator/spill_sort_source_operator.h
@@ -57,8 +57,6 @@ protected:
friend class SpillSortSourceOperatorX;
std::unique_ptr<RuntimeState> _runtime_state;
- bool _opened = false;
-
std::vector<SpillFileSPtr> _current_merging_files;
/// Readers held alive during merge; one per SpillFile, reads parts
sequentially.
std::vector<SpillFileReaderSPtr> _current_merging_readers;
@@ -90,4 +88,4 @@ private:
std::unique_ptr<SortSourceOperatorX> _sort_source_operator;
};
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/operator/spill_utils.h
b/be/src/exec/operator/spill_utils.h
index 7d9cb200bd1..dff39d5212f 100644
--- a/be/src/exec/operator/spill_utils.h
+++ b/be/src/exec/operator/spill_utils.h
@@ -70,23 +70,6 @@ struct SpillContext {
}
};
-// helper to execute a spill function synchronously. The old code used
-// SpillRunnable/SpillSinkRunnable/SpillRecoverRunnable wrappers to track
-// counters and optionally notify a SpillContext. Since spill operations are
-// now performed synchronously and external code already maintains any
-// necessary counters, those wrappers are no longer necessary. We keep a
-// small utility to run the provided callbacks and forward cancellation.
-inline Status run_spill_task(RuntimeState* state, std::function<Status()>
exec_func,
- std::function<Status()> fin_cb = {}) {
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(exec_func());
- if (fin_cb) {
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(fin_cb());
- }
- return Status::OK();
-}
-
template <bool accumulating>
inline void update_profile_from_inner_profile(const std::string& name,
RuntimeProfile* runtime_profile,
diff --git a/be/src/exec/spill/spill_file_writer.cpp
b/be/src/exec/spill/spill_file_writer.cpp
index 60ddb68c26b..d68d52c9640 100644
--- a/be/src/exec/spill/spill_file_writer.cpp
+++ b/be/src/exec/spill/spill_file_writer.cpp
@@ -97,7 +97,6 @@ Status SpillFileWriter::_close_current_part(const
std::shared_ptr<SpillFile>& sp
int64_t meta_size = _part_meta.size();
_part_written_bytes += meta_size;
- _total_written_bytes += meta_size;
COUNTER_UPDATE(_write_file_total_size, meta_size);
if (_resource_ctx) {
_resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(meta_size);
@@ -118,7 +117,6 @@ Status SpillFileWriter::_close_current_part(const
std::shared_ptr<SpillFile>& sp
// Advance to next part
++_current_part_index;
- ++_total_parts;
if (spill_file) {
spill_file->increment_part_count();
}
@@ -251,7 +249,6 @@ Status SpillFileWriter::_write_internal(const Block& block,
}
COUNTER_UPDATE(_write_block_counter, 1);
_part_written_bytes += buff_size;
- _total_written_bytes += buff_size;
++_part_written_blocks;
// Incrementally update SpillFile so gc() can always
// decrement the correct amount from _data_dir.
@@ -269,4 +266,4 @@ Status SpillFileWriter::_write_internal(const Block& block,
return status;
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/spill/spill_file_writer.h
b/be/src/exec/spill/spill_file_writer.h
index 215685933f4..eb8c7a9d9eb 100644
--- a/be/src/exec/spill/spill_file_writer.h
+++ b/be/src/exec/spill/spill_file_writer.h
@@ -89,9 +89,6 @@ private:
size_t _part_max_sub_block_size = 0;
std::string _part_meta;
- // ── Cumulative state ──
- int64_t _total_written_bytes = 0;
- size_t _total_parts = 0;
bool _closed = false;
// ── Counters ──
diff --git a/be/src/runtime/runtime_profile_counter_names.h
b/be/src/runtime/runtime_profile_counter_names.h
index bd4d2e878a8..5813d5e0ea9 100644
--- a/be/src/runtime/runtime_profile_counter_names.h
+++ b/be/src/runtime/runtime_profile_counter_names.h
@@ -104,7 +104,6 @@ inline constexpr char SPILL_WRITE_ROWS[] = "SpillWriteRows";
inline constexpr char SPILL_WRITE_FILE_BYTES[] = "SpillWriteFileBytes";
inline constexpr char SPILL_WRITE_FILE_TOTAL_COUNT[] =
"SpillWriteFileTotalCount";
inline constexpr char SPILL_WRITE_FILE_CURRENT_BYTES[] =
"SpillWriteFileCurrentBytes";
-inline constexpr char SPILL_WRITE_FILE_CURRENT_COUNT[] =
"SpillWriteFileCurrentCount";
// ============================================================
// Spill read counters (Source-only)
diff --git a/be/test/exec/operator/spill_sort_sink_operator_test.cpp
b/be/test/exec/operator/spill_sort_sink_operator_test.cpp
index ead792a1e32..77ac2785cc7 100644
--- a/be/test/exec/operator/spill_sort_sink_operator_test.cpp
+++ b/be/test/exec/operator/spill_sort_sink_operator_test.cpp
@@ -25,7 +25,6 @@
#include "core/block/block.h"
#include "core/data_type/data_type_number.h"
#include "exec/operator/spill_sort_test_helper.h"
-#include "exec/operator/spill_utils.h"
#include "exec/pipeline/dependency.h"
#include "exec/pipeline/pipeline_task.h"
#include "testutil/column_helper.h"
@@ -89,34 +88,6 @@ TEST_F(SpillSortSinkOperatorTest,
RevokeMemoryReturnsCancelAtEntry) {
expect_cancelled(sink_operator->revoke_memory(_helper.runtime_state.get()));
}
-TEST_F(SpillSortSinkOperatorTest, RunSpillTaskReturnsCancelAtEntry) {
- cancel_state(_helper.runtime_state.get());
-
- bool executed = false;
- expect_cancelled(run_spill_task(_helper.runtime_state.get(), [&]() {
- executed = true;
- return Status::OK();
- }));
- EXPECT_FALSE(executed);
-}
-
-TEST_F(SpillSortSinkOperatorTest, RunSpillTaskReturnsCancelAfterCallback) {
- bool finalized = false;
-
- auto status = run_spill_task(
- _helper.runtime_state.get(),
- [&]() {
- cancel_state(_helper.runtime_state.get());
- return Status::OK();
- },
- [&]() {
- finalized = true;
- return Status::OK();
- });
- expect_cancelled(status);
- EXPECT_FALSE(finalized);
-}
-
TEST_F(SpillSortSinkOperatorTest, Basic) {
auto [source_operator, sink_operator] = _helper.create_operators();
ASSERT_TRUE(source_operator != nullptr);
diff --git a/be/test/exec/operator/spillable_operator_test_helper.cpp
b/be/test/exec/operator/spillable_operator_test_helper.cpp
index fb87b156545..d51d6d17be1 100644
--- a/be/test/exec/operator/spillable_operator_test_helper.cpp
+++ b/be/test/exec/operator/spillable_operator_test_helper.cpp
@@ -60,7 +60,6 @@ void SpillableOperatorTestHelper::SetUp() {
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadRows", TUnit::UNIT,
1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileCount",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileTotalCount",
TUnit::UNIT, 1);
- ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentCount",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentBytes",
TUnit::UNIT, 1);
operator_profile->add_child(custom_profile.get(), true);
@@ -107,4 +106,4 @@ void SpillableOperatorTestHelper::TearDown() {
SAFE_DELETE(ExecEnv::GetInstance()->_spill_file_mgr);
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
index 61589327909..e9f1a58aecc 100644
--- a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
+++ b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
@@ -65,8 +65,6 @@ public:
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileCount",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(),
"SpillWriteFileTotalCount", TUnit::UNIT,
1);
- ADD_COUNTER_WITH_LEVEL(custom_profile.get(),
"SpillWriteFileCurrentCount", TUnit::UNIT,
- 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(),
"SpillWriteFileCurrentBytes", TUnit::UNIT,
1);
}
@@ -111,8 +109,6 @@ public:
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(),
"SpillWriteFileCurrentBytes",
TUnit::BYTES, 1);
- ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(),
"SpillWriteFileCurrentCount",
- TUnit::UNIT, 1);
multi_cast_data_streamer->set_source_profile(i,
source_profiles[i].get());
}
diff --git a/be/test/vec/spill/spill_file_test.cpp
b/be/test/vec/spill/spill_file_test.cpp
index 67740acdcd2..09173a7a343 100644
--- a/be/test/vec/spill/spill_file_test.cpp
+++ b/be/test/vec/spill/spill_file_test.cpp
@@ -72,7 +72,6 @@ protected:
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadRows",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileCount",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(),
"SpillWriteFileTotalCount", TUnit::UNIT, 1);
- ADD_COUNTER_WITH_LEVEL(_custom_profile.get(),
"SpillWriteFileCurrentCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(),
"SpillWriteFileCurrentBytes", TUnit::UNIT, 1);
_profile->add_child(_custom_profile.get(), true);
diff --git a/be/test/vec/spill/spill_repartitioner_test.cpp
b/be/test/vec/spill/spill_repartitioner_test.cpp
index 53ffcf9f0ac..01da4719cad 100644
--- a/be/test/vec/spill/spill_repartitioner_test.cpp
+++ b/be/test/vec/spill/spill_repartitioner_test.cpp
@@ -74,7 +74,6 @@ protected:
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadRows",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileCount",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(),
"SpillWriteFileTotalCount", TUnit::UNIT, 1);
- ADD_COUNTER_WITH_LEVEL(_custom_profile.get(),
"SpillWriteFileCurrentCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(),
"SpillWriteFileCurrentBytes", TUnit::UNIT, 1);
_profile->add_child(_custom_profile.get(), true);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]