This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new b98d0e21f ORC-1969: [C++] Support async I/O prefetch of next stripe
b98d0e21f is described below

commit b98d0e21f73b9ce91cc5ee270d7a8f127f19d708
Author: Gang Wu <[email protected]>
AuthorDate: Mon Aug 11 15:56:53 2025 +0800

    ORC-1969: [C++] Support async I/O prefetch of next stripe
    
    ### What changes were proposed in this pull request?
    
    Add a reader option to automatically prefetch the next stripe.
    
    ### Why are the changes needed?
    
    The C++ reader already provides two functions (preBuffer and releaseBuffer) 
for manual async I/O prefetch. They are not easy to use because the caller must 
control the I/O operations manually.
    
    ### How was this patch tested?
    
    Added test cases to verify that async prefetch is triggered and works as 
expected.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #2353 from wgtmac/async_prefetch.
    
    Authored-by: Gang Wu <[email protected]>
    Signed-off-by: Gang Wu <[email protected]>
---
 c++/include/orc/Reader.hh |  10 +++
 c++/src/Options.hh        |  13 ++++
 c++/src/Reader.cc         | 132 +++++++++++++++++++-----------------
 c++/src/Reader.hh         |   8 +++
 c++/test/TestReader.cc    | 168 ++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 270 insertions(+), 61 deletions(-)

diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index e9f420f11..46b3c3983 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -388,6 +388,16 @@ namespace orc {
      * Whether reader throws or returns null when value overflows for schema 
evolution.
      */
     bool getThrowOnSchemaEvolutionOverflow() const;
+
+    /**
+     * Set whether to enable async I/O prefetch of next stripe.
+     */
+    RowReaderOptions& setEnableAsyncPrefetch(bool enable);
+
+    /**
+     * Whether to enable async I/O prefetch of next stripe.
+     */
+    bool getEnableAsyncPrefetch() const;
   };
 
   class RowReader;
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index 0a4bd56d8..2dca5e7f5 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -25,6 +25,7 @@
 
 #include "io/Cache.hh"
 
