This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new c562af3ff ORC-2002: [C++] Improve prefetching stripe
c562af3ff is described below

commit c562af3ff48f147f37529107752744298a49565e
Author: Gang Wu <[email protected]>
AuthorDate: Mon Sep 22 10:17:09 2025 +0800

    ORC-2002: [C++] Improve prefetching stripe
    
    ### What changes were proposed in this pull request?
    
    - Fix prefetching when the small stripe limit is 0 so that the current stripe data and the next stripe footer are still prefetched.
    - Prefetch the next small stripe if the current stripe is the last small stripe that has been fully cached.
    - Prefetch and coalesce the index stream ranges.
    - Refactor the code to first try reading from the cache and then fall back to the original stream (a sketch of this pattern follows the list).
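    
    The refactor in the last bullet follows a simple pattern: look up the range in the read cache under its mutex, fall back to the underlying stream on a miss, and wrap whichever source was chosen in the decompressor. The sketch below only illustrates that pattern with simplified stand-in types (ReadRange, BufferSlice, ReadCache, InputStream here are approximations, not the real ORC reader classes):
    
    ```cpp
    // Hypothetical, simplified sketch of the cache-then-fallback read pattern.
    // The types below only approximate the ORC reader internals.
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <string>
    #include <vector>
    
    struct ReadRange {
      uint64_t offset;
      uint64_t length;
    };
    
    // A cache hit hands back a slice of an already-fetched buffer; a miss leaves
    // the buffer pointer null so the caller falls back to the file.
    struct BufferSlice {
      std::shared_ptr<std::vector<uint8_t>> buffer;
      uint64_t offset = 0;
      uint64_t length = 0;
    };
    
    // Toy cache keyed by range offset; the real cache coalesces and evicts ranges.
    class ReadCache {
     public:
      void put(uint64_t offset, std::vector<uint8_t> data) {
        cache_[offset] = std::make_shared<std::vector<uint8_t>>(std::move(data));
      }
      BufferSlice read(const ReadRange& range) {
        auto it = cache_.find(range.offset);
        if (it == cache_.end() || it->second->size() < range.length) {
          return {};  // miss
        }
        return {it->second, 0, range.length};
      }
    
     private:
      std::map<uint64_t, std::shared_ptr<std::vector<uint8_t>>> cache_;
    };
    
    // Stand-in for SeekableArrayInputStream / SeekableFileInputStream.
    struct InputStream {
      std::string source;  // "cache" or "file", for demonstration only
    };
    
    std::unique_ptr<InputStream> openStream(ReadCache* cache, std::mutex& cacheMutex,
                                            uint64_t offset, uint64_t length) {
      BufferSlice slice;
      {
        // The cache is shared with the prefetch thread, so guard the lookup.
        std::lock_guard<std::mutex> lock(cacheMutex);
        if (cache != nullptr) {
          slice = cache->read(ReadRange{offset, length});
        }
      }
      auto stream = std::make_unique<InputStream>();
      stream->source = slice.buffer ? "cache" : "file";
      // The real reader then wraps the stream with createDecompressor(), so both
      // the cached and the on-disk path share the same decompression code.
      return stream;
    }
    
    int main() {
      ReadCache cache;
      std::mutex cacheMutex;
      cache.put(100, std::vector<uint8_t>(64, 0));  // pretend a prefetch cached 64 bytes at offset 100
      std::cout << openStream(&cache, cacheMutex, 100, 64)->source << "\n";  // prints "cache"
      std::cout << openStream(&cache, cacheMutex, 500, 64)->source << "\n";  // prints "file"
      return 0;
    }
    ```
    
    Both the row-index path and the stripe-footer path in Reader.cc now share this shape, so decompression is applied uniformly no matter where the bytes came from.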
    
    ### Why are the changes needed?
    
    To make stripe prefetching more effective.
    
    ### How was this patch tested?
    
    Added a test case for a small stripe limit of 0. The other fixes were verified manually because they are hard to reflect in a test.
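    
    For reference, the zero-limit case exercised by the new test can be reproduced from application code roughly as below; this is a usage sketch, assuming the options added for async prefetch (setEnableAsyncPrefetch, setSmallStripeLookAheadLimit, as used in TestReader.cc) and a placeholder local file path:
    
    ```cpp
    // Hypothetical usage sketch: async prefetch enabled with a zero small-stripe
    // look-ahead limit, so only the current stripe data and the next stripe
    // footer are prefetched. "/tmp/example.orc" is a placeholder path.
    #include <iostream>
    #include <memory>
    #include <orc/OrcFile.hh>
    
    int main() {
      orc::ReaderOptions readerOptions;
      std::unique_ptr<orc::Reader> reader =
          orc::createReader(orc::readLocalFile("/tmp/example.orc"), readerOptions);
    
      orc::RowReaderOptions rowReaderOptions;
      rowReaderOptions.setEnableAsyncPrefetch(true);
      rowReaderOptions.setSmallStripeLookAheadLimit(0);  // zero look-ahead limit
      std::unique_ptr<orc::RowReader> rowReader = reader->createRowReader(rowReaderOptions);
    
      std::unique_ptr<orc::ColumnVectorBatch> batch = rowReader->createRowBatch(1024);
      uint64_t rows = 0;
      while (rowReader->next(*batch)) {
        rows += batch->numElements;
      }
      std::cout << "read " << rows << " rows\n";
      return 0;
    }
    ```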
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #2409 from wgtmac/prefetch_small_stripe.
    
    Authored-by: Gang Wu <[email protected]>
    Signed-off-by: Gang Wu <[email protected]>
---
 c++/src/Reader.cc      | 166 +++++++++++++++++++++++++++++--------------------
 c++/test/TestReader.cc |  36 ++++++++---
 2 files changed, 124 insertions(+), 78 deletions(-)

diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 1cf6940c8..ab4c5047d 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -27,7 +27,6 @@
 #include "wrap/coded-stream-wrapper.h"
 
 #include <algorithm>
-#include <iostream>
 #include <iterator>
 #include <memory>
 #include <set>
@@ -246,6 +245,47 @@ namespace orc {
     buildTypeNameIdMap(contents_->schema.get());
   }
 
+  static const std::unordered_set<proto::Stream_Kind> DATA_STREAM_KINDS = {
+      proto::Stream_Kind_DATA, proto::Stream_Kind_DICTIONARY_DATA, proto::Stream_Kind_PRESENT,
+      proto::Stream_Kind_LENGTH, proto::Stream_Kind_SECONDARY};
+
+  static const std::unordered_set<proto::Stream_Kind> INDEX_STREAM_KINDS = {
+      proto::Stream_Kind_ROW_INDEX, proto::Stream_Kind_BLOOM_FILTER_UTF8};
+
+  std::vector<ReadRange> extractReadRangesForStripe(
+      uint64_t stripeIndex, const proto::StripeInformation& stripeInfo,
+      const proto::StripeFooter& stripeFooter, const std::vector<bool>& selectedColumns,
+      const std::unordered_set<proto::Stream_Kind>& allowedKinds = DATA_STREAM_KINDS) {
+    std::vector<ReadRange> ranges;
+
+    uint64_t stripeFooterStart =
+        stripeInfo.offset() + stripeInfo.index_length() + stripeInfo.data_length();
+    uint64_t offset = stripeInfo.offset();
+
+    for (int i = 0; i < stripeFooter.streams_size(); i++) {
+      const proto::Stream& stream = stripeFooter.streams(i);
+      if (offset + stream.length() > stripeFooterStart) {
+        std::stringstream msg;
+        msg << "Malformed stream meta at stream index " << i << " in stripe " 
<< stripeIndex
+            << ": streamOffset=" << offset << ", streamLength=" << 
stream.length()
+            << ", stripeOffset=" << stripeInfo.offset()
+            << ", stripeIndexLength=" << stripeInfo.index_length()
+            << ", stripeDataLength=" << stripeInfo.data_length();
+        throw ParseError(msg.str());
+      }
+
+      if (stream.has_kind() && selectedColumns[stream.column()]) {
+        if (allowedKinds.find(stream.kind()) != allowedKinds.cend()) {
+          ranges.emplace_back(offset, stream.length());
+        }
+      }
+
+      offset += stream.length();
+    }
+
+    return ranges;
+  }
+
   RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& opts)
       : localTimezone_(getLocalTimezone()),
         contents_(contents),
@@ -442,6 +482,15 @@ namespace orc {
     rowIndexes_.clear();
     bloomFilterIndex_.clear();
 
+    if (enableAsyncPrefetch_ &&
+        fullyCachedStripes_.find(currentStripe_) == fullyCachedStripes_.end()) {
+      // Cache required ranges of index which are usually very small
+      auto ranges =
+          extractReadRangesForStripe(currentStripe_, currentStripeInfo_, currentStripeFooter_,
+                                     selectedColumns_, INDEX_STREAM_KINDS);
+      contents_->cacheRanges(std::move(ranges));
+    }
+
     // obtain row indexes for selected columns
     uint64_t offset = currentStripeInfo_.offset();
     for (int i = 0; i < currentStripeFooter_.streams_size(); ++i) {
@@ -450,11 +499,24 @@ namespace orc {
       if (selectedColumns_[colId] && pbStream.has_kind() &&
           (pbStream.kind() == proto::Stream_Kind_ROW_INDEX ||
            pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) {
-        std::unique_ptr<SeekableInputStream> inStream = createDecompressor(
-            getCompression(),
-            std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
-                contents_->stream.get(), offset, pbStream.length(), *contents_->pool)),
-            getCompressionSize(), *contents_->pool, contents_->readerMetrics);
+        std::unique_ptr<SeekableInputStream> inStream;
+        BufferSlice slice;
+
+        {
+          std::lock_guard<std::mutex> lock(contents_->readCacheMutex);
+          if (contents_->readCache) {
+            slice = contents_->readCache->read(ReadRange(offset, pbStream.length()));
+          }
+        }
+        if (slice.buffer) {
+          inStream = std::make_unique<SeekableArrayInputStream>(slice.buffer->data() + slice.offset,
+                                                                slice.length);
+        } else {
+          inStream = std::make_unique<SeekableFileInputStream>(contents_->stream.get(), offset,
+                                                               pbStream.length(), *contents_->pool);
+        }
+        inStream = createDecompressor(getCompression(), std::move(inStream), getCompressionSize(),
+                                      *contents_->pool, contents_->readerMetrics);
 
         if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
           proto::RowIndex rowIndex;
@@ -526,32 +588,23 @@ namespace orc {
     uint64_t stripeFooterLength = info.footer_length();
 
     std::unique_ptr<SeekableInputStream> pbStream;
+    BufferSlice slice;
 
-    // Try to read from cache first
     {
       std::lock_guard<std::mutex> lock(contents.readCacheMutex);
       if (contents.readCache) {
-        ReadRange footerRange(stripeFooterStart, stripeFooterLength);
-        BufferSlice cachedData = contents.readCache->read(footerRange);
-        if (cachedData.buffer != nullptr) {
-          // Create a stream from cached data
-          pbStream = createDecompressor(
-              contents.compression,
-              std::make_unique<SeekableArrayInputStream>(
-                  cachedData.buffer->data() + cachedData.offset, cachedData.length),
-              contents.blockSize, *contents.pool, contents.readerMetrics);
-        }
+        slice = contents.readCache->read(ReadRange(stripeFooterStart, stripeFooterLength));
       }
     }
-
-    // Fall back to reading from disk if not in cache
-    if (!pbStream) {
-      pbStream = createDecompressor(
-          contents.compression,
-          std::make_unique<SeekableFileInputStream>(contents.stream.get(), stripeFooterStart,
-                                                    stripeFooterLength, *contents.pool),
-          contents.blockSize, *contents.pool, contents.readerMetrics);
+    if (slice.buffer) {
+      pbStream = std::make_unique<SeekableArrayInputStream>(slice.buffer->data() + slice.offset,
+                                                            slice.length);
+    } else {
+      pbStream = std::make_unique<SeekableFileInputStream>(contents.stream.get(), stripeFooterStart,
+                                                           stripeFooterLength, *contents.pool);
     }
+    pbStream = createDecompressor(contents.compression, std::move(pbStream), contents.blockSize,
+                                  *contents.pool, contents.readerMetrics);
 
     proto::StripeFooter result;
     if (!result.ParseFromZeroCopyStream(pbStream.get())) {
@@ -567,43 +620,6 @@ namespace orc {
     return result;
   }
 
-  std::vector<ReadRange> extractReadRangesForStripe(uint64_t stripeIndex,
-                                                    const proto::StripeInformation& stripeInfo,
-                                                    const proto::StripeFooter& stripeFooter,
-                                                    const std::vector<bool>& selectedColumns) {
-    std::vector<ReadRange> ranges;
-
-    uint64_t stripeFooterStart =
-        stripeInfo.offset() + stripeInfo.index_length() + stripeInfo.data_length();
-    uint64_t offset = stripeInfo.offset();
-
-    for (int i = 0; i < stripeFooter.streams_size(); i++) {
-      const proto::Stream& stream = stripeFooter.streams(i);
-      if (offset + stream.length() > stripeFooterStart) {
-        std::stringstream msg;
-        msg << "Malformed stream meta at stream index " << i << " in stripe " 
<< stripeIndex
-            << ": streamOffset=" << offset << ", streamLength=" << 
stream.length()
-            << ", stripeOffset=" << stripeInfo.offset()
-            << ", stripeIndexLength=" << stripeInfo.index_length()
-            << ", stripeDataLength=" << stripeInfo.data_length();
-        throw ParseError(msg.str());
-      }
-
-      if (stream.has_kind() && selectedColumns[stream.column()]) {
-        const auto& kind = stream.kind();
-        if (kind == proto::Stream_Kind_DATA || kind == proto::Stream_Kind_DICTIONARY_DATA ||
-            kind == proto::Stream_Kind_PRESENT || kind == proto::Stream_Kind_LENGTH ||
-            kind == proto::Stream_Kind_SECONDARY) {
-          ranges.emplace_back(offset, stream.length());
-        }
-      }
-
-      offset += stream.length();
-    }
-
-    return ranges;
-  }
-
   ReaderImpl::ReaderImpl(std::shared_ptr<FileContents> contents, const ReaderOptions& opts,
                          uint64_t fileLength, uint64_t postscriptLength)
       : contents_(std::move(contents)),
@@ -1173,13 +1189,10 @@ namespace orc {
       if (enableAsyncPrefetch_) {
         contents_->evictCache(currentStripeInfo_.offset());
 
-        if (fullyCachedStripes_.find(currentStripe_) != fullyCachedStripes_.cend()) {
-          // Current stripe has been fully cached, do nothing.
-        } else if (isSmallStripe(currentStripeInfo_, contents_->cacheOptions.rangeSizeLimit)) {
+        auto extractSmallStripeRanges = [this](uint64_t startStripe) {
           std::vector<ReadRange> ranges;
-          uint64_t maxStripe =
-              std::min(lastStripe_, currentStripe_ + smallStripeLookAheadLimit_ + 1);
-          for (uint64_t stripe = currentStripe_; stripe < maxStripe; stripe++) {
+          uint64_t maxStripe = std::min(lastStripe_, startStripe + smallStripeLookAheadLimit_ + 1);
+          for (uint64_t stripe = startStripe; stripe < maxStripe; stripe++) {
             const auto& stripeInfo = footer_->stripes(static_cast<int>(stripe));
             if (!isSmallStripe(stripeInfo, contents_->cacheOptions.rangeSizeLimit)) {
               break;
@@ -1187,7 +1200,22 @@ namespace orc {
             ranges.push_back(ReadRange{stripeInfo.offset(), getStripeSize(stripeInfo)});
             fullyCachedStripes_.insert(stripe);
           }
-          contents_->cacheRanges(std::move(ranges));
+          return ranges;
+        };
+
+        if (fullyCachedStripes_.find(currentStripe_) != fullyCachedStripes_.cend()) {
+          // Current stripe has been fully cached, only prefetch next (small) stripe if not cached
+          auto nextStripe = currentStripe_ + 1;
+          if (nextStripe < lastStripe_ &&
+              fullyCachedStripes_.find(nextStripe) == fullyCachedStripes_.cend()) {
+            std::vector<ReadRange> ranges = extractSmallStripeRanges(nextStripe);
+            if (!ranges.empty()) {
+              contents_->cacheRanges(std::move(ranges));
+            }
+          }
+        } else if (smallStripeLookAheadLimit_ > 0 &&
+                   isSmallStripe(currentStripeInfo_, contents_->cacheOptions.rangeSizeLimit)) {
+          contents_->cacheRanges(extractSmallStripeRanges(currentStripe_));
         } else {
           // This is very coarse since I/O ranges of all selected columns are about to prefetch.
           // We can further evaluate index stream with knowledge of pruned row groups to issue
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index 12931a5eb..ab6e5ac66 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -1146,6 +1146,7 @@ namespace orc {
     uint64_t noPrefetchIOCount = 0;
     uint64_t smallLimitIOCount = 0;
     uint64_t largeLimitIOCount = 0;
+    uint64_t zeroLimitIOCount = 0;
 
     // Test 1: No async prefetch - should have most I/O operations
     {
@@ -1154,7 +1155,7 @@ namespace orc {
       auto* countingPtr = countingStream.get();
 
       ReaderOptions readerOptions;
-      std::unique_ptr<Reader> reader = createReader(std::move(countingStream), readerOptions);
+      auto reader = createReader(std::move(countingStream), readerOptions);
 
       RowReaderOptions rowReaderOptions;
       rowReaderOptions.setEnableAsyncPrefetch(false);
@@ -1166,8 +1167,6 @@ namespace orc {
       noPrefetchIOCount = countingPtr->getReadCount();
       EXPECT_EQ(readRows, totalRows);
       EXPECT_GT(noPrefetchIOCount, 0UL);
-
-      std::cout << "No async prefetch I/O count: " << noPrefetchIOCount << 
std::endl;
     }
 
     // Test 2: Async prefetch with small look ahead limit
@@ -1177,7 +1176,7 @@ namespace orc {
       auto* countingPtr = countingStream.get();
 
       ReaderOptions readerOptions;
-      std::unique_ptr<Reader> reader = createReader(std::move(countingStream), readerOptions);
+      auto reader = createReader(std::move(countingStream), readerOptions);
 
       RowReaderOptions rowReaderOptions;
       rowReaderOptions.setEnableAsyncPrefetch(true);
@@ -1190,8 +1189,6 @@ namespace orc {
       smallLimitIOCount = countingPtr->getReadCount();
       EXPECT_EQ(readRows, totalRows);
       EXPECT_GT(smallLimitIOCount, 0UL);
-
-      std::cout << "Small limit (1) prefetch I/O count: " << smallLimitIOCount 
<< std::endl;
     }
 
     // Test 3: Async prefetch with large look ahead limit
@@ -1201,7 +1198,7 @@ namespace orc {
       auto* countingPtr = countingStream.get();
 
       ReaderOptions readerOptions;
-      std::unique_ptr<Reader> reader = createReader(std::move(countingStream), readerOptions);
+      auto reader = createReader(std::move(countingStream), readerOptions);
 
       RowReaderOptions rowReaderOptions;
       rowReaderOptions.setEnableAsyncPrefetch(true);
@@ -1214,11 +1211,32 @@ namespace orc {
       largeLimitIOCount = countingPtr->getReadCount();
       EXPECT_EQ(readRows, totalRows);
       EXPECT_GT(largeLimitIOCount, 0UL);
+    }
+
+    // Test 4: Async prefetch with zero look ahead limit
+    {
+      auto countingStream = std::make_unique<IOCountingInputStream>(
+          std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength()));
+      auto* countingPtr = countingStream.get();
+
+      ReaderOptions readerOptions;
+      auto reader = createReader(std::move(countingStream), readerOptions);
+
+      RowReaderOptions rowReaderOptions;
+      rowReaderOptions.setEnableAsyncPrefetch(true);
+      rowReaderOptions.setSmallStripeLookAheadLimit(0);  // Zero limit
+      auto rowReader = reader->createRowReader(rowReaderOptions);
 
-      std::cout << "Large limit (3) prefetch I/O count: " << largeLimitIOCount 
<< std::endl;
+      countingPtr->resetReadCount();
+      uint64_t readRows = readAllRows(*rowReader);
+
+      zeroLimitIOCount = countingPtr->getReadCount();
+      EXPECT_EQ(readRows, totalRows);
+      EXPECT_GT(zeroLimitIOCount, 0UL);
     }
 
-    EXPECT_LT(smallLimitIOCount, noPrefetchIOCount);
+    EXPECT_LT(zeroLimitIOCount, noPrefetchIOCount);
+    EXPECT_LT(smallLimitIOCount, zeroLimitIOCount);
     EXPECT_LT(largeLimitIOCount, smallLimitIOCount);
   }
 
