emkornfield commented on a change in pull request #11538: URL: https://github.com/apache/arrow/pull/11538#discussion_r741420463
########## File path: go/parquet/file/column_writer.go ########## @@ -0,0 +1,567 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import ( + "bytes" + "encoding/binary" + "io" + + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/bitutil" + "github.com/apache/arrow/go/arrow/memory" + "github.com/apache/arrow/go/parquet" + "github.com/apache/arrow/go/parquet/internal/encoding" + "github.com/apache/arrow/go/parquet/metadata" + "github.com/apache/arrow/go/parquet/schema" +) + +//go:generate go run ../../arrow/_tools/tmpl/main.go -i -data=../internal/encoding/physical_types.tmpldata column_writer_types.gen.go.tmpl + +// ColumnChunkWriter is the base interface for all columnwriters. To directly write +// data to the column, you need to assert it to the correctly typed ColumnChunkWriter +// instance, such as Int32ColumnWriter. +type ColumnChunkWriter interface { + // Close ends this column and returns the number of bytes written + Close() error + // Type returns the underlying physical parquet type for this column + Type() parquet.Type + // Descr returns the column information for this writer + Descr() *schema.Column + // RowsWritten returns the number of rows that have so far been written with this writer + RowsWritten() int64 + // TotalCompressedBytes returns the number of bytes, after compression, that have been written so far + TotalCompressedBytes() int64 + // TotalBytesWritten includes the bytes for writing dictionary pages, while TotalCompressedBytes is + // just the data and page headers + TotalBytesWritten() int64 + // Properties returns the current WriterProperties in use for this writer + Properties() *parquet.WriterProperties + + LevelInfo() LevelInfo + SetBitsBuffer(*memory.Buffer) +} + +func computeLevelInfo(descr *schema.Column) (info LevelInfo) { + info.DefLevel = descr.MaxDefinitionLevel() + info.RepLevel = descr.MaxRepetitionLevel() + + minSpacedDefLevel := descr.MaxDefinitionLevel() + n := descr.SchemaNode() + for n != nil && n.RepetitionType() != parquet.Repetitions.Repeated { + if n.RepetitionType() == parquet.Repetitions.Optional { + minSpacedDefLevel-- + } + n = n.Parent() + } + info.RepeatedAncestorDefLevel = minSpacedDefLevel + return +} + +type columnWriter struct { + metaData *metadata.ColumnChunkMetaDataBuilder + descr *schema.Column + + // scratch buffer if validity bits need to be recalculated + bitsBuffer *memory.Buffer + levelInfo LevelInfo + pager PageWriter + hasDict bool + encoding parquet.Encoding + props *parquet.WriterProperties + defEncoder encoding.LevelEncoder + repEncoder encoding.LevelEncoder + mem memory.Allocator + + pageStatistics metadata.TypedStatistics + chunkStatistics metadata.TypedStatistics + + // total number of values stored in the data page. this is the maximum + // of the number of encoded def levels or encoded values. for + // non-repeated, required columns, this is equal to the number of encoded + // values. For repeated or optional values, there may be fewer data values + // than levels, and this tells you how many encoded levels there are in that case + numBuffered int64 + + // the total number of stored values. for repeated or optional values. this + // number may be lower than numBuffered + numBufferedEncoded int64 + + rowsWritten int + totalBytesWritten int64 + // records the current number of compressed bytes in a column + totalCompressedBytes int64 + closed bool + fallback bool + + pages []DataPage + + defLevelSink *encoding.PooledBufferWriter + repLevelSink *encoding.PooledBufferWriter + + uncompressedData bytes.Buffer + compressedTemp *bytes.Buffer + + currentEncoder encoding.TypedEncoder +} + +func newColumnWriterBase(metaData *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) columnWriter { + ret := columnWriter{ + metaData: metaData, + descr: metaData.Descr(), + levelInfo: computeLevelInfo(metaData.Descr()), + pager: pager, + hasDict: useDict, + encoding: enc, + props: props, + mem: props.Allocator(), + defLevelSink: encoding.NewPooledBufferWriter(0), + repLevelSink: encoding.NewPooledBufferWriter(0), + } + if pager.HasCompressor() { + ret.compressedTemp = new(bytes.Buffer) + } + if props.StatisticsEnabledFor(ret.descr.Path()) && ret.descr.SortOrder() != schema.SortUNKNOWN { + ret.pageStatistics = metadata.NewStatistics(ret.descr, props.Allocator()) + ret.chunkStatistics = metadata.NewStatistics(ret.descr, props.Allocator()) + } + + if ret.props.DataPageVersion() == parquet.DataPageV1 { + if ret.descr.MaxDefinitionLevel() > 0 { + ret.defLevelSink.SetOffset(arrow.Uint32SizeBytes) + } + if ret.descr.MaxRepetitionLevel() > 0 { + ret.repLevelSink.SetOffset(arrow.Uint32SizeBytes) + } + } + + ret.defEncoder.Init(parquet.Encodings.RLE, ret.descr.MaxDefinitionLevel(), ret.defLevelSink) + ret.repEncoder.Init(parquet.Encodings.RLE, ret.descr.MaxRepetitionLevel(), ret.repLevelSink) + return ret +} + +func (w *columnWriter) SetBitsBuffer(buf *memory.Buffer) { w.bitsBuffer = buf } + +func (w *columnWriter) LevelInfo() LevelInfo { return w.levelInfo } + +func (w *columnWriter) Type() parquet.Type { + return w.descr.PhysicalType() +} + +func (w *columnWriter) Descr() *schema.Column { + return w.descr +} + +func (w *columnWriter) Properties() *parquet.WriterProperties { + return w.props +} + +func (w *columnWriter) TotalCompressedBytes() int64 { + return w.totalCompressedBytes +} + +func (w *columnWriter) TotalBytesWritten() int64 { + return w.totalBytesWritten +} + +func (w *columnWriter) RowsWritten() int64 { + return int64(w.rowsWritten) +} + +func (w *columnWriter) WriteDataPage(page DataPage) error { + written, err := w.pager.WriteDataPage(page) + w.totalBytesWritten += written + return err +} + +func (w *columnWriter) WriteDefinitionLevels(levels []int16) { + w.defEncoder.EncodeNoFlush(levels) +} + +func (w *columnWriter) WriteRepetitionLevels(levels []int16) { + w.repEncoder.EncodeNoFlush(levels) +} + +func (w *columnWriter) init() { Review comment: would reset be a better name for this function? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org