This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 399d9d8f2bb [refactor](be) Remove redundant spill state (#64083)
399d9d8f2bb is described below

commit 399d9d8f2bb1f1ec085183a5f7bc0dce50531aa0
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Jun 10 15:29:33 2026 +0800

    [refactor](be) Remove redundant spill state (#64083)
    
    ### What problem does this PR solve?
    
    Issue Number: None
    
    Related PR: None
    
    Problem Summary:
    
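    Remove redundant spill-related state that is no longer needed by the
    execution path. This includes the `run_spill_task` helper (its call
    sites now invoke the spill callback directly, with
    `MultiCastDataStreamer` keeping an explicit `RETURN_IF_CANCELLED`
    check), obsolete spill file counters and fields (the
    `SpillWriteFileCurrentCount` profile counter, `_spill_file_total_size`,
    and `SpillFileWriter::_total_written_bytes` / `_total_parts`), and the
    stale `_opened` flag in `SpillSortLocalState`. The spill file size
    accounting that readers/writers still use is kept intact.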
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [x] Regression test
            - `./run-regression-test.sh --conf /tmp/doris-spill-regression-oldport.groovy --run -d spill_p0 -s aggregate_spill` (passed)
        - [x] Unit Test
            - `./run-be-ut.sh --run --filter='*Spill*:*MultiCastDataStreamer*' -j 64` (passed, 82 tests)
        - [x] Manual test (add detailed scripts or steps below)
            - `./build.sh --be --fe` (passed as part of local regression preparation)
            - Targeted BE object compilation for touched spill/operator files (passed)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
 be/src/exec/operator/multi_cast_data_streamer.cpp  |  9 +++----
 be/src/exec/operator/operator.h                    |  7 ------
 .../partitioned_aggregation_sink_operator.cpp      |  4 +--
 .../operator/spill_iceberg_table_sink_operator.cpp |  4 +--
 .../exec/operator/spill_sort_source_operator.cpp   |  4 ---
 be/src/exec/operator/spill_sort_source_operator.h  |  4 +--
 be/src/exec/operator/spill_utils.h                 | 17 -------------
 be/src/exec/spill/spill_file_writer.cpp            |  5 +---
 be/src/exec/spill/spill_file_writer.h              |  3 ---
 be/src/runtime/runtime_profile_counter_names.h     |  1 -
 .../operator/spill_sort_sink_operator_test.cpp     | 29 ----------------------
 .../operator/spillable_operator_test_helper.cpp    |  3 +--
 .../pipeline/multi_cast_data_streamer_test.cpp     |  4 ---
 be/test/vec/spill/spill_file_test.cpp              |  1 -
 be/test/vec/spill/spill_repartitioner_test.cpp     |  1 -
 15 files changed, 9 insertions(+), 87 deletions(-)

diff --git a/be/src/exec/operator/multi_cast_data_streamer.cpp 
b/be/src/exec/operator/multi_cast_data_streamer.cpp
index 403d8111018..34b09bc712d 100644
--- a/be/src/exec/operator/multi_cast_data_streamer.cpp
+++ b/be/src/exec/operator/multi_cast_data_streamer.cpp
@@ -30,7 +30,6 @@
 #include "common/status.h"
 #include "core/block/block.h"
 #include "exec/operator/multi_cast_data_stream_source.h"
-#include "exec/operator/spill_utils.h"
 #include "exec/pipeline/dependency.h"
 #include "exec/spill/spill_file_manager.h"
 #include "exec/spill/spill_file_reader.h"
@@ -121,9 +120,8 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int 
sender_idx, Block* b
             };
 
             l.unlock();
-            // spill is synchronous; the profile passed to the runnable was 
only
-            // used for counters that are now tracked externally, so call 
helper
-            return run_spill_task(state, catch_exception_func);
+            RETURN_IF_CANCELLED(state);
+            return catch_exception_func();
         }
 
         auto& pos_to_pull = _sender_pos_to_read[sender_idx];
@@ -279,7 +277,8 @@ Status 
MultiCastDataStreamer::_start_spill_task(RuntimeState* state, SpillFileSP
         return status;
     };
 
-    return run_spill_task(state, exception_catch_func);
+    RETURN_IF_CANCELLED(state);
+    return exception_catch_func();
 }
 
 Status MultiCastDataStreamer::push(RuntimeState* state, doris::Block* block, 
bool eos) {
diff --git a/be/src/exec/operator/operator.h b/be/src/exec/operator/operator.h
index a09638f6d71..8585f0be741 100644
--- a/be/src/exec/operator/operator.h
+++ b/be/src/exec/operator/operator.h
@@ -417,8 +417,6 @@ public:
 
         _spill_file_current_size = ADD_COUNTER_WITH_LEVEL(
                 Base::custom_profile(), 
profile::SPILL_WRITE_FILE_CURRENT_BYTES, TUnit::BYTES, 1);
-        _spill_file_current_count = ADD_COUNTER_WITH_LEVEL(
-                Base::custom_profile(), 
profile::SPILL_WRITE_FILE_CURRENT_COUNT, TUnit::UNIT, 1);
     }
 
     // Total time of spill, including spill task scheduling time,
@@ -441,9 +439,6 @@ public:
     // Total bytes of spill data written to disk file(after serialized)
     RuntimeProfile::Counter* _spill_write_file_total_size = nullptr;
     RuntimeProfile::Counter* _spill_file_total_count = nullptr;
-    RuntimeProfile::Counter* _spill_file_current_count = nullptr;
-    // Spilled file total size
-    RuntimeProfile::Counter* _spill_file_total_size = nullptr;
     // Current spilled file size
     RuntimeProfile::Counter* _spill_file_current_size = nullptr;
 
@@ -806,8 +801,6 @@ public:
     RuntimeProfile::Counter*& _spill_write_rows_count = 
_write_counters.spill_write_rows_count;
 
     // Sink-only counters
-    // Spilled file total size
-    RuntimeProfile::Counter* _spill_file_total_size = nullptr;
     // Total bytes written to spill files (required by SpillFileWriter)
     RuntimeProfile::Counter* _spill_write_file_total_size = nullptr;
     // Total number of spill files created (required by SpillFileWriter)
diff --git a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp 
b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
index d26838345f2..677077f4766 100644
--- a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
@@ -504,9 +504,7 @@ Status 
PartitionedAggSinkLocalState::_revoke_memory(RuntimeState* state) {
         return status;
     };
 
-    // old code used SpillSinkRunnable, but spills are synchronous and counters
-    // are tracked externally.  Call the spill function directly.
-    return run_spill_task(state, std::move(spill_func));
+    return spill_func();
 }
 
 void PartitionedAggSinkLocalState::_reset_tmp_data() {
diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp 
b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
index 46abbf5ebe3..cf7c8e6e1a1 100644
--- a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
+++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
@@ -19,7 +19,6 @@
 
 #include "common/status.h"
 #include "exec/operator/iceberg_table_sink_operator.h"
-#include "exec/operator/spill_utils.h"
 #include "exec/sink/writer/iceberg/viceberg_sort_writer.h"
 #include "exec/sink/writer/iceberg/viceberg_table_writer.h"
 
@@ -96,7 +95,7 @@ Status 
SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) {
         return status;
     };
 
-    return run_spill_task(state, exception_catch_func);
+    return exception_catch_func();
 }
 
 SpillIcebergTableSinkOperatorX::SpillIcebergTableSinkOperatorX(
@@ -170,7 +169,6 @@ void 
SpillIcebergTableSinkLocalState::_init_spill_counters() {
     ADD_COUNTER_WITH_LEVEL(profile, "SpillReadRows", TUnit::UNIT, 1);
     ADD_COUNTER_WITH_LEVEL(profile, "SpillReadFileCount", TUnit::UNIT, 1);
     ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentBytes", 
TUnit::BYTES, 1);
-    ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentCount", TUnit::UNIT, 
1);
 }
 
 } // namespace doris
diff --git a/be/src/exec/operator/spill_sort_source_operator.cpp 
b/be/src/exec/operator/spill_sort_source_operator.cpp
index 9d1138553b2..40f15832cfe 100644
--- a/be/src/exec/operator/spill_sort_source_operator.cpp
+++ b/be/src/exec/operator/spill_sort_source_operator.cpp
@@ -50,10 +50,6 @@ Status SpillSortLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
 Status SpillSortLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
-    if (_opened) {
-        return Status::OK();
-    }
-
     RETURN_IF_ERROR(setup_in_memory_sort_op(state));
     return Base::open(state);
 }
diff --git a/be/src/exec/operator/spill_sort_source_operator.h 
b/be/src/exec/operator/spill_sort_source_operator.h
index dbc8c3120ea..25fdd306b4b 100644
--- a/be/src/exec/operator/spill_sort_source_operator.h
+++ b/be/src/exec/operator/spill_sort_source_operator.h
@@ -57,8 +57,6 @@ protected:
     friend class SpillSortSourceOperatorX;
     std::unique_ptr<RuntimeState> _runtime_state;
 
-    bool _opened = false;
-
     std::vector<SpillFileSPtr> _current_merging_files;
     /// Readers held alive during merge; one per SpillFile, reads parts 
sequentially.
     std::vector<SpillFileReaderSPtr> _current_merging_readers;
@@ -90,4 +88,4 @@ private:
 
     std::unique_ptr<SortSourceOperatorX> _sort_source_operator;
 };
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/operator/spill_utils.h 
b/be/src/exec/operator/spill_utils.h
index 7d9cb200bd1..dff39d5212f 100644
--- a/be/src/exec/operator/spill_utils.h
+++ b/be/src/exec/operator/spill_utils.h
@@ -70,23 +70,6 @@ struct SpillContext {
     }
 };
 
-// helper to execute a spill function synchronously.  The old code used
-// SpillRunnable/SpillSinkRunnable/SpillRecoverRunnable wrappers to track
-// counters and optionally notify a SpillContext.  Since spill operations are
-// now performed synchronously and external code already maintains any
-// necessary counters, those wrappers are no longer necessary.  We keep a
-// small utility to run the provided callbacks and forward cancellation.
-inline Status run_spill_task(RuntimeState* state, std::function<Status()> 
exec_func,
-                             std::function<Status()> fin_cb = {}) {
-    RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(exec_func());
-    if (fin_cb) {
-        RETURN_IF_CANCELLED(state);
-        RETURN_IF_ERROR(fin_cb());
-    }
-    return Status::OK();
-}
-
 template <bool accumulating>
 inline void update_profile_from_inner_profile(const std::string& name,
                                               RuntimeProfile* runtime_profile,
diff --git a/be/src/exec/spill/spill_file_writer.cpp 
b/be/src/exec/spill/spill_file_writer.cpp
index 60ddb68c26b..d68d52c9640 100644
--- a/be/src/exec/spill/spill_file_writer.cpp
+++ b/be/src/exec/spill/spill_file_writer.cpp
@@ -97,7 +97,6 @@ Status SpillFileWriter::_close_current_part(const 
std::shared_ptr<SpillFile>& sp
 
     int64_t meta_size = _part_meta.size();
     _part_written_bytes += meta_size;
-    _total_written_bytes += meta_size;
     COUNTER_UPDATE(_write_file_total_size, meta_size);
     if (_resource_ctx) {
         
_resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(meta_size);
@@ -118,7 +117,6 @@ Status SpillFileWriter::_close_current_part(const 
std::shared_ptr<SpillFile>& sp
 
     // Advance to next part
     ++_current_part_index;
-    ++_total_parts;
     if (spill_file) {
         spill_file->increment_part_count();
     }
@@ -251,7 +249,6 @@ Status SpillFileWriter::_write_internal(const Block& block,
                     }
                     COUNTER_UPDATE(_write_block_counter, 1);
                     _part_written_bytes += buff_size;
-                    _total_written_bytes += buff_size;
                     ++_part_written_blocks;
                     // Incrementally update SpillFile so gc() can always
                     // decrement the correct amount from _data_dir.
@@ -269,4 +266,4 @@ Status SpillFileWriter::_write_internal(const Block& block,
     return status;
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/spill/spill_file_writer.h 
b/be/src/exec/spill/spill_file_writer.h
index 215685933f4..eb8c7a9d9eb 100644
--- a/be/src/exec/spill/spill_file_writer.h
+++ b/be/src/exec/spill/spill_file_writer.h
@@ -89,9 +89,6 @@ private:
     size_t _part_max_sub_block_size = 0;
     std::string _part_meta;
 
-    // ── Cumulative state ──
-    int64_t _total_written_bytes = 0;
-    size_t _total_parts = 0;
     bool _closed = false;
 
     // ── Counters ──
diff --git a/be/src/runtime/runtime_profile_counter_names.h 
b/be/src/runtime/runtime_profile_counter_names.h
index bd4d2e878a8..5813d5e0ea9 100644
--- a/be/src/runtime/runtime_profile_counter_names.h
+++ b/be/src/runtime/runtime_profile_counter_names.h
@@ -104,7 +104,6 @@ inline constexpr char SPILL_WRITE_ROWS[] = "SpillWriteRows";
 inline constexpr char SPILL_WRITE_FILE_BYTES[] = "SpillWriteFileBytes";
 inline constexpr char SPILL_WRITE_FILE_TOTAL_COUNT[] = 
"SpillWriteFileTotalCount";
 inline constexpr char SPILL_WRITE_FILE_CURRENT_BYTES[] = 
"SpillWriteFileCurrentBytes";
-inline constexpr char SPILL_WRITE_FILE_CURRENT_COUNT[] = 
"SpillWriteFileCurrentCount";
 
 // ============================================================
 // Spill read counters (Source-only)
diff --git a/be/test/exec/operator/spill_sort_sink_operator_test.cpp 
b/be/test/exec/operator/spill_sort_sink_operator_test.cpp
index ead792a1e32..77ac2785cc7 100644
--- a/be/test/exec/operator/spill_sort_sink_operator_test.cpp
+++ b/be/test/exec/operator/spill_sort_sink_operator_test.cpp
@@ -25,7 +25,6 @@
 #include "core/block/block.h"
 #include "core/data_type/data_type_number.h"
 #include "exec/operator/spill_sort_test_helper.h"
-#include "exec/operator/spill_utils.h"
 #include "exec/pipeline/dependency.h"
 #include "exec/pipeline/pipeline_task.h"
 #include "testutil/column_helper.h"
@@ -89,34 +88,6 @@ TEST_F(SpillSortSinkOperatorTest, 
RevokeMemoryReturnsCancelAtEntry) {
     
expect_cancelled(sink_operator->revoke_memory(_helper.runtime_state.get()));
 }
 
-TEST_F(SpillSortSinkOperatorTest, RunSpillTaskReturnsCancelAtEntry) {
-    cancel_state(_helper.runtime_state.get());
-
-    bool executed = false;
-    expect_cancelled(run_spill_task(_helper.runtime_state.get(), [&]() {
-        executed = true;
-        return Status::OK();
-    }));
-    EXPECT_FALSE(executed);
-}
-
-TEST_F(SpillSortSinkOperatorTest, RunSpillTaskReturnsCancelAfterCallback) {
-    bool finalized = false;
-
-    auto status = run_spill_task(
-            _helper.runtime_state.get(),
-            [&]() {
-                cancel_state(_helper.runtime_state.get());
-                return Status::OK();
-            },
-            [&]() {
-                finalized = true;
-                return Status::OK();
-            });
-    expect_cancelled(status);
-    EXPECT_FALSE(finalized);
-}
-
 TEST_F(SpillSortSinkOperatorTest, Basic) {
     auto [source_operator, sink_operator] = _helper.create_operators();
     ASSERT_TRUE(source_operator != nullptr);
diff --git a/be/test/exec/operator/spillable_operator_test_helper.cpp 
b/be/test/exec/operator/spillable_operator_test_helper.cpp
index fb87b156545..d51d6d17be1 100644
--- a/be/test/exec/operator/spillable_operator_test_helper.cpp
+++ b/be/test/exec/operator/spillable_operator_test_helper.cpp
@@ -60,7 +60,6 @@ void SpillableOperatorTestHelper::SetUp() {
     ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadRows", TUnit::UNIT, 
1);
     ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileCount", 
TUnit::UNIT, 1);
     ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileTotalCount", 
TUnit::UNIT, 1);
-    ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentCount", 
TUnit::UNIT, 1);
     ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentBytes", 
TUnit::UNIT, 1);
 
     operator_profile->add_child(custom_profile.get(), true);
@@ -107,4 +106,4 @@ void SpillableOperatorTestHelper::TearDown() {
     SAFE_DELETE(ExecEnv::GetInstance()->_spill_file_mgr);
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp 
b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
index 61589327909..e9f1a58aecc 100644
--- a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
+++ b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
@@ -65,8 +65,6 @@ public:
             ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileCount", 
TUnit::UNIT, 1);
             ADD_COUNTER_WITH_LEVEL(custom_profile.get(), 
"SpillWriteFileTotalCount", TUnit::UNIT,
                                    1);
-            ADD_COUNTER_WITH_LEVEL(custom_profile.get(), 
"SpillWriteFileCurrentCount", TUnit::UNIT,
-                                   1);
             ADD_COUNTER_WITH_LEVEL(custom_profile.get(), 
"SpillWriteFileCurrentBytes", TUnit::UNIT,
                                    1);
         }
@@ -111,8 +109,6 @@ public:
                                    TUnit::UNIT, 1);
             ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), 
"SpillWriteFileCurrentBytes",
                                    TUnit::BYTES, 1);
-            ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), 
"SpillWriteFileCurrentCount",
-                                   TUnit::UNIT, 1);
             multi_cast_data_streamer->set_source_profile(i, 
source_profiles[i].get());
         }
 
diff --git a/be/test/vec/spill/spill_file_test.cpp 
b/be/test/vec/spill/spill_file_test.cpp
index 67740acdcd2..09173a7a343 100644
--- a/be/test/vec/spill/spill_file_test.cpp
+++ b/be/test/vec/spill/spill_file_test.cpp
@@ -72,7 +72,6 @@ protected:
         ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadRows", 
TUnit::UNIT, 1);
         ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileCount", 
TUnit::UNIT, 1);
         ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), 
"SpillWriteFileTotalCount", TUnit::UNIT, 1);
-        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), 
"SpillWriteFileCurrentCount", TUnit::UNIT, 1);
         ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), 
"SpillWriteFileCurrentBytes", TUnit::UNIT, 1);
 
         _profile->add_child(_custom_profile.get(), true);
diff --git a/be/test/vec/spill/spill_repartitioner_test.cpp 
b/be/test/vec/spill/spill_repartitioner_test.cpp
index 53ffcf9f0ac..01da4719cad 100644
--- a/be/test/vec/spill/spill_repartitioner_test.cpp
+++ b/be/test/vec/spill/spill_repartitioner_test.cpp
@@ -74,7 +74,6 @@ protected:
         ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadRows", 
TUnit::UNIT, 1);
         ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileCount", 
TUnit::UNIT, 1);
         ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), 
"SpillWriteFileTotalCount", TUnit::UNIT, 1);
-        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), 
"SpillWriteFileCurrentCount", TUnit::UNIT, 1);
         ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), 
"SpillWriteFileCurrentBytes", TUnit::UNIT, 1);
 
         _profile->add_child(_custom_profile.get(), true);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to