[ 
https://issues.apache.org/jira/browse/ORC-224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16135656#comment-16135656
 ] 

ASF GitHub Bot commented on ORC-224:
------------------------------------

Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/149#discussion_r134308560
  
    --- Diff: c++/src/ColumnWriter.cc ---
    @@ -468,25 +472,1099 @@ namespace orc {
         rleEncoder->recordPosition(rowIndexPosition.get());
       }
     
    -  std::unique_ptr<ColumnWriter> buildWriter(
    -                                            const Type& type,
    -                                            const StreamsFactory& factory,
    -                                            const WriterOptions& options) {
    -    switch (static_cast<int64_t>(type.getKind())) {
    -      case STRUCT:
    -        return std::unique_ptr<ColumnWriter>(
    -          new StructColumnWriter(
    -                                 type,
    -                                 factory,
    -                                 options));
    -      case INT:
    -      case LONG:
    -      case SHORT:
    -        return std::unique_ptr<ColumnWriter>(
    -          new IntegerColumnWriter(
    -                                  type,
    -                                  factory,
    -                                  options));
    +  class ByteColumnWriter : public ColumnWriter {
    +  public:
    +    ByteColumnWriter(const Type& type,
    +                     const StreamsFactory& factory,
    +                     const WriterOptions& options);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +            std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  private:
    +    std::unique_ptr<ByteRleEncoder> byteRleEncoder;
    +  };
    +
    +  ByteColumnWriter::ByteColumnWriter(
    +                        const Type& type,
    +                        const StreamsFactory& factory,
    +                        const WriterOptions& options) :
    +                             ColumnWriter(type, factory, options) {
    +    std::unique_ptr<BufferedOutputStream> dataStream =
    +                                  
factory.createStream(proto::Stream_Kind_DATA);
    +    byteRleEncoder = createByteRleEncoder(std::move(dataStream));
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  void ByteColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                             uint64_t offset,
    +                             uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    LongVectorBatch& byteBatch =
    +                               dynamic_cast<LongVectorBatch&>(rowBatch);
    +
    +    int64_t* data = byteBatch.data.data() + offset;
    +    const char* notNull = byteBatch.hasNulls ?
    +                          byteBatch.notNull.data() + offset : nullptr;
    +
    +    char* byteData = reinterpret_cast<char*>(data);
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      byteData[i] = static_cast<char>(data[i]);
    +    }
    +    byteRleEncoder->add(byteData, numValues, notNull);
    +
    +    IntegerColumnStatisticsImpl* intStats =
    +        
dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (notNull == nullptr || notNull[i]) {
    +        intStats->increase(1);
    +        intStats->update(static_cast<int64_t>(byteData[i]), 1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    intStats->setHasNull(hasNull);
    +  }
    +
    +  void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    ColumnWriter::flush(streams);
    +
    +    proto::Stream stream;
    +    stream.set_kind(proto::Stream_Kind_DATA);
    +    stream.set_column(static_cast<uint32_t>(columnId));
    +    stream.set_length(byteRleEncoder->flush());
    +    streams.push_back(stream);
    +  }
    +
    +  uint64_t ByteColumnWriter::getEstimatedSize() const {
    +    uint64_t size = ColumnWriter::getEstimatedSize();
    +    size += byteRleEncoder->getBufferSize();
    +    return size;
    +  }
    +
    +  void ByteColumnWriter::getColumnEncoding(
    +    std::vector<proto::ColumnEncoding>& encodings) const {
    +    proto::ColumnEncoding encoding;
    +    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
    +    encoding.set_dictionarysize(0);
    +    encodings.push_back(encoding);
    +  }
    +
    +  void ByteColumnWriter::recordPosition() const {
    +    ColumnWriter::recordPosition();
    +    byteRleEncoder->recordPosition(rowIndexPosition.get());
    +  }
    +
    +  class BooleanColumnWriter : public ColumnWriter {
    +  public:
    +    BooleanColumnWriter(const Type& type,
    +                        const StreamsFactory& factory,
    +                        const WriterOptions& options);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +        std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  private:
    +    std::unique_ptr<ByteRleEncoder> rleEncoder;
    +  };
    +
    +  BooleanColumnWriter::BooleanColumnWriter(
    +                           const Type& type,
    +                           const StreamsFactory& factory,
    +                           const WriterOptions& options) :
    +                               ColumnWriter(type, factory, options) {
    +    std::unique_ptr<BufferedOutputStream> dataStream =
    +      factory.createStream(proto::Stream_Kind_DATA);
    +    rleEncoder = createBooleanRleEncoder(std::move(dataStream));
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                                uint64_t offset,
    +                                uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
    +    int64_t* data = byteBatch.data.data() + offset;
    +    const char* notNull = byteBatch.hasNulls ?
    +                          byteBatch.notNull.data() + offset : nullptr;
    +
    +    char* byteData = reinterpret_cast<char*>(data);
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      byteData[i] = static_cast<char>(data[i]);
    +    }
    +    rleEncoder->add(byteData, numValues, notNull);
    +
    +    BooleanColumnStatisticsImpl* boolStats =
    +        
dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (notNull == nullptr || notNull[i]) {
    +        boolStats->increase(1);
    +        boolStats->update(byteData[i], 1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    boolStats->setHasNull(hasNull);
    +  }
    +
    +  void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    ColumnWriter::flush(streams);
    +
    +    proto::Stream stream;
    +    stream.set_kind(proto::Stream_Kind_DATA);
    +    stream.set_column(static_cast<uint32_t>(columnId));
    +    stream.set_length(rleEncoder->flush());
    +    streams.push_back(stream);
    +  }
    +
    +  uint64_t BooleanColumnWriter::getEstimatedSize() const {
    +    uint64_t size = ColumnWriter::getEstimatedSize();
    +    size += rleEncoder->getBufferSize();
    +    return size;
    +  }
    +
    +  void BooleanColumnWriter::getColumnEncoding(
    +                       std::vector<proto::ColumnEncoding>& encodings) 
const {
    +    proto::ColumnEncoding encoding;
    +    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
    +    encoding.set_dictionarysize(0);
    +    encodings.push_back(encoding);
    +  }
    +
    +  void BooleanColumnWriter::recordPosition() const {
    +    ColumnWriter::recordPosition();
    +    rleEncoder->recordPosition(rowIndexPosition.get());
    +  }
    +
    +  class DoubleColumnWriter : public ColumnWriter {
    +  public:
    +    DoubleColumnWriter(const Type& type,
    +                       const StreamsFactory& factory,
    +                       const WriterOptions& options,
    +                       bool isFloat);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +        std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  private:
    +    bool isFloat;
    +    std::unique_ptr<AppendOnlyBufferedStream> dataStream;
    +    DataBuffer<char> buffer;
    +  };
    +
    +  DoubleColumnWriter::DoubleColumnWriter(
    +                          const Type& type,
    +                          const StreamsFactory& factory,
    +                          const WriterOptions& options,
    +                          bool isFloatType) :
    +                              ColumnWriter(type, factory, options),
    +                              isFloat(isFloatType),
    +                              buffer(*options.getMemoryPool()) {
    +    dataStream.reset(new AppendOnlyBufferedStream(
    +                             
factory.createStream(proto::Stream_Kind_DATA)));
    +    buffer.resize(isFloat ? 4 : 8);
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                               uint64_t offset,
    +                               uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    const DoubleVectorBatch& dblBatch =
    +                             dynamic_cast<const 
DoubleVectorBatch&>(rowBatch);
    +
    +    const double* doubleData = dblBatch.data.data() + offset;
    +    const char* notNull = dblBatch.hasNulls ?
    +                          dblBatch.notNull.data() + offset : nullptr;
    +
    +    size_t bytes = isFloat ? 4 : 8;
    +    char* data = buffer.data();
    +
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        if (isFloat) {
    +          // to avoid float-double cast
    +          const int32_t* intBits =
    +            reinterpret_cast<const int32_t*>(&static_cast<const float&>(
    +              doubleData[i]));
    +          for (size_t j = 0; j < bytes; ++j) {
    +            data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
    +          }
    +        } else {
    +          const int64_t* intBits =
    +            reinterpret_cast<const int64_t*>(&(doubleData[i]));
    +          for (size_t j = 0; j < bytes; ++j) {
    +            data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
    +          }
    +        }
    +        dataStream->write(data, bytes);
    +      }
    +    }
    +
    +    DoubleColumnStatisticsImpl* doubleStats =
    +        
dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        doubleStats->increase(1);
    +        doubleStats->update(doubleData[i]);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    doubleStats->setHasNull(hasNull);
    +  }
    +
    +  void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    ColumnWriter::flush(streams);
    +
    +    proto::Stream stream;
    +    stream.set_kind(proto::Stream_Kind_DATA);
    +    stream.set_column(static_cast<uint32_t>(columnId));
    +    stream.set_length(dataStream->flush());
    +    streams.push_back(stream);
    +  }
    +
    +  uint64_t DoubleColumnWriter::getEstimatedSize() const {
    +    uint64_t size = ColumnWriter::getEstimatedSize();
    +    size += dataStream->getSize();
    +    return size;
    +  }
    +
    +  void DoubleColumnWriter::getColumnEncoding(
    +                      std::vector<proto::ColumnEncoding>& encodings) const 
{
    +    proto::ColumnEncoding encoding;
    +    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
    +    encoding.set_dictionarysize(0);
    +    encodings.push_back(encoding);
    +  }
    +
    +  void DoubleColumnWriter::recordPosition() const {
    +    ColumnWriter::recordPosition();
    +    dataStream->recordPosition(rowIndexPosition.get());
    +  }
    +
    +  class StringColumnWriter : public ColumnWriter {
    +  public:
    +    StringColumnWriter(const Type& type,
    +                       const StreamsFactory& factory,
    +                       const WriterOptions& options);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +        std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  protected:
    +    std::unique_ptr<RleEncoder> lengthEncoder;
    +    std::unique_ptr<AppendOnlyBufferedStream> dataStream;
    +    RleVersion rleVersion;
    +  };
    +
    +  StringColumnWriter::StringColumnWriter(
    +                          const Type& type,
    +                          const StreamsFactory& factory,
    +                          const WriterOptions& options) :
    +                              ColumnWriter(type, factory, options),
    +                              rleVersion(RleVersion_1) {
    +    std::unique_ptr<BufferedOutputStream> lengthStream =
    +        factory.createStream(proto::Stream_Kind_LENGTH);
    +    lengthEncoder = createRleEncoder(std::move(lengthStream),
    +                                     false,
    +                                     rleVersion,
    +                                     memPool);
    +    dataStream.reset(new AppendOnlyBufferedStream(
    +        factory.createStream(proto::Stream_Kind_DATA)));
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  void StringColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                               uint64_t offset,
    +                               uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +    const StringVectorBatch & stringBatch =
    +      dynamic_cast<const StringVectorBatch &>(rowBatch);
    +
    +    char *const * data = stringBatch.data.data() + offset;
    +    const int64_t* length = stringBatch.length.data() + offset;
    +    const char* notNull = stringBatch.hasNulls ?
    +                          stringBatch.notNull.data() + offset : nullptr;
    +
    +    lengthEncoder->add(length, numValues, notNull);
    +
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        dataStream->write(data[i], static_cast<size_t>(length[i]));
    +      }
    +    }
    +
    +    StringColumnStatisticsImpl* strStats =
    +        
dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
    +
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        strStats->update(data[i],
    +                         static_cast<size_t>(length[i]));
    +        strStats->increase(1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    strStats->setHasNull(hasNull);
    +  }
    +
    +  void StringColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    ColumnWriter::flush(streams);
    +
    +    proto::Stream length;
    +    length.set_kind(proto::Stream_Kind_LENGTH);
    +    length.set_column(static_cast<uint32_t>(columnId));
    +    length.set_length(lengthEncoder->flush());
    +    streams.push_back(length);
    +
    +    proto::Stream data;
    +    data.set_kind(proto::Stream_Kind_DATA);
    +    data.set_column(static_cast<uint32_t>(columnId));
    +    data.set_length(dataStream->flush());
    +    streams.push_back(data);
    +  }
    +
    +  uint64_t StringColumnWriter::getEstimatedSize() const {
    +    uint64_t size = ColumnWriter::getEstimatedSize();
    +    size += lengthEncoder->getBufferSize();
    +    size += dataStream->getSize();
    +    return size;
    +  }
    +
    +  void StringColumnWriter::getColumnEncoding(
    +    std::vector<proto::ColumnEncoding>& encodings) const {
    +    proto::ColumnEncoding encoding;
    +    encoding.set_kind(rleVersion == RleVersion_1 ?
    +                      proto::ColumnEncoding_Kind_DIRECT :
    +                      proto::ColumnEncoding_Kind_DIRECT_V2);
    +    encoding.set_dictionarysize(0);
    +    encodings.push_back(encoding);
    +  }
    +
    +  void StringColumnWriter::recordPosition() const {
    +    ColumnWriter::recordPosition();
    +    dataStream->recordPosition(rowIndexPosition.get());
    +    lengthEncoder->recordPosition(rowIndexPosition.get());
    +  }
    +
    +  class CharColumnWriter : public StringColumnWriter {
    +  public:
    +    CharColumnWriter(const Type& type,
    +                     const StreamsFactory& factory,
    +                     const WriterOptions& options) :
    +                         StringColumnWriter(type, factory, options),
    +                         fixedLength(type.getMaximumLength()),
    +                         padBuffer(*options.getMemoryPool(),
    +                                   type.getMaximumLength()) {
    +      // PASS
    +    }
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +  private:
    +    uint64_t fixedLength;
    +    DataBuffer<char> padBuffer;
    +  };
    +
    +  void CharColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                             uint64_t offset,
    +                             uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +    StringVectorBatch& charsBatch = 
