emkornfield commented on a change in pull request #11146: URL: https://github.com/apache/arrow/pull/11146#discussion_r734235844
########## File path: go/parquet/file/page_reader.go ########## @@ -0,0 +1,616 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import ( + "bytes" + "io" + "sync" + + "github.com/JohnCGriffin/overflow" + "github.com/apache/arrow/go/arrow/ipc" + "github.com/apache/arrow/go/arrow/memory" + "github.com/apache/arrow/go/parquet" + "github.com/apache/arrow/go/parquet/compress" + "github.com/apache/arrow/go/parquet/internal/encryption" + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" + "github.com/apache/arrow/go/parquet/internal/thrift" + "github.com/apache/arrow/go/parquet/metadata" + "golang.org/x/xerrors" +) + +// PageReader is the interface used by the columnreader in order to read +// and handle DataPages and loop through them. +type PageReader interface { + // Set the maximum Page header size allowed to be read + SetMaxPageHeaderSize(int) + // Return the current page, or nil if there are no more + Page() Page + // Fetch the next page, returns false if there are no more pages + Next() bool + // if Next returns false, Err will return the error encountered or + // nil if there was no error and you just hit the end of the page + Err() error + // Reset allows reusing a page reader + Reset(r parquet.ReaderAtSeeker, nrows int64, compressType compress.Compression, ctx *CryptoContext) +} + +// Page is an interface for handling DataPages or Dictionary Pages +type Page interface { + // Returns which kind of page this is + Type() format.PageType + // Get the raw bytes of this page + Data() []byte + // return the encoding used for this page, Plain/RLE, etc. + Encoding() format.Encoding + // get the number of values in this page + NumValues() int32 + // release this page object back into the page pool for re-use + Release() +} + +type page struct { + buf *memory.Buffer + typ format.PageType + + nvals int32 + encoding format.Encoding +} + +func (p *page) Type() format.PageType { return p.typ } +func (p *page) Data() []byte { return p.buf.Bytes() } +func (p *page) NumValues() int32 { return p.nvals } +func (p *page) Encoding() format.Encoding { return p.encoding } + +// DataPage is the base interface for both DataPageV1 and DataPageV2 of the +// parquet spec. +type DataPage interface { + Page + UncompressedSize() int64 + Statistics() metadata.EncodedStatistics +} + +// Create some pools to use for reusing the data page objects themselves so that +// we can avoid tight loops that are creating and destroying tons of individual +// objects. This combined with a Release function on the pages themselves +// which will put them back into the pool creates significant memory usage +// and performance benefits + +var dataPageV1Pool = sync.Pool{ + New: func() interface{} { return (*DataPageV1)(nil) }, +} + +var dataPageV2Pool = sync.Pool{ + New: func() interface{} { return (*DataPageV2)(nil) }, +} + +var dictPagePool = sync.Pool{ + New: func() interface{} { return (*DictionaryPage)(nil) }, +} + +// DataPageV1 represents a DataPage version 1 from the parquet.thrift file +type DataPageV1 struct { + page + + defLvlEncoding format.Encoding + repLvlEncoding format.Encoding + uncompressedSize int64 + statistics metadata.EncodedStatistics +} + +// NewDataPageV1 returns a V1 data page with the given buffer as its data and the specified encoding information +// +// Will utilize objects that have been released back into the data page pool and +// re-use them if available as opposed to creating new objects. Calling Release on the +// data page object will release it back to the pool for re-use. +func NewDataPageV1(buffer *memory.Buffer, num int32, encoding, defEncoding, repEncoding parquet.Encoding, uncompressedSize int64) *DataPageV1 { + dp := dataPageV1Pool.Get().(*DataPageV1) + if dp == nil { + return &DataPageV1{ + page: page{buf: buffer, typ: format.PageType_DATA_PAGE, nvals: num, encoding: format.Encoding(encoding)}, + defLvlEncoding: format.Encoding(defEncoding), + repLvlEncoding: format.Encoding(repEncoding), + uncompressedSize: uncompressedSize, + } + } + + dp.buf, dp.nvals = buffer, num + dp.encoding = format.Encoding(encoding) + dp.defLvlEncoding, dp.repLvlEncoding = format.Encoding(defEncoding), format.Encoding(repEncoding) + dp.statistics.HasMax, dp.statistics.HasMin = false, false + dp.statistics.HasNullCount, dp.statistics.HasDistinctCount = false, false + dp.uncompressedSize = uncompressedSize + return dp +} + +// NewDataPageV1WithStats is the same as NewDataPageV1, but also allows adding the stat info into the created page +func NewDataPageV1WithStats(buffer *memory.Buffer, num int32, encoding, defEncoding, repEncoding parquet.Encoding, uncompressedSize int64, stats metadata.EncodedStatistics) *DataPageV1 { + ret := NewDataPageV1(buffer, num, encoding, defEncoding, repEncoding, uncompressedSize) + ret.statistics = stats + return ret +} + +// Release this page back into the DataPage object pool so that it can be reused. +// +// After calling this function, the object should not be utilized anymore, otherwise +// conflicts can arise. +func (d *DataPageV1) Release() { + d.buf.Release() + d.buf = nil + dataPageV1Pool.Put(d) +} + +// UncompressedSize returns the size of the data in this data page when uncompressed +func (d *DataPageV1) UncompressedSize() int64 { return d.uncompressedSize } + +// Statistics returns the encoded statistics on this data page +func (d *DataPageV1) Statistics() metadata.EncodedStatistics { return d.statistics } + +// DefinitionLevelEncoding returns the encoding utilized for the Definition Levels +func (d *DataPageV1) DefinitionLevelEncoding() parquet.Encoding { + return parquet.Encoding(d.defLvlEncoding) +} + +// RepetitionLevelEncoding returns the encoding utilized for the Repetition Levels +func (d *DataPageV1) RepetitionLevelEncoding() parquet.Encoding { + return parquet.Encoding(d.repLvlEncoding) +} + +// DataPageV2 is the representation of the V2 data page from the parquet.thrift spec +type DataPageV2 struct { + page + + nulls int32 + nrows int32 + defLvlBytelen int32 Review comment: nit shold len be capialized? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
