This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch orc
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/orc by this push:
new 0e53506 [Feature] Implements selection vector for ORC lazy
materialization. (#62)
0e53506 is described below
commit 0e53506146c965a5a71f0582691ab2ea148dae7c
Author: Qi Chen <[email protected]>
AuthorDate: Sat May 6 15:46:41 2023 +0800
[Feature] Implements selection vector for ORC lazy materialization. (#62)
1. Implement a selection vector for ORC lazy materialization.
Based on testing, it is currently implemented only for the float/double,
date/timestamp, decimal and string-dictionary types, where it improves
performance; for other types it carries a performance penalty.
2. Decrease `loadStripIndex()` call count.
3. Adjust code format.
---
c++/include/orc/Reader.hh | 35 +--
c++/include/orc/Type.hh | 48 ++--
c++/src/ColumnReader.cc | 672 +++++++++++++++++++++++++++++++++++++++-------
c++/src/ColumnReader.hh | 23 +-
c++/src/Reader.cc | 222 +++++++--------
c++/src/Reader.hh | 45 ++--
c++/src/TypeImpl.cc | 14 +-
7 files changed, 784 insertions(+), 275 deletions(-)
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 2666b4d..429c1ed 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -186,26 +186,25 @@ namespace orc {
RowReaderOptions& includeTypes(const std::list<uint64_t>& types);
/**
- * For files that have structs as the top-level object, filter the fields.
- * by index. The first field is 0, the second 1, and so on. By default,
- * all columns are read. This option clears any previous setting of
- * the selected columns.
- * @param filterColIndexes a list of fields to read
- * @return this
- */
+ * For files that have structs as the top-level object, filter the fields.
+ * by index. The first field is 0, the second 1, and so on. By default,
+ * all columns are read. This option clears any previous setting of
+ * the selected columns.
+ * @param filterColIndexes a list of fields to read
+ * @return this
+ */
RowReaderOptions& filter(const std::list<uint64_t>& filterColIndexes);
/**
- * For files that have structs as the top-level object, filter the fields
- * by name. By default, all columns are read. This option clears
- * any previous setting of the selected columns.
- * @param filterColNames a list of fields to read
- * @return this
- */
+ * For files that have structs as the top-level object, filter the fields
+ * by name. By default, all columns are read. This option clears
+ * any previous setting of the selected columns.
+ * @param filterColNames a list of fields to read
+ * @return this
+ */
RowReaderOptions& filter(const std::list<std::string>& filterColNames);
-
- /**
+ /**
* A map type of <typeId, ReadIntent>.
*/
typedef std::map<uint64_t, ReadIntent> IdReadIntentMap;
@@ -367,7 +366,8 @@ namespace orc {
class ORCFilter {
public:
virtual ~ORCFilter() = default;
- virtual void filter(ColumnVectorBatch& data, uint16_t* sel, uint16_t size,
void* arg = nullptr) const = 0;
+ virtual void filter(ColumnVectorBatch& data, uint16_t* sel, uint16_t size,
+ void* arg = nullptr) const = 0;
};
class RowReader;
@@ -562,7 +562,8 @@ namespace orc {
* @param options RowReader Options
* @return a RowReader to read the rows
*/
- virtual std::unique_ptr<RowReader> createRowReader(const RowReaderOptions&
options, const ORCFilter* filter = nullptr) const = 0;
+ virtual std::unique_ptr<RowReader> createRowReader(const RowReaderOptions&
options,
+ const ORCFilter* filter
= nullptr) const = 0;
/**
* Get the name of the input stream.
diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh
index 73b813e..1f0901e 100644
--- a/c++/include/orc/Type.hh
+++ b/c++/include/orc/Type.hh
@@ -27,37 +27,37 @@
namespace orc {
enum class ReaderCategory {
- FILTER_CHILD, // Primitive type that is a filter column
- FILTER_PARENT, // Compound type with filter children
- NON_FILTER // Non-filter column
+ FILTER_CHILD, // Primitive type that is a filter column
+ FILTER_PARENT, // Compound type with filter children
+ NON_FILTER // Non-filter column
};
class ReadPhase {
public:
- static const int NUM_CATEGORIES = 3; // Number of values in
ReaderCategory
- std::bitset<NUM_CATEGORIES> categories;
-
- static const ReadPhase ALL;
- static const ReadPhase LEADERS;
- static const ReadPhase FOLLOWERS;
- static const ReadPhase LEADER_PARENTS;
- static const ReadPhase FOLLOWERS_AND_PARENTS;
-
- static ReadPhase fromCategories(const
std::unordered_set<ReaderCategory>& cats) {
- ReadPhase phase;
- for (ReaderCategory cat : cats) {
- phase.categories.set(static_cast<size_t>(cat));
- }
- return phase;
+ static const int NUM_CATEGORIES = 3; // Number of values in ReaderCategory
+ std::bitset<NUM_CATEGORIES> categories;
+
+ static const ReadPhase ALL;
+ static const ReadPhase LEADERS;
+ static const ReadPhase FOLLOWERS;
+ static const ReadPhase LEADER_PARENTS;
+ static const ReadPhase FOLLOWERS_AND_PARENTS;
+
+ static ReadPhase fromCategories(const std::unordered_set<ReaderCategory>&
cats) {
+ ReadPhase phase;
+ for (ReaderCategory cat : cats) {
+ phase.categories.set(static_cast<size_t>(cat));
}
+ return phase;
+ }
- bool contains(ReaderCategory cat) const {
- return categories.test(static_cast<size_t>(cat));
- }
+ bool contains(ReaderCategory cat) const {
+ return categories.test(static_cast<size_t>(cat));
+ }
- bool operator==(const ReadPhase& other) const {
- return categories == other.categories;
- }
+ bool operator==(const ReadPhase& other) const {
+ return categories == other.categories;
+ }
};
enum TypeKind {
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index cabcdbe..8c6e166 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -85,7 +85,8 @@ namespace orc {
return numValues;
}
- void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* incomingMask, const ReadPhase& readPhase) {
+ void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* incomingMask,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx,
size_t sel_size) {
if (numValues > rowBatch.capacity) {
rowBatch.resize(numValues);
}
@@ -101,19 +102,17 @@ namespace orc {
return;
}
}
- rowBatch.hasNulls = false;
} else if (incomingMask) {
// If we don't have a notNull stream, copy the incomingMask
rowBatch.hasNulls = true;
memcpy(rowBatch.notNull.data(), incomingMask, numValues);
return;
- } else {
- rowBatch.hasNulls = false;
- memset(rowBatch.notNull.data(), 1, numValues);
}
+ rowBatch.hasNulls = false;
}
- void ColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions, const ReadPhase& readPhase) {
+ void ColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions,
+ const ReadPhase& readPhase) {
if (notNullDecoder.get()) {
notNullDecoder->seek(positions.at(columnId));
}
@@ -148,9 +147,11 @@ namespace orc {
uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions,
+ const ReadPhase& readPhase) override;
};
template <typename BatchType>
@@ -176,7 +177,8 @@ namespace orc {
template <typename BatchType>
void BooleanColumnReader<BatchType>::next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
- char* notNull, const ReadPhase&
readPhase) {
+ char* notNull, const ReadPhase&
readPhase,
+ uint16_t* sel_rowid_idx, size_t
sel_size) {
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
// Since the byte rle places the output in a char* and BatchType here may
be
// LongVectorBatch with long*. We cheat here in that case and use the long*
@@ -215,7 +217,8 @@ namespace orc {
return numValues;
}
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override {
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override {
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
// Since the byte rle places the output in a char* instead of long*,
// we cheat here and use the long* and then expand it in a second pass.
@@ -225,7 +228,8 @@ namespace orc {
expandBytesToIntegers(ptr, numValues);
}
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override {
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions,
+ const ReadPhase& readPhase) override {
ColumnReader::seekToRowGroup(positions, readPhase);
rle->seek(positions.at(columnId));
}
@@ -255,13 +259,15 @@ namespace orc {
return numValues;
}
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override {
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override {
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
rle->next(dynamic_cast<BatchType&>(rowBatch).data.data(), numValues,
rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
}
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override {
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions,
+ const ReadPhase& readPhase) override {
ColumnReader::seekToRowGroup(positions, readPhase);
rle->seek(positions.at(columnId));
}
@@ -276,15 +282,24 @@ namespace orc {
const int64_t epochOffset;
const bool sameTimezone;
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase);
+ void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size);
+ uint64_t skipInternal(uint64_t numValues, const ReadPhase& readPhase);
+
public:
TimestampColumnReader(const Type& type, StripeStreams& stripe, bool
isInstantType);
~TimestampColumnReader() override;
uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions,
+ const ReadPhase& readPhase) override;
};
TimestampColumnReader::TimestampColumnReader(const Type& type,
StripeStreams& stripe,
@@ -310,12 +325,28 @@ namespace orc {
uint64_t TimestampColumnReader::skip(uint64_t numValues, const ReadPhase&
readPhase) {
numValues = ColumnReader::skip(numValues, readPhase);
+ numValues = skipInternal(numValues, readPhase);
+ return numValues;
+ }
+
+ uint64_t TimestampColumnReader::skipInternal(uint64_t numValues, const
ReadPhase& readPhase) {
secondsRle->skip(numValues);
nanoRle->skip(numValues);
return numValues;
}
- void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull, const ReadPhase& readPhase) {
+  // Entry point for reading timestamps: a non-null selection vector routes the
+  // call to the selection-aware path, otherwise the plain path is used.
+  void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+                                   const ReadPhase& readPhase, uint16_t* sel_rowid_idx,
+                                   size_t sel_size) {
+    if (sel_rowid_idx != nullptr) {
+      nextInternalWithFilter(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size);
+      return;
+    }
+    nextInternal(rowBatch, numValues, notNull, readPhase);
+  }
+
+ void TimestampColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
uint64_t numValues,
+ char* notNull, const ReadPhase&
readPhase) {
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
TimestampVectorBatch& timestampBatch =
dynamic_cast<TimestampVectorBatch&>(rowBatch);
@@ -356,6 +387,51 @@ namespace orc {
}
}
+  /**
+   * Selection-aware variant of nextInternal(). The seconds and nanoseconds
+   * streams are decoded for all numValues rows, but only the rows listed in
+   * sel_rowid_idx are finalized. Rows that are not selected are left holding
+   * the raw decoded stream values (no scale, epoch or time zone adjustment).
+   * @param rowBatch destination batch; cast to TimestampVectorBatch
+   * @param numValues number of rows to decode from the streams
+   * @param notNull incoming null mask, forwarded to ColumnReader::next
+   * @param readPhase read phase, forwarded to ColumnReader::next
+   * @param sel_rowid_idx row indexes within this batch that must be finalized
+   * @param sel_size number of entries in sel_rowid_idx
+   */
+  void TimestampColumnReader::nextInternalWithFilter(ColumnVectorBatch& rowBatch,
+                                                     uint64_t numValues, char* notNull,
+                                                     const ReadPhase& readPhase,
+                                                     uint16_t* sel_rowid_idx, size_t sel_size) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+    // From here on use the mask the base class produced for this batch.
+    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+    TimestampVectorBatch& timestampBatch = dynamic_cast<TimestampVectorBatch&>(rowBatch);
+    int64_t* secsBuffer = timestampBatch.data.data();
+    secondsRle->next(secsBuffer, numValues, notNull);
+    int64_t* nanoBuffer = timestampBatch.nanoseconds.data();
+    nanoRle->next(nanoBuffer, numValues, notNull);
+
+    // Construct the values, visiting only the selected rows.
+    for (size_t i = 0; i < sel_size; i++) {
+      // i walks the selection vector; idx is the row it refers to. Every
+      // buffer access below must use idx.
+      uint16_t idx = sel_rowid_idx[i];
+      if (notNull == nullptr || notNull[idx]) {
+        // The low three bits are a scale marker: a non-zero marker z means the
+        // remaining bits are multiplied by 10 (z + 1) times.
+        uint64_t zeros = nanoBuffer[idx] & 0x7;
+        nanoBuffer[idx] >>= 3;
+        if (zeros != 0) {
+          for (uint64_t j = 0; j <= zeros; ++j) {
+            nanoBuffer[idx] *= 10;
+          }
+        }
+        int64_t writerTime = secsBuffer[idx] + epochOffset;
+        if (!sameTimezone) {
+          // adjust timestamp value to same wall clock time if writer and reader
+          // time zones have different rules, which is required for Apache Orc.
+          const auto& wv = writerTimezone.getVariant(writerTime);
+          const auto& rv = readerTimezone.getVariant(writerTime);
+          if (!wv.hasSameTzRule(rv)) {
+            // If the timezone adjustment moves the millis across a DST boundary,
+            // we need to reevaluate the offsets.
+            int64_t adjustedTime = writerTime + wv.gmtOffset - rv.gmtOffset;
+            const auto& adjustedReader = readerTimezone.getVariant(adjustedTime);
+            writerTime = writerTime + wv.gmtOffset - adjustedReader.gmtOffset;
+          }
+        }
+        secsBuffer[idx] = writerTime;
+        // Fixed: this check previously read nanoBuffer[i], i.e. the nanoseconds
+        // of whichever row sits at the selection-vector position, not of the
+        // row being finalized.
+        if (secsBuffer[idx] < 0 && nanoBuffer[idx] > 999999) {
+          secsBuffer[idx] -= 1;
+        }
+      }
+    }
+  }
+
void TimestampColumnReader::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions, const
ReadPhase& readPhase) {
ColumnReader::seekToRowGroup(positions, readPhase);
@@ -371,9 +447,11 @@ namespace orc {
uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions,
+ const ReadPhase& readPhase) override;
private:
std::unique_ptr<SeekableInputStream> inputStream;
@@ -381,6 +459,13 @@ namespace orc {
const char* bufferPointer;
const char* bufferEnd;
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase);
+
+ void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size);
+
unsigned char readByte() {
if (bufferPointer == bufferEnd) {
int length;
@@ -442,6 +527,8 @@ namespace orc {
}
return static_cast<FloatType>(*result);
}
+
+ uint64_t skipInternal(uint64_t numValues, const ReadPhase& readPhase);
};
template <TypeKind columnKind, bool isLittleEndian, typename ValueType,
typename BatchType>
@@ -456,7 +543,12 @@ namespace orc {
uint64_t DoubleColumnReader<columnKind, isLittleEndian, ValueType,
BatchType>::skip(
uint64_t numValues, const ReadPhase& readPhase) {
numValues = ColumnReader::skip(numValues, readPhase);
+ return skipInternal(numValues, readPhase);
+ }
+ template <TypeKind columnKind, bool isLittleEndian, typename ValueType,
typename BatchType>
+ uint64_t DoubleColumnReader<columnKind, isLittleEndian, ValueType,
BatchType>::skipInternal(
+ uint64_t numValues, const ReadPhase& readPhase) {
if (static_cast<size_t>(bufferEnd - bufferPointer) >= bytesPerValue *
numValues) {
bufferPointer += bytesPerValue * numValues;
} else {
@@ -477,6 +569,17 @@ namespace orc {
template <TypeKind columnKind, bool isLittleEndian, typename ValueType,
typename BatchType>
void DoubleColumnReader<columnKind, isLittleEndian, ValueType,
BatchType>::next(
+ ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const
ReadPhase& readPhase,
+ uint16_t* sel_rowid_idx, size_t sel_size) {
+ if (sel_rowid_idx == nullptr) {
+ nextInternal(rowBatch, numValues, notNull, readPhase);
+ } else {
+ nextInternalWithFilter(rowBatch, numValues, notNull, readPhase,
sel_rowid_idx, sel_size);
+ }
+ }
+
+ template <TypeKind columnKind, bool isLittleEndian, typename ValueType,
typename BatchType>
+ void DoubleColumnReader<columnKind, isLittleEndian, ValueType,
BatchType>::nextInternal(
ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const
ReadPhase& readPhase) {
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
// update the notNull from the parent class
@@ -521,6 +624,68 @@ namespace orc {
}
}
+  /**
+   * Selection-aware read of a float/double column. Values are decoded only
+   * for the rows listed in sel_rowid_idx; the stream is advanced past the
+   * rows in between and past the rows after the last selected one, so the
+   * whole batch of numValues rows is consumed.
+   * With a null mask the number of stream values to skip for a gap comes
+   * from countNonNullRowsInRange(); without one every row in the gap is
+   * skipped.
+   * NOTE(review): the gap arithmetic assumes sel_rowid_idx is in ascending
+   * order - confirm with the caller.
+   */
+  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
+  void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::nextInternalWithFilter(
+      ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull, const ReadPhase& readPhase,
+      uint16_t* sel_rowid_idx, size_t sel_size) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx, sel_size);
+    // From here on use the mask the base class produced for this batch.
+    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+    BatchType& typedBatch = dynamic_cast<BatchType&>(rowBatch);
+    ValueType* outArray = reinterpret_cast<ValueType*>(typedBatch.data.data());
+    // First row whose stream value has not been consumed yet.
+    uint16_t previousIdx = 0;
+
+    for (size_t pos = 0; pos < sel_size; ++pos) {
+      const uint16_t idx = sel_rowid_idx[pos];
+      // Advance the stream over the unselected rows in front of idx.
+      if (idx > previousIdx) {
+        if (notNull) {
+          skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx), readPhase);
+        } else {
+          skipInternal(idx - previousIdx, readPhase);
+        }
+      }
+      // Null rows are left untouched in the output array.
+      if (notNull == nullptr || notNull[idx]) {
+        if constexpr (columnKind == FLOAT) {
+          outArray[idx] = readFloat<ValueType>();
+        } else {
+          outArray[idx] = readDouble<ValueType>();
+        }
+      }
+      previousIdx = idx + 1;
+    }
+
+    // Consume whatever follows the last selected row.
+    if (notNull) {
+      skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues), readPhase);
+    } else {
+      skipInternal(numValues - previousIdx, readPhase);
+    }
+  }
+
template <TypeKind columnKind, bool isLittleEndian, typename ValueType,
typename BatchType>
void DoubleColumnReader<columnKind, isLittleEndian, ValueType,
BatchType>::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions, const
ReadPhase& readPhase) {
@@ -552,17 +717,26 @@ namespace orc {
std::shared_ptr<StringDictionary> dictionary;
std::unique_ptr<RleDecoder> rle;
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase);
+ void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size);
+
public:
StringDictionaryColumnReader(const Type& type, StripeStreams& stipe);
~StringDictionaryColumnReader() override;
uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override;
- void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase) override;
+ void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx,
size_t sel_size) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions,
+ const ReadPhase& readPhase) override;
};
StringDictionaryColumnReader::StringDictionaryColumnReader(const Type& type,
@@ -613,7 +787,17 @@ namespace orc {
}
void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
- char* notNull, const ReadPhase&
readPhase) {
+ char* notNull, const ReadPhase&
readPhase,
+ uint16_t* sel_rowid_idx, size_t
sel_size) {
+ if (sel_rowid_idx == nullptr) {
+ nextInternal(rowBatch, numValues, notNull, readPhase);
+ } else {
+ nextInternalWithFilter(rowBatch, numValues, notNull, readPhase,
sel_rowid_idx, sel_size);
+ }
+ }
+
+ void StringDictionaryColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
uint64_t numValues,
+ char* notNull, const
ReadPhase& readPhase) {
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
// update the notNull from the parent class
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
@@ -647,8 +831,58 @@ namespace orc {
}
}
+  /**
+   * Selection-aware read of a dictionary-encoded string column. Dictionary
+   * entry ids are decoded for all numValues rows into a temporary buffer;
+   * every output slot is first reset to (nullptr, 0) and then only the
+   * selected, non-null rows are pointed at their dictionary entry.
+   * @throws ParseError if a selected row carries an entry id outside the
+   *         dictionary
+   */
+  void StringDictionaryColumnReader::nextInternalWithFilter(ColumnVectorBatch& rowBatch,
+                                                            uint64_t numValues, char* notNull,
+                                                            const ReadPhase& readPhase,
+                                                            uint16_t* sel_rowid_idx,
+                                                            size_t sel_size) {
+    ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+    // From here on use the mask the base class produced for this batch.
+    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+    StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
+    char* blob = dictionary->dictionaryBlob.data();
+    int64_t* dictionaryOffsets = dictionary->dictionaryOffset.data();
+    char** outputStarts = byteBatch.data.data();
+    int64_t* outputLengths = byteBatch.length.data();
+
+    // Scratch buffer for the decoded dictionary entry ids.
+    std::unique_ptr<int64_t[]> tmpOutputLengths(new int64_t[byteBatch.length.size()]);
+    rle->next(tmpOutputLengths.get(), numValues, notNull);
+    uint64_t dictionaryCount = dictionary->dictionaryOffset.size() - 1;
+
+    // Reset every slot so that unselected and null rows read as empty.
+    for (size_t row = 0; row < numValues; ++row) {
+      outputStarts[row] = nullptr;
+      outputLengths[row] = 0;
+    }
+
+    for (size_t pos = 0; pos < sel_size; ++pos) {
+      const uint16_t idx = sel_rowid_idx[pos];
+      if (notNull != nullptr && !notNull[idx]) {
+        continue;
+      }
+      const int64_t entry = tmpOutputLengths[idx];
+      if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
+        throw ParseError("Entry index out of range in StringDictionaryColumn");
+      }
+      outputStarts[idx] = blob + dictionaryOffsets[entry];
+      outputLengths[idx] = dictionaryOffsets[entry + 1] - dictionaryOffsets[entry];
+    }
+  }
+
void StringDictionaryColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
uint64_t numValues,
- char* notNull, const
ReadPhase& readPhase) {
+ char* notNull, const
ReadPhase& readPhase,
+ uint16_t* sel_rowid_idx,
size_t sel_size) {
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
rowBatch.isEncoded = true;
@@ -688,9 +922,11 @@ namespace orc {
uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions,
+ const ReadPhase& readPhase) override;
};
StringDirectColumnReader::StringDirectColumnReader(const Type& type,
StripeStreams& stripe)
@@ -760,7 +996,8 @@ namespace orc {
}
void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull, const ReadPhase&
readPhase) {
+ char* notNull, const ReadPhase&
readPhase,
+ uint16_t* sel_rowid_idx, size_t
sel_size) {
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
// update the notNull from the parent class
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
@@ -836,15 +1073,19 @@ namespace orc {
uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override;
- void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase) override;
+ void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx,
size_t sel_size) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions,
+ const ReadPhase& readPhase) override;
private:
template <bool encoded>
- void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase);
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx,
size_t sel_size);
};
StructColumnReader::StructColumnReader(const Type& type, StripeStreams&
stripe,
@@ -879,35 +1120,40 @@ namespace orc {
return numValues;
}
- void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull, const ReadPhase& readPhase) {
- nextInternal<false>(rowBatch, numValues, notNull, readPhase);
+ void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size) {
+ nextInternal<false>(rowBatch, numValues, notNull, readPhase,
sel_rowid_idx, sel_size);
}
void StructColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull, const ReadPhase&
readPhase) {
- nextInternal<true>(rowBatch, numValues, notNull, readPhase);
+ char* notNull, const ReadPhase&
readPhase,
+ uint16_t* sel_rowid_idx, size_t
sel_size) {
+ nextInternal<true>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx,
sel_size);
}
template <bool encoded>
void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull, const ReadPhase&
readPhase) {
- ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+ char* notNull, const ReadPhase&
readPhase,
+ uint16_t* sel_rowid_idx, size_t
sel_size) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx,
sel_size);
uint64_t i = 0;
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
for (auto iter = children.begin(); iter != children.end(); ++iter, ++i) {
if (shouldProcessChild((*iter)->getType().getReaderCategory(),
readPhase)) {
if (encoded) {
- (*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch
&>(rowBatch).fields[i]), numValues,
- notNull, readPhase);
+
(*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
numValues,
+ notNull, readPhase, sel_rowid_idx, sel_size);
} else {
- (*iter)->next(*(dynamic_cast<StructVectorBatch
&>(rowBatch).fields[i]), numValues, notNull, readPhase);
+
(*iter)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
numValues, notNull,
+ readPhase, sel_rowid_idx, sel_size);
}
}
}
}
- void StructColumnReader::seekToRowGroup(
- std::unordered_map<uint64_t, PositionProvider>& positions, const
ReadPhase& readPhase) {
+ void StructColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions,
+ const ReadPhase& readPhase) {
ColumnReader::seekToRowGroup(positions, readPhase);
for (auto& ptr : children) {
@@ -928,15 +1174,19 @@ namespace orc {
uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override;
- void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase) override;
+ void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx,
size_t sel_size) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions,
+ const ReadPhase& readPhase) override;
private:
template <bool encoded>
- void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase);
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx,
size_t sel_size);
};
ListColumnReader::ListColumnReader(const Type& type, StripeStreams& stripe,
@@ -982,19 +1232,23 @@ namespace orc {
return numValues;
}
- void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull, const ReadPhase& readPhase) {
- nextInternal<false>(rowBatch, numValues, notNull, readPhase);
+ void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size) {
+ nextInternal<false>(rowBatch, numValues, notNull, readPhase,
sel_rowid_idx, sel_size);
}
- void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull, const ReadPhase&
readPhase) {
- nextInternal<true>(rowBatch, numValues, notNull, readPhase);
+ void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size) {
+ nextInternal<true>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx,
sel_size);
}
template <bool encoded>
void ListColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull, const ReadPhase&
readPhase) {
- ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+ char* notNull, const ReadPhase&
readPhase,
+ uint16_t* sel_rowid_idx, size_t
sel_size) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx,
sel_size);
ListVectorBatch& listBatch = dynamic_cast<ListVectorBatch&>(rowBatch);
int64_t* offsets = listBatch.offsets.data();
notNull = listBatch.hasNulls ? listBatch.notNull.data() : nullptr;
@@ -1021,14 +1275,17 @@ namespace orc {
ColumnReader* childReader = child.get();
if (childReader) {
if (encoded) {
- childReader->nextEncoded(*(listBatch.elements.get()), totalChildren,
nullptr, readPhase);
+ childReader->nextEncoded(*(listBatch.elements.get()), totalChildren,
nullptr, readPhase,
+ sel_rowid_idx, sel_size);
} else {
- childReader->next(*(listBatch.elements.get()), totalChildren, nullptr,
readPhase);
+ childReader->next(*(listBatch.elements.get()), totalChildren, nullptr,
readPhase,
+ sel_rowid_idx, sel_size);
}
}
}
- void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions, const ReadPhase& readPhase) {
+ void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions,
+ const ReadPhase& readPhase) {
ColumnReader::seekToRowGroup(positions, readPhase);
rle->seek(positions.at(columnId));
if (child.get()) {
@@ -1048,15 +1305,19 @@ namespace orc {
uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override;
- void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase) override;
+ void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx,
size_t sel_size) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions,
+ const ReadPhase& readPhase) override;
private:
template <bool encoded>
- void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase);
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx,
size_t sel_size);
};
MapColumnReader::MapColumnReader(const Type& type, StripeStreams& stripe,
@@ -1112,19 +1373,22 @@ namespace orc {
return numValues;
}
- void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull, const ReadPhase& readPhase) {
- nextInternal<false>(rowBatch, numValues, notNull, readPhase);
+ void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx, size_t sel_size) {
+ nextInternal<false>(rowBatch, numValues, notNull, readPhase,
sel_rowid_idx, sel_size);
}
- void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull, const ReadPhase& readPhase)
{
- nextInternal<true>(rowBatch, numValues, notNull, readPhase);
+ void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size) {
+ nextInternal<true>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx,
sel_size);
}
template <bool encoded>
- void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull, const ReadPhase&
readPhase) {
- ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+ void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx,
sel_size);
MapVectorBatch& mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch);
int64_t* offsets = mapBatch.offsets.data();
notNull = mapBatch.hasNulls ? mapBatch.notNull.data() : nullptr;
@@ -1159,14 +1423,16 @@ namespace orc {
ColumnReader* rawElementReader = elementReader.get();
if (rawElementReader) {
if (encoded) {
- rawElementReader->nextEncoded(*(mapBatch.elements.get()),
totalChildren, nullptr, readPhase);
+ rawElementReader->nextEncoded(*(mapBatch.elements.get()),
totalChildren, nullptr,
+ readPhase);
} else {
rawElementReader->next(*(mapBatch.elements.get()), totalChildren,
nullptr, readPhase);
}
}
}
- void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions, const ReadPhase& readPhase) {
+ void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions,
+ const ReadPhase& readPhase) {
ColumnReader::seekToRowGroup(positions, readPhase);
rle->seek(positions.at(columnId));
if (keyReader.get()) {
@@ -1189,15 +1455,19 @@ namespace orc {
uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override;
- void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase) override;
+ void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx,
size_t sel_size) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions,
+ const ReadPhase& readPhase) override;
private:
template <bool encoded>
- void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase);
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx,
size_t sel_size);
};
UnionColumnReader::UnionColumnReader(const Type& type, StripeStreams& stripe,
@@ -1237,26 +1507,30 @@ namespace orc {
lengthsRead += chunk;
}
for (size_t i = 0; i < numChildren; ++i) {
- if (counts[i] != 0 && childrenReader[i] != nullptr
- &&
shouldProcessChild(childrenReader[i]->getType().getReaderCategory(),
readPhase)) {
+ if (counts[i] != 0 && childrenReader[i] != nullptr &&
+ shouldProcessChild(childrenReader[i]->getType().getReaderCategory(),
readPhase)) {
childrenReader[i]->skip(static_cast<uint64_t>(counts[i]), readPhase);
}
}
return numValues;
}
- void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull, const ReadPhase& readPhase) {
- nextInternal<false>(rowBatch, numValues, notNull, readPhase);
+ void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size) {
+ nextInternal<false>(rowBatch, numValues, notNull, readPhase,
sel_rowid_idx, sel_size);
}
void UnionColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull, const ReadPhase&
readPhase) {
- nextInternal<true>(rowBatch, numValues, notNull, readPhase);
+ char* notNull, const ReadPhase&
readPhase,
+ uint16_t* sel_rowid_idx, size_t
sel_size) {
+ nextInternal<true>(rowBatch, numValues, notNull, readPhase, sel_rowid_idx,
sel_size);
}
template <bool encoded>
void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull, const ReadPhase&
readPhase) {
+ char* notNull, const ReadPhase&
readPhase,
+ uint16_t* sel_rowid_idx, size_t
sel_size) {
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
UnionVectorBatch& unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch);
uint64_t* offsets = unionBatch.offsets.data();
@@ -1279,7 +1553,8 @@ namespace orc {
}
// read the right number of each child column
for (size_t i = 0; i < numChildren; ++i) {
- if (childrenReader[i] != nullptr &&
shouldProcessChild(childrenReader[i]->getType().getReaderCategory(),
readPhase)) {
+ if (childrenReader[i] != nullptr &&
+ shouldProcessChild(childrenReader[i]->getType().getReaderCategory(),
readPhase)) {
if (encoded) {
childrenReader[i]->nextEncoded(*(unionBatch.children[i]),
static_cast<uint64_t>(counts[i]),
nullptr, readPhase);
@@ -1291,12 +1566,13 @@ namespace orc {
}
}
- void UnionColumnReader::seekToRowGroup(
- std::unordered_map<uint64_t, PositionProvider>& positions, const
ReadPhase& readPhase) {
+ void UnionColumnReader::seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions,
+ const ReadPhase& readPhase) {
ColumnReader::seekToRowGroup(positions, readPhase);
rle->seek(positions.at(columnId));
for (size_t i = 0; i < numChildren; ++i) {
- if (childrenReader[i] != nullptr &&
shouldProcessChild(childrenReader[i]->getType().getReaderCategory(),
readPhase)) {
+ if (childrenReader[i] != nullptr &&
+ shouldProcessChild(childrenReader[i]->getType().getReaderCategory(),
readPhase)) {
childrenReader[i]->seekToRowGroup(positions, readPhase);
}
}
@@ -1367,15 +1643,24 @@ namespace orc {
}
}
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase);
+ void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size);
+ uint64_t skipInternal(uint64_t numValues, const ReadPhase& readPhase);
+
public:
Decimal64ColumnReader(const Type& type, StripeStreams& stipe);
~Decimal64ColumnReader() override;
uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override;
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions, const ReadPhase& readPhase) override;
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>&
positions,
+ const ReadPhase& readPhase) override;
};
const uint32_t Decimal64ColumnReader::MAX_PRECISION_64;
const uint32_t Decimal64ColumnReader::MAX_PRECISION_128;
@@ -1420,6 +1705,12 @@ namespace orc {
uint64_t Decimal64ColumnReader::skip(uint64_t numValues, const ReadPhase&
readPhase) {
numValues = ColumnReader::skip(numValues, readPhase);
+ numValues = skipInternal(numValues, readPhase);
+ scaleDecoder->skip(numValues);
+ return numValues;
+ }
+
+ uint64_t Decimal64ColumnReader::skipInternal(uint64_t numValues, const
ReadPhase& readPhase) {
uint64_t skipped = 0;
while (skipped < numValues) {
readBuffer();
@@ -1427,11 +1718,21 @@ namespace orc {
skipped += 1;
}
}
- scaleDecoder->skip(numValues);
return numValues;
}
- void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull, const ReadPhase& readPhase) {
+ void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size) {
+ if (sel_rowid_idx == nullptr) {
+ nextInternal(rowBatch, numValues, notNull, readPhase);
+ } else {
+ nextInternalWithFilter(rowBatch, numValues, notNull, readPhase,
sel_rowid_idx, sel_size);
+ }
+ }
+
+ void Decimal64ColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
uint64_t numValues,
+ char* notNull, const ReadPhase&
readPhase) {
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
Decimal64VectorBatch& batch =
dynamic_cast<Decimal64VectorBatch&>(rowBatch);
@@ -1454,6 +1755,46 @@ namespace orc {
}
}
+ void Decimal64ColumnReader::nextInternalWithFilter(ColumnVectorBatch&
rowBatch,
+ uint64_t numValues, char*
notNull,
+ const ReadPhase&
readPhase,
+ uint16_t* sel_rowid_idx,
size_t sel_size) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+ notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+ Decimal64VectorBatch& batch =
dynamic_cast<Decimal64VectorBatch&>(rowBatch);
+ int64_t* values = batch.values.data();
+ int64_t* scaleBuffer = batch.readScales.data();
+ scaleDecoder->next(scaleBuffer, numValues, notNull);
+ batch.precision = precision;
+ batch.scale = scale;
+
+ uint16_t previousIdx = 0;
+ if (notNull) {
+ for (size_t i = 0; i < sel_size; i++) {
+ uint16_t idx = sel_rowid_idx[i];
+ if (idx - previousIdx > 0) {
+ skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx),
readPhase);
+ }
+ if (notNull[idx]) {
+ readInt64(values[idx], static_cast<int32_t>(scaleBuffer[idx]));
+ ;
+ }
+ previousIdx = idx + 1;
+ }
+ skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues),
readPhase);
+ } else {
+ for (size_t i = 0; i < sel_size; i++) {
+ uint16_t idx = sel_rowid_idx[i];
+ if (idx - previousIdx > 0) {
+ skipInternal(idx - previousIdx, readPhase);
+ }
+ readInt64(values[idx], static_cast<int32_t>(scaleBuffer[idx]));
+ previousIdx = idx + 1;
+ }
+ skipInternal(numValues - previousIdx, readPhase);
+ }
+ }
+
void scaleInt128(Int128& value, uint32_t scale, uint32_t currentScale) {
if (scale > currentScale) {
while (scale > currentScale) {
@@ -1488,7 +1829,8 @@ namespace orc {
Decimal128ColumnReader(const Type& type, StripeStreams& stipe);
~Decimal128ColumnReader() override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override;
private:
void readInt128(Int128& value, int32_t currentScale) {
@@ -1509,6 +1851,13 @@ namespace orc {
unZigZagInt128(value);
scaleInt128(value, static_cast<uint32_t>(scale),
static_cast<uint32_t>(currentScale));
}
+
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase);
+
+ void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size);
};
Decimal128ColumnReader::Decimal128ColumnReader(const Type& type,
StripeStreams& stripe)
@@ -1520,8 +1869,18 @@ namespace orc {
// PASS
}
- void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull, const ReadPhase& readPhase)
{
+ void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size) {
+ if (sel_rowid_idx == nullptr) {
+ nextInternal(rowBatch, numValues, notNull, readPhase);
+ } else {
+ nextInternalWithFilter(rowBatch, numValues, notNull, readPhase,
sel_rowid_idx, sel_size);
+ }
+ }
+
+ void Decimal128ColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
uint64_t numValues,
+ char* notNull, const ReadPhase&
readPhase) {
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
Decimal128VectorBatch& batch =
dynamic_cast<Decimal128VectorBatch&>(rowBatch);
@@ -1544,6 +1903,46 @@ namespace orc {
}
}
+ void Decimal128ColumnReader::nextInternalWithFilter(ColumnVectorBatch&
rowBatch,
+ uint64_t numValues,
char* notNull,
+ const ReadPhase&
readPhase,
+ uint16_t* sel_rowid_idx,
size_t sel_size) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+ notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+ Decimal128VectorBatch& batch =
dynamic_cast<Decimal128VectorBatch&>(rowBatch);
+ Int128* values = batch.values.data();
+ // read the next group of scales
+ int64_t* scaleBuffer = batch.readScales.data();
+ scaleDecoder->next(scaleBuffer, numValues, notNull);
+ batch.precision = precision;
+ batch.scale = scale;
+
+ uint16_t previousIdx = 0;
+ if (notNull) {
+ for (size_t i = 0; i < sel_size; i++) {
+ uint16_t idx = sel_rowid_idx[i];
+ if (idx - previousIdx > 0) {
+ skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx),
readPhase);
+ }
+ if (notNull[idx]) {
+ readInt128(values[idx], static_cast<int32_t>(scaleBuffer[idx]));
+ }
+ previousIdx = idx + 1;
+ }
+ skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues),
readPhase);
+ } else {
+ for (size_t i = 0; i < sel_size; i++) {
+ uint16_t idx = sel_rowid_idx[i];
+ if (idx - previousIdx > 0) {
+ skipInternal(idx - previousIdx, readPhase);
+ }
+ readInt128(values[idx], static_cast<int32_t>(scaleBuffer[idx]));
+ previousIdx = idx + 1;
+ }
+ skipInternal(numValues - previousIdx, readPhase);
+ }
+ }
+
class Decimal64ColumnReaderV2 : public ColumnReader {
protected:
std::unique_ptr<RleDecoder> valueDecoder;
@@ -1556,7 +1955,8 @@ namespace orc {
uint64_t skip(uint64_t numValues, const ReadPhase& readPhase) override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override;
};
Decimal64ColumnReaderV2::Decimal64ColumnReaderV2(const Type& type,
StripeStreams& stripe)
@@ -1583,8 +1983,9 @@ namespace orc {
return numValues;
}
- void Decimal64ColumnReaderV2::next(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull, const ReadPhase&
readPhase) {
+ void Decimal64ColumnReaderV2::next(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size) {
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
Decimal64VectorBatch& batch =
dynamic_cast<Decimal64VectorBatch&>(rowBatch);
@@ -1635,11 +2036,19 @@ namespace orc {
return value >= MIN_VALUE && value <= MAX_VALUE;
}
+ void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase);
+
+ void nextInternalWithFilter(ColumnVectorBatch& rowBatch, uint64_t
numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t*
sel_rowid_idx,
+ size_t sel_size);
+
public:
DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe);
~DecimalHive11ColumnReader() override;
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
const ReadPhase& readPhase) override;
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull,
+ const ReadPhase& readPhase, uint16_t* sel_rowid_idx, size_t
sel_size) override;
};
DecimalHive11ColumnReader::DecimalHive11ColumnReader(const Type& type,
StripeStreams& stripe)
@@ -1654,7 +2063,17 @@ namespace orc {
}
void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t
numValues,
- char* notNull, const ReadPhase&
readPhase) {
+ char* notNull, const ReadPhase&
readPhase,
+ uint16_t* sel_rowid_idx, size_t
sel_size) {
+ if (sel_rowid_idx == nullptr) {
+ nextInternal(rowBatch, numValues, notNull, readPhase);
+ } else {
+ nextInternalWithFilter(rowBatch, numValues, notNull, readPhase,
sel_rowid_idx, sel_size);
+ }
+ }
+
+ void DecimalHive11ColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
uint64_t numValues,
+ char* notNull, const ReadPhase&
readPhase) {
ColumnReader::next(rowBatch, numValues, notNull, readPhase);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
Decimal128VectorBatch& batch =
dynamic_cast<Decimal128VectorBatch&>(rowBatch);
@@ -1698,6 +2117,67 @@ namespace orc {
}
}
+ void DecimalHive11ColumnReader::nextInternalWithFilter(ColumnVectorBatch&
rowBatch,
+ uint64_t numValues,
char* notNull,
+ const ReadPhase&
readPhase,
+ uint16_t*
sel_rowid_idx, size_t sel_size) {
+ ColumnReader::next(rowBatch, numValues, notNull, readPhase);
+ notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+ Decimal128VectorBatch& batch =
dynamic_cast<Decimal128VectorBatch&>(rowBatch);
+ Int128* values = batch.values.data();
+ // read the next group of scales
+ int64_t* scaleBuffer = batch.readScales.data();
+
+ scaleDecoder->next(scaleBuffer, numValues, notNull);
+
+ batch.precision = precision;
+ batch.scale = scale;
+
+ uint16_t previousIdx = 0;
+ if (notNull) {
+ for (size_t i = 0; i < sel_size; i++) {
+ uint16_t idx = sel_rowid_idx[i];
+ if (idx - previousIdx > 0) {
+ skipInternal(countNonNullRowsInRange(notNull, previousIdx, idx),
readPhase);
+ }
+ if (notNull[idx]) {
+ if (!readInt128(values[idx],
static_cast<int32_t>(scaleBuffer[idx]))) {
+ if (throwOnOverflow) {
+ throw ParseError("Hive 0.11 decimal was more than 38 digits.");
+ } else {
+ *errorStream << "Warning: "
+ << "Hive 0.11 decimal with more than 38 digits "
+ << "replaced by NULL.\n";
+ notNull[idx] = false;
+ }
+ }
+ }
+ previousIdx = idx + 1;
+ }
+ skipInternal(countNonNullRowsInRange(notNull, previousIdx, numValues),
readPhase);
+ } else {
+ for (size_t i = 0; i < sel_size; i++) {
+ uint16_t idx = sel_rowid_idx[i];
+ if (idx - previousIdx > 0) {
+ skipInternal(idx - previousIdx, readPhase);
+ }
+ if (!readInt128(values[idx], static_cast<int32_t>(scaleBuffer[idx]))) {
+ if (throwOnOverflow) {
+ throw ParseError("Hive 0.11 decimal was more than 38 digits.");
+ } else {
+ *errorStream << "Warning: "
+ << "Hive 0.11 decimal with more than 38 digits "
+ << "replaced by NULL.\n";
+ batch.hasNulls = true;
+ batch.notNull[idx] = false;
+ }
+ }
+ previousIdx = idx + 1;
+ }
+ skipInternal(numValues - previousIdx, readPhase);
+ }
+ }
+
static bool isLittleEndian() {
static union {
uint32_t i;
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index 25363e2..fd89489 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -118,6 +118,16 @@ namespace orc {
return readPhase.contains(readerCategory) || readerCategory ==
ReaderCategory::FILTER_PARENT;
}
+ static int countNonNullRowsInRange(char* notNull, uint16_t start, uint16_t
end) {
+ int result = 0;
+ while (start < end) {
+ if (notNull[start++]) {
+ result++;
+ }
+ }
+ return result;
+ }
+
public:
ColumnReader(const Type& type, StripeStreams& stipe);
@@ -142,7 +152,9 @@ namespace orc {
* a mask (with at least numValues bytes) for which values to
* set.
*/
- virtual void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull, const ReadPhase& readPhase = ReadPhase::ALL);
+ virtual void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char*
notNull,
+ const ReadPhase& readPhase = ReadPhase::ALL,
+ uint16_t* sel_rowid_idx = nullptr, size_t sel_size = 0);
/**
* Read the next group of values without decoding
@@ -152,16 +164,19 @@ namespace orc {
* a mask (with at least numValues bytes) for which values to
* set.
*/
- virtual void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull, const ReadPhase& readPhase = ReadPhase::ALL) {
+ virtual void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
char* notNull,
+ const ReadPhase& readPhase = ReadPhase::ALL,
+ uint16_t* sel_rowid_idx = nullptr, size_t
sel_size = 0) {
rowBatch.isEncoded = false;
- next(rowBatch, numValues, notNull, readPhase);
+ next(rowBatch, numValues, notNull, readPhase, sel_rowid_idx);
}
/**
* Seek to beginning of a row group in the current stripe
* @param positions a list of PositionProviders storing the positions
*/
- virtual void seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions, const ReadPhase& readPhase = ReadPhase::ALL);
+ virtual void seekToRowGroup(std::unordered_map<uint64_t,
PositionProvider>& positions,
+ const ReadPhase& readPhase = ReadPhase::ALL);
};
/**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 52052fb..c132e46 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -247,8 +247,7 @@ namespace orc {
}
RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> _contents,
- const RowReaderOptions& opts,
- const ORCFilter* _filter)
+ const RowReaderOptions& opts, const ORCFilter*
_filter)
: localTimezone(getLocalTimezone()),
contents(_contents),
throwOnHive11DecimalOverflow(opts.getThrowOnHive11DecimalOverflow()),
@@ -319,7 +318,7 @@ namespace orc {
std::unordered_set<int> filterColIds;
if (!filterCols.empty()) {
- for (auto& colName: filterCols) {
+ for (auto& colName : filterCols) {
auto iter = nameTypeMap.find(colName);
if (iter != nameTypeMap.end()) {
Type* type = iter->second;
@@ -405,8 +404,7 @@ namespace orc {
return columnPath.substr(0, columnPath.length() - 1);
}
-
- CompressionKind RowReaderImpl::getCompression() const {
+ CompressionKind RowReaderImpl::getCompression() const {
return contents->compression;
}
@@ -900,7 +898,8 @@ namespace orc {
return createRowReader(defaultOpts, filter);
}
- std::unique_ptr<RowReader> ReaderImpl::createRowReader(const
RowReaderOptions& opts, const ORCFilter* filter) const {
+ std::unique_ptr<RowReader> ReaderImpl::createRowReader(const
RowReaderOptions& opts,
+ const ORCFilter*
filter) const {
if (opts.getSearchArgument() && !isMetadataLoaded) {
// load stripe statistics for PPD
readMetadata();
@@ -1121,9 +1120,6 @@ namespace orc {
rowsInCurrentStripe = currentStripeInfo.numberofrows();
processingStripe = currentStripe;
- // read row group statistics and bloom filters of current stripe
- loadStripeIndex();
-
if (sargsApplier) {
bool isStripeNeeded = true;
if (contents->metadata) {
@@ -1137,6 +1133,8 @@ namespace orc {
}
if (isStripeNeeded) {
+ // read row group statistics and bloom filters of current stripe
+ loadStripeIndex();
// select row groups to read in the current stripe
sargsApplier->pickRowGroups(rowsInCurrentStripe, rowIndexes,
bloomFilterIndex);
if (sargsApplier->hasSelectedFrom(currentRowInStripe)) {
@@ -1170,7 +1168,8 @@ namespace orc {
sargsApplier->getNextSkippedRows());
previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe - 1;
if (currentRowInStripe > 0) {
- seekToRowGroup(static_cast<uint32_t>(currentRowInStripe /
footer->rowindexstride()), readPhase);
+ seekToRowGroup(static_cast<uint32_t>(currentRowInStripe /
footer->rowindexstride()),
+ readPhase);
}
}
} else {
@@ -1200,29 +1199,29 @@ namespace orc {
followRowInStripe = currentRowInStripe;
}
rowsToRead =
- std::min(static_cast<uint64_t>(data.capacity),
- rowsInCurrentStripe - currentRowInStripe);
+ std::min(static_cast<uint64_t>(data.capacity), rowsInCurrentStripe -
currentRowInStripe);
if (sargsApplier) {
- rowsToRead = computeBatchSize(rowsToRead,
- currentRowInStripe,
- rowsInCurrentStripe,
- footer->rowindexstride(),
- sargsApplier->getNextSkippedRows());
+ rowsToRead = computeBatchSize(rowsToRead, currentRowInStripe,
rowsInCurrentStripe,
+ footer->rowindexstride(),
sargsApplier->getNextSkippedRows());
}
if (rowsToRead == 0) {
markEndOfFile();
return readRows;
}
- uint16_t sel_rowid_idx[rowsToRead];
- nextBatch(data, rowsToRead, startReadPhase, sel_rowid_idx, arg);
-
- if (startReadPhase == ReadPhase::LEADERS && data.numElements > 0) {
- // At least 1 row has been selected and as a result we read the follow
columns into the
- // row batch
- nextBatch(data, rowsToRead,
- prepareFollowReaders(footer->rowindexstride(),
- currentRowInStripe, followRowInStripe),
sel_rowid_idx, arg);
- followRowInStripe = currentRowInStripe + rowsToRead;
+ if (startReadPhase == ReadPhase::LEADERS) {
+ auto sel_rowid_idx = std::make_unique<uint16_t[]>(rowsToRead);
+ nextBatch(data, rowsToRead, startReadPhase, sel_rowid_idx.get(), arg);
+ if (data.numElements > 0) {
+ // At least 1 row has been selected and as a result we read the
follow columns into the
+ // row batch
+ nextBatch(
+ data, rowsToRead,
+ prepareFollowReaders(footer->rowindexstride(),
currentRowInStripe, followRowInStripe),
+ sel_rowid_idx.get(), arg);
+ followRowInStripe = currentRowInStripe + rowsToRead;
+ }
+ } else {
+ nextBatch(data, rowsToRead, startReadPhase, nullptr, arg);
}
// update row number
@@ -1230,11 +1229,11 @@ namespace orc {
currentRowInStripe += rowsToRead;
readRows += rowsToRead;
- // check if we need to advance to next selected row group
- if (sargsApplier) {
- uint64_t nextRowToRead =
- advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe,
footer->rowindexstride(),
- sargsApplier->getNextSkippedRows());
+ // check if we need to advance to next selected row group
+ if (sargsApplier) {
+ uint64_t nextRowToRead =
+ advanceToNextRowGroup(currentRowInStripe, rowsInCurrentStripe,
footer->rowindexstride(),
+ sargsApplier->getNextSkippedRows());
if (currentRowInStripe != nextRowToRead) {
// it is guaranteed to be at start of a row group
currentRowInStripe = nextRowToRead;
@@ -1253,16 +1252,22 @@ namespace orc {
return readRows;
}
- void RowReaderImpl::nextBatch(ColumnVectorBatch& data, int batchSize, const
ReadPhase& readPhase, uint16_t* sel_rowid_idx, void* arg) {
- if (enableEncodedBlock) {
- reader->nextEncoded(data, batchSize, nullptr, readPhase);
- }
- else {
- reader->next(data, batchSize, nullptr, readPhase);
- }
+ void RowReaderImpl::nextBatch(ColumnVectorBatch& data, int batchSize, const
ReadPhase& readPhase,
+ uint16_t* sel_rowid_idx, void* arg) {
if (readPhase == ReadPhase::ALL || readPhase == ReadPhase::LEADERS) {
+ if (enableEncodedBlock) {
+ reader->nextEncoded(data, batchSize, nullptr, readPhase);
+ } else {
+ reader->next(data, batchSize, nullptr, readPhase);
+ }
// Set the batch size when reading everything or when reading FILTER
columns
data.numElements = batchSize;
+ } else {
+ if (enableEncodedBlock) {
+ reader->nextEncoded(data, batchSize, nullptr, readPhase,
sel_rowid_idx, data.numElements);
+ } else {
+ reader->next(data, batchSize, nullptr, readPhase, sel_rowid_idx,
data.numElements);
+ }
}
if (readPhase == ReadPhase::LEADERS) {
@@ -1274,80 +1279,81 @@ namespace orc {
}
}
- /**
+ /**
* Determine the RowGroup based on the supplied row id.
* @param rowIdx Row for which the row group is being determined
* @return Id of the RowGroup that the row belongs to
*/
- int RowReaderImpl::computeRGIdx(uint64_t rowIndexStride, long rowIdx) {
- return rowIndexStride == 0 ? 0 : (int) (rowIdx / rowIndexStride);
- }
-
- /**
- * This method prepares the non-filter column readers for next batch. This
involves the following
- * 1. Determine position
- * 2. Perform IO if required
- * 3. Position the non-filter readers
- *
- * This method is repositioning the non-filter columns and as such this
method shall never have to
- * deal with navigating the stripe forward or skipping row groups, all of
this should have already
- * taken place based on the filter columns.
- * @param toFollowRow The rowIdx identifies the required row position
within the stripe for
- * follow read
- * @param fromFollowRow Indicates the current position of the follow read,
exclusive
- * @return the read phase for reading non-filter columns, this shall be
FOLLOWERS_AND_PARENTS in
- * case of a seek otherwise will be FOLLOWERS
- */
- ReadPhase RowReaderImpl::prepareFollowReaders(uint64_t rowIndexStride,
long toFollowRow, long fromFollowRow) {
- // 1. Determine the required row group and skip rows needed from the RG
start
- int needRG = computeRGIdx(rowIndexStride, toFollowRow);
- // The current row is not yet read so we -1 to compute the previously
read row group
- int readRG = computeRGIdx(rowIndexStride, fromFollowRow - 1);
- long skipRows;
- if (needRG == readRG && toFollowRow >= fromFollowRow) {
- // In case we are skipping forward within the same row group, we
compute skip rows from the
- // current position
- skipRows = toFollowRow - fromFollowRow;
- } else {
- // In all other cases including seeking backwards, we compute the skip
rows from the start of
- // the required row group
- skipRows = toFollowRow - (needRG * rowIndexStride);
- }
-
- // 2. Plan the row group idx for the non-filter columns if this has not
already taken place
- if (needsFollowColumnsRead) {
- needsFollowColumnsRead = false;
- }
+ int RowReaderImpl::computeRGIdx(uint64_t rowIndexStride, long rowIdx) {
+ return rowIndexStride == 0 ? 0 : (int)(rowIdx / rowIndexStride);
+ }
- // 3. Position the non-filter readers to the required RG and skipRows
- ReadPhase result = ReadPhase::FOLLOWERS;
- if (needRG != readRG || toFollowRow < fromFollowRow) {
- // When having to change a row group or in case of back navigation,
seek both the filter
- // parents and non-filter. This will re-position the parents present
vector. This is needed
- // to determine the number of non-null values to skip on the
non-filter columns.
- seekToRowGroup(needRG, ReadPhase::FOLLOWERS_AND_PARENTS);
- // skip rows on both the filter parents and non-filter as both have
been positioned in the
- // previous step
- reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS);
- result = ReadPhase::FOLLOWERS_AND_PARENTS;
- } else if (skipRows > 0) {
- // in case we are only skipping within the row group, position the
filter parents back to the
- // position of the follow. This is required to determine the non-null
values to skip on the
- // non-filter columns.
- seekToRowGroup(readRG, ReadPhase::LEADER_PARENTS);
- reader->skip(fromFollowRow - (readRG * rowIndexStride),
ReadPhase::LEADER_PARENTS);
- // Move both the filter parents and non-filter forward, this will
compute the correct
- // non-null skips on follow children
- reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS);
- result = ReadPhase::FOLLOWERS_AND_PARENTS;
- }
- // Identifies the read level that should be performed for the read
- // FOLLOWERS_WITH_PARENTS indicates repositioning identifying both
non-filter and filter parents
- // FOLLOWERS indicates read only of the non-filter level without the
parents, which is used during
- // contiguous read. During a contiguous read no skips are needed and the
non-null information of
- // the parent is available in the column vector for use during
non-filter read
- return result;
- }
+ /**
+ * This method prepares the non-filter column readers for next batch. This
involves the following
+ * 1. Determine position
+ * 2. Perform IO if required
+ * 3. Position the non-filter readers
+ *
+ * This method is repositioning the non-filter columns and as such this
method shall never have to
+ * deal with navigating the stripe forward or skipping row groups, all of
this should have already
+ * taken place based on the filter columns.
+ * @param toFollowRow The rowIdx identifies the required row position within
the stripe for
+ * follow read
+ * @param fromFollowRow Indicates the current position of the follow read,
exclusive
+ * @return the read phase for reading non-filter columns, this shall be
FOLLOWERS_AND_PARENTS in
+ * case of a seek otherwise will be FOLLOWERS
+ */
+ ReadPhase RowReaderImpl::prepareFollowReaders(uint64_t rowIndexStride, long
toFollowRow,
+ long fromFollowRow) {
+ // 1. Determine the required row group and skip rows needed from the RG
start
+ int needRG = computeRGIdx(rowIndexStride, toFollowRow);
+ // The current row is not yet read so we -1 to compute the previously read
row group
+ int readRG = computeRGIdx(rowIndexStride, fromFollowRow - 1);
+ long skipRows;
+ if (needRG == readRG && toFollowRow >= fromFollowRow) {
+ // In case we are skipping forward within the same row group, we compute
skip rows from the
+ // current position
+ skipRows = toFollowRow - fromFollowRow;
+ } else {
+ // In all other cases including seeking backwards, we compute the skip
rows from the start of
+ // the required row group
+ skipRows = toFollowRow - (needRG * rowIndexStride);
+ }
+
+ // 2. Plan the row group idx for the non-filter columns if this has not
already taken place
+ if (needsFollowColumnsRead) {
+ needsFollowColumnsRead = false;
+ }
+
+ // 3. Position the non-filter readers to the required RG and skipRows
+ ReadPhase result = ReadPhase::FOLLOWERS;
+ if (needRG != readRG || toFollowRow < fromFollowRow) {
+ // When having to change a row group or in case of back navigation, seek
both the filter
+ // parents and non-filter. This will re-position the parents present
vector. This is needed
+ // to determine the number of non-null values to skip on the non-filter
columns.
+ seekToRowGroup(needRG, ReadPhase::FOLLOWERS_AND_PARENTS);
+ // skip rows on both the filter parents and non-filter as both have been
positioned in the
+ // previous step
+ reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS);
+ result = ReadPhase::FOLLOWERS_AND_PARENTS;
+ } else if (skipRows > 0) {
+ // in case we are only skipping within the row group, position the
filter parents back to the
+ // position of the follow. This is required to determine the non-null
values to skip on the
+ // non-filter columns.
+ seekToRowGroup(readRG, ReadPhase::LEADER_PARENTS);
+ reader->skip(fromFollowRow - (readRG * rowIndexStride),
ReadPhase::LEADER_PARENTS);
+ // Move both the filter parents and non-filter forward, this will
compute the correct
+ // non-null skips on follow children
+ reader->skip(skipRows, ReadPhase::FOLLOWERS_AND_PARENTS);
+ result = ReadPhase::FOLLOWERS_AND_PARENTS;
+ }
+ // Identifies the read level that should be performed for the read
+ // FOLLOWERS_AND_PARENTS indicates repositioning identifying both
non-filter and filter parents
+ // FOLLOWERS indicates read only of the non-filter level without the
parents, which is used
+ // during contiguous read. During a contiguous read no skips are needed
and the non-null
+ // information of the parent is available in the column vector for use
during non-filter read
+ return result;
+ }
uint64_t RowReaderImpl::computeBatchSize(uint64_t requestedSize, uint64_t
currentRowInStripe,
uint64_t rowsInCurrentStripe,
uint64_t rowIndexStride,
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 0c52387..d84532c 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -35,22 +35,23 @@ namespace orc {
static const uint64_t DIRECTORY_SIZE_GUESS = 16 * 1024;
class ReaderContext {
- public:
- ReaderContext() = default;
-
- const ORCFilter* getFilterCallback() const {
- return filter;
- }
-
- ReaderContext& setFilterCallback(std::unordered_set<int> _filterColumnIds,
const ORCFilter* _filter) {
- this->filterColumnIds = std::move(_filterColumnIds);
- this->filter = _filter;
- return *this;
- }
-
- private:
- std::unordered_set<int> filterColumnIds;
- const ORCFilter* filter;
+ public:
+ ReaderContext() = default;
+
+ const ORCFilter* getFilterCallback() const {
+ return filter;
+ }
+
+ ReaderContext& setFilterCallback(std::unordered_set<int> _filterColumnIds,
+ const ORCFilter* _filter) {
+ this->filterColumnIds = std::move(_filterColumnIds);
+ this->filter = _filter;
+ return *this;
+ }
+
+ private:
+ std::unordered_set<int> filterColumnIds;
+ const ORCFilter* filter;
};
/**
@@ -237,25 +238,26 @@ namespace orc {
*/
bool hasBadBloomFilters();
-
// build map from type name and id, id to Type
void buildTypeNameIdMap(Type* type);
std::string toDotColumnPath();
- void nextBatch(ColumnVectorBatch& data, int batchSize, const ReadPhase&
readPhase, uint16_t* sel_rowid_idx, void* arg);
+ void nextBatch(ColumnVectorBatch& data, int batchSize, const ReadPhase&
readPhase,
+ uint16_t* sel_rowid_idx, void* arg);
int computeRGIdx(uint64_t rowIndexStride, long rowIdx);
ReadPhase prepareFollowReaders(uint64_t rowIndexStride, long toFollowRow,
long fromFollowRow);
- public:
+ public:
/**
* Constructor that lets the user specify additional options.
* @param contents of the file
* @param options options for reading
*/
- RowReaderImpl(std::shared_ptr<FileContents> contents, const
RowReaderOptions& options, const ORCFilter* filter = nullptr);
+ RowReaderImpl(std::shared_ptr<FileContents> contents, const
RowReaderOptions& options,
+ const ORCFilter* filter = nullptr);
// Select the columns from the options object
const std::vector<bool> getSelectedColumns() const override;
@@ -357,7 +359,8 @@ namespace orc {
std::unique_ptr<RowReader> createRowReader(const ORCFilter* filter =
nullptr) const override;
- std::unique_ptr<RowReader> createRowReader(const RowReaderOptions&
options, const ORCFilter* filter = nullptr) const override;
+ std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& options,
+ const ORCFilter* filter =
nullptr) const override;
uint64_t getContentLength() const override;
uint64_t getStripeStatisticsLength() const override;
diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc
index 47095b3..3f28dce 100644
--- a/c++/src/TypeImpl.cc
+++ b/c++/src/TypeImpl.cc
@@ -25,11 +25,15 @@
namespace orc {
- const ReadPhase ReadPhase::ALL = ReadPhase::fromCategories({
ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT,
ReaderCategory::NON_FILTER });
- const ReadPhase ReadPhase::LEADERS = ReadPhase::fromCategories({
ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT });
- const ReadPhase ReadPhase::FOLLOWERS = ReadPhase::fromCategories({
ReaderCategory::NON_FILTER });
- const ReadPhase ReadPhase::LEADER_PARENTS = ReadPhase::fromCategories({
ReaderCategory::FILTER_PARENT });
- const ReadPhase ReadPhase::FOLLOWERS_AND_PARENTS =
ReadPhase::fromCategories({ ReaderCategory::FILTER_PARENT,
ReaderCategory::NON_FILTER });
+ const ReadPhase ReadPhase::ALL = ReadPhase::fromCategories(
+ {ReaderCategory::FILTER_CHILD, ReaderCategory::FILTER_PARENT,
ReaderCategory::NON_FILTER});
+ const ReadPhase ReadPhase::LEADERS =
+ ReadPhase::fromCategories({ReaderCategory::FILTER_CHILD,
ReaderCategory::FILTER_PARENT});
+ const ReadPhase ReadPhase::FOLLOWERS =
ReadPhase::fromCategories({ReaderCategory::NON_FILTER});
+ const ReadPhase ReadPhase::LEADER_PARENTS =
+ ReadPhase::fromCategories({ReaderCategory::FILTER_PARENT});
+ const ReadPhase ReadPhase::FOLLOWERS_AND_PARENTS =
+ ReadPhase::fromCategories({ReaderCategory::FILTER_PARENT,
ReaderCategory::NON_FILTER});
Type::~Type() {
// PASS
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]