dynamic_cast<StringVectorBatch&>(rowBatch);
    +
    +    char** data = charsBatch.data.data() + offset;
    +    int64_t* length = charsBatch.length.data() + offset;
    +    const char* notNull = charsBatch.hasNulls ?
    +                          charsBatch.notNull.data() + offset : nullptr;
    +
    +    StringColumnStatisticsImpl* strStats =
    +        
dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
    +    bool hasNull = false;
    +
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        char *charData = data[i];
    +        uint64_t oriLength = static_cast<uint64_t>(length[i]);
    +        if (oriLength < fixedLength) {
    +          memcpy(padBuffer.data(), data[i], oriLength);
    +          memset(padBuffer.data() + oriLength, ' ', fixedLength - 
oriLength);
    +          charData = padBuffer.data();
    +        }
    +        length[i] = static_cast<int64_t>(fixedLength);
    +        dataStream->write(charData, fixedLength);
    +
    +        strStats->update(charData, fixedLength);
    +        strStats->increase(1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    lengthEncoder->add(length, numValues, notNull);
    +    strStats->setHasNull(hasNull);
    +  }
    +
    +  class VarCharColumnWriter : public StringColumnWriter {
    +  public:
    +    VarCharColumnWriter(const Type& type,
    +                        const StreamsFactory& factory,
    +                        const WriterOptions& options) :
    +                            StringColumnWriter(type, factory, options),
    +                            maxLength(type.getMaximumLength()) {
    +      // PASS
    +    }
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +  private:
    +    uint64_t maxLength;
    +  };
    +
    +  void VarCharColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                                uint64_t offset,
    +                                uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +    StringVectorBatch& charsBatch = 
