[
https://issues.apache.org/jira/browse/ORC-224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16135656#comment-16135656
]
ASF GitHub Bot commented on ORC-224:
------------------------------------
Github user majetideepak commented on a diff in the pull request:
https://github.com/apache/orc/pull/149#discussion_r134308560
--- Diff: c++/src/ColumnWriter.cc ---
@@ -468,25 +472,1099 @@ namespace orc {
rleEncoder->recordPosition(rowIndexPosition.get());
}
- std::unique_ptr<ColumnWriter> buildWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) {
- switch (static_cast<int64_t>(type.getKind())) {
- case STRUCT:
- return std::unique_ptr<ColumnWriter>(
- new StructColumnWriter(
- type,
- factory,
- options));
- case INT:
- case LONG:
- case SHORT:
- return std::unique_ptr<ColumnWriter>(
- new IntegerColumnWriter(
- type,
- factory,
- options));
+ class ByteColumnWriter : public ColumnWriter {
+ public:
+ ByteColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ private:
+ std::unique_ptr<ByteRleEncoder> byteRleEncoder;
+ };
+
+ ByteColumnWriter::ByteColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options) {
+ std::unique_ptr<BufferedOutputStream> dataStream =
+
factory.createStream(proto::Stream_Kind_DATA);
+ byteRleEncoder = createByteRleEncoder(std::move(dataStream));
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ void ByteColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) {
+ ColumnWriter::add(rowBatch, offset, numValues);
+
+ LongVectorBatch& byteBatch =
+ dynamic_cast<LongVectorBatch&>(rowBatch);
+
+ int64_t* data = byteBatch.data.data() + offset;
+ const char* notNull = byteBatch.hasNulls ?
+ byteBatch.notNull.data() + offset : nullptr;
+
+ char* byteData = reinterpret_cast<char*>(data);
+ for (uint64_t i = 0; i < numValues; ++i) {
+ byteData[i] = static_cast<char>(data[i]);
+ }
+ byteRleEncoder->add(byteData, numValues, notNull);
+
+ IntegerColumnStatisticsImpl* intStats =
+
dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
+ bool hasNull = false;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (notNull == nullptr || notNull[i]) {
+ intStats->increase(1);
+ intStats->update(static_cast<int64_t>(byteData[i]), 1);
+ } else if (!hasNull) {
+ hasNull = true;
+ }
+ }
+ intStats->setHasNull(hasNull);
+ }
+
+ void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream stream;
+ stream.set_kind(proto::Stream_Kind_DATA);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(byteRleEncoder->flush());
+ streams.push_back(stream);
+ }
+
+ uint64_t ByteColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += byteRleEncoder->getBufferSize();
+ return size;
+ }
+
+ void ByteColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+ encoding.set_dictionarysize(0);
+ encodings.push_back(encoding);
+ }
+
+ void ByteColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ byteRleEncoder->recordPosition(rowIndexPosition.get());
+ }
+
+ class BooleanColumnWriter : public ColumnWriter {
+ public:
+ BooleanColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ private:
+ std::unique_ptr<ByteRleEncoder> rleEncoder;
+ };
+
+ BooleanColumnWriter::BooleanColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options) {
+ std::unique_ptr<BufferedOutputStream> dataStream =
+ factory.createStream(proto::Stream_Kind_DATA);
+ rleEncoder = createBooleanRleEncoder(std::move(dataStream));
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) {
+ ColumnWriter::add(rowBatch, offset, numValues);
+
+ LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
+ int64_t* data = byteBatch.data.data() + offset;
+ const char* notNull = byteBatch.hasNulls ?
+ byteBatch.notNull.data() + offset : nullptr;
+
+ char* byteData = reinterpret_cast<char*>(data);
+ for (uint64_t i = 0; i < numValues; ++i) {
+ byteData[i] = static_cast<char>(data[i]);
+ }
+ rleEncoder->add(byteData, numValues, notNull);
+
+ BooleanColumnStatisticsImpl* boolStats =
+
dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
+ bool hasNull = false;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (notNull == nullptr || notNull[i]) {
+ boolStats->increase(1);
+ boolStats->update(byteData[i], 1);
+ } else if (!hasNull) {
+ hasNull = true;
+ }
+ }
+ boolStats->setHasNull(hasNull);
+ }
+
+ void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream stream;
+ stream.set_kind(proto::Stream_Kind_DATA);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(rleEncoder->flush());
+ streams.push_back(stream);
+ }
+
+ uint64_t BooleanColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += rleEncoder->getBufferSize();
+ return size;
+ }
+
+ void BooleanColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings)
const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+ encoding.set_dictionarysize(0);
+ encodings.push_back(encoding);
+ }
+
+ void BooleanColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ rleEncoder->recordPosition(rowIndexPosition.get());
+ }
+
+ class DoubleColumnWriter : public ColumnWriter {
+ public:
+ DoubleColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options,
+ bool isFloat);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ private:
+ bool isFloat;
+ std::unique_ptr<AppendOnlyBufferedStream> dataStream;
+ DataBuffer<char> buffer;
+ };
+
+ DoubleColumnWriter::DoubleColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options,
+ bool isFloatType) :
+ ColumnWriter(type, factory, options),
+ isFloat(isFloatType),
+ buffer(*options.getMemoryPool()) {
+ dataStream.reset(new AppendOnlyBufferedStream(
+
factory.createStream(proto::Stream_Kind_DATA)));
+ buffer.resize(isFloat ? 4 : 8);
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) {
+ ColumnWriter::add(rowBatch, offset, numValues);
+
+ const DoubleVectorBatch& dblBatch =
+ dynamic_cast<const
DoubleVectorBatch&>(rowBatch);
+
+ const double* doubleData = dblBatch.data.data() + offset;
+ const char* notNull = dblBatch.hasNulls ?
+ dblBatch.notNull.data() + offset : nullptr;
+
+ size_t bytes = isFloat ? 4 : 8;
+ char* data = buffer.data();
+
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ if (isFloat) {
+ // to avoid float-double cast
+ const int32_t* intBits =
+ reinterpret_cast<const int32_t*>(&static_cast<const float&>(
+ doubleData[i]));
+ for (size_t j = 0; j < bytes; ++j) {
+ data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
+ }
+ } else {
+ const int64_t* intBits =
+ reinterpret_cast<const int64_t*>(&(doubleData[i]));
+ for (size_t j = 0; j < bytes; ++j) {
+ data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
+ }
+ }
+ dataStream->write(data, bytes);
+ }
+ }
+
+ DoubleColumnStatisticsImpl* doubleStats =
+
dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
+ bool hasNull = false;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ doubleStats->increase(1);
+ doubleStats->update(doubleData[i]);
+ } else if (!hasNull) {
+ hasNull = true;
+ }
+ }
+ doubleStats->setHasNull(hasNull);
+ }
+
+ void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream stream;
+ stream.set_kind(proto::Stream_Kind_DATA);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(dataStream->flush());
+ streams.push_back(stream);
+ }
+
+ uint64_t DoubleColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += dataStream->getSize();
+ return size;
+ }
+
+ void DoubleColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const
{
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+ encoding.set_dictionarysize(0);
+ encodings.push_back(encoding);
+ }
+
+ void DoubleColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ dataStream->recordPosition(rowIndexPosition.get());
+ }
+
+ class StringColumnWriter : public ColumnWriter {
+ public:
+ StringColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ protected:
+ std::unique_ptr<RleEncoder> lengthEncoder;
+ std::unique_ptr<AppendOnlyBufferedStream> dataStream;
+ RleVersion rleVersion;
+ };
+
+ StringColumnWriter::StringColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options),
+ rleVersion(RleVersion_1) {
+ std::unique_ptr<BufferedOutputStream> lengthStream =
+ factory.createStream(proto::Stream_Kind_LENGTH);
+ lengthEncoder = createRleEncoder(std::move(lengthStream),
+ false,
+ rleVersion,
+ memPool);
+ dataStream.reset(new AppendOnlyBufferedStream(
+ factory.createStream(proto::Stream_Kind_DATA)));
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ void StringColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) {
+ ColumnWriter::add(rowBatch, offset, numValues);
+ const StringVectorBatch & stringBatch =
+ dynamic_cast<const StringVectorBatch &>(rowBatch);
+
+ char *const * data = stringBatch.data.data() + offset;
+ const int64_t* length = stringBatch.length.data() + offset;
+ const char* notNull = stringBatch.hasNulls ?
+ stringBatch.notNull.data() + offset : nullptr;
+
+ lengthEncoder->add(length, numValues, notNull);
+
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ dataStream->write(data[i], static_cast<size_t>(length[i]));
+ }
+ }
+
+ StringColumnStatisticsImpl* strStats =
+
dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
+
+ bool hasNull = false;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ strStats->update(data[i],
+ static_cast<size_t>(length[i]));
+ strStats->increase(1);
+ } else if (!hasNull) {
+ hasNull = true;
+ }
+ }
+ strStats->setHasNull(hasNull);
+ }
+
+ void StringColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream length;
+ length.set_kind(proto::Stream_Kind_LENGTH);
+ length.set_column(static_cast<uint32_t>(columnId));
+ length.set_length(lengthEncoder->flush());
+ streams.push_back(length);
+
+ proto::Stream data;
+ data.set_kind(proto::Stream_Kind_DATA);
+ data.set_column(static_cast<uint32_t>(columnId));
+ data.set_length(dataStream->flush());
+ streams.push_back(data);
+ }
+
+ uint64_t StringColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += lengthEncoder->getBufferSize();
+ size += dataStream->getSize();
+ return size;
+ }
+
+ void StringColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(rleVersion == RleVersion_1 ?
+ proto::ColumnEncoding_Kind_DIRECT :
+ proto::ColumnEncoding_Kind_DIRECT_V2);
+ encoding.set_dictionarysize(0);
+ encodings.push_back(encoding);
+ }
+
+ void StringColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ dataStream->recordPosition(rowIndexPosition.get());
+ lengthEncoder->recordPosition(rowIndexPosition.get());
+ }
+
+ class CharColumnWriter : public StringColumnWriter {
+ public:
+ CharColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ StringColumnWriter(type, factory, options),
+ fixedLength(type.getMaximumLength()),
+ padBuffer(*options.getMemoryPool(),
+ type.getMaximumLength()) {
+ // PASS
+ }
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) override;
+
+ private:
+ uint64_t fixedLength;
+ DataBuffer<char> padBuffer;
+ };
+
+ void CharColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) {
+ ColumnWriter::add(rowBatch, offset, numValues);
+ StringVectorBatch& charsBatch =
dynamic_cast<StringVectorBatch&>(rowBatch);
+
+ char** data = charsBatch.data.data() + offset;
+ int64_t* length = charsBatch.length.data() + offset;
+ const char* notNull = charsBatch.hasNulls ?
+ charsBatch.notNull.data() + offset : nullptr;
+
+ StringColumnStatisticsImpl* strStats =
+
dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
+ bool hasNull = false;
+
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ char *charData = data[i];
+ uint64_t oriLength = static_cast<uint64_t>(length[i]);
+ if (oriLength < fixedLength) {
+ memcpy(padBuffer.data(), data[i], oriLength);
+ memset(padBuffer.data() + oriLength, ' ', fixedLength -
oriLength);
+ charData = padBuffer.data();
+ }
+ length[i] = static_cast<int64_t>(fixedLength);
+ dataStream->write(charData, fixedLength);
+
+ strStats->update(charData, fixedLength);
+ strStats->increase(1);
+ } else if (!hasNull) {
+ hasNull = true;
+ }
+ }
+ lengthEncoder->add(length, numValues, notNull);
+ strStats->setHasNull(hasNull);
+ }
+
+ class VarCharColumnWriter : public StringColumnWriter {
+ public:
+ VarCharColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ StringColumnWriter(type, factory, options),
+ maxLength(type.getMaximumLength()) {
+ // PASS
+ }
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) override;
+
+ private:
+ uint64_t maxLength;
+ };
+
+ void VarCharColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) {
+ ColumnWriter::add(rowBatch, offset, numValues);
+ StringVectorBatch& charsBatch =
dynamic_cast<StringVectorBatch&>(rowBatch);
+
+ char* const* data = charsBatch.data.data() + offset;
+ int64_t* length = charsBatch.length.data() + offset;
+ const char* notNull = charsBatch.hasNulls ?
+ charsBatch.notNull.data() + offset : nullptr;
+
+ StringColumnStatisticsImpl* strStats =
+
dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
+ bool hasNull = false;
+
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ if (length[i] > static_cast<int64_t>(maxLength)) {
+ length[i] = static_cast<int64_t>(maxLength);
+ }
+ dataStream->write(data[i], static_cast<size_t>(length[i]));
+
+ strStats->update(data[i], static_cast<size_t>(length[i]));
+ strStats->increase(1);
+ } else if (!hasNull) {
+ hasNull = true;
+ }
+ }
+ lengthEncoder->add(length, numValues, notNull);
+ strStats->setHasNull(hasNull);
+ }
+
+ class BinaryColumnWriter : public StringColumnWriter {
+ public:
+ BinaryColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ StringColumnWriter(type, factory, options) {
+ // PASS
+ }
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) override;
+ };
+
+ void BinaryColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) {
+ ColumnWriter::add(rowBatch, offset, numValues);
+
+ StringVectorBatch & binBatch = dynamic_cast<StringVectorBatch
&>(rowBatch);
+ char** data = binBatch.data.data() + offset;
+ int64_t* length = binBatch.length.data() + offset;
+ const char* notNull = binBatch.hasNulls ?
+ binBatch.notNull.data() + offset : nullptr;
+
+ BinaryColumnStatisticsImpl* binStats =
+
dynamic_cast<BinaryColumnStatisticsImpl*>(colIndexStatistics.get());
+
+ bool hasNull = false;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ uint64_t unsignedLength = static_cast<uint64_t>(length[i]);
+ if (!notNull || notNull[i]) {
+ dataStream->write(data[i], unsignedLength);
+
+ binStats->update(unsignedLength);
+ binStats->increase(1);
+ } else if (!hasNull) {
+ hasNull = true;
+ }
+ }
+ lengthEncoder->add(length, numValues, notNull);
+ binStats->setHasNull(hasNull);
+ }
+
+ class TimestampColumnWriter : public ColumnWriter {
+ public:
+ TimestampColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ protected:
+ std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder;
+
+ private:
+ RleVersion rleVersion;
+ const Timezone& timezone;
+ };
+
+ TimestampColumnWriter::TimestampColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options),
+ rleVersion(RleVersion_1),
+ timezone(getLocalTimezone()){
+ std::unique_ptr<BufferedOutputStream> dataStream =
+ factory.createStream(proto::Stream_Kind_DATA);
+ std::unique_ptr<BufferedOutputStream> secondaryStream =
+ factory.createStream(proto::Stream_Kind_SECONDARY);
+ secRleEncoder = createRleEncoder(std::move(dataStream),
+ true,
+ rleVersion,
+ memPool);
+ nanoRleEncoder = createRleEncoder(std::move(secondaryStream),
+ false,
+ rleVersion,
+ memPool);
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ static int64_t formatNano(int64_t nanos) {
+ if (nanos == 0) {
+ return 0;
+ } else if (nanos % 100 != 0) {
+ return (nanos) << 3;
+ } else {
+ nanos /= 100;
+ int64_t trailingZeros = 1;
+ while (nanos % 10 == 0 && trailingZeros < 7) {
+ nanos /= 10;
+ trailingZeros += 1;
+ }
+ return (nanos) << 3 | trailingZeros;
+ }
+ }
+
+ void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) {
+ ColumnWriter::add(rowBatch, offset, numValues);
+
+ TimestampVectorBatch& tsBatch =
+ dynamic_cast<TimestampVectorBatch &>(rowBatch);
+
+ const char* notNull = tsBatch.hasNulls ?
+ tsBatch.notNull.data() + offset : nullptr;
+ int64_t *secs = tsBatch.data.data() + offset;
+ int64_t *nanos = tsBatch.nanoseconds.data() + offset;
+
+ TimestampColumnStatisticsImpl* tsStats =
+
dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get());
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (notNull == nullptr || notNull[i]) {
+ // TimestampVectorBatch already stores data in UTC
+ int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000;
+ tsStats->increase(1);
+ tsStats->update(millsUTC);
+ } else if (!tsStats->hasNull()) {
+ tsStats->setHasNull(true);
+ }
+ }
+
+ for (uint64_t i = 0; i < numValues; ++i) {
--- End diff --
can this loop be fused with the above loop? Applicable to other Writers
below as well?
> Implement column writers of primitive types
> -------------------------------------------
>
> Key: ORC-224
> URL: https://issues.apache.org/jira/browse/ORC-224
> Project: ORC
> Issue Type: Sub-task
> Components: C++
> Reporter: Gang Wu
> Assignee: Gang Wu
>
> As ORC-178 has implemented basic column writers for integers. This JIRA is
> targeted at implementing ColumnWriters of remaining primitive types including
> float, double, string, binary, etc.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)