This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch orc
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/orc by this push:
new ff8c18e [Feature] Implements ORC lazy materialization. (#56)
ff8c18e is described below
commit ff8c18e96d55942f3767a2d41960cabfb0efd144
Author: Qi Chen <[email protected]>
AuthorDate: Thu Apr 27 13:56:21 2023 +0800
[Feature] Implements ORC lazy materialization. (#56)
Refer to the ORC Java implementation: LazyIO of non-filter columns in the
presence of filters.
Note: Row-level filtering by selection vector is currently not implemented;
it will be added in a future PR.
---
.clang-format | 26 ++++
c++/include/orc/Reader.hh | 48 ++++++-
c++/include/orc/Type.hh | 39 +++++
c++/src/ColumnReader.cc | 357 ++++++++++++++++++++++++----------------------
c++/src/ColumnReader.hh | 19 ++-
c++/src/Options.hh | 28 ++++
c++/src/Reader.cc | 295 +++++++++++++++++++++++++++++++-------
c++/src/Reader.hh | 55 ++++++-
c++/src/TypeImpl.cc | 29 +++-
c++/src/TypeImpl.hh | 9 ++
10 files changed, 663 insertions(+), 242 deletions(-)
diff --git a/.clang-format b/.clang-format
new file mode 100644
index 0000000..0779071
--- /dev/null
+++ b/.clang-format
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+Language: Cpp
+BasedOnStyle: Google
+ColumnLimit: 100
+IndentWidth: 2
+NamespaceIndentation: All
+UseTab: Never
+AllowShortFunctionsOnASingleLine: Empty
+DerivePointerAlignment: false
+IncludeBlocks: Preserve
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index d8f83f9..2666b4d 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -186,6 +186,26 @@ namespace orc {
RowReaderOptions& includeTypes(const std::list<uint64_t>& types);
/**
+ * For files that have structs as the top-level object, filter the fields.
+ * by index. The first field is 0, the second 1, and so on. By default,
+ * all columns are read. This option clears any previous setting of
+ * the selected columns.
+ * @param filterColIndexes a list of fields to read
+ * @return this
+ */
+ RowReaderOptions& filter(const std::list<uint64_t>& filterColIndexes);
+
+ /**
+ * For files that have structs as the top-level object, filter the fields
+ * by name. By default, all columns are read. This option clears
+ * any previous setting of the selected columns.
+ * @param filterColNames a list of fields to read
+ * @return this
+ */
+ RowReaderOptions& filter(const std::list<std::string>& filterColNames);
+
+
+ /**
* A map type of <typeId, ReadIntent>.
*/
typedef std::map<uint64_t, ReadIntent> IdReadIntentMap;
@@ -281,6 +301,12 @@ namespace orc {
*/
const std::list<std::string>& getIncludeNames() const;
+ /**
+ * Get the list of selected columns to read. All children of the selected
+ * columns are also selected.
+ */
+ const std::list<std::string>& getFilterColNames() const;
+
/**
* Get the start of the range for the data being processed.
* @return if not set, return 0
@@ -338,6 +364,12 @@ namespace orc {
bool getUseTightNumericVector() const;
};
+ class ORCFilter {
+ public:
+ virtual ~ORCFilter() = default;
+ virtual void filter(ColumnVectorBatch& data, uint16_t* sel, uint16_t size,
void* arg = nullptr) const = 0;
+ };
+
class RowReader;
/**
@@ -523,14 +555,14 @@ namespace orc {
* Create a RowReader based on this reader with the default options.
* @return a RowReader to read the rows
*/
- virtual std::unique_ptr<RowReader> createRowReader() const = 0;
+ virtual std::unique_ptr<RowReader> createRowReader(const ORCFilter* filter
= nullptr) const = 0;
/**
* Create a RowReader based on this reader.
* @param options RowReader Options
* @return a RowReader to read the rows
*/
- virtual std::unique_ptr<RowReader> createRowReader(const RowReaderOptions&
options) const = 0;
+ virtual std::unique_ptr<RowReader> createRowReader(const RowReaderOptions&
options, const ORCFilter* filter = nullptr) const = 0;
/**
* Get the name of the input stream.
@@ -616,13 +648,23 @@ namespace orc {
*/
virtual std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size)
const = 0;
+ /**
+ * Read the next row batch from the current position.
+ * Caller must look at numElements in the row batch to determine how
+ * many rows were read.
+ * @param data the row batch to read into.
+ * @param arg argument.
+ * @return number of rows.
+ */
+ virtual uint64_t nextBatch(ColumnVectorBatch& data, void* arg = nullptr) =
0;
+
/**
* Read the next row batch from the current position.
* Caller must look at numElements in the row batch to determine how
* many rows were read.
* @param data the row batch to read into.
* @return true if a non-zero number of rows were read or false if the
- * end of the file was reached.
+ * end of the file was reached.
*/
virtual bool next(ColumnVectorBatch& data) = 0;
diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh
index c8ada75..73b813e 100644
--- a/c++/include/orc/Type.hh
+++ b/c++/include/orc/Type.hh
@@ -19,11 +19,46 @@
#ifndef ORC_TYPE_HH
#define ORC_TYPE_HH
+#include <bitset>
+#include <unordered_set>
#include "MemoryPool.hh"
#include "orc/Vector.hh"
#include "orc/orc-config.hh"
namespace orc {
+ enum class ReaderCategory {
+ FILTER_CHILD, // Primitive type that is a filter column
+ FILTER_PARENT, // Compound type with filter children
+ NON_FILTER // Non-filter column
+ };
+
+ class ReadPhase {
+ public:
+ static const int NUM_CATEGORIES = 3; // Number of values in
ReaderCategory
+ std::bitset<NUM_CATEGORIES> categories;
+
+ static const ReadPhase ALL;
+ static const ReadPhase LEADERS;
+ static const ReadPhase FOLLOWERS;
+ static const ReadPhase LEADER_PARENTS;
+ static const ReadPhase FOLLOWERS_AND_PARENTS;
+
+ static ReadPhase fromCategories(const
std::unordered_set<ReaderCategory>& cats) {
+ ReadPhase phase;
+ for (ReaderCategory cat : cats) {
+ phase.categories.set(static_cast<size_t>(cat));
+ }
+ return phase;
+ }
+
+ bool contains(ReaderCategory cat) const {
+ return categories.test(static_cast<size_t>(cat));
+ }
+
+ bool operator==(const ReadPhase& other) const {
+ return categories == other.categories;
+ }
+ };
enum TypeKind {
BOOLEAN = 0,
@@ -54,7 +89,9 @@ namespace orc {
virtual uint64_t getMaximumColumnId() const = 0;
virtual TypeKind getKind() const = 0;
virtual uint64_t getSubtypeCount() const = 0;
+ virtual Type* getParent() const = 0;
virtual const Type* getSubtype(uint64_t childId) const = 0;
+ virtual Type* getSubtype(uint64_t childId) = 0;
virtual const std::string& getFieldName(uint64_t childId) const = 0;
virtual uint64_t getMaximumLength() const = 0;
virtual uint64_t getPrecision() const = 0;
@@ -64,6 +101,8 @@ namespace orc {
virtual Type& removeAttribute(const std::string& key) = 0;
virtual std::vector<std::string> getAttributeKeys() const = 0;
virtual std::string getAttributeValue(const std::string& key) const = 0;
+ virtual ReaderCategory getReaderCategory() const = 0;
+ virtual void setReaderCategory(ReaderCategory readerCategory) = 0;
virtual std::string toString() const = 0;
/**
* Get the Type with the given column ID
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 2a72c80..cabcdbe 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -46,8 +46,9 @@ namespace orc {
}
}
- ColumnReader::ColumnReader(const Type& type, StripeStreams& stripe)
- : columnId(type.getColumnId()),
+ ColumnReader::ColumnReader(const Type& _type, StripeStreams& stripe)
+ : type(_type),
+ columnId(type.getColumnId()),
memoryPool(stripe.getMemoryPool()),
metrics(stripe.getReaderMetrics()) {
std::unique_ptr<SeekableInputStream> stream =
@@ -61,7 +62,7 @@ namespace orc {
// PASS
}
- uint64_t ColumnReader::skip(uint64_t numValues) {
+ uint64_t ColumnReader::skip(uint64_t numValues, const ReadPhase& readPhase) {
ByteRleDecoder* decoder = notNullDecoder.get();
if (decoder) {
// page through the values that we want to skip
@@ -84,7 +85,7 @@ namespace orc {
return numValues;
}
- void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* incomingMask) {
+ void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* incomingMask, const ReadPhase& readPhase) {
if (numValues > rowBatch.capacity) {
rowBatch.resize(numValues);
}
@@ -100,16 +101,19 @@ namespace orc {
return;
}
}
+ rowBatch.hasNulls = false;
} else if (incomingMask) {
// If we don't have a notNull stream, copy the incomingMask
rowBatch.hasNulls = true;
memcpy(rowBatch.notNull.data(), incomingMask, numValues);
return;
+ } else {
+ rowBatch.hasNulls = false;
+ memset(rowBatch.notNull.data(), 1, numValues);
}
- rowBatch.hasNulls = false;
}
- void ColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions) {
+ void ColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions, const ReadPhase& readPhase) {
if (notNullDecoder.get()) {
notNullDecoder->seek(positions.at(columnId));
}
@@ -142,11 +146,11 @@ namespace orc {
BooleanColumnReader(const Type& type, StripeStreams& stipe);
~BooleanColumnReader() override;
- uint64_t skip(uint64_t numValues) override;
+ uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
};
template <typename BatchType>
@@ -164,16 +168,16 @@ namespace orc {
}
template <typename BatchType>
- uint64_t BooleanColumnReader<BatchType>::skip(uint64_t numValues) {
- numValues = ColumnReader::skip(numValues);
+ uint64_t BooleanColumnReader<BatchType>::skip(uint64_t numValues, const
ReadPhase& readPhase) {
+ numValues = ColumnReader::skip(numValues, readPhase);
rle->skip(numValues);
return numValues;
}
template <typename BatchType>
void BooleanColumnReader<BatchType>::next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
- char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase&
readPhase) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
// Since the byte rle places the output in a char* and BatchType here may
be
// LongVectorBatch with long*. We cheat here in that case and use the long*
// and then expand it in a second pass..
@@ -185,8 +189,8 @@ namespace orc {
template <typename BatchType>
void BooleanColumnReader<BatchType>::seekToRowGroup(
- std::unordered_map<uint64_t, PositionProvider>& positions) {
- ColumnReader::seekToRowGroup(positions);
+ std::unordered_map<uint64_t, PositionProvider>& positions, const
ReadPhase& readPhase) {
+ ColumnReader::seekToRowGroup(positions, readPhase);
rle->seek(positions.at(columnId));
}
@@ -205,14 +209,14 @@ namespace orc {
~ByteColumnReader() override = default;
- uint64_t skip(uint64_t numValues) override {
- numValues = ColumnReader::skip(numValues);
+ uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override {
+ numValues = ColumnReader::skip(numValues, readPhase);
rle->skip(numValues);
return numValues;
}
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override {
- ColumnReader::next(rowBatch, numValues, notNull);
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
// Since the byte rle places the output in a char* instead of long*,
// we cheat here and use the long* and then expand it in a second pass.
auto* ptr = dynamic_cast<BatchType&>(rowBatch).data.data();
@@ -221,8 +225,8 @@ namespace orc {
expandBytesToIntegers(ptr, numValues);
}
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions) override {
- ColumnReader::seekToRowGroup(positions);
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override {
+ ColumnReader::seekToRowGroup(positions, readPhase);
rle->seek(positions.at(columnId));
}
};
@@ -245,20 +249,20 @@ namespace orc {
// PASS
}
- uint64_t skip(uint64_t numValues) override {
- numValues = ColumnReader::skip(numValues);
+ uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override {
+ numValues = ColumnReader::skip(numValues, readPhase);
rle->skip(numValues);
return numValues;
}
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override {
- ColumnReader::next(rowBatch, numValues, notNull);
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
rle->next(dynamic_cast<BatchType&>(rowBatch).data.data(), numValues,
rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
}
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions) override {
- ColumnReader::seekToRowGroup(positions);
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override {
+ ColumnReader::seekToRowGroup(positions, readPhase);
rle->seek(positions.at(columnId));
}
};
@@ -276,11 +280,11 @@ namespace orc {
TimestampColumnReader(const Type& type, StripeStreams& stripe, bool
isInstantType);
~TimestampColumnReader() override;
- uint64_t skip(uint64_t numValues) override;
+ uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
};
TimestampColumnReader::TimestampColumnReader(const Type& type,
StripeStreams& stripe,
@@ -304,15 +308,15 @@ namespace orc {
// PASS
}
- uint64_t TimestampColumnReader::skip(uint64_t numValues) {
- numValues = ColumnReader::skip(numValues);
+ uint64_t TimestampColumnReader::skip(uint64_t numValues, const ReadPhase&
readPhase) {
+ numValues = ColumnReader::skip(numValues, readPhase);
secondsRle->skip(numValues);
nanoRle->skip(numValues);
return numValues;
}
- void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull, const ReadPhase& readPhase) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
TimestampVectorBatch& timestampBatch =
dynamic_cast<TimestampVectorBatch&>(rowBatch);
int64_t* secsBuffer = timestampBatch.data.data();
@@ -353,8 +357,8 @@ namespace orc {
}
void TimestampColumnReader::seekToRowGroup(
- std::unordered_map<uint64_t, PositionProvider>& positions) {
- ColumnReader::seekToRowGroup(positions);
+ std::unordered_map<uint64_t, PositionProvider>& positions, const
ReadPhase& readPhase) {
+ ColumnReader::seekToRowGroup(positions, readPhase);
secondsRle->seek(positions.at(columnId));
nanoRle->seek(positions.at(columnId));
}
@@ -365,11 +369,11 @@ namespace orc {
DoubleColumnReader(const Type& type, StripeStreams& stripe);
~DoubleColumnReader() override {}
- uint64_t skip(uint64_t numValues) override;
+ uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
private:
std::unique_ptr<SeekableInputStream> inputStream;
@@ -450,8 +454,8 @@ namespace orc {
template <TypeKind columnKind, bool isLittleEndian, typename ValueType,
typename BatchType>
uint64_t DoubleColumnReader<columnKind, isLittleEndian, ValueType,
BatchType>::skip(
- uint64_t numValues) {
- numValues = ColumnReader::skip(numValues);
+ uint64_t numValues, const ReadPhase& readPhase) {
+ numValues = ColumnReader::skip(numValues, readPhase);
if (static_cast<size_t>(bufferEnd - bufferPointer) >= bytesPerValue *
numValues) {
bufferPointer += bytesPerValue * numValues;
@@ -473,8 +477,8 @@ namespace orc {
template <TypeKind columnKind, bool isLittleEndian, typename ValueType,
typename BatchType>
void DoubleColumnReader<columnKind, isLittleEndian, ValueType,
BatchType>::next(
- ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const
ReadPhase& readPhase) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
// update the notNull from the parent class
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
ValueType* outArray =
@@ -519,8 +523,8 @@ namespace orc {
template <TypeKind columnKind, bool isLittleEndian, typename ValueType,
typename BatchType>
void DoubleColumnReader<columnKind, isLittleEndian, ValueType,
BatchType>::seekToRowGroup(
- std::unordered_map<uint64_t, PositionProvider>& positions) {
- ColumnReader::seekToRowGroup(positions);
+ std::unordered_map<uint64_t, PositionProvider>& positions, const
ReadPhase& readPhase) {
+ ColumnReader::seekToRowGroup(positions, readPhase);
inputStream->seek(positions.at(columnId));
// clear buffer state after seek
bufferEnd = nullptr;
@@ -552,13 +556,13 @@ namespace orc {
StringDictionaryColumnReader(const Type& type, StripeStreams& stipe);
~StringDictionaryColumnReader() override;
- uint64_t skip(uint64_t numValues) override;
+ uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
- void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull) override;
+ void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
};
StringDictionaryColumnReader::StringDictionaryColumnReader(const Type& type,
@@ -602,15 +606,15 @@ namespace orc {
// PASS
}
- uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) {
- numValues = ColumnReader::skip(numValues);
+ uint64_t StringDictionaryColumnReader::skip(uint64_t numValues, const
ReadPhase& readPhase) {
+ numValues = ColumnReader::skip(numValues, readPhase);
rle->skip(numValues);
return numValues;
}
void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
- char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase&
readPhase) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
// update the notNull from the parent class
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
@@ -644,8 +648,8 @@ namespace orc {
}
void StringDictionaryColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
uint64_t numValues,
- char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ char* notNull, const
ReadPhase& readPhase) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
rowBatch.isEncoded = true;
@@ -657,8 +661,8 @@ namespace orc {
}
void StringDictionaryColumnReader::seekToRowGroup(
- std::unordered_map<uint64_t, PositionProvider>& positions) {
- ColumnReader::seekToRowGroup(positions);
+ std::unordered_map<uint64_t, PositionProvider>& positions, const
ReadPhase& readPhase) {
+ ColumnReader::seekToRowGroup(positions, readPhase);
rle->seek(positions.at(columnId));
}
@@ -682,11 +686,11 @@ namespace orc {
StringDirectColumnReader(const Type& type, StripeStreams& stipe);
~StringDirectColumnReader() override;
- uint64_t skip(uint64_t numValues) override;
+ uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
};
StringDirectColumnReader::StringDirectColumnReader(const Type& type,
StripeStreams& stripe)
@@ -706,9 +710,9 @@ namespace orc {
// PASS
}
- uint64_t StringDirectColumnReader::skip(uint64_t numValues) {
+ uint64_t StringDirectColumnReader::skip(uint64_t numValues, const ReadPhase&
readPhase) {
const size_t BUFFER_SIZE = 1024;
- numValues = ColumnReader::skip(numValues);
+ numValues = ColumnReader::skip(numValues, readPhase);
int64_t buffer[BUFFER_SIZE];
uint64_t done = 0;
size_t totalBytes = 0;
@@ -756,8 +760,8 @@ namespace orc {
}
void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase&
readPhase) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
// update the notNull from the parent class
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
@@ -814,8 +818,8 @@ namespace orc {
}
void StringDirectColumnReader::seekToRowGroup(
- std::unordered_map<uint64_t, PositionProvider>& positions) {
- ColumnReader::seekToRowGroup(positions);
+ std::unordered_map<uint64_t, PositionProvider>& positions, const
ReadPhase& readPhase) {
+ ColumnReader::seekToRowGroup(positions, readPhase);
blobStream->seek(positions.at(columnId));
lengthRle->seek(positions.at(columnId));
// clear buffer state after seek
@@ -830,17 +834,17 @@ namespace orc {
public:
StructColumnReader(const Type& type, StripeStreams& stipe, bool
useTightNumericVector = false);
- uint64_t skip(uint64_t numValues) override;
+ uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
- void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull) override;
+ void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
private:
template <bool encoded>
- void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull);
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase);
};
StructColumnReader::StructColumnReader(const Type& type, StripeStreams&
stripe,
@@ -865,45 +869,51 @@ namespace orc {
}
}
- uint64_t StructColumnReader::skip(uint64_t numValues) {
- numValues = ColumnReader::skip(numValues);
+ uint64_t StructColumnReader::skip(uint64_t numValues, const ReadPhase&
readPhase) {
+ numValues = ColumnReader::skip(numValues, readPhase);
for (auto& ptr : children) {
- ptr->skip(numValues);
+ if (shouldProcessChild(ptr->getType().getReaderCategory(), readPhase)) {
+ ptr->skip(numValues, readPhase);
+ }
}
return numValues;
}
- void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull) {
- nextInternal<false>(rowBatch, numValues, notNull);
+ void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull, const ReadPhase& readPhase) {
+ nextInternal<false>(rowBatch, numValues, notNull, readPhase);
}
void StructColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull) {
- nextInternal<true>(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase&
readPhase) {
+ nextInternal<true>(rowBatch, numValues, notNull, readPhase);
}
template <bool encoded>
void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase&
readPhase) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
uint64_t i = 0;
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
for (auto iter = children.begin(); iter != children.end(); ++iter, ++i) {
- if (encoded) {
-
(*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
numValues,
- notNull);
- } else {
- (*iter)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
numValues, notNull);
+ if (shouldProcessChild((*iter)->getType().getReaderCategory(),
readPhase)) {
+ if (encoded) {
+ (*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch
&>(rowBatch).fields[i]), numValues,
+ notNull, readPhase);
+ } else {
+ (*iter)->next(*(dynamic_cast<StructVectorBatch
&>(rowBatch).fields[i]), numValues, notNull, readPhase);
+ }
}
}
}
void StructColumnReader::seekToRowGroup(
- std::unordered_map<uint64_t, PositionProvider>& positions) {
- ColumnReader::seekToRowGroup(positions);
+ std::unordered_map<uint64_t, PositionProvider>& positions, const
ReadPhase& readPhase) {
+ ColumnReader::seekToRowGroup(positions, readPhase);
for (auto& ptr : children) {
- ptr->seekToRowGroup(positions);
+ if (shouldProcessChild(ptr->getType().getReaderCategory(), readPhase)) {
+ ptr->seekToRowGroup(positions, readPhase);
+ }
}
}
@@ -916,17 +926,17 @@ namespace orc {
ListColumnReader(const Type& type, StripeStreams& stipe, bool
useTightNumericVector = false);
~ListColumnReader() override;
- uint64_t skip(uint64_t numValues) override;
+ uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
- void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull) override;
+ void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
private:
template <bool encoded>
- void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull);
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase);
};
ListColumnReader::ListColumnReader(const Type& type, StripeStreams& stripe,
@@ -949,8 +959,8 @@ namespace orc {
// PASS
}
- uint64_t ListColumnReader::skip(uint64_t numValues) {
- numValues = ColumnReader::skip(numValues);
+ uint64_t ListColumnReader::skip(uint64_t numValues, const ReadPhase&
readPhase) {
+ numValues = ColumnReader::skip(numValues, readPhase);
ColumnReader* childReader = child.get();
if (childReader) {
const uint64_t BUFFER_SIZE = 1024;
@@ -965,26 +975,26 @@ namespace orc {
}
lengthsRead += chunk;
}
- childReader->skip(childrenElements);
+ childReader->skip(childrenElements, readPhase);
} else {
rle->skip(numValues);
}
return numValues;
}
- void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
- nextInternal<false>(rowBatch, numValues, notNull);
+ void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull, const ReadPhase& readPhase) {
+ nextInternal<false>(rowBatch, numValues, notNull, readPhase);
}
void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull) {
- nextInternal<true>(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase&
readPhase) {
+ nextInternal<true>(rowBatch, numValues, notNull, readPhase);
}
template <bool encoded>
void ListColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase&
readPhase) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
ListVectorBatch& listBatch = dynamic_cast<ListVectorBatch&>(rowBatch);
int64_t* offsets = listBatch.offsets.data();
notNull = listBatch.hasNulls ? listBatch.notNull.data() : nullptr;
@@ -1011,18 +1021,18 @@ namespace orc {
ColumnReader* childReader = child.get();
if (childReader) {
if (encoded) {
- childReader->nextEncoded(*(listBatch.elements.get()), totalChildren,
nullptr);
+ childReader->nextEncoded(*(listBatch.elements.get()), totalChildren,
nullptr, readPhase);
} else {
- childReader->next(*(listBatch.elements.get()), totalChildren, nullptr);
+ childReader->next(*(listBatch.elements.get()), totalChildren, nullptr,
readPhase);
}
}
}
- void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions) {
- ColumnReader::seekToRowGroup(positions);
+ void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions, const ReadPhase& readPhase) {
+ ColumnReader::seekToRowGroup(positions, readPhase);
rle->seek(positions.at(columnId));
if (child.get()) {
- child->seekToRowGroup(positions);
+ child->seekToRowGroup(positions, readPhase);
}
}
@@ -1036,17 +1046,17 @@ namespace orc {
MapColumnReader(const Type& type, StripeStreams& stipe, bool
useTightNumericVector = false);
~MapColumnReader() override;
- uint64_t skip(uint64_t numValues) override;
+ uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
- void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull) override;
+ void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
private:
template <bool encoded>
- void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull);
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase);
};
MapColumnReader::MapColumnReader(const Type& type, StripeStreams& stripe,
@@ -1073,8 +1083,8 @@ namespace orc {
// PASS
}
- uint64_t MapColumnReader::skip(uint64_t numValues) {
- numValues = ColumnReader::skip(numValues);
+ uint64_t MapColumnReader::skip(uint64_t numValues, const ReadPhase&
readPhase) {
+ numValues = ColumnReader::skip(numValues, readPhase);
ColumnReader* rawKeyReader = keyReader.get();
ColumnReader* rawElementReader = elementReader.get();
if (rawKeyReader || rawElementReader) {
@@ -1091,10 +1101,10 @@ namespace orc {
lengthsRead += chunk;
}
if (rawKeyReader) {
- rawKeyReader->skip(childrenElements);
+ rawKeyReader->skip(childrenElements, readPhase);
}
if (rawElementReader) {
- rawElementReader->skip(childrenElements);
+ rawElementReader->skip(childrenElements, readPhase);
}
} else {
rle->skip(numValues);
@@ -1102,19 +1112,19 @@ namespace orc {
return numValues;
}
- void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
- nextInternal<false>(rowBatch, numValues, notNull);
+ void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull, const ReadPhase& readPhase) {
+ nextInternal<false>(rowBatch, numValues, notNull, readPhase);
}
void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull) {
- nextInternal<true>(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase& readPhase)
{
+ nextInternal<true>(rowBatch, numValues, notNull, readPhase);
}
template <bool encoded>
void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase&
readPhase) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
MapVectorBatch& mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch);
int64_t* offsets = mapBatch.offsets.data();
notNull = mapBatch.hasNulls ? mapBatch.notNull.data() : nullptr;
@@ -1141,29 +1151,29 @@ namespace orc {
ColumnReader* rawKeyReader = keyReader.get();
if (rawKeyReader) {
if (encoded) {
- rawKeyReader->nextEncoded(*(mapBatch.keys.get()), totalChildren,
nullptr);
+ rawKeyReader->nextEncoded(*(mapBatch.keys.get()), totalChildren,
nullptr, readPhase);
} else {
- rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, nullptr);
+ rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, nullptr,
readPhase);
}
}
ColumnReader* rawElementReader = elementReader.get();
if (rawElementReader) {
if (encoded) {
- rawElementReader->nextEncoded(*(mapBatch.elements.get()),
totalChildren, nullptr);
+ rawElementReader->nextEncoded(*(mapBatch.elements.get()),
totalChildren, nullptr, readPhase);
} else {
- rawElementReader->next(*(mapBatch.elements.get()), totalChildren,
nullptr);
+ rawElementReader->next(*(mapBatch.elements.get()), totalChildren,
nullptr, readPhase);
}
}
}
- void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions) {
- ColumnReader::seekToRowGroup(positions);
+ void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions, const ReadPhase& readPhase) {
+ ColumnReader::seekToRowGroup(positions, readPhase);
rle->seek(positions.at(columnId));
if (keyReader.get()) {
- keyReader->seekToRowGroup(positions);
+ keyReader->seekToRowGroup(positions, readPhase);
}
if (elementReader.get()) {
- elementReader->seekToRowGroup(positions);
+ elementReader->seekToRowGroup(positions, readPhase);
}
}
@@ -1177,17 +1187,17 @@ namespace orc {
public:
UnionColumnReader(const Type& type, StripeStreams& stipe, bool
useTightNumericVector = false);
- uint64_t skip(uint64_t numValues) override;
+ uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
- void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull) override;
+ void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
private:
template <bool encoded>
- void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull);
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase);
};
UnionColumnReader::UnionColumnReader(const Type& type, StripeStreams& stripe,
@@ -1211,8 +1221,8 @@ namespace orc {
}
}
- uint64_t UnionColumnReader::skip(uint64_t numValues) {
- numValues = ColumnReader::skip(numValues);
+ uint64_t UnionColumnReader::skip(uint64_t numValues, const ReadPhase&
readPhase) {
+ numValues = ColumnReader::skip(numValues, readPhase);
const uint64_t BUFFER_SIZE = 1024;
char buffer[BUFFER_SIZE];
uint64_t lengthsRead = 0;
@@ -1227,26 +1237,27 @@ namespace orc {
lengthsRead += chunk;
}
for (size_t i = 0; i < numChildren; ++i) {
- if (counts[i] != 0 && childrenReader[i] != nullptr) {
- childrenReader[i]->skip(static_cast<uint64_t>(counts[i]));
+ if (counts[i] != 0 && childrenReader[i] != nullptr
+ &&
shouldProcessChild(childrenReader[i]->getType().getReaderCategory(),
readPhase)) {
+ childrenReader[i]->skip(static_cast<uint64_t>(counts[i]), readPhase);
}
}
return numValues;
}
- void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull) {
- nextInternal<false>(rowBatch, numValues, notNull);
+ void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull, const ReadPhase& readPhase) {
+ nextInternal<false>(rowBatch, numValues, notNull, readPhase);
}
void UnionColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull) {
- nextInternal<true>(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase&
readPhase) {
+ nextInternal<true>(rowBatch, numValues, notNull, readPhase);
}
template <bool encoded>
void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase&
readPhase) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
UnionVectorBatch& unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch);
uint64_t* offsets = unionBatch.offsets.data();
int64_t* counts = childrenCounts.data();
@@ -1268,25 +1279,25 @@ namespace orc {
}
// read the right number of each child column
for (size_t i = 0; i < numChildren; ++i) {
- if (childrenReader[i] != nullptr) {
+ if (childrenReader[i] != nullptr &&
shouldProcessChild(childrenReader[i]->getType().getReaderCategory(),
readPhase)) {
if (encoded) {
childrenReader[i]->nextEncoded(*(unionBatch.children[i]),
- static_cast<uint64_t>(counts[i]),
nullptr);
+ static_cast<uint64_t>(counts[i]),
nullptr, readPhase);
} else {
childrenReader[i]->next(*(unionBatch.children[i]),
static_cast<uint64_t>(counts[i]),
- nullptr);
+ nullptr, readPhase);
}
}
}
}
void UnionColumnReader::seekToRowGroup(
- std::unordered_map<uint64_t, PositionProvider>& positions) {
- ColumnReader::seekToRowGroup(positions);
+ std::unordered_map<uint64_t, PositionProvider>& positions, const
ReadPhase& readPhase) {
+ ColumnReader::seekToRowGroup(positions, readPhase);
rle->seek(positions.at(columnId));
for (size_t i = 0; i < numChildren; ++i) {
- if (childrenReader[i] != nullptr) {
- childrenReader[i]->seekToRowGroup(positions);
+ if (childrenReader[i] != nullptr &&
shouldProcessChild(childrenReader[i]->getType().getReaderCategory(),
readPhase)) {
+ childrenReader[i]->seekToRowGroup(positions, readPhase);
}
}
}
@@ -1360,11 +1371,11 @@ namespace orc {
Decimal64ColumnReader(const Type& type, StripeStreams& stipe);
~Decimal64ColumnReader() override;
- uint64_t skip(uint64_t numValues) override;
+ uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
};
const uint32_t Decimal64ColumnReader::MAX_PRECISION_64;
const uint32_t Decimal64ColumnReader::MAX_PRECISION_128;
@@ -1407,8 +1418,8 @@ namespace orc {
// PASS
}
- uint64_t Decimal64ColumnReader::skip(uint64_t numValues) {
- numValues = ColumnReader::skip(numValues);
+ uint64_t Decimal64ColumnReader::skip(uint64_t numValues, const ReadPhase&
readPhase) {
+ numValues = ColumnReader::skip(numValues, readPhase);
uint64_t skipped = 0;
while (skipped < numValues) {
readBuffer();
@@ -1420,8 +1431,8 @@ namespace orc {
return numValues;
}
- void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull, const ReadPhase& readPhase) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
Decimal64VectorBatch& batch =
dynamic_cast<Decimal64VectorBatch&>(rowBatch);
int64_t* values = batch.values.data();
@@ -1463,8 +1474,8 @@ namespace orc {
}
void Decimal64ColumnReader::seekToRowGroup(
- std::unordered_map<uint64_t, PositionProvider>& positions) {
- ColumnReader::seekToRowGroup(positions);
+ std::unordered_map<uint64_t, PositionProvider>& positions, const
ReadPhase& readPhase) {
+ ColumnReader::seekToRowGroup(positions, readPhase);
valueStream->seek(positions.at(columnId));
scaleDecoder->seek(positions.at(columnId));
// clear buffer state after seek
@@ -1477,7 +1488,7 @@ namespace orc {
Decimal128ColumnReader(const Type& type, StripeStreams& stipe);
~Decimal128ColumnReader() override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
private:
void readInt128(Int128& value, int32_t currentScale) {
@@ -1510,8 +1521,8 @@ namespace orc {
}
void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase& readPhase)
{
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
Decimal128VectorBatch& batch =
dynamic_cast<Decimal128VectorBatch&>(rowBatch);
Int128* values = batch.values.data();
@@ -1543,9 +1554,9 @@ namespace orc {
Decimal64ColumnReaderV2(const Type& type, StripeStreams& stripe);
~Decimal64ColumnReaderV2() override;
- uint64_t skip(uint64_t numValues) override;
+ uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
};
Decimal64ColumnReaderV2::Decimal64ColumnReaderV2(const Type& type,
StripeStreams& stripe)
@@ -1566,15 +1577,15 @@ namespace orc {
// PASS
}
- uint64_t Decimal64ColumnReaderV2::skip(uint64_t numValues) {
- numValues = ColumnReader::skip(numValues);
+ uint64_t Decimal64ColumnReaderV2::skip(uint64_t numValues, const ReadPhase&
readPhase) {
+ numValues = ColumnReader::skip(numValues, readPhase);
valueDecoder->skip(numValues);
return numValues;
}
void Decimal64ColumnReaderV2::next(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase&
readPhase) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
Decimal64VectorBatch& batch =
dynamic_cast<Decimal64VectorBatch&>(rowBatch);
valueDecoder->next(batch.values.data(), numValues, notNull);
@@ -1628,7 +1639,7 @@ namespace orc {
DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe);
~DecimalHive11ColumnReader() override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull)
override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
};
DecimalHive11ColumnReader::DecimalHive11ColumnReader(const Type& type,
StripeStreams& stripe)
@@ -1643,8 +1654,8 @@ namespace orc {
}
void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
+ char* notNull, const ReadPhase&
readPhase) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
Decimal128VectorBatch& batch =
dynamic_cast<Decimal128VectorBatch&>(rowBatch);
Int128* values = batch.values.data();
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index 3b765cb..25363e2 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -109,21 +109,30 @@ namespace orc {
class ColumnReader {
protected:
std::unique_ptr<ByteRleDecoder> notNullDecoder;
+ const Type& type;
uint64_t columnId;
MemoryPool& memoryPool;
ReaderMetrics* metrics;
+ /**
+  * Decide whether a child reader takes part in the given read phase.
+  * @param readerCategory category of the child's type
+  * @param readPhase phase currently being executed
+  * @return true when the phase contains the category, or when the category is
+  *         FILTER_PARENT
+  */
+ static bool shouldProcessChild(ReaderCategory readerCategory, const ReadPhase& readPhase) {
+   if (readPhase.contains(readerCategory)) {
+     return true;
+   }
+   return readerCategory == ReaderCategory::FILTER_PARENT;
+ }
+
public:
ColumnReader(const Type& type, StripeStreams& stipe);
virtual ~ColumnReader();
+ /**
+  * Access the type this reader was given.
+  * @return the `type` reference held by this reader
+  */
+ const Type& getType() const {
+ return type;
+ }
+
/**
* Skip number of specified rows.
* @param numValues the number of values to skip
* @return the number of non-null values skipped
*/
- virtual uint64_t skip(uint64_t numValues);
+ virtual uint64_t skip(uint64_t numValues, const ReadPhase& readPhase =
ReadPhase::ALL);
/**
* Read the next group of values into this rowBatch.
@@ -133,7 +142,7 @@ namespace orc {
* a mask (with at least numValues bytes) for which values to
* set.
*/
- virtual void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull);
+ virtual void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase = ReadPhase::ALL);
/**
* Read the next group of values without decoding
@@ -143,16 +152,16 @@ namespace orc {
* a mask (with at least numValues bytes) for which values to
* set.
*/
- virtual void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull) {
+ virtual void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull, const ReadPhase& readPhase = ReadPhase::ALL) {
rowBatch.isEncoded = false;
- next(rowBatch, numValues, notNull);
+ next(rowBatch, numValues, notNull, readPhase);
}
/**
* Seek to beginning of a row group in the current stripe
* @param positions a list of PositionProviders storing the positions
*/
- virtual void seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions);
+ virtual void seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions, const ReadPhase& readPhase = ReadPhase::ALL);
};
/**
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index 151434e..40a583e 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -34,6 +34,13 @@ namespace orc {
ColumnSelection_TYPE_IDS = 3,
};
+ // Records which form was used to specify the filter columns of a RowReaderOptions.
+ // Numbering parallels the ColumnSelection enum above.
+ enum ColumnFilter {
+ ColumnFilter_NONE = 0,  // presumably the "no filter columns" state -- TODO confirm default
+ ColumnFilter_NAMES = 1,  // set by RowReaderOptions::filter(const std::list<std::string>&)
+ ColumnFilter_FIELD_IDS = 2,  // set by RowReaderOptions::filter(const std::list<uint64_t>&)
+ ColumnFilter_TYPE_IDS = 3,  // not assigned by any code visible in this patch
+ };
+
/**
* ReaderOptions Implementation
*/
@@ -130,6 +137,9 @@ namespace orc {
ColumnSelection selection;
std::list<uint64_t> includedColumnIndexes;
std::list<std::string> includedColumnNames;
+ ColumnFilter filter;
+ std::list<uint64_t> filterColumnIndexes;
+ std::list<std::string> filterColumnNames;
uint64_t dataStart;
uint64_t dataLength;
bool throwOnHive11DecimalOverflow;
@@ -214,6 +224,20 @@ namespace orc {
return *this;
}
+ /**
+  * Specify the filter columns by index.
+  * Marks the filter mode as ColumnFilter_FIELD_IDS, replaces the stored index list with a
+  * copy of the argument and clears any previously stored filter column names.
+  * NOTE(review): the only filter accessor visible in this patch is getFilterColNames();
+  * confirm that the index form is actually consumed by the row reader.
+  * @param filterColIndexes indexes of the filter columns
+  * @return *this, to allow call chaining
+  */
+ RowReaderOptions& RowReaderOptions::filter(const std::list<uint64_t>&
filterColIndexes) {
+ privateBits->filter = ColumnFilter_FIELD_IDS;
+ privateBits->filterColumnIndexes.assign(filterColIndexes.begin(),
filterColIndexes.end());
+ privateBits->filterColumnNames.clear();
+ return *this;
+ }
+
+ /**
+  * Specify the filter columns by name.
+  * Marks the filter mode as ColumnFilter_NAMES, replaces the stored name list with a copy
+  * of the argument and clears any previously stored filter column indexes.
+  * @param filterColNames names of the filter columns
+  * @return *this, to allow call chaining
+  */
+ RowReaderOptions& RowReaderOptions::filter(const std::list<std::string>&
filterColNames) {
+ privateBits->filter = ColumnFilter_NAMES;
+ privateBits->filterColumnNames.assign(filterColNames.begin(),
filterColNames.end());
+ privateBits->filterColumnIndexes.clear();
+ return *this;
+ }
+
RowReaderOptions& RowReaderOptions::range(uint64_t offset, uint64_t length) {
privateBits->dataStart = offset;
privateBits->dataLength = length;
@@ -240,6 +264,10 @@ namespace orc {
return privateBits->includedColumnNames;
}
+ /**
+  * Get the filter column names stored in these options.
+  * @return reference to the stored name list
+  */
+ const std::list<std::string>& RowReaderOptions::getFilterColNames() const {
+ return privateBits->filterColumnNames;
+ }
+
uint64_t RowReaderOptions::getOffset() const {
return privateBits->dataStart;
}
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 8ccb0ec..52052fb 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -247,7 +247,8 @@ namespace orc {
}
RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> _contents,
- const RowReaderOptions& opts)
+ const RowReaderOptions& opts,
+ const ORCFilter* _filter)
: localTimezone(getLocalTimezone()),
contents(_contents),
throwOnHive11DecimalOverflow(opts.getThrowOnHive11DecimalOverflow()),
@@ -255,7 +256,8 @@ namespace orc {
footer(contents->footer.get()),
firstRowOfStripe(*contents->pool, 0),
enableEncodedBlock(opts.getEnableLazyDecoding()),
- readerTimezone(getTimezoneByName(opts.getTimezoneName())) {
+ readerTimezone(getTimezoneByName(opts.getTimezoneName())),
+ filter(_filter) {
uint64_t numberOfStripes;
numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
currentStripe = numberOfStripes;
@@ -309,6 +311,37 @@ namespace orc {
}
skipBloomFilters = hasBadBloomFilters();
+
+ const std::list<std::string>& filterCols = opts.getFilterColNames();
+
+ // Map columnNames to ColumnIds
+ buildTypeNameIdMap(contents->schema.get());
+
+ std::unordered_set<int> filterColIds;
+ if (!filterCols.empty()) {
+ for (auto& colName: filterCols) {
+ auto iter = nameTypeMap.find(colName);
+ if (iter != nameTypeMap.end()) {
+ Type* type = iter->second;
+ while (type != nullptr) {
+ if (type->getSubtypeCount() == 0) {
+ type->setReaderCategory(ReaderCategory::FILTER_CHILD);
+ } else {
+ type->setReaderCategory(ReaderCategory::FILTER_PARENT);
+ }
+ filterColIds.emplace(type->getColumnId());
+ type = type->getParent();
+ }
+ } else {
+ throw ParseError("Invalid column selected " + colName);
+ }
+ }
+ startReadPhase = ReadPhase::LEADERS;
+ readerContext = std::unique_ptr<ReaderContext>(new ReaderContext());
+ readerContext->setFilterCallback(std::move(filterColIds), filter);
+ } else {
+ startReadPhase = ReadPhase::ALL;
+ }
}
// Check if the file has inconsistent bloom filters.
@@ -337,7 +370,43 @@ namespace orc {
return false;
}
- CompressionKind RowReaderImpl::getCompression() const {
+ /**
+  * Recurses over a type tree and fills two lookup tables:
+  *  - idTypeMap: column id -> Type*, for every visited node
+  *  - nameTypeMap: dot-separated field path -> Type*, for every field of a STRUCT
+  * Children of non-STRUCT types are visited without adding a path component.
+  * @param type root of the (sub)tree to index
+  */
+ void RowReaderImpl::buildTypeNameIdMap(Type* type) {
+ // map<type_id, Type*>
+ idTypeMap[type->getColumnId()] = type;
+
+ if (STRUCT == type->getKind()) {
+ for (size_t i = 0; i < type->getSubtypeCount(); ++i) {
+ const std::string& fieldName = type->getFieldName(i);
+ // extend the current path so toDotColumnPath() names this field
+ columns.push_back(fieldName);
+ nameTypeMap[toDotColumnPath()] = type->getSubtype(i);
+ buildTypeNameIdMap(type->getSubtype(i));
+ // restore the path before handling the next sibling
+ columns.pop_back();
+ }
+ } else {
+ // any other type: recurse into the children (the loop is empty when there are none)
+ for (size_t j = 0; j < type->getSubtypeCount(); ++j) {
+ buildTypeNameIdMap(type->getSubtype(j));
+ }
+ }
+ }
+
+ /**
+  * Join the field names currently held in `columns` into one path, separated by '.'.
+  * @return the dotted path, or an empty string when `columns` is empty
+  */
+ std::string RowReaderImpl::toDotColumnPath() {
+   std::string dottedPath;
+   for (auto it = columns.begin(); it != columns.end(); ++it) {
+     // a separator goes between elements only, never after the last one
+     if (it != columns.begin()) {
+       dottedPath.push_back('.');
+     }
+     dottedPath.append(*it);
+   }
+   return dottedPath;
+ }
+
+
+ CompressionKind RowReaderImpl::getCompression() const {
return contents->compression;
}
@@ -401,7 +470,7 @@ namespace orc {
// current stripe doesn't have row indexes
currentStripe = seekToStripe;
currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe];
- startNextStripe();
+ startNextStripe(startReadPhase);
if (currentStripe >= lastStripe) {
return;
}
@@ -422,14 +491,14 @@ namespace orc {
loadStripeIndex();
}
// TODO(ORC-1175): process the failures of loadStripeIndex() call
- seekToRowGroup(static_cast<uint32_t>(rowsToSkip / rowIndexStride));
+ seekToRowGroup(static_cast<uint32_t>(rowsToSkip / rowIndexStride),
startReadPhase);
// skip leading rows in the target row group
rowsToSkip %= rowIndexStride;
}
// 'reader' is reset in startNextStripe(). It could be nullptr if
'rowsToSkip' is 0,
// e.g. when startNextStripe() skips all remaining rows of the file.
if (rowsToSkip > 0) {
- reader->skip(rowsToSkip);
+ reader->skip(rowsToSkip, startReadPhase);
}
}
@@ -477,7 +546,7 @@ namespace orc {
}
}
- void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId) {
+ void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId, const
ReadPhase& readPhase) {
// store positions for selected columns
std::list<std::list<uint64_t>> positions;
// store position providers for selected colimns
@@ -497,7 +566,7 @@ namespace orc {
positionProviders.insert(std::make_pair(colId,
PositionProvider(position)));
}
- reader->seekToRowGroup(positionProviders);
+ reader->seekToRowGroup(positionProviders, readPhase);
}
const FileContents& RowReaderImpl::getFileContents() const {
@@ -826,17 +895,17 @@ namespace orc {
}
}
- std::unique_ptr<RowReader> ReaderImpl::createRowReader() const {
+ std::unique_ptr<RowReader> ReaderImpl::createRowReader(const ORCFilter*
filter) const {
RowReaderOptions defaultOpts;
- return createRowReader(defaultOpts);
+ return createRowReader(defaultOpts, filter);
}
- std::unique_ptr<RowReader> ReaderImpl::createRowReader(const
RowReaderOptions& opts) const {
+ std::unique_ptr<RowReader> ReaderImpl::createRowReader(const
RowReaderOptions& opts, const ORCFilter* filter) const {
if (opts.getSearchArgument() && !isMetadataLoaded) {
// load stripe statistics for PPD
readMetadata();
}
- return std::make_unique<RowReaderImpl>(contents, opts);
+ return std::make_unique<RowReaderImpl>(contents, opts, filter);
}
uint64_t maxStreamsForType(const proto::Type& type) {
@@ -1020,10 +1089,11 @@ namespace orc {
}
}
- void RowReaderImpl::startNextStripe() {
+ void RowReaderImpl::startNextStripe(const ReadPhase& readPhase) {
reader.reset(); // ColumnReaders use lots of memory; free old memory first
rowIndexes.clear();
bloomFilterIndex.clear();
+ followRowInStripe = 0;
// evaluate file statistics if it exists
if (sargsApplier && !sargsApplier->evaluateFileStatistics(*footer,
numRowGroupsInStripeRange)) {
@@ -1051,6 +1121,9 @@ namespace orc {
rowsInCurrentStripe = currentStripeInfo.numberofrows();
processingStripe = currentStripe;
+ // read row group statistics and bloom filters of current stripe
+ loadStripeIndex();
+
if (sargsApplier) {
bool isStripeNeeded = true;
if (contents->metadata) {
@@ -1064,9 +1137,6 @@ namespace orc {
}
if (isStripeNeeded) {
- // read row group statistics and bloom filters of current stripe
- loadStripeIndex();
-
// select row groups to read in the current stripe
sargsApplier->pickRowGroups(rowsInCurrentStripe, rowIndexes,
bloomFilterIndex);
if (sargsApplier->hasSelectedFrom(currentRowInStripe)) {
@@ -1100,7 +1170,7 @@ namespace orc {
sargsApplier->getNextSkippedRows());
previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe - 1;
if (currentRowInStripe > 0) {
- seekToRowGroup(static_cast<uint32_t>(currentRowInStripe /
footer->rowindexstride()));
+ seekToRowGroup(static_cast<uint32_t>(currentRowInStripe /
footer->rowindexstride()), readPhase);
}
}
} else {
@@ -1110,56 +1180,175 @@ namespace orc {
}
+ // Reads the next batch by delegating to nextBatch() with a null callback argument.
+ // Returns true when nextBatch() reports a non-zero row count.
bool RowReaderImpl::next(ColumnVectorBatch& data) {
+ return nextBatch(data, nullptr) != 0;
+ }
+
+ /**
+  * Read batches until one is non-empty after filtering, or until no rows remain.
+  * Filter columns are read first (startReadPhase); when at least one row is left in the
+  * batch, the remaining columns are read for the same rows.
+  * @param data batch to fill; data.numElements is 0 when nothing could be read
+  * @param arg opaque pointer forwarded to the filter callback
+  * @return number of rows consumed from the file by this call, summed over every batch
+  *         that was read (including batches the filter emptied)
+  */
+ uint64_t RowReaderImpl::nextBatch(ColumnVectorBatch& data, void* arg) {
SCOPED_STOPWATCH(contents->readerMetrics, ReaderInclusiveLatencyUs,
ReaderCall);
- if (currentStripe >= lastStripe) {
- data.numElements = 0;
- markEndOfFile();
- return false;
- }
- if (currentRowInStripe == 0) {
- startNextStripe();
- }
- uint64_t rowsToRead =
- std::min(static_cast<uint64_t>(data.capacity), rowsInCurrentStripe -
currentRowInStripe);
- if (sargsApplier && rowsToRead > 0) {
- rowsToRead = computeBatchSize(rowsToRead, currentRowInStripe,
rowsInCurrentStripe,
- footer->rowindexstride(),
sargsApplier->getNextSkippedRows());
- }
- data.numElements = rowsToRead;
- if (rowsToRead == 0) {
- markEndOfFile();
- return false;
- }
- if (enableEncodedBlock) {
- reader->nextEncoded(data, rowsToRead, nullptr);
- } else {
- reader->next(data, rowsToRead, nullptr);
- }
- // update row number
- previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
- currentRowInStripe += rowsToRead;
+ uint64_t readRows = 0;
+ uint64_t rowsToRead = 0;
+ // Selection buffer handed to the filter callback. Heap-backed and reused across loop
+ // iterations: a runtime-sized stack array is not ISO C++ and its size follows the batch
+ // capacity.
+ std::vector<uint16_t> sel_rowid_idx;
+ // do...while is required to handle the case where the filter eliminates all rows in the
+ // batch, we never return an empty batch unless the file is exhausted
+ do {
+ if (currentStripe >= lastStripe) {
+ data.numElements = 0;
+ markEndOfFile();
+ return readRows;
+ }
+ if (currentRowInStripe == 0) {
+ startNextStripe(startReadPhase);
+ followRowInStripe = currentRowInStripe;
+ }
+ rowsToRead =
+ std::min(static_cast<uint64_t>(data.capacity),
+ rowsInCurrentStripe - currentRowInStripe);
+ if (sargsApplier) {
+ rowsToRead = computeBatchSize(rowsToRead,
+ currentRowInStripe,
+ rowsInCurrentStripe,
+ footer->rowindexstride(),
+ sargsApplier->getNextSkippedRows());
+ }
+ if (rowsToRead == 0) {
+ // keep the batch consistent with the "nothing read" result instead of leaving the
+ // element count of an earlier call in place
+ data.numElements = 0;
+ markEndOfFile();
+ return readRows;
+ }
+ sel_rowid_idx.resize(rowsToRead);
+ nextBatch(data, rowsToRead, startReadPhase, sel_rowid_idx.data(), arg);
+
+ if (startReadPhase == ReadPhase::LEADERS && data.numElements > 0) {
+ // At least 1 row has been selected and as a result we read the follow columns into the
+ // row batch
+ nextBatch(data, rowsToRead,
+ prepareFollowReaders(footer->rowindexstride(),
+ currentRowInStripe, followRowInStripe),
+ sel_rowid_idx.data(), arg);
+ followRowInStripe = currentRowInStripe + rowsToRead;
+ }
+
+ // update row number
+ previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
+ currentRowInStripe += rowsToRead;
+ readRows += rowsToRead;
// check if we need to advance to next selected row group
if (sargsApplier) {
uint64_t nextRowToRead =
advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe,
footer->rowindexstride(),
sargsApplier->getNextSkippedRows());
- if (currentRowInStripe != nextRowToRead) {
- // it is guaranteed to be at start of a row group
- currentRowInStripe = nextRowToRead;
- if (currentRowInStripe < rowsInCurrentStripe) {
- seekToRowGroup(static_cast<uint32_t>(currentRowInStripe /
footer->rowindexstride()));
+ if (currentRowInStripe != nextRowToRead) {
+ // it is guaranteed to be at start of a row group
+ currentRowInStripe = nextRowToRead;
+ if (currentRowInStripe < rowsInCurrentStripe) {
+ seekToRowGroup(static_cast<uint32_t>(currentRowInStripe /
footer->rowindexstride()),
+ startReadPhase);
+ }
}
}
+
+ if (currentRowInStripe >= rowsInCurrentStripe) {
+ currentStripe += 1;
+ currentRowInStripe = 0;
+ }
+ } while (rowsToRead != 0 && data.numElements == 0);
+ return readRows;
+ }
+
+ /**
+  * Read one batch for a single read phase and, in the LEADERS phase, invoke the filter
+  * callback when one is registered.
+  * @param data batch to fill; numElements is set to batchSize for the ALL and LEADERS phases
+  * @param batchSize number of rows requested from the column reader
+  * @param readPhase phase forwarded to the column reader
+  * @param sel_rowid_idx buffer passed through to the filter callback
+  * @param arg opaque pointer passed through to the filter callback
+  */
+ void RowReaderImpl::nextBatch(ColumnVectorBatch& data, int batchSize, const
ReadPhase& readPhase, uint16_t* sel_rowid_idx, void* arg) {
+ if (enableEncodedBlock) {
+ reader->nextEncoded(data, batchSize, nullptr, readPhase);
+ }
+ else {
+ reader->next(data, batchSize, nullptr, readPhase);
+ }
+ if (readPhase == ReadPhase::ALL || readPhase == ReadPhase::LEADERS) {
+ // Set the batch size when reading everything or when reading FILTER
columns
+ data.numElements = batchSize;
}
- if (currentRowInStripe >= rowsInCurrentStripe) {
- currentStripe += 1;
- currentRowInStripe = 0;
+ if (readPhase == ReadPhase::LEADERS) {
+ // Apply filter callback to reduce number of # rows selected for
decoding in the next
+ // TreeReaders
+ if (readerContext->getFilterCallback()) {
+ readerContext->getFilterCallback()->filter(data, sel_rowid_idx,
batchSize, arg);
+ }
}
- return rowsToRead != 0;
}
+ /**
+  * Determine the RowGroup based on the supplied row id.
+  * @param rowIndexStride number of rows per row group; 0 yields row group 0
+  * @param rowIdx Row for which the row group is being determined. May be -1 (callers pass
+  *               "current row - 1" at the start of a stripe), which maps to row group 0.
+  * @return Id of the RowGroup that the row belongs to
+  */
+ int RowReaderImpl::computeRGIdx(uint64_t rowIndexStride, long rowIdx) {
+   if (rowIndexStride == 0) {
+     return 0;
+   }
+   // Divide in the signed domain. Mixing `long` with `uint64_t` would convert a negative
+   // rowIdx to a huge unsigned value and produce a bogus row group index.
+   return static_cast<int>(static_cast<int64_t>(rowIdx) /
+                           static_cast<int64_t>(rowIndexStride));
+ }
+
+ /**
+  * This method prepares the non-filter column readers for next batch. This involves the
+  * following
+  * 1. Determine position
+  * 2. Perform IO if required
+  * 3. Position the non-filter readers
+  *
+  * This method is repositioning the non-filter columns and as such this method shall never
+  * have to deal with navigating the stripe forward or skipping row groups, all of this
+  * should have already taken place based on the filter columns.
+  * @param rowIndexStride number of rows per row group
+  * @param toFollowRow The rowIdx identifies the required row position within the stripe for
+  *                    follow read
+  * @param fromFollowRow Indicates the current position of the follow read, exclusive
+  * @return the read phase for reading non-filter columns, this shall be
+  *         FOLLOWERS_AND_PARENTS in case of a seek or skip, otherwise will be FOLLOWERS
+  */
+ ReadPhase RowReaderImpl::prepareFollowReaders(uint64_t rowIndexStride, long toFollowRow, long fromFollowRow) {
+   // 1. Determine the required row group and skip rows needed from the RG start
+   int needRG = computeRGIdx(rowIndexStride, toFollowRow);
+   // The current row is not yet read so we -1 to compute the previously read row group
+   int readRG = computeRGIdx(rowIndexStride, fromFollowRow - 1);
+   long skipRows;
+   if (needRG == readRG && toFollowRow >= fromFollowRow) {
+     // In case we are skipping forward within the same row group, we compute skip rows
+     // from the current position
+     skipRows = toFollowRow - fromFollowRow;
+   } else {
+     // In all other cases including seeking backwards, we compute the skip rows from the
+     // start of the required row group
+     skipRows = toFollowRow - (needRG * rowIndexStride);
+   }
+
+   // 2. Plan the row group idx for the non-filter columns if this has not already taken
+   // place. No IO is planned here yet, so only the flag is cleared. The store is
+   // unconditional, which gives the same result as test-then-clear without reading a
+   // member that no code visible in this patch assigns beforehand.
+   needsFollowColumnsRead = false;
+
+   // 3. Position the non-filter readers to the required RG and skipRows
+   ReadPhase result = ReadPhase::FOLLOWERS;
+   if (needRG != readRG || toFollowRow < fromFollowRow) {
+     // When having to change a row group or in case of back navigation, seek both the
+     // filter parents and non-filter. This will re-position the parents present vector.
+     // This is needed to determine the number of non-null values to skip on the
+     // non-filter columns.
+     seekToRowGroup(needRG, ReadPhase::FOLLOWERS_AND_PARENTS);
+     // skip rows on both the filter parents and non-filter as both have been positioned
+     // in the previous step
+     reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS);
+     result = ReadPhase::FOLLOWERS_AND_PARENTS;
+   } else if (skipRows > 0) {
+     // in case we are only skipping within the row group, position the filter parents
+     // back to the position of the follow. This is required to determine the non-null
+     // values to skip on the non-filter columns.
+     seekToRowGroup(readRG, ReadPhase::LEADER_PARENTS);
+     reader->skip(fromFollowRow - (readRG * rowIndexStride), ReadPhase::LEADER_PARENTS);
+     // Move both the filter parents and non-filter forward, this will compute the correct
+     // non-null skips on follow children
+     reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS);
+     result = ReadPhase::FOLLOWERS_AND_PARENTS;
+   }
+   // Identifies the read level that should be performed for the read.
+   // FOLLOWERS_AND_PARENTS indicates repositioning identifying both non-filter and filter
+   // parents.
+   // FOLLOWERS indicates read only of the non-filter level without the parents, which is
+   // used during contiguous read. During a contiguous read no skips are needed and the
+   // non-null information of the parent is available in the column vector for use during
+   // non-filter read.
+   return result;
+ }
+
uint64_t RowReaderImpl::computeBatchSize(uint64_t requestedSize, uint64_t
currentRowInStripe,
uint64_t rowsInCurrentStripe,
uint64_t rowIndexStride,
const std::vector<uint64_t>&
nextSkippedRows) {
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index ea6db3a..0c52387 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -34,6 +34,25 @@ namespace orc {
static const uint64_t DIRECTORY_SIZE_GUESS = 16 * 1024;
+ /**
+  * Holds the filter configuration: the ids of the filter columns and the
+  * filter callback.
+  */
+ class ReaderContext {
+  public:
+   ReaderContext() = default;
+
+   /**
+    * @return the stored filter callback, or nullptr when none has been set
+    */
+   const ORCFilter* getFilterCallback() const {
+     return filter;
+   }
+
+   /**
+    * Store the filter column ids together with the filter callback.
+    * @param _filterColumnIds ids of the filter columns; moved into this object
+    * @param _filter the filter callback; only the pointer is stored
+    * @return this context, so that calls can be chained
+    */
+   ReaderContext& setFilterCallback(std::unordered_set<int> _filterColumnIds,
+                                    const ORCFilter* _filter) {
+     this->filterColumnIds = std::move(_filterColumnIds);
+     this->filter = _filter;
+     return *this;
+   }
+
+  private:
+   std::unordered_set<int> filterColumnIds;
+   // Initialized to nullptr so that a default-constructed context never hands
+   // out an indeterminate pointer from getFilterCallback().
+   const ORCFilter* filter = nullptr;
+ };
+
/**
* WriterVersion Implementation
*/
@@ -150,6 +169,7 @@ namespace orc {
uint64_t lastStripe; // the stripe AFTER the last one
uint64_t processingStripe;
uint64_t currentRowInStripe;
+ uint64_t followRowInStripe;
uint64_t rowsInCurrentStripe;
// number of row groups between first stripe and last stripe
uint64_t numRowGroupsInStripeRange;
@@ -160,7 +180,7 @@ namespace orc {
bool enableEncodedBlock;
bool useTightNumericVector;
// internal methods
- void startNextStripe();
+ void startNextStripe(const ReadPhase& readPhase);
inline void markEndOfFile();
// row index of current stripe with column id as the key
@@ -172,6 +192,15 @@ namespace orc {
// desired timezone to return data of timestamp types.
const Timezone& readerTimezone;
+ std::unique_ptr<ReaderContext> readerContext;
+ const ORCFilter* filter;
+ ReadPhase startReadPhase;
+ bool needsFollowColumnsRead;
+
+ std::map<uint64_t, const Type*> idTypeMap;
+ std::map<std::string, Type*> nameTypeMap;
+ std::vector<std::string> columns;
+
// load stripe index if not done so
void loadStripeIndex();
@@ -199,7 +228,7 @@ namespace orc {
* Seek to the start of a row group in the current stripe
* @param rowGroupEntryId the row group id to seek to
*/
- void seekToRowGroup(uint32_t rowGroupEntryId);
+ void seekToRowGroup(uint32_t rowGroupEntryId, const ReadPhase& readPhase);
/**
* Check if the file has bad bloom filters. We will skip using them in the
@@ -208,13 +237,25 @@ namespace orc {
*/
bool hasBadBloomFilters();
- public:
+
+ // build map from type name and id, id to Type
+ void buildTypeNameIdMap(Type* type);
+
+ std::string toDotColumnPath();
+
+ void nextBatch(ColumnVectorBatch& data, int batchSize, const ReadPhase&
readPhase, uint16_t* sel_rowid_idx, void* arg);
+
+ int computeRGIdx(uint64_t rowIndexStride, long rowIdx);
+
+ ReadPhase prepareFollowReaders(uint64_t rowIndexStride, long toFollowRow,
long fromFollowRow);
+
+ public:
/**
* Constructor that lets the user specify additional options.
* @param contents of the file
* @param options options for reading
*/
- RowReaderImpl(std::shared_ptr<FileContents> contents, const
RowReaderOptions& options);
+ RowReaderImpl(std::shared_ptr<FileContents> contents, const
RowReaderOptions& options, const ORCFilter* filter = nullptr);
// Select the columns from the options object
const std::vector<bool> getSelectedColumns() const override;
@@ -223,6 +264,8 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size) const
override;
+ uint64_t nextBatch(ColumnVectorBatch& data, void* arg = nullptr) override;
+
bool next(ColumnVectorBatch& data) override;
CompressionKind getCompression() const;
@@ -312,9 +355,9 @@ namespace orc {
std::unique_ptr<StripeStatistics> getStripeStatistics(uint64_t
stripeIndex) const override;
- std::unique_ptr<RowReader> createRowReader() const override;
+ std::unique_ptr<RowReader> createRowReader(const ORCFilter* filter =
nullptr) const override;
- std::unique_ptr<RowReader> createRowReader(const RowReaderOptions&
options) const override;
+ std::unique_ptr<RowReader> createRowReader(const RowReaderOptions&
options, const ORCFilter* filter = nullptr) const override;
uint64_t getContentLength() const override;
uint64_t getStripeStatisticsLength() const override;
diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc
index 0075d04..47095b3 100644
--- a/c++/src/TypeImpl.cc
+++ b/c++/src/TypeImpl.cc
@@ -25,6 +25,12 @@
namespace orc {
+ // Predefined read phases; each one is built from the set of reader
+ // categories it covers.
+
+ // ALL: filter children, filter parents and non-filter columns.
+ const ReadPhase ReadPhase::ALL = ReadPhase::fromCategories(
+     {ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT,
+      ReaderCategory::NON_FILTER});
+ // LEADERS: filter children and filter parents.
+ const ReadPhase ReadPhase::LEADERS = ReadPhase::fromCategories(
+     {ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT});
+ // FOLLOWERS: non-filter columns only.
+ const ReadPhase ReadPhase::FOLLOWERS =
+     ReadPhase::fromCategories({ReaderCategory::NON_FILTER});
+ // LEADER_PARENTS: filter parents only.
+ const ReadPhase ReadPhase::LEADER_PARENTS =
+     ReadPhase::fromCategories({ReaderCategory::FILTER_PARENT});
+ // FOLLOWERS_AND_PARENTS: filter parents and non-filter columns.
+ const ReadPhase ReadPhase::FOLLOWERS_AND_PARENTS = ReadPhase::fromCategories(
+     {ReaderCategory::FILTER_PARENT, ReaderCategory::NON_FILTER});
+
Type::~Type() {
// PASS
}
@@ -38,6 +44,7 @@ namespace orc {
precision = 0;
scale = 0;
subtypeCount = 0;
+ readerCategory = ReaderCategory::NON_FILTER;
}
TypeImpl::TypeImpl(TypeKind _kind, uint64_t _maxLength) {
@@ -49,6 +56,7 @@ namespace orc {
precision = 0;
scale = 0;
subtypeCount = 0;
+ readerCategory = ReaderCategory::NON_FILTER;
}
TypeImpl::TypeImpl(TypeKind _kind, uint64_t _precision, uint64_t _scale) {
@@ -60,6 +68,7 @@ namespace orc {
precision = _precision;
scale = _scale;
subtypeCount = 0;
+ readerCategory = ReaderCategory::NON_FILTER;
}
uint64_t TypeImpl::assignIds(uint64_t root) const {
@@ -75,8 +84,8 @@ namespace orc {
void TypeImpl::ensureIdAssigned() const {
if (columnId == -1) {
const TypeImpl* root = this;
- while (root->parent != nullptr) {
- root = root->parent;
+ while (root->getParent() != nullptr) {
+ root = dynamic_cast<const TypeImpl*>(root->getParent());
}
root->assignIds(0);
}
@@ -100,10 +109,18 @@ namespace orc {
return subtypeCount;
}
+ // Returns the parent pointer stored in this type node.
+ Type* TypeImpl::getParent() const {
+   return this->parent;
+ }
+
const Type* TypeImpl::getSubtype(uint64_t i) const {
return subTypes[i].get();
}
+ // Non-const overload: returns a mutable pointer to the i-th subtype.
+ // No explicit bounds check is performed on i here.
+ Type* TypeImpl::getSubtype(uint64_t i) {
+   auto& child = subTypes[i];
+   return child.get();
+ }
+
const std::string& TypeImpl::getFieldName(uint64_t i) const {
return fieldNames[i];
}
@@ -155,6 +172,14 @@ namespace orc {
return it->second;
}
+ // Returns the reader category currently stored on this type.
+ ReaderCategory TypeImpl::getReaderCategory() const {
+   return this->readerCategory;
+ }
+
+ // Overwrites the reader category stored on this type.
+ void TypeImpl::setReaderCategory(ReaderCategory _readerCategory) {
+   this->readerCategory = _readerCategory;
+ }
+
void TypeImpl::setIds(uint64_t _columnId, uint64_t _maxColumnId) {
columnId = static_cast<int64_t>(_columnId);
maximumColumnId = static_cast<int64_t>(_maxColumnId);
diff --git a/c++/src/TypeImpl.hh b/c++/src/TypeImpl.hh
index 6d07437..6bc4a84 100644
--- a/c++/src/TypeImpl.hh
+++ b/c++/src/TypeImpl.hh
@@ -41,6 +41,7 @@ namespace orc {
uint64_t precision;
uint64_t scale;
std::map<std::string, std::string> attributes;
+ ReaderCategory readerCategory;
public:
/**
@@ -66,8 +67,12 @@ namespace orc {
uint64_t getSubtypeCount() const override;
+ Type* getParent() const override;
+
const Type* getSubtype(uint64_t i) const override;
+ Type* getSubtype(uint64_t i) override;
+
const std::string& getFieldName(uint64_t i) const override;
uint64_t getMaximumLength() const override;
@@ -86,6 +91,10 @@ namespace orc {
std::string getAttributeValue(const std::string& key) const override;
+ ReaderCategory getReaderCategory() const override;
+
+ void setReaderCategory(ReaderCategory _readerCategory) override;
+
std::string toString() const override;
const Type* getTypeByColumnId(uint64_t colIdx) const override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]