This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 173d43cff76 [feature](pipeline)add local merge sort exchanger (#35682)
173d43cff76 is described below

commit 173d43cff76321fd057eba64d1a8122034348480
Author: Mryange <[email protected]>
AuthorDate: Wed Jun 5 20:06:54 2024 +0800

    [feature](pipeline)add local merge sort exchanger (#35682)
    
    ```
                          DATA_STREAM_SINK_OPERATOR  (id=2,dst_id=2):
                                -  BlocksProduced:  sum  2,  avg  1,  max  2,  min  0
                                -  CloseTime:  avg  50.352us,  max  59.404us,  min  41.300us
                                -  ExecTime:  avg  357.197us,  max  428.119us,  min  286.275us
                                -  InitTime:  avg  103.853us,  max  104.291us,  min  103.416us
                                -  InputRows:  sum  60,  avg  30,  max  60,  min  0
                                -  MemoryUsage:  sum  ,  avg  ,  max  ,  min
                                    -  PeakMemoryUsage:  sum  0.00  ,  avg  0.00  ,  max  0.00  ,  min  0.00
                                -  OpenTime:  avg  128.654us,  max  163.939us,  min  93.370us
                                -  RowsProduced:  sum  60,  avg  30,  max  60,  min  0
                                -  WaitForDependencyTime:  avg  0ns,  max  0ns,  min  0ns
                                    -  WaitForRpcBufferQueue:  avg  0ns,  max  0ns,  min  0ns
                              LOCAL_EXCHANGE_OPERATOR  (LOCAL_MERGE_SORT)  (id=-3):
                                    -  BlocksProduced:  sum  2,  avg  1,  max  2,  min  0
                                    -  CloseTime:  avg  0ns,  max  0ns,  min  0ns
                                    -  ExecTime:  avg  873.747us,  max  1.737ms,  min  9.663us
                                    -  GetBlockFailedTime:  sum  0,  avg  0,  max  0,  min  0
                                    -  InitTime:  avg  466ns,  max  495ns,  min  437ns
                                    -  MemoryUsage:  sum  ,  avg  ,  max  ,  min
                                        -  PeakMemoryUsage:  sum  0.00  ,  avg  0.00  ,  max  0.00  ,  min  0.00
                                    -  OpenTime:  avg  19.531us,  max  31.460us,  min  7.602us
                                    -  ProjectionTime:  avg  0ns,  max  0ns,  min  0ns
                                    -  RowsProduced:  sum  60,  avg  30,  max  60,  min  0
                                    -  WaitForDependency[LOCAL_EXCHANGE_OPERATOR_DEPENDENCY]Time:  avg  14.130ms,  max  14.701ms,  min  13.558ms
                      Pipeline  :  1(instance_num=2):
                          LOCAL_EXCHANGE_SINK_OPERATOR  (LOCAL_MERGE_SORT)  (id=-3):
                                -  CloseTime:  avg  0ns,  max  0ns,  min  0ns
                                -  ExecTime:  avg  59.515us,  max  68.50us,  min  50.981us
                                -  InitTime:  avg  8.415us,  max  8.695us,  min  8.135us
                                -  InputRows:  sum  60,  avg  30,  max  30,  min  30
                                -  MemoryUsage:  sum  ,  avg  ,  max  ,  min
                                    -  PeakMemoryUsage:  sum  0.00  ,  avg  0.00  ,  max  0.00  ,  min  0.00
                                -  OpenTime:  avg  1.498us,  max  1.630us,  min  1.366us
                                -  WaitForDependency[LOCAL_EXCHANGE_SINK_DEPENDENCY]Time:  avg  0ns,  max  0ns,  min  0ns
                              SORT_OPERATOR  (id=1  ,  nereids_id=108):
                                    -  PlanInfo
                                          -  order  by:  s_suppkey  ASC
                                          -  TOPN  OPT
                                          -  offset:  0
                                          -  limit:  30
                                    -  BlocksProduced:  sum  2,  avg  1,  max  1,  min  1
                                    -  CloseTime:  avg  0ns,  max  0ns,  min  0ns
                                    -  ExecTime:  avg  10.593us,  max  18.22us,  min  3.165us
                                    -  InitTime:  avg  0ns,  max  0ns,  min  0ns
                                    -  MemoryUsage:  sum  ,  avg  ,  max  ,  min
                                        -  PeakMemoryUsage:  sum  0.00  ,  avg  0.00  ,  max  0.00  ,  min  0.00
                                    -  OpenTime:  avg  0ns,  max  0ns,  min  0ns
                                    -  ProjectionTime:  avg  0ns,  max  0ns,  min  0ns
    ```
---
 be/src/pipeline/dependency.h                       |  4 +
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 18 +++-
 be/src/pipeline/exec/exchange_sink_operator.h      |  2 +
 be/src/pipeline/exec/operator.h                    |  1 +
 be/src/pipeline/exec/sort_source_operator.cpp      | 43 +++++++++-
 be/src/pipeline/exec/sort_source_operator.h        | 17 ++++
 .../local_exchange_sink_operator.cpp               |  1 +
 .../local_exchange/local_exchange_sink_operator.h  |  9 ++
 .../local_exchange_source_operator.h               |  2 +
 be/src/pipeline/local_exchange/local_exchanger.cpp | 95 ++++++++++++++++++++++
 be/src/pipeline/local_exchange/local_exchanger.h   | 47 +++++++++++
 be/src/pipeline/pipeline_fragment_context.cpp      | 17 ++++
 be/src/runtime/runtime_state.h                     |  5 ++
 be/src/vec/core/sort_cursor.h                      |  3 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  6 ++
 gensrc/thrift/PaloInternalService.thrift           |  2 +
 16 files changed, 270 insertions(+), 2 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 46335caade5..e5a019b4fa0 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -792,6 +792,8 @@ enum class ExchangeType : uint8_t {
     ADAPTIVE_PASSTHROUGH = 5,
     // Send all data to the first channel.
     PASS_TO_ONE = 6,
+    // merge all data to one channel.
+    LOCAL_MERGE_SORT = 7,
 };
 
 inline std::string get_exchange_type_name(ExchangeType idx) {
@@ -810,6 +812,8 @@ inline std::string get_exchange_type_name(ExchangeType idx) 
{
         return "ADAPTIVE_PASSTHROUGH";
     case ExchangeType::PASS_TO_ONE:
         return "PASS_TO_ONE";
+    case ExchangeType::LOCAL_MERGE_SORT:
+        return "LOCAL_MERGE_SORT";
     }
     LOG(FATAL) << "__builtin_unreachable";
     __builtin_unreachable();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index d00d9521196..11633f4fcf2 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -26,7 +26,9 @@
 
 #include "common/status.h"
 #include "exchange_sink_buffer.h"
+#include "pipeline/dependency.h"
 #include "pipeline/exec/operator.h"
+#include "pipeline/exec/sort_source_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"
 #include "vec/columns/column_const.h"
 #include "vec/exprs/vexpr.h"
@@ -294,7 +296,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
           _tablet_sink_partition(sink.tablet_sink_partition),
           _tablet_sink_location(sink.tablet_sink_location),
           _tablet_sink_tuple_id(sink.tablet_sink_tuple_id),
-          _tablet_sink_txn_id(sink.tablet_sink_txn_id) {
+          _tablet_sink_txn_id(sink.tablet_sink_txn_id),
+          _enable_local_merge_sort(state->enable_local_merge_sort()) {
     DCHECK_GT(destinations.size(), 0);
     DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
            sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -648,4 +651,17 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
     return Base::close(state, exec_status);
 }
 
+DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
+    if (_child_x && _enable_local_merge_sort) {
+        // SORT_OPERATOR -> DATA_STREAM_SINK_OPERATOR
+        // SORT_OPERATOR -> LOCAL_MERGE_SORT -> DATA_STREAM_SINK_OPERATOR
+        if (auto sort_source = 
std::dynamic_pointer_cast<SortSourceOperatorX>(_child_x);
+            sort_source && sort_source->use_local_merge()) {
+            // Sort the data local
+            return ExchangeType::LOCAL_MERGE_SORT;
+        }
+    }
+    return 
DataSinkOperatorX<ExchangeSinkLocalState>::required_data_distribution();
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 0445dafacc0..1c14f04679e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -218,6 +218,7 @@ public:
 
     Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block* 
src, PBlock* dest,
                            int num_receivers = 1);
+    DataDistribution required_data_distribution() const override;
 
 private:
     friend class ExchangeSinkLocalState;
@@ -270,6 +271,7 @@ private:
     // Control the number of channels according to the flow, thereby 
controlling the number of table sink writers.
     size_t _data_processed = 0;
     int _writer_count = 1;
+    const bool _enable_local_merge_sort;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index f78d08eeb9e..38b0b892f2f 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -113,6 +113,7 @@ public:
 
     virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
     [[nodiscard]] virtual bool require_data_distribution() const { return 
false; }
+    OperatorXPtr child_x() { return _child_x; }
 
 protected:
     OperatorXPtr _child_x = nullptr;
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp 
b/be/src/pipeline/exec/sort_source_operator.cpp
index 34bfffb8d9f..89262828708 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -28,7 +28,35 @@ SortLocalState::SortLocalState(RuntimeState* state, 
OperatorXBase* parent)
 
 SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, const TPlanNode& 
tnode, int operator_id,
                                          const DescriptorTbl& descs)
-        : OperatorX<SortLocalState>(pool, tnode, operator_id, descs) {}
+        : OperatorX<SortLocalState>(pool, tnode, operator_id, descs),
+          _merge_by_exchange(tnode.sort_node.merge_by_exchange),
+          _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) 
{}
+
+Status SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+    RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.sort_node.sort_info, _pool));
+    _is_asc_order = tnode.sort_node.sort_info.is_asc_order;
+    _nulls_first = tnode.sort_node.sort_info.nulls_first;
+    return Status::OK();
+}
+
+Status SortSourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    // spill sort _child_x may be nullptr.
+    if (_child_x) {
+        RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child_x->row_desc(), 
_row_descriptor));
+    }
+    return Status::OK();
+}
+
+Status SortSourceOperatorX::open(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::open(state));
+    // spill sort _child_x may be nullptr.
+    if (_child_x) {
+        RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
+    }
+    return Status::OK();
+}
 
 Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos) {
     auto& local_state = get_local_state(state);
@@ -45,4 +73,17 @@ const vectorized::SortDescription& 
SortSourceOperatorX::get_sort_description(
     return local_state._shared_state->sorter->get_sort_description();
 }
 
+Status SortSourceOperatorX::build_merger(RuntimeState* state,
+                                         
std::unique_ptr<vectorized::VSortedRunMerger>& merger,
+                                         RuntimeProfile* profile) {
+    // now only use in LocalMergeSortExchanger::get_block
+    vectorized::VSortExecExprs vsort_exec_exprs;
+    // clone vsort_exec_exprs in LocalMergeSortExchanger
+    RETURN_IF_ERROR(_vsort_exec_exprs.clone(state, vsort_exec_exprs));
+    merger = std::make_unique<vectorized::VSortedRunMerger>(
+            vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order, 
_nulls_first,
+            state->batch_size(), _limit, _offset, profile);
+    return Status::OK();
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/sort_source_operator.h 
b/be/src/pipeline/exec/sort_source_operator.h
index f20e8b9314b..86832e04ae0 100644
--- a/be/src/pipeline/exec/sort_source_operator.h
+++ b/be/src/pipeline/exec/sort_source_operator.h
@@ -28,6 +28,8 @@ class RuntimeState;
 namespace pipeline {
 
 class SortSourceOperatorX;
+class SortSinkOperatorX;
+
 class SortLocalState final : public PipelineXLocalState<SortSharedState> {
 public:
     ENABLE_FACTORY_CREATOR(SortLocalState);
@@ -40,16 +42,31 @@ private:
 
 class SortSourceOperatorX final : public OperatorX<SortLocalState> {
 public:
+    using Base = OperatorX<SortLocalState>;
     SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
                         const DescriptorTbl& descs);
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
 
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+
     bool is_source() const override { return true; }
 
+    bool use_local_merge() const { return _merge_by_exchange; }
     const vectorized::SortDescription& get_sort_description(RuntimeState* 
state) const;
 
+    Status build_merger(RuntimeState* state, 
std::unique_ptr<vectorized::VSortedRunMerger>& merger,
+                        RuntimeProfile* profile);
+
 private:
     friend class SortLocalState;
+    const bool _merge_by_exchange;
+    std::vector<bool> _is_asc_order;
+    std::vector<bool> _nulls_first;
+    // Expressions and parameters used for build _sort_description
+    vectorized::VSortExecExprs _vsort_exec_exprs;
+    const int64_t _offset;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index c8c165e4c90..a310b921b18 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -80,6 +80,7 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
     SCOPED_TIMER(_init_timer);
     _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
     _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
+    _channel_id = info.task_idx;
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 10234fe3043..36530bc8ef1 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -30,6 +30,7 @@ class ShuffleExchanger;
 class PassthroughExchanger;
 class BroadcastExchanger;
 class PassToOneExchanger;
+class LocalMergeSortExchanger;
 class LocalExchangeSinkOperatorX;
 class LocalExchangeSinkLocalState final : public 
PipelineXSinkLocalState<LocalExchangeSharedState> {
 public:
@@ -44,6 +45,13 @@ public:
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
     std::string debug_string(int indentation_level) const override;
+    std::vector<Dependency*> dependencies() const override {
+        auto deps = Base::dependencies();
+        if (auto local_state_sink_dep = 
_exchanger->get_local_state_dependency(_channel_id)) {
+            deps.push_back(local_state_sink_dep.get());
+        }
+        return deps;
+    }
 
 private:
     friend class LocalExchangeSinkOperatorX;
@@ -52,6 +60,7 @@ private:
     friend class PassthroughExchanger;
     friend class BroadcastExchanger;
     friend class PassToOneExchanger;
+    friend class LocalMergeSortExchanger;
     friend class AdaptivePassthroughExchanger;
 
     Exchanger* _exchanger = nullptr;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index ec662178dea..f32261cd574 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -26,6 +26,7 @@ class ShuffleExchanger;
 class PassthroughExchanger;
 class BroadcastExchanger;
 class PassToOneExchanger;
+class LocalMergeSortExchanger;
 class LocalExchangeSourceOperatorX;
 class LocalExchangeSourceLocalState final : public 
PipelineXLocalState<LocalExchangeSharedState> {
 public:
@@ -46,6 +47,7 @@ private:
     friend class PassthroughExchanger;
     friend class BroadcastExchanger;
     friend class PassToOneExchanger;
+    friend class LocalMergeSortExchanger;
     friend class AdaptivePassthroughExchanger;
 
     Exchanger* _exchanger = nullptr;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index a7a21edbb58..980078b8fe8 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -17,6 +17,9 @@
 
 #include "pipeline/local_exchange/local_exchanger.h"
 
+#include "common/status.h"
+#include "pipeline/exec/sort_sink_operator.h"
+#include "pipeline/exec/sort_source_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"
 #include "pipeline/local_exchange/local_exchange_source_operator.h"
 #include "vec/runtime/partitioner.h"
@@ -262,6 +265,98 @@ Status PassToOneExchanger::get_block(RuntimeState* state, 
vectorized::Block* blo
     return Status::OK();
 }
 
+Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos,
+                                     LocalExchangeSinkLocalState& local_state) 
{
+    vectorized::Block new_block;
+    if (!_free_blocks.try_dequeue(new_block)) {
+        new_block = {in_block->clone_empty()};
+    }
+    new_block.swap(*in_block);
+    DCHECK_LE(local_state._channel_id, _data_queue.size());
+    _data_queue[local_state._channel_id].enqueue(std::move(new_block));
+    add_mem_usage(local_state, new_block.allocated_bytes());
+    local_state._shared_state->set_ready_to_read(0);
+    return Status::OK();
+}
+
+Status LocalMergeSortExchanger::build_merger(RuntimeState* state,
+                                             LocalExchangeSourceLocalState& 
local_state) {
+    RETURN_IF_ERROR(_sort_source->build_merger(state, _merger, 
local_state.profile()));
+    std::vector<vectorized::BlockSupplier> child_block_suppliers;
+    for (int channel_id = 0; channel_id < _num_partitions; channel_id++) {
+        vectorized::BlockSupplier block_supplier = [&, id = 
channel_id](vectorized::Block* block,
+                                                                        bool* 
eos) {
+            vectorized::Block next_block;
+            if (_running_sink_operators == 0) {
+                if (_data_queue[id].try_dequeue(next_block)) {
+                    block->swap(next_block);
+                    if (_free_block_limit == 0 ||
+                        _free_blocks.size_approx() < _free_block_limit * 
_num_sources) {
+                        _free_blocks.enqueue(std::move(next_block));
+                    }
+                    sub_mem_usage(local_state, id, block->allocated_bytes());
+                } else {
+                    *eos = true;
+                }
+            } else if (_data_queue[id].try_dequeue(next_block)) {
+                block->swap(next_block);
+                if (_free_block_limit == 0 ||
+                    _free_blocks.size_approx() < _free_block_limit * 
_num_sources) {
+                    _free_blocks.enqueue(std::move(next_block));
+                }
+                sub_mem_usage(local_state, id, block->allocated_bytes());
+            }
+            return Status::OK();
+        };
+        child_block_suppliers.push_back(block_supplier);
+    }
+    RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
+    _merger->set_pipeline_engine_enabled(true);
+    return Status::OK();
+}
+
+/*
+before
+    sort(8) --> datasink(8) [0,7].  ---->
+    sort(8) --> datasink(8) [8,15]. ---->        [0,23]global merge ---->   
Exchange(1)
+    sort(8) --> datasink(8) [16,23].---->
+
+now
+
+    sort(8) --> local merge(1) ---> datasink(1) [0] ---->
+    sort(8) --> local merge(1) ---> datasink(1) [1] ---->     [0,2]global 
merge ---->   Exchange(1)
+    sort(8) --> local merge(1) ---> datasink(1) [2] ---->
+*/
+Status LocalMergeSortExchanger::get_block(RuntimeState* state, 
vectorized::Block* block, bool* eos,
+                                          LocalExchangeSourceLocalState& 
local_state) {
+    if (local_state._channel_id != 0) {
+        *eos = true;
+        return Status::OK();
+    }
+    if (!_merger) {
+        RETURN_IF_ERROR(build_merger(state, local_state));
+    }
+    RETURN_IF_ERROR(_merger->get_next(block, eos));
+    return Status::OK();
+}
+
+void LocalMergeSortExchanger::add_mem_usage(LocalExchangeSinkLocalState& 
local_state,
+                                            int64_t delta) {
+    const auto channel_id = local_state._channel_id;
+    local_state._shared_state->mem_trackers[channel_id]->consume(delta);
+    if (_queues_mem_usege[channel_id].fetch_add(delta) > _each_queue_limit) {
+        _sink_deps[channel_id]->block();
+    }
+}
+
+void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSourceLocalState& 
local_state,
+                                            int channel_id, int64_t delta) {
+    local_state._shared_state->mem_trackers[channel_id]->release(delta);
+    if (_queues_mem_usege[channel_id].fetch_sub(delta) <= _each_queue_limit) {
+        _sink_deps[channel_id]->set_ready();
+    }
+}
+
 Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos,
                                 LocalExchangeSinkLocalState& local_state) {
     for (size_t i = 0; i < _num_partitions; i++) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index bc07c806094..806ac8b9131 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -25,6 +25,7 @@ namespace doris::pipeline {
 class LocalExchangeSourceLocalState;
 class LocalExchangeSinkLocalState;
 struct ShuffleBlockWrapper;
+class SortSourceOperatorX;
 
 class Exchanger {
 public:
@@ -50,6 +51,8 @@ public:
     virtual ExchangeType get_type() const = 0;
     virtual void close(LocalExchangeSourceLocalState& local_state) {}
 
+    virtual DependencySPtr get_local_state_dependency(int _channel_id) { 
return nullptr; }
+
 protected:
     friend struct LocalExchangeSharedState;
     friend struct ShuffleBlockWrapper;
@@ -177,6 +180,50 @@ private:
     std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
 };
 
+class LocalMergeSortExchanger final : public Exchanger {
+public:
+    ENABLE_FACTORY_CREATOR(LocalMergeSortExchanger);
+    LocalMergeSortExchanger(std::shared_ptr<SortSourceOperatorX> sort_source,
+                            int running_sink_operators, int num_partitions, 
int free_block_limit)
+            : Exchanger(running_sink_operators, num_partitions, 
free_block_limit),
+              _sort_source(std::move(sort_source)),
+              _queues_mem_usege(num_partitions),
+              _each_queue_limit(config::local_exchange_buffer_mem_limit / 
num_partitions) {
+        _data_queue.resize(num_partitions);
+        for (size_t i = 0; i < num_partitions; i++) {
+            _queues_mem_usege[i] = 0;
+            _sink_deps.push_back(
+                    std::make_shared<Dependency>(0, 0, 
"LOCAL_MERGE_SORT_DEPENDENCY", true));
+        }
+    }
+    ~LocalMergeSortExchanger() override = default;
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
+                LocalExchangeSinkLocalState& local_state) override;
+
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
+                     LocalExchangeSourceLocalState& local_state) override;
+    ExchangeType get_type() const override { return 
ExchangeType::LOCAL_MERGE_SORT; }
+
+    Status build_merger(RuntimeState* statem, LocalExchangeSourceLocalState& 
local_state);
+
+    DependencySPtr get_local_state_dependency(int channel_id) override {
+        DCHECK(_sink_deps[channel_id]);
+        return _sink_deps[channel_id];
+    }
+
+    void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t 
delta);
+
+    void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int 
channel_id, int64_t delta);
+
+private:
+    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
+    std::unique_ptr<vectorized::VSortedRunMerger> _merger;
+    std::shared_ptr<SortSourceOperatorX> _sort_source;
+    std::vector<DependencySPtr> _sink_deps;
+    std::vector<std::atomic_int64_t> _queues_mem_usege;
+    const int64_t _each_queue_limit;
+};
+
 class BroadcastExchanger final : public Exchanger {
 public:
     ENABLE_FACTORY_CREATOR(BroadcastExchanger);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 9ddbd1b9150..a85b64c9154 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -37,6 +37,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "io/fs/stream_load_pipe.h"
+#include "pipeline/dependency.h"
 #include "pipeline/exec/aggregation_sink_operator.h"
 #include "pipeline/exec/aggregation_source_operator.h"
 #include "pipeline/exec/analytic_sink_operator.h"
@@ -90,6 +91,7 @@
 #include "pipeline/exec/union_source_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"
 #include "pipeline/local_exchange/local_exchange_source_operator.h"
+#include "pipeline/local_exchange/local_exchanger.h"
 #include "pipeline/task_scheduler.h"
 #include "pipeline_task.h"
 #include "runtime/exec_env.h"
@@ -742,6 +744,21 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
                         ? 
_runtime_state->query_options().local_exchange_free_blocks_limit
                         : 0);
         break;
+    case ExchangeType::LOCAL_MERGE_SORT: {
+        auto child_op = cur_pipe->sink_x()->child_x();
+        auto sort_source = 
std::dynamic_pointer_cast<SortSourceOperatorX>(child_op);
+        if (!sort_source) {
+            return Status::InternalError(
+                    "LOCAL_MERGE_SORT must use in SortSourceOperatorX , but 
now is {} ",
+                    child_op->get_name());
+        }
+        shared_state->exchanger = LocalMergeSortExchanger::create_unique(
+                sort_source, cur_pipe->num_tasks(), _num_instances,
+                
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? 
_runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
+        break;
+    }
     case ExchangeType::ADAPTIVE_PASSTHROUGH:
         shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                 cur_pipe->num_tasks(), _num_instances,
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 565e5ccdc20..2b303603e7a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -597,6 +597,11 @@ public:
         return _query_options.__isset.enable_force_spill && 
_query_options.enable_force_spill;
     }
 
+    bool enable_local_merge_sort() const {
+        return _query_options.__isset.enable_local_merge_sort &&
+               _query_options.enable_local_merge_sort;
+    }
+
     int64_t min_revocable_mem() const {
         if (_query_options.__isset.min_revocable_mem) {
             return _query_options.min_revocable_mem;
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index 8681dc4ffc6..d92718298cb 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -195,6 +195,9 @@ struct BlockSupplierSortCursorImpl : public 
MergeSortCursorImpl {
     }
 
     bool has_next_block() override {
+        if (_is_eof) {
+            return false;
+        }
         _block.clear();
         Status status;
         do {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index c3d9ce287c2..ab412b7dd08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -252,6 +252,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String FORCE_TO_LOCAL_SHUFFLE = 
"force_to_local_shuffle";
 
+    public static final String ENABLE_LOCAL_MERGE_SORT = 
"enable_local_merge_sort";
+
     public static final String ENABLE_AGG_STATE = "enable_agg_state";
 
     public static final String ENABLE_BUCKET_SHUFFLE_DOWNGRADE = 
"enable_bucket_shuffle_downgrade";
@@ -973,6 +975,9 @@ public class SessionVariable implements Serializable, 
Writable {
                         "Whether to force to local shuffle on pipelineX 
engine."})
     private boolean forceToLocalShuffle = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_LOCAL_MERGE_SORT)
+    private boolean enableLocalMergeSort = true;
+
     @VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = 
VariableAnnotation.EXPERIMENTAL,
             needForward = true)
     public boolean enableAggState = false;
@@ -3391,6 +3396,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setMinRevocableMem(minRevocableMem);
         tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
 
+        tResult.setEnableLocalMergeSort(enableLocalMergeSort);
         return tResult;
     }
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 586d40b8648..3619bf9d97d 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -299,6 +299,8 @@ struct TQueryOptions {
   111: optional bool enable_orc_filter_by_min_max = true
 
   112: optional i32 max_column_reader_num = 0
+
+  113: optional bool enable_local_merge_sort = false;
   
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to