This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new b98d0e21f ORC-1969: [C++] Support async I/O prefetch of next stripe
b98d0e21f is described below
commit b98d0e21f73b9ce91cc5ee270d7a8f127f19d708
Author: Gang Wu <[email protected]>
AuthorDate: Mon Aug 11 15:56:53 2025 +0800
ORC-1969: [C++] Support async I/O prefetch of next stripe
### What changes were proposed in this pull request?
Add a reader option to automatically prefetch the next stripe.
### Why are the changes needed?
The C++ reader already provides two functions (preBuffer and releaseBuffer) to
support manual async I/O prefetch. They are not easy to use since they require
the caller to control the I/O operations manually.
### How was this patch tested?
Added test cases to verify that async prefetch is triggered and works as
expected.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #2353 from wgtmac/async_prefetch.
Authored-by: Gang Wu <[email protected]>
Signed-off-by: Gang Wu <[email protected]>
---
c++/include/orc/Reader.hh | 10 +++
c++/src/Options.hh | 13 ++++
c++/src/Reader.cc | 132 +++++++++++++++++++-----------------
c++/src/Reader.hh | 8 +++
c++/test/TestReader.cc | 168 ++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 270 insertions(+), 61 deletions(-)
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index e9f420f11..46b3c3983 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -388,6 +388,16 @@ namespace orc {
* Whether reader throws or returns null when value overflows for schema
evolution.
*/
bool getThrowOnSchemaEvolutionOverflow() const;
+
+ /**
+ * Set whether to enable async I/O prefetch of next stripe.
+ */
+ RowReaderOptions& setEnableAsyncPrefetch(bool enable);
+
+ /**
+ * Whether to enable async I/O prefetch of next stripe.
+ */
+ bool getEnableAsyncPrefetch() const;
};
class RowReader;
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index 0a4bd56d8..2dca5e7f5 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -25,6 +25,7 @@
#include "io/Cache.hh"
+#include <iostream>
#include <limits>
namespace orc {
@@ -153,6 +154,7 @@ namespace orc {
bool useTightNumericVector;
std::shared_ptr<Type> readType;
bool throwOnSchemaEvolutionOverflow;
+ bool enableAsyncPrefetch;
RowReaderOptionsPrivate() {
selection = ColumnSelection_NONE;
@@ -164,6 +166,7 @@ namespace orc {
readerTimezone = "GMT";
useTightNumericVector = false;
throwOnSchemaEvolutionOverflow = false;
+ enableAsyncPrefetch = false;
}
};
@@ -338,6 +341,16 @@ namespace orc {
std::shared_ptr<Type>& RowReaderOptions::getReadType() const {
return privateBits_->readType;
}
+
+ RowReaderOptions& RowReaderOptions::setEnableAsyncPrefetch(bool enable) {
+ privateBits_->enableAsyncPrefetch = enable;
+ return *this;
+ }
+
+ bool RowReaderOptions::getEnableAsyncPrefetch() const {
+ return privateBits_->enableAsyncPrefetch;
+ }
+
} // namespace orc
#endif
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 349ae1b40..c571fb5d6 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -251,6 +251,7 @@ namespace orc {
contents_(contents),
throwOnHive11DecimalOverflow_(opts.getThrowOnHive11DecimalOverflow()),
forcedScaleOnHive11Decimal_(opts.getForcedScaleOnHive11Decimal()),
+ enableAsyncPrefetch_(opts.getEnableAsyncPrefetch()),
footer_(contents_->footer.get()),
firstRowOfStripe_(*contents_->pool, 0),
enableEncodedBlock_(opts.getEnableLazyDecoding()),
@@ -541,6 +542,43 @@ namespace orc {
return result;
}
+ std::vector<ReadRange> extractReadRangesForStripe(uint64_t stripeIndex,
+ const
proto::StripeInformation& stripeInfo,
+ const proto::StripeFooter&
stripeFooter,
+ const std::vector<bool>&
selectedColumns) {
+ std::vector<ReadRange> ranges;
+
+ uint64_t stripeFooterStart =
+ stripeInfo.offset() + stripeInfo.index_length() +
stripeInfo.data_length();
+ uint64_t offset = stripeInfo.offset();
+
+ for (int i = 0; i < stripeFooter.streams_size(); i++) {
+ const proto::Stream& stream = stripeFooter.streams(i);
+ if (offset + stream.length() > stripeFooterStart) {
+ std::stringstream msg;
+ msg << "Malformed stream meta at stream index " << i << " in stripe "
<< stripeIndex
+ << ": streamOffset=" << offset << ", streamLength=" <<
stream.length()
+ << ", stripeOffset=" << stripeInfo.offset()
+ << ", stripeIndexLength=" << stripeInfo.index_length()
+ << ", stripeDataLength=" << stripeInfo.data_length();
+ throw ParseError(msg.str());
+ }
+
+ if (stream.has_kind() && selectedColumns[stream.column()]) {
+ const auto& kind = stream.kind();
+ if (kind == proto::Stream_Kind_DATA || kind ==
proto::Stream_Kind_DICTIONARY_DATA ||
+ kind == proto::Stream_Kind_PRESENT || kind ==
proto::Stream_Kind_LENGTH ||
+ kind == proto::Stream_Kind_SECONDARY) {
+ ranges.emplace_back(offset, stream.length());
+ }
+ }
+
+ offset += stream.length();
+ }
+
+ return ranges;
+ }
+
ReaderImpl::ReaderImpl(std::shared_ptr<FileContents> contents, const
ReaderOptions& opts,
uint64_t fileLength, uint64_t postscriptLength)
: contents_(std::move(contents)),
@@ -1099,6 +1137,16 @@ namespace orc {
} while (sargsApplier_ && currentStripe_ < lastStripe_);
if (currentStripe_ < lastStripe_) {
+ if (enableAsyncPrefetch_) {
+ // FIXME: this is very coarse since I/O ranges of all selected columns
are about to
+ // prefetch. We can further evaluate index stream with knowledge of
pruned row groups
+ // to issue less I/O ranges.
+ auto ranges = extractReadRangesForStripe(currentStripe_,
currentStripeInfo_,
+ currentStripeFooter_,
selectedColumns_);
+ contents_->evictCache(currentStripeInfo_.offset());
+ contents_->cacheRanges(std::move(ranges));
+ }
+
// get writer timezone info from stripe footer to help understand
timestamp values.
const Timezone& writerTimezone =
currentStripeFooter_.has_writer_timezone()
@@ -1374,6 +1422,7 @@ namespace orc {
contents->pool = options.getMemoryPool();
contents->errorStream = options.getErrorStream();
contents->readerMetrics = options.getReaderMetrics();
+ contents->cacheOptions = options.getCacheOptions();
std::string serializedFooter = options.getSerializedFileTail();
uint64_t fileLength;
uint64_t postscriptLength;
@@ -1534,11 +1583,7 @@ namespace orc {
}
void ReaderImpl::releaseBuffer(uint64_t boundary) {
- std::lock_guard<std::mutex> lock(contents_->readCacheMutex);
-
- if (contents_->readCache) {
- contents_->readCache->evictEntriesBefore(boundary);
- }
+ contents_->evictCache(boundary);
}
void ReaderImpl::preBuffer(const std::vector<uint32_t>& stripes,
@@ -1563,61 +1608,11 @@ namespace orc {
std::vector<bool> selectedColumns;
columnSelector.updateSelected(selectedColumns, rowReaderOptions);
- std::vector<ReadRange> ranges;
- ranges.reserve(newIncludeTypes.size());
for (auto stripe : newStripes) {
- // get stripe information
const auto& stripeInfo = footer_->stripes(stripe);
- uint64_t stripeFooterStart =
- stripeInfo.offset() + stripeInfo.index_length() +
stripeInfo.data_length();
- uint64_t stripeFooterLength = stripeInfo.footer_length();
-
- // get stripe footer
- std::unique_ptr<SeekableInputStream> pbStream = createDecompressor(
- contents_->compression,
- std::make_unique<SeekableFileInputStream>(contents_->stream.get(),
stripeFooterStart,
- stripeFooterLength,
*contents_->pool),
- contents_->blockSize, *contents_->pool, contents_->readerMetrics);
- proto::StripeFooter stripeFooter;
- if (!stripeFooter.ParseFromZeroCopyStream(pbStream.get())) {
- throw ParseError(std::string("bad StripeFooter from ") +
pbStream->getName());
- }
-
- // traverse all streams in stripe footer, choose selected streams to
prebuffer
- uint64_t offset = stripeInfo.offset();
- for (int i = 0; i < stripeFooter.streams_size(); i++) {
- const proto::Stream& stream = stripeFooter.streams(i);
- if (offset + stream.length() > stripeFooterStart) {
- std::stringstream msg;
- msg << "Malformed stream meta at stream index " << i << " in stripe
" << stripe
- << ": streamOffset=" << offset << ", streamLength=" <<
stream.length()
- << ", stripeOffset=" << stripeInfo.offset()
- << ", stripeIndexLength=" << stripeInfo.index_length()
- << ", stripeDataLength=" << stripeInfo.data_length();
- throw ParseError(msg.str());
- }
-
- if (stream.has_kind() && selectedColumns[stream.column()]) {
- const auto& kind = stream.kind();
- if (kind == proto::Stream_Kind_DATA || kind ==
proto::Stream_Kind_DICTIONARY_DATA ||
- kind == proto::Stream_Kind_PRESENT || kind ==
proto::Stream_Kind_LENGTH ||
- kind == proto::Stream_Kind_SECONDARY) {
- ranges.emplace_back(offset, stream.length());
- }
- }
-
- offset += stream.length();
- }
-
- {
- std::lock_guard<std::mutex> lock(contents_->readCacheMutex);
-
- if (!contents_->readCache) {
- contents_->readCache = std::make_shared<ReadRangeCache>(
- getStream(), options_.getCacheOptions(), contents_->pool,
contents_->readerMetrics);
- }
- contents_->readCache->cache(std::move(ranges));
- }
+ proto::StripeFooter stripeFooter = getStripeFooter(stripeInfo,
*contents_);
+ auto ranges = extractReadRangesForStripe(stripe, stripeInfo,
stripeFooter, selectedColumns);
+ contents_->cacheRanges(std::move(ranges));
}
}
@@ -1629,8 +1624,23 @@ namespace orc {
// PASS
}
- InputStream::~InputStream(){
- // PASS
- };
+ InputStream::~InputStream() {
+ // PASS
+ }
+
+ void FileContents::cacheRanges(std::vector<ReadRange> ranges) {
+ std::lock_guard<std::mutex> lock(readCacheMutex);
+ if (!readCache) {
+ readCache = std::make_shared<ReadRangeCache>(stream.get(), cacheOptions,
pool, readerMetrics);
+ }
+ readCache->cache(std::move(ranges));
+ }
+
+ void FileContents::evictCache(uint64_t boundary) {
+ std::lock_guard<std::mutex> lock(readCacheMutex);
+ if (readCache) {
+ readCache->evictEntriesBefore(boundary);
+ }
+ }
} // namespace orc
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 3d81d2692..58dcb9062 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -73,10 +73,17 @@ namespace orc {
std::unique_ptr<proto::Metadata> metadata;
ReaderMetrics* readerMetrics;
+ // cache options to advise io coalescing in the read cache.
+ CacheOptions cacheOptions;
// mutex to protect readCache_ from concurrent access
std::mutex readCacheMutex;
// cached io ranges. only valid when preBuffer is invoked.
std::shared_ptr<ReadRangeCache> readCache;
+
+ // A thread-safe convenience method to cache ranges.
+ void cacheRanges(std::vector<ReadRange> ranges);
+ // A thread-safe convenience method to evict cache entries fully before a
given boundary.
+ void evictCache(uint64_t boundary);
};
proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
@@ -140,6 +147,7 @@ namespace orc {
std::shared_ptr<FileContents> contents_;
const bool throwOnHive11DecimalOverflow_;
const int32_t forcedScaleOnHive11Decimal_;
+ const bool enableAsyncPrefetch_;
// inputs
std::vector<bool> selectedColumns_;
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index f9df6edc9..348a0ad69 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -935,4 +935,172 @@ namespace orc {
}
}
}
+
+ namespace {
+
+ uint64_t writeSampleData(MemoryOutputStream& memStream, uint64_t
stripeSize = 1024,
+ uint64_t rowsPerStripe = 1000) {
+ auto type =
Type::buildTypeFromString("struct<id:int,name:string,value:double>");
+ WriterOptions options;
+ options.setStripeSize(stripeSize)
+ .setCompressionBlockSize(1024)
+ .setMemoryBlockSize(64)
+ .setCompression(CompressionKind_NONE)
+ .setRowIndexStride(100);
+ auto writer = createWriter(*type, &memStream, options);
+
+ uint64_t totalRows = rowsPerStripe * 3;
+ uint64_t batchSize = 100;
+ std::vector<std::string> names(totalRows, "");
+
+ for (uint64_t startRow = 0; startRow < totalRows; startRow += batchSize)
{
+ uint64_t currentBatchSize = std::min(batchSize, totalRows - startRow);
+ auto batch = writer->createRowBatch(currentBatchSize);
+ auto& structBatch = static_cast<StructVectorBatch&>(*batch);
+ auto& idBatch = static_cast<LongVectorBatch&>(*structBatch.fields[0]);
+ auto& nameBatch =
static_cast<StringVectorBatch&>(*structBatch.fields[1]);
+ auto& valueBatch =
static_cast<DoubleVectorBatch&>(*structBatch.fields[2]);
+
+ for (uint64_t i = 0; i < currentBatchSize; ++i) {
+ idBatch.data[i] = static_cast<int64_t>(startRow + i);
+ names[startRow + i] = "name_" + std::to_string(startRow + i);
+ nameBatch.data[i] = const_cast<char*>(names[startRow + i].c_str());
+ nameBatch.length[i] = static_cast<int64_t>(names[startRow +
i].length());
+ valueBatch.data[i] = static_cast<double>(startRow + i) * 1.5;
+ }
+
+ structBatch.numElements = currentBatchSize;
+ writer->add(*batch);
+ }
+
+ writer->close();
+ return totalRows;
+ }
+
+ uint64_t readAllRows(RowReader& rowReader, uint64_t batchSize = 1000) {
+ auto batch = rowReader.createRowBatch(batchSize);
+ uint64_t totalRows = 0;
+ while (rowReader.next(*batch)) {
+ totalRows += batch->numElements;
+ }
+ return totalRows;
+ }
+
+ } // namespace
+
+ TEST(TestAsyncPrefetch, testAsyncPrefetchCorrectnessWithMultipleStripes) {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ uint64_t totalRows = writeSampleData(memStream);
+
+ auto reader = createReader(
+ std::make_unique<MemoryInputStream>(memStream.getData(),
memStream.getLength()), {});
+ ASSERT_GE(reader->getNumberOfStripes(), 2UL);
+
+ auto rowReader =
reader->createRowReader(RowReaderOptions{}.setEnableAsyncPrefetch(true));
+ EXPECT_EQ(readAllRows(*rowReader), totalRows);
+ }
+
+ TEST(TestAsyncPrefetch, testAsyncPrefetchWithVariousConfigurations) {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ std::ignore = writeSampleData(memStream);
+ auto reader = createReader(
+ std::make_unique<MemoryInputStream>(memStream.getData(),
memStream.getLength()), {});
+ ASSERT_GE(reader->getNumberOfStripes(), 2UL);
+
+ struct TestCase {
+ std::string name;
+ std::function<void(RowReaderOptions&)> configureOptions;
+ std::function<void(RowReader&, uint64_t)> validateReader;
+ };
+
+ std::vector<TestCase> testCases = {
+ {"WithColumnSelection",
+ [](RowReaderOptions& opts) {
+ opts.include({0, 2}); // Select only id and value columns
+ },
+ [](RowReader& reader, uint64_t expectedRows) {
+ auto batch = reader.createRowBatch(1000);
+ uint64_t totalRows = 0;
+ while (reader.next(*batch)) {
+ totalRows += batch->numElements;
+ // Verify that only selected columns are present
+ auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+ EXPECT_EQ(2, structBatch.fields.size());
+ }
+ EXPECT_EQ(totalRows, expectedRows);
+ }},
+ {"WithSeek", [](RowReaderOptions& /*opts*/) { /* no additional config
*/ },
+ [](RowReader& reader, uint64_t expectedRows) {
+ auto batch = reader.createRowBatch(500);
+
+ // Read some data first
+ EXPECT_TRUE(reader.next(*batch));
+ EXPECT_GT(batch->numElements, 0UL);
+
+ // Seek to middle and continue reading
+ uint64_t seekPosition = expectedRows / 2;
+ reader.seekToRow(seekPosition);
+
+ EXPECT_TRUE(reader.next(*batch));
+ EXPECT_GT(batch->numElements, 0UL);
+ EXPECT_GE(reader.getRowNumber(), seekPosition);
+ }},
+ {"WithRange",
+ [](RowReaderOptions& /*opts*/) {
+ // Read only middle portion - will be set dynamically in test
+ },
+ [](RowReader& reader, uint64_t expectedRows) {
+ uint64_t totalRows = readAllRows(reader);
+ // Should read approximately the specified range (allowing
tolerance for stripe
+ // boundaries)
+ EXPECT_LE(totalRows, expectedRows / 2 + 1000);
+ }}};
+
+ for (const auto& testCase : testCases) {
+ SCOPED_TRACE("Testing async prefetch " + testCase.name);
+
+ RowReaderOptions options;
+ options.setEnableAsyncPrefetch(true);
+
+ if (testCase.name == "WithRange") {
+ uint64_t startRow = reader->getNumberOfRows() / 4;
+ uint64_t endRow = 3 * reader->getNumberOfRows() / 4;
+ options.range(startRow, endRow - startRow);
+ } else {
+ testCase.configureOptions(options);
+ }
+
+ auto rowReader = reader->createRowReader(options);
+ testCase.validateReader(*rowReader, reader->getNumberOfRows());
+ }
+ }
+
+ TEST(TestAsyncPrefetch, testAsyncPrefetchEdgeCases) {
+ // Test with single stripe
+ {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ uint64_t totalRows = writeSampleData(memStream, 1024 * 1024, 100);
+
+ auto reader = createReader(
+ std::make_unique<MemoryInputStream>(memStream.getData(),
memStream.getLength()), {});
+ ASSERT_EQ(1UL, reader->getNumberOfStripes());
+
+ auto rowReader =
reader->createRowReader(RowReaderOptions{}.setEnableAsyncPrefetch(true));
+ ASSERT_EQ(readAllRows(*rowReader), totalRows);
+ }
+
+ // Test basic functionality with reader metrics (ensure no crashes)
+ {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ uint64_t totalRows = writeSampleData(memStream);
+
+ auto reader = createReader(
+ std::make_unique<MemoryInputStream>(memStream.getData(),
memStream.getLength()), {});
+ ASSERT_GE(reader->getNumberOfStripes(), 2UL);
+
+ auto rowReader =
reader->createRowReader(RowReaderOptions{}.setEnableAsyncPrefetch(true));
+ ASSERT_EQ(readAllRows(*rowReader), totalRows);
+ }
+ }
+
} // namespace orc