emkornfield commented on a change in pull request #11146: URL: https://github.com/apache/arrow/pull/11146#discussion_r734239799
########## File path: go/parquet/file/page_reader.go ########## @@ -0,0 +1,616 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import ( + "bytes" + "io" + "sync" + + "github.com/JohnCGriffin/overflow" + "github.com/apache/arrow/go/arrow/ipc" + "github.com/apache/arrow/go/arrow/memory" + "github.com/apache/arrow/go/parquet" + "github.com/apache/arrow/go/parquet/compress" + "github.com/apache/arrow/go/parquet/internal/encryption" + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" + "github.com/apache/arrow/go/parquet/internal/thrift" + "github.com/apache/arrow/go/parquet/metadata" + "golang.org/x/xerrors" +) + +// PageReader is the interface used by the columnreader in order to read +// and handle DataPages and loop through them. 
+type PageReader interface { + // Set the maximum Page header size allowed to be read + SetMaxPageHeaderSize(int) + // Return the current page, or nil if there are no more + Page() Page + // Fetch the next page, returns false if there are no more pages + Next() bool + // if Next returns false, Err will return the error encountered or + // nil if there was no error and you just hit the end of the page + Err() error + // Reset allows reusing a page reader + Reset(r parquet.ReaderAtSeeker, nrows int64, compressType compress.Compression, ctx *CryptoContext) +} + +// Page is an interface for handling DataPages or Dictionary Pages +type Page interface { + // Returns which kind of page this is + Type() format.PageType + // Get the raw bytes of this page + Data() []byte + // return the encoding used for this page, Plain/RLE, etc. + Encoding() format.Encoding + // get the number of values in this page + NumValues() int32 + // release this page object back into the page pool for re-use + Release() +} + +type page struct { + buf *memory.Buffer + typ format.PageType + + nvals int32 + encoding format.Encoding +} + +func (p *page) Type() format.PageType { return p.typ } +func (p *page) Data() []byte { return p.buf.Bytes() } +func (p *page) NumValues() int32 { return p.nvals } +func (p *page) Encoding() format.Encoding { return p.encoding } + +// DataPage is the base interface for both DataPageV1 and DataPageV2 of the +// parquet spec. +type DataPage interface { + Page + UncompressedSize() int64 + Statistics() metadata.EncodedStatistics +} + +// Create some pools to use for reusing the data page objects themselves so that +// we can avoid tight loops that are creating and destroying tons of individual +// objects. 
This combined with a Release function on the pages themselves +// which will put them back into the pool creates significant memory usage +// and performance benefits + +var dataPageV1Pool = sync.Pool{ + New: func() interface{} { return (*DataPageV1)(nil) }, +} + +var dataPageV2Pool = sync.Pool{ + New: func() interface{} { return (*DataPageV2)(nil) }, +} + +var dictPagePool = sync.Pool{ + New: func() interface{} { return (*DictionaryPage)(nil) }, +} + +// DataPageV1 represents a DataPage version 1 from the parquet.thrift file +type DataPageV1 struct { + page + + defLvlEncoding format.Encoding + repLvlEncoding format.Encoding + uncompressedSize int64 + statistics metadata.EncodedStatistics +} + +// NewDataPageV1 returns a V1 data page with the given buffer as its data and the specified encoding information +// +// Will utilize objects that have been released back into the data page pool and +// re-use them if available as opposed to creating new objects. Calling Release on the +// data page object will release it back to the pool for re-use. 
+func NewDataPageV1(buffer *memory.Buffer, num int32, encoding, defEncoding, repEncoding parquet.Encoding, uncompressedSize int64) *DataPageV1 { + dp := dataPageV1Pool.Get().(*DataPageV1) + if dp == nil { + return &DataPageV1{ + page: page{buf: buffer, typ: format.PageType_DATA_PAGE, nvals: num, encoding: format.Encoding(encoding)}, + defLvlEncoding: format.Encoding(defEncoding), + repLvlEncoding: format.Encoding(repEncoding), + uncompressedSize: uncompressedSize, + } + } + + dp.buf, dp.nvals = buffer, num + dp.encoding = format.Encoding(encoding) + dp.defLvlEncoding, dp.repLvlEncoding = format.Encoding(defEncoding), format.Encoding(repEncoding) + dp.statistics.HasMax, dp.statistics.HasMin = false, false + dp.statistics.HasNullCount, dp.statistics.HasDistinctCount = false, false + dp.uncompressedSize = uncompressedSize + return dp +} + +// NewDataPageV1WithStats is the same as NewDataPageV1, but also allows adding the stat info into the created page +func NewDataPageV1WithStats(buffer *memory.Buffer, num int32, encoding, defEncoding, repEncoding parquet.Encoding, uncompressedSize int64, stats metadata.EncodedStatistics) *DataPageV1 { + ret := NewDataPageV1(buffer, num, encoding, defEncoding, repEncoding, uncompressedSize) + ret.statistics = stats + return ret +} + +// Release this page back into the DataPage object pool so that it can be reused. +// +// After calling this function, the object should not be utilized anymore, otherwise +// conflicts can arise. 
+func (d *DataPageV1) Release() { + d.buf.Release() + d.buf = nil + dataPageV1Pool.Put(d) +} + +// UncompressedSize returns the size of the data in this data page when uncompressed +func (d *DataPageV1) UncompressedSize() int64 { return d.uncompressedSize } + +// Statistics returns the encoded statistics on this data page +func (d *DataPageV1) Statistics() metadata.EncodedStatistics { return d.statistics } + +// DefinitionLevelEncoding returns the encoding utilized for the Definition Levels +func (d *DataPageV1) DefinitionLevelEncoding() parquet.Encoding { + return parquet.Encoding(d.defLvlEncoding) +} + +// RepetitionLevelEncoding returns the encoding utilized for the Repetition Levels +func (d *DataPageV1) RepetitionLevelEncoding() parquet.Encoding { + return parquet.Encoding(d.repLvlEncoding) +} + +// DataPageV2 is the representation of the V2 data page from the parquet.thrift spec +type DataPageV2 struct { + page + + nulls int32 + nrows int32 + defLvlBytelen int32 + repLvlBytelen int32 + compressed bool + uncompressedSize int64 + statistics metadata.EncodedStatistics +} + +// NewDataPageV2 constructs a new V2 data page with the provided information and a buffer of the raw data. 
+func NewDataPageV2(buffer *memory.Buffer, numValues, numNulls, numRows int32, encoding parquet.Encoding, defLvlsByteLen, repLvlsByteLen int32, uncompressed int64, isCompressed bool) *DataPageV2 { + dp := dataPageV2Pool.Get().(*DataPageV2) + if dp == nil { + return &DataPageV2{ + page: page{buf: buffer, typ: format.PageType_DATA_PAGE_V2, nvals: numValues, encoding: format.Encoding(encoding)}, + nulls: numNulls, + nrows: numRows, + defLvlBytelen: defLvlsByteLen, + repLvlBytelen: repLvlsByteLen, + compressed: isCompressed, + uncompressedSize: uncompressed, + } + } + + dp.buf, dp.nvals = buffer, numValues + dp.encoding = format.Encoding(encoding) + dp.nulls, dp.nrows = numNulls, numRows + dp.defLvlBytelen, dp.repLvlBytelen = defLvlsByteLen, repLvlsByteLen + dp.compressed, dp.uncompressedSize = isCompressed, uncompressed + dp.statistics.HasMax, dp.statistics.HasMin = false, false + dp.statistics.HasNullCount, dp.statistics.HasDistinctCount = false, false + return dp +} + +// NewDataPageV2WithStats is the same as NewDataPageV2 but allows providing the encoded stats with the page. +func NewDataPageV2WithStats(buffer *memory.Buffer, numValues, numNulls, numRows int32, encoding parquet.Encoding, defLvlsByteLen, repLvlsByteLen int32, uncompressed int64, isCompressed bool, stats metadata.EncodedStatistics) *DataPageV2 { + ret := NewDataPageV2(buffer, numValues, numNulls, numRows, encoding, defLvlsByteLen, repLvlsByteLen, uncompressed, isCompressed) + ret.statistics = stats + return ret +} + +// Release this page back into the DataPage object pool so that it can be reused. +// +// After calling this function, the object should not be utilized anymore, otherwise +// conflicts can arise. +func (d *DataPageV2) Release() { + d.buf.Release() + d.buf = nil + dataPageV2Pool.Put(d) +} + +// UncompressedSize is the size of the raw page when uncompressed. If `IsCompressed` is true, then +// the raw data in the buffer is expected to be compressed. 
+func (d *DataPageV2) UncompressedSize() int64 { return d.uncompressedSize } + +// Statistics are the encoded statistics in the data page +func (d *DataPageV2) Statistics() metadata.EncodedStatistics { return d.statistics } + +// NumNulls is the reported number of nulls in this datapage +func (d *DataPageV2) NumNulls() int32 { return d.nulls } + +// DefinitionLevelByteLen is the number of bytes in the buffer that are used to represent the definition levels +func (d *DataPageV2) DefinitionLevelByteLen() int32 { return d.defLvlBytelen } + +// RepetitionLevelByteLen is the number of bytes in the buffer which are used to represent the repetition Levels +func (d *DataPageV2) RepetitionLevelByteLen() int32 { return d.repLvlBytelen } + +// IsCompressed returns true if the data of this page is compressed +func (d *DataPageV2) IsCompressed() bool { return d.compressed } + +// DictionaryPage represents a page of data that uses dictionary encoding +type DictionaryPage struct { + page + + sorted bool +} + +// NewDictionaryPage constructs a new dictionary page with the provided data buffer and number of values. +func NewDictionaryPage(buffer *memory.Buffer, nvals int32, encoding parquet.Encoding) *DictionaryPage { + dp := dictPagePool.Get().(*DictionaryPage) + if dp == nil { + return &DictionaryPage{ + page: page{ + buf: buffer, + typ: format.PageType_DICTIONARY_PAGE, + nvals: nvals, + encoding: format.Encoding(encoding), + }, + } + } + + dp.buf = buffer + dp.nvals = nvals + dp.encoding = format.Encoding(encoding) + dp.sorted = false + return dp +} + +// Release this page back into the DataPage object pool so that it can be reused. +// +// After calling this function, the object should not be utilized anymore, otherwise +// conflicts can arise. 
+func (d *DictionaryPage) Release() { + d.buf.Release() + d.buf = nil + dictPagePool.Put(d) +} + +// IsSorted returns whether the dictionary itself is sorted +func (d *DictionaryPage) IsSorted() bool { return d.sorted } + +type serializedPageReader struct { + r ipc.ReadAtSeeker + nrows int64 + rowsSeen int64 + mem memory.Allocator + codec compress.Codec + + curPageHdr *format.PageHeader + buf *memory.Buffer + pageOrd int16 + maxPageHeaderSize int + + curPage Page + cryptoCtx CryptoContext + dataPageAad string + dataPageHeaderAad string + + decompressBuffer bytes.Buffer + err error +} + +// NewPageReader returns a page reader for the data which can be read from the provided reader and compression. +func NewPageReader(r parquet.ReaderAtSeeker, nrows int64, compressType compress.Compression, mem memory.Allocator, ctx *CryptoContext) (PageReader, error) { + if mem == nil { + mem = memory.NewGoAllocator() + } + + codec, err := compress.GetCodec(compressType) + if err != nil { + return nil, err + } + + rdr := &serializedPageReader{ + r: r, + maxPageHeaderSize: defaultMaxPageHeaderSize, + nrows: nrows, + mem: mem, + codec: codec, + buf: memory.NewResizableBuffer(mem), + } + rdr.decompressBuffer.Grow(defaultPageHeaderSize) + if ctx != nil { + rdr.cryptoCtx = *ctx + rdr.initDecryption() + } + return rdr, nil +} + +func (p *serializedPageReader) Reset(r parquet.ReaderAtSeeker, nrows int64, compressType compress.Compression, ctx *CryptoContext) { + p.rowsSeen, p.pageOrd = 0, 0 + p.curPageHdr, p.curPage, p.err = nil, nil, nil + p.r, p.nrows = r, nrows + + p.codec, p.err = compress.GetCodec(compressType) + if p.err != nil { + return + } + p.buf.ResizeNoShrink(0) + p.decompressBuffer.Reset() + if ctx != nil { + p.cryptoCtx = *ctx + p.initDecryption() + } else { + p.cryptoCtx = CryptoContext{} + p.dataPageAad = "" + p.dataPageHeaderAad = "" + } +} + +func (p *serializedPageReader) Err() error { return p.err } + +func (p *serializedPageReader) SetMaxPageHeaderSize(sz int) { + 
p.maxPageHeaderSize = sz +} + +func (p *serializedPageReader) initDecryption() { + if p.cryptoCtx.DataDecryptor != nil { + p.dataPageAad = encryption.CreateModuleAad(p.cryptoCtx.DataDecryptor.FileAad(), encryption.DataPageModule, + p.cryptoCtx.RowGroupOrdinal, p.cryptoCtx.ColumnOrdinal, -1) + } + if p.cryptoCtx.MetaDecryptor != nil { + p.dataPageHeaderAad = encryption.CreateModuleAad(p.cryptoCtx.MetaDecryptor.FileAad(), encryption.DataPageHeaderModule, + p.cryptoCtx.RowGroupOrdinal, p.cryptoCtx.ColumnOrdinal, -1) + } +} + +func (p *serializedPageReader) updateDecryption(decrypt encryption.Decryptor, moduleType int8, pageAad string) { + if p.cryptoCtx.StartDecryptWithDictionaryPage { + aad := encryption.CreateModuleAad(decrypt.FileAad(), moduleType, p.cryptoCtx.RowGroupOrdinal, p.cryptoCtx.ColumnOrdinal, -1) + decrypt.UpdateAad(aad) + } else { + pageaad := []byte(pageAad) + encryption.QuickUpdatePageAad(pageaad, p.pageOrd) + decrypt.UpdateAad(string(pageaad)) + } +} + +func (p *serializedPageReader) Page() Page { + return p.curPage +} + +func (p *serializedPageReader) decompress(lenCompressed int, buf []byte) ([]byte, error) { + p.decompressBuffer.Reset() + p.decompressBuffer.Grow(lenCompressed) + if _, err := io.CopyN(&p.decompressBuffer, p.r, int64(lenCompressed)); err != nil { + return nil, err + } + + data := p.decompressBuffer.Bytes() + if p.cryptoCtx.DataDecryptor != nil { + data = p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes()) + } + + return p.codec.Decode(buf, data), nil +} + +type dataheader interface { + IsSetStatistics() bool + GetStatistics() *format.Statistics +} + +func extractStats(dataHeader dataheader) (pageStats metadata.EncodedStatistics) { + if dataHeader.IsSetStatistics() { + stats := dataHeader.GetStatistics() + if stats.IsSetMaxValue() { + pageStats.SetMax(stats.GetMaxValue()) + } else if stats.IsSetMax() { + pageStats.SetMax(stats.GetMax()) + } + if stats.IsSetMinValue() { + pageStats.SetMin(stats.GetMinValue()) + } else if 
stats.IsSetMin() { + pageStats.SetMin(stats.GetMin()) + } + + if stats.IsSetNullCount() { + pageStats.SetNullCount(stats.GetNullCount()) + } + if stats.IsSetDistinctCount() { + pageStats.SetDistinctCount(stats.GetDistinctCount()) + } + } + return +} + +func (p *serializedPageReader) Next() bool { + // Loop here because there may be unhandled page types that we skip until + // finding a page that we do know what to do with + if p.curPage != nil { + p.curPage.Release() + } + p.curPage = nil + p.curPageHdr = format.NewPageHeader() + p.err = nil + + for p.rowsSeen < p.nrows { + // headerSize := 0 + allowedPgSz := defaultPageHeaderSize + + start, _ := p.r.Seek(0, io.SeekCurrent) + p.decompressBuffer.Reset() + // Page headers can be very large because of page statistics + // We try to deserialize a larger buffer progressively + // until a maximum allowed header limit + for { + n, err := io.CopyN(&p.decompressBuffer, p.r, int64(allowedPgSz)) + // view, err := p.r.Peek(allowedPgSz) + if err != nil && err != io.EOF { + p.err = err + return false + } + + if n == 0 { + return false + } + + view := p.decompressBuffer.Bytes() + + extra := 0 + if p.cryptoCtx.MetaDecryptor != nil { + p.updateDecryption(p.cryptoCtx.MetaDecryptor, encryption.DictPageHeaderModule, p.dataPageHeaderAad) + view = p.cryptoCtx.MetaDecryptor.Decrypt(view) + extra = p.cryptoCtx.MetaDecryptor.CiphertextSizeDelta() + } + + remaining, err := thrift.DeserializeThrift(p.curPageHdr, view) + if err != nil { + allowedPgSz *= 2 + if allowedPgSz > p.maxPageHeaderSize { + p.err = xerrors.New("parquet: deserializing page header failed") + return false + } + continue + } + + p.r.Seek(start+int64(len(view)-int(remaining)+extra), io.SeekStart) + break + } + + lenCompressed := int(p.curPageHdr.GetCompressedPageSize()) + lenUncompressed := int(p.curPageHdr.GetUncompressedPageSize()) + if lenCompressed < 0 || lenUncompressed < 0 { + p.err = xerrors.New("parquet: invalid page header") + return false + } + + if 
p.cryptoCtx.DataDecryptor != nil { + p.updateDecryption(p.cryptoCtx.DataDecryptor, encryption.DictPageModule, p.dataPageAad) + } + + p.buf.ResizeNoShrink(lenUncompressed) + + switch p.curPageHdr.GetType() { + case format.PageType_DICTIONARY_PAGE: + p.cryptoCtx.StartDecryptWithDictionaryPage = false + dictHeader := p.curPageHdr.GetDictionaryPageHeader() + if dictHeader.GetNumValues() < 0 { + p.err = xerrors.New("parquet: invalid page header (negative number of values)") + return false + } + + data, err := p.decompress(lenCompressed, p.buf.Bytes()) + if err != nil { Review comment: might want to check here that len(data) == lenUncompressed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
