This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch orc
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/orc by this push:
new f10bb3d9113 [Feature] Add input stream of stripe streams in stripe reader. (#270)
f10bb3d9113 is described below
commit f10bb3d91136149845f69fa51ad3a0663941c43d
Author: Qi Chen <[email protected]>
AuthorDate: Fri Jan 10 09:00:56 2025 +0800
[Feature] Add input stream of stripe streams in stripe reader. (#270)
---
c++/include/orc/Common.hh | 49 +++++++++++++++++++++++++++++++++++++++
c++/include/orc/OrcFile.hh | 6 +++--
c++/src/Reader.cc | 58 ++++++++++++++++++++++++++--------------------
c++/src/Reader.hh | 12 +++++-----
c++/src/StripeStream.cc | 17 ++++++++------
c++/src/StripeStream.hh | 10 ++++----
6 files changed, 108 insertions(+), 44 deletions(-)
diff --git a/c++/include/orc/Common.hh b/c++/include/orc/Common.hh
index beae9dd6f31..9df0b0ed6a8 100644
--- a/c++/include/orc/Common.hh
+++ b/c++/include/orc/Common.hh
@@ -156,6 +156,46 @@ namespace orc {
std::string columnEncodingKindToString(ColumnEncodingKind kind);
+  /**
+   * Identifies a single stream inside a stripe by the pair
+   * (column id, stream kind). Used as the key of the per-stripe map of
+   * stream-specific InputStreams.
+   */
+  class StreamId {
+   public:
+    StreamId(uint64_t columnId, StreamKind streamKind)
+        : _columnId(columnId), _streamKind(streamKind) {}
+
+    /**
+     * Combines the hashes of both fields with golden-ratio mixing so that
+     * neighbouring (columnId, streamKind) pairs land in different buckets.
+     * A plain `h1 ^ (h2 << 1)` maps e.g. (0, 1) and (2, 0) to the same value
+     * whenever std::hash<uint64_t> is the identity function.
+     */
+    size_t hash() const noexcept {
+      size_t seed = std::hash<uint64_t>{}(_columnId);
+      seed ^= std::hash<int>{}(static_cast<int>(_streamKind)) + 0x9e3779b9 + (seed << 6) +
+              (seed >> 2);
+      return seed;
+    }
+
+    bool operator==(const StreamId& other) const noexcept {
+      return _columnId == other._columnId && _streamKind == other._streamKind;
+    }
+
+    // C++17 does not synthesize != from ==, so provide it explicitly.
+    bool operator!=(const StreamId& other) const noexcept {
+      return !(*this == other);
+    }
+
+    /** Orders by column id first, then by the numeric stream kind. */
+    bool operator<(const StreamId& other) const noexcept {
+      if (_columnId != other._columnId) {
+        return _columnId < other._columnId;
+      }
+      return static_cast<int>(_streamKind) < static_cast<int>(other._streamKind);
+    }
+
+    /** Debug representation, e.g. "[columnId=3, streamKind=1]". */
+    std::string toString() const {
+      std::ostringstream oss;
+      oss << "[columnId=" << _columnId << ", streamKind=" << static_cast<int>(_streamKind) << "]";
+      return oss.str();
+    }
+
+    uint64_t columnId() const noexcept {
+      return _columnId;
+    }
+
+    StreamKind streamKind() const noexcept {
+      return _streamKind;
+    }
+
+   private:
+    uint64_t _columnId;
+    StreamKind _streamKind;
+  };
+
class StripeInformation {
public:
virtual ~StripeInformation();
@@ -306,4 +346,13 @@ namespace orc {
} // namespace orc
+namespace std {
+  /** Lets orc::StreamId be used directly as an unordered-container key. */
+  template <>
+  struct hash<orc::StreamId> {
+    size_t operator()(const orc::StreamId& streamId) const {
+      const size_t digest = streamId.hash();
+      return digest;
+    }
+  };
+}  // namespace std
+
#endif
diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index c52b66b7210..c7826e13add 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -63,8 +63,10 @@ namespace orc {
*/
virtual const std::string& getName() const = 0;
- virtual void beforeReadStripe(std::unique_ptr<StripeInformation>
currentStripeInformation,
- std::vector<bool> selectedColumns);
+ virtual void beforeReadStripe(
+ std::unique_ptr<StripeInformation> currentStripeInformation,
+ std::vector<bool> selectedColumns,
+ std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>&
streams);
};
/**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index b8e1a914215..564c1f2aa6d 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -549,11 +549,14 @@ namespace orc {
if (selectedColumns[colId] && pbStream.has_kind() &&
(pbStream.kind() == proto::Stream_Kind_ROW_INDEX ||
pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) {
- std::unique_ptr<SeekableInputStream> inStream = createDecompressor(
- getCompression(),
- std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
- contents->stream.get(), offset, pbStream.length(),
*contents->pool)),
- getCompressionSize(), *contents->pool, contents->readerMetrics);
+ auto iter = streams.find({colId,
static_cast<StreamKind>(pbStream.kind())});
+ InputStream* inputStream =
+ (iter != streams.end()) ? iter->second.get() :
contents->stream.get();
+ std::unique_ptr<SeekableInputStream> inStream =
+ createDecompressor(getCompression(),
+ std::unique_ptr<SeekableInputStream>(new
SeekableFileInputStream(
+ inputStream, offset, pbStream.length(),
*contents->pool)),
+ getCompressionSize(), *contents->pool,
contents->readerMetrics);
if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
proto::RowIndex rowIndex;
@@ -951,7 +954,7 @@ namespace orc {
readMetadata();
}
- std::vector<int> allStripesNeeded(numberOfStripes,1);
+ std::vector<int> allStripesNeeded(numberOfStripes, 1);
if (opts.getSearchArgument() && footer->rowindexstride() > 0) {
auto sargs = opts.getSearchArgument();
@@ -963,19 +966,20 @@ namespace orc {
return allStripesNeeded;
}
- for ( uint64_t currentStripeIndex = 0;currentStripeIndex <
numberOfStripes ; currentStripeIndex ++) {
+ for (uint64_t currentStripeIndex = 0; currentStripeIndex <
numberOfStripes;
+ currentStripeIndex++) {
const auto& currentStripeStats =
-
contents->metadata->stripestats(static_cast<int>(currentStripeIndex));
- //Not need add mMetrics,so use 0.
- allStripesNeeded[currentStripeIndex] =
sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);;
+
contents->metadata->stripestats(static_cast<int>(currentStripeIndex));
+ // Not need add mMetrics,so use 0.
+ allStripesNeeded[currentStripeIndex] =
+ sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);
+ ;
}
contents->sargsApplier = std::move(sargsApplier);
}
return allStripesNeeded;
}
-
-
uint64_t maxStreamsForType(const proto::Type& type) {
switch (static_cast<int64_t>(type.kind())) {
case proto::Type_Kind_STRUCT:
@@ -1161,6 +1165,7 @@ namespace orc {
reader.reset(); // ColumnReaders use lots of memory; free old memory first
rowIndexes.clear();
bloomFilterIndex.clear();
+ streams.clear();
followRowInStripe = 0;
// evaluate file statistics if it exists
@@ -1189,13 +1194,13 @@ namespace orc {
if (sargsApplier) {
bool isStripeNeeded = true;
if (contents->metadata) {
- const auto ¤tStripeStats =
-
contents->metadata->stripestats(static_cast<int>(currentStripe));
+ const auto& currentStripeStats =
+ contents->metadata->stripestats(static_cast<int>(currentStripe));
// skip this stripe after stats fail to satisfy sargs
uint64_t stripeRowGroupCount =
- (rowsInCurrentStripe + footer->rowindexstride() - 1) /
footer->rowindexstride();
+ (rowsInCurrentStripe + footer->rowindexstride() - 1) /
footer->rowindexstride();
isStripeNeeded =
- sargsApplier->evaluateStripeStatistics(currentStripeStats,
stripeRowGroupCount);
+ sargsApplier->evaluateStripeStatistics(currentStripeStats,
stripeRowGroupCount);
}
if (!isStripeNeeded) {
// advance to next stripe when current stripe has no matching rows
@@ -1209,11 +1214,12 @@ namespace orc {
processingStripe = currentStripe;
std::unique_ptr<StripeInformation> currentStripeInformation(new
StripeInformationImpl(
- currentStripeInfo.offset(), currentStripeInfo.indexlength(),
- currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
- currentStripeInfo.numberofrows(), contents->stream.get(),
*contents->pool,
- contents->compression, contents->blockSize,
contents->readerMetrics));
- contents->stream->beforeReadStripe(std::move(currentStripeInformation),
selectedColumns);
+ currentStripeInfo.offset(), currentStripeInfo.indexlength(),
+ currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
+ currentStripeInfo.numberofrows(), contents->stream.get(),
*contents->pool,
+ contents->compression, contents->blockSize,
contents->readerMetrics));
+ contents->stream->beforeReadStripe(std::move(currentStripeInformation),
selectedColumns,
+ streams);
if (sargsApplier) {
bool isStripeNeeded = true;
@@ -1237,8 +1243,8 @@ namespace orc {
?
getTimezoneByName(currentStripeFooter.writertimezone())
: localTimezone;
StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo,
currentStripeFooter,
- currentStripeInfo.offset(),
*contents->stream, writerTimezone,
- readerTimezone);
+ currentStripeInfo.offset(),
*contents->stream, streams,
+ writerTimezone, readerTimezone);
reader = buildReader(*contents->schema, stripeStreams,
useTightNumericVector);
if (stringDictFilter != nullptr) {
@@ -1760,7 +1766,9 @@ namespace orc {
// PASS
};
- void InputStream::beforeReadStripe(std::unique_ptr<StripeInformation>
currentStripeInformation,
- std::vector<bool> selectedColumns) {}
+  /**
+   * Default hook, called before each stripe is read; it does nothing.
+   * Because `streams` is left untouched, the reader falls back to reading
+   * every stream from this InputStream. Overriding implementations may fill
+   * `streams` with stream-specific inputs keyed by StreamId.
+   */
+  void InputStream::beforeReadStripe(
+      std::unique_ptr<StripeInformation> /*currentStripeInformation*/,
+      std::vector<bool> /*selectedColumns*/,
+      std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& /*streams*/) {
+    // Intentionally empty.
+  }
} // namespace orc
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 9505022c558..7ec049ad963 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -162,6 +162,7 @@ namespace orc {
// contents
std::shared_ptr<FileContents> contents;
+ std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>> streams;
const bool throwOnHive11DecimalOverflow;
const int32_t forcedScaleOnHive11Decimal;
@@ -322,10 +323,9 @@ namespace orc {
// internal methods
void readMetadata() const;
void checkOrcVersion();
- void getRowIndexStatistics(
- const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
- const proto::StripeFooter& currentStripeFooter,
- std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const;
+ void getRowIndexStatistics(const proto::StripeInformation& stripeInfo,
uint64_t stripeIndex,
+ const proto::StripeFooter& currentStripeFooter,
+
std::vector<std::vector<proto::ColumnStatistics>>* indexStats) const;
// metadata
mutable bool isMetadataLoaded;
@@ -425,8 +425,8 @@ namespace orc {
return contents->stream.get();
}
- void setStream(std::unique_ptr<InputStream> inputStreamUPtr) override{
- contents->stream = std::move(inputStreamUPtr);
+    /** Replaces the underlying file stream (contents->stream), taking ownership of the new one. */
+    void setStream(std::unique_ptr<InputStream> newStream) override {
+      contents->stream = std::move(newStream);
}
uint64_t getMemoryUse(int stripeIx = -1) override;
diff --git a/c++/src/StripeStream.cc b/c++/src/StripeStream.cc
index 1f43da4f243..8efa23efa86 100644
--- a/c++/src/StripeStream.cc
+++ b/c++/src/StripeStream.cc
@@ -25,17 +25,18 @@
namespace orc {
- StripeStreamsImpl::StripeStreamsImpl(const RowReaderImpl& _reader, uint64_t
_index,
- const proto::StripeInformation&
_stripeInfo,
- const proto::StripeFooter& _footer,
uint64_t _stripeStart,
- InputStream& _input, const Timezone&
_writerTimezone,
- const Timezone& _readerTimezone)
+ StripeStreamsImpl::StripeStreamsImpl(
+ const RowReaderImpl& _reader, uint64_t _index, const
proto::StripeInformation& _stripeInfo,
+ const proto::StripeFooter& _footer, uint64_t _stripeStart, InputStream&
_input,
+ const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>&
_streams,
+ const Timezone& _writerTimezone, const Timezone& _readerTimezone)
: reader(_reader),
stripeInfo(_stripeInfo),
footer(_footer),
stripeIndex(_index),
stripeStart(_stripeStart),
input(_input),
+ streams(_streams),
writerTimezone(_writerTimezone),
readerTimezone(_readerTimezone) {
// PASS
@@ -87,8 +88,10 @@ namespace orc {
const proto::Stream& stream = footer.streams(i);
if (stream.has_kind() && stream.kind() == kind &&
stream.column() == static_cast<uint64_t>(columnId)) {
+ auto iter = streams.find({columnId, static_cast<StreamKind>(kind)});
+ InputStream* inputStream = (iter != streams.end()) ?
iter->second.get() : &input;
uint64_t streamLength = stream.length();
- uint64_t myBlock = shouldStream ? input.getNaturalReadSize() :
streamLength;
+ uint64_t myBlock = shouldStream ? inputStream->getNaturalReadSize() :
streamLength;
if (offset + streamLength > dataEnd) {
std::stringstream msg;
msg << "Malformed stream meta at stream index " << i << " in stripe
" << stripeIndex
@@ -100,7 +103,7 @@ namespace orc {
}
return createDecompressor(reader.getCompression(),
std::make_unique<SeekableFileInputStream>(
- &input, offset, stream.length(), *pool,
myBlock),
+ inputStream, offset, stream.length(),
*pool, myBlock),
reader.getCompressionSize(), *pool,
reader.getFileContents().readerMetrics);
}
diff --git a/c++/src/StripeStream.hh b/c++/src/StripeStream.hh
index 74bebda6f25..57e51ef76f0 100644
--- a/c++/src/StripeStream.hh
+++ b/c++/src/StripeStream.hh
@@ -43,14 +43,16 @@ namespace orc {
const uint64_t stripeIndex;
const uint64_t stripeStart;
InputStream& input;
+ const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>&
streams;
const Timezone& writerTimezone;
const Timezone& readerTimezone;
public:
- StripeStreamsImpl(const RowReaderImpl& reader, uint64_t index,
- const proto::StripeInformation& stripeInfo, const
proto::StripeFooter& footer,
- uint64_t stripeStart, InputStream& input, const
Timezone& writerTimezone,
- const Timezone& readerTimezone);
+ StripeStreamsImpl(
+ const RowReaderImpl& reader, uint64_t index, const
proto::StripeInformation& stripeInfo,
+ const proto::StripeFooter& footer, uint64_t stripeStart, InputStream&
input,
+ const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>&
streams,
+ const Timezone& writerTimezone, const Timezone& readerTimezone);
virtual ~StripeStreamsImpl() override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]