This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new c562af3ff ORC-2002: [C++] Improve prefetching stripe
c562af3ff is described below
commit c562af3ff48f147f37529107752744298a49565e
Author: Gang Wu <[email protected]>
AuthorDate: Mon Sep 22 10:17:09 2025 +0800
ORC-2002: [C++] Improve prefetching stripe
### What changes were proposed in this pull request?
- Fix prefetching when the small stripe look-ahead limit is 0 so that it
prefetches the current stripe's data and the next stripe's footer.
- Prefetch the next small stripe(s) when the current stripe is the last
small stripe that has been fully cached.
- Prefetch and coalesce the ranges of the index streams (row index and
bloom filter) of the current stripe.
- Refactor the code to try reading from the read cache first and then fall
back to the original input stream.
### Why are the changes needed?
To make stripe prefetching more effective.
### How was this patch tested?
Added a test case for a small stripe look-ahead limit of 0. The other fixes
were verified manually because they are not easy to cover in tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #2409 from wgtmac/prefetch_small_stripe.
Authored-by: Gang Wu <[email protected]>
Signed-off-by: Gang Wu <[email protected]>
---
c++/src/Reader.cc | 166 +++++++++++++++++++++++++++++--------------------
c++/test/TestReader.cc | 36 ++++++++---
2 files changed, 124 insertions(+), 78 deletions(-)
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 1cf6940c8..ab4c5047d 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -27,7 +27,6 @@
#include "wrap/coded-stream-wrapper.h"
#include <algorithm>
-#include <iostream>
#include <iterator>
#include <memory>
#include <set>
@@ -246,6 +245,47 @@ namespace orc {
buildTypeNameIdMap(contents_->schema.get());
}
+ static const std::unordered_set<proto::Stream_Kind> DATA_STREAM_KINDS = {
+ proto::Stream_Kind_DATA, proto::Stream_Kind_DICTIONARY_DATA,
proto::Stream_Kind_PRESENT,
+ proto::Stream_Kind_LENGTH, proto::Stream_Kind_SECONDARY};
+
+ static const std::unordered_set<proto::Stream_Kind> INDEX_STREAM_KINDS = {
+ proto::Stream_Kind_ROW_INDEX, proto::Stream_Kind_BLOOM_FILTER_UTF8};
+
+ std::vector<ReadRange> extractReadRangesForStripe(
+ uint64_t stripeIndex, const proto::StripeInformation& stripeInfo,
+ const proto::StripeFooter& stripeFooter, const std::vector<bool>&
selectedColumns,
+ const std::unordered_set<proto::Stream_Kind>& allowedKinds =
DATA_STREAM_KINDS) {
+ std::vector<ReadRange> ranges;
+
+ uint64_t stripeFooterStart =
+ stripeInfo.offset() + stripeInfo.index_length() +
stripeInfo.data_length();
+ uint64_t offset = stripeInfo.offset();
+
+ for (int i = 0; i < stripeFooter.streams_size(); i++) {
+ const proto::Stream& stream = stripeFooter.streams(i);
+ if (offset + stream.length() > stripeFooterStart) {
+ std::stringstream msg;
+ msg << "Malformed stream meta at stream index " << i << " in stripe "
<< stripeIndex
+ << ": streamOffset=" << offset << ", streamLength=" <<
stream.length()
+ << ", stripeOffset=" << stripeInfo.offset()
+ << ", stripeIndexLength=" << stripeInfo.index_length()
+ << ", stripeDataLength=" << stripeInfo.data_length();
+ throw ParseError(msg.str());
+ }
+
+ if (stream.has_kind() && selectedColumns[stream.column()]) {
+ if (allowedKinds.find(stream.kind()) != allowedKinds.cend()) {
+ ranges.emplace_back(offset, stream.length());
+ }
+ }
+
+ offset += stream.length();
+ }
+
+ return ranges;
+ }
+
RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> contents, const
RowReaderOptions& opts)
: localTimezone_(getLocalTimezone()),
contents_(contents),
@@ -442,6 +482,15 @@ namespace orc {
rowIndexes_.clear();
bloomFilterIndex_.clear();
+ if (enableAsyncPrefetch_ &&
+ fullyCachedStripes_.find(currentStripe_) == fullyCachedStripes_.end())
{
+ // Cache required ranges of index which are usually very small
+ auto ranges =
+ extractReadRangesForStripe(currentStripe_, currentStripeInfo_,
currentStripeFooter_,
+ selectedColumns_, INDEX_STREAM_KINDS);
+ contents_->cacheRanges(std::move(ranges));
+ }
+
// obtain row indexes for selected columns
uint64_t offset = currentStripeInfo_.offset();
for (int i = 0; i < currentStripeFooter_.streams_size(); ++i) {
@@ -450,11 +499,24 @@ namespace orc {
if (selectedColumns_[colId] && pbStream.has_kind() &&
(pbStream.kind() == proto::Stream_Kind_ROW_INDEX ||
pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) {
- std::unique_ptr<SeekableInputStream> inStream = createDecompressor(
- getCompression(),
- std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
- contents_->stream.get(), offset, pbStream.length(),
*contents_->pool)),
- getCompressionSize(), *contents_->pool, contents_->readerMetrics);
+ std::unique_ptr<SeekableInputStream> inStream;
+ BufferSlice slice;
+
+ {
+ std::lock_guard<std::mutex> lock(contents_->readCacheMutex);
+ if (contents_->readCache) {
+ slice = contents_->readCache->read(ReadRange(offset,
pbStream.length()));
+ }
+ }
+ if (slice.buffer) {
+ inStream =
std::make_unique<SeekableArrayInputStream>(slice.buffer->data() + slice.offset,
+ slice.length);
+ } else {
+ inStream =
std::make_unique<SeekableFileInputStream>(contents_->stream.get(), offset,
+
pbStream.length(), *contents_->pool);
+ }
+ inStream = createDecompressor(getCompression(), std::move(inStream),
getCompressionSize(),
+ *contents_->pool,
contents_->readerMetrics);
if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
proto::RowIndex rowIndex;
@@ -526,32 +588,23 @@ namespace orc {
uint64_t stripeFooterLength = info.footer_length();
std::unique_ptr<SeekableInputStream> pbStream;
+ BufferSlice slice;
- // Try to read from cache first
{
std::lock_guard<std::mutex> lock(contents.readCacheMutex);
if (contents.readCache) {
- ReadRange footerRange(stripeFooterStart, stripeFooterLength);
- BufferSlice cachedData = contents.readCache->read(footerRange);
- if (cachedData.buffer != nullptr) {
- // Create a stream from cached data
- pbStream = createDecompressor(
- contents.compression,
- std::make_unique<SeekableArrayInputStream>(
- cachedData.buffer->data() + cachedData.offset,
cachedData.length),
- contents.blockSize, *contents.pool, contents.readerMetrics);
- }
+ slice = contents.readCache->read(ReadRange(stripeFooterStart,
stripeFooterLength));
}
}
-
- // Fall back to reading from disk if not in cache
- if (!pbStream) {
- pbStream = createDecompressor(
- contents.compression,
- std::make_unique<SeekableFileInputStream>(contents.stream.get(),
stripeFooterStart,
- stripeFooterLength,
*contents.pool),
- contents.blockSize, *contents.pool, contents.readerMetrics);
+ if (slice.buffer) {
+ pbStream =
std::make_unique<SeekableArrayInputStream>(slice.buffer->data() + slice.offset,
+ slice.length);
+ } else {
+ pbStream =
std::make_unique<SeekableFileInputStream>(contents.stream.get(),
stripeFooterStart,
+ stripeFooterLength,
*contents.pool);
}
+ pbStream = createDecompressor(contents.compression, std::move(pbStream),
contents.blockSize,
+ *contents.pool, contents.readerMetrics);
proto::StripeFooter result;
if (!result.ParseFromZeroCopyStream(pbStream.get())) {
@@ -567,43 +620,6 @@ namespace orc {
return result;
}
- std::vector<ReadRange> extractReadRangesForStripe(uint64_t stripeIndex,
- const
proto::StripeInformation& stripeInfo,
- const proto::StripeFooter&
stripeFooter,
- const std::vector<bool>&
selectedColumns) {
- std::vector<ReadRange> ranges;
-
- uint64_t stripeFooterStart =
- stripeInfo.offset() + stripeInfo.index_length() +
stripeInfo.data_length();
- uint64_t offset = stripeInfo.offset();
-
- for (int i = 0; i < stripeFooter.streams_size(); i++) {
- const proto::Stream& stream = stripeFooter.streams(i);
- if (offset + stream.length() > stripeFooterStart) {
- std::stringstream msg;
- msg << "Malformed stream meta at stream index " << i << " in stripe "
<< stripeIndex
- << ": streamOffset=" << offset << ", streamLength=" <<
stream.length()
- << ", stripeOffset=" << stripeInfo.offset()
- << ", stripeIndexLength=" << stripeInfo.index_length()
- << ", stripeDataLength=" << stripeInfo.data_length();
- throw ParseError(msg.str());
- }
-
- if (stream.has_kind() && selectedColumns[stream.column()]) {
- const auto& kind = stream.kind();
- if (kind == proto::Stream_Kind_DATA || kind ==
proto::Stream_Kind_DICTIONARY_DATA ||
- kind == proto::Stream_Kind_PRESENT || kind ==
proto::Stream_Kind_LENGTH ||
- kind == proto::Stream_Kind_SECONDARY) {
- ranges.emplace_back(offset, stream.length());
- }
- }
-
- offset += stream.length();
- }
-
- return ranges;
- }
-
ReaderImpl::ReaderImpl(std::shared_ptr<FileContents> contents, const
ReaderOptions& opts,
uint64_t fileLength, uint64_t postscriptLength)
: contents_(std::move(contents)),
@@ -1173,13 +1189,10 @@ namespace orc {
if (enableAsyncPrefetch_) {
contents_->evictCache(currentStripeInfo_.offset());
- if (fullyCachedStripes_.find(currentStripe_) !=
fullyCachedStripes_.cend()) {
- // Current stripe has been fully cached, do nothing.
- } else if (isSmallStripe(currentStripeInfo_,
contents_->cacheOptions.rangeSizeLimit)) {
+ auto extractSmallStripeRanges = [this](uint64_t startStripe) {
std::vector<ReadRange> ranges;
- uint64_t maxStripe =
- std::min(lastStripe_, currentStripe_ +
smallStripeLookAheadLimit_ + 1);
- for (uint64_t stripe = currentStripe_; stripe < maxStripe; stripe++)
{
+ uint64_t maxStripe = std::min(lastStripe_, startStripe +
smallStripeLookAheadLimit_ + 1);
+ for (uint64_t stripe = startStripe; stripe < maxStripe; stripe++) {
const auto& stripeInfo =
footer_->stripes(static_cast<int>(stripe));
if (!isSmallStripe(stripeInfo,
contents_->cacheOptions.rangeSizeLimit)) {
break;
@@ -1187,7 +1200,22 @@ namespace orc {
ranges.push_back(ReadRange{stripeInfo.offset(),
getStripeSize(stripeInfo)});
fullyCachedStripes_.insert(stripe);
}
- contents_->cacheRanges(std::move(ranges));
+ return ranges;
+ };
+
+ if (fullyCachedStripes_.find(currentStripe_) !=
fullyCachedStripes_.cend()) {
+ // Current stripe has been fully cached, only prefetch next (small)
stripe if not cached
+ auto nextStripe = currentStripe_ + 1;
+ if (nextStripe < lastStripe_ &&
+ fullyCachedStripes_.find(nextStripe) ==
fullyCachedStripes_.cend()) {
+ std::vector<ReadRange> ranges =
extractSmallStripeRanges(nextStripe);
+ if (!ranges.empty()) {
+ contents_->cacheRanges(std::move(ranges));
+ }
+ }
+ } else if (smallStripeLookAheadLimit_ > 0 &&
+ isSmallStripe(currentStripeInfo_,
contents_->cacheOptions.rangeSizeLimit)) {
+ contents_->cacheRanges(extractSmallStripeRanges(currentStripe_));
} else {
// This is very coarse since I/O ranges of all selected columns are
about to prefetch.
// We can further evaluate index stream with knowledge of pruned row
groups to issue
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index 12931a5eb..ab6e5ac66 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -1146,6 +1146,7 @@ namespace orc {
uint64_t noPrefetchIOCount = 0;
uint64_t smallLimitIOCount = 0;
uint64_t largeLimitIOCount = 0;
+ uint64_t zeroLimitIOCount = 0;
// Test 1: No async prefetch - should have most I/O operations
{
@@ -1154,7 +1155,7 @@ namespace orc {
auto* countingPtr = countingStream.get();
ReaderOptions readerOptions;
- std::unique_ptr<Reader> reader = createReader(std::move(countingStream),
readerOptions);
+ auto reader = createReader(std::move(countingStream), readerOptions);
RowReaderOptions rowReaderOptions;
rowReaderOptions.setEnableAsyncPrefetch(false);
@@ -1166,8 +1167,6 @@ namespace orc {
noPrefetchIOCount = countingPtr->getReadCount();
EXPECT_EQ(readRows, totalRows);
EXPECT_GT(noPrefetchIOCount, 0UL);
-
- std::cout << "No async prefetch I/O count: " << noPrefetchIOCount <<
std::endl;
}
// Test 2: Async prefetch with small look ahead limit
@@ -1177,7 +1176,7 @@ namespace orc {
auto* countingPtr = countingStream.get();
ReaderOptions readerOptions;
- std::unique_ptr<Reader> reader = createReader(std::move(countingStream),
readerOptions);
+ auto reader = createReader(std::move(countingStream), readerOptions);
RowReaderOptions rowReaderOptions;
rowReaderOptions.setEnableAsyncPrefetch(true);
@@ -1190,8 +1189,6 @@ namespace orc {
smallLimitIOCount = countingPtr->getReadCount();
EXPECT_EQ(readRows, totalRows);
EXPECT_GT(smallLimitIOCount, 0UL);
-
- std::cout << "Small limit (1) prefetch I/O count: " << smallLimitIOCount
<< std::endl;
}
// Test 3: Async prefetch with large look ahead limit
@@ -1201,7 +1198,7 @@ namespace orc {
auto* countingPtr = countingStream.get();
ReaderOptions readerOptions;
- std::unique_ptr<Reader> reader = createReader(std::move(countingStream),
readerOptions);
+ auto reader = createReader(std::move(countingStream), readerOptions);
RowReaderOptions rowReaderOptions;
rowReaderOptions.setEnableAsyncPrefetch(true);
@@ -1214,11 +1211,32 @@ namespace orc {
largeLimitIOCount = countingPtr->getReadCount();
EXPECT_EQ(readRows, totalRows);
EXPECT_GT(largeLimitIOCount, 0UL);
+ }
+
+ // Test 4: Async prefetch with zero look ahead limit
+ {
+ auto countingStream = std::make_unique<IOCountingInputStream>(
+ std::make_unique<MemoryInputStream>(memStream.getData(),
memStream.getLength()));
+ auto* countingPtr = countingStream.get();
+
+ ReaderOptions readerOptions;
+ auto reader = createReader(std::move(countingStream), readerOptions);
+
+ RowReaderOptions rowReaderOptions;
+ rowReaderOptions.setEnableAsyncPrefetch(true);
+ rowReaderOptions.setSmallStripeLookAheadLimit(0); // Zero limit
+ auto rowReader = reader->createRowReader(rowReaderOptions);
- std::cout << "Large limit (3) prefetch I/O count: " << largeLimitIOCount
<< std::endl;
+ countingPtr->resetReadCount();
+ uint64_t readRows = readAllRows(*rowReader);
+
+ zeroLimitIOCount = countingPtr->getReadCount();
+ EXPECT_EQ(readRows, totalRows);
+ EXPECT_GT(zeroLimitIOCount, 0UL);
}
- EXPECT_LT(smallLimitIOCount, noPrefetchIOCount);
+ EXPECT_LT(zeroLimitIOCount, noPrefetchIOCount);
+ EXPECT_LT(smallLimitIOCount, zeroLimitIOCount);
EXPECT_LT(largeLimitIOCount, smallLimitIOCount);
}