This is an automated email from the ASF dual-hosted git repository. uwe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 7ce2655 PARQUET-1521: [C++] Use pure virtual interfaces for parquet::TypedColumnWriter, remove use of 'extern template class' 7ce2655 is described below commit 7ce26553b8ce78085751e4a4ae603d4043abf337 Author: Wes McKinney <wesm+...@apache.org> AuthorDate: Tue Feb 5 15:15:43 2019 +0100 PARQUET-1521: [C++] Use pure virtual interfaces for parquet::TypedColumnWriter, remove use of 'extern template class' This follows corresponding work in TypedColumnReader. The public API is unchanged as can be verified by lack of changes to the unit tests Author: Wes McKinney <wesm+...@apache.org> Closes #3551 from wesm/PARQUET-1521 and squashes the following commits: aa6687a9 <Wes McKinney> Fix clang warnings 33555044 <Wes McKinney> Print build warning level b657ac93 <Wes McKinney> Fix parquet-column-io-benchmark 61204dec <Wes McKinney> Refactor TypedColumnWriter implementation to be based on pure virtual interface, remove use of extern template class --- cpp/cmake_modules/SetupCxxFlags.cmake | 2 + cpp/src/parquet/column-io-benchmark.cc | 26 +- cpp/src/parquet/column_writer.cc | 450 ++++++++++++++++++++++----------- cpp/src/parquet/column_writer.h | 208 +++------------ 4 files changed, 350 insertions(+), 336 deletions(-) diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake index 44ca22f..43dab02 100644 --- a/cpp/cmake_modules/SetupCxxFlags.cmake +++ b/cpp/cmake_modules/SetupCxxFlags.cmake @@ -111,6 +111,8 @@ if (NOT BUILD_WARNING_LEVEL) endif(NOT BUILD_WARNING_LEVEL) string(TOUPPER ${BUILD_WARNING_LEVEL} BUILD_WARNING_LEVEL) +message(STATUS "Arrow build warning level: ${BUILD_WARNING_LEVEL}") + if ("${BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN") # Pre-checkin builds if ("${COMPILER_FAMILY}" STREQUAL "msvc") diff --git a/cpp/src/parquet/column-io-benchmark.cc b/cpp/src/parquet/column-io-benchmark.cc index c648d56..762bcb7 100644 --- a/cpp/src/parquet/column-io-benchmark.cc +++ b/cpp/src/parquet/column-io-benchmark.cc @@ -30,14 +30,15 @@ using schema::PrimitiveNode; namespace benchmark { -std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst, +std::shared_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst, ColumnChunkMetaDataBuilder* metadata, ColumnDescriptor* schema, const WriterProperties* properties) { std::unique_ptr<PageWriter> pager = PageWriter::Open(dst, Compression::UNCOMPRESSED, metadata); - return std::unique_ptr<Int64Writer>(new Int64Writer( - metadata, std::move(pager), false /*use_dictionary*/, Encoding::PLAIN, properties)); + std::shared_ptr<ColumnWriter> writer = + ColumnWriter::Make(metadata, std::move(pager), properties); + return std::static_pointer_cast<Int64Writer>(writer); } std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) { @@ -65,14 +66,17 @@ static void BM_WriteInt64Column(::benchmark::State& state) { std::vector<int16_t> definition_levels(state.range(0), 1); std::vector<int16_t> repetition_levels(state.range(0), 0); std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition); - WriterProperties::Builder builder; - std::shared_ptr<WriterProperties> properties = builder.compression(codec)->build(); + std::shared_ptr<WriterProperties> properties = WriterProperties::Builder() + .compression(codec) + ->encoding(Encoding::PLAIN) + ->disable_dictionary() + ->build(); auto metadata = ColumnChunkMetaDataBuilder::Make( properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata)); while (state.KeepRunning()) { InMemoryOutputStream stream; - std::unique_ptr<Int64Writer> writer = BuildWriter( + std::shared_ptr<Int64Writer> writer = BuildWriter( state.range(0), &stream, metadata.get(), schema.get(), properties.get()); writer->WriteBatch(values.size(), definition_levels.data(), repetition_levels.data(), values.data()); @@ -125,13 +129,17 @@ static void BM_ReadInt64Column(::benchmark::State& state) { std::vector<int16_t> definition_levels(state.range(0), 1); std::vector<int16_t> repetition_levels(state.range(0), 0); std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition); - WriterProperties::Builder builder; - std::shared_ptr<WriterProperties> properties = builder.compression(codec)->build(); + std::shared_ptr<WriterProperties> properties = WriterProperties::Builder() + .compression(codec) + ->encoding(Encoding::PLAIN) + ->disable_dictionary() + ->build(); + auto metadata = ColumnChunkMetaDataBuilder::Make( properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata)); InMemoryOutputStream stream; - std::unique_ptr<Int64Writer> writer = BuildWriter( + std::shared_ptr<Int64Writer> writer = BuildWriter( state.range(0), &stream, metadata.get(), schema.get(), properties.get()); writer->WriteBatch(values.size(), definition_levels.data(), repetition_levels.data(), values.data()); diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 0919a3f..47a1256 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -353,57 +353,148 @@ std::shared_ptr<WriterProperties> default_writer_properties() { return default_writer_properties; } -ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata, - std::unique_ptr<PageWriter> pager, bool has_dictionary, - Encoding::type encoding, const WriterProperties* properties) - : metadata_(metadata), - descr_(metadata->descr()), - pager_(std::move(pager)), - has_dictionary_(has_dictionary), - encoding_(encoding), - properties_(properties), - allocator_(properties->memory_pool()), - num_buffered_values_(0), - num_buffered_encoded_values_(0), - rows_written_(0), - total_bytes_written_(0), - total_compressed_bytes_(0), - closed_(false), - fallback_(false) { - definition_levels_sink_.reset(new InMemoryOutputStream(allocator_)); - repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_)); - definition_levels_rle_ = - std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); - repetition_levels_rle_ = - std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); - uncompressed_data_ = - std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); - if (pager_->has_compressor()) { - compressed_data_ = +class ColumnWriterImpl { + public: + ColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata, + std::unique_ptr<PageWriter> pager, const bool use_dictionary, + Encoding::type encoding, const WriterProperties* properties) + : metadata_(metadata), + descr_(metadata->descr()), + pager_(std::move(pager)), + has_dictionary_(use_dictionary), + encoding_(encoding), + properties_(properties), + allocator_(properties->memory_pool()), + num_buffered_values_(0), + num_buffered_encoded_values_(0), + rows_written_(0), + total_bytes_written_(0), + total_compressed_bytes_(0), + closed_(false), + fallback_(false) { + definition_levels_sink_.reset(new InMemoryOutputStream(allocator_)); + repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_)); + definition_levels_rle_ = + std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); + repetition_levels_rle_ = std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); + uncompressed_data_ = + std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); + if (pager_->has_compressor()) { + compressed_data_ = + std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); + } } -} -void ColumnWriter::InitSinks() { - definition_levels_sink_->Clear(); - repetition_levels_sink_->Clear(); -} + virtual ~ColumnWriterImpl() = default; -void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) { - DCHECK(!closed_); - definition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels), - sizeof(int16_t) * num_levels); -} + int64_t Close(); -void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) { - DCHECK(!closed_); - repetition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels), - sizeof(int16_t) * num_levels); -} + protected: + virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0; + + // Serializes Dictionary Page if enabled + virtual void WriteDictionaryPage() = 0; + + // Plain-encoded statistics of the current page + virtual EncodedStatistics GetPageStatistics() = 0; + + // Plain-encoded statistics of the whole chunk + virtual EncodedStatistics GetChunkStatistics() = 0; + + // Merges page statistics into chunk statistics, then resets the values + virtual void ResetPageStatistics() = 0; + + // Adds Data Pages to an in memory buffer in dictionary encoding mode + // Serializes the Data Pages in other encoding modes + void AddDataPage(); + + // Serializes Data Pages + void WriteDataPage(const CompressedDataPage& page); + + // Write multiple definition levels + void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) { + DCHECK(!closed_); + definition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels), + sizeof(int16_t) * num_levels); + } + + // Write multiple repetition levels + void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) { + DCHECK(!closed_); + repetition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels), + sizeof(int16_t) * num_levels); + } + + // RLE encode the src_buffer into dest_buffer and return the encoded size + int64_t RleEncodeLevels(const Buffer& src_buffer, ResizableBuffer* dest_buffer, + int16_t max_level); + + // Serialize the buffered Data Pages + void FlushBufferedDataPages(); + + ColumnChunkMetaDataBuilder* metadata_; + const ColumnDescriptor* descr_; + + std::unique_ptr<PageWriter> pager_; + + bool has_dictionary_; + Encoding::type encoding_; + const WriterProperties* properties_; + + LevelEncoder level_encoder_; + + ::arrow::MemoryPool* allocator_; + + // The total number of values stored in the data page. This is the maximum of + // the number of encoded definition levels or encoded values. For + // non-repeated, required columns, this is equal to the number of encoded + // values. For repeated or optional values, there may be fewer data values + // than levels, and this tells you how many encoded levels there are in that + // case. + int64_t num_buffered_values_; + + // The total number of stored values. For repeated or optional values, this + // number may be lower than num_buffered_values_. + int64_t num_buffered_encoded_values_; + + // Total number of rows written with this ColumnWriter + int rows_written_; + + // Records the total number of bytes written by the serializer + int64_t total_bytes_written_; + + // Records the current number of compressed bytes in a column + int64_t total_compressed_bytes_; + + // Flag to check if the Writer has been closed + bool closed_; + + // Flag to infer if dictionary encoding has fallen back to PLAIN + bool fallback_; + + std::unique_ptr<InMemoryOutputStream> definition_levels_sink_; + std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_; + + std::shared_ptr<ResizableBuffer> definition_levels_rle_; + std::shared_ptr<ResizableBuffer> repetition_levels_rle_; + + std::shared_ptr<ResizableBuffer> uncompressed_data_; + std::shared_ptr<ResizableBuffer> compressed_data_; + + std::vector<CompressedDataPage> data_pages_; + + private: + void InitSinks() { + definition_levels_sink_->Clear(); + repetition_levels_sink_->Clear(); + } +}; // return the size of the encoded buffer -int64_t ColumnWriter::RleEncodeLevels(const Buffer& src_buffer, - ResizableBuffer* dest_buffer, int16_t max_level) { +int64_t ColumnWriterImpl::RleEncodeLevels(const Buffer& src_buffer, + ResizableBuffer* dest_buffer, + int16_t max_level) { // TODO: This only works with due to some RLE specifics int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_)) + @@ -425,7 +516,7 @@ int64_t ColumnWriter::RleEncodeLevels(const Buffer& src_buffer, return encoded_size; } -void ColumnWriter::AddDataPage() { +void ColumnWriterImpl::AddDataPage() { int64_t definition_levels_rle_size = 0; int64_t repetition_levels_rle_size = 0; @@ -493,11 +584,11 @@ void ColumnWriter::AddDataPage() { num_buffered_encoded_values_ = 0; } -void ColumnWriter::WriteDataPage(const CompressedDataPage& page) { +void ColumnWriterImpl::WriteDataPage(const CompressedDataPage& page) { total_bytes_written_ += pager_->WriteDataPage(page); } -int64_t ColumnWriter::Close() { +int64_t ColumnWriterImpl::Close() { if (!closed_) { closed_ = true; if (has_dictionary_ && !fallback_) { @@ -525,7 +616,7 @@ int64_t ColumnWriter::Close() { return total_bytes_written_; } -void ColumnWriter::FlushBufferedDataPages() { +void ColumnWriterImpl::FlushBufferedDataPages() { // Write all outstanding data to a new page if (num_buffered_values_ > 0) { AddDataPage(); @@ -540,47 +631,123 @@ void ColumnWriter::FlushBufferedDataPages() { // ---------------------------------------------------------------------- // TypedColumnWriter -template <typename Type> -TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata, - std::unique_ptr<PageWriter> pager, - const bool use_dictionary, - Encoding::type encoding, - const WriterProperties* properties) - : ColumnWriter(metadata, std::move(pager), use_dictionary, encoding, properties) { - current_encoder_ = MakeEncoder(Type::type_num, encoding, use_dictionary, descr_, - properties->memory_pool()); - - if (properties->statistics_enabled(descr_->path()) && - (SortOrder::UNKNOWN != descr_->sort_order())) { - page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_)); - chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_)); +template <typename DType> +class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> { + public: + using T = typename DType::c_type; + + TypedColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata, + std::unique_ptr<PageWriter> pager, const bool use_dictionary, + Encoding::type encoding, const WriterProperties* properties) + : ColumnWriterImpl(metadata, std::move(pager), use_dictionary, encoding, + properties) { + current_encoder_ = MakeEncoder(DType::type_num, encoding, use_dictionary, descr_, + properties->memory_pool()); + + if (properties->statistics_enabled(descr_->path()) && + (SortOrder::UNKNOWN != descr_->sort_order())) { + page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_)); + chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_)); + } + } + + int64_t Close() override { return ColumnWriterImpl::Close(); } + + void WriteBatch(int64_t num_values, const int16_t* def_levels, + const int16_t* rep_levels, const T* values) override; + + void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels, + const int16_t* rep_levels, const uint8_t* valid_bits, + int64_t valid_bits_offset, const T* values) override; + + int64_t EstimatedBufferedValueBytes() const override { + return current_encoder_->EstimatedDataEncodedSize(); } -} + + protected: + std::shared_ptr<Buffer> GetValuesBuffer() override { + return current_encoder_->FlushValues(); + } + void WriteDictionaryPage() override; + + // Checks if the Dictionary Page size limit is reached + // If the limit is reached, the Dictionary and Data Pages are serialized + // The encoding is switched to PLAIN + void CheckDictionarySizeLimit(); + + EncodedStatistics GetPageStatistics() override { + EncodedStatistics result; + if (page_statistics_) result = page_statistics_->Encode(); + return result; + } + + EncodedStatistics GetChunkStatistics() override { + EncodedStatistics result; + if (chunk_statistics_) result = chunk_statistics_->Encode(); + return result; + } + + void ResetPageStatistics() override; + + Type::type type() const override { return descr_->physical_type(); } + + const ColumnDescriptor* descr() const override { return descr_; } + + int64_t rows_written() const override { return rows_written_; } + + int64_t total_compressed_bytes() const override { return total_compressed_bytes_; } + + int64_t total_bytes_written() const override { return total_bytes_written_; } + + const WriterProperties* properties() override { return properties_; } + + private: + inline int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels, + const int16_t* rep_levels, const T* values); + + inline int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels, + const int16_t* rep_levels, + const uint8_t* valid_bits, + int64_t valid_bits_offset, const T* values, + int64_t* num_spaced_written); + + // Write values to a temporary buffer before they are encoded into pages + void WriteValues(int64_t num_values, const T* values); + void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset, const T* values); + + using ValueEncoderType = typename EncodingTraits<DType>::Encoder; + std::unique_ptr<Encoder> current_encoder_; + + typedef TypedRowGroupStatistics<DType> TypedStats; + std::unique_ptr<TypedStats> page_statistics_; + std::unique_ptr<TypedStats> chunk_statistics_; +}; // Only one Dictionary Page is written. // Fallback to PLAIN if dictionary page limit is reached. -template <typename Type> -void TypedColumnWriter<Type>::CheckDictionarySizeLimit() { +template <typename DType> +void TypedColumnWriterImpl<DType>::CheckDictionarySizeLimit() { // We have to dynamic cast here because TypedEncoder<Type> as some compilers // don't want to cast through virtual inheritance - auto dict_encoder = dynamic_cast<DictEncoder<Type>*>(current_encoder_.get()); + auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get()); if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) { WriteDictionaryPage(); // Serialize the buffered Dictionary Indicies FlushBufferedDataPages(); fallback_ = true; // Only PLAIN encoding is supported for fallback in V1 - current_encoder_ = MakeEncoder(Type::type_num, Encoding::PLAIN, false, descr_, + current_encoder_ = MakeEncoder(DType::type_num, Encoding::PLAIN, false, descr_, properties_->memory_pool()); encoding_ = Encoding::PLAIN; } } -template <typename Type> -void TypedColumnWriter<Type>::WriteDictionaryPage() { +template <typename DType> +void TypedColumnWriterImpl<DType>::WriteDictionaryPage() { // We have to dynamic cast here because TypedEncoder<Type> as some compilers // don't want to cast through virtual inheritance - auto dict_encoder = dynamic_cast<DictEncoder<Type>*>(current_encoder_.get()); + auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get()); DCHECK(dict_encoder); std::shared_ptr<ResizableBuffer> buffer = AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size()); @@ -591,22 +758,8 @@ void TypedColumnWriter<Type>::WriteDictionaryPage() { total_bytes_written_ += pager_->WriteDictionaryPage(page); } -template <typename Type> -EncodedStatistics TypedColumnWriter<Type>::GetPageStatistics() { - EncodedStatistics result; - if (page_statistics_) result = page_statistics_->Encode(); - return result; -} - -template <typename Type> -EncodedStatistics TypedColumnWriter<Type>::GetChunkStatistics() { - EncodedStatistics result; - if (chunk_statistics_) result = chunk_statistics_->Encode(); - return result; -} - -template <typename Type> -void TypedColumnWriter<Type>::ResetPageStatistics() { +template <typename DType> +void TypedColumnWriterImpl<DType>::ResetPageStatistics() { if (chunk_statistics_ != nullptr) { chunk_statistics_->Merge(*page_statistics_); page_statistics_->Reset(); @@ -614,58 +767,13 @@ void TypedColumnWriter<Type>::ResetPageStatistics() { } // ---------------------------------------------------------------------- -// Dynamic column writer constructor - -std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata, - std::unique_ptr<PageWriter> pager, - const WriterProperties* properties) { - const ColumnDescriptor* descr = metadata->descr(); - const bool use_dictionary = properties->dictionary_enabled(descr->path()) && - descr->physical_type() != Type::BOOLEAN; - Encoding::type encoding = properties->encoding(descr->path()); - if (use_dictionary) { - encoding = properties->dictionary_index_encoding(); - } - switch (descr->physical_type()) { - case Type::BOOLEAN: - return std::make_shared<BoolWriter>(metadata, std::move(pager), use_dictionary, - encoding, properties); - case Type::INT32: - return std::make_shared<Int32Writer>(metadata, std::move(pager), use_dictionary, - encoding, properties); - case Type::INT64: - return std::make_shared<Int64Writer>(metadata, std::move(pager), use_dictionary, - encoding, properties); - case Type::INT96: - return std::make_shared<Int96Writer>(metadata, std::move(pager), use_dictionary, - encoding, properties); - case Type::FLOAT: - return std::make_shared<FloatWriter>(metadata, std::move(pager), use_dictionary, - encoding, properties); - case Type::DOUBLE: - return std::make_shared<DoubleWriter>(metadata, std::move(pager), use_dictionary, - encoding, properties); - case Type::BYTE_ARRAY: - return std::make_shared<ByteArrayWriter>(metadata, std::move(pager), use_dictionary, - encoding, properties); - case Type::FIXED_LEN_BYTE_ARRAY: - return std::make_shared<FixedLenByteArrayWriter>( - metadata, std::move(pager), use_dictionary, encoding, properties); - default: - ParquetException::NYI("type reader not implemented"); - } - // Unreachable code, but supress compiler warning - return std::shared_ptr<ColumnWriter>(nullptr); -} - -// ---------------------------------------------------------------------- // Instantiate templated classes template <typename DType> -inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values, - const int16_t* def_levels, - const int16_t* rep_levels, - const T* values) { +int64_t TypedColumnWriterImpl<DType>::WriteMiniBatch(int64_t num_values, + const int16_t* def_levels, + const int16_t* rep_levels, + const T* values) { int64_t values_to_write = 0; // If the field is required and non-repeated, there are no definition levels if (descr_->max_definition_level() > 0) { @@ -722,7 +830,7 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values, } template <typename DType> -inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced( +int64_t TypedColumnWriterImpl<DType>::WriteMiniBatchSpaced( int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values, int64_t* num_spaced_written) { @@ -793,8 +901,10 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced( } template <typename DType> -void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def_levels, - const int16_t* rep_levels, const T* values) { +void TypedColumnWriterImpl<DType>::WriteBatch(int64_t num_values, + const int16_t* def_levels, + const int16_t* rep_levels, + const T* values) { // We check for DataPage limits only after we have inserted the values. If a user // writes a large number of values, the DataPage size can be much above the limit. // The purpose of this chunking is to bound this. Even if a user writes large number @@ -817,7 +927,7 @@ void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def } template <typename DType> -void TypedColumnWriter<DType>::WriteBatchSpaced( +void TypedColumnWriterImpl<DType>::WriteBatchSpaced( int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) { // We check for DataPage limits only after we have inserted the values. If a user @@ -845,27 +955,63 @@ void TypedColumnWriter<DType>::WriteBatchSpaced( } template <typename DType> -void TypedColumnWriter<DType>::WriteValues(int64_t num_values, const T* values) { +void TypedColumnWriterImpl<DType>::WriteValues(int64_t num_values, const T* values) { dynamic_cast<ValueEncoderType*>(current_encoder_.get()) ->Put(values, static_cast<int>(num_values)); } template <typename DType> -void TypedColumnWriter<DType>::WriteValuesSpaced(int64_t num_values, - const uint8_t* valid_bits, - int64_t valid_bits_offset, - const T* values) { +void TypedColumnWriterImpl<DType>::WriteValuesSpaced(int64_t num_values, + const uint8_t* valid_bits, + int64_t valid_bits_offset, + const T* values) { dynamic_cast<ValueEncoderType*>(current_encoder_.get()) ->PutSpaced(values, static_cast<int>(num_values), valid_bits, valid_bits_offset); } -template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<BooleanType>; -template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int32Type>; -template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int64Type>; -template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int96Type>; -template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FloatType>; -template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<DoubleType>; -template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<ByteArrayType>; -template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FLBAType>; +// ---------------------------------------------------------------------- +// Dynamic column writer constructor + +std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata, + std::unique_ptr<PageWriter> pager, + const WriterProperties* properties) { + const ColumnDescriptor* descr = metadata->descr(); + const bool use_dictionary = properties->dictionary_enabled(descr->path()) && + descr->physical_type() != Type::BOOLEAN; + Encoding::type encoding = properties->encoding(descr->path()); + if (use_dictionary) { + encoding = properties->dictionary_index_encoding(); + } + switch (descr->physical_type()) { + case Type::BOOLEAN: + return std::make_shared<TypedColumnWriterImpl<BooleanType>>( + metadata, std::move(pager), use_dictionary, encoding, properties); + case Type::INT32: + return std::make_shared<TypedColumnWriterImpl<Int32Type>>( + metadata, std::move(pager), use_dictionary, encoding, properties); + case Type::INT64: + return std::make_shared<TypedColumnWriterImpl<Int64Type>>( + metadata, std::move(pager), use_dictionary, encoding, properties); + case Type::INT96: + return std::make_shared<TypedColumnWriterImpl<Int96Type>>( + metadata, std::move(pager), use_dictionary, encoding, properties); + case Type::FLOAT: + return std::make_shared<TypedColumnWriterImpl<FloatType>>( + metadata, std::move(pager), use_dictionary, encoding, properties); + case Type::DOUBLE: + return std::make_shared<TypedColumnWriterImpl<DoubleType>>( + metadata, std::move(pager), use_dictionary, encoding, properties); + case Type::BYTE_ARRAY: + return std::make_shared<TypedColumnWriterImpl<ByteArrayType>>( + metadata, std::move(pager), use_dictionary, encoding, properties); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_shared<TypedColumnWriterImpl<FLBAType>>( + metadata, std::move(pager), use_dictionary, encoding, properties); + default: + ParquetException::NYI("type reader not implemented"); + } + // Unreachable code, but supress compiler warning + return std::shared_ptr<ColumnWriter>(nullptr); +} } // namespace parquet diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h index 254bf0d..5b9efb4 100644 --- a/cpp/src/parquet/column_writer.h +++ b/cpp/src/parquet/column_writer.h @@ -105,147 +105,47 @@ class PARQUET_EXPORT PageWriter { static constexpr int WRITE_BATCH_SIZE = 1000; class PARQUET_EXPORT ColumnWriter { public: - ColumnWriter(ColumnChunkMetaDataBuilder*, std::unique_ptr<PageWriter>, - bool has_dictionary, Encoding::type encoding, - const WriterProperties* properties); - virtual ~ColumnWriter() = default; static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*, std::unique_ptr<PageWriter>, const WriterProperties* properties); - Type::type type() const { return descr_->physical_type(); } - - const ColumnDescriptor* descr() const { return descr_; } - - /** - * Closes the ColumnWriter, commits any buffered values to pages. - * - * @return Total size of the column in bytes - */ - int64_t Close(); - - int64_t rows_written() const { return rows_written_; } - - // Only considers the size of the compressed pages + page header - // Some values might be still buffered an not written to a page yet - int64_t total_compressed_bytes() const { return total_compressed_bytes_; } - - int64_t total_bytes_written() const { return total_bytes_written_; } - - const WriterProperties* properties() { return properties_; } - - protected: - virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0; - - // Serializes Dictionary Page if enabled - virtual void WriteDictionaryPage() = 0; - - // Checks if the Dictionary Page size limit is reached - // If the limit is reached, the Dictionary and Data Pages are serialized - // The encoding is switched to PLAIN - - virtual void CheckDictionarySizeLimit() = 0; + /// \brief Closes the ColumnWriter, commits any buffered values to pages. + /// \return Total size of the column in bytes + virtual int64_t Close() = 0; - // Plain-encoded statistics of the current page - virtual EncodedStatistics GetPageStatistics() = 0; + /// \brief The physical Parquet type of the column + virtual Type::type type() const = 0; - // Plain-encoded statistics of the whole chunk - virtual EncodedStatistics GetChunkStatistics() = 0; - - // Merges page statistics into chunk statistics, then resets the values - virtual void ResetPageStatistics() = 0; - - // Adds Data Pages to an in memory buffer in dictionary encoding mode - // Serializes the Data Pages in other encoding modes - void AddDataPage(); - - // Serializes Data Pages - void WriteDataPage(const CompressedDataPage& page); - - // Write multiple definition levels - void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels); - - // Write multiple repetition levels - void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels); - - // RLE encode the src_buffer into dest_buffer and return the encoded size - int64_t RleEncodeLevels(const Buffer& src_buffer, ResizableBuffer* dest_buffer, - int16_t max_level); - - // Serialize the buffered Data Pages - void FlushBufferedDataPages(); - - ColumnChunkMetaDataBuilder* metadata_; - const ColumnDescriptor* descr_; - - std::unique_ptr<PageWriter> pager_; - - bool has_dictionary_; - Encoding::type encoding_; - const WriterProperties* properties_; + /// \brief The schema for the column + virtual const ColumnDescriptor* descr() const = 0; - LevelEncoder level_encoder_; + /// \brief The number of rows written so far + virtual int64_t rows_written() const = 0; - ::arrow::MemoryPool* allocator_; + /// \brief The total size of the compressed pages + page headers. Some values + /// might be still buffered an not written to a page yet + virtual int64_t total_compressed_bytes() const = 0; - // The total number of values stored in the data page. This is the maximum of - // the number of encoded definition levels or encoded values. For - // non-repeated, required columns, this is equal to the number of encoded - // values. For repeated or optional values, there may be fewer data values - // than levels, and this tells you how many encoded levels there are in that - // case. - int64_t num_buffered_values_; + /// \brief The total number of bytes written as serialized data and + /// dictionary pages to the ColumnChunk so far + virtual int64_t total_bytes_written() const = 0; - // The total number of stored values. For repeated or optional values, this - // number may be lower than num_buffered_values_. - int64_t num_buffered_encoded_values_; - - // Total number of rows written with this ColumnWriter - int rows_written_; - - // Records the total number of bytes written by the serializer - int64_t total_bytes_written_; - - // Records the current number of compressed bytes in a column - int64_t total_compressed_bytes_; - - // Flag to check if the Writer has been closed - bool closed_; - - // Flag to infer if dictionary encoding has fallen back to PLAIN - bool fallback_; - - std::unique_ptr<InMemoryOutputStream> definition_levels_sink_; - std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_; - - std::shared_ptr<ResizableBuffer> definition_levels_rle_; - std::shared_ptr<ResizableBuffer> repetition_levels_rle_; - - std::shared_ptr<ResizableBuffer> uncompressed_data_; - std::shared_ptr<ResizableBuffer> compressed_data_; - - std::vector<CompressedDataPage> data_pages_; - - private: - void InitSinks(); + /// \brief The file-level writer properties + virtual const WriterProperties* properties() = 0; }; // API to write values to a single column. This is the main client facing API. template <typename DType> -class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnWriter : public ColumnWriter { +class TypedColumnWriter : public ColumnWriter { public: - typedef typename DType::c_type T; - - TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata, - std::unique_ptr<PageWriter> pager, const bool use_dictionary, - Encoding::type encoding, const WriterProperties* properties); + using T = typename DType::c_type; // Write a batch of repetition levels, definition levels, and values to the // column. - void WriteBatch(int64_t num_values, const int16_t* def_levels, - const int16_t* rep_levels, const T* values); + virtual void WriteBatch(int64_t num_values, const int16_t* def_levels, + const int16_t* rep_levels, const T* values) = 0; /// Write a batch of repetition levels, definition levels, and values to the /// column. @@ -273,63 +173,21 @@ class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnWriter : public ColumnWriter { /// @param values The values in the lowest nested level including /// spacing for nulls on the lowest levels; input has the length /// of the number of rows on the lowest nesting level. - void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels, - const int16_t* rep_levels, const uint8_t* valid_bits, - int64_t valid_bits_offset, const T* values); + virtual void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels, + const int16_t* rep_levels, const uint8_t* valid_bits, + int64_t valid_bits_offset, const T* values) = 0; // Estimated size of the values that are not written to a page yet - int64_t EstimatedBufferedValueBytes() const { - return current_encoder_->EstimatedDataEncodedSize(); - } - - protected: - std::shared_ptr<Buffer> GetValuesBuffer() override { - return current_encoder_->FlushValues(); - } - void WriteDictionaryPage() override; - void CheckDictionarySizeLimit() override; - EncodedStatistics GetPageStatistics() override; - EncodedStatistics GetChunkStatistics() override; - void ResetPageStatistics() override; - - private: - int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels, - const int16_t* rep_levels, const T* values); - - int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels, - const int16_t* rep_levels, const uint8_t* valid_bits, - int64_t valid_bits_offset, const T* values, - int64_t* num_spaced_written); - - // Write values to a temporary buffer before they are encoded into pages - void WriteValues(int64_t num_values, const T* values); - void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits, - int64_t valid_bits_offset, const T* values); - - using ValueEncoderType = typename EncodingTraits<DType>::Encoder; - std::unique_ptr<Encoder> current_encoder_; - - typedef TypedRowGroupStatistics<DType> TypedStats; - std::unique_ptr<TypedStats> page_statistics_; - std::unique_ptr<TypedStats> chunk_statistics_; + virtual int64_t EstimatedBufferedValueBytes() const = 0; }; -typedef TypedColumnWriter<BooleanType> BoolWriter; -typedef TypedColumnWriter<Int32Type> Int32Writer; -typedef TypedColumnWriter<Int64Type> Int64Writer; -typedef TypedColumnWriter<Int96Type> Int96Writer; -typedef TypedColumnWriter<FloatType> FloatWriter; -typedef TypedColumnWriter<DoubleType> DoubleWriter; -typedef TypedColumnWriter<ByteArrayType> ByteArrayWriter; -typedef TypedColumnWriter<FLBAType> FixedLenByteArrayWriter; - -PARQUET_EXTERN_TEMPLATE TypedColumnWriter<BooleanType>; -PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int32Type>; -PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int64Type>; -PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int96Type>; -PARQUET_EXTERN_TEMPLATE TypedColumnWriter<FloatType>; -PARQUET_EXTERN_TEMPLATE TypedColumnWriter<DoubleType>; -PARQUET_EXTERN_TEMPLATE TypedColumnWriter<ByteArrayType>; -PARQUET_EXTERN_TEMPLATE TypedColumnWriter<FLBAType>; +using BoolWriter = TypedColumnWriter<BooleanType>; +using Int32Writer = TypedColumnWriter<Int32Type>; +using Int64Writer = TypedColumnWriter<Int64Type>; +using Int96Writer = TypedColumnWriter<Int96Type>; +using FloatWriter = TypedColumnWriter<FloatType>; +using DoubleWriter = TypedColumnWriter<DoubleType>; +using ByteArrayWriter = TypedColumnWriter<ByteArrayType>; +using FixedLenByteArrayWriter = TypedColumnWriter<FLBAType>; } // namespace parquet