This is an automated email from the ASF dual-hosted git repository.

zclllyybb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 57672d180d2 [fix](be) Continue sorted merge when sender queue is ready (#65004)
57672d180d2 is described below

commit 57672d180d229d80617be96284e12c6bc03619d2
Author: HappenLee <[email protected]>
AuthorDate: Wed Jul 1 17:18:58 2026 +0800

    [fix](be) Continue sorted merge when sender queue is ready (#65004)
    
    ### What problem does this PR solve?
    
    When a sorted-run merge cursor reaches the end of its current block and
    the sender already has the next block ready, the merger may fetch the
    next block before flushing rows that have already been selected into the
    output block.
    
    For variable-length columns, the pending row addresses still point to
    the previous cursor block. Reusing the cursor block before `do_insert()`
    can make the output read rows from the wrong block and corrupt string
    offsets.
    
    ### What is changed?
    
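    Add a per-sender readiness check (`SenderQueue::has_data_or_finished()`)
    so that, when a cursor exhausts its current block and the next block (or
    eos) is already available, the merger loads it in place and keeps merging
    instead of returning. Pending output rows are flushed before the next
    ready block is loaded from the exhausted cursor.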
    
    This keeps the existing non-ready path behavior unchanged: when the next
    block is not ready, the cursor is saved as pending and the current
    output block is returned first.
    
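    The PR also adds regression coverage for:
    
    - continuing merge when the next sender block is already ready
    - preserving string column values before the cursor block is reused
    - keeping the cursor pending (and returning the current output block first)
      when the next sender block is not ready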
---
 be/src/exec/exchange/vdata_stream_recvr.cpp |  12 +-
 be/src/exec/exchange/vdata_stream_recvr.h   |   2 +
 be/src/exec/sort/sort_cursor.h              |  22 ++-
 be/src/exec/sort/vsorted_run_merger.cpp     |  34 ++++-
 be/src/exec/sort/vsorted_run_merger.h       |   8 +-
 be/test/core/value/sort_merger_test.cpp     | 201 +++++++++++++++++++++++++++-
 6 files changed, 264 insertions(+), 15 deletions(-)

diff --git a/be/src/exec/exchange/vdata_stream_recvr.cpp 
b/be/src/exec/exchange/vdata_stream_recvr.cpp
index 7b77c24ecbd..f54baf2221d 100644
--- a/be/src/exec/exchange/vdata_stream_recvr.cpp
+++ b/be/src/exec/exchange/vdata_stream_recvr.cpp
@@ -113,6 +113,11 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* 
block, bool* eos) {
     return Status::OK();
 }
 
+bool VDataStreamRecvr::SenderQueue::has_data_or_finished() {
+    std::lock_guard<std::mutex> l(_lock);
+    return _is_cancelled || !_block_queue.empty() || _num_remaining_senders == 
0;
+}
+
 void 
VDataStreamRecvr::SenderQueue::set_source_ready(std::lock_guard<std::mutex>&) {
     // Here, it is necessary to check if _source_dependency is not nullptr.
     // This is because the queue might be closed before setting the source 
dependency.
@@ -431,16 +436,21 @@ Status VDataStreamRecvr::create_merger(const 
VExprContextSPtrs& ordering_expr,
     DCHECK(_is_merging);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     std::vector<BlockSupplier> child_block_suppliers;
+    std::vector<BlockSupplierReadyChecker> child_block_supplier_ready_checkers;
     // Create the merger that will a single stream of sorted rows.
     _merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, 
nulls_first, batch_size, limit,
                                        offset, _profile));
 
+    child_block_suppliers.reserve(_sender_queues.size());
+    child_block_supplier_ready_checkers.reserve(_sender_queues.size());
     for (int i = 0; i < _sender_queues.size(); ++i) {
         
child_block_suppliers.emplace_back(std::bind(std::mem_fn(&SenderQueue::get_batch),
                                                      _sender_queues[i], 
std::placeholders::_1,
                                                      std::placeholders::_2));
+        child_block_supplier_ready_checkers.emplace_back(
+                std::bind(std::mem_fn(&SenderQueue::has_data_or_finished), 
_sender_queues[i]));
     }
-    RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
+    RETURN_IF_ERROR(_merger->prepare(child_block_suppliers, 
child_block_supplier_ready_checkers));
     return Status::OK();
 }
 
diff --git a/be/src/exec/exchange/vdata_stream_recvr.h 
b/be/src/exec/exchange/vdata_stream_recvr.h
index 12650e780a1..ccdb849c576 100644
--- a/be/src/exec/exchange/vdata_stream_recvr.h
+++ b/be/src/exec/exchange/vdata_stream_recvr.h
@@ -181,6 +181,8 @@ public:
 
     Status get_batch(Block* next_block, bool* eos);
 
+    bool has_data_or_finished();
+
     Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t 
packet_seq,
                      ::google::protobuf::Closure** done, const int64_t 
wait_for_worker,
                      const uint64_t time_to_find_recvr);
diff --git a/be/src/exec/sort/sort_cursor.h b/be/src/exec/sort/sort_cursor.h
index a13906c2be5..c0ae4a02edd 100644
--- a/be/src/exec/sort/sort_cursor.h
+++ b/be/src/exec/sort/sort_cursor.h
@@ -22,6 +22,7 @@
 
 #include <glog/logging.h>
 
+#include <functional>
 #include <utility>
 
 #include "core/block/block.h"
@@ -107,6 +108,7 @@ struct MergeSortCursorImpl {
     virtual void process_next() {}
     virtual Block* block_ptr() { return nullptr; }
     virtual bool eof() const { return false; }
+    virtual bool has_ready_block_or_eos() const { return false; }
 
     Field get_top_value() const {
         Field field {PrimitiveType::TYPE_NULL};
@@ -116,14 +118,18 @@ struct MergeSortCursorImpl {
 };
 
 using BlockSupplier = std::function<Status(Block*, bool* eos)>;
+using BlockSupplierReadyChecker = std::function<bool()>;
 
 struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
     ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl);
     BlockSupplierSortCursorImpl(BlockSupplier block_supplier,
                                 const VExprContextSPtrs& ordering_expr,
                                 const std::vector<bool>& is_asc_order,
-                                const std::vector<bool>& nulls_first)
-            : _ordering_expr(ordering_expr), 
_block_supplier(std::move(block_supplier)) {
+                                const std::vector<bool>& nulls_first,
+                                BlockSupplierReadyChecker 
block_supplier_ready_checker = {})
+            : _ordering_expr(ordering_expr),
+              _block_supplier(std::move(block_supplier)),
+              
_block_supplier_ready_checker(std::move(block_supplier_ready_checker)) {
         block = Block::create_shared();
         sort_columns_size = ordering_expr.size();
 
@@ -135,11 +141,18 @@ struct BlockSupplierSortCursorImpl : public 
MergeSortCursorImpl {
         process_next();
     }
 
-    BlockSupplierSortCursorImpl(BlockSupplier block_supplier, const 
SortDescription& desc_)
-            : MergeSortCursorImpl(desc_), 
_block_supplier(std::move(block_supplier)) {
+    BlockSupplierSortCursorImpl(BlockSupplier block_supplier, const 
SortDescription& desc_,
+                                BlockSupplierReadyChecker 
block_supplier_ready_checker = {})
+            : MergeSortCursorImpl(desc_),
+              _block_supplier(std::move(block_supplier)),
+              
_block_supplier_ready_checker(std::move(block_supplier_ready_checker)) {
         process_next();
     }
 
+    bool has_ready_block_or_eos() const override {
+        return _block_supplier_ready_checker && 
_block_supplier_ready_checker();
+    }
+
     void process_next() override {
         if (_is_eof) {
             return;
@@ -166,6 +179,7 @@ struct BlockSupplierSortCursorImpl : public 
MergeSortCursorImpl {
 
     VExprContextSPtrs _ordering_expr;
     BlockSupplier _block_supplier;
+    BlockSupplierReadyChecker _block_supplier_ready_checker;
     bool _is_eof = false;
 };
 
diff --git a/be/src/exec/sort/vsorted_run_merger.cpp 
b/be/src/exec/sort/vsorted_run_merger.cpp
index 8323490031d..353fb949656 100644
--- a/be/src/exec/sort/vsorted_run_merger.cpp
+++ b/be/src/exec/sort/vsorted_run_merger.cpp
@@ -61,14 +61,22 @@ void VSortedRunMerger::init_timers(RuntimeProfile* profile) 
{
     _get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock");
 }
 
-Status VSortedRunMerger::prepare(const std::vector<BlockSupplier>& input_runs) 
{
+Status VSortedRunMerger::prepare(
+        const std::vector<BlockSupplier>& input_runs,
+        const std::vector<BlockSupplierReadyChecker>& 
input_run_ready_checkers) {
     try {
-        for (const auto& supplier : input_runs) {
+        for (size_t i = 0; i < input_runs.size(); ++i) {
+            const auto& supplier = input_runs[i];
+            BlockSupplierReadyChecker ready_checker;
+            if (i < input_run_ready_checkers.size()) {
+                ready_checker = input_run_ready_checkers[i];
+            }
             if (_use_sort_desc) {
-                
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(supplier, 
_desc));
+                _cursors.emplace_back(
+                        BlockSupplierSortCursorImpl::create_shared(supplier, 
_desc, ready_checker));
             } else {
                 
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(
-                        supplier, _ordering_expr, _is_asc_order, 
_nulls_first));
+                        supplier, _ordering_expr, _is_asc_order, _nulls_first, 
ready_checker));
             }
         }
     } catch (const std::exception& e) {
@@ -193,7 +201,12 @@ Status VSortedRunMerger::get_next(Block* output_block, 
bool* eos) {
             }
 
             current->next();
-            if (_need_more_data(current)) {
+            const bool has_ready_block_or_eos =
+                    current->is_last(0) && !current->eof() && 
current->has_ready_block_or_eos();
+            if (has_ready_block_or_eos) {
+                do_insert();
+            }
+            if (_need_more_data(current, has_ready_block_or_eos)) {
                 do_insert();
                 scoped_mutable_block.restore();
                 return Status::OK();
@@ -211,12 +224,21 @@ Status VSortedRunMerger::get_next(Block* output_block, 
bool* eos) {
     return Status::OK();
 }
 
-bool VSortedRunMerger::_need_more_data(MergeSortCursor& current) {
+bool VSortedRunMerger::_need_more_data(MergeSortCursor& current, bool 
has_ready_block_or_eos) {
     if (!current->is_last(0)) {
         _priority_queue.push(current);
         return false;
     } else if (current->eof()) {
         return false;
+    } else if (has_ready_block_or_eos) {
+        {
+            ScopedTimer<MonotonicStopWatch> timer(_get_next_block_timer);
+            current->process_next();
+        }
+        if (!current->eof()) {
+            _priority_queue.push(current);
+        }
+        return false;
     } else {
         _pending_cursor = current.impl;
         return true;
diff --git a/be/src/exec/sort/vsorted_run_merger.h 
b/be/src/exec/sort/vsorted_run_merger.h
index 191db39f295..11a20a490c4 100644
--- a/be/src/exec/sort/vsorted_run_merger.h
+++ b/be/src/exec/sort/vsorted_run_merger.h
@@ -55,7 +55,8 @@ public:
     // Prepare this merger to merge and return rows from the sorted runs in 
'input_runs'.
     // Retrieves the first batch from each run and sets up the binary heap 
implementing
     // the priority queue.
-    Status prepare(const std::vector<BlockSupplier>& input_runs);
+    Status prepare(const std::vector<BlockSupplier>& input_runs,
+                   const std::vector<BlockSupplierReadyChecker>& 
input_run_ready_checkers = {});
 
     // Return the next block of sorted rows from this merger.
     Status get_next(Block* output_block, bool* eos);
@@ -91,8 +92,9 @@ protected:
 
 private:
     void init_timers(RuntimeProfile* profile);
-    // If current stream is exhausted and not eof, we should break this loop 
and read more blocks.
-    bool _need_more_data(MergeSortCursor& current);
+    // If current stream is exhausted and not eof, keep merging only when the 
next block or eos is
+    // already available. Otherwise break this loop and wait for more data.
+    bool _need_more_data(MergeSortCursor& current, bool 
has_ready_block_or_eos);
 };
 
 } // namespace doris
diff --git a/be/test/core/value/sort_merger_test.cpp 
b/be/test/core/value/sort_merger_test.cpp
index b5c0b8ced95..75a95db7c9b 100644
--- a/be/test/core/value/sort_merger_test.cpp
+++ b/be/test/core/value/sort_merger_test.cpp
@@ -17,8 +17,13 @@
 
 #include <gtest/gtest.h>
 
+#include <cstdint>
+#include <string>
+#include <vector>
+
 #include "core/data_type/data_type_nullable.h"
 #include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
 #include "exec/sort/vsorted_run_merger.h"
 #include "testutil/column_helper.h"
 #include "testutil/mock/mock_slot_ref.h"
@@ -33,6 +38,16 @@ public:
     void TearDown() override {}
 };
 
+static Block create_int_string_block(const std::vector<int64_t>& keys,
+                                     const std::vector<std::string>& values) {
+    DCHECK_EQ(keys.size(), values.size());
+    auto key_column = ColumnHelper::create_column<DataTypeInt64>(keys);
+    auto value_column = ColumnHelper::create_column<DataTypeString>(values);
+    Block block({ColumnWithTypeAndName(key_column, 
std::make_shared<DataTypeInt64>(), "key"),
+                 ColumnWithTypeAndName(value_column, 
std::make_shared<DataTypeString>(), "value")});
+    return block;
+}
+
 TEST(SortMergerTest, NULL_FIRST_ASC) {
     /**
      * in: [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
@@ -478,4 +493,188 @@ TEST(SortMergerTest, TEST_SINGLE_STREAM) {
     }
 }
 
-} // namespace doris
\ No newline at end of file
+TEST(SortMergerTest, CONTINUE_MERGE_WHEN_NEXT_BLOCK_READY) {
+    const int batch_size = 4;
+    std::vector<std::vector<std::vector<int64_t>>> input_blocks = {{{1}, {3}}, 
{{2}, {4}}};
+    std::vector<size_t> next_block_index(input_blocks.size(), 0);
+
+    std::unique_ptr<VSortedRunMerger> merger;
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto ordering_expr = 
MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>());
+    {
+        std::vector<bool> is_asc_order = {true};
+        std::vector<bool> nulls_first = {true};
+        const int limit = -1;
+        const int offset = 0;
+        merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, 
nulls_first, batch_size,
+                                          limit, offset, profile.get()));
+    }
+    {
+        std::vector<BlockSupplier> child_block_suppliers;
+        std::vector<BlockSupplierReadyChecker> ready_checkers;
+        for (size_t child_idx = 0; child_idx < input_blocks.size(); 
child_idx++) {
+            BlockSupplier block_supplier = [&, id = child_idx](Block* block, 
bool* eos) {
+                if (next_block_index[id] >= input_blocks[id].size()) {
+                    *eos = true;
+                    return Status::OK();
+                }
+                *block = ColumnHelper::create_block<DataTypeInt64>(
+                        input_blocks[id][next_block_index[id]++]);
+                *eos = false;
+                return Status::OK();
+            };
+            child_block_suppliers.push_back(block_supplier);
+            ready_checkers.emplace_back([] { return true; });
+        }
+        EXPECT_TRUE(merger->prepare(child_block_suppliers, 
ready_checkers).ok());
+    }
+    {
+        Block block;
+        bool eos = false;
+        EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+        auto expect_block = ColumnHelper::create_column<DataTypeInt64>({1, 2, 
3, 4});
+        
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column, 
expect_block));
+        EXPECT_EQ(block.rows(), batch_size);
+        EXPECT_FALSE(eos);
+    }
+    {
+        Block block;
+        bool eos = false;
+        EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+        EXPECT_EQ(block.rows(), 0);
+        EXPECT_TRUE(eos);
+    }
+}
+
+TEST(SortMergerTest, CONTINUE_MERGE_WITH_STRING_COLUMN_WHEN_NEXT_BLOCK_READY) {
+    const int batch_size = 4;
+    std::vector<std::vector<Block>> input_blocks;
+    input_blocks.emplace_back(
+            std::vector<Block> {create_int_string_block({1, 3}, {"old-1", 
"old-3"}),
+                                create_int_string_block({5, 7}, {"new-5", 
"new-7"})});
+    input_blocks.emplace_back(
+            std::vector<Block> {create_int_string_block({2, 4}, {"other-2", 
"other-4"})});
+    std::vector<size_t> next_block_index(input_blocks.size(), 0);
+
+    std::unique_ptr<VSortedRunMerger> merger;
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto ordering_expr = 
MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>());
+    {
+        std::vector<bool> is_asc_order = {true};
+        std::vector<bool> nulls_first = {true};
+        const int limit = -1;
+        const int offset = 0;
+        merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, 
nulls_first, batch_size,
+                                          limit, offset, profile.get()));
+    }
+    {
+        std::vector<BlockSupplier> child_block_suppliers;
+        std::vector<BlockSupplierReadyChecker> ready_checkers;
+        for (size_t child_idx = 0; child_idx < input_blocks.size(); 
child_idx++) {
+            BlockSupplier block_supplier = [&, id = child_idx](Block* block, 
bool* eos) {
+                if (next_block_index[id] >= input_blocks[id].size()) {
+                    *eos = true;
+                    return Status::OK();
+                }
+                block->swap(input_blocks[id][next_block_index[id]++]);
+                *eos = false;
+                return Status::OK();
+            };
+            child_block_suppliers.push_back(block_supplier);
+            ready_checkers.emplace_back([] { return true; });
+        }
+        EXPECT_TRUE(merger->prepare(child_block_suppliers, 
ready_checkers).ok());
+    }
+    {
+        Block block;
+        bool eos = false;
+        EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+        auto expect_key_column = 
ColumnHelper::create_column<DataTypeInt64>({1, 2, 3, 4});
+        auto expect_value_column = ColumnHelper::create_column<DataTypeString>(
+                {"old-1", "other-2", "old-3", "other-4"});
+        
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column, 
expect_key_column));
+        EXPECT_TRUE(
+                ColumnHelper::column_equal(block.get_by_position(1).column, 
expect_value_column));
+        EXPECT_EQ(block.rows(), batch_size);
+        EXPECT_FALSE(eos);
+    }
+    {
+        Block block;
+        bool eos = false;
+        EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+        auto expect_key_column = 
ColumnHelper::create_column<DataTypeInt64>({5, 7});
+        auto expect_value_column = 
ColumnHelper::create_column<DataTypeString>({"new-5", "new-7"});
+        
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column, 
expect_key_column));
+        EXPECT_TRUE(
+                ColumnHelper::column_equal(block.get_by_position(1).column, 
expect_value_column));
+        EXPECT_EQ(block.rows(), 2);
+        EXPECT_FALSE(eos);
+    }
+}
+
+TEST(SortMergerTest, KEEP_PENDING_CURSOR_WHEN_NEXT_BLOCK_NOT_READY) {
+    const int batch_size = 4;
+    std::vector<std::vector<std::vector<int64_t>>> input_blocks = {{{1}, {3}}, 
{{2}, {4}}};
+    std::vector<size_t> next_block_index(input_blocks.size(), 0);
+    std::vector<bool> ready(input_blocks.size(), false);
+
+    std::unique_ptr<VSortedRunMerger> merger;
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto ordering_expr = 
MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>());
+    {
+        std::vector<bool> is_asc_order = {true};
+        std::vector<bool> nulls_first = {true};
+        const int limit = -1;
+        const int offset = 0;
+        merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, 
nulls_first, batch_size,
+                                          limit, offset, profile.get()));
+    }
+    {
+        std::vector<BlockSupplier> child_block_suppliers;
+        std::vector<BlockSupplierReadyChecker> ready_checkers;
+        for (size_t child_idx = 0; child_idx < input_blocks.size(); 
child_idx++) {
+            BlockSupplier block_supplier = [&, id = child_idx](Block* block, 
bool* eos) {
+                if (next_block_index[id] >= input_blocks[id].size()) {
+                    *eos = true;
+                    return Status::OK();
+                }
+                *block = ColumnHelper::create_block<DataTypeInt64>(
+                        input_blocks[id][next_block_index[id]++]);
+                *eos = false;
+                return Status::OK();
+            };
+            child_block_suppliers.push_back(block_supplier);
+            ready_checkers.emplace_back([&, id = child_idx] { return 
ready[id]; });
+        }
+        EXPECT_TRUE(merger->prepare(child_block_suppliers, 
ready_checkers).ok());
+    }
+    {
+        Block block;
+        bool eos = false;
+        EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+        auto expect_block = ColumnHelper::create_column<DataTypeInt64>({1});
+        
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column, 
expect_block));
+        EXPECT_EQ(block.rows(), 1);
+        EXPECT_FALSE(eos);
+    }
+    {
+        ready[0] = true;
+        ready[1] = true;
+        Block block;
+        bool eos = false;
+        EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+        auto expect_block = ColumnHelper::create_column<DataTypeInt64>({2, 3, 
4});
+        
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column, 
expect_block));
+        EXPECT_EQ(block.rows(), 3);
+        EXPECT_FALSE(eos);
+    }
+    {
+        Block block;
+        bool eos = false;
+        EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+        EXPECT_EQ(block.rows(), 0);
+        EXPECT_TRUE(eos);
+    }
+}
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to