[ 
https://issues.apache.org/jira/browse/ORC-178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067310#comment-16067310
 ] 

ASF GitHub Bot commented on ORC-178:
------------------------------------

Github user omalley commented on a diff in the pull request:

    https://github.com/apache/orc/pull/128#discussion_r124667097
  
    --- Diff: c++/src/ColumnWriter.cc ---
    @@ -0,0 +1,507 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#include "orc/Int128.hh"
    +#include "orc/Writer.hh"
    +
    +#include "ByteRLE.hh"
    +#include "ColumnWriter.hh"
    +#include "RLE.hh"
    +#include "Statistics.hh"
    +#include "Timezone.hh"
    +
    +namespace orc {
    +  // Out-of-line definition of the StreamsFactory destructor. The body is
    +  // intentionally empty.
    +  StreamsFactory::~StreamsFactory() {
    +  }
    +
    +  /**
    +   * StreamsFactory implementation that remembers one WriterOptions
    +   * object and one OutputStream and uses them for every stream it
    +   * creates.
    +   */
    +  class StreamsFactoryImpl : public StreamsFactory {
    +  public:
    +    StreamsFactoryImpl(const WriterOptions& writerOptions,
    +                       OutputStream* outputStream)
    +        : options(writerOptions),
    +          outStream(outputStream) {
    +      // nothing else to do: both members are set by the initializer list
    +    }
    +
    +    std::unique_ptr<BufferedOutputStream>
    +        createStream(proto::Stream_Kind kind) override;
    +
    +  private:
    +    // Stored as a reference and a raw pointer; this class neither copies
    +    // nor deletes them.
    +    const WriterOptions& options;
    +    OutputStream* outStream;
    +  };
    +
    +  /**
    +   * Create a buffered output stream on top of the wrapped OutputStream.
    +   * The stream kind argument is currently unused: compression kind,
    +   * strategy, buffer size, block size and memory pool all come from the
    +   * WriterOptions. A per-kind compression strategy could be chosen here
    +   * in the future.
    +   */
    +  std::unique_ptr<BufferedOutputStream> StreamsFactoryImpl::createStream(
    +                                                      proto::Stream_Kind) {
    +    auto& pool = *options.getMemoryPool();
    +    return createCompressor(options.getCompression(),
    +                            outStream,
    +                            options.getCompressionStrategy(),
    +                            options.getBufferSize(),
    +                            options.getBlockSize(),
    +                            pool);
    +  }
    +
    +  /**
    +   * Build the default StreamsFactory (a StreamsFactoryImpl) for the given
    +   * writer options and output stream.
    +   */
    +  std::unique_ptr<StreamsFactory> createStreamsFactory(
    +                                        const WriterOptions& options,
    +                                        OutputStream* outStream) {
    +    std::unique_ptr<StreamsFactory> factory(
    +        new StreamsFactoryImpl(options, outStream));
    +    return factory;
    +  }
    +
    +  // Out-of-line definition of the RowIndexPositionRecorder destructor.
    +  // The body is intentionally empty.
    +  RowIndexPositionRecorder::~RowIndexPositionRecorder() {
    +  }
    +
    +  /**
    +   * Set up the state shared by all column writers: the encoder for the
    +   * PRESENT stream and, depending on the writer options, the statistics
    +   * collectors and the row index.
    +   * @param type column type; supplies the column id and is passed to
    +   *        createColumnStatistics
    +   * @param factory creates the PRESENT and ROW_INDEX streams
    +   * @param options supplies the index/statistics switches and the
    +   *        memory pool
    +   */
    +  ColumnWriter::ColumnWriter(const Type& type,
    +                             StreamsFactory& factory,
    +                             const WriterOptions& options)
    +      : columnId(type.getColumnId()),
    +        streamsFactory(factory),
    +        colIndexStatistics(),
    +        colStripeStatistics(),
    +        colFileStatistics(),
    +        enableIndex(options.getEnableIndex()),
    +        enableStats(options.getEnableStats()),
    +        rowIndex(),
    +        rowIndexEntry(),
    +        rowIndexPosition(),
    +        memPool(*options.getMemoryPool()),
    +        indexStream() {
    +    // The PRESENT stream encoder is created unconditionally.
    +    notNullEncoder = createBooleanRleEncoder(
    +        factory.createStream(proto::Stream_Kind_PRESENT));
    +
    +    // Without index and without statistics there is nothing more to do.
    +    if (!enableIndex && !enableStats) {
    +      return;
    +    }
    +
    +    // Row-group level statistics are needed by both features; stripe and
    +    // file level statistics only when statistics are enabled.
    +    const bool enableStrCmp = options.getEnableStrStatsCmp();
    +    colIndexStatistics = createColumnStatistics(type, enableStrCmp);
    +    if (enableStats) {
    +      colStripeStatistics = createColumnStatistics(type, enableStrCmp);
    +      colFileStatistics = createColumnStatistics(type, enableStrCmp);
    +    }
    +
    +    if (!enableIndex) {
    +      return;
    +    }
    +
    +    // Row index state: the index itself, the entry being built, the
    +    // recorder that writes positions into that entry, and the stream
    +    // the index is serialized to.
    +    rowIndex.reset(new proto::RowIndex());
    +    rowIndexEntry.reset(new proto::RowIndexEntry());
    +    rowIndexPosition.reset(new RowIndexPositionRecorder(*rowIndexEntry));
    +    indexStream = factory.createStream(proto::Stream_Kind_ROW_INDEX);
    +  }
    +
    +  // Out-of-line definition of the ColumnWriter destructor. The body is
    +  // intentionally empty.
    +  ColumnWriter::~ColumnWriter() {
    +  }
    +
    +  /**
    +   * Feed the not-null flags of rows [offset, offset + numValues) of the
    +   * batch to the PRESENT stream encoder.
    +   */
    +  void ColumnWriter::add(ColumnVectorBatch& batch,
    +                         uint64_t offset,
    +                         uint64_t numValues) {
    +    const char* presentFlags = batch.notNull.data() + offset;
    +    notNullEncoder->add(presentFlags, numValues, nullptr);
    +  }
    +
    +  /**
    +   * Flush the PRESENT stream encoder and append a descriptor for that
    +   * stream (kind, column id, length returned by the flush) to streams.
    +   */
    +  void ColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    proto::Stream presentStream;
    +    presentStream.set_kind(proto::Stream_Kind_PRESENT);
    +    presentStream.set_column(static_cast<uint32_t>(columnId));
    +    presentStream.set_length(notNullEncoder->flush());
    +
    +    streams.push_back(presentStream);
    +  }
    +
    +  /**
    +   * @return the buffer size reported by the PRESENT stream encoder
    +   */
    +  uint64_t ColumnWriter::getEstimatedSize() const {
    +    const uint64_t presentBytes = notNullEncoder->getBufferSize();
    +    return presentBytes;
    +  }
    +
    +  /**
    +   * Hand this column's stripe-level statistics to getProtoBufStatistics
    +   * together with the output vector.
    +   */
    +  void ColumnWriter::getStripeStatistics(
    +    std::vector<proto::ColumnStatistics>& stats) const {
    +    auto* stripeStats = colStripeStatistics.get();
    +    getProtoBufStatistics(stats, stripeStats);
    +  }
    +
    +  /**
    +   * Merge the stripe-level statistics into the file-level statistics,
    +   * then reset the stripe-level statistics.
    +   * NOTE(review): the constructor only creates both objects when
    +   * statistics are enabled - presumably callers check that; confirm.
    +   */
    +  void ColumnWriter::mergeStripeStatsIntoFileStats() {
    +    auto& stripeStats = *colStripeStatistics;
    +    colFileStatistics->merge(stripeStats);
    +    stripeStats.reset();
    +  }
    +
    +  /**
    +   * Merge the row-group-level statistics into the stripe-level
    +   * statistics, then reset the row-group-level statistics.
    +   * NOTE(review): the constructor only creates the stripe statistics
    +   * when statistics are enabled - presumably callers check that; confirm.
    +   */
    +  void ColumnWriter::mergeRowGroupStatsIntoStripeStats() {
    +    auto& rowGroupStats = *colIndexStatistics;
    +    colStripeStatistics->merge(rowGroupStats);
    +    rowGroupStats.reset();
    +  }
    +
    +  /**
    +   * Hand this column's file-level statistics to getProtoBufStatistics
    +   * together with the output vector.
    +   */
    +  void ColumnWriter::getFileStatistics(
    +    std::vector<proto::ColumnStatistics>& stats) const {
    +    auto* fileStats = colFileStatistics.get();
    +    getProtoBufStatistics(stats, fileStats);
    +  }
    +
    +  /**
    +   * Finish the row index entry that is currently being built: store the
    +   * row-group statistics in it, append a copy to the row index, clear it
    +   * for reuse, roll the row-group statistics into the stripe statistics
    +   * (when statistics are enabled) and record the start positions of the
    +   * next entry.
    +   */
    +  void ColumnWriter::createRowIndexEntry() {
    +    // serialize the row-group statistics into the pending entry
    +    colIndexStatistics->toProtoBuf(*rowIndexEntry->mutable_statistics());
    +
    +    // append a copy of the pending entry to the index
    +    proto::RowIndexEntry* appended = rowIndex->add_entry();
    +    *appended = *rowIndexEntry;
    +
    +    // make the pending entry empty again
    +    rowIndexEntry->clear_positions();
    +    rowIndexEntry->clear_statistics();
    +
    +    // stripe statistics only exist when statistics are enabled
    +    if (enableStats) {
    +      colStripeStatistics->merge(*colIndexStatistics);
    +    }
    +    colIndexStatistics->reset();
    +
    +    // positions at which the next row group starts
    +    recordPosition();
    +  }
    +
    +  /**
    +   * Serialize the row index into the index stream, flush that stream and
    +   * append a ROW_INDEX stream descriptor (kind, column id, flushed
    +   * length) to streams. The constructor only creates rowIndex and
    +   * indexStream when the index is enabled.
    +   */
    +  void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
    +    // serialize the index
    +    rowIndex->SerializeToZeroCopyStream(indexStream.get());
    +
    +    // describe the stream that now holds it
    +    proto::Stream indexDescriptor;
    +    indexDescriptor.set_kind(proto::Stream_Kind_ROW_INDEX);
    +    indexDescriptor.set_column(static_cast<uint32_t>(columnId));
    +    indexDescriptor.set_length(indexStream->flush());
    +
    +    streams.push_back(indexDescriptor);
    +  }
    +
    +  /**
    +   * Ask the PRESENT stream encoder to record its current position through
    +   * the row index position recorder.
    +   */
    +  void ColumnWriter::recordPosition() const {
    +    auto* recorder = rowIndexPosition.get();
    +    notNullEncoder->recordPosition(recorder);
    +  }
    +
    +  /**
    +   * Drop all entries of the row index, empty the pending entry and record
    +   * the current positions into it.
    +   */
    +  void ColumnWriter::resetIndex() {
    +    // forget everything collected so far
    +    rowIndex->clear_entry();
    +
    +    proto::RowIndexEntry& pendingEntry = *rowIndexEntry;
    +    pendingEntry.clear_positions();
    +    pendingEntry.clear_statistics();
    +
    +    // start the next entry at the current positions
    +    recordPosition();
    +  }
    +
    +  /**
    +   * Column writer for struct columns. It holds one child writer per
    +   * subtype of the struct type and forwards every operation to the
    +   * children in addition to handling its own part.
    +   */
    +  class StructColumnWriter : public ColumnWriter {
    +  public:
    +    StructColumnWriter(const Type& type,
    +                       StreamsFactory& factory,
    +                       const WriterOptions& options);
    +    ~StructColumnWriter();
    +
    +    // --- writing ---
    +    void add(ColumnVectorBatch& rowBatch,
    +             uint64_t offset,
    +             uint64_t numValues) override;
    +    void flush(std::vector<proto::Stream>& streams) override;
    +
    +    // --- size and encoding ---
    +    uint64_t getEstimatedSize() const override;
    +    void getColumnEncoding(
    +      std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    // --- statistics ---
    +    void getStripeStatistics(
    +      std::vector<proto::ColumnStatistics>& stats) const override;
    +    void getFileStatistics(
    +      std::vector<proto::ColumnStatistics>& stats) const override;
    +    void mergeStripeStatsIntoFileStats() override;
    +    void mergeRowGroupStatsIntoStripeStats() override;
    +
    +    // --- row index ---
    +    void createRowIndexEntry() override;
    +    void writeIndex(
    +      std::vector<proto::Stream> &streams) const override;
    +    void resetIndex() override;
    +
    +  private:
    +    // Child writers, one per subtype. The raw pointers are owned by this
    +    // object and deleted in its destructor.
    +    std::vector<ColumnWriter *> children;
    +  };
    +
    +  /**
    +   * Build one child writer per subtype of the struct type and, when the
    +   * row index is enabled, record the initial stream positions.
    +   *
    +   * The children are stored as raw owning pointers that only the
    +   * destructor deletes. The destructor does not run when this
    +   * constructor throws, so any failure while building the children is
    +   * caught here, the children built so far are deleted, and the
    +   * exception is rethrown.
    +   * @param type the struct type whose subtypes get a writer each
    +   * @param factory passed on to the base class and to every child
    +   * @param options passed on to the base class and to every child
    +   */
    +  StructColumnWriter::StructColumnWriter(
    +                                         const Type& type,
    +                                         StreamsFactory& factory,
    +                                         const WriterOptions& options) :
    +                                         ColumnWriter(type, factory, options) {
    +    try {
    +      for (unsigned int i = 0; i < type.getSubtypeCount(); ++i) {
    +        const Type& child = *type.getSubtype(i);
    +        // keep ownership in the unique_ptr until the pointer is stored,
    +        // so a failing push_back cannot leak the new writer
    +        std::unique_ptr<ColumnWriter> writer =
    +          buildWriter(child, factory, options);
    +        children.push_back(writer.get());
    +        writer.release();
    +      }
    +
    +      if (enableIndex) {
    +        recordPosition();
    +      }
    +    } catch (...) {
    +      for (ColumnWriter* built : children) {
    +        delete built;
    +      }
    +      throw;
    +    }
    +  }
    +
    +  // Deletes the child writers, which are stored as raw owning pointers.
    +  StructColumnWriter::~StructColumnWriter() {
    +    for (ColumnWriter* child : children) {
    +      delete child;
    +    }
    +  }
    +
    +  /**
    +   * Add rows [offset, offset + numValues) of a struct batch: write the
    +   * not-null flags through the base class, forward the same row range of
    +   * every field to the matching child writer, and update the row-group
    +   * statistics of the struct column itself.
    +   *
    +   * The has-null flag of the statistics is only ever switched on here.
    +   * Several batches can be added to the same row group, so a batch
    +   * without nulls must not clear a flag set by an earlier batch (this
    +   * matches IntegerColumnWriter::add).
    +   * @param rowBatch must be a StructVectorBatch; the reference cast
    +   *        throws std::bad_cast otherwise. Assumes the batch has at
    +   *        least as many fields as this writer has children.
    +   * @param offset index of the first row to add
    +   * @param numValues number of rows to add
    +   */
    +  void StructColumnWriter::add(
    +                              ColumnVectorBatch& rowBatch,
    +                              uint64_t offset,
    +                              uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    const StructVectorBatch & structBatch =
    +          dynamic_cast<const StructVectorBatch &>(rowBatch);
    +
    +    for (uint32_t i = 0; i < children.size(); ++i) {
    +      children[i]->add(*structBatch.fields[i], offset, numValues);
    +    }
    +
    +    // update stats
    +    if (enableIndex || enableStats) {
    +      if (!structBatch.hasNulls) {
    +        colIndexStatistics->increase(numValues);
    +      } else {
    +        const char* notNull = structBatch.notNull.data() + offset;
    +        uint64_t nonNullCount = 0;
    +        for (uint64_t i = 0; i < numValues; ++i) {
    +          if (notNull[i]) {
    +            ++nonNullCount;
    +          }
    +        }
    +        colIndexStatistics->increase(nonNullCount);
    +        // only raise the flag; never reset what an earlier batch recorded
    +        if (nonNullCount < numValues) {
    +          colIndexStatistics->setHasNull(true);
    +        }
    +      }
    +    }
    +  }
    +
    +  // Flush the struct's own stream first, then every child in order.
    +  void StructColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    ColumnWriter::flush(streams);
    +    for (ColumnWriter* child : children) {
    +      child->flush(streams);
    +    }
    +  }
    +
    +  // Write the struct's own row index first, then that of every child.
    +  void StructColumnWriter::writeIndex(
    +                      std::vector<proto::Stream> &streams) const {
    +    ColumnWriter::writeIndex(streams);
    +    for (ColumnWriter* child : children) {
    +      child->writeIndex(streams);
    +    }
    +  }
    +
    +  // @return the base class estimate plus the estimates of all children
    +  uint64_t StructColumnWriter::getEstimatedSize() const {
    +    uint64_t total = ColumnWriter::getEstimatedSize();
    +    for (ColumnWriter* child : children) {
    +      total += child->getEstimatedSize();
    +    }
    +    return total;
    +  }
    +
    +  // Append the struct's own encoding (DIRECT, dictionary size 0), then
    +  // let every child append its encoding.
    +  void StructColumnWriter::getColumnEncoding(
    +                      std::vector<proto::ColumnEncoding>& encodings) const {
    +    proto::ColumnEncoding structEncoding;
    +    structEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
    +    structEncoding.set_dictionarysize(0);
    +    encodings.push_back(structEncoding);
    +
    +    for (ColumnWriter* child : children) {
    +      child->getColumnEncoding(encodings);
    +    }
    +  }
    +
    +  // Collect the struct's own stripe statistics first, then the children's.
    +  void StructColumnWriter::getStripeStatistics(
    +    std::vector<proto::ColumnStatistics>& stats) const {
    +    ColumnWriter::getStripeStatistics(stats);
    +    for (ColumnWriter* child : children) {
    +      child->getStripeStatistics(stats);
    +    }
    +  }
    +
    +  // Merge stripe statistics into file statistics for the struct itself,
    +  // then for every child.
    +  void StructColumnWriter::mergeStripeStatsIntoFileStats() {
    +    ColumnWriter::mergeStripeStatsIntoFileStats();
    +    for (ColumnWriter* child : children) {
    +      child->mergeStripeStatsIntoFileStats();
    +    }
    +  }
    +
    +  // Collect the struct's own file statistics first, then the children's.
    +  void StructColumnWriter::getFileStatistics(
    +    std::vector<proto::ColumnStatistics>& stats) const {
    +    ColumnWriter::getFileStatistics(stats);
    +    for (ColumnWriter* child : children) {
    +      child->getFileStatistics(stats);
    +    }
    +  }
    +
    +  // Merge row-group statistics into stripe statistics for the struct
    +  // itself, then for every child.
    +  void StructColumnWriter::mergeRowGroupStatsIntoStripeStats() {
    +    ColumnWriter::mergeRowGroupStatsIntoStripeStats();
    +    for (ColumnWriter* child : children) {
    +      child->mergeRowGroupStatsIntoStripeStats();
    +    }
    +  }
    +
    +  // Create a row index entry for the struct itself, then for every child.
    +  void StructColumnWriter::createRowIndexEntry() {
    +    ColumnWriter::createRowIndexEntry();
    +    for (ColumnWriter* child : children) {
    +      child->createRowIndexEntry();
    +    }
    +  }
    +
    +  // Reset the row index of the struct itself, then of every child.
    +  void StructColumnWriter::resetIndex() {
    +    ColumnWriter::resetIndex();
    +    for (ColumnWriter* child : children) {
    +      child->resetIndex();
    +    }
    +  }
    +
    +  /**
    +   * Column writer that takes its values from a LongVectorBatch and writes
    +   * them to a DATA stream through an RLE encoder, in addition to what
    +   * the base ColumnWriter does.
    +   */
    +  class IntegerColumnWriter : public ColumnWriter {
    +  public:
    +    IntegerColumnWriter(const Type& type,
    +                        StreamsFactory& factory,
    +                        const WriterOptions& options);
    +
    +    void add(ColumnVectorBatch& rowBatch,
    +             uint64_t offset,
    +             uint64_t numValues) override;
    +
    +    void flush(std::vector<proto::Stream>& streams) override;
    +
    +    uint64_t getEstimatedSize() const override;
    +
    +    void getColumnEncoding(
    +              std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    void recordPosition() const override;
    +
    +  protected:
    +    // encoder of the DATA stream
    +    std::unique_ptr<RleEncoder> rleEncoder;
    +
    +  private:
    +    // RLE version taken from the writer options; also decides the
    +    // reported column encoding
    +    RleVersion rleVersion;
    +  };
    +
    +  /**
    +   * Create the DATA stream and its RLE encoder (using the RLE version
    +   * from the writer options) and, when the row index is enabled, record
    +   * the initial stream positions.
    +   */
    +  IntegerColumnWriter::IntegerColumnWriter(const Type& type,
    +                                           StreamsFactory& factory,
    +                                           const WriterOptions& options)
    +      : ColumnWriter(type, factory, options),
    +        rleVersion(options.getRleVersion()) {
    +    rleEncoder = createRleEncoder(
    +        factory.createStream(proto::Stream_Kind_DATA),
    +        true,
    +        rleVersion,
    +        memPool);
    +
    +    if (!enableIndex) {
    +      return;
    +    }
    +    recordPosition();
    +  }
    +
    +  /**
    +   * Add rows [offset, offset + numValues) of an integer batch: write the
    +   * not-null flags through the base class, feed the values to the RLE
    +   * encoder and update the row-group statistics.
    +   *
    +   * Both casts are reference casts, so a batch or statistics object of
    +   * an unexpected type raises std::bad_cast instead of leading to a
    +   * null pointer dereference.
    +   * @param rowBatch must be a LongVectorBatch
    +   * @param offset index of the first row to add
    +   * @param numValues number of rows to add
    +   */
    +  void IntegerColumnWriter::add(
    +                              ColumnVectorBatch& rowBatch,
    +                              uint64_t offset,
    +                              uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    const LongVectorBatch & longBatch =
    +                    dynamic_cast<const LongVectorBatch &>(rowBatch);
    +
    +    const int64_t* data = longBatch.data.data() + offset;
    +    // a null flag array tells the encoder and the loop below that every
    +    // row holds a value
    +    const char* notNull = longBatch.hasNulls ?
    +                          longBatch.notNull.data() + offset : nullptr;
    +
    +    rleEncoder->add(data, numValues, notNull);
    +
    +    // update stats
    +    if (enableIndex || enableStats) {
    +      IntegerColumnStatisticsImpl& intStats =
    +        dynamic_cast<IntegerColumnStatisticsImpl&>(*colIndexStatistics);
    +      for (uint64_t i = 0; i < numValues; ++i) {
    +        if (notNull == nullptr || notNull[i]) {
    +          intStats.increase(1);
    +          intStats.update(data[i], 1);
    +        } else if (!intStats.hasNull()) {
    +          intStats.setHasNull(true);
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Flush the base class stream, then flush the RLE encoder and append a
    +   * DATA stream descriptor (kind, column id, flushed length) to streams.
    +   */
    +  void IntegerColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    ColumnWriter::flush(streams);
    +
    +    proto::Stream dataDescriptor;
    +    dataDescriptor.set_kind(proto::Stream_Kind_DATA);
    +    dataDescriptor.set_column(static_cast<uint32_t>(columnId));
    +    dataDescriptor.set_length(rleEncoder->flush());
    +
    +    streams.push_back(dataDescriptor);
    +  }
    +
    +  // @return the base class estimate plus the RLE encoder's buffer size
    +  uint64_t IntegerColumnWriter::getEstimatedSize() const {
    +    const uint64_t baseSize = ColumnWriter::getEstimatedSize();
    +    return baseSize + rleEncoder->getBufferSize();
    +  }
    +
    +  /**
    +   * Append this column's encoding: DIRECT for RLE version 1, DIRECT_V2
    +   * otherwise, always with dictionary size 0.
    +   */
    +  void IntegerColumnWriter::getColumnEncoding(
    +                       std::vector<proto::ColumnEncoding>& encodings) const {
    +    proto::ColumnEncoding encoding;
    +    if (rleVersion == RleVersion_1) {
    +      encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
    +    } else {
    +      encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT_V2);
    +    }
    +    encoding.set_dictionarysize(0);
    +    encodings.push_back(encoding);
    +  }
    +
    +  // Record the position of the base class stream first, then the
    +  // position of the RLE encoder, through the same recorder.
    +  void IntegerColumnWriter::recordPosition() const {
    +    ColumnWriter::recordPosition();
    +
    +    auto* recorder = rowIndexPosition.get();
    +    rleEncoder->recordPosition(recorder);
    +  }
    +
    +  std::unique_ptr<ColumnWriter> buildWriter(
    +                                            const Type& type,
    +                                            StreamsFactory& factory,
    --- End diff --
    
    The `factory` parameter should be passed as a const reference (`const StreamsFactory&`).


> Implement Basic C++ Writer and Writer Option
> --------------------------------------------
>
>                 Key: ORC-178
>                 URL: https://issues.apache.org/jira/browse/ORC-178
>             Project: ORC
>          Issue Type: Sub-task
>          Components: C++
>            Reporter: Gang Wu
>            Assignee: Xiening Dai
>
> 1. Write the ORC file header, file footer, postscript, etc.
> 2. Write columns of all types.
> 3. Write column statistics.
> 4. Write the index stream in the writer, and have the reader seek to a row
> based on the index information.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to