This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit da72d3fc95d515be9c3fffe7c5a01519de06656c
Author: Ashin Gau <[email protected]>
AuthorDate: Wed Sep 6 22:45:31 2023 +0800

    [opt](MergedIO) optimize merge small IO, prevent amplified read (#23849)
    
    There were two flaws in the previous fix (https://github.com/apache/doris/pull/20305):
    1. `next_content` may not necessarily be a truly readable range
    2. The last range of the merged data may be a hollow (a gap between ranges)
    
    This PR fundamentally solves the problem of read amplification by
    re-checking the calculated ranges. According to the algorithm, there is only
    one case that can still generate read amplification: when only a small amount
    of data lies within the 4k (`MIN_READ_SIZE`) range. However, 4k is generally
    the minimum IO size, so there is no need for further segmentation.
---
 be/src/io/fs/buffered_reader.cpp       | 49 ++++++++++++++++++++++++----------
 be/src/io/fs/buffered_reader.h         | 21 ++++++++-------
 be/test/io/fs/buffered_reader_test.cpp | 45 +++++++++++++++++++++++++++++++
 3 files changed, 92 insertions(+), 23 deletions(-)

diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 00f88c7515..2a7187cc28 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -49,7 +49,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, 
Slice result, size_t* b
     if (result.size == 0) {
         return Status::OK();
     }
-    int range_index = _search_read_range(offset, offset + result.size);
+    const int range_index = _search_read_range(offset, offset + result.size);
     if (range_index < 0) {
         SCOPED_RAW_TIMER(&_statistics.read_time);
         Status st = _reader->read_at(offset, result, bytes_read, io_ctx);
@@ -99,6 +99,8 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, 
Slice result, size_t* b
     // merge small IO
     size_t merge_start = offset + has_read;
     const size_t merge_end = merge_start + READ_SLICE_SIZE;
+    // <slice_size, is_content>
+    std::vector<std::pair<size_t, bool>> merged_slice;
     size_t content_size = 0;
     size_t hollow_size = 0;
     if (merge_start > _random_access_ranges[range_index].end_offset) {
@@ -118,12 +120,14 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, 
Slice result, size_t* b
             size_t add_content = std::min(merge_end - merge_start, 
content_max);
             content_size += add_content;
             merge_start += add_content;
+            merged_slice.emplace_back(add_content, true);
             break;
         }
         size_t add_content =
                 std::min(_random_access_ranges[merge_index].end_offset - 
merge_start, content_max);
         content_size += add_content;
         merge_start += add_content;
+        merged_slice.emplace_back(add_content, true);
         if (merge_start != _random_access_ranges[merge_index].end_offset) {
             break;
         }
@@ -136,18 +140,9 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, 
Slice result, size_t* b
             }
             if (gap < merge_end - merge_start && content_size < _remaining &&
                 !_range_cached_data[merge_index + 1].has_read) {
-                size_t next_content =
-                        std::min(_random_access_ranges[merge_index + 
1].end_offset, merge_end) -
-                        _random_access_ranges[merge_index + 1].start_offset;
-                next_content = std::min(next_content, _remaining - 
content_size);
-                double amplified_ratio = config::max_amplified_read_ratio;
-                if ((content_size + hollow_size) > MIN_READ_SIZE &&
-                    (hollow_size + gap) > (next_content + content_size) * 
amplified_ratio) {
-                    // too large gap
-                    break;
-                }
                 hollow_size += gap;
                 merge_start = _random_access_ranges[merge_index + 
1].start_offset;
+                merged_slice.emplace_back(gap, false);
             } else {
                 // there's no enough memory to read hollow data
                 break;
@@ -155,7 +150,33 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, 
Slice result, size_t* b
         }
         merge_index++;
     }
-    if (content_size + hollow_size == to_read) {
+    content_size = 0;
+    hollow_size = 0;
+    double amplified_ratio = config::max_amplified_read_ratio;
+    std::vector<std::pair<double, size_t>> ratio_and_size;
+    // Calculate the read amplified ratio for each merge operation and the 
size of the merged data.
+    // Find the largest size of the merged data whose amplified ratio is less 
than config::max_amplified_read_ratio
+    for (const std::pair<size_t, bool>& slice : merged_slice) {
+        if (slice.second) {
+            content_size += slice.first;
+            if (slice.first > 0) {
+                ratio_and_size.emplace_back((double)hollow_size / content_size,
+                                            content_size + hollow_size);
+            }
+        } else {
+            hollow_size += slice.first;
+        }
+    }
+    size_t best_merged_size = 0;
+    for (const std::pair<double, size_t>& rs : ratio_and_size) {
+        if (rs.second > best_merged_size) {
+            if (rs.first < amplified_ratio || rs.second <= MIN_READ_SIZE) {
+                best_merged_size = rs.second;
+            }
+        }
+    }
+
+    if (best_merged_size == to_read) {
         // read directly to avoid copy operation
         SCOPED_RAW_TIMER(&_statistics.read_time);
         size_t read_size = 0;
@@ -170,8 +191,8 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, 
Slice result, size_t* b
 
     merge_start = offset + has_read;
     size_t merge_read_size = 0;
-    RETURN_IF_ERROR(_fill_box(range_index, merge_start, content_size + 
hollow_size,
-                              &merge_read_size, io_ctx));
+    RETURN_IF_ERROR(
+            _fill_box(range_index, merge_start, best_merged_size, 
&merge_read_size, io_ctx));
     if (cached_data.start_offset != merge_start) {
         return Status::IOError("Wrong start offset in merged IO");
     }
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 25a6811330..84235f0a46 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -75,6 +75,15 @@ struct PrefetchRange {
  */
 class MergeRangeFileReader : public io::FileReader {
 public:
+    struct Statistics {
+        int64_t copy_time = 0;
+        int64_t read_time = 0;
+        int64_t request_io = 0;
+        int64_t merged_io = 0;
+        int64_t request_bytes = 0;
+        int64_t read_bytes = 0;
+    };
+
     struct RangeCachedData {
         size_t start_offset;
         size_t end_offset;
@@ -190,20 +199,14 @@ public:
     // for test only
     const std::vector<int16>& box_reference() const { return _box_ref; }
 
+    // for test only
+    const Statistics& statistics() const { return _statistics; }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
 
 private:
-    struct Statistics {
-        int64_t copy_time = 0;
-        int64_t read_time = 0;
-        int64_t request_io = 0;
-        int64_t merged_io = 0;
-        int64_t request_bytes = 0;
-        int64_t read_bytes = 0;
-    };
-
     RuntimeProfile::Counter* _copy_time;
     RuntimeProfile::Counter* _read_time;
     RuntimeProfile::Counter* _request_io;
diff --git a/be/test/io/fs/buffered_reader_test.cpp 
b/be/test/io/fs/buffered_reader_test.cpp
index 6a281e125f..97ef217136 100644
--- a/be/test/io/fs/buffered_reader_test.cpp
+++ b/be/test/io/fs/buffered_reader_test.cpp
@@ -270,6 +270,51 @@ TEST_F(BufferedReaderTest, test_miss) {
     EXPECT_EQ(45, bytes_read);
 }
 
+TEST_F(BufferedReaderTest, test_read_amplify) {
+    size_t kb = 1024;
+    io::FileReaderSPtr offset_reader = 
std::make_shared<MockOffsetFileReader>(2048 * kb); // 2MB
+    std::vector<io::PrefetchRange> random_access_ranges;
+    random_access_ranges.emplace_back(0, 1 * kb); // column0
+    // if read the follow slice, amplified_ratio = 1, but data size <= 
MIN_READ_SIZE
+    random_access_ranges.emplace_back(3 * kb, 4 * kb); // column1
+    // if read the follow slice, amplified_ratio = 1,
+    // but merge the next rand, amplified_ratio will be decreased
+    random_access_ranges.emplace_back(5 * kb, 6 * kb);  // column2
+    random_access_ranges.emplace_back(7 * kb, 12 * kb); // column3
+    // read the last range first, so we can't merge the last range when 
reading the former ranges,
+    // even if the amplified_ratio < 0.8
+    random_access_ranges.emplace_back(512 * kb, 2048 * kb); // column4
+
+    io::MergeRangeFileReader merge_reader(nullptr, offset_reader, 
random_access_ranges);
+    char data[2048 * kb]; // 2MB
+    Slice result(data, 2048 * kb);
+    size_t bytes_read = 0;
+
+    // read column4
+    result.size = 1024 * kb;
+    merge_reader.read_at(1024 * kb, result, &bytes_read, nullptr);
+    EXPECT_EQ(bytes_read, 1024 * kb);
+    EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb);
+    EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb);
+    // read column0
+    result.size = 1 * kb;
+    // will merge column 0 ~ 3
+    merge_reader.read_at(0, result, &bytes_read, nullptr);
+    EXPECT_EQ(bytes_read, 1 * kb);
+    EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb);
+    // read column1
+    result.size = 1 * kb;
+    merge_reader.read_at(3 * kb, result, &bytes_read, nullptr);
+    // read column2
+    result.size = 1 * kb;
+    merge_reader.read_at(5 * kb, result, &bytes_read, nullptr);
+    // read column3
+    result.size = 5 * kb;
+    merge_reader.read_at(7 * kb, result, &bytes_read, nullptr);
+    EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb + 8 * kb);
+    EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb);
+}
+
 TEST_F(BufferedReaderTest, test_merged_io) {
     io::FileReaderSPtr offset_reader =
             std::make_shared<MockOffsetFileReader>(128 * 1024 * 1024); // 128MB


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to