+#include <iostream>
 #include <limits>
 
 namespace orc {
@@ -153,6 +154,7 @@ namespace orc {
     bool useTightNumericVector;
     std::shared_ptr<Type> readType;
     bool throwOnSchemaEvolutionOverflow;
+    bool enableAsyncPrefetch;
 
     RowReaderOptionsPrivate() {
       selection = ColumnSelection_NONE;
@@ -164,6 +166,7 @@ namespace orc {
       readerTimezone = "GMT";
       useTightNumericVector = false;
       throwOnSchemaEvolutionOverflow = false;
+      enableAsyncPrefetch = false;
     }
   };
 
@@ -338,6 +341,16 @@ namespace orc {
   std::shared_ptr<Type>& RowReaderOptions::getReadType() const {
     return privateBits_->readType;
   }
+
+  RowReaderOptions& RowReaderOptions::setEnableAsyncPrefetch(bool enable) {
+    privateBits_->enableAsyncPrefetch = enable;
+    return *this;
+  }
+
+  bool RowReaderOptions::getEnableAsyncPrefetch() const {
+    return privateBits_->enableAsyncPrefetch;
+  }
+
 }  // namespace orc
 
 #endif
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 349ae1b40..c571fb5d6 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -251,6 +251,7 @@ namespace orc {
         contents_(contents),
         throwOnHive11DecimalOverflow_(opts.getThrowOnHive11DecimalOverflow()),
         forcedScaleOnHive11Decimal_(opts.getForcedScaleOnHive11Decimal()),
+        enableAsyncPrefetch_(opts.getEnableAsyncPrefetch()),
         footer_(contents_->footer.get()),
         firstRowOfStripe_(*contents_->pool, 0),
         enableEncodedBlock_(opts.getEnableLazyDecoding()),
@@ -541,6 +542,43 @@ namespace orc {
     return result;
   }
 
+  std::vector<ReadRange> extractReadRangesForStripe(uint64_t stripeIndex,
+                                                    const 
proto::StripeInformation& stripeInfo,
+                                                    const proto::StripeFooter& 
stripeFooter,
+                                                    const std::vector<bool>& 
selectedColumns) {
+    std::vector<ReadRange> ranges;
+
+    uint64_t stripeFooterStart =
+        stripeInfo.offset() + stripeInfo.index_length() + 
stripeInfo.data_length();
+    uint64_t offset = stripeInfo.offset();
+
+    for (int i = 0; i < stripeFooter.streams_size(); i++) {
+      const proto::Stream& stream = stripeFooter.streams(i);
+      if (offset + stream.length() > stripeFooterStart) {
+        std::stringstream msg;
+        msg << "Malformed stream meta at stream index " << i << " in stripe " 
<< stripeIndex
+            << ": streamOffset=" << offset << ", streamLength=" << 
stream.length()
+            << ", stripeOffset=" << stripeInfo.offset()
+            << ", stripeIndexLength=" << stripeInfo.index_length()
+            << ", stripeDataLength=" << stripeInfo.data_length();
+        throw ParseError(msg.str());
+      }
+
+      if (stream.has_kind() && selectedColumns[stream.column()]) {
+        const auto& kind = stream.kind();
+        if (kind == proto::Stream_Kind_DATA || kind == 
proto::Stream_Kind_DICTIONARY_DATA ||
+            kind == proto::Stream_Kind_PRESENT || kind == 
proto::Stream_Kind_LENGTH ||
+            kind == proto::Stream_Kind_SECONDARY) {
+          ranges.emplace_back(offset, stream.length());
+        }
+      }
+
+      offset += stream.length();
+    }
+
+    return ranges;
+  }
+
   ReaderImpl::ReaderImpl(std::shared_ptr<FileContents> contents, const 
ReaderOptions& opts,
                          uint64_t fileLength, uint64_t postscriptLength)
       : contents_(std::move(contents)),
@@ -1099,6 +1137,16 @@ namespace orc {
     } while (sargsApplier_ && currentStripe_ < lastStripe_);
 
     if (currentStripe_ < lastStripe_) {
+      if (enableAsyncPrefetch_) {
+        // FIXME: this is very coarse since I/O ranges of all selected columns 
are about to
+        // prefetch. We can further evaluate index stream with knowledge of 
pruned row groups
+        // to issue less I/O ranges.
+        auto ranges = extractReadRangesForStripe(currentStripe_, 
currentStripeInfo_,
+                                                 currentStripeFooter_, 
selectedColumns_);
+        contents_->evictCache(currentStripeInfo_.offset());
+        contents_->cacheRanges(std::move(ranges));
+      }
+
       // get writer timezone info from stripe footer to help understand 
timestamp values.
       const Timezone& writerTimezone =
           currentStripeFooter_.has_writer_timezone()
@@ -1374,6 +1422,7 @@ namespace orc {
     contents->pool = options.getMemoryPool();
     contents->errorStream = options.getErrorStream();
     contents->readerMetrics = options.getReaderMetrics();
+    contents->cacheOptions = options.getCacheOptions();
     std::string serializedFooter = options.getSerializedFileTail();
     uint64_t fileLength;
     uint64_t postscriptLength;
@@ -1534,11 +1583,7 @@ namespace orc {
   }
 
   void ReaderImpl::releaseBuffer(uint64_t boundary) {
-    std::lock_guard<std::mutex> lock(contents_->readCacheMutex);
-
-    if (contents_->readCache) {
-      contents_->readCache->evictEntriesBefore(boundary);
-    }
+    contents_->evictCache(boundary);
   }
 
   void ReaderImpl::preBuffer(const std::vector<uint32_t>& stripes,
@@ -1563,61 +1608,11 @@ namespace orc {
     std::vector<bool> selectedColumns;
     columnSelector.updateSelected(selectedColumns, rowReaderOptions);
 
-    std::vector<ReadRange> ranges;
-    ranges.reserve(newIncludeTypes.size());
     for (auto stripe : newStripes) {
-      // get stripe information
       const auto& stripeInfo = footer_->stripes(stripe);
-      uint64_t stripeFooterStart =
-          stripeInfo.offset() + stripeInfo.index_length() + 
stripeInfo.data_length();
-      uint64_t stripeFooterLength = stripeInfo.footer_length();
-
-      // get stripe footer
-      std::unique_ptr<SeekableInputStream> pbStream = createDecompressor(
-          contents_->compression,
-          std::make_unique<SeekableFileInputStream>(contents_->stream.get(), 
stripeFooterStart,
-                                                    stripeFooterLength, 
*contents_->pool),
-          contents_->blockSize, *contents_->pool, contents_->readerMetrics);
-      proto::StripeFooter stripeFooter;
-      if (!stripeFooter.ParseFromZeroCopyStream(pbStream.get())) {
-        throw ParseError(std::string("bad StripeFooter from ") + 
pbStream->getName());
-      }
-
-      // traverse all streams in stripe footer, choose selected streams to 
prebuffer
-      uint64_t offset = stripeInfo.offset();
-      for (int i = 0; i < stripeFooter.streams_size(); i++) {
-        const proto::Stream& stream = stripeFooter.streams(i);
-        if (offset + stream.length() > stripeFooterStart) {
-          std::stringstream msg;
-          msg << "Malformed stream meta at stream index " << i << " in stripe 
" << stripe
-              << ": streamOffset=" << offset << ", streamLength=" << 
stream.length()
-              << ", stripeOffset=" << stripeInfo.offset()
-              << ", stripeIndexLength=" << stripeInfo.index_length()
-              << ", stripeDataLength=" << stripeInfo.data_length();
-          throw ParseError(msg.str());
-        }
-
-        if (stream.has_kind() && selectedColumns[stream.column()]) {
-          const auto& kind = stream.kind();
-          if (kind == proto::Stream_Kind_DATA || kind == 
proto::Stream_Kind_DICTIONARY_DATA ||
-              kind == proto::Stream_Kind_PRESENT || kind == 
proto::Stream_Kind_LENGTH ||
-              kind == proto::Stream_Kind_SECONDARY) {
-            ranges.emplace_back(offset, stream.length());
-          }
-        }
-
-        offset += stream.length();
-      }
-
-      {
-        std::lock_guard<std::mutex> lock(contents_->readCacheMutex);
-
-        if (!contents_->readCache) {
-          contents_->readCache = std::make_shared<ReadRangeCache>(
-              getStream(), options_.getCacheOptions(), contents_->pool, 
contents_->readerMetrics);
-        }
-        contents_->readCache->cache(std::move(ranges));
-      }
+      proto::StripeFooter stripeFooter = getStripeFooter(stripeInfo, 
*contents_);
+      auto ranges = extractReadRangesForStripe(stripe, stripeInfo, 
stripeFooter, selectedColumns);
+      contents_->cacheRanges(std::move(ranges));
     }
   }
 
@@ -1629,8 +1624,23 @@ namespace orc {
     // PASS
   }
 
-  InputStream::~InputStream(){
-      // PASS
-  };
+  InputStream::~InputStream() {
+    // PASS
+  }
+
+  void FileContents::cacheRanges(std::vector<ReadRange> ranges) {
+    std::lock_guard<std::mutex> lock(readCacheMutex);
+    if (!readCache) {
+      readCache = std::make_shared<ReadRangeCache>(stream.get(), cacheOptions, 
pool, readerMetrics);
+    }
+    readCache->cache(std::move(ranges));
+  }
+
+  void FileContents::evictCache(uint64_t boundary) {
+    std::lock_guard<std::mutex> lock(readCacheMutex);
+    if (readCache) {
+      readCache->evictEntriesBefore(boundary);
+    }
+  }
 
 }  // namespace orc
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 3d81d2692..58dcb9062 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -73,10 +73,17 @@ namespace orc {
     std::unique_ptr<proto::Metadata> metadata;
     ReaderMetrics* readerMetrics;
 
+    // cache options to advise io coalescing in the read cache.
+    CacheOptions cacheOptions;
     // mutex to protect readCache_ from concurrent access
     std::mutex readCacheMutex;
     // cached io ranges. only valid when preBuffer is invoked.
     std::shared_ptr<ReadRangeCache> readCache;
+
+    // A thread-safe convenience method to cache ranges.
+    void cacheRanges(std::vector<ReadRange> ranges);
+    // A thread-safe convenience method to evict cache entries fully before a 
given boundary.
+    void evictCache(uint64_t boundary);
   };
 
   proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
@@ -140,6 +147,7 @@ namespace orc {
     std::shared_ptr<FileContents> contents_;
     const bool throwOnHive11DecimalOverflow_;
     const int32_t forcedScaleOnHive11Decimal_;
+    const bool enableAsyncPrefetch_;
 
     // inputs
     std::vector<bool> selectedColumns_;
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index f9df6edc9..348a0ad69 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -935,4 +935,172 @@ namespace orc {
       }
     }
   }
+
+  namespace {
+
+    uint64_t writeSampleData(MemoryOutputStream& memStream, uint64_t 
stripeSize = 1024,
+                             uint64_t rowsPerStripe = 1000) {
+      auto type = 
Type::buildTypeFromString("struct<id:int,name:string,value:double>");
+      WriterOptions options;
+      options.setStripeSize(stripeSize)
+          .setCompressionBlockSize(1024)
+          .setMemoryBlockSize(64)
+          .setCompression(CompressionKind_NONE)
+          .setRowIndexStride(100);
+      auto writer = createWriter(*type, &memStream, options);
+
+      uint64_t totalRows = rowsPerStripe * 3;
+      uint64_t batchSize = 100;
+      std::vector<std::string> names(totalRows, "");
+
+      for (uint64_t startRow = 0; startRow < totalRows; startRow += batchSize) 
{
+        uint64_t currentBatchSize = std::min(batchSize, totalRows - startRow);
+        auto batch = writer->createRowBatch(currentBatchSize);
+        auto& structBatch = static_cast<StructVectorBatch&>(*batch);
+        auto& idBatch = static_cast<LongVectorBatch&>(*structBatch.fields[0]);
+        auto& nameBatch = 
static_cast<StringVectorBatch&>(*structBatch.fields[1]);
+        auto& valueBatch = 
static_cast<DoubleVectorBatch&>(*structBatch.fields[2]);
+
+        for (uint64_t i = 0; i < currentBatchSize; ++i) {
+          idBatch.data[i] = static_cast<int64_t>(startRow + i);
+          names[startRow + i] = "name_" + std::to_string(startRow + i);
+          nameBatch.data[i] = const_cast<char*>(names[startRow + i].c_str());
+          nameBatch.length[i] = static_cast<int64_t>(names[startRow + 
i].length());
+          valueBatch.data[i] = static_cast<double>(startRow + i) * 1.5;
+        }
+
+        structBatch.numElements = currentBatchSize;
+        writer->add(*batch);
+      }
+
+      writer->close();
+      return totalRows;
+    }
+
+    uint64_t readAllRows(RowReader& rowReader, uint64_t batchSize = 1000) {
+      auto batch = rowReader.createRowBatch(batchSize);
+      uint64_t totalRows = 0;
+      while (rowReader.next(*batch)) {
+        totalRows += batch->numElements;
+      }
+      return totalRows;
+    }
+
+  }  // namespace
+
+  TEST(TestAsyncPrefetch, testAsyncPrefetchCorrectnessWithMultipleStripes) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    uint64_t totalRows = writeSampleData(memStream);
+
+    auto reader = createReader(
+        std::make_unique<MemoryInputStream>(memStream.getData(), 
memStream.getLength()), {});
+    ASSERT_GE(reader->getNumberOfStripes(), 2UL);
+
+    auto rowReader = 
reader->createRowReader(RowReaderOptions{}.setEnableAsyncPrefetch(true));
+    EXPECT_EQ(readAllRows(*rowReader), totalRows);
+  }
+
+  TEST(TestAsyncPrefetch, testAsyncPrefetchWithVariousConfigurations) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    std::ignore = writeSampleData(memStream);
+    auto reader = createReader(
+        std::make_unique<MemoryInputStream>(memStream.getData(), 
memStream.getLength()), {});
+    ASSERT_GE(reader->getNumberOfStripes(), 2UL);
+
+    struct TestCase {
+      std::string name;
+      std::function<void(RowReaderOptions&)> configureOptions;
+      std::function<void(RowReader&, uint64_t)> validateReader;
+    };
+
+    std::vector<TestCase> testCases = {
+        {"WithColumnSelection",
+         [](RowReaderOptions& opts) {
+           opts.include({0, 2});  // Select only id and value columns
+         },
+         [](RowReader& reader, uint64_t expectedRows) {
+           auto batch = reader.createRowBatch(1000);
+           uint64_t totalRows = 0;
+           while (reader.next(*batch)) {
+             totalRows += batch->numElements;
+             // Verify that only selected columns are present
+             auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+             EXPECT_EQ(2, structBatch.fields.size());
+           }
+           EXPECT_EQ(totalRows, expectedRows);
+         }},
+        {"WithSeek", [](RowReaderOptions& /*opts*/) { /* no additional config 
*/ },
+         [](RowReader& reader, uint64_t expectedRows) {
+           auto batch = reader.createRowBatch(500);
+
+           // Read some data first
+           EXPECT_TRUE(reader.next(*batch));
+           EXPECT_GT(batch->numElements, 0UL);
+
+           // Seek to middle and continue reading
+           uint64_t seekPosition = expectedRows / 2;
+           reader.seekToRow(seekPosition);
+
+           EXPECT_TRUE(reader.next(*batch));
+           EXPECT_GT(batch->numElements, 0UL);
+           EXPECT_GE(reader.getRowNumber(), seekPosition);
+         }},
+        {"WithRange",
+         [](RowReaderOptions& /*opts*/) {
+           // Read only middle portion - will be set dynamically in test
+         },
+         [](RowReader& reader, uint64_t expectedRows) {
+           uint64_t totalRows = readAllRows(reader);
+           // Should read approximately the specified range (allowing 
tolerance for stripe
+           // boundaries)
+           EXPECT_LE(totalRows, expectedRows / 2 + 1000);
+         }}};
+
+    for (const auto& testCase : testCases) {
+      SCOPED_TRACE("Testing async prefetch " + testCase.name);
+
+      RowReaderOptions options;
+      options.setEnableAsyncPrefetch(true);
+
+      if (testCase.name == "WithRange") {
+        uint64_t startRow = reader->getNumberOfRows() / 4;
+        uint64_t endRow = 3 * reader->getNumberOfRows() / 4;
+        options.range(startRow, endRow - startRow);
+      } else {
+        testCase.configureOptions(options);
+      }
+
+      auto rowReader = reader->createRowReader(options);
+      testCase.validateReader(*rowReader, reader->getNumberOfRows());
+    }
+  }
+
+  TEST(TestAsyncPrefetch, testAsyncPrefetchEdgeCases) {
+    // Test with single stripe
+    {
+      MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+      uint64_t totalRows = writeSampleData(memStream, 1024 * 1024, 100);
+
+      auto reader = createReader(
+          std::make_unique<MemoryInputStream>(memStream.getData(), 
memStream.getLength()), {});
+      ASSERT_EQ(1UL, reader->getNumberOfStripes());
+
+      auto rowReader = 
reader->createRowReader(RowReaderOptions{}.setEnableAsyncPrefetch(true));
+      ASSERT_EQ(readAllRows(*rowReader), totalRows);
+    }
+
+    // Test basic functionality with reader metrics (ensure no crashes)
+    {
+      MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+      uint64_t totalRows = writeSampleData(memStream);
+
+      auto reader = createReader(
+          std::make_unique<MemoryInputStream>(memStream.getData(), 
memStream.getLength()), {});
+      ASSERT_GE(reader->getNumberOfStripes(), 2UL);
+
+      auto rowReader = 
reader->createRowReader(RowReaderOptions{}.setEnableAsyncPrefetch(true));
+      ASSERT_EQ(readAllRows(*rowReader), totalRows);
+    }
+  }
+
 }  // namespace orc

Reply via email to