This is an automated email from the ASF dual-hosted git repository.

liutang123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 63e90d34e4b [fix](compaction) Fix incorrect memory availability check in RowSourceBuffer during vertical compaction (#63152)
63e90d34e4b is described below

commit 63e90d34e4bd076e3552262a4943981e06dc1bec
Author: Lijia Liu <[email protected]>
AuthorDate: Thu May 14 09:30:59 2026 +0800

    [fix](compaction) Fix incorrect memory availability check in RowSourceBuffer during vertical compaction (#63152)
    
    Exception Log:
    ```
    thread_mem_tracker_mgr.h:248] alloc large memory: 4294967296, not in query or load, this is just a warning, not prevent memory alloc, stacktrace:
    
            0#  doris::ThreadMemTrackerMgr::consume(long, int)
            1#  Allocator<false, false, false, 
DefaultMemoryAllocator>::realloc_impl(void*, unsigned long, unsigned long, 
unsigned long)
            2#  void doris::vectorized::PODArrayBase<2ul, 4096ul, 
Allocator<false, false, false, DefaultMemoryAllocator>, 16ul, 
15ul>::reserve_for_next_size<>()
            3#  
doris::vectorized::RowSourcesBuffer::append(std::vector<doris::vectorized::RowSource,
 std::allocator<doris::vectorized::RowSource> > const&)
            4#  
doris::vectorized::VerticalHeapMergeIterator::next_batch(doris::vectorized::Block*)
            5#  
doris::vectorized::VerticalBlockReader::_direct_next_block(doris::vectorized::Block*,
 bool*)
            6#  
doris::vectorized::VerticalBlockReader::next_block_with_aggregation(doris::vectorized::Block*,
 bool*)
            7#  
doris::Merger::vertical_compact_one_group(std::shared_ptr<doris::BaseTablet>, 
doris::ReaderType, doris::TabletSchema const&, bool, std::vector<unsigned int, 
std::allocator<unsigned int> > const&, doris::vectorized::RowSourcesBuffer*, 
std::vector<std::shared_ptr<doris::RowsetReader>, 
std::allocator<std::shared_ptr<doris::RowsetReader> > > const&, 
doris::RowsetWriter*, long, doris::Merger::Statistics*, std::vector<unsigned 
int, std::allocator<unsigned int> >, long, doris::Co [...]
            8#  
doris::Merger::vertical_merge_rowsets(std::shared_ptr<doris::BaseTablet>, 
doris::ReaderType, doris::TabletSchema const&, 
std::vector<std::shared_ptr<doris::RowsetReader>, 
std::allocator<std::shared_ptr<doris::RowsetReader> > > const&, 
doris::RowsetWriter*, long, long, doris::Merger::Statistics*)
            9#  doris::Compaction::merge_input_rowsets()
            10# doris::CloudCompactionMixin::execute_compact_impl(long)
            11# doris::CloudCompactionMixin::execute_compact()
            12# doris::CloudCumulativeCompaction::execute_compact()
            13# std::_Function_handler<void (), 
doris::CloudStorageEngine::_submit_cumulative_compaction_task(std::shared_ptr<doris::CloudTablet>
 const&)::$_2>::_M_invoke(std::_Any_data const&)
            14# doris::ThreadPool::dispatch_thread()
            15# doris::Thread::supervise_thread(void*)
            16# ?
            17# ?
    ```
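    Reason: PaddedPODArray's `allocated_bytes` includes pad_left and
    pad_right, which are NOT usable for storing elements. Computing the free
    space as `allocated_bytes() - size() * sizeof(UInt16)` therefore
    over-estimates it, so the spill is skipped and the following push_back
    reallocates (doubles) the buffer beyond
    `vertical_compaction_max_row_source_memory_mb`. The fix uses
    `capacity() - size()` to count the truly available element slots.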
    
    Co-authored-by: liutang123 <[email protected]>
---
 .../storage/iterator/vertical_merge_iterator.cpp   | 10 ++-
 .../compaction/vertical_compaction_test.cpp        | 77 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 2 deletions(-)

diff --git a/be/src/storage/iterator/vertical_merge_iterator.cpp 
b/be/src/storage/iterator/vertical_merge_iterator.cpp
index a340adcd3f7..9aedae0b1be 100644
--- a/be/src/storage/iterator/vertical_merge_iterator.cpp
+++ b/be/src/storage/iterator/vertical_merge_iterator.cpp
@@ -69,8 +69,14 @@ uint16_t RowSource::data() const {
 Status RowSourcesBuffer::append(const std::vector<RowSource>& row_sources) {
     if (_buffer.allocated_bytes() + row_sources.size() * sizeof(UInt16) >
         config::vertical_compaction_max_row_source_memory_mb * 1024 * 1024) {
-        if (_buffer.allocated_bytes() - _buffer.size() * sizeof(UInt16) <
-            row_sources.size() * sizeof(UInt16)) {
+        // Use capacity() - size() to get the truly available element slots.
+        // Note: PODArrayBase::allocated_bytes() includes pad_left and 
pad_right,
+        // which are NOT usable for storing elements. Using allocated_bytes() 
here
+        // would over-estimate the available space and lead to a missed spill,
+        // causing _buffer to grow beyond the configured memory limit when
+        // push_back triggers reallocation below.
+        size_t available_slots = _buffer.capacity() - _buffer.size();
+        if (available_slots < row_sources.size()) {
             VLOG_NOTICE << "RowSourceBuffer is too large, serialize and reset 
buffer: "
                         << _buffer.allocated_bytes() << ", total size: " << 
_total_size;
             // serialize current buffer
diff --git a/be/test/storage/compaction/vertical_compaction_test.cpp 
b/be/test/storage/compaction/vertical_compaction_test.cpp
index 3b736857242..9623f230dbb 100644
--- a/be/test/storage/compaction/vertical_compaction_test.cpp
+++ b/be/test/storage/compaction/vertical_compaction_test.cpp
@@ -435,6 +435,83 @@ TEST_F(VerticalCompactionTest, TestRowSourcesBuffer) {
     }
 }
 
+// Regression test for RowSourcesBuffer::append spill threshold.
+//
+// Background:
+// PaddedPODArray::allocated_bytes() returns the total allocated memory which
+// INCLUDES pad_left and pad_right. These padding bytes are NOT usable for
+// storing elements. Earlier, append() used `allocated_bytes() - size*sizeof`
+// as "available room" to decide whether to skip spilling. This over-estimates
+// the truly usable space (by pad_left + pad_right bytes), so when the buffer
+// has already crossed the configured memory limit, append() may incorrectly
+// decide that the upcoming push_back will fit without reallocation, skip the
+// spill, and then push_back triggers a reallocation that doubles the buffer,
+// exceeding the configured `vertical_compaction_max_row_source_memory_mb`.
+//
+// This test simulates the case by setting a very small memory limit (1 MB) and
+// repeatedly appending row sources. After the first time the buffer crosses
+// the limit, the next append must trigger a spill (file write + reset) instead
+// of silently growing the in-memory buffer beyond the limit.
+TEST_F(VerticalCompactionTest, TestRowSourcesBufferSpillThreshold) {
+    // 1 MB limit (set in SetUp as well, but make it explicit here).
+    config::vertical_compaction_max_row_source_memory_mb = 1;
+    const size_t mem_limit_bytes =
+            
static_cast<size_t>(config::vertical_compaction_max_row_source_memory_mb) * 
1024 * 1024;
+
+    RowSourcesBuffer buffer(200, absolute_dir, 
ReaderType::READER_CUMULATIVE_COMPACTION);
+
+    // Build a batch of row sources. Use a moderate batch size so that the
+    // buffer's allocated_bytes() can become very close to the limit before
+    // a single append crosses it.
+    constexpr size_t kBatchSize = 4096;
+    std::vector<RowSource> batch;
+    batch.reserve(kBatchSize);
+    for (size_t i = 0; i < kBatchSize; ++i) {
+        batch.emplace_back(static_cast<uint16_t>(i % 8), false);
+    }
+
+    // Append ~4x as many elements as fit in the limit (plus 8 extra batches) so
+    // the buffer has to spill repeatedly. Each element is 2 bytes: ~512K per MB.
+    const size_t total_appends = (mem_limit_bytes / sizeof(uint16_t)) * 4 / 
kBatchSize + 8;
+
+    size_t expected_total = 0;
+    for (size_t i = 0; i < total_appends; ++i) {
+        ASSERT_TRUE(buffer.append(batch).ok());
+        expected_total += kBatchSize;
+
+        // Invariant: in-memory buffered_size() must never exceed what the
+        // memory limit allows (in elements). Otherwise the spill logic is
+        // broken (the bug described above).
+        // Allow a small slack equal to one batch because the spill check is
+        // performed BEFORE the push_back that crosses the threshold.
+        size_t buffered_elems = buffer.buffered_size();
+        size_t buffered_bytes = buffered_elems * sizeof(uint16_t);
+        // After each append, buffered_bytes should be <= mem_limit + one 
batch size.
+        // It must NOT grow unboundedly (e.g., 2x of the limit due to PODArray
+        // reallocation that the buggy version would allow).
+        EXPECT_LE(buffered_bytes, mem_limit_bytes + kBatchSize * 
sizeof(uint16_t))
+                << "RowSourcesBuffer in-memory size exceeded the configured 
limit, "
+                << "spill threshold logic is broken. iter=" << i
+                << ", buffered_elems=" << buffered_elems;
+    }
+
+    EXPECT_EQ(buffer.total_size(), expected_total);
+
+    // Make sure data is persisted and can be read back correctly.
+    ASSERT_TRUE(buffer.flush().ok());
+    ASSERT_TRUE(buffer.seek_to_begin().ok());
+
+    size_t read_back = 0;
+    while (buffer.has_remaining().ok()) {
+        // Verify that the source num matches the pattern we wrote.
+        auto cur = buffer.current().get_source_num();
+        EXPECT_EQ(cur, (read_back % kBatchSize) % 8);
+        buffer.advance(1);
+        ++read_back;
+    }
+    EXPECT_EQ(read_back, expected_total);
+}
+
 TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
     auto num_input_rowset = 2;
     auto num_segments = 2;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to