dynamic_cast<StringVectorBatch&>(rowBatch);
    +
    +    char* const* data = charsBatch.data.data() + offset;
    +    int64_t* length = charsBatch.length.data() + offset;
    +    const char* notNull = charsBatch.hasNulls ?
    +                          charsBatch.notNull.data() + offset : nullptr;
    +
    +    StringColumnStatisticsImpl* strStats =
    +        
dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
    +    bool hasNull = false;
    +
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        if (length[i] > static_cast<int64_t>(maxLength)) {
    +          length[i] = static_cast<int64_t>(maxLength);
    +        }
    +        dataStream->write(data[i], static_cast<size_t>(length[i]));
    +
    +        strStats->update(data[i], static_cast<size_t>(length[i]));
    +        strStats->increase(1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    lengthEncoder->add(length, numValues, notNull);
    +    strStats->setHasNull(hasNull);
    +  }
    +
    +  class BinaryColumnWriter : public StringColumnWriter {
    +  public:
    +    BinaryColumnWriter(const Type& type,
    +                       const StreamsFactory& factory,
    +                       const WriterOptions& options) :
    +                           StringColumnWriter(type, factory, options) {
    +      // PASS
    +    }
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +  };
    +
    +  void BinaryColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                               uint64_t offset,
    +                               uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    StringVectorBatch & binBatch = dynamic_cast<StringVectorBatch 
