This is an automated email from the ASF dual-hosted git repository.

zclll pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 365fea6bc7b [cleanup] remove pipeline dir (#61319)
365fea6bc7b is described below

commit 365fea6bc7bb33893d37d5a313a896705b725db3
Author: ivin <[email protected]>
AuthorDate: Sat Mar 14 16:27:17 2026 +0800

    [cleanup] remove pipeline dir (#61319)
---
 .../exec/partitioned_aggregation_source_operator.h | 161 ---------------------
 1 file changed, 161 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
deleted file mode 100644
index 486676a9509..00000000000
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ /dev/null
@@ -1,161 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#pragma once
-
-#include <deque>
-#include <memory>
-#include <vector>
-
-#include "common/status.h"
-#include "operator.h"
-#include "vec/spill/spill_file.h"
-#include "vec/spill/spill_file_reader.h"
-#include "vec/spill/spill_repartitioner.h"
-
-namespace doris {
-#include "common/compile_check_begin.h"
-class RuntimeState;
-
-class PartitionedAggSourceOperatorX;
-class PartitionedAggLocalState;
-
-/// Represents one partition in the multi-level spill queue for aggregation.
-/// Unlike Join (which has build + probe), Agg only has a single data flow:
-/// spilled aggregation intermediate results stored in one SpillFile.
-struct AggSpillPartitionInfo {
-    // The spill file for this partition.
-    SpillFileSPtr spill_file;
-    // The depth level in the repartition tree (level-0 = original).
-    int level = 0;
-
-    AggSpillPartitionInfo() = default;
-    AggSpillPartitionInfo(SpillFileSPtr s, int lvl) : spill_file(std::move(s)), level(lvl) {}
-};
-
-class PartitionedAggLocalState MOCK_REMOVE(final)
-        : public PipelineXSpillLocalState<PartitionedAggSharedState> {
-public:
-    ENABLE_FACTORY_CREATOR(PartitionedAggLocalState);
-    using Base = PipelineXSpillLocalState<PartitionedAggSharedState>;
-    using Parent = PartitionedAggSourceOperatorX;
-    PartitionedAggLocalState(RuntimeState* state, OperatorXBase* parent);
-    ~PartitionedAggLocalState() override = default;
-
-    Status init(RuntimeState* state, LocalStateInfo& info) override;
-    Status open(RuntimeState* state) override;
-    Status close(RuntimeState* state) override;
-
-    Status setup_in_memory_agg_op(RuntimeState* state);
-
-    template <bool spilled>
-    void update_profile(RuntimeProfile* child_profile);
-
-    bool is_blockable() const override;
-
-    /// Flush the current in-memory hash table by draining it as blocks and routing
-    /// each block through the repartitioner into the output sub-spill-files.
-    Status flush_hash_table_to_sub_spill_files(RuntimeState* state);
-
-    /// Flush the in-memory hash table into FANOUT sub-spill-files, repartition remaining
-    /// unread spill files from `remaining_spill_files`, and push resulting sub-partitions into
-    /// `_partition_queue`. After this call the hash table is reset and
-    /// `remaining_spill_files` is cleared.
-    Status flush_and_repartition(RuntimeState* state);
-
-private:
-    friend class PartitionedAggSourceOperatorX;
-
-    /// Move all original spill_partitions from shared state into `_partition_queue`.
-    /// Called once when spilled get_block is first entered.
-    void _init_partition_queue();
-
-    /// Read up to SpillFile::MAX_SPILL_WRITE_BATCH_MEM bytes from `partition.spill_files` into
-    /// `_blocks`. Returns has_data=true if any blocks were read.
-    /// Consumes and deletes exhausted spill files from the partition.
-    Status _recover_blocks_from_partition(RuntimeState* state, AggSpillPartitionInfo& partition);
-
-    // ── State ──────────────────────────────────────────────────────────
-    std::unique_ptr<RuntimeState> _runtime_state;
-    bool _opened = false;
-    std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
-
-    // ── Partition queue (unified for original + repartitioned) ────────
-    std::deque<AggSpillPartitionInfo> _partition_queue;
-    AggSpillPartitionInfo _current_partition;
-    // True when we need to pop the next partition from `_partition_queue`.
-    bool _need_to_setup_partition = true;
-
-    // Blocks recovered from disk, pending merge into hash table.
-    std::vector<Block> _blocks;
-
-    // Counters to track spill partition metrics
-    RuntimeProfile::Counter* _max_partition_level = nullptr;
-    RuntimeProfile::Counter* _total_partition_spills = nullptr;
-    int _max_partition_level_seen = 0;
-
-    SpillRepartitioner _repartitioner;
-
-    // Persistent reader for _recover_blocks_from_partition (survives across yield calls)
-    SpillFileReaderSPtr _current_reader;
-};
-
-class AggSourceOperatorX;
-class PartitionedAggSourceOperatorX : public OperatorX<PartitionedAggLocalState> {
-public:
-    using Base = OperatorX<PartitionedAggLocalState>;
-    PartitionedAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
-                                  const DescriptorTbl& descs);
-    ~PartitionedAggSourceOperatorX() override = default;
-
-    Status init(const TPlanNode& tnode, RuntimeState* state) override;
-
-    Status prepare(RuntimeState* state) override;
-
-    Status close(RuntimeState* state) override;
-
-    Status get_block(RuntimeState* state, Block* block, bool* eos) override;
-
-    bool is_source() const override { return true; }
-
-    bool is_serial_operator() const override;
-    void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator,
-                         bool require_bucket_distribution) override;
-
-    DataDistribution required_data_distribution(RuntimeState* state) const override;
-    bool is_colocated_operator() const override;
-    bool is_shuffled_operator() const override;
-
-    // Returns the current in-memory hash table size for the active partition.
-    // The scheduler uses this to decide whether to trigger revoke_memory.
-    size_t revocable_mem_size(RuntimeState* state) const override;
-
-    // Called by the pipeline task scheduler under memory pressure. Flushes the
-    // current in-memory aggregation hash table to sub-spill-files and repartitions,
-    // freeing the hash table memory so it can be recovered in smaller slices.
-    Status revoke_memory(RuntimeState* state) override;
-
-private:
-    friend class PartitionedAggLocalState;
-
-    std::unique_ptr<AggSourceOperatorX> _agg_source_operator;
-    // number of spill partitions configured for this operator
-    size_t _partition_count = 0;
-    // max repartition depth (configured from session variable in FE)
-    size_t _repartition_max_depth = SpillRepartitioner::MAX_DEPTH;
-};
-#include "common/compile_check_end.h"
-} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to