[
https://issues.apache.org/jira/browse/ORC-178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067399#comment-16067399
]
ASF GitHub Bot commented on ORC-178:
------------------------------------
GitHub user xndai commented on a diff in the pull request:
https://github.com/apache/orc/pull/128#discussion_r124676106
--- Diff: c++/src/ColumnWriter.cc ---
@@ -0,0 +1,507 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "orc/Int128.hh"
+#include "orc/Writer.hh"
+
+#include "ByteRLE.hh"
+#include "ColumnWriter.hh"
+#include "RLE.hh"
+#include "Statistics.hh"
+#include "Timezone.hh"
+
+#include <stdexcept>
+
+namespace orc {
+ // Out-of-line destructor definition for StreamsFactory; it performs no
+ // explicit cleanup, so the defaulted body is used.
+ StreamsFactory::~StreamsFactory() = default;
+
+ /**
+ * Default StreamsFactory: every stream it creates targets the single
+ * OutputStream given at construction and uses the compression settings
+ * read from the WriterOptions (see createStream()).
+ *
+ * NOTE(review): `options` is held by reference, so the WriterOptions
+ * passed in must outlive this factory -- confirm at the call sites.
+ */
+ class StreamsFactoryImpl : public StreamsFactory {
+ public:
+ StreamsFactoryImpl(const WriterOptions& opts, OutputStream * sink)
+ : options(opts), outStream(sink) {
+ }
+
+ std::unique_ptr<BufferedOutputStream>
+ createStream(proto::Stream_Kind kind) override;
+
+ private:
+ // Borrowed writer configuration; not copied.
+ const WriterOptions& options;
+ // Borrowed destination for all created streams; never deleted here.
+ OutputStream * outStream;
+ };
+
+ std::unique_ptr<BufferedOutputStream> StreamsFactoryImpl::createStream(
+ proto::Stream_Kind /* kind */) {
+ // The stream kind is ignored for now: every stream is built from the
+ // writer-wide compression kind, strategy, buffer size and block size.
+ // Per-kind compression choices could be made here later.
+ const auto compression = options.getCompression();
+ const auto strategy = options.getCompressionStrategy();
+ const auto bufferSize = options.getBufferSize();
+ const auto blockSize = options.getBlockSize();
+ auto& pool = *options.getMemoryPool();
+ return createCompressor(compression, outStream, strategy,
+ bufferSize, blockSize, pool);
+ }
+
+ /**
+ * Creates the default StreamsFactory, whose streams all write to
+ * `outStream` using the compression settings from `options`.
+ * The returned factory keeps a reference to `options` and a raw pointer
+ * to `outStream`; it owns neither.
+ */
+ std::unique_ptr<StreamsFactory> createStreamsFactory(
+ const WriterOptions& options,
+ OutputStream * outStream) {
+ return std::unique_ptr<StreamsFactory>(
+ new StreamsFactoryImpl(options, outStream));
+ }
+
+ // Out-of-line destructor for RowIndexPositionRecorder; no explicit
+ // cleanup is performed, so the defaulted body is used.
+ RowIndexPositionRecorder::~RowIndexPositionRecorder() = default;
+
+ /**
+ * Sets up the state shared by every column writer:
+ * - always: the PRESENT stream encoder fed from the batch's not-null flags;
+ * - index or stats enabled: row-group (index) statistics;
+ * - stats enabled: stripe- and file-level statistics;
+ * - index enabled: the row index, its pending entry, a position recorder
+ * bound to that entry, and the ROW_INDEX output stream.
+ */
+ ColumnWriter::ColumnWriter(
+ const Type& type,
+ StreamsFactory& factory,
+ const WriterOptions& options) :
+ columnId(type.getColumnId()),
+ streamsFactory(factory),
+ colIndexStatistics(),
+ colStripeStatistics(),
+ colFileStatistics(),
+ enableIndex(options.getEnableIndex()),
+ enableStats(options.getEnableStats()),
+ rowIndex(),
+ rowIndexEntry(),
+ rowIndexPosition(),
+ memPool(*options.getMemoryPool()),
+ indexStream() {
+ notNullEncoder = createBooleanRleEncoder(
+ factory.createStream(proto::Stream_Kind_PRESENT));
+
+ // Neither feature enabled: nothing else to allocate.
+ if (!enableIndex && !enableStats) {
+ return;
+ }
+
+ const bool strStatsCmp = options.getEnableStrStatsCmp();
+ colIndexStatistics = createColumnStatistics(type, strStatsCmp);
+ if (enableStats) {
+ colStripeStatistics = createColumnStatistics(type, strStatsCmp);
+ colFileStatistics = createColumnStatistics(type, strStatsCmp);
+ }
+
+ if (!enableIndex) {
+ return;
+ }
+
+ // The recorder is bound to the pending entry; recordPosition() uses it.
+ rowIndex.reset(new proto::RowIndex());
+ rowIndexEntry.reset(new proto::RowIndexEntry());
+ rowIndexPosition.reset(new RowIndexPositionRecorder(*rowIndexEntry));
+ indexStream = factory.createStream(proto::Stream_Kind_ROW_INDEX);
+ }
+
+ // Out-of-line destructor for ColumnWriter; members clean themselves up,
+ // so the defaulted body is used.
+ ColumnWriter::~ColumnWriter() = default;
+
+ // Base-class part of appending `numValues` rows that start at `offset`:
+ // hands the matching slice of the batch's not-null flags to the PRESENT
+ // stream encoder (no extra mask is supplied).
+ void ColumnWriter::add(ColumnVectorBatch& batch,
+ uint64_t offset,
+ uint64_t numValues) {
+ const auto notNullBegin = batch.notNull.data() + offset;
+ notNullEncoder->add(notNullBegin, numValues, nullptr);
+ }
+
+ // Flushes the PRESENT encoder and appends a PRESENT stream descriptor
+ // for this column, whose length is the value returned by that flush.
+ void ColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ const auto presentLength = notNullEncoder->flush();
+ proto::Stream presentStream;
+ presentStream.set_column(static_cast<uint32_t>(columnId));
+ presentStream.set_kind(proto::Stream_Kind_PRESENT);
+ presentStream.set_length(presentLength);
+ streams.push_back(presentStream);
+ }
+
+ // Base-class size estimate: the PRESENT encoder's buffer size only.
+ uint64_t ColumnWriter::getEstimatedSize() const {
+ const uint64_t presentBytes = notNullEncoder->getBufferSize();
+ return presentBytes;
+ }
+
+ // Forwards `stats` and this column's stripe-level statistics object to
+ // getProtoBufStatistics().
+ // NOTE(review): colStripeStatistics is only allocated when enableStats is
+ // set, so a null pointer is passed otherwise -- confirm the callee
+ // tolerates that or that callers never call this with stats disabled.
+ void ColumnWriter::getStripeStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const {
+ auto* const stripeStats = colStripeStatistics.get();
+ getProtoBufStatistics(stats, stripeStats);
+ }
+
+ // Merges the current stripe's statistics into the file-level statistics,
+ // then resets the stripe statistics for the next stripe.
+ // Both objects are only allocated when enableStats is set (constructor),
+ // so with statistics disabled this is a no-op rather than a null
+ // dereference.
+ void ColumnWriter::mergeStripeStatsIntoFileStats() {
+ if (!enableStats) {
+ return;
+ }
+ colFileStatistics->merge(*colStripeStatistics);
+ colStripeStatistics->reset();
+ }
+
+ // Merges the current row group's statistics into the stripe statistics,
+ // then resets the row-group statistics.
+ // colStripeStatistics is only allocated when enableStats is set
+ // (constructor), so with statistics disabled this is a no-op rather than a
+ // null dereference -- matching the enableStats check createRowIndexEntry()
+ // performs before the same merge.
+ void ColumnWriter::mergeRowGroupStatsIntoStripeStats() {
+ if (!enableStats) {
+ return;
+ }
+ colStripeStatistics->merge(*colIndexStatistics);
+ colIndexStatistics->reset();
+ }
+
+ // Forwards `stats` and this column's file-level statistics object to
+ // getProtoBufStatistics().
+ // NOTE(review): colFileStatistics is only allocated when enableStats is
+ // set, so a null pointer is passed otherwise -- confirm the callee
+ // tolerates that or that callers never call this with stats disabled.
+ void ColumnWriter::getFileStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const {
+ auto* const fileStats = colFileStatistics.get();
+ getProtoBufStatistics(stats, fileStats);
+ }
+
+ /**
+ * Closes the pending row-index entry:
+ * 1. writes the row-group statistics into the pending entry and appends
+ * a copy of that entry to the row index;
+ * 2. clears the pending entry's positions and statistics;
+ * 3. merges the row-group statistics into the stripe statistics (only
+ * when stats are enabled) and resets the row-group statistics;
+ * 4. records current stream positions into the emptied entry
+ * (presumably the start of the next row group).
+ * Assumes the index is enabled: rowIndex/rowIndexEntry are only allocated
+ * when enableIndex is set.
+ */
+ void ColumnWriter::createRowIndexEntry() {
+ proto::ColumnStatistics *indexStats =
+ rowIndexEntry->mutable_statistics();
+ colIndexStatistics->toProtoBuf(*indexStats);
+
+ *rowIndex->add_entry() = *rowIndexEntry;
+
+ rowIndexEntry->clear_positions();
+ rowIndexEntry->clear_statistics();
+
+ if (enableStats) {
+ colStripeStatistics->merge(*colIndexStatistics);
+ }
+ colIndexStatistics->reset();
+
+ recordPosition();
+ }
+
+ /**
+ * Serializes the accumulated row index into the ROW_INDEX stream, flushes
+ * that stream, and appends a ROW_INDEX descriptor for this column whose
+ * length is the value returned by the flush.
+ * Assumes the index is enabled: rowIndex and indexStream are only
+ * allocated when enableIndex is set.
+ * @throws std::logic_error if protobuf serialization reports failure
+ * (previously that result was silently ignored).
+ */
+ void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
+ // write row index to output stream
+ if (!rowIndex->SerializeToZeroCopyStream(indexStream.get())) {
+ throw std::logic_error("Failed to write RowIndex protobuf message.");
+ }
+
+ // construct row index stream
+ proto::Stream stream;
+ stream.set_kind(proto::Stream_Kind_ROW_INDEX);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(indexStream->flush());
+ streams.push_back(stream);
+ }
+
+ // Asks the PRESENT encoder to record its current position through the
+ // row-index position recorder.
+ // NOTE(review): rowIndexPosition is only allocated when the index is
+ // enabled; otherwise a null recorder is passed -- confirm callers only
+ // invoke this with the index on.
+ void ColumnWriter::recordPosition() const {
+ auto* const recorder = rowIndexPosition.get();
+ notNullEncoder->recordPosition(recorder);
+ }
+
+ // Drops all accumulated row-index entries and the pending entry's
+ // positions/statistics, then records fresh positions into the emptied
+ // entry via recordPosition().
+ void ColumnWriter::resetIndex() {
+ rowIndexEntry->clear_statistics();
+ rowIndexEntry->clear_positions();
+ rowIndex->clear_entry();
+
+ recordPosition();
+ }
+
+ class StructColumnWriter : public ColumnWriter {
+ public:
+ StructColumnWriter(
+ const Type& type,
+ StreamsFactory& factory,
+ const WriterOptions& options);
+ ~StructColumnWriter();
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void getStripeStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const override;
+
+ virtual void getFileStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const override;
+
+ virtual void mergeStripeStatsIntoFileStats() override;
+
+ virtual void mergeRowGroupStatsIntoStripeStats() override;
+
+ virtual void createRowIndexEntry() override;
+
+ virtual void writeIndex(
+ std::vector<proto::Stream> &streams) const override;
+
+ virtual void resetIndex() override;
+
+ private:
+ std::vector<ColumnWriter *> children;
--- End diff --
We could do that. Unfortunately, our own code base doesn't support
unique_ptr, so removing this would make it difficult for us to integrate the ORC code.
> Implement Basic C++ Writer and Writer Option
> --------------------------------------------
>
> Key: ORC-178
> URL: https://issues.apache.org/jira/browse/ORC-178
> Project: ORC
> Issue Type: Sub-task
> Components: C++
> Reporter: Gang Wu
> Assignee: Xiening Dai
>
> 1. write orc file header, file footer, postscript, etc.
> 2. write columns of all types
> 3. write column statistics
> 4. write the index stream in the writer, and let the reader seek to a row based on the index information
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)