&>(rowBatch);
    +    char** data = binBatch.data.data() + offset;
    +    int64_t* length = binBatch.length.data() + offset;
    +    const char* notNull = binBatch.hasNulls ?
    +                          binBatch.notNull.data() + offset : nullptr;
    +
    +    BinaryColumnStatisticsImpl* binStats =
    +        
dynamic_cast<BinaryColumnStatisticsImpl*>(colIndexStatistics.get());
    +
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      uint64_t unsignedLength = static_cast<uint64_t>(length[i]);
    +      if (!notNull || notNull[i]) {
    +        dataStream->write(data[i], unsignedLength);
    +
    +        binStats->update(unsignedLength);
    +        binStats->increase(1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    lengthEncoder->add(length, numValues, notNull);
    +    binStats->setHasNull(hasNull);
    +  }
    +
    +  class TimestampColumnWriter : public ColumnWriter {
    +  public:
    +    TimestampColumnWriter(const Type& type,
    +                          const StreamsFactory& factory,
    +                          const WriterOptions& options);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +        std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  protected:
    +    std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder;
    +
    +  private:
    +    RleVersion rleVersion;
    +    const Timezone& timezone;
    +  };
    +
    +  TimestampColumnWriter::TimestampColumnWriter(
    +                             const Type& type,
    +                             const StreamsFactory& factory,
    +                             const WriterOptions& options) :
    +                                 ColumnWriter(type, factory, options),
    +                                 rleVersion(RleVersion_1),
    +                                 timezone(getLocalTimezone()){
    +    std::unique_ptr<BufferedOutputStream> dataStream =
    +        factory.createStream(proto::Stream_Kind_DATA);
    +    std::unique_ptr<BufferedOutputStream> secondaryStream =
    +        factory.createStream(proto::Stream_Kind_SECONDARY);
    +    secRleEncoder = createRleEncoder(std::move(dataStream),
    +                                     true,
    +                                     rleVersion,
    +                                     memPool);
    +    nanoRleEncoder = createRleEncoder(std::move(secondaryStream),
    +                                      false,
    +                                      rleVersion,
    +                                      memPool);
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  static int64_t formatNano(int64_t nanos) {
    +    if (nanos == 0) {
    +      return 0;
    +    } else if (nanos % 100 != 0) {
    +      return (nanos) << 3;
    +    } else {
    +      nanos /= 100;
    +      int64_t trailingZeros = 1;
    +      while (nanos % 10 == 0 && trailingZeros < 7) {
    +        nanos /= 10;
    +        trailingZeros += 1;
    +      }
    +      return (nanos) << 3 | trailingZeros;
    +    }
    +  }
    +
    +  void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                                  uint64_t offset,
    +                                  uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    TimestampVectorBatch& tsBatch =
    +      dynamic_cast<TimestampVectorBatch &>(rowBatch);
    +
    +    const char* notNull = tsBatch.hasNulls ?
    +                          tsBatch.notNull.data() + offset : nullptr;
    +    int64_t *secs = tsBatch.data.data() + offset;
    +    int64_t *nanos = tsBatch.nanoseconds.data() + offset;
    +
    +    TimestampColumnStatisticsImpl* tsStats =
    +        
dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get());
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (notNull == nullptr || notNull[i]) {
    +        // TimestampVectorBatch already stores data in UTC
    +        int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000;
    +        tsStats->increase(1);
    +        tsStats->update(millsUTC);
    +      } else if (!tsStats->hasNull()) {
    +        tsStats->setHasNull(true);
    +      }
    +    }
    +
    +    for (uint64_t i = 0; i < numValues; ++i) {
    --- End diff --
    
    Can this loop be fused with the loop above? Does the same apply to the
    other Writers below as well?


> Implement column writers of primitive types
> -------------------------------------------
>
>                 Key: ORC-224
>                 URL: https://issues.apache.org/jira/browse/ORC-224
>             Project: ORC
>          Issue Type: Sub-task
>          Components: C++
>            Reporter: Gang Wu
>            Assignee: Gang Wu
>
> ORC-178 implemented basic column writers for integers. This JIRA is 
> targeted at implementing ColumnWriters for the remaining primitive types, 
> including float, double, string, binary, etc.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to