This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new aecdc0b ARROW-13984: [Go][Parquet] file handling for go parquet, just
the readers
aecdc0b is described below
commit aecdc0bd75ef14095ec2a560885c3f4e059bc730
Author: Matthew Topol <[email protected]>
AuthorDate: Sat Oct 23 14:09:59 2021 -0400
ARROW-13984: [Go][Parquet] file handling for go parquet, just the readers
This implements the file/column and page readers for Parquet files. In
order to keep this smaller, I've only included what was necessary for the
readers and will make a separate PR for the file and column writers after this.
Closes #11146 from zeroshade/goparquet-file
Lead-authored-by: Matthew Topol <[email protected]>
Co-authored-by: Matt Topol <[email protected]>
Signed-off-by: Matthew Topol <[email protected]>
---
go/parquet/file/column_reader.go | 498 +++++++++++++++++
go/parquet/file/column_reader_test.go | 450 +++++++++++++++
go/parquet/file/column_reader_types.gen.go | 299 ++++++++++
go/parquet/file/column_reader_types.gen.go.tmpl | 62 +++
go/parquet/file/file_reader.go | 336 +++++++++++
go/parquet/file/file_reader_test.go | 304 ++++++++++
go/parquet/file/level_conversion.go | 262 +++++++++
go/parquet/file/level_conversion_test.go | 194 +++++++
go/parquet/file/page_reader.go | 620 +++++++++++++++++++++
go/parquet/file/row_group_reader.go | 130 +++++
go/parquet/go.sum | 1 +
go/parquet/internal/bmi/bitmap_bmi2_noasm.go | 24 +
go/parquet/internal/bmi/bmi.go | 2 +-
go/parquet/internal/encoding/boolean_decoder.go | 4 +-
go/parquet/internal/encoding/boolean_encoder.go | 3 +
go/parquet/internal/encoding/typed_encoder.gen.go | 158 +++++-
.../internal/encoding/typed_encoder.gen.go.tmpl | 46 +-
go/parquet/internal/testutils/pagebuilder.go | 297 ++++++++++
go/parquet/reader_properties.go | 3 +-
go/parquet/types.go | 10 +
20 files changed, 3689 insertions(+), 14 deletions(-)
diff --git a/go/parquet/file/column_reader.go b/go/parquet/file/column_reader.go
new file mode 100644
index 0000000..79c6479
--- /dev/null
+++ b/go/parquet/file/column_reader.go
@@ -0,0 +1,498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+ "github.com/apache/arrow/go/arrow/memory"
+ "github.com/apache/arrow/go/parquet"
+ "github.com/apache/arrow/go/parquet/internal/encoding"
+ "github.com/apache/arrow/go/parquet/internal/encryption"
+ format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
+ "github.com/apache/arrow/go/parquet/internal/utils"
+ "github.com/apache/arrow/go/parquet/schema"
+ "golang.org/x/xerrors"
+)
+
+const (
+ // defaultMaxPageHeaderSize is the default maximum page header size: 4 MB.
+ defaultMaxPageHeaderSize = 4 * 1024 * 1024
+ // defaultPageHeaderSize is the default expected page header size: 16 KB.
+ defaultPageHeaderSize = 16 * 1024
+)
+
+//go:generate go run ../../arrow/_tools/tmpl/main.go -i -data=../internal/encoding/physical_types.tmpldata column_reader_types.gen.go.tmpl
+
+// isDictIndexEncoding reports whether e is one of the two encodings that
+// identify dictionary-index data: RLE_DICTIONARY or PLAIN_DICTIONARY.
+func isDictIndexEncoding(e format.Encoding) bool {
+	switch e {
+	case format.Encoding_RLE_DICTIONARY, format.Encoding_PLAIN_DICTIONARY:
+		return true
+	}
+	return false
+}
+
+// CryptoContext is a context for keeping track of the current methods for decrypting.
+// It keeps track of the row group and column numbers along with references to the
+// decryptor objects.
+type CryptoContext struct {
+ // NOTE(review): the name suggests decryption starts with the dictionary
+ // page when set; this file never reads it, so confirm against the page reader.
+ StartDecryptWithDictionaryPage bool
+ // RowGroupOrdinal is the row group number being tracked.
+ RowGroupOrdinal int16
+ // ColumnOrdinal is the column number being tracked.
+ ColumnOrdinal int16
+ // MetaDecryptor and DataDecryptor are the decryptor objects referenced by
+ // this context.
+ // NOTE(review): presumably one for page metadata and one for page data; confirm.
+ MetaDecryptor encryption.Decryptor
+ DataDecryptor encryption.Decryptor
+}
+
+// ColumnChunkReader is the basic interface for all column readers. It will use
+// a page reader to read all the pages in a column chunk from a row group.
+//
+// To actually Read out the column data, you need to convert to the properly
+// typed ColumnChunkReader type such as *BooleanColumnReader etc.
+//
+// Some things to clarify when working with column readers:
+//
+// "Values" refers to the physical data values in a data page.
+//
+// This is separate from the number of "rows" in a column and the total number
+// of "elements" in a column because null values aren't stored physically in the
+// data page but are represented via definition levels, so the number of values
+// in a column can be less than the number of rows.
+//
+// The total number of "elements" in a column also differs because of potential
+// repeated fields, where you can have multiple values in the page which
+// together make up a single element (such as a list) or depending on the repetition
+// level and definition level, could represent an entire null list or just a null
+// element inside of a list.
+type ColumnChunkReader interface {
+ // HasNext returns whether there is more data to be read in this column
+ // and row group.
+ HasNext() bool
+ // Type returns the underlying physical type of the column
+ Type() parquet.Type
+ // Descriptor returns the column schema container
+ Descriptor() *schema.Column
+ // Err returns the error encountered if HasNext returned false because of
+ // an error. Otherwise this will be nil if it's just the end of the
+ // column
+ Err() error
+ // consumeBufferedValues marks the given number of buffered values as
+ // consumed, skipping over them.
+ consumeBufferedValues(int64)
+ // numAvailValues returns the number of available buffered values that have
+ // not been decoded yet. When this returns 0, you're at the end of a page.
+ numAvailValues() int64
+ // readDefinitionLevels reads the definition levels and returns the number
+ // of definition levels read and the number of values to be read (the
+ // number of def levels equal to the max definition level).
+ // It also populates the passed in slice which should be sized appropriately.
+ readDefinitionLevels(levels []int16) (int, int64)
+ // readRepetitionLevels reads the repetition levels and returns the number
+ // of repetition levels read. It also populates the passed in slice, which
+ // should be sized appropriately.
+ readRepetitionLevels(levels []int16) int
+ // pager returns the PageReader currently in use.
+ //
+ // A column is made up of potentially multiple pages across potentially multiple
+ // row groups. A PageReader allows looping through the pages in a single row group.
+ // When moving to another row group for reading, use setPageReader to re-use the
+ // column reader for reading the pages of the new row group.
+ pager() PageReader
+ // setPageReader sets a page reader into the column reader so it can be reused.
+ //
+ // This will clear any current error in the reader but does not
+ // automatically read the first page of the page reader passed in until
+ // HasNext which will read in the next page.
+ setPageReader(PageReader)
+}
+
+// columnChunkReader holds the state shared by the typed column chunk readers:
+// the column descriptor, the page source, the level decoders, the decoder for
+// the current page and the bookkeeping counters for that page.
+type columnChunkReader struct {
+ // descr is the schema descriptor of the column being read
+ descr *schema.Column
+ // rdr supplies the pages of the column chunk
+ rdr PageReader
+ // level decoders, re-pointed at each new data page
+ repetitionDecoder encoding.LevelDecoder
+ definitionDecoder encoding.LevelDecoder
+
+ // the page most recently fetched from rdr, together with the encoding
+ // and value decoder selected for it
+ curPage Page
+ curEncoding format.Encoding
+ curDecoder encoding.TypedDecoder
+
+ // number of currently buffered values in the current page
+ numBuffered int64
+ // the number of values we've decoded so far
+ numDecoded int64
+ // allocator handed to decoders and scratch buffers
+ mem memory.Allocator
+
+ // decoders caches one value decoder per encoding; it is reset whenever a
+ // new page reader is installed
+ decoders map[format.Encoding]encoding.TypedDecoder
+ // decoderTraits creates decoders for this column's physical type
+ decoderTraits encoding.DecoderTraits
+
+ // is set when an error is encountered
+ err error
+ // reusable buffer for definition levels when the caller supplies none
+ defLvlBuffer []int16
+}
+
+// NewColumnReader builds the typed column chunk reader matching the physical
+// type of descr. The reader takes its pages from pageReader and passes mem to
+// the decoders it creates. nil is returned when the physical type is not one
+// of the types handled below.
+func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator) ColumnChunkReader {
+	rdr := columnChunkReader{
+		descr:    descr,
+		rdr:      pageReader,
+		mem:      mem,
+		decoders: make(map[format.Encoding]encoding.TypedDecoder),
+	}
+
+	switch descr.PhysicalType() {
+	case parquet.Types.Boolean:
+		rdr.decoderTraits = &encoding.BooleanDecoderTraits
+		return &BooleanColumnChunkReader{rdr}
+	case parquet.Types.Int32:
+		rdr.decoderTraits = &encoding.Int32DecoderTraits
+		return &Int32ColumnChunkReader{rdr}
+	case parquet.Types.Int64:
+		rdr.decoderTraits = &encoding.Int64DecoderTraits
+		return &Int64ColumnChunkReader{rdr}
+	case parquet.Types.Int96:
+		rdr.decoderTraits = &encoding.Int96DecoderTraits
+		return &Int96ColumnChunkReader{rdr}
+	case parquet.Types.Float:
+		rdr.decoderTraits = &encoding.Float32DecoderTraits
+		return &Float32ColumnChunkReader{rdr}
+	case parquet.Types.Double:
+		rdr.decoderTraits = &encoding.Float64DecoderTraits
+		return &Float64ColumnChunkReader{rdr}
+	case parquet.Types.ByteArray:
+		rdr.decoderTraits = &encoding.ByteArrayDecoderTraits
+		return &ByteArrayColumnChunkReader{rdr}
+	case parquet.Types.FixedLenByteArray:
+		rdr.decoderTraits = &encoding.FixedLenByteArrayDecoderTraits
+		return &FixedLenByteArrayColumnChunkReader{rdr}
+	}
+	return nil
+}
+
+// Err returns the error currently recorded on the reader, or nil.
+func (c *columnChunkReader) Err() error {
+	return c.err
+}
+
+// Type returns the physical type reported by the column descriptor.
+func (c *columnChunkReader) Type() parquet.Type {
+	return c.descr.PhysicalType()
+}
+
+// Descriptor returns the column descriptor this reader was created with.
+func (c *columnChunkReader) Descriptor() *schema.Column {
+	return c.descr
+}
+
+// consumeBufferedValues advances the decoded-value counter by n.
+func (c *columnChunkReader) consumeBufferedValues(n int64) {
+	c.numDecoded += n
+}
+
+// numAvailValues returns how many buffered values of the current page have
+// not been consumed yet.
+func (c *columnChunkReader) numAvailValues() int64 {
+	return c.numBuffered - c.numDecoded
+}
+
+// pager returns the page reader currently feeding this column reader.
+func (c *columnChunkReader) pager() PageReader {
+	return c.rdr
+}
+// setPageReader installs rdr as the page source and resets the per-chunk
+// state: the recorded error, the decoder cache and both value counters.
+func (c *columnChunkReader) setPageReader(rdr PageReader) {
+	c.rdr = rdr
+	c.err = nil
+	c.decoders = make(map[format.Encoding]encoding.TypedDecoder)
+	c.numBuffered = 0
+	c.numDecoded = 0
+}
+
+// getDefLvlBuffer returns a slice of exactly sz definition levels backed by
+// the reader's reusable buffer, allocating a new buffer of length sz when the
+// existing one is too short.
+func (c *columnChunkReader) getDefLvlBuffer(sz int64) []int16 {
+	if sz <= int64(len(c.defLvlBuffer)) {
+		return c.defLvlBuffer[:sz]
+	}
+
+	c.defLvlBuffer = make([]int16, sz)
+	return c.defLvlBuffer
+}
+
+// HasNext reports whether more data can be read from this column chunk. When
+// the current page is exhausted (or no page has been loaded yet) it tries to
+// load the next data page and reports whether that page holds any values.
+func (c *columnChunkReader) HasNext() bool {
+	if c.numBuffered != 0 && c.numDecoded != c.numBuffered {
+		return true
+	}
+
+	if !c.readNewPage() {
+		return false
+	}
+	return c.numBuffered != 0
+}
+
+func (c *columnChunkReader) configureDict(page *DictionaryPage) error {
+ enc := page.encoding
+ if enc == format.Encoding_PLAIN_DICTIONARY || enc ==
format.Encoding_PLAIN {
+ enc = format.Encoding_RLE_DICTIONARY
+ }
+
+ if _, ok := c.decoders[enc]; ok {
+ return xerrors.New("parquet: column chunk cannot have more than
one dictionary.")
+ }
+
+ switch page.Encoding() {
+ case format.Encoding_PLAIN, format.Encoding_PLAIN_DICTIONARY:
+ dict := c.decoderTraits.Decoder(parquet.Encodings.Plain,
c.descr, false, c.mem)
+ dict.SetData(int(page.NumValues()), page.Data())
+
+ decoder := c.decoderTraits.Decoder(parquet.Encodings.Plain,
c.descr, true, c.mem).(encoding.DictDecoder)
+ decoder.SetDict(dict)
+ c.decoders[enc] = decoder
+ default:
+ return xerrors.New("parquet: dictionary index must be plain
encoding")
+ }
+
+ c.curDecoder = c.decoders[enc]
+ return nil
+}
+
+// readNewPage advances the page reader until a data page has been set up for
+// decoding. Dictionary pages are configured and then skipped over, and any
+// other non-data page is ignored. It returns true when a data page is ready.
+// Otherwise it returns false, with c.err holding the failure or the page
+// reader's own error once the pages run out.
+func (c *columnChunkReader) readNewPage() bool {
+	for c.rdr.Next() {
+		if c.curPage = c.rdr.Page(); c.curPage == nil {
+			break
+		}
+
+		var (
+			levelBytes int64
+			err        error
+		)
+		switch pg := c.curPage.(type) {
+		case *DataPageV1:
+			levelBytes, err = c.initLevelDecodersV1(pg, pg.repLvlEncoding, pg.defLvlEncoding)
+		case *DataPageV2:
+			levelBytes, err = c.initLevelDecodersV2(pg)
+		case *DictionaryPage:
+			if err = c.configureDict(pg); err == nil {
+				// dictionary is configured, keep going until we get a data page
+				continue
+			}
+		default:
+			// we can skip non-data pages
+			continue
+		}
+
+		if err == nil {
+			err = c.initDataDecoder(c.curPage, levelBytes)
+		}
+		c.err = err
+		return err == nil
+	}
+
+	c.err = c.rdr.Err()
+	return false
+}
+
+func (c *columnChunkReader) initLevelDecodersV2(page *DataPageV2) (int64,
error) {
+ c.numBuffered = int64(page.nvals)
+ c.numDecoded = 0
+ buf := page.Data()
+ totalLvlLen := int64(page.repLvlByteLen) + int64(page.defLvlByteLen)
+
+ if totalLvlLen > int64(len(buf)) {
+ return totalLvlLen, xerrors.New("parquet: data page too small
for levels (corrupt header?)")
+ }
+
+ if c.descr.MaxRepetitionLevel() > 0 {
+ c.repetitionDecoder.SetDataV2(page.repLvlByteLen,
c.descr.MaxRepetitionLevel(), int(c.numBuffered), buf)
+ buf = buf[page.repLvlByteLen:]
+ }
+
+ if c.descr.MaxDefinitionLevel() > 0 {
+ c.definitionDecoder.SetDataV2(page.defLvlByteLen,
c.descr.MaxDefinitionLevel(), int(c.numBuffered), buf)
+ }
+
+ return totalLvlLen, nil
+}
+
+// initLevelDecodersV1 points the repetition and definition level decoders at
+// the level sections of a V1 data page and resets the page counters.
+//
+// Data page layout: repetition levels, then definition levels, then the
+// encoded values. A level section is only present (and only consumed here)
+// when the corresponding max level of the column is greater than zero.
+//
+// It returns the number of bytes taken up by the level sections, i.e. the
+// offset at which the encoded values start, or the error reported by a level
+// decoder together with the byte count accumulated so far.
+func (c *columnChunkReader) initLevelDecodersV1(page *DataPageV1, repLvlEncoding, defLvlEncoding format.Encoding) (int64, error) {
+	c.numBuffered = int64(page.nvals)
+	c.numDecoded = 0
+
+	buf := page.Data()
+	levelsByteLen := int64(0)
+
+	// Levels are encoded as rle or bit-packed
+	if c.descr.MaxRepetitionLevel() > 0 {
+		repBytes, err := c.repetitionDecoder.SetData(parquet.Encoding(repLvlEncoding), c.descr.MaxRepetitionLevel(), int(c.numBuffered), buf)
+		if err != nil {
+			return levelsByteLen, err
+		}
+		// the definition levels start right after the repetition levels
+		buf = buf[repBytes:]
+		levelsByteLen += int64(repBytes)
+	}
+
+	if c.descr.MaxDefinitionLevel() > 0 {
+		defBytes, err := c.definitionDecoder.SetData(parquet.Encoding(defLvlEncoding), c.descr.MaxDefinitionLevel(), int(c.numBuffered), buf)
+		if err != nil {
+			return levelsByteLen, err
+		}
+		levelsByteLen += int64(defBytes)
+	}
+
+	return levelsByteLen, nil
+}
+
+func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error
{
+ buf := page.Data()
+ if int64(len(buf)) < lvlByteLen {
+ return xerrors.New("parquet: page smaller than size of encoded
levels")
+ }
+
+ buf = buf[lvlByteLen:]
+ encoding := page.Encoding()
+
+ if isDictIndexEncoding(encoding) {
+ encoding = format.Encoding_RLE_DICTIONARY
+ }
+
+ if decoder, ok := c.decoders[encoding]; ok {
+ c.curDecoder = decoder
+ } else {
+ switch encoding {
+ case format.Encoding_PLAIN,
+ format.Encoding_DELTA_BYTE_ARRAY,
+ format.Encoding_DELTA_LENGTH_BYTE_ARRAY,
+ format.Encoding_DELTA_BINARY_PACKED:
+ c.curDecoder =
c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem)
+ c.decoders[encoding] = c.curDecoder
+ case format.Encoding_RLE_DICTIONARY:
+ return xerrors.New("parquet: dictionary page must be
before data page")
+ case format.Encoding_BYTE_STREAM_SPLIT:
+ return xerrors.Errorf("parquet: unsupported data
encoding %s", encoding)
+ default:
+ return xerrors.Errorf("parquet: unknown encoding type
%s", encoding)
+ }
+ }
+
+ c.curEncoding = encoding
+ c.curDecoder.SetData(int(c.numBuffered), buf)
+ return nil
+}
+
+// readDefinitionLevels decodes definition levels from the current page into
+// levels. It returns the total number of levels decoded (and thus populated
+// in the slice) and the number of physical values that exist to read, which
+// is the number of levels equal to the max definition level.
+//
+// If the max definition level is 0, the assumption is that there are no nulls
+// in the column and therefore no definition levels to read, so it always
+// returns 0, 0.
+func (c *columnChunkReader) readDefinitionLevels(levels []int16) (totalDecoded int, valuesToRead int64) {
+	if c.descr.MaxDefinitionLevel() != 0 {
+		return c.definitionDecoder.Decode(levels)
+	}
+
+	return 0, 0
+}
+
+// readRepetitionLevels decodes repetition levels from the current page into
+// levels and returns how many were decoded (and thus populated in the slice).
+//
+// If the max repetition level is 0, it is assumed there are no repetition
+// levels, so it always returns 0.
+func (c *columnChunkReader) readRepetitionLevels(levels []int16) int {
+	if c.descr.MaxRepetitionLevel() != 0 {
+		decoded, _ := c.repetitionDecoder.Decode(levels)
+		return decoded
+	}
+
+	return 0
+}
+
+// determineNumToRead reads the definition levels (and optionally populates
the repetition levels)
+// in order to determine how many values need to be read to fulfill this batch
read.
+//
+// batchLen is the number of values it is desired to read. defLvls must be
either nil (in which case
+// a buffer will be used) or must be at least batchLen in length to be safe.
repLvls should be either nil
+// (in which case it is ignored) or should be at least batchLen in length to
be safe.
+//
+// In the return values: ndef is the number of definition levels that were
actually read in which will
+// typically be the minimum of batchLen and numAvailValues.
+// toRead is the number of physical values that should be read in based on the
definition levels (the number
+// of definition levels that were equal to maxDefinitionLevel). and err being
either nil or any error encountered
+func (c *columnChunkReader) determineNumToRead(batchLen int64, defLvls,
repLvls []int16) (ndefs int, toRead int64, err error) {
+ if !c.HasNext() {
+ return 0, 0, c.err
+ }
+
+ size := utils.Min(batchLen, c.numBuffered-c.numDecoded)
+
+ if c.descr.MaxDefinitionLevel() > 0 {
+ if defLvls == nil {
+ defLvls = c.getDefLvlBuffer(size)
+ }
+ ndefs, toRead = c.readDefinitionLevels(defLvls[:size])
+ } else {
+ toRead = size
+ }
+
+ if c.descr.MaxRepetitionLevel() > 0 && repLvls != nil {
+ nreps := c.readRepetitionLevels(repLvls[:size])
+ if defLvls != nil && ndefs != nreps {
+ err = xerrors.New("parquet: number of decoded rep/def
levels did not match")
+ }
+ }
+ return
+}
+
+// skipValues skips up to nvalues values, using readFn to read and discard
+// data when a page can only be skipped partially.
+//
+// When more values remain to be skipped than are left in the current page,
+// the rest of the page is dropped by advancing the decoded counter. Otherwise
+// the values are read into a scratch buffer through readFn and thrown away.
+//
+// Skipping stops as soon as readFn reports an error or makes no progress, so a
+// failing read is not retried indefinitely. It returns the number of values
+// skipped and either the reader's recorded error, if set, or the error from
+// readFn.
+func (c *columnChunkReader) skipValues(nvalues int64, readFn func(batch int64, buf []byte) (int64, error)) (int64, error) {
+	var err error
+	toskip := nvalues
+	for c.HasNext() && toskip > 0 {
+		// if number to skip is more than the number of undecoded values, skip the page
+		if avail := c.numBuffered - c.numDecoded; toskip > avail {
+			toskip -= avail
+			c.numDecoded = c.numBuffered
+			continue
+		}
+
+		var skipped int64
+		skipped, err = c.skipWithinPage(toskip, readFn)
+		toskip -= skipped
+		if err != nil || skipped <= 0 {
+			// reading failed or made no progress; retrying cannot help
+			break
+		}
+	}
+	if c.err != nil {
+		err = c.err
+	}
+	return nvalues - toskip, err
+}
+
+// skipWithinPage reads and discards up to toskip values in batches of at most
+// 1024 through readFn. It returns the number of values readFn reported and
+// the first error it returned. The scratch buffer is released before returning.
+func (c *columnChunkReader) skipWithinPage(toskip int64, readFn func(batch int64, buf []byte) (int64, error)) (int64, error) {
+	batchSize := int64(1024)
+
+	scratch := memory.NewResizableBuffer(c.mem)
+	scratch.Reserve(c.decoderTraits.BytesRequired(int(batchSize)))
+	defer scratch.Release()
+
+	var skipped int64
+	for skipped < toskip {
+		batchSize = utils.Min(batchSize, toskip-skipped)
+		n, err := readFn(batchSize, scratch.Buf())
+		skipped += n
+		if err != nil || n <= 0 {
+			return skipped, err
+		}
+	}
+	return skipped, nil
+}
+
+// readerFunc decodes physical values for readBatch. readBatch calls it with
+// the number of values already read in the current batch and the number of
+// values to read now; it returns the number of values actually read.
+type readerFunc func(int64, int64) (int, error)
+
+// readBatch is the shared implementation behind the typed ReadBatch methods.
+// It keeps reading, across pages if necessary, until batchSize levels have
+// been processed, the column chunk is exhausted or an error occurs.
+//
+// totalLvls is the total number of values processed, and therefore the number
+// of definition and repetition levels populated when those slices are
+// non-nil. totalRead is the number of physical (non-null) values that readFn
+// read.
+func (c *columnChunkReader) readBatch(batchSize int64, defLvls, repLvls []int16, readFn readerFunc) (totalLvls int64, totalRead int, err error) {
+	for {
+		if !c.HasNext() || totalLvls >= batchSize || err != nil {
+			break
+		}
+
+		// hand out the unfilled tail of each level slice the caller supplied
+		var defs, reps []int16
+		if defLvls != nil {
+			defs = defLvls[totalLvls:]
+		}
+		if repLvls != nil {
+			reps = repLvls[totalLvls:]
+		}
+
+		remaining := batchSize - totalLvls
+		ndefs, toRead, lvlErr := c.determineNumToRead(remaining, defs, reps)
+		if lvlErr != nil {
+			return totalLvls, totalRead, lvlErr
+		}
+
+		var read int
+		read, err = readFn(int64(totalRead), toRead)
+
+		// The number of values processed is the larger of the number of
+		// definition levels and the number of physical values read. For a
+		// required field ndefs is 0 because no definition levels are stored,
+		// so read is used; otherwise ndefs is equal to or greater than read.
+		processed := int64(utils.MaxInt(ndefs, read))
+		c.consumeBufferedValues(processed)
+
+		totalLvls += processed
+		totalRead += read
+	}
+	return totalLvls, totalRead, err
+}
diff --git a/go/parquet/file/column_reader_test.go
b/go/parquet/file/column_reader_test.go
new file mode 100644
index 0000000..d22e365
--- /dev/null
+++ b/go/parquet/file/column_reader_test.go
@@ -0,0 +1,450 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file_test
+
+import (
+ "math"
+ "math/rand"
+ "reflect"
+ "testing"
+
+ "github.com/apache/arrow/go/arrow/memory"
+ "github.com/apache/arrow/go/parquet"
+ "github.com/apache/arrow/go/parquet/file"
+ "github.com/apache/arrow/go/parquet/internal/testutils"
+ "github.com/apache/arrow/go/parquet/internal/utils"
+ "github.com/apache/arrow/go/parquet/schema"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/suite"
+)
+
+// initValues fills the slice held by values with pseudo-random integers from
+// a generator seeded with 0, so every call produces the same sequence.
+//
+// Element types of at most 32 bits receive values spanning the whole int32
+// range and must be assignable from int32. Wider element types of at most 64
+// bits receive values spanning the whole int64 range and must be assignable
+// from int64. It panics when values is not a slice.
+func initValues(values reflect.Value) {
+	if values.Kind() != reflect.Slice {
+		panic("must init values with slice")
+	}
+
+	r := rand.New(rand.NewSource(0))
+	typ := values.Type().Elem()
+	switch {
+	case typ.Bits() <= 32:
+		max := int64(math.MaxInt32)
+		min := int64(math.MinInt32)
+		for i := 0; i < values.Len(); i++ {
+			values.Index(i).Set(reflect.ValueOf(r.Int63n(max-min+1) + min).Convert(reflect.TypeOf(int32(0))))
+		}
+	case typ.Bits() <= 64:
+		// The width of the full int64 range does not fit in an int64
+		// (MaxInt64-MinInt64+1 wraps around to 0, which makes Int63n panic),
+		// so reinterpret 64 random bits as a signed value instead.
+		for i := 0; i < values.Len(); i++ {
+			values.Index(i).Set(reflect.ValueOf(int64(r.Uint64())))
+		}
+	}
+}
+
+// initDictValues fills values with random data and then overwrites everything
+// past the first numDicts entries with copies of earlier entries, so that the
+// slice contains repeated values.
+func initDictValues(values reflect.Value, numDicts int) {
+	repeatFactor := values.Len() / numDicts
+	initValues(values)
+
+	// repeat the first numDicts values over each following full group
+	for rep := 1; rep < repeatFactor; rep++ {
+		base := numDicts * rep
+		for k := 0; k < numDicts; k++ {
+			values.Index(base + k).Set(values.Index(k))
+		}
+	}
+
+	// the groups above cover numDicts*repeatFactor entries, which can be
+	// fewer than the slice length; fill the remainder from the start
+	covered := numDicts * repeatFactor
+	for idx := covered; idx < values.Len(); idx++ {
+		values.Index(idx).Set(values.Index(idx - covered))
+	}
+}
+
+// makePages generates npages pages of lvlsPerPage levels each for column d,
+// with values of element type typ encoded as enc.
+//
+// It returns the pages, the number of physical values, the generated values
+// and the generated definition and repetition levels. The level slices stay
+// nil when the corresponding max level of the column is 0. It panics for any
+// encoding other than Plain, PlainDict or RLEDict.
+func makePages(version parquet.DataPageVersion, d *schema.Column, npages, lvlsPerPage int, typ reflect.Type, enc parquet.Encoding) ([]file.Page, int, reflect.Value, []int16, []int16) {
+	var (
+		nlevels   = lvlsPerPage * npages
+		nvalues   = 0
+		maxDef    = d.MaxDefinitionLevel()
+		maxRep    = d.MaxRepetitionLevel()
+		defLevels []int16
+		repLevels []int16
+	)
+
+	valuesPerPage := make([]int, npages)
+	if maxDef > 0 {
+		defLevels = make([]int16, nlevels)
+		testutils.FillRandomInt16(0, 0, maxDef, defLevels)
+		// a physical value exists only where the level equals maxDef
+		for page := range valuesPerPage {
+			start := page * lvlsPerPage
+			for _, lvl := range defLevels[start : start+lvlsPerPage] {
+				if lvl == maxDef {
+					valuesPerPage[page]++
+					nvalues++
+				}
+			}
+		}
+	} else {
+		// without definition levels every level carries a value
+		nvalues = nlevels
+		valuesPerPage[0] = lvlsPerPage
+		for i := 1; i < len(valuesPerPage); i++ {
+			valuesPerPage[i] = lvlsPerPage
+		}
+	}
+
+	if maxRep > 0 {
+		repLevels = make([]int16, nlevels)
+		testutils.FillRandomInt16(0, 0, maxRep, repLevels)
+	}
+
+	values := reflect.MakeSlice(reflect.SliceOf(typ), nvalues, nvalues)
+	switch enc {
+	case parquet.Encodings.Plain:
+		initValues(values)
+		pages := testutils.PaginatePlain(version, d, values, defLevels, repLevels, maxDef, maxRep, lvlsPerPage, valuesPerPage, parquet.Encodings.Plain)
+		return pages, nvalues, values, defLevels, repLevels
+	case parquet.Encodings.PlainDict, parquet.Encodings.RLEDict:
+		initDictValues(values, lvlsPerPage)
+		pages := testutils.PaginateDict(version, d, values, defLevels, repLevels, maxDef, maxRep, lvlsPerPage, valuesPerPage, parquet.Encodings.RLEDict)
+		return pages, nvalues, values, defLevels, repLevels
+	}
+	panic("invalid encoding type for make pages")
+}
+
+// compareVectorWithDefLevels returns a comparison that walks defLevels and
+// checks left against right. Both must be slices with the same element type.
+//
+// A level equal to maxDef compares the current elements of both slices and
+// advances both. A level of maxDef-1 (a null on the lowest nested level)
+// advances only right. Any lower level (a null on a higher nesting level)
+// advances right only when maxRep is 0.
+func compareVectorWithDefLevels(left, right reflect.Value, defLevels []int16, maxDef, maxRep int16) assert.Comparison {
+	return func() bool {
+		bothSlices := left.Kind() == reflect.Slice && right.Kind() == reflect.Slice
+		if !bothSlices || left.Type().Elem() != right.Type().Elem() {
+			return false
+		}
+
+		l, r := 0, 0
+		for _, def := range defLevels {
+			switch {
+			case def == maxDef:
+				if !reflect.DeepEqual(left.Index(l).Interface(), right.Index(r).Interface()) {
+					return false
+				}
+				l++
+				r++
+			case def == maxDef-1:
+				// null entry on the lowest nested level
+				r++
+			case def < maxDef-1 && maxRep == 0:
+				// null entry on higher nesting level, only supported for non-repeating data
+				r++
+			}
+		}
+		return true
+	}
+}
+
+// mem is the allocator handed to the readers and buffers created by these tests.
+var mem = memory.DefaultAllocator
+
+// PrimitiveReaderSuite exercises the column chunk reader against generated
+// pages. It is run once per data page version (see TestPrimitiveReader).
+type PrimitiveReaderSuite struct {
+ suite.Suite
+
+ // data page version used when generating pages
+ dataPageVersion parquet.DataPageVersion
+ // mock page reader and the column reader under test, set by initReader
+ pager file.PageReader
+ reader file.ColumnChunkReader
+ // generated pages along with the values and levels they were built from
+ pages []file.Page
+ values reflect.Value
+ defLevels []int16
+ repLevels []int16
+ // expected totals: levels across all pages and physical values
+ nlevels int
+ nvalues int
+ // max definition / repetition level of the column under test
+ maxDefLvl int16
+ maxRepLvl int16
+}
+
+// TearDownTest drops the generated data and readers by calling clear.
+func (p *PrimitiveReaderSuite) TearDownTest() {
+ p.clear()
+}
+
+// initReader creates a mock page reader that serves p.pages and reports a
+// nil error, then builds the column reader for d on top of it.
+func (p *PrimitiveReaderSuite) initReader(d *schema.Column) {
+	mockPager := new(testutils.MockPageReader)
+	mockPager.Test(p.T())
+	mockPager.TestData().Set("pages", p.pages)
+	mockPager.On("Err").Return((error)(nil))
+
+	p.pager = mockPager
+	p.reader = file.NewColumnReader(d, mockPager, mem)
+}
+
+// checkResults reads the whole column through ReadBatch with a growing batch
+// size and compares the totals, values and levels with the generated data. It
+// then checks that a read at the end of the stream returns nothing.
+func (p *PrimitiveReaderSuite) checkResults() {
+	vresult := make([]int32, p.nvalues)
+	dresult := make([]int16, p.nlevels)
+	rresult := make([]int16, p.nlevels)
+
+	rdr := p.reader.(*file.Int32ColumnChunkReader)
+	p.Require().NotNil(rdr)
+
+	var (
+		valuesSeen int
+		levelsSeen int
+		batchSize  int32 = 8
+	)
+
+	// this will cover both cases:
+	// 1) batch size < page size (multiple ReadBatch from a single page)
+	// 2) batch size > page size (BatchRead limits to single page)
+	for {
+		lvls, vals, _ := rdr.ReadBatch(int64(batchSize), vresult[valuesSeen:], dresult[levelsSeen:], rresult[levelsSeen:])
+		valuesSeen += vals
+		levelsSeen += int(lvls)
+		batchSize = int32(utils.MinInt(1<<24, utils.MaxInt(int(batchSize*2), 4096)))
+		if vals <= 0 {
+			break
+		}
+	}
+
+	p.Equal(p.nlevels, levelsSeen)
+	p.Equal(p.nvalues, valuesSeen)
+	p.Equal(p.values.Interface(), vresult)
+	if p.maxDefLvl > 0 {
+		p.Equal(p.defLevels, dresult)
+	}
+	if p.maxRepLvl > 0 {
+		p.Equal(p.repLevels, rresult)
+	}
+
+	// catch improper writes at EOS
+	lvls, vals, _ := rdr.ReadBatch(5, vresult, nil, nil)
+	p.Zero(vals)
+	p.Zero(lvls)
+}
+
+// clear resets the generated values, levels and pages as well as the page
+// reader and column reader, so the next test starts from an empty state.
+func (p *PrimitiveReaderSuite) clear() {
+ p.values = reflect.ValueOf(nil)
+ p.defLevels = nil
+ p.repLevels = nil
+ p.pages = nil
+ p.pager = nil
+ p.reader = nil
+}
+
+// testPlain generates npages plain-encoded int32 pages of `levels` levels
+// each for column d, reads them back, verifies the results and clears the
+// suite state.
+func (p *PrimitiveReaderSuite) testPlain(npages, levels int, d *schema.Column) {
+	elemType := reflect.TypeOf(int32(0))
+	p.pages, p.nvalues, p.values, p.defLevels, p.repLevels = makePages(p.dataPageVersion, d, npages, levels, elemType, parquet.Encodings.Plain)
+	p.nlevels = npages * levels
+
+	p.initReader(d)
+	p.checkResults()
+	p.clear()
+}
+
+// testDict generates npages dictionary-encoded int32 pages of `levels`
+// levels each for column d, reads them back, verifies the results and clears
+// the suite state.
+func (p *PrimitiveReaderSuite) testDict(npages, levels int, d *schema.Column) {
+	elemType := reflect.TypeOf(int32(0))
+	p.pages, p.nvalues, p.values, p.defLevels, p.repLevels = makePages(p.dataPageVersion, d, npages, levels, elemType, parquet.Encodings.RLEDict)
+	p.nlevels = npages * levels
+
+	p.initReader(d)
+	p.checkResults()
+	p.clear()
+}
+
+// TestInt32FlatRequired reads a required int32 column (no definition or
+// repetition levels) with both plain and dictionary encoded pages.
+func (p *PrimitiveReaderSuite) TestInt32FlatRequired() {
+	const (
+		npages        int = 50
+		levelsPerPage int = 100
+	)
+
+	p.maxDefLvl, p.maxRepLvl = 0, 0
+
+	node := schema.NewInt32Node("a", parquet.Repetitions.Required, -1)
+	col := schema.NewColumn(node, p.maxDefLvl, p.maxRepLvl)
+	p.testPlain(npages, levelsPerPage, col)
+	p.testDict(npages, levelsPerPage, col)
+}
+
+// TestInt32FlatOptional reads an optional int32 column (max definition level
+// 4, no repetition levels) with both plain and dictionary encoded pages.
+func (p *PrimitiveReaderSuite) TestInt32FlatOptional() {
+	const (
+		npages        int = 50
+		levelsPerPage int = 100
+	)
+
+	p.maxDefLvl, p.maxRepLvl = 4, 0
+
+	node := schema.NewInt32Node("b", parquet.Repetitions.Optional, -1)
+	col := schema.NewColumn(node, p.maxDefLvl, p.maxRepLvl)
+	p.testPlain(npages, levelsPerPage, col)
+	p.testDict(npages, levelsPerPage, col)
+}
+
+// TestInt32FlatRepeated reads a repeated int32 column (max definition level
+// 4, max repetition level 2) with both plain and dictionary encoded pages.
+func (p *PrimitiveReaderSuite) TestInt32FlatRepeated() {
+	const (
+		npages        int = 50
+		levelsPerPage int = 100
+	)
+
+	p.maxDefLvl, p.maxRepLvl = 4, 2
+
+	node := schema.NewInt32Node("c", parquet.Repetitions.Repeated, -1)
+	col := schema.NewColumn(node, p.maxDefLvl, p.maxRepLvl)
+	p.testPlain(npages, levelsPerPage, col)
+	p.testDict(npages, levelsPerPage, col)
+}
+
+// TestReadBatchMultiPage checks that a single ReadBatch call asking for the
+// contents of all three pages returns every level and value without error.
+func (p *PrimitiveReaderSuite) TestReadBatchMultiPage() {
+	const (
+		npages        int = 3
+		levelsPerPage int = 100
+		totalLevels   int = levelsPerPage * npages
+	)
+
+	p.maxDefLvl, p.maxRepLvl = 0, 0
+
+	node := schema.NewInt32Node("a", parquet.Repetitions.Required, -1)
+	col := schema.NewColumn(node, p.maxDefLvl, p.maxRepLvl)
+	p.pages, p.nvalues, p.values, p.defLevels, p.repLevels = makePages(p.dataPageVersion, col, npages, levelsPerPage, reflect.TypeOf(int32(0)), parquet.Encodings.Plain)
+	p.initReader(col)
+
+	var (
+		vresult = make([]int32, totalLevels)
+		dresult = make([]int16, totalLevels)
+		rresult = make([]int16, totalLevels)
+	)
+
+	rdr := p.reader.(*file.Int32ColumnChunkReader)
+	total, read, err := rdr.ReadBatch(int64(totalLevels), vresult, dresult, rresult)
+	p.NoError(err)
+	p.EqualValues(totalLevels, total)
+	p.EqualValues(totalLevels, read)
+}
+
+// TestInt32FlatRequiredSkip interleaves Skip and ReadBatch on a required
+// int32 column of five pages. The subtests share one reader and run in order,
+// so each one continues from where the previous one stopped.
+func (p *PrimitiveReaderSuite) TestInt32FlatRequiredSkip() {
+	const (
+		npages        int = 5
+		levelsPerPage int = 100
+		halfPage      int = levelsPerPage / 2
+	)
+
+	p.maxDefLvl, p.maxRepLvl = 0, 0
+
+	node := schema.NewInt32Node("a", parquet.Repetitions.Required, -1)
+	col := schema.NewColumn(node, p.maxDefLvl, p.maxRepLvl)
+	p.pages, p.nvalues, p.values, p.defLevels, p.repLevels = makePages(p.dataPageVersion, col, npages, levelsPerPage, reflect.TypeOf(int32(0)), parquet.Encodings.Plain)
+	p.initReader(col)
+
+	vresult := make([]int32, halfPage)
+	dresult := make([]int16, halfPage)
+	rresult := make([]int16, halfPage)
+
+	rdr := p.reader.(*file.Int32ColumnChunkReader)
+
+	p.Run("skip_size > page_size", func() {
+		// skip the first 2 pages, then read half of the third page
+		skipped, _ := rdr.Skip(int64(2 * levelsPerPage))
+		p.Equal(int64(2*levelsPerPage), skipped)
+
+		rdr.ReadBatch(int64(halfPage), vresult, dresult, rresult)
+		want := p.values.Slice(2*levelsPerPage, 5*halfPage).Interface().([]int32)
+		p.Equal(want, vresult)
+	})
+
+	p.Run("skip_size == page_size", func() {
+		// skip one page worth of values, crossing from page three into page four
+		skipped, _ := rdr.Skip(int64(levelsPerPage))
+		p.Equal(int64(levelsPerPage), skipped)
+
+		// read the second half of page four
+		rdr.ReadBatch(int64(halfPage), vresult, dresult, rresult)
+		want := p.values.Slice(7*halfPage, 4*levelsPerPage).Interface().([]int32)
+		p.Equal(want, vresult)
+	})
+
+	p.Run("skip_size < page_size", func() {
+		// skip the first half of the last page
+		skipped, _ := rdr.Skip(int64(halfPage))
+		p.Equal(int64(halfPage), skipped)
+
+		// read the remaining half page
+		rdr.ReadBatch(int64(halfPage), vresult, dresult, rresult)
+		want := p.values.Slice(9*halfPage, p.values.Len()).Interface().([]int32)
+		p.Equal(want, vresult)
+	})
+}
+
+// TestDictionaryEncodedPages checks how the column reader reacts to different
+// combinations of dictionary and data pages. All pages are built on an empty
+// buffer with zero values, so only page ordering and encodings are exercised.
+// Each subtest appends its pages to p.pages, builds a reader over them and
+// empties p.pages again before the next subtest runs.
+func (p *PrimitiveReaderSuite) TestDictionaryEncodedPages() {
+ p.maxDefLvl = 0
+ p.maxRepLvl = 0
+ typ := schema.NewInt32Node("a", parquet.Repetitions.Required, -1)
+ descr := schema.NewColumn(typ, p.maxDefLvl, p.maxRepLvl)
+ dummy := memory.NewResizableBuffer(mem)
+
+ // a plain dictionary page followed by an RLE dictionary data page is accepted
+ p.Run("Dict: Plain, Data: RLEDict", func() {
+ dictPage := file.NewDictionaryPage(dummy, 0,
parquet.Encodings.Plain)
+ dataPage := testutils.MakeDataPage(p.dataPageVersion, descr,
nil, 0, parquet.Encodings.RLEDict, dummy, nil, nil, 0, 0)
+
+ p.pages = append(p.pages, dictPage, dataPage)
+ p.initReader(descr)
+ p.NotPanics(func() { p.reader.HasNext() })
+ p.NoError(p.reader.Err())
+ p.pages = p.pages[:0]
+ })
+
+ // the PlainDict encoding is accepted for both the dictionary and the data page
+ p.Run("Dict: Plain Dictionary, Data: Plain Dictionary", func() {
+ dictPage := file.NewDictionaryPage(dummy, 0,
parquet.Encodings.PlainDict)
+ dataPage := testutils.MakeDataPage(p.dataPageVersion, descr,
nil, 0, parquet.Encodings.PlainDict, dummy, nil, nil, 0, 0)
+ p.pages = append(p.pages, dictPage, dataPage)
+ p.initReader(descr)
+ p.NotPanics(func() { p.reader.HasNext() })
+ p.NoError(p.reader.Err())
+ p.pages = p.pages[:0]
+ })
+
+ // despite the subtest name, a dictionary-encoded data page without a
+ // preceding dictionary page must not panic: HasNext returns false and the
+ // reader reports an error
+ p.Run("Panic if dict page not first", func() {
+ dataPage := testutils.MakeDataPage(p.dataPageVersion, descr,
nil, 0, parquet.Encodings.RLEDict, dummy, nil, nil, 0, 0)
+ p.pages = append(p.pages, dataPage)
+ p.initReader(descr)
+ p.NotPanics(func() { p.False(p.reader.HasNext()) })
+ p.Error(p.reader.Err())
+ p.pages = p.pages[:0]
+ })
+
+ // a dictionary page using the DeltaByteArray encoding is rejected with an error
+ p.Run("Only RLE is supported", func() {
+ dictPage := file.NewDictionaryPage(dummy, 0,
parquet.Encodings.DeltaByteArray)
+ p.pages = append(p.pages, dictPage)
+ p.initReader(descr)
+ p.NotPanics(func() { p.False(p.reader.HasNext()) })
+ p.Error(p.reader.Err())
+ p.pages = p.pages[:0]
+ })
+
+ // a second dictionary page in the same column chunk is rejected with an error
+ p.Run("Cannot have more than one dict", func() {
+ dictPage1 := file.NewDictionaryPage(dummy, 0,
parquet.Encodings.PlainDict)
+ dictPage2 := file.NewDictionaryPage(dummy, 0,
parquet.Encodings.Plain)
+ p.pages = append(p.pages, dictPage1, dictPage2)
+ p.initReader(descr)
+ p.NotPanics(func() { p.False(p.reader.HasNext()) })
+ p.Error(p.reader.Err())
+ p.pages = p.pages[:0]
+ })
+
+ // a DeltaByteArray data page on this int32 column is expected to make
+ // HasNext panic; the error check below is disabled
+ p.Run("Unsupported encoding", func() {
+ dataPage := testutils.MakeDataPage(p.dataPageVersion, descr,
nil, 0, parquet.Encodings.DeltaByteArray, dummy, nil, nil, 0, 0)
+ p.pages = append(p.pages, dataPage)
+ p.initReader(descr)
+ p.Panics(func() { p.reader.HasNext() })
+ // p.Error(p.reader.Err())
+ p.pages = p.pages[:0]
+ })
+
+ // NOTE(review): this reslices the emptied slice back to two elements of
+ // stale pages; the purpose is unclear since TearDownTest clears p.pages
+ // right after the test. Confirm whether it is needed.
+ p.pages = p.pages[:2]
+}
+
+// TestPrimitiveReader runs PrimitiveReaderSuite twice: once with the suite's
+// zero-value data page version and once with data page V2.
+func TestPrimitiveReader(t *testing.T) {
+	t.Parallel()
+
+	cases := []struct {
+		name string
+		s    *PrimitiveReaderSuite
+	}{
+		{name: "datapage v1", s: new(PrimitiveReaderSuite)},
+		{name: "datapage v2", s: &PrimitiveReaderSuite{dataPageVersion: parquet.DataPageV2}},
+	}
+
+	for _, tc := range cases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			suite.Run(t, tc.s)
+		})
+	}
+}
diff --git a/go/parquet/file/column_reader_types.gen.go
b/go/parquet/file/column_reader_types.gen.go
new file mode 100644
index 0000000..ab1fd53
--- /dev/null
+++ b/go/parquet/file/column_reader_types.gen.go
@@ -0,0 +1,299 @@
+// Code generated by column_reader_types.gen.go.tmpl. DO NOT EDIT.
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+ "unsafe"
+
+ "github.com/apache/arrow/go/arrow"
+ "github.com/apache/arrow/go/parquet"
+ "github.com/apache/arrow/go/parquet/internal/encoding"
+)
+
+// Int32ColumnChunkReader is the Typed Column chunk reader instance for reading
+// Int32 column data.
+type Int32ColumnChunkReader struct {
+ columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values. The callback passed to skipValues hands the same buf to ReadBatch as the value slice and as both level slices, and returns only the count and error ReadBatch reports.
+func (cr *Int32ColumnChunkReader) Skip(nvalues int64) (int64, error) {
+ return cr.columnChunkReader.skipValues(nvalues,
+ func(batch int64, buf []byte) (int64, error) {
+ vals, _, err := cr.ReadBatch(batch,
+ arrow.Int32Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf))
+ return vals, err
+ })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of rows that were read, valuesRead is the actual number
of physical values
+// that were read excluding nulls
+func (cr *Int32ColumnChunkReader) ReadBatch(batchSize int64, values []int32,
defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+ return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
+ return
cr.curDecoder.(encoding.Int32Decoder).Decode(values[start : start+len])
+ })
+}
+
+// Int64ColumnChunkReader is the Typed Column chunk reader instance for reading
+// Int64 column data.
+type Int64ColumnChunkReader struct {
+ columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values. The callback passed to skipValues hands the same buf to ReadBatch as the value slice and as both level slices, and returns only the count and error ReadBatch reports.
+func (cr *Int64ColumnChunkReader) Skip(nvalues int64) (int64, error) {
+ return cr.columnChunkReader.skipValues(nvalues,
+ func(batch int64, buf []byte) (int64, error) {
+ vals, _, err := cr.ReadBatch(batch,
+ arrow.Int64Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf))
+ return vals, err
+ })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of rows that were read, valuesRead is the actual number
of physical values
+// that were read excluding nulls
+func (cr *Int64ColumnChunkReader) ReadBatch(batchSize int64, values []int64,
defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+ return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
+ return
cr.curDecoder.(encoding.Int64Decoder).Decode(values[start : start+len])
+ })
+}
+
+// Int96ColumnChunkReader is the Typed Column chunk reader instance for reading
+// Int96 column data.
+type Int96ColumnChunkReader struct {
+ columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values. The callback passed to skipValues hands the same buf to ReadBatch as the value slice and as both level slices, and returns only the count and error ReadBatch reports.
+func (cr *Int96ColumnChunkReader) Skip(nvalues int64) (int64, error) {
+ return cr.columnChunkReader.skipValues(nvalues,
+ func(batch int64, buf []byte) (int64, error) {
+ vals, _, err := cr.ReadBatch(batch,
+ parquet.Int96Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf))
+ return vals, err
+ })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of rows that were read, valuesRead is the actual number
of physical values
+// that were read excluding nulls
+func (cr *Int96ColumnChunkReader) ReadBatch(batchSize int64, values
[]parquet.Int96, defLvls, repLvls []int16) (total int64, valuesRead int, err
error) {
+ return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
+ return
cr.curDecoder.(encoding.Int96Decoder).Decode(values[start : start+len])
+ })
+}
+
+// Float32ColumnChunkReader is the Typed Column chunk reader instance for
reading
+// Float32 column data.
+type Float32ColumnChunkReader struct {
+ columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values. The callback passed to skipValues hands the same buf to ReadBatch as the value slice and as both level slices, and returns only the count and error ReadBatch reports.
+func (cr *Float32ColumnChunkReader) Skip(nvalues int64) (int64, error) {
+ return cr.columnChunkReader.skipValues(nvalues,
+ func(batch int64, buf []byte) (int64, error) {
+ vals, _, err := cr.ReadBatch(batch,
+ arrow.Float32Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf))
+ return vals, err
+ })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of rows that were read, valuesRead is the actual number
of physical values
+// that were read excluding nulls
+func (cr *Float32ColumnChunkReader) ReadBatch(batchSize int64, values
[]float32, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+ return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
+ return
cr.curDecoder.(encoding.Float32Decoder).Decode(values[start : start+len])
+ })
+}
+
+// Float64ColumnChunkReader is the Typed Column chunk reader instance for
reading
+// Float64 column data.
+type Float64ColumnChunkReader struct {
+ columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values. The callback passed to skipValues hands the same buf to ReadBatch as the value slice and as both level slices, and returns only the count and error ReadBatch reports.
+func (cr *Float64ColumnChunkReader) Skip(nvalues int64) (int64, error) {
+ return cr.columnChunkReader.skipValues(nvalues,
+ func(batch int64, buf []byte) (int64, error) {
+ vals, _, err := cr.ReadBatch(batch,
+ arrow.Float64Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf))
+ return vals, err
+ })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of rows that were read, valuesRead is the actual number
of physical values
+// that were read excluding nulls
+func (cr *Float64ColumnChunkReader) ReadBatch(batchSize int64, values
[]float64, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+ return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
+ return
cr.curDecoder.(encoding.Float64Decoder).Decode(values[start : start+len])
+ })
+}
+
+// BooleanColumnChunkReader is the Typed Column chunk reader instance for
reading
+// Boolean column data.
+type BooleanColumnChunkReader struct {
+ columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values. The callback passed to skipValues hands the same buf to ReadBatch as the value slice (reinterpreted as []bool via unsafe) and as both level slices, and returns only the count and error ReadBatch reports.
+func (cr *BooleanColumnChunkReader) Skip(nvalues int64) (int64, error) {
+ return cr.columnChunkReader.skipValues(nvalues,
+ func(batch int64, buf []byte) (int64, error) {
+ vals, _, err := cr.ReadBatch(batch,
+ *(*[]bool)(unsafe.Pointer(&buf)),
+ arrow.Int16Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf))
+ return vals, err
+ })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of rows that were read, valuesRead is the actual number
of physical values
+// that were read excluding nulls
+func (cr *BooleanColumnChunkReader) ReadBatch(batchSize int64, values []bool,
defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+ return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
+ return
cr.curDecoder.(encoding.BooleanDecoder).Decode(values[start : start+len])
+ })
+}
+
+// ByteArrayColumnChunkReader is the Typed Column chunk reader instance for
reading
+// ByteArray column data.
+type ByteArrayColumnChunkReader struct {
+ columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values. The callback passed to skipValues hands the same buf to ReadBatch as the value slice and as both level slices, and returns only the count and error ReadBatch reports.
+func (cr *ByteArrayColumnChunkReader) Skip(nvalues int64) (int64, error) {
+ return cr.columnChunkReader.skipValues(nvalues,
+ func(batch int64, buf []byte) (int64, error) {
+ vals, _, err := cr.ReadBatch(batch,
+ parquet.ByteArrayTraits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf))
+ return vals, err
+ })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of rows that were read, valuesRead is the actual number
of physical values
+// that were read excluding nulls
+func (cr *ByteArrayColumnChunkReader) ReadBatch(batchSize int64, values
[]parquet.ByteArray, defLvls, repLvls []int16) (total int64, valuesRead int,
err error) {
+ return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
+ return
cr.curDecoder.(encoding.ByteArrayDecoder).Decode(values[start : start+len])
+ })
+}
+
+// FixedLenByteArrayColumnChunkReader is the Typed Column chunk reader
instance for reading
+// FixedLenByteArray column data.
+type FixedLenByteArrayColumnChunkReader struct {
+ columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values. The callback passed to skipValues hands the same buf to ReadBatch as the value slice and as both level slices, and returns only the count and error ReadBatch reports.
+func (cr *FixedLenByteArrayColumnChunkReader) Skip(nvalues int64) (int64,
error) {
+ return cr.columnChunkReader.skipValues(nvalues,
+ func(batch int64, buf []byte) (int64, error) {
+ vals, _, err := cr.ReadBatch(batch,
+
parquet.FixedLenByteArrayTraits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf))
+ return vals, err
+ })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of rows that were read, valuesRead is the actual number
of physical values
+// that were read excluding nulls
+func (cr *FixedLenByteArrayColumnChunkReader) ReadBatch(batchSize int64,
values []parquet.FixedLenByteArray, defLvls, repLvls []int16) (total int64,
valuesRead int, err error) {
+ return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
+ return
cr.curDecoder.(encoding.FixedLenByteArrayDecoder).Decode(values[start :
start+len])
+ })
+}
diff --git a/go/parquet/file/column_reader_types.gen.go.tmpl
b/go/parquet/file/column_reader_types.gen.go.tmpl
new file mode 100644
index 0000000..23b7d3e
--- /dev/null
+++ b/go/parquet/file/column_reader_types.gen.go.tmpl
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+ "github.com/apache/arrow/go/parquet"
+ "github.com/apache/arrow/go/parquet/internal/encoding"
+)
+
+{{range .In}}
+// {{.Name}}ColumnChunkReader is the Typed Column chunk reader instance for
reading
+// {{.Name}} column data.
+type {{.Name}}ColumnChunkReader struct {
+ columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values. The callback passed to skipValues hands the same buf to ReadBatch as the value slice and as both level slices, and returns only the count and error ReadBatch reports.
+func (cr *{{.Name}}ColumnChunkReader) Skip(nvalues int64) (int64, error) {
+ return cr.columnChunkReader.skipValues(nvalues,
+ func(batch int64, buf []byte) (int64, error) {
+ vals, _, err := cr.ReadBatch(batch,
+ {{- if ne .Name "Boolean"}}
+ {{.prefix}}.{{.Name}}Traits.CastFromBytes(buf),
+ {{- else}}
+ *(*[]bool)(unsafe.Pointer(&buf)),
+ {{- end}}
+ arrow.Int16Traits.CastFromBytes(buf),
+ arrow.Int16Traits.CastFromBytes(buf))
+ return vals, err
+ })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of rows that were read, valuesRead is the actual number
of physical values
+// that were read excluding nulls
+func (cr *{{.Name}}ColumnChunkReader) ReadBatch(batchSize int64, values
[]{{.name}}, defLvls, repLvls []int16) (total int64, valuesRead int, err error)
{
+ return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
+ return
cr.curDecoder.(encoding.{{.Name}}Decoder).Decode(values[start:start+len])
+ })
+}
+{{end}}
diff --git a/go/parquet/file/file_reader.go b/go/parquet/file/file_reader.go
new file mode 100644
index 0000000..8b95223
--- /dev/null
+++ b/go/parquet/file/file_reader.go
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+ "bytes"
+ "encoding/binary"
+ "io"
+ "os"
+
+ "github.com/apache/arrow/go/arrow/memory"
+ "github.com/apache/arrow/go/parquet"
+ "github.com/apache/arrow/go/parquet/internal/encryption"
+ "github.com/apache/arrow/go/parquet/metadata"
+ "golang.org/x/exp/mmap"
+ "golang.org/x/xerrors"
+)
+
+const (
+ footerSize uint32 = 8
+)
+
+var (
+ magicBytes = []byte("PAR1")
+ magicEBytes = []byte("PARE")
+ errInconsistentFileMetadata = xerrors.New("parquet: file is smaller
than indicated metadata size")
+)
+
+// Reader is the main interface for reading a parquet file
+type Reader struct {
+ r parquet.ReaderAtSeeker // source the footer and row group data are read from
+ props *parquet.ReaderProperties // set through WithReadProps, otherwise defaulted in NewParquetReader
+ metadata *metadata.FileMetaData // set through WithMetadata, otherwise parsed from the file footer
+ footerOffset int64 // end-of-source offset obtained via Seek(0, io.SeekEnd); handed to row group readers as sourceSz
+ fileDecryptor encryption.FileDecryptor // created while parsing the footer of an encrypted file
+}
+
+// mmapAdapter wraps an mmap.ReaderAt and keeps a current position so that a
+// memory mapped file can also be consumed through Read and Seek.
+type mmapAdapter struct {
+	*mmap.ReaderAt
+
+	pos int64
+}
+
+// Close closes the underlying memory mapped file.
+func (m *mmapAdapter) Close() error {
+	err := m.ReaderAt.Close()
+	return err
+}
+
+// ReadAt reads into p starting at offset off of the mapped file without
+// touching the adapter's current position.
+func (m *mmapAdapter) ReadAt(p []byte, off int64) (int, error) {
+	read, err := m.ReaderAt.ReadAt(p, off)
+	return read, err
+}
+
+// Read reads into p from the current position and advances the position by
+// the number of bytes that were read.
+func (m *mmapAdapter) Read(p []byte) (int, error) {
+	read, err := m.ReaderAt.ReadAt(p, m.pos)
+	m.pos += int64(read)
+	return read, err
+}
+
+func (m *mmapAdapter) Seek(offset int64, whence int) (int64, error) {
+ newPos, offs := int64(0), offset
+ switch whence {
+ case io.SeekStart:
+ newPos = offs
+ case io.SeekCurrent:
+ newPos = m.pos + offs
+ case io.SeekEnd:
+ newPos = int64(m.ReaderAt.Len()) + offs
+ }
+ if newPos < 0 {
+ return 0, xerrors.New("negative result pos")
+ }
+ if newPos > int64(m.ReaderAt.Len()) {
+ return 0, xerrors.New("new position exceeds size of file")
+ }
+ m.pos = newPos
+ return newPos, nil
+}
+
+// ReadOption is a functional option that is applied to a Reader while it is
+// being constructed by NewParquetReader / OpenParquetFile.
+type ReadOption func(*Reader)
+
+// WithReadProps makes the reader use the supplied ReaderProperties instead
+// of the default ones.
+func WithReadProps(props *parquet.ReaderProperties) ReadOption {
+	apply := func(rdr *Reader) {
+		rdr.props = props
+	}
+	return apply
+}
+
+// WithMetadata makes the reader use the supplied FileMetaData object instead
+// of parsing the metadata out of the file itself.
+func WithMetadata(m *metadata.FileMetaData) ReadOption {
+	apply := func(rdr *Reader) {
+		rdr.metadata = m
+	}
+	return apply
+}
+
+// OpenParquetFile will return a Reader for the given parquet file on the
+// local file system.
+//
+// Optionally the file can be memory mapped for faster reading. If no read
+// properties are provided then the default ReaderProperties will be used. The
+// WithMetadata option can be used to provide a FileMetaData object rather
+// than reading the file metadata from the file.
+//
+// If the file cannot be opened, or the reader cannot be constructed from it
+// (for example because the footer is invalid), a nil Reader and the error are
+// returned and the file handle / mapping opened here is released again.
+func OpenParquetFile(filename string, memoryMap bool, opts ...ReadOption) (*Reader, error) {
+	var (
+		source parquet.ReaderAtSeeker
+		closer io.Closer
+	)
+
+	if memoryMap {
+		mapped, err := mmap.Open(filename)
+		if err != nil {
+			return nil, err
+		}
+		adapter := &mmapAdapter{mapped, 0}
+		source, closer = adapter, adapter
+	} else {
+		fp, err := os.Open(filename)
+		if err != nil {
+			return nil, err
+		}
+		source, closer = fp, fp
+	}
+
+	rdr, err := NewParquetReader(source, opts...)
+	if err != nil {
+		// The caller has no handle on the source when construction fails, so
+		// it must be released here. The Close error is intentionally dropped
+		// in favour of the error that explains why opening failed.
+		closer.Close()
+		return nil, err
+	}
+	return rdr, nil
+}
+
+// NewParquetReader returns a FileReader instance that reads a parquet file
which can be read from r.
+// This reader needs to support Read, ReadAt and Seeking.
+//
+// If no read properties are provided then the default ReaderProperties will
be used. The WithMetadata
+// option can be used to provide a FileMetaData object rather than reading the
file metadata from the file.
+func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader,
error) {
+ var err error
+ f := &Reader{r: r}
+ for _, o := range opts {
+ o(f)
+ }
+
+ if f.footerOffset <= 0 {
+ f.footerOffset, err = r.Seek(0, io.SeekEnd)
+ if err != nil {
+ return nil, xerrors.Errorf("parquet: could not retrieve
footer offset: %w", err)
+ }
+ }
+
+ if f.props == nil {
+ f.props = parquet.NewReaderProperties(memory.NewGoAllocator())
+ }
+
+ if f.metadata == nil {
+ return f, f.parseMetaData()
+ }
+
+ return f, nil
+}
+
+// Close closes the underlying source if it implements io.Closer and returns
+// the result of that call; for any other source it is a no-op returning nil.
+func (f *Reader) Close() error {
+	closer, ok := f.r.(io.Closer)
+	if !ok {
+		return nil
+	}
+	return closer.Close()
+}
+
+// MetaData returns the FileMetaData object held by this reader.
+func (f *Reader) MetaData() *metadata.FileMetaData {
+	return f.metadata
+}
+
+// parseMetaData handles parsing the metadata from the opened file.
+func (f *Reader) parseMetaData() error {
+ if f.footerOffset <= int64(footerSize) {
+ return xerrors.Errorf("parquet: file too small (size=%d)",
f.footerOffset)
+ }
+
+ buf := make([]byte, footerSize)
+ // backup 8 bytes to read the footer size (first four bytes) and the
magic bytes (last 4 bytes)
+ n, err := f.r.ReadAt(buf, f.footerOffset-int64(footerSize))
+ if err != nil {
+ return xerrors.Errorf("parquet: could not read footer: %w", err)
+ }
+ if n != len(buf) {
+ return xerrors.Errorf("parquet: could not read %d bytes from
end of file", len(buf))
+ }
+
+ size := int64(binary.LittleEndian.Uint32(buf[:4]))
+ if size < 0 || size+int64(footerSize) > f.footerOffset {
+ return errInconsistentFileMetadata
+ }
+
+ fileDecryptProps := f.props.FileDecryptProps
+
+ switch {
+ case bytes.Equal(buf[4:], magicBytes): // non-encrypted metadata
+ buf = make([]byte, size)
+ if _, err := f.r.ReadAt(buf,
f.footerOffset-int64(footerSize)-size); err != nil {
+ return xerrors.Errorf("parquet: could not read footer:
%w", err)
+ }
+
+ f.metadata, err = metadata.NewFileMetaData(buf, nil)
+ if err != nil {
+ return xerrors.Errorf("parquet: could not read footer:
%w", err)
+ }
+
+ if !f.metadata.IsSetEncryptionAlgorithm() {
+ if fileDecryptProps != nil &&
!fileDecryptProps.PlaintextFilesAllowed() {
+ return xerrors.Errorf("parquet: applying
decryption properties on plaintext file")
+ }
+ } else {
+ if err :=
f.parseMetaDataEncryptedFilePlaintextFooter(fileDecryptProps, buf); err != nil {
+ return err
+ }
+ }
+ case bytes.Equal(buf[4:], magicEBytes): // encrypted metadata
+ buf = make([]byte, size)
+ if _, err := f.r.ReadAt(buf,
f.footerOffset-int64(footerSize)-size); err != nil {
+ return xerrors.Errorf("parquet: could not read footer:
%w", err)
+ }
+
+ if fileDecryptProps == nil {
+ return xerrors.New("could not read encrypted metadata,
no decryption found in reader's properties")
+ }
+
+ fileCryptoMetadata, err := metadata.NewFileCryptoMetaData(buf)
+ if err != nil {
+ return err
+ }
+ algo := fileCryptoMetadata.EncryptionAlgorithm()
+ fileAad, err := f.handleAadPrefix(fileDecryptProps, &algo)
+ if err != nil {
+ return err
+ }
+ f.fileDecryptor = encryption.NewFileDecryptor(fileDecryptProps,
fileAad, algo.Algo, string(fileCryptoMetadata.KeyMetadata()),
f.props.Allocator())
+
+ f.metadata, err =
metadata.NewFileMetaData(buf[fileCryptoMetadata.Len():], f.fileDecryptor)
+ if err != nil {
+ return xerrors.Errorf("parquet: could not read footer:
%w", err)
+ }
+ default:
+ return xerrors.Errorf("parquet: magic bytes not found in
footer. Either the file is corrupted or this isn't a parquet file")
+ }
+
+ return nil
+}
+
+func (f *Reader) handleAadPrefix(fileDecrypt
*parquet.FileDecryptionProperties, algo *parquet.Algorithm) (string, error) {
+ aadPrefixInProps := fileDecrypt.AadPrefix()
+ aadPrefix := []byte(aadPrefixInProps)
+ fileHasAadPrefix := algo.Aad.AadPrefix != nil &&
len(algo.Aad.AadPrefix) > 0
+ aadPrefixInFile := algo.Aad.AadPrefix
+
+ if algo.Aad.SupplyAadPrefix && aadPrefixInProps == "" {
+ return "", xerrors.New("AAD Prefix used for file encryption but
not stored in file and not suppliedin decryption props")
+ }
+
+ if fileHasAadPrefix {
+ if aadPrefixInProps != "" {
+ if aadPrefixInProps != string(aadPrefixInFile) {
+ return "", xerrors.New("AAD prefix in file and
in properties but not the same")
+ }
+ }
+ aadPrefix = aadPrefixInFile
+ if fileDecrypt.Verifier != nil {
+ fileDecrypt.Verifier.Verify(string(aadPrefix))
+ }
+ } else {
+ if !algo.Aad.SupplyAadPrefix && aadPrefixInProps != "" {
+ return "", xerrors.New("AAD Prefix set in
decryptionproperties but was not used for file encryption")
+ }
+ if fileDecrypt.Verifier != nil {
+ return "", xerrors.New("AAD Prefix Verifier is set but
AAD Prefix not found in file")
+ }
+ }
+ return string(append(aadPrefix, algo.Aad.AadFileUnique...)), nil
+}
+
+func (f *Reader) parseMetaDataEncryptedFilePlaintextFooter(decryptProps
*parquet.FileDecryptionProperties, data []byte) error {
+ if decryptProps != nil {
+ algo := f.metadata.EncryptionAlgorithm()
+ fileAad, err := f.handleAadPrefix(decryptProps, &algo)
+ if err != nil {
+ return err
+ }
+ f.fileDecryptor = encryption.NewFileDecryptor(decryptProps,
fileAad, algo.Algo, string(f.metadata.GetFooterSigningKeyMetadata()),
f.props.Allocator())
+ // set the InternalFileDecryptor in the metadata as well, as
it's used
+ // for signature verification and for ColumnChunkMetaData
creation.
+ f.metadata.FileDecryptor = f.fileDecryptor
+ if decryptProps.PlaintextFooterIntegrity() {
+ if len(data)-f.metadata.Size() !=
encryption.GcmTagLength+encryption.NonceLength {
+ return xerrors.New("failed reading metadata for
encryption signature")
+ }
+
+ if
!f.metadata.VerifySignature(data[f.metadata.Size():]) {
+ return xerrors.New("parquet crypto signature
verification failed")
+ }
+ }
+ }
+ return nil
+}
+
+// WriterVersion reports the application version recorded in the file
+// metadata, as returned by the metadata's WriterVersion method.
+func (f *Reader) WriterVersion() *metadata.AppVersion {
+	version := f.metadata.WriterVersion()
+	return version
+}
+
+// NumRows reports the row count recorded in the file metadata.
+func (f *Reader) NumRows() int64 {
+	rows := f.metadata.GetNumRows()
+	return rows
+}
+
+// NumRowGroups reports how many row groups the file metadata lists.
+func (f *Reader) NumRowGroups() int {
+	groups := f.metadata.GetRowGroups()
+	return len(groups)
+}
+
+// RowGroup returns a reader for the row group at (0-based) index i. The
+// index is used directly on the metadata's row group list, so an index
+// outside of [0, NumRowGroups()) panics.
+func (f *Reader) RowGroup(i int) *RowGroupReader {
+	group := f.metadata.RowGroups[i]
+	groupMeta := metadata.NewRowGroupMetaData(group, f.metadata.Schema, f.WriterVersion(), f.fileDecryptor)
+
+	rgr := &RowGroupReader{
+		fileMetadata:  f.metadata,
+		rgMetadata:    groupMeta,
+		props:         f.props,
+		r:             f.r,
+		sourceSz:      f.footerOffset,
+		fileDecryptor: f.fileDecryptor,
+	}
+	return rgr
+}
diff --git a/go/parquet/file/file_reader_test.go
b/go/parquet/file/file_reader_test.go
new file mode 100644
index 0000000..6dfb1fa
--- /dev/null
+++ b/go/parquet/file/file_reader_test.go
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file_test
+
+import (
+ "bytes"
+ "encoding/binary"
+ "io"
+ "math/rand"
+ "testing"
+
+ "github.com/apache/arrow/go/arrow/memory"
+ "github.com/apache/arrow/go/parquet/compress"
+ "github.com/apache/arrow/go/parquet/file"
+ "github.com/apache/arrow/go/parquet/internal/encoding"
+ format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
+ "github.com/apache/arrow/go/parquet/internal/thrift"
+ "github.com/apache/arrow/go/parquet/metadata"
+ libthrift "github.com/apache/thrift/lib/go/thrift"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/suite"
+)
+
+// getDummyStats builds a thrift Statistics object whose Max is a statSize
+// byte payload filled via memory.Set(..., 1). With fillAll set, Min shares
+// that payload and NullCount / DistinctCount are set to 42 and 1.
+func getDummyStats(statSize int, fillAll bool) *format.Statistics {
+	payload := make([]byte, statSize)
+	memory.Set(payload, 1)
+
+	stats := format.NewStatistics()
+	stats.Max = payload
+	if !fillAll {
+		return stats
+	}
+
+	stats.Min = payload
+	stats.NullCount = libthrift.Int64Ptr(42)
+	stats.DistinctCount = libthrift.Int64Ptr(1)
+	return stats
+}
+
+// checkStatistics asserts that every field which is set on the expected
+// thrift statistics (max, min, null count, distinct count - checked in that
+// order) has the same value in the decoded statistics; unset fields are not
+// compared.
+func checkStatistics(t *testing.T, stats format.Statistics, actual metadata.EncodedStatistics) {
+	if hasMax := stats.IsSetMax(); hasMax {
+		assert.Equal(t, stats.Max, actual.Max)
+	}
+	if hasMin := stats.IsSetMin(); hasMin {
+		assert.Equal(t, stats.Min, actual.Min)
+	}
+	if hasNulls := stats.IsSetNullCount(); hasNulls {
+		assert.Equal(t, stats.GetNullCount(), actual.NullCount)
+	}
+	if hasDistinct := stats.IsSetDistinctCount(); hasDistinct {
+		assert.Equal(t, stats.GetDistinctCount(), actual.DistinctCount)
+	}
+}
+
+type PageSerdeSuite struct {
+ suite.Suite
+
+ sink *encoding.BufferWriter // stream the serialized page headers are written to (recreated by ResetStream)
+ buffer *memory.Buffer // finished contents of sink, set by EndStream
+
+ pageHdr format.PageHeader // header that gets serialized by the Write* helpers
+ dataPageHdr format.DataPageHeader // referenced from pageHdr by WriteDataPageHeader
+ dataPageHdrV2 format.DataPageHeaderV2 // referenced from pageHdr by WriteDataPageHeaderV2
+
+ pageReader file.PageReader // reader over buffer, created by InitSerializedPageReader
+}
+
+// TestFileDeserializing runs the PageSerdeSuite.
+func TestFileDeserializing(t *testing.T) {
+	t.Parallel()
+
+	s := &PageSerdeSuite{}
+	suite.Run(t, s)
+}
+
+// ResetStream replaces the sink with a fresh BufferWriter.
+func (p *PageSerdeSuite) ResetStream() {
+	fresh := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+	p.sink = fresh
+}
+
+// EndStream finishes the sink and keeps the resulting buffer.
+func (p *PageSerdeSuite) EndStream() {
+	finished := p.sink.Finish()
+	p.buffer = finished
+}
+
+// SetupTest sets the v1 data page header to PLAIN value encoding with RLE
+// definition / repetition level encodings and starts a fresh stream.
+func (p *PageSerdeSuite) SetupTest() {
+	hdr := &p.dataPageHdr
+	hdr.Encoding = format.Encoding_PLAIN
+	hdr.DefinitionLevelEncoding = format.Encoding_RLE
+	hdr.RepetitionLevelEncoding = format.Encoding_RLE
+
+	p.ResetStream()
+}
+
+func (p *PageSerdeSuite) InitSerializedPageReader(nrows int64, codec
compress.Compression) {
+ p.EndStream()
+
+ p.pageReader, _ = file.NewPageReader(bytes.NewReader(p.buffer.Bytes()),
nrows, codec, memory.DefaultAllocator, nil)
+}
+
+// WriteDataPageHeader serializes a v1 data page header to the sink. It is a
+// shortcut for writing headers that may or may not have meaningful data
+// associated with them; maxSerialized is not used.
+func (p *PageSerdeSuite) WriteDataPageHeader(maxSerialized int, uncompressed, compressed int32) {
+	hdr := &p.pageHdr
+	hdr.DataPageHeader = &p.dataPageHdr
+	hdr.UncompressedPageSize = uncompressed
+	hdr.CompressedPageSize = compressed
+	hdr.Type = format.PageType_DATA_PAGE
+
+	ser := thrift.NewThriftSerializer()
+	write := func() {
+		ser.Serialize(hdr, p.sink, nil)
+	}
+	p.NotPanics(write)
+}
+
+// WriteDataPageHeaderV2 serializes a v2 data page header to the sink;
+// maxSerialized is not used.
+func (p *PageSerdeSuite) WriteDataPageHeaderV2(maxSerialized int, uncompressed, compressed int32) {
+	hdr := &p.pageHdr
+	hdr.DataPageHeaderV2 = &p.dataPageHdrV2
+	hdr.UncompressedPageSize = uncompressed
+	hdr.CompressedPageSize = compressed
+	hdr.Type = format.PageType_DATA_PAGE_V2
+
+	ser := thrift.NewThriftSerializer()
+	write := func() {
+		ser.Serialize(hdr, p.sink, nil)
+	}
+	p.NotPanics(write)
+}
+
+// CheckDataPageHeader asserts that page is a v1 data page whose value count,
+// encodings and statistics match the expected thrift header.
+func (p *PageSerdeSuite) CheckDataPageHeader(expected format.DataPageHeader, page file.Page) {
+	p.Equal(format.PageType_DATA_PAGE, page.Type())
+
+	p.IsType(&file.DataPageV1{}, page)
+	p.Equal(expected.NumValues, page.NumValues())
+	p.Equal(expected.Encoding, page.Encoding())
+
+	defEnc := page.(*file.DataPageV1).DefinitionLevelEncoding()
+	p.EqualValues(expected.DefinitionLevelEncoding, defEnc)
+	repEnc := page.(*file.DataPageV1).RepetitionLevelEncoding()
+	p.EqualValues(expected.RepetitionLevelEncoding, repEnc)
+
+	checkStatistics(p.T(), *expected.Statistics, page.(file.DataPage).Statistics())
+}
+
+// CheckDataPageHeaderV2 asserts that page is a v2 data page whose value
+// count, encoding, null count, level byte lengths, compression flag and
+// statistics match the expected thrift header.
+func (p *PageSerdeSuite) CheckDataPageHeaderV2(expected format.DataPageHeaderV2, page file.Page) {
+	p.Equal(format.PageType_DATA_PAGE_V2, page.Type())
+
+	p.IsType(&file.DataPageV2{}, page)
+	p.Equal(expected.NumValues, page.NumValues())
+	p.Equal(expected.Encoding, page.Encoding())
+
+	nulls := page.(*file.DataPageV2).NumNulls()
+	p.Equal(expected.NumNulls, nulls)
+	defLen := page.(*file.DataPageV2).DefinitionLevelByteLen()
+	p.Equal(expected.DefinitionLevelsByteLength, defLen)
+	repLen := page.(*file.DataPageV2).RepetitionLevelByteLen()
+	p.Equal(expected.RepetitionLevelsByteLength, repLen)
+	compressed := page.(*file.DataPageV2).IsCompressed()
+	p.Equal(expected.IsCompressed, compressed)
+
+	checkStatistics(p.T(), *expected.Statistics, page.(file.DataPage).Statistics())
+}
+
+// TestDataPageV1 writes a V1 data page header carrying dummy statistics and
+// checks that the first page returned by the reader matches that header.
+func (p *PageSerdeSuite) TestDataPageV1() {
+ const (
+ statsSize = 512
+ nrows = 4444
+ )
+ p.dataPageHdr.Statistics = getDummyStats(statsSize, true)
+ p.dataPageHdr.NumValues = nrows
+
+ // both page sizes are zero: only the header is written, no page data
+ p.WriteDataPageHeader(1024, 0, 0)
+ p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
+ p.True(p.pageReader.Next())
+ currentPage := p.pageReader.Page()
+ p.CheckDataPageHeader(p.dataPageHdr, currentPage)
+}
+
+// TestDataPageV2 writes a V2 data page header carrying dummy statistics and
+// checks that the first page returned by the reader matches that header.
+func (p *PageSerdeSuite) TestDataPageV2() {
+ const (
+ statsSize = 512
+ nrows = 4444
+ )
+ p.dataPageHdrV2.Statistics = getDummyStats(statsSize, true)
+ p.dataPageHdrV2.NumValues = nrows
+ // both page sizes are zero: only the header is written, no page data
+ p.WriteDataPageHeaderV2(1024, 0, 0)
+ p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
+ p.True(p.pageReader.Next())
+ p.CheckDataPageHeaderV2(p.dataPageHdrV2, p.pageReader.Page())
+}
+
+// TestLargePageHeaders writes a header whose statistics are built from a
+// 256KB size request and checks that the reader still returns the page.
+func (p *PageSerdeSuite) TestLargePageHeaders() {
+ const (
+ statsSize = 256 * 1024 // 256KB
+ nrows = 4141
+ maxHeaderSize = 512 * 1024 // 512KB
+ )
+
+ p.dataPageHdr.Statistics = getDummyStats(statsSize, false)
+ p.dataPageHdr.NumValues = nrows
+ p.WriteDataPageHeader(maxHeaderSize, 0, 0)
+ // the sink position after the write is the serialized header size
+ pos, err := p.sink.Seek(0, io.SeekCurrent)
+ p.NoError(err)
+ // statsSize <= header size <= maxHeaderSize, and well under 16MB
+ p.GreaterOrEqual(maxHeaderSize, int(pos))
+ p.LessOrEqual(statsSize, int(pos))
+ p.GreaterOrEqual(16*1024*1024, int(pos))
+
+ p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
+ p.True(p.pageReader.Next())
+ p.CheckDataPageHeader(p.dataPageHdr, p.pageReader.Page())
+}
+
+// TestFailLargePageHeaders writes a header larger than 128KB, lowers the
+// reader's maximum header size to 128KB, and checks that Next reports
+// failure through Err instead of panicking.
+func (p *PageSerdeSuite) TestFailLargePageHeaders() {
+ const (
+ statsSize = 256 * 1024 // 256KB
+ nrows = 1337 // dummy value
+ maxHeaderSize = 512 * 1024 // 512 KB
+ smallerMaxSize = 128 * 1024 // 128KB
+ )
+ p.dataPageHdr.Statistics = getDummyStats(statsSize, false)
+ p.WriteDataPageHeader(maxHeaderSize, 0, 0)
+ // the sink position after the write is the serialized header size
+ pos, err := p.sink.Seek(0, io.SeekCurrent)
+ p.NoError(err)
+ p.GreaterOrEqual(maxHeaderSize, int(pos))
+
+ // the header must exceed the limit that is about to be configured
+ p.LessOrEqual(smallerMaxSize, int(pos))
+ p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
+ p.pageReader.SetMaxPageHeaderSize(smallerMaxSize)
+ p.NotPanics(func() { p.False(p.pageReader.Next()) })
+ p.Error(p.pageReader.Err())
+}
+
+// TestCompression round-trips ten pages of random bytes through every
+// supported codec: each page is compressed, written behind a V1 data page
+// header, and then read back and compared with the original bytes.
+func (p *PageSerdeSuite) TestCompression() {
+	codecs := []compress.Compression{
+		compress.Codecs.Snappy,
+		compress.Codecs.Brotli,
+		compress.Codecs.Gzip,
+		// compress.Codecs.Lz4, // not yet implemented
+		compress.Codecs.Zstd,
+	}
+
+	const (
+		nrows  = 32 // dummy value
+		npages = 10
+	)
+	p.dataPageHdr.NumValues = nrows
+
+	fauxData := make([][]byte, npages)
+	for idx := range fauxData {
+		// each page is larger
+		fauxData[idx] = make([]byte, (idx+1)*64)
+		rand.Read(fauxData[idx])
+	}
+	for _, c := range codecs {
+		p.Run(c.String(), func() {
+			// Fail the subtest cleanly when the codec is unavailable rather
+			// than dereferencing a nil codec below.
+			codec, err := compress.GetCodec(c)
+			if !p.NoError(err) {
+				return
+			}
+			for _, data := range fauxData {
+				maxCompressed := codec.CompressBound(int64(len(data)))
+				buffer := make([]byte, maxCompressed)
+				buffer = codec.Encode(buffer, data)
+				p.WriteDataPageHeader(1024, int32(len(data)), int32(len(buffer)))
+				_, err := p.sink.Write(buffer)
+				p.NoError(err)
+			}
+
+			p.InitSerializedPageReader(nrows*npages, c)
+
+			for _, data := range fauxData {
+				p.True(p.pageReader.Next())
+				page := p.pageReader.Page()
+				p.IsType(&file.DataPageV1{}, page)
+				p.Equal(data, page.Data())
+			}
+			p.ResetStream()
+		})
+	}
+}
+
+// TestInvalidHeaders checks that opening a four-byte input containing only
+// "PAR2" is rejected with an error.
+func TestInvalidHeaders(t *testing.T) {
+ badHeader := []byte("PAR2")
+ _, err := file.NewParquetReader(bytes.NewReader(badHeader))
+ assert.Error(t, err)
+}
+
+// TestInvalidFooter checks that inputs with a truncated footer or a wrong
+// trailing magic number are rejected with an error.
+func TestInvalidFooter(t *testing.T) {
+ // file is smaller than FOOTER_SIZE
+ badFile := []byte("PAR1PAR")
+ _, err := file.NewParquetReader(bytes.NewReader(badFile))
+ assert.Error(t, err)
+
+ // Magic Number Incorrect
+ badFile2 := []byte("PAR1PAR2")
+ _, err = file.NewParquetReader(bytes.NewReader(badFile2))
+ assert.Error(t, err)
+}
+
+// TestIncompleteMetadata builds an input whose footer declares a 24-byte
+// metadata length while only 10 bytes follow the leading magic, and checks
+// that opening it returns an error.
+func TestIncompleteMetadata(t *testing.T) {
+ sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+ magic := []byte("PAR1")
+
+ sink.Write(magic)
+ sink.Write(make([]byte, 10))
+ // declared metadata length is larger than the bytes actually written
+ const metadataLen = 24
+ binary.Write(sink, binary.LittleEndian, uint32(metadataLen))
+ sink.Write(magic)
+ buf := sink.Finish()
+ defer buf.Release()
+ _, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+ assert.Error(t, err)
+}
diff --git a/go/parquet/file/level_conversion.go
b/go/parquet/file/level_conversion.go
new file mode 100644
index 0000000..6c56c13
--- /dev/null
+++ b/go/parquet/file/level_conversion.go
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+ "math"
+ "math/bits"
+ "unsafe"
+
+ "github.com/apache/arrow/go/parquet"
+ "github.com/apache/arrow/go/parquet/internal/bmi"
+ "github.com/apache/arrow/go/parquet/internal/utils"
+ "github.com/apache/arrow/go/parquet/schema"
+ "golang.org/x/xerrors"
+)
+
+// LevelInfo holds the definition/repetition level thresholds used when
+// converting parquet levels into validity bitmaps and list offsets.
+type LevelInfo struct {
+ // How many slots an undefined but present (i.e. null) element in
+ // parquet consumes when decoding to Arrow.
+ // "Slot" is used in the same context as the Arrow specification
+ // (i.e. a value holder).
+ // This is only ever >1 for descendants of FixedSizeList.
+ NullSlotUsage int32
+ // The definition level at which the value for the field
+ // is considered not null (definition levels greater than
+ // or equal to this value indicate a not-null
+ // value for the field). For list fields definition levels
+ // greater than or equal to this field indicate a present,
+ // possibly null, child value.
+ DefLevel int16
+ // The repetition level corresponding to this element
+ // or the closest repeated ancestor. Any repetition
+ // level less than this indicates either a new list OR
+ // an empty list (which is determined in conjunction
+ // with definition levels).
+ RepLevel int16
+ // The definition level indicating the level at which the closest
+ // repeated ancestor is not empty. This is used to discriminate
+ // between a value less than |def_level| being null or excluded
+ // entirely.
+ // For instance if we have an arrow schema like:
+ // list(struct(f0: int)). Then there are the following
+ // definition levels:
+ // 0 = null list
+ // 1 = present but empty list.
+ // 2 = a null value in the list
+ // 3 = a non null struct but null integer.
+ // 4 = a present integer.
+ // When reconstructing, the struct and integer arrays'
+ // repeated_ancestor_def_level would be 2. Any
+ // def_level < 2 indicates that there isn't a corresponding
+ // child value in the list.
+ // i.e. [null, [], [null], [{f0: null}], [{f0: 1}]]
+ // has the def levels [0, 1, 2, 3, 4]. The actual
+ // struct array is only of length 3: [not-set, set, set] and
+ // the int array is also of length 3: [N/A, null, 1].
+ RepeatedAncestorDefLevel int16
+}
+
+// newDefaultLevelInfo returns a LevelInfo with all levels at zero and a
+// null slot usage of one.
+func newDefaultLevelInfo() *LevelInfo {
+	info := new(LevelInfo)
+	info.NullSlotUsage = 1
+	return info
+}
+
+// Equal reports whether l and rhs carry the same slot usage and levels.
+func (l *LevelInfo) Equal(rhs *LevelInfo) bool {
+	if l.NullSlotUsage != rhs.NullSlotUsage {
+		return false
+	}
+	if l.DefLevel != rhs.DefLevel {
+		return false
+	}
+	if l.RepLevel != rhs.RepLevel {
+		return false
+	}
+	return l.RepeatedAncestorDefLevel == rhs.RepeatedAncestorDefLevel
+}
+
+// HasNullableValues reports whether the definition level lies above the
+// repeated ancestor's definition level.
+func (l *LevelInfo) HasNullableValues() bool {
+	return l.DefLevel > l.RepeatedAncestorDefLevel
+}
+
+// IncrementOptional accounts for an optional node by raising the
+// definition level by one.
+func (l *LevelInfo) IncrementOptional() {
+	l.DefLevel += 1
+}
+
+// IncrementRepeated accounts for a repeated node and returns the repeated
+// ancestor definition level that was in effect before the call.
+func (l *LevelInfo) IncrementRepeated() int16 {
+	previous := l.RepeatedAncestorDefLevel
+
+	// A repeated field contributes both a repetition and a definition level;
+	// the extra definition level tells an empty list apart from a list that
+	// holds an item.
+	l.RepLevel, l.DefLevel = l.RepLevel+1, l.DefLevel+1
+
+	// Levels at or above the new definition level mean the list was non-null
+	// with at least one element, so decoding must add a slot for them; for
+	// lower levels no slots are added to arrays.
+	l.RepeatedAncestorDefLevel = l.DefLevel
+	return previous
+}
+
+// Increment updates the levels for the repetition type of n; nodes that are
+// neither repeated nor optional leave the levels untouched.
+func (l *LevelInfo) Increment(n schema.Node) {
+	rep := n.RepetitionType()
+	if rep == parquet.Repetitions.Repeated {
+		l.IncrementRepeated()
+	} else if rep == parquet.Repetitions.Optional {
+		l.IncrementOptional()
+	}
+}
+
+// ValidityBitmapInputOutput is the input/output structure for reconstructed
+// validity bitmaps.
+type ValidityBitmapInputOutput struct {
+ // Input only.
+ // The maximum number of values expected to be read (the actual number
+ // read must be less than or equal to this value).
+ // If this number is exceeded the conversion functions in this file
+ // either panic or return an error. Exceeding this limit indicates
+ // either a corrupt or incorrectly written file.
+ ReadUpperBound int64
+ // Output only. The number of value slots produced
+ // (this is logically the count of the number of elements
+ // for an Arrow array).
+ Read int64
+ // Input/Output. The number of nulls encountered.
+ NullCount int64
+ // Output only. The validity bitmap to populate. May be nil only
+ // for DefRepLevelsToListInfo (if all that is needed is list offsets).
+ ValidBits []byte
+ // Input only, offset into valid_bits to start at.
+ ValidBitsOffset int64
+}
+
+// extractBitsSize is the number of bits in a uint64 (64).
+const extractBitsSize int64 = 8 * int64(unsafe.Sizeof(uint64(0)))
+
+// defLevelsBatchToBitmap creates a bitmap out of the definition levels,
+// appends it to wr and returns the number of non-null values.
+//
+// The bitmaps are single uint64 words, so one call can describe at most 64
+// levels. NOTE(review): callers are presumably expected to pass batches of
+// no more than 64 levels; confirm against bmi.GreaterThanBitmap.
+//
+// It panics when the number of slots to write exceeds remainingUpperBound.
+func defLevelsBatchToBitmap(defLevels []int16, remainingUpperBound int64, info
LevelInfo, wr utils.BitmapWriter, hasRepeatedParent bool) uint64 {
+ definedBitmap := bmi.GreaterThanBitmap(defLevels, info.DefLevel-1)
+
+ if hasRepeatedParent {
+ // Greater than level_info.repeated_ancestor_def_level - 1 implies
+ // >= the repeated_ancestor_def_level
+ presentBitmap := bmi.GreaterThanBitmap(defLevels,
info.RepeatedAncestorDefLevel-1)
+ // keep only the defined bits at positions marked present
+ selectedBits := bmi.ExtractBits(definedBitmap, presentBitmap)
+ selectedCount := int64(bits.OnesCount64(presentBitmap))
+ if selectedCount > remainingUpperBound {
+ panic("values read exceeded upper bound")
+ }
+ wr.AppendWord(selectedBits, selectedCount)
+ return uint64(bits.OnesCount64(selectedBits))
+ }
+
+ if int64(len(defLevels)) > remainingUpperBound {
+ panic("values read exceed upper bound")
+ }
+
+ wr.AppendWord(definedBitmap, int64(len(defLevels)))
+ return uint64(bits.OnesCount64(definedBitmap))
+}
+
+// create a bitmap out of the definition Levels
+func defLevelsToBitmapInternal(defLevels []int16, info LevelInfo, out
*ValidityBitmapInputOutput, hasRepeatedParent bool) {
+ wr := utils.NewFirstTimeBitmapWriter(out.ValidBits,
out.ValidBitsOffset, int64(len(defLevels)))
+ defer wr.Finish()
+ setCount := defLevelsBatchToBitmap(defLevels, out.ReadUpperBound, info,
wr, hasRepeatedParent)
+ out.Read = int64(wr.Pos())
+ out.NullCount += out.Read - int64(setCount)
+}
+
+// DefLevelsToBitmap creates a validity bitmap out of the passed in
+// definition levels and info object. A repetition level above zero is
+// treated as having a repeated parent.
+func DefLevelsToBitmap(defLevels []int16, info LevelInfo, out *ValidityBitmapInputOutput) {
+	defLevelsToBitmapInternal(defLevels, info, out, info.RepLevel > 0)
+}
+
+// DefRepLevelsToListInfo takes in the definition and repetition levels in
+// order to populate the validity bitmap and properly handle nested lists
+// and update the offsets for them.
+//
+// The bitmap is only written when out.ValidBits is non-nil, and offsets is
+// only updated when it is non-empty. offsets holds cumulative counts: when a
+// new list starts, its entry is seeded from the previous entry.
+//
+// An error is returned when an offset would exceed math.MaxInt32, when more
+// lists are encountered than out.ReadUpperBound allows, or when nulls were
+// counted while info.NullSlotUsage is greater than 1.
+func DefRepLevelsToListInfo(defLevels, repLevels []int16, info LevelInfo, out
*ValidityBitmapInputOutput, offsets []int32) error {
+ var wr utils.BitmapWriter
+ if out.ValidBits != nil {
+ wr = utils.NewFirstTimeBitmapWriter(out.ValidBits,
out.ValidBitsOffset, out.ReadUpperBound)
+ defer wr.Finish()
+ }
+ offsetPos := 0
+ for idx := range defLevels {
+ // skip items that belong to empty or null ancestor lists and
+ // further nested lists
+ if defLevels[idx] < info.RepeatedAncestorDefLevel ||
repLevels[idx] > info.RepLevel {
+ continue
+ }
+
+ if repLevels[idx] == info.RepLevel {
+ // continuation of an existing list.
+ // offsets can be null for structs with repeated children
+ if offsetPos < len(offsets) {
+ if offsets[offsetPos] == math.MaxInt32 {
+ return xerrors.New("list index
overflow")
+ }
+ offsets[offsetPos]++
+ }
+ } else {
+ if (wr != nil && int64(wr.Pos()) >= out.ReadUpperBound)
|| (offsetPos >= int(out.ReadUpperBound)) {
+ return xerrors.Errorf("definition levels
exceeded upper bound: %d", out.ReadUpperBound)
+ }
+
+ // current_rep < list rep_level i.e. start of a list
+ // (ancestor empty lists are filtered out above)
+ // offsets can be null for structs with repeated children
+ if offsetPos+1 < len(offsets) {
+ offsetPos++
+ // use cumulative offsets because variable size lists are
+ // more common than fixed size lists so it should be cheaper
+ // to make these cumulative and subtract when validating
+ // fixed size lists
+ offsets[offsetPos] = offsets[offsetPos-1]
+ if defLevels[idx] >= info.DefLevel {
+ if offsets[offsetPos] == math.MaxInt32 {
+ return xerrors.New("list index
overflow")
+ }
+ offsets[offsetPos]++
+ }
+ }
+
+ if wr != nil {
+ // the level info def level for lists reflects element
+ // present level; the prior level distinguishes between
+ // empty lists
+ if defLevels[idx] >= info.DefLevel-1 {
+ wr.Set()
+ } else {
+ out.NullCount++
+ wr.Clear()
+ }
+ wr.Next()
+ }
+ }
+ }
+
+ // prefer the offsets position as the read count when offsets were given
+ if len(offsets) > 0 {
+ out.Read = int64(offsetPos)
+ } else if wr != nil {
+ out.Read = int64(wr.Pos())
+ }
+
+ if out.NullCount > 0 && info.NullSlotUsage > 1 {
+ return xerrors.New("null values with null_slot_usage > 1 not
supported.")
+ }
+ return nil
+}
+
+// DefRepLevelsToBitmap constructs a full validity bitmap out of the
+// definition and repetition levels, properly handling nested lists and
+// parents. It delegates to DefRepLevelsToListInfo without offsets, after
+// raising both levels of its own copy of info by one.
+func DefRepLevelsToBitmap(defLevels, repLevels []int16, info LevelInfo, out
*ValidityBitmapInputOutput) error {
+ // info is passed by value, so the caller's LevelInfo is not modified
+ info.RepLevel++
+ info.DefLevel++
+ return DefRepLevelsToListInfo(defLevels, repLevels, info, out, nil)
+}
diff --git a/go/parquet/file/level_conversion_test.go
b/go/parquet/file/level_conversion_test.go
new file mode 100644
index 0000000..08d2fe3
--- /dev/null
+++ b/go/parquet/file/level_conversion_test.go
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+ "strings"
+ "testing"
+
+ "github.com/apache/arrow/go/arrow/bitutil"
+ "github.com/apache/arrow/go/parquet/internal/bmi"
+ "github.com/apache/arrow/go/parquet/internal/utils"
+ "github.com/stretchr/testify/assert"
+)
+
+// bitmapToString renders the first bitCount bits of bitmap as a string of
+// '1' and '0' characters, one per bit in index order.
+func bitmapToString(bitmap []byte, bitCount int64) string {
+	total := int(bitCount)
+
+	var sb strings.Builder
+	sb.Grow(total)
+	for pos := 0; pos < total; pos++ {
+		ch := byte('0')
+		if bitutil.BitIsSet(bitmap, pos) {
+			ch = '1'
+		}
+		sb.WriteByte(ch)
+	}
+	return sb.String()
+}
+
+// TestDefLevelsToBitmap converts nine definition levels containing a single
+// level below DefLevel and checks the read and null counts, then checks that
+// converting zero levels leaves the bitmap untouched.
+func TestDefLevelsToBitmap(t *testing.T) {
+ defLevels := []int16{3, 3, 3, 2, 3, 3, 3, 3, 3}
+ validBits := []byte{2, 0}
+
+ var info LevelInfo
+ info.DefLevel = 3
+ info.RepLevel = 1
+
+ var io ValidityBitmapInputOutput
+ io.ReadUpperBound = int64(len(defLevels))
+ io.Read = -1
+ io.ValidBits = validBits
+
+ DefLevelsToBitmap(defLevels, info, &io)
+ assert.Equal(t, int64(9), io.Read)
+ assert.Equal(t, int64(1), io.NullCount)
+
+ // call again with 0 definition levels make sure that valid bits is
+ // unmodified
+ curByte := validBits[1]
+ io.NullCount = 0
+ DefLevelsToBitmap(defLevels[:0], info, &io)
+
+ assert.Zero(t, io.Read)
+ assert.Zero(t, io.NullCount)
+ assert.Equal(t, curByte, validBits[1])
+}
+
+// TestDefLevelstToBitmapPowerOf2 converts a four-element slice of fully
+// defined levels and checks that four values are read with no nulls.
+func TestDefLevelstToBitmapPowerOf2(t *testing.T) {
+ defLevels := []int16{3, 3, 3, 2, 3, 3, 3, 3}
+ validBits := []byte{1, 0}
+
+ var (
+ info LevelInfo
+ io ValidityBitmapInputOutput
+ )
+
+ info.RepLevel = 1
+ info.DefLevel = 3
+ io.Read = -1
+ io.ReadUpperBound = int64(len(defLevels))
+ io.ValidBits = validBits
+
+ // only the last four levels (all equal to DefLevel) are converted
+ DefLevelsToBitmap(defLevels[4:8], info, &io)
+ assert.Equal(t, int64(4), io.Read)
+ assert.Zero(t, io.NullCount)
+}
+
+// TestGreaterThanBitmapGeneratesExpectedBitmasks is a table-driven check of
+// bmi.GreaterThanBitmap over prefixes of a repeating 0..7 level pattern.
+func TestGreaterThanBitmapGeneratesExpectedBitmasks(t *testing.T) {
+ defLevels := []int16{
+ 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7,
+ 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7,
+ 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7,
+ 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7}
+
+ // num is how many leading levels are passed in, rhs the value compared
+ // against, expected the resulting bitmask
+ tests := []struct {
+ name string
+ num int
+ rhs int16
+ expected uint64
+ }{
+ {"no levels", 0, 0, 0},
+ {"64 and 8", 64, 8, 0},
+ {"64 and -1", 64, -1, 0xFFFFFFFFFFFFFFFF},
+ // should be zero padded
+ {"zero pad 47, -1", 47, -1, 0x7FFFFFFFFFFF},
+ {"zero pad 64 and 6", 64, 6, 0x8080808080808080},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ assert.Equal(t, tt.expected,
bmi.GreaterThanBitmap(defLevels[:tt.num], tt.rhs))
+ })
+ }
+}
+
+// TestWithRepetitionlevelFiltersOutEmptyListValues checks that levels below
+// the repeated ancestor's definition level produce no bit in the validity
+// bitmap, and that writing starts at the configured bit offset.
+func TestWithRepetitionlevelFiltersOutEmptyListValues(t *testing.T) {
+	validityBitmap := make([]byte, 8)
+	io := ValidityBitmapInputOutput{
+		ReadUpperBound:  64,
+		Read:            1,
+		NullCount:       5,
+		ValidBits:       validityBitmap,
+		ValidBitsOffset: 1,
+	}
+
+	info := LevelInfo{
+		RepeatedAncestorDefLevel: 1,
+		DefLevel:                 2,
+		RepLevel:                 1,
+	}
+
+	defLevels := []int16{0, 0, 0, 2, 2, 1, 0, 2}
+	DefLevelsToBitmap(defLevels, info, &io)
+
+	// expected value goes first so a failure is reported the right way round
+	assert.Equal(t, "01101000", bitmapToString(validityBitmap, 8))
+	for _, x := range validityBitmap[1:] {
+		assert.Zero(t, x)
+	}
+	// one new null is added to the five that were already counted
+	assert.EqualValues(t, 6, io.NullCount)
+	assert.EqualValues(t, 4, io.Read)
+}
+
+// MultiLevelTestData pairs definition levels with repetition levels for use
+// as test input.
+type MultiLevelTestData struct {
+ defLevels []int16
+ repLevels []int16
+}
+
+// TriplNestedList returns the definition and repetition levels for the
+// triply nested list rows shown in the comment below.
+func TriplNestedList() MultiLevelTestData {
+ // Triply nested list values borrow from write_path
+ // [null, [[1, null, 3], []], []],
+ // [[[]], [[], [1, 2]], null, [[3]]],
+ // null,
+ // []
+ return MultiLevelTestData{
+ defLevels: []int16{2, 7, 6, 7, 5, 3, // first row
+ 5, 5, 7, 7, 2, 7, // second row
+ 0, // third row
+ 1},
+ repLevels: []int16{0, 1, 3, 3, 2, 1, // first row
+ 0, 1, 2, 3, 1, 1, // second row
+ 0, 0},
+ }
+}
+
+// TestActualCase feeds one 64-level batch with eight zero levels through
+// defLevelsBatchToBitmap and checks that the remaining 56 levels are counted
+// as set and that the first four output bytes are fully set.
+func TestActualCase(t *testing.T) {
+ out := make([]byte, 512)
+ defs := make([]int16, 64)
+ for i := range defs {
+ defs[i] = 3
+ }
+
+ // eight levels fall below the repeated ancestor level and are skipped
+ defs[0] = 0
+ defs[25] = 0
+ defs[33] = 0
+ defs[49] = 0
+ defs[58] = 0
+ defs[59] = 0
+ defs[60] = 0
+ defs[61] = 0
+
+ remaining := int64(4096)
+ info := LevelInfo{
+ NullSlotUsage: 0,
+ DefLevel: 3,
+ RepLevel: 1,
+ RepeatedAncestorDefLevel: 2,
+ }
+
+ wr := utils.NewFirstTimeBitmapWriter(out, 0, 4096)
+ v := defLevelsBatchToBitmap(defs, remaining, info, wr, true)
+ assert.EqualValues(t, 56, v)
+ assert.Equal(t, []byte{255, 255, 255, 255}, out[:4])
+}
diff --git a/go/parquet/file/page_reader.go b/go/parquet/file/page_reader.go
new file mode 100644
index 0000000..251499a
--- /dev/null
+++ b/go/parquet/file/page_reader.go
@@ -0,0 +1,620 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+ "bytes"
+ "io"
+ "sync"
+
+ "github.com/JohnCGriffin/overflow"
+ "github.com/apache/arrow/go/arrow/ipc"
+ "github.com/apache/arrow/go/arrow/memory"
+ "github.com/apache/arrow/go/parquet"
+ "github.com/apache/arrow/go/parquet/compress"
+ "github.com/apache/arrow/go/parquet/internal/debug"
+ "github.com/apache/arrow/go/parquet/internal/encryption"
+ format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
+ "github.com/apache/arrow/go/parquet/internal/thrift"
+ "github.com/apache/arrow/go/parquet/metadata"
+ "golang.org/x/xerrors"
+)
+
+// PageReader is the interface used by the columnreader in order to read
+// and handle DataPages and loop through them.
+type PageReader interface {
+ // SetMaxPageHeaderSize sets the maximum page header size allowed to be
+ // read.
+ SetMaxPageHeaderSize(int)
+ // Page returns the current page, or nil if there are no more.
+ Page() Page
+ // Next fetches the next page and returns false if there are no more
+ // pages.
+ Next() bool
+ // Err returns the error encountered when Next returned false, or nil
+ // if there was no error and the end of the pages was simply reached.
+ Err() error
+ // Reset allows reusing a page reader.
+ Reset(r parquet.ReaderAtSeeker, nrows int64, compressType
compress.Compression, ctx *CryptoContext)
+}
+
+// Page is an interface for handling DataPages or Dictionary Pages.
+type Page interface {
+ // Type returns which kind of page this is.
+ Type() format.PageType
+ // Data returns the raw bytes of this page.
+ Data() []byte
+ // Encoding returns the encoding used for this page, Plain/RLE, etc.
+ Encoding() format.Encoding
+ // NumValues returns the number of values in this page.
+ NumValues() int32
+ // Release puts this page object back into the page pool for re-use.
+ Release()
+}
+
+// page holds the fields shared by every page kind: the data buffer, the
+// page type, the value count and the encoding.
+type page struct {
+ buf *memory.Buffer
+ typ format.PageType
+
+ nvals int32
+ encoding format.Encoding
+}
+
+// Accessors that satisfy the read-only part of the Page interface.
+func (p *page) Type() format.PageType { return p.typ }
+func (p *page) Data() []byte { return p.buf.Bytes() }
+func (p *page) NumValues() int32 { return p.nvals }
+func (p *page) Encoding() format.Encoding { return p.encoding }
+
+// DataPage is the base interface for both DataPageV1 and DataPageV2 of the
+// parquet spec.
+type DataPage interface {
+ Page
+ // UncompressedSize returns the size of the page data when uncompressed.
+ UncompressedSize() int64
+ // Statistics returns the encoded statistics stored with the page.
+ Statistics() metadata.EncodedStatistics
+}
+
+// Create some pools to use for reusing the data page objects themselves so
+// that we can avoid tight loops that are creating and destroying tons of
+// individual objects. This combined with a Release function on the pages
+// themselves which will put them back into the pool yields significant
+// memory reduction and performance benefits.
+//
+// Each New function returns a typed nil pointer, which the page constructors
+// check for to decide whether a fresh object has to be allocated.
+
+var dataPageV1Pool = sync.Pool{
+ New: func() interface{} { return (*DataPageV1)(nil) },
+}
+
+var dataPageV2Pool = sync.Pool{
+ New: func() interface{} { return (*DataPageV2)(nil) },
+}
+
+var dictPagePool = sync.Pool{
+ New: func() interface{} { return (*DictionaryPage)(nil) },
+}
+
+// DataPageV1 represents a DataPage version 1 from the parquet.thrift file.
+// Besides the common page fields it records the level encodings, the
+// uncompressed size and the encoded statistics.
+type DataPageV1 struct {
+ page
+
+ defLvlEncoding format.Encoding
+ repLvlEncoding format.Encoding
+ uncompressedSize int64
+ statistics metadata.EncodedStatistics
+}
+
+// NewDataPageV1 returns a V1 data page with the given buffer as its data and
+// the specified encoding information.
+//
+// Page objects previously released back into the data page pool are re-used
+// when available instead of allocating new ones. Calling Release on the
+// returned page hands it back to the pool for re-use.
+func NewDataPageV1(buffer *memory.Buffer, num int32, encoding, defEncoding, repEncoding parquet.Encoding, uncompressedSize int64) *DataPageV1 {
+	pg := dataPageV1Pool.Get().(*DataPageV1)
+	if pg == nil {
+		// nothing pooled: start from a fresh page tagged with the V1 type
+		pg = &DataPageV1{page: page{typ: format.PageType_DATA_PAGE}}
+	}
+
+	pg.buf = buffer
+	pg.nvals = num
+	pg.encoding = format.Encoding(encoding)
+	pg.defLvlEncoding = format.Encoding(defEncoding)
+	pg.repLvlEncoding = format.Encoding(repEncoding)
+	pg.uncompressedSize = uncompressedSize
+
+	// only the presence flags are cleared on a recycled page
+	st := &pg.statistics
+	st.HasMax, st.HasMin = false, false
+	st.HasNullCount, st.HasDistinctCount = false, false
+	return pg
+}
+
+// NewDataPageV1WithStats is the same as NewDataPageV1, but additionally
+// stores the given encoded statistics on the created page.
+func NewDataPageV1WithStats(buffer *memory.Buffer, num int32, encoding, defEncoding, repEncoding parquet.Encoding, uncompressedSize int64, stats metadata.EncodedStatistics) *DataPageV1 {
+	pg := NewDataPageV1(buffer, num, encoding, defEncoding, repEncoding, uncompressedSize)
+	pg.statistics = stats
+	return pg
+}
+
+// Release releases the page's buffer and puts the page back into the
+// DataPage object pool so that it can be reused.
+//
+// After calling this function, the object should not be utilized anymore,
+// otherwise conflicts can arise.
+func (d *DataPageV1) Release() {
+ d.buf.Release()
+ // drop the reference so the pooled page does not keep the buffer alive
+ d.buf = nil
+ dataPageV1Pool.Put(d)
+}
+
+// UncompressedSize returns the size of the data in this data page when
+// uncompressed.
+func (d *DataPageV1) UncompressedSize() int64 { return d.uncompressedSize }
+
+// Statistics returns the encoded statistics on this data page.
+func (d *DataPageV1) Statistics() metadata.EncodedStatistics { return
d.statistics }
+
+// DefinitionLevelEncoding returns the encoding utilized for the Definition
+// Levels.
+func (d *DataPageV1) DefinitionLevelEncoding() parquet.Encoding {
+ return parquet.Encoding(d.defLvlEncoding)
+}
+
+// RepetitionLevelEncoding returns the encoding utilized for the Repetition
+// Levels.
+func (d *DataPageV1) RepetitionLevelEncoding() parquet.Encoding {
+ return parquet.Encoding(d.repLvlEncoding)
+}
+
+// DataPageV2 is the representation of the V2 data page from the
+// parquet.thrift spec. Besides the common page fields it records null and
+// row counts, the byte lengths of the level sections, whether the data is
+// compressed, the uncompressed size and the encoded statistics.
+type DataPageV2 struct {
+ page
+
+ nulls int32
+ nrows int32
+ defLvlByteLen int32
+ repLvlByteLen int32
+ compressed bool
+ uncompressedSize int64
+ statistics metadata.EncodedStatistics
+}
+
+// NewDataPageV2 constructs a new V2 data page with the provided information
+// and a buffer of the raw data. A page object from the V2 page pool is
+// re-used when one is available.
+func NewDataPageV2(buffer *memory.Buffer, numValues, numNulls, numRows int32, encoding parquet.Encoding, defLvlsByteLen, repLvlsByteLen int32, uncompressed int64, isCompressed bool) *DataPageV2 {
+	pg := dataPageV2Pool.Get().(*DataPageV2)
+	if pg == nil {
+		// nothing pooled: start from a fresh page tagged with the V2 type
+		pg = &DataPageV2{page: page{typ: format.PageType_DATA_PAGE_V2}}
+	}
+
+	pg.buf = buffer
+	pg.nvals = numValues
+	pg.encoding = format.Encoding(encoding)
+	pg.nulls = numNulls
+	pg.nrows = numRows
+	pg.defLvlByteLen = defLvlsByteLen
+	pg.repLvlByteLen = repLvlsByteLen
+	pg.compressed = isCompressed
+	pg.uncompressedSize = uncompressed
+
+	// only the presence flags are cleared on a recycled page
+	st := &pg.statistics
+	st.HasMax, st.HasMin = false, false
+	st.HasNullCount, st.HasDistinctCount = false, false
+	return pg
+}
+
+// NewDataPageV2WithStats is the same as NewDataPageV2 but additionally
+// stores the provided encoded stats on the page.
+func NewDataPageV2WithStats(buffer *memory.Buffer, numValues, numNulls, numRows int32, encoding parquet.Encoding, defLvlsByteLen, repLvlsByteLen int32, uncompressed int64, isCompressed bool, stats metadata.EncodedStatistics) *DataPageV2 {
+	pg := NewDataPageV2(buffer, numValues, numNulls, numRows, encoding, defLvlsByteLen, repLvlsByteLen, uncompressed, isCompressed)
+	pg.statistics = stats
+	return pg
+}
+
+// Release releases the page's buffer and puts the page back into the
+// DataPage object pool so that it can be reused.
+//
+// After calling this function, the object should not be utilized anymore,
+// otherwise conflicts can arise.
+func (d *DataPageV2) Release() {
+ d.buf.Release()
+ // drop the reference so the pooled page does not keep the buffer alive
+ d.buf = nil
+ dataPageV2Pool.Put(d)
+}
+
+// UncompressedSize is the size of the raw page when uncompressed. If
+// `IsCompressed` is true, then the raw data in the buffer is expected to be
+// compressed.
+func (d *DataPageV2) UncompressedSize() int64 { return d.uncompressedSize }
+
+// Statistics are the encoded statistics in the data page.
+func (d *DataPageV2) Statistics() metadata.EncodedStatistics { return
d.statistics }
+
+// NumNulls is the reported number of nulls in this datapage.
+func (d *DataPageV2) NumNulls() int32 { return d.nulls }
+
+// DefinitionLevelByteLen is the number of bytes in the buffer that are used
+// to represent the definition levels.
+func (d *DataPageV2) DefinitionLevelByteLen() int32 { return d.defLvlByteLen }
+
+// RepetitionLevelByteLen is the number of bytes in the buffer which are used
+// to represent the repetition levels.
+func (d *DataPageV2) RepetitionLevelByteLen() int32 { return d.repLvlByteLen }
+
+// IsCompressed returns true if the data of this page is compressed.
+func (d *DataPageV2) IsCompressed() bool { return d.compressed }
+
+// DictionaryPage represents a page of data that uses dictionary encoding.
+type DictionaryPage struct {
+ page
+
+ sorted bool
+}
+
+// NewDictionaryPage constructs a new dictionary page with the provided data
+// buffer and number of values. A page object from the dictionary page pool
+// is re-used when one is available; the sorted flag always starts out false.
+func NewDictionaryPage(buffer *memory.Buffer, nvals int32, encoding parquet.Encoding) *DictionaryPage {
+	pg := dictPagePool.Get().(*DictionaryPage)
+	if pg == nil {
+		// nothing pooled: start from a fresh page tagged as a dictionary page
+		pg = &DictionaryPage{page: page{typ: format.PageType_DICTIONARY_PAGE}}
+	}
+
+	pg.buf, pg.nvals = buffer, nvals
+	pg.encoding = format.Encoding(encoding)
+	pg.sorted = false
+	return pg
+}
+
+// Release releases the page's buffer and puts the page back into the
+// dictionary page object pool so that it can be reused.
+//
+// After calling this function, the object should not be utilized anymore,
+// otherwise conflicts can arise.
+func (d *DictionaryPage) Release() {
+ d.buf.Release()
+ // drop the reference so the pooled page does not keep the buffer alive
+ d.buf = nil
+ dictPagePool.Put(d)
+}
+
+// IsSorted returns whether the dictionary itself is sorted.
+func (d *DictionaryPage) IsSorted() bool { return d.sorted }
+
+// serializedPageReader is the PageReader returned by NewPageReader. It keeps
+// the input reader, the codec, the decryption context and the bookkeeping
+// needed to step through pages.
+type serializedPageReader struct {
+ r ipc.ReadAtSeeker
+ nrows int64
+ rowsSeen int64
+ mem memory.Allocator
+ codec compress.Codec
+
+ curPageHdr *format.PageHeader
+ buf *memory.Buffer
+ pageOrd int16
+ maxPageHeaderSize int
+
+ curPage Page
+ cryptoCtx CryptoContext
+ // module AADs computed by initDecryption when decryptors are configured
+ dataPageAad string
+ dataPageHeaderAad string
+
+ // scratch buffer the compressed page bytes are copied into
+ decompressBuffer bytes.Buffer
+ // error reported through Err
+ err error
+}
+
+// NewPageReader returns a page reader for the data which can be read from
+// the provided reader and compression.
+//
+// A nil mem falls back to a Go allocator. An error is returned when no codec
+// can be obtained for compressType. When ctx is non-nil it is copied into
+// the reader and decryption is initialized from it.
+func NewPageReader(r parquet.ReaderAtSeeker, nrows int64, compressType compress.Compression, mem memory.Allocator, ctx *CryptoContext) (PageReader, error) {
+	if mem == nil {
+		mem = memory.NewGoAllocator()
+	}
+
+	codec, err := compress.GetCodec(compressType)
+	if err != nil {
+		return nil, err
+	}
+
+	pr := new(serializedPageReader)
+	pr.r = r
+	pr.maxPageHeaderSize = defaultMaxPageHeaderSize
+	pr.nrows = nrows
+	pr.mem = mem
+	pr.codec = codec
+	pr.buf = memory.NewResizableBuffer(mem)
+	pr.decompressBuffer.Grow(defaultPageHeaderSize)
+
+	if ctx == nil {
+		return pr, nil
+	}
+	pr.cryptoCtx = *ctx
+	pr.initDecryption()
+	return pr, nil
+}
+
+// Reset re-targets the reader at r so that it can be reused: counters and
+// the current page are cleared, the codec is looked up again, and the
+// decryption state is rebuilt from ctx (or cleared when ctx is nil).
+//
+// If no codec can be obtained the error is stored for Err and the remaining
+// state is left as it was.
+func (p *serializedPageReader) Reset(r parquet.ReaderAtSeeker, nrows int64, compressType compress.Compression, ctx *CryptoContext) {
+	p.r = r
+	p.nrows = nrows
+	p.rowsSeen = 0
+	p.pageOrd = 0
+	p.curPageHdr = nil
+	p.curPage = nil
+
+	// the lookup result always replaces any previously stored error
+	p.codec, p.err = compress.GetCodec(compressType)
+	if p.err != nil {
+		return
+	}
+
+	p.buf.ResizeNoShrink(0)
+	p.decompressBuffer.Reset()
+
+	if ctx == nil {
+		p.cryptoCtx = CryptoContext{}
+		p.dataPageAad, p.dataPageHeaderAad = "", ""
+		return
+	}
+	p.cryptoCtx = *ctx
+	p.initDecryption()
+}
+
+// Err returns the error, if any, recorded by the most recent Next or Reset.
+func (p *serializedPageReader) Err() error {
+	return p.err
+}
+
+// SetMaxPageHeaderSize overrides the limit up to which Next keeps enlarging
+// its read while trying to deserialize a page header.
+func (p *serializedPageReader) SetMaxPageHeaderSize(sz int) {
+	p.maxPageHeaderSize = sz
+}
+
+// initDecryption precomputes the module AADs for whichever decryptors are
+// present in the crypto context: the data page AAD from the data decryptor and
+// the data page header AAD from the metadata decryptor. Both are created with a
+// page ordinal of -1.
+func (p *serializedPageReader) initDecryption() {
+	rowGroup, column := p.cryptoCtx.RowGroupOrdinal, p.cryptoCtx.ColumnOrdinal
+
+	if dec := p.cryptoCtx.DataDecryptor; dec != nil {
+		p.dataPageAad = encryption.CreateModuleAad(dec.FileAad(), encryption.DataPageModule, rowGroup, column, -1)
+	}
+	if dec := p.cryptoCtx.MetaDecryptor; dec != nil {
+		p.dataPageHeaderAad = encryption.CreateModuleAad(dec.FileAad(), encryption.DataPageHeaderModule, rowGroup, column, -1)
+	}
+}
+
+// updateDecryption installs the AAD that decrypt should use for the next
+// module. While the reader is still expecting the dictionary page, a fresh AAD
+// is created for moduleType; afterwards the current page ordinal is patched
+// into a copy of pageAad instead.
+func (p *serializedPageReader) updateDecryption(decrypt encryption.Decryptor, moduleType int8, pageAad string) {
+	if !p.cryptoCtx.StartDecryptWithDictionaryPage {
+		scratch := []byte(pageAad)
+		encryption.QuickUpdatePageAad(scratch, p.pageOrd)
+		decrypt.UpdateAad(string(scratch))
+		return
+	}
+
+	decrypt.UpdateAad(encryption.CreateModuleAad(decrypt.FileAad(), moduleType, p.cryptoCtx.RowGroupOrdinal, p.cryptoCtx.ColumnOrdinal, -1))
+}
+
+// Page returns the page produced by the last successful call to Next, or nil
+// when there is none.
+func (p *serializedPageReader) Page() Page {
+	current := p.curPage
+	return current
+}
+
+// decompress reads lenCompressed bytes of page payload from the underlying
+// stream, decrypts them when a data decryptor is configured, and decodes the
+// result with the reader's codec using buf as the destination.
+//
+// A negative lenCompressed, which callers can derive from a corrupt page
+// header, is reported as an error instead of being handed to
+// bytes.Buffer.Grow, which panics on negative counts.
+func (p *serializedPageReader) decompress(lenCompressed int, buf []byte) ([]byte, error) {
+	if lenCompressed < 0 {
+		return nil, xerrors.New("parquet: invalid page header (negative compressed size)")
+	}
+
+	p.decompressBuffer.Reset()
+	p.decompressBuffer.Grow(lenCompressed)
+	if _, err := io.CopyN(&p.decompressBuffer, p.r, int64(lenCompressed)); err != nil {
+		return nil, err
+	}
+
+	data := p.decompressBuffer.Bytes()
+	if p.cryptoCtx.DataDecryptor != nil {
+		data = p.cryptoCtx.DataDecryptor.Decrypt(data)
+	}
+
+	return p.codec.Decode(buf, data), nil
+}
+
+// dataheader is the subset of a data page header that extractStats needs; in
+// this file it is satisfied by the headers passed to extractStats from Next.
+type dataheader interface {
+ IsSetStatistics() bool // reports whether the header carries statistics
+ GetStatistics() *format.Statistics // returns the statistics attached to the header
+}
+
+// extractStats copies the statistics attached to a data page header into an
+// EncodedStatistics value. The MaxValue/MinValue fields take precedence over
+// Max/Min, and the null and distinct counts are copied when they are set. A
+// header without statistics yields the zero value.
+func extractStats(dataHeader dataheader) (pageStats metadata.EncodedStatistics) {
+	if !dataHeader.IsSetStatistics() {
+		return pageStats
+	}
+
+	src := dataHeader.GetStatistics()
+
+	switch {
+	case src.IsSetMaxValue():
+		pageStats.SetMax(src.GetMaxValue())
+	case src.IsSetMax():
+		pageStats.SetMax(src.GetMax())
+	}
+
+	switch {
+	case src.IsSetMinValue():
+		pageStats.SetMin(src.GetMinValue())
+	case src.IsSetMin():
+		pageStats.SetMin(src.GetMin())
+	}
+
+	if src.IsSetNullCount() {
+		pageStats.SetNullCount(src.GetNullCount())
+	}
+	if src.IsSetDistinctCount() {
+		pageStats.SetDistinctCount(src.GetDistinctCount())
+	}
+	return pageStats
+}
+
+// Next advances the reader to the next dictionary or data page, stepping over
+// page types it does not understand. It returns true when a page is available
+// through Page. It returns false once the expected number of values has been
+// seen, when the stream has no more bytes, or on failure; in the failure case
+// Err reports the error.
+//
+// The page previously returned by Page is released at the start of the call.
+func (p *serializedPageReader) Next() bool {
+	if p.curPage != nil {
+		p.curPage.Release()
+	}
+	p.curPage = nil
+	p.curPageHdr = format.NewPageHeader()
+	p.err = nil
+
+	// Loop here because there may be unhandled page types that we skip until
+	// finding a page that we do know what to do with
+	for p.rowsSeen < p.nrows {
+		allowedPgSz := defaultPageHeaderSize
+
+		start, err := p.r.Seek(0, io.SeekCurrent)
+		if err != nil {
+			p.err = err
+			return false
+		}
+		p.decompressBuffer.Reset()
+		// Page headers can be very large because of page statistics
+		// We try to deserialize a larger buffer progressively
+		// until a maximum allowed header limit
+		for {
+			n, err := io.CopyN(&p.decompressBuffer, p.r, int64(allowedPgSz))
+			if err != nil && err != io.EOF {
+				p.err = err
+				return false
+			}
+
+			if n == 0 {
+				return false
+			}
+
+			view := p.decompressBuffer.Bytes()
+
+			extra := 0
+			if p.cryptoCtx.MetaDecryptor != nil {
+				p.updateDecryption(p.cryptoCtx.MetaDecryptor, encryption.DictPageHeaderModule, p.dataPageHeaderAad)
+				view = p.cryptoCtx.MetaDecryptor.Decrypt(view)
+				extra = p.cryptoCtx.MetaDecryptor.CiphertextSizeDelta()
+			}
+
+			remaining, err := thrift.DeserializeThrift(p.curPageHdr, view)
+			if err != nil {
+				allowedPgSz *= 2
+				if allowedPgSz > p.maxPageHeaderSize {
+					p.err = xerrors.New("parquet: deserializing page header failed")
+					return false
+				}
+				continue
+			}
+
+			// we read past the header, so move back to the first payload byte
+			if _, err := p.r.Seek(start+int64(len(view)-int(remaining)+extra), io.SeekStart); err != nil {
+				p.err = err
+				return false
+			}
+			break
+		}
+
+		lenCompressed := int(p.curPageHdr.GetCompressedPageSize())
+		lenUncompressed := int(p.curPageHdr.GetUncompressedPageSize())
+		if lenCompressed < 0 || lenUncompressed < 0 {
+			p.err = xerrors.New("parquet: invalid page header")
+			return false
+		}
+
+		if p.cryptoCtx.DataDecryptor != nil {
+			p.updateDecryption(p.cryptoCtx.DataDecryptor, encryption.DictPageModule, p.dataPageAad)
+		}
+
+		p.buf.ResizeNoShrink(lenUncompressed)
+
+		switch p.curPageHdr.GetType() {
+		case format.PageType_DICTIONARY_PAGE:
+			p.cryptoCtx.StartDecryptWithDictionaryPage = false
+			dictHeader := p.curPageHdr.GetDictionaryPageHeader()
+			if dictHeader.GetNumValues() < 0 {
+				p.err = xerrors.New("parquet: invalid page header (negative number of values)")
+				return false
+			}
+
+			data, err := p.decompress(lenCompressed, p.buf.Bytes())
+			if err != nil {
+				p.err = err
+				return false
+			}
+			debug.Assert(len(data) == lenUncompressed, "len(data) != lenUncompressed")
+
+			// make dictionary page
+			p.curPage = &DictionaryPage{
+				page: page{
+					buf:      memory.NewBufferBytes(data),
+					typ:      p.curPageHdr.Type,
+					nvals:    dictHeader.GetNumValues(),
+					encoding: dictHeader.GetEncoding(),
+				},
+				sorted: dictHeader.IsSetIsSorted() && dictHeader.GetIsSorted(),
+			}
+
+		case format.PageType_DATA_PAGE:
+			p.pageOrd++
+			dataHeader := p.curPageHdr.GetDataPageHeader()
+			if dataHeader.GetNumValues() < 0 {
+				p.err = xerrors.New("parquet: invalid page header (negative number of values)")
+				return false
+			}
+
+			p.rowsSeen += int64(dataHeader.GetNumValues())
+			data, err := p.decompress(lenCompressed, p.buf.Bytes())
+			if err != nil {
+				p.err = err
+				return false
+			}
+			debug.Assert(len(data) == lenUncompressed, "len(data) != lenUncompressed")
+
+			// make datapagev1
+			p.curPage = &DataPageV1{
+				page: page{
+					buf:      memory.NewBufferBytes(data),
+					typ:      p.curPageHdr.Type,
+					nvals:    dataHeader.GetNumValues(),
+					encoding: dataHeader.GetEncoding(),
+				},
+				defLvlEncoding:   dataHeader.GetDefinitionLevelEncoding(),
+				repLvlEncoding:   dataHeader.GetRepetitionLevelEncoding(),
+				uncompressedSize: int64(lenUncompressed),
+				statistics:       extractStats(dataHeader),
+			}
+		case format.PageType_DATA_PAGE_V2:
+			p.pageOrd++
+			dataHeader := p.curPageHdr.GetDataPageHeaderV2()
+			if dataHeader.GetNumValues() < 0 {
+				p.err = xerrors.New("parquet: invalid page header (negative number of values)")
+				return false
+			}
+
+			if dataHeader.GetDefinitionLevelsByteLength() < 0 || dataHeader.GetRepetitionLevelsByteLength() < 0 {
+				p.err = xerrors.New("parquet: invalid page header (negative levels byte length)")
+				return false
+			}
+
+			compressed := dataHeader.GetIsCompressed()
+			p.rowsSeen += int64(dataHeader.GetNumValues())
+			levelsBytelen, ok := overflow.Add(int(dataHeader.GetDefinitionLevelsByteLength()), int(dataHeader.GetRepetitionLevelsByteLength()))
+			if !ok {
+				p.err = xerrors.New("parquet: levels size too large (corrupt file?)")
+				return false
+			}
+			// the level bytes are sliced out of both the stored and the decoded
+			// page below, so they have to fit within each of them
+			if levelsBytelen > lenUncompressed || levelsBytelen > lenCompressed {
+				p.err = xerrors.New("parquet: invalid page header (levels byte length exceeds page size)")
+				return false
+			}
+
+			buf := p.buf.Bytes()
+			if compressed {
+				// the levels are read as-is; only what follows them goes through the codec
+				if levelsBytelen > 0 {
+					if _, p.err = io.ReadFull(p.r, buf[:levelsBytelen]); p.err != nil {
+						return false
+					}
+				}
+				vals, err := p.decompress(lenCompressed-levelsBytelen, buf[levelsBytelen:])
+				if err != nil {
+					p.err = err
+					return false
+				}
+				debug.Assert(levelsBytelen+len(vals) == lenUncompressed, "len(data) != lenUncompressed")
+				// a no-op when the codec decoded in place, otherwise this puts
+				// the values right behind the levels
+				copy(buf[levelsBytelen:], vals)
+			} else {
+				if _, p.err = io.ReadFull(p.r, buf); p.err != nil {
+					return false
+				}
+			}
+			// as in the uncompressed branch, the page holds the levels followed
+			// by the values
+			data := buf
+
+			// make datapage v2
+			p.curPage = &DataPageV2{
+				page: page{
+					buf:      memory.NewBufferBytes(data),
+					typ:      p.curPageHdr.Type,
+					nvals:    dataHeader.GetNumValues(),
+					encoding: dataHeader.GetEncoding(),
+				},
+				nulls:            dataHeader.GetNumNulls(),
+				nrows:            dataHeader.GetNumRows(),
+				defLvlByteLen:    dataHeader.GetDefinitionLevelsByteLength(),
+				repLvlByteLen:    dataHeader.GetRepetitionLevelsByteLength(),
+				compressed:       compressed,
+				uncompressedSize: int64(lenUncompressed),
+				statistics:       extractStats(dataHeader),
+			}
+		default:
+			// we don't know this page type, we're allowed to skip non-data pages,
+			// but the payload still has to be stepped over so that the next
+			// iteration starts at the following page header
+			if _, err := p.r.Seek(int64(lenCompressed), io.SeekCurrent); err != nil {
+				p.err = err
+				return false
+			}
+			continue
+		}
+
+		p.buf = memory.NewResizableBuffer(p.mem)
+		return true
+	}
+
+	return false
+}
diff --git a/go/parquet/file/row_group_reader.go
b/go/parquet/file/row_group_reader.go
new file mode 100644
index 0000000..9c74a25
--- /dev/null
+++ b/go/parquet/file/row_group_reader.go
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+ "github.com/apache/arrow/go/arrow/ipc"
+ "github.com/apache/arrow/go/parquet"
+ "github.com/apache/arrow/go/parquet/internal/encryption"
+ "github.com/apache/arrow/go/parquet/internal/utils"
+ "github.com/apache/arrow/go/parquet/metadata"
+ "golang.org/x/xerrors"
+)
+
+const (
+ // maxDictHeaderSize is the upper bound, in bytes, on the padding that
+ // GetColumnPageReader adds to a column chunk's length for files whose writer
+ // version is older than metadata.Parquet816FixedVersion.
+ maxDictHeaderSize int64 = 100
+)
+
+// RowGroupReader is the primary interface for reading a single row group
+type RowGroupReader struct {
+ r ipc.ReadAtSeeker // source that column chunk streams are opened on
+ sourceSz int64 // total size of the source; bounds the padding added for old writer versions
+ fileMetadata *metadata.FileMetaData // file-level metadata (schema and writer version)
+ rgMetadata *metadata.RowGroupMetaData // metadata of the row group this reader serves
+ props *parquet.ReaderProperties // supplies the allocator and the stream for each column chunk
+ fileDecryptor encryption.FileDecryptor // source of column decryptors; nil when none was provided
+}
+
+// MetaData exposes the metadata describing this row group.
+func (r *RowGroupReader) MetaData() *metadata.RowGroupMetaData {
+	return r.rgMetadata
+}
+
+// NumColumns reports how many columns the metadata of this row group defines.
+func (r *RowGroupReader) NumColumns() int {
+	return r.rgMetadata.NumColumns()
+}
+
+// NumRows reports the row count of this row group alone.
+func (r *RowGroupReader) NumRows() int64 {
+	return r.rgMetadata.NumRows()
+}
+
+// ByteSize reports the total byte size of this row group as recorded in its
+// metadata.
+func (r *RowGroupReader) ByteSize() int64 {
+	return r.rgMetadata.TotalByteSize()
+}
+
+// Column builds a column chunk reader for the column at index i (0-based).
+//
+// It panics when i is outside the range [0, NumColumns) or when the page
+// reader for the column cannot be initialized.
+func (r *RowGroupReader) Column(i int) ColumnChunkReader {
+	if i < 0 || i >= r.NumColumns() {
+		panic(xerrors.Errorf("parquet: trying to read column index %d but row group metadata only has %d columns", i, r.rgMetadata.NumColumns()))
+	}
+
+	descr := r.fileMetadata.Schema.Column(i)
+
+	pages, err := r.GetColumnPageReader(i)
+	if err != nil {
+		panic(xerrors.Errorf("parquet: unable to initialize page reader: %w", err))
+	}
+
+	return NewColumnReader(descr, pages, r.props.Allocator())
+}
+
+// GetColumnPageReader returns a PageReader over the pages of the column chunk
+// at index i of this row group. When the column chunk carries crypto metadata
+// the reader is set up with the matching footer-key or column-key decryptors.
+//
+// An error is returned if the column chunk metadata or the underlying stream
+// cannot be obtained, if the column is encrypted but no file decryptor is
+// available, or if i is larger than the ordinal limit for encrypted files.
+func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
+	col, err := r.rgMetadata.ColumnChunk(i)
+	if err != nil {
+		return nil, err
+	}
+
+	// start at the dictionary page when there is one ahead of the first data page
+	colStart := col.DataPageOffset()
+	if col.HasDictionaryPage() && col.DictionaryPageOffset() > 0 && colStart > col.DictionaryPageOffset() {
+		colStart = col.DictionaryPageOffset()
+	}
+
+	colLen := col.TotalCompressedSize()
+	if r.fileMetadata.WriterVersion().LessThan(metadata.Parquet816FixedVersion) {
+		// Extend the chunk by up to maxDictHeaderSize bytes, limited to what is
+		// left in the source. If the metadata claims more bytes than the source
+		// holds the difference is negative; never shrink the chunk in that case.
+		bytesRemain := r.sourceSz - (colStart + colLen)
+		if padding := utils.Min(maxDictHeaderSize, bytesRemain); padding > 0 {
+			colLen += padding
+		}
+	}
+
+	stream, err := r.props.GetStream(r.r, colStart, colLen)
+	if err != nil {
+		return nil, err
+	}
+
+	cryptoMetadata := col.CryptoMetadata()
+	if cryptoMetadata == nil {
+		return NewPageReader(stream, col.NumValues(), col.Compression(), r.props.Allocator(), nil)
+	}
+
+	if r.fileDecryptor == nil {
+		return nil, xerrors.New("column in rowgroup is encrypted, but no file decryptor")
+	}
+
+	// the column ordinal is stored as an int16 in the crypto context below
+	const encryptedColumnsLimit = 32767
+	if i > encryptedColumnsLimit {
+		return nil, xerrors.New("encrypted files cannot contain more than 32767 column chunks")
+	}
+
+	ctx := CryptoContext{
+		StartDecryptWithDictionaryPage: col.HasDictionaryPage(),
+		RowGroupOrdinal:                r.rgMetadata.Ordinal(),
+		ColumnOrdinal:                  int16(i),
+	}
+	if cryptoMetadata.IsSetENCRYPTION_WITH_FOOTER_KEY() {
+		ctx.MetaDecryptor = r.fileDecryptor.GetFooterDecryptorForColumnMeta("")
+		ctx.DataDecryptor = r.fileDecryptor.GetFooterDecryptorForColumnData("")
+	} else {
+		// column encrypted with its own key
+		columnKey := cryptoMetadata.GetENCRYPTION_WITH_COLUMN_KEY()
+		columnPath := parquet.ColumnPath(columnKey.PathInSchema).String()
+		columnKeyMeta := string(columnKey.KeyMetadata)
+		ctx.MetaDecryptor = r.fileDecryptor.GetColumnMetaDecryptor(columnPath, columnKeyMeta, "")
+		ctx.DataDecryptor = r.fileDecryptor.GetColumnDataDecryptor(columnPath, columnKeyMeta, "")
+	}
+	return NewPageReader(stream, col.NumValues(), col.Compression(), r.props.Allocator(), &ctx)
+}
diff --git a/go/parquet/go.sum b/go/parquet/go.sum
index cf7b678..46b4f5a 100644
--- a/go/parquet/go.sum
+++ b/go/parquet/go.sum
@@ -75,6 +75,7 @@ github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod
h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
+github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1/go.mod
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0
h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
diff --git a/go/parquet/internal/bmi/bitmap_bmi2_noasm.go
b/go/parquet/internal/bmi/bitmap_bmi2_noasm.go
new file mode 100644
index 0000000..6dc4a39
--- /dev/null
+++ b/go/parquet/internal/bmi/bitmap_bmi2_noasm.go
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build noasm
+
+package bmi
+
+// init wires the pure Go implementations into the package's function table.
+func init() {
+	funclist.gtbitmap = greaterThanBitmapGo
+	funclist.extractBits = extractBitsGo
+}
diff --git a/go/parquet/internal/bmi/bmi.go b/go/parquet/internal/bmi/bmi.go
index ea0f6e3..a12af3e 100644
--- a/go/parquet/internal/bmi/bmi.go
+++ b/go/parquet/internal/bmi/bmi.go
@@ -254,7 +254,7 @@ func extractBitsGo(bitmap, selectBitmap uint64) uint64 {
for selectBitmap != 0 {
maskLen := bits.OnesCount32(uint32(selectBitmap & lookupMask))
value := pextTable[selectBitmap&lookupMask][bitmap&lookupMask]
- bitValue |= uint64(value << bitLen)
+ bitValue |= uint64(value) << bitLen
bitLen += maskLen
bitmap >>= lookupBits
selectBitmap >>= lookupBits
diff --git a/go/parquet/internal/encoding/boolean_decoder.go
b/go/parquet/internal/encoding/boolean_decoder.go
index a33b21a..bdf1fd5 100644
--- a/go/parquet/internal/encoding/boolean_decoder.go
+++ b/go/parquet/internal/encoding/boolean_decoder.go
@@ -45,7 +45,7 @@ func (dec *PlainBooleanDecoder) Decode(out []bool) (int,
error) {
unalignedExtract := func(start, end, curBitOffset int) int {
i := start
- for ; curBitOffset < end; i, curBitOffset = i+1, curBitOffset+1
{
+ for ; curBitOffset < end && i < max; i, curBitOffset = i+1,
curBitOffset+1 {
out[i] = (dec.data[0] & byte(1<<curBitOffset)) != 0
}
return i // return the number of bits we extracted
@@ -56,7 +56,7 @@ func (dec *PlainBooleanDecoder) Decode(out []bool) (int,
error) {
i := 0
if dec.bitOffset != 0 {
i = unalignedExtract(0, 8, dec.bitOffset)
- dec.bitOffset = 0
+ dec.bitOffset = (dec.bitOffset + i) % 8
}
// determine the number of full bytes worth of bits we can decode
diff --git a/go/parquet/internal/encoding/boolean_encoder.go
b/go/parquet/internal/encoding/boolean_encoder.go
index ba06d4c..81f523b 100644
--- a/go/parquet/internal/encoding/boolean_encoder.go
+++ b/go/parquet/internal/encoding/boolean_encoder.go
@@ -47,6 +47,9 @@ func (enc *PlainBooleanEncoder) Put(in []bool) {
if enc.wr == nil {
enc.wr = utils.NewBitmapWriter(enc.bitsBuffer, 0, boolsInBuf)
}
+ if len(in) == 0 {
+ return
+ }
n := enc.wr.AppendBools(in)
for n < len(in) {
diff --git a/go/parquet/internal/encoding/typed_encoder.gen.go
b/go/parquet/internal/encoding/typed_encoder.gen.go
index 211f062..54f0f56 100644
--- a/go/parquet/internal/encoding/typed_encoder.gen.go
+++ b/go/parquet/internal/encoding/typed_encoder.gen.go
@@ -495,10 +495,9 @@ type int96EncoderTraits struct{}
// Encoder returns an encoder for int96 type data, using the specified
encoding type and whether or not
// it should be dictionary encoded.
-// dictionary encoding does not exist for this type and Encoder will panic if
useDict is true
func (int96EncoderTraits) Encoder(e format.Encoding, useDict bool, descr
*schema.Column, mem memory.Allocator) TypedEncoder {
if useDict {
- panic("parquet: no parquet.Int96 dictionary encoding")
+ return &DictInt96Encoder{newDictEncoderBase(descr,
NewBinaryDictionary(mem), mem)}
}
switch e {
@@ -521,7 +520,7 @@ func (int96DecoderTraits) BytesRequired(n int) int {
// Decoder returns a decoder for int96 typed data of the requested encoding
type if available
func (int96DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column,
useDict bool, mem memory.Allocator) TypedDecoder {
if useDict {
- panic("dictionary decoding unimplemented for int96")
+ return &DictInt96Decoder{dictDecoder{decoder:
newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
}
switch e {
@@ -532,6 +531,157 @@ func (int96DecoderTraits) Decoder(e parquet.Encoding,
descr *schema.Column, useD
}
}
+// DictInt96Encoder is an encoder for parquet.Int96 data using dictionary encoding
+type DictInt96Encoder struct {
+ dictEncoder
+}
+
+// Type reports the physical type handled by this encoder, which is always
+// parquet.Types.Int96.
+func (enc *DictInt96Encoder) Type() parquet.Type {
+	return parquet.Types.Int96
+}
+
+// WriteDict copies the dictionary values held by the memo table into out,
+// starting at the first entry and using the fixed Int96 width per value.
+func (enc *DictInt96Encoder) WriteDict(out []byte) {
+	table := enc.memo.(BinaryMemoTable)
+	table.CopyFixedWidthValues(0, parquet.Int96SizeBytes, out)
+}
+
+// Put encodes every value of in, inserting values not seen before into the
+// dictionary and recording the dictionary index of each one. It panics if the
+// memo table reports an error.
+func (enc *DictInt96Encoder) Put(in []parquet.Int96) {
+	for i := range in {
+		idx, existed, err := enc.memo.GetOrInsert(in[i])
+		if err != nil {
+			panic(err)
+		}
+		if !existed {
+			// a new dictionary entry grows the encoded dictionary by one value
+			enc.dictEncodedSize += parquet.Int96SizeBytes
+		}
+		enc.addIndex(idx)
+	}
+}
+
+// PutSpaced is the null-aware variant of Put: only the runs of in whose bits
+// are set in validBits (starting at validBitsOffset) are encoded.
+func (enc *DictInt96Encoder) PutSpaced(in []parquet.Int96, validBits []byte, validBitsOffset int64) {
+	putRun := func(pos, length int64) error {
+		enc.Put(in[pos : pos+length])
+		return nil
+	}
+	utils.VisitSetBitRuns(validBits, validBitsOffset, int64(len(in)), putRun)
+}
+
+// DictInt96Decoder is a decoder for decoding dictionary encoded data for parquet.Int96 columns
+type DictInt96Decoder struct {
+ dictDecoder
+}
+
+// Type reports the physical type handled by this decoder, which is always
+// parquet.Types.Int96.
+func (DictInt96Decoder) Type() parquet.Type {
+	return parquet.Types.Int96
+}
+
+// Decode fills out with min(len(out), remaining values) values, resolving each
+// index through the dictionary. It returns the number of values decoded along
+// with any error; decoding fewer values than requested is reported as an error.
+func (d *DictInt96Decoder) Decode(out []parquet.Int96) (int, error) {
+	want := utils.MinInt(len(out), d.nvals)
+	got, err := d.decode(out[:want])
+	switch {
+	case err != nil:
+		return got, err
+	case got != want:
+		return got, xerrors.New("parquet: dict eof exception")
+	}
+
+	d.nvals -= want
+	return want, nil
+}
+
+// DecodeSpaced is like Decode but spaces the values out, leaving slots for the
+// nulls described by nullCount and the validity bitmap. It returns the number
+// of values decoded along with any error; decoding fewer values than requested
+// is reported as an error.
+func (d *DictInt96Decoder) DecodeSpaced(out []parquet.Int96, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
+	want := utils.MinInt(len(out), d.nvals)
+	got, err := d.decodeSpaced(out[:want], nullCount, validBits, validBitsOffset)
+	switch {
+	case err != nil:
+		return got, err
+	case got != want:
+		return got, xerrors.New("parquet: dict spaced eof exception")
+	}
+
+	d.nvals -= want
+	return want, nil
+}
+
+// Int96DictConverter is a helper for dictionary handling which is used for converting
+// run length encoded indexes into the actual values that are stored in the dictionary index page.
+type Int96DictConverter struct {
+ valueDecoder Int96Decoder // decoder that dictionary values are pulled from on demand
+ dict []parquet.Int96 // dictionary values decoded so far, indexed by dictionary index
+ zeroVal parquet.Int96 // value written by FillZero
+}
+
+// ensure makes sure dictionary values up to and including idx have been
+// decoded, so that the whole dictionary does not have to be decoded up front.
+// Values are decoded into spare capacity of dict when there is enough,
+// otherwise into a temporary slice that is appended.
+func (dc *Int96DictConverter) ensure(idx utils.IndexType) error {
+	pos := int(idx)
+	if pos < len(dc.dict) {
+		return nil
+	}
+
+	if pos >= cap(dc.dict) {
+		scratch := make([]parquet.Int96, int(idx+1)-len(dc.dict))
+		n, err := dc.valueDecoder.Decode(scratch)
+		if err != nil {
+			return err
+		}
+		dc.dict = append(dc.dict, scratch[:n]...)
+		return nil
+	}
+
+	have := len(dc.dict)
+	n, err := dc.valueDecoder.Decode(dc.dict[have : idx+1])
+	if err != nil {
+		return err
+	}
+	dc.dict = dc.dict[:have+n]
+	return nil
+}
+
+// IsValid reports whether every index passed in refers to an entry of the
+// dictionary, decoding dictionary values up to the largest index first when
+// that is necessary.
+func (dc *Int96DictConverter) IsValid(idxes ...utils.IndexType) bool {
+	lo, hi := utils.GetMinMaxInt32(*(*[]int32)(unsafe.Pointer(&idxes)))
+	if err := dc.ensure(utils.IndexType(hi)); err != nil {
+		// ensure only decodes when hi is not yet covered and leaves the
+		// dictionary unchanged on failure, so hi is out of range here
+		return false
+	}
+
+	size := len(dc.dict)
+	if lo < 0 || int(lo) >= size {
+		return false
+	}
+	return int(hi) >= 0 && int(hi) < size
+}
+
+// Fill populates the slice passed in entirely with the value at dictionary index indicated by val.
+//
+// out must be a []parquet.Int96. An error is returned when the dictionary
+// values cannot be decoded or when val does not refer to a dictionary entry;
+// an empty out is left untouched.
+func (dc *Int96DictConverter) Fill(out interface{}, val utils.IndexType) error {
+	o := out.([]parquet.Int96)
+	if err := dc.ensure(val); err != nil {
+		return err
+	}
+	// ensure can come up short when the decoder runs out of values, and it
+	// does nothing for negative indexes
+	if val < 0 || int(val) >= len(dc.dict) {
+		return xerrors.Errorf("parquet: dictionary index %d out of range", val)
+	}
+	if len(o) == 0 {
+		return nil
+	}
+
+	// seed the first slot, then keep doubling the filled prefix
+	o[0] = dc.dict[val]
+	for i := 1; i < len(o); i *= 2 {
+		copy(o[i:], o[:i])
+	}
+	return nil
+}
+
+// FillZero populates the entire slice of out with the zero value for parquet.Int96.
+//
+// out must be a []parquet.Int96; an empty slice is left untouched.
+func (dc *Int96DictConverter) FillZero(out interface{}) {
+	o := out.([]parquet.Int96)
+	if len(o) == 0 {
+		return
+	}
+
+	// seed the first slot, then keep doubling the filled prefix
+	o[0] = dc.zeroVal
+	for i := 1; i < len(o); i *= 2 {
+		copy(o[i:], o[:i])
+	}
+}
+
+// Copy writes into out, position by position, the dictionary values selected
+// by the indexes in vals. out must be a []parquet.Int96. The indexes are used
+// as they are against the values decoded so far.
+func (dc *Int96DictConverter) Copy(out interface{}, vals []utils.IndexType) error {
+	dst := out.([]parquet.Int96)
+	for i := range vals {
+		dst[i] = dc.dict[vals[i]]
+	}
+	return nil
+}
+
// Float32Encoder is the interface for all encoding types that implement
encoding
// float32 values.
type Float32Encoder interface {
@@ -1385,6 +1535,8 @@ func NewDictConverter(dict TypedDecoder)
utils.DictionaryConverter {
return &Int32DictConverter{valueDecoder: dict.(Int32Decoder),
dict: make([]int32, 0, dict.ValuesLeft())}
case parquet.Types.Int64:
return &Int64DictConverter{valueDecoder: dict.(Int64Decoder),
dict: make([]int64, 0, dict.ValuesLeft())}
+ case parquet.Types.Int96:
+ return &Int96DictConverter{valueDecoder: dict.(Int96Decoder),
dict: make([]parquet.Int96, 0, dict.ValuesLeft())}
case parquet.Types.Float:
return &Float32DictConverter{valueDecoder:
dict.(Float32Decoder), dict: make([]float32, 0, dict.ValuesLeft())}
case parquet.Types.Double:
diff --git a/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
b/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
index d2ebbe4..14c1e9a 100644
--- a/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
+++ b/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
@@ -56,15 +56,15 @@ type {{.lower}}EncoderTraits struct{}
// Encoder returns an encoder for {{.lower}} type data, using the specified
encoding type and whether or not
// it should be dictionary encoded.
-{{- if or (eq .Name "Boolean") (eq .Name "Int96")}}
+{{- if or (eq .Name "Boolean") }}
// dictionary encoding does not exist for this type and Encoder will panic if
useDict is true
{{- end }}
func ({{.lower}}EncoderTraits) Encoder(e format.Encoding, useDict bool, descr
*schema.Column, mem memory.Allocator) TypedEncoder {
if useDict {
-{{- if or (eq .Name "Boolean") (eq .Name "Int96")}}
+{{- if or (eq .Name "Boolean") }}
panic("parquet: no {{.name}} dictionary encoding")
{{- else}}
- return &Dict{{.Name}}Encoder{newDictEncoderBase(descr, New{{if and (ne
.Name "ByteArray") (ne .Name
"FixedLenByteArray")}}{{.Name}}Dictionary(){{else}}BinaryDictionary(mem){{end}},
mem)}
+ return &Dict{{.Name}}Encoder{newDictEncoderBase(descr, New{{if and (ne
.Name "Int96") (ne .Name "ByteArray") (ne .Name
"FixedLenByteArray")}}{{.Name}}Dictionary(){{else}}BinaryDictionary(mem){{end}},
mem)}
{{- end}}
}
@@ -105,7 +105,7 @@ func ({{.lower}}DecoderTraits) BytesRequired(n int) int {
// Decoder returns a decoder for {{.lower}} typed data of the requested
encoding type if available
func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding, descr
*schema.Column, useDict bool, mem memory.Allocator) TypedDecoder {
if useDict {
-{{- if and (ne .Name "Boolean") (ne .Name "Int96")}}
+{{- if and (ne .Name "Boolean") }}
return &Dict{{.Name}}Decoder{dictDecoder{decoder:
newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
{{- else}}
panic("dictionary decoding unimplemented for {{.lower}}")
@@ -150,7 +150,7 @@ func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding,
descr *schema.Column,
}
}
-{{if and (ne .Name "Boolean") (ne .Name "Int96")}}
+{{if and (ne .Name "Boolean") }}
// Dict{{.Name}}Encoder is an encoder for {{.name}} data using dictionary
encoding
type Dict{{.Name}}Encoder struct {
dictEncoder
@@ -162,6 +162,12 @@ func (enc *Dict{{.Name}}Encoder) Type() parquet.Type {
}
{{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}}
+{{if (ne .Name "Int96")}}
+// WriteDict populates the byte slice with the dictionary index
+func (enc *Dict{{.Name}}Encoder) WriteDict(out []byte) {
+ enc.memo.CopyValues({{.prefix}}.{{.Name}}Traits.CastFromBytes(out))
+}
+
// Put encodes the values passed in, adding to the index as needed.
func (enc *Dict{{.Name}}Encoder) Put(in []{{.name}}) {
for _, val := range in {
@@ -179,6 +185,34 @@ func (enc *Dict{{.Name}}Encoder) PutSpaced(in []{{.name}},
validBits []byte, val
return nil
})
}
+{{else}}
+// WriteDict populates the byte slice with the dictionary index
+func (enc *DictInt96Encoder) WriteDict(out []byte) {
+ enc.memo.(BinaryMemoTable).CopyFixedWidthValues(0, parquet.Int96SizeBytes,
out)
+}
+
+// Put encodes the values passed in, adding to the index as needed
+func (enc *DictInt96Encoder) Put(in []parquet.Int96) {
+ for _, v := range in {
+ memoIdx, found, err := enc.memo.GetOrInsert(v)
+ if err != nil {
+ panic(err)
+ }
+ if !found {
+ enc.dictEncodedSize += parquet.Int96SizeBytes
+ }
+ enc.addIndex(memoIdx)
+ }
+}
+
+// PutSpaced is like Put but assumes space for nulls
+func (enc *DictInt96Encoder) PutSpaced(in []parquet.Int96, validBits []byte,
validBitsOffset int64) {
+ utils.VisitSetBitRuns(validBits, validBitsOffset, int64(len(in)), func(pos,
length int64) error {
+ enc.Put(in[pos : pos+length])
+ return nil
+ })
+}
+{{end}}
{{end}}
// Dict{{.Name}}Decoder is a decoder for decoding dictionary encoded data for
{{.name}} columns
@@ -302,7 +336,7 @@ func (dc *{{.Name}}DictConverter) Copy(out interface{},
vals []utils.IndexType)
// decoder as the decoder to decode the dictionary index.
func NewDictConverter(dict TypedDecoder) utils.DictionaryConverter {
switch dict.Type() {
- {{ range .In }}{{ if and (ne .Name "Boolean") (ne .Name "Int96") -}}
+ {{ range .In }}{{ if and (ne .Name "Boolean") -}}
case parquet.Types.{{if .physical }}{{.physical}}{{else}}{{.Name}}{{end}}:
return &{{.Name}}DictConverter{valueDecoder: dict.({{.Name}}Decoder),
dict: make([]{{.name}}, 0, dict.ValuesLeft())}
{{ end }}{{ end -}}
diff --git a/go/parquet/internal/testutils/pagebuilder.go
b/go/parquet/internal/testutils/pagebuilder.go
new file mode 100644
index 0000000..f742f1a
--- /dev/null
+++ b/go/parquet/internal/testutils/pagebuilder.go
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package testutils
+
+import (
+ "encoding/binary"
+ "io"
+ "reflect"
+
+ "github.com/apache/arrow/go/arrow/memory"
+ "github.com/apache/arrow/go/parquet"
+ "github.com/apache/arrow/go/parquet/compress"
+ "github.com/apache/arrow/go/parquet/file"
+ "github.com/apache/arrow/go/parquet/internal/encoding"
+ "github.com/apache/arrow/go/parquet/internal/utils"
+ "github.com/apache/arrow/go/parquet/schema"
+ "github.com/stretchr/testify/mock"
+)
+
+// DataPageBuilder writes the encoded levels and values of one data page to
+// sink and keeps track of what has been written so far.
+type DataPageBuilder struct {
+ // sink receives the encoded level and value bytes.
+ sink io.Writer
+ // version decides whether level blocks are preceded by a length prefix
+ // (written only for parquet.DataPageV1).
+ version parquet.DataPageVersion
+
+ // nvals is the largest element count seen across the appended levels
+ // and values.
+ nvals int
+ // encoding is the encoding passed to the most recent AppendValues call.
+ encoding parquet.Encoding
+ // defLvlEncoding and repLvlEncoding are set to RLE by AppendDefLevels
+ // and AppendRepLevels respectively.
+ defLvlEncoding parquet.Encoding
+ repLvlEncoding parquet.Encoding
+ // defLvlBytesLen and repLvlBytesLen hold the encoded byte length of the
+ // last appended definition / repetition levels.
+ defLvlBytesLen int
+ repLvlBytesLen int
+ // The has* flags are set to true by the corresponding Append method.
+ hasDefLvls bool
+ hasRepLvls bool
+ hasValues bool
+}
+
+// mem is the Go allocator shared by the encoders and buffer writers created
+// in this package.
+var mem = memory.NewGoAllocator()
+
+// appendLevels encodes lvls with the level encoder and writes the encoded
+// bytes to the sink, returning how many bytes the encoding occupies. For V1
+// data pages the bytes are preceded by their length as a little-endian int32.
+// It panics if e is not RLE or if a write to the sink fails.
+func (d *DataPageBuilder) appendLevels(lvls []int16, maxLvl int16, e parquet.Encoding) int {
+	if e != parquet.Encodings.RLE {
+		panic("parquet: only rle encoding currently implemented")
+	}
+
+	maxSize := encoding.LevelEncodingMaxBufferSize(e, maxLvl, len(lvls))
+	scratch := encoding.NewBufferWriter(maxSize, memory.DefaultAllocator)
+
+	var lvlEnc encoding.LevelEncoder
+	lvlEnc.Init(e, maxLvl, scratch)
+	lvlEnc.Encode(lvls)
+	nbytes := lvlEnc.Len()
+
+	if d.version == parquet.DataPageV1 {
+		// V1 pages carry the byte length of the level block inline.
+		err := binary.Write(d.sink, binary.LittleEndian, int32(nbytes))
+		if err != nil {
+			panic(err)
+		}
+	}
+
+	encoded := scratch.Bytes()[:nbytes]
+	if _, err := d.sink.Write(encoded); err != nil {
+		panic(err)
+	}
+	return nbytes
+}
+
+// AppendDefLevels RLE-encodes the definition levels into the sink and records
+// their encoded size, the encoding used, and the running value count.
+func (d *DataPageBuilder) AppendDefLevels(lvls []int16, maxLvl int16) {
+	rle := parquet.Encodings.RLE
+	written := d.appendLevels(lvls, maxLvl, rle)
+
+	d.hasDefLvls = true
+	d.defLvlEncoding = rle
+	d.defLvlBytesLen = written
+	d.nvals = utils.MaxInt(len(lvls), d.nvals)
+}
+
+// AppendRepLevels RLE-encodes the repetition levels into the sink and records
+// their encoded size, the encoding used, and the running value count.
+func (d *DataPageBuilder) AppendRepLevels(lvls []int16, maxLvl int16) {
+	rle := parquet.Encodings.RLE
+	written := d.appendLevels(lvls, maxLvl, rle)
+
+	d.hasRepLvls = true
+	d.repLvlEncoding = rle
+	d.repLvlBytesLen = written
+	d.nvals = utils.MaxInt(len(lvls), d.nvals)
+}
+
+// AppendValues encodes values with encoding e for the column described by
+// desc and writes the encoded bytes to the sink. values must be one of
+// []int32, []int64, []parquet.Int96, []float32, []float64 or
+// []parquet.ByteArray.
+//
+// It panics when values has any other type, when flushing the encoder fails,
+// or when the sink rejects the write, matching how the rest of this test
+// helper reports failures.
+func (d *DataPageBuilder) AppendValues(desc *schema.Column, values interface{}, e parquet.Encoding) {
+	enc := encoding.NewEncoder(desc.PhysicalType(), e, false, desc, mem)
+	var sz int
+	switch v := values.(type) {
+	case []int32:
+		enc.(encoding.Int32Encoder).Put(v)
+		sz = len(v)
+	case []int64:
+		enc.(encoding.Int64Encoder).Put(v)
+		sz = len(v)
+	case []parquet.Int96:
+		enc.(encoding.Int96Encoder).Put(v)
+		sz = len(v)
+	case []float32:
+		enc.(encoding.Float32Encoder).Put(v)
+		sz = len(v)
+	case []float64:
+		enc.(encoding.Float64Encoder).Put(v)
+		sz = len(v)
+	case []parquet.ByteArray:
+		enc.(encoding.ByteArrayEncoder).Put(v)
+		sz = len(v)
+	default:
+		// Writing an empty value section would yield a page that silently
+		// disagrees with its declared value count.
+		panic("parquet: unsupported value type for data page builder")
+	}
+
+	buf, err := enc.FlushValues()
+	if err != nil {
+		panic(err)
+	}
+	if _, err := d.sink.Write(buf.Bytes()); err != nil {
+		panic(err)
+	}
+
+	d.nvals = utils.MaxInt(sz, d.nvals)
+	d.encoding = e
+	d.hasValues = true
+}
+
+// DictionaryPageBuilder accumulates values in a dictionary encoder so that a
+// dictionary page and its per-page index buffers can be produced.
+type DictionaryPageBuilder struct {
+ // traits is the dictionary encoder the values are put into.
+ traits encoding.DictEncoder
+ // numDictValues is the encoder's entry count as of the last AppendValues.
+ numDictValues int32
+ // hasValues is set to true once AppendValues has been called.
+ hasValues bool
+}
+
+// NewDictionaryPageBuilder returns an empty builder backed by a dictionary
+// encoder for the physical type of column d.
+func NewDictionaryPageBuilder(d *schema.Column) *DictionaryPageBuilder {
+	enc := encoding.NewEncoder(d.PhysicalType(), parquet.Encodings.Plain, true, d, mem)
+	return &DictionaryPageBuilder{
+		traits:        enc.(encoding.DictEncoder),
+		numDictValues: 0,
+		hasValues:     false,
+	}
+}
+
+// AppendValues puts values into the dictionary encoder, refreshes the
+// recorded dictionary entry count, and returns the buffer produced by
+// flushing the encoder (presumably the encoded indices for these values;
+// MakeDictPage collects them as rleIndices). values must be one of []int32,
+// []int64, []parquet.Int96, []float32, []float64 or []parquet.ByteArray.
+//
+// It panics when values has any other type or when flushing the encoder
+// fails, matching how the rest of this test helper reports failures.
+func (d *DictionaryPageBuilder) AppendValues(values interface{}) encoding.Buffer {
+	switch v := values.(type) {
+	case []int32:
+		d.traits.(encoding.Int32Encoder).Put(v)
+	case []int64:
+		d.traits.(encoding.Int64Encoder).Put(v)
+	case []parquet.Int96:
+		d.traits.(encoding.Int96Encoder).Put(v)
+	case []float32:
+		d.traits.(encoding.Float32Encoder).Put(v)
+	case []float64:
+		d.traits.(encoding.Float64Encoder).Put(v)
+	case []parquet.ByteArray:
+		d.traits.(encoding.ByteArrayEncoder).Put(v)
+	default:
+		// Flushing without having put anything would hand back an index
+		// buffer that does not match the caller's values.
+		panic("parquet: unsupported value type for dictionary page builder")
+	}
+
+	d.numDictValues = int32(d.traits.NumEntries())
+	d.hasValues = true
+
+	buf, err := d.traits.FlushValues()
+	if err != nil {
+		panic(err)
+	}
+	return buf
+}
+
+// WriteDict allocates a buffer sized to the encoder's dictionary and has the
+// encoder write the dictionary values into it.
+func (d *DictionaryPageBuilder) WriteDict() *memory.Buffer {
+	raw := make([]byte, d.traits.DictEncodedSize())
+	out := memory.NewBufferBytes(raw)
+	d.traits.WriteDict(out.Bytes())
+	return out
+}
+
+// NumValues returns the dictionary entry count recorded by the most recent
+// AppendValues call (zero before any call).
+func (d *DictionaryPageBuilder) NumValues() int32 {
+ return d.numDictValues
+}
+
+func MakeDataPage(dataPageVersion parquet.DataPageVersion, d *schema.Column,
values interface{}, nvals int, e parquet.Encoding, indexBuffer encoding.Buffer,
defLvls, repLvls []int16, maxDef, maxRep int16) file.Page {
+ num := 0
+
+ stream := encoding.NewBufferWriter(1024, mem)
+ builder := DataPageBuilder{sink: stream, version: dataPageVersion}
+
+ if len(repLvls) > 0 {
+ builder.AppendRepLevels(repLvls, maxRep)
+ }
+ if len(defLvls) > 0 {
+ builder.AppendDefLevels(defLvls, maxDef)
+ }
+
+ if e == parquet.Encodings.Plain {
+ builder.AppendValues(d, values, e)
+ num = builder.nvals
+ } else {
+ stream.Write(indexBuffer.Bytes())
+ num = utils.MaxInt(builder.nvals, nvals)
+ }
+
+ buf := stream.Finish()
+ if dataPageVersion == parquet.DataPageV1 {
+ return file.NewDataPageV1(buf, int32(num), e,
builder.defLvlEncoding, builder.repLvlEncoding, int64(buf.Len()))
+ }
+ return file.NewDataPageV2(buf, int32(num), 0, int32(num), e,
int32(builder.defLvlBytesLen), int32(builder.repLvlBytesLen), int64(buf.Len()),
false)
+}
+
+// MakeDictPage feeds values (a slice) into a dictionary page builder in
+// consecutive chunks whose sizes are given by valuesPerPage. It returns the
+// resulting dictionary page together with one flushed buffer per chunk.
+// The parameter e is accepted but not used; the dictionary page is always
+// created with Plain encoding.
+func MakeDictPage(d *schema.Column, values interface{}, valuesPerPage []int, e parquet.Encoding) (*file.DictionaryPage, []encoding.Buffer) {
+	bldr := NewDictionaryPageBuilder(d)
+	all := reflect.ValueOf(values)
+
+	indices := make([]encoding.Buffer, 0, len(valuesPerPage))
+	offset := 0
+	for _, n := range valuesPerPage {
+		chunk := all.Slice(offset, offset+n).Interface()
+		indices = append(indices, bldr.AppendValues(chunk))
+		offset += n
+	}
+
+	dict := bldr.WriteDict()
+	return file.NewDictionaryPage(dict, bldr.NumValues(), parquet.Encodings.Plain), indices
+}
+
+// MockPageReader is a testify mock that serves pages from the []file.Page
+// stored under the "pages" key of the mock's test data.
+type MockPageReader struct {
+	mock.Mock
+
+	// curpage is the 1-based position of the current page; 0 before Next.
+	curpage int
+}
+
+// Err returns the error configured on the mock for this call.
+func (m *MockPageReader) Err() error {
+	args := m.Called()
+	return args.Error(0)
+}
+
+// Reset does nothing in the mock.
+func (m *MockPageReader) Reset(parquet.ReaderAtSeeker, int64, compress.Compression, *file.CryptoContext) {
+}
+
+// SetMaxPageHeaderSize does nothing in the mock.
+func (m *MockPageReader) SetMaxPageHeaderSize(int) {}
+
+// Page returns the page selected by the most recent call to Next.
+func (m *MockPageReader) Page() file.Page {
+	pages := m.TestData().Get("pages").Data().([]file.Page)
+	return pages[m.curpage-1]
+}
+
+// Next advances to the following page and reports whether one exists.
+func (m *MockPageReader) Next() bool {
+	total := len(m.TestData().Get("pages").Data().([]file.Page))
+	m.curpage++
+	return m.curpage <= total
+}
+
+// PaginatePlain splits values (a reflected slice) into data pages, page i
+// taking valuesPerPage[i] consecutive values. When maxDef (or maxRep) is
+// positive, page i also receives the i-th window of lvlsPerPage definition
+// (or repetition) levels; otherwise it receives an empty level slice.
+// No index buffer is passed to MakeDataPage.
+func PaginatePlain(version parquet.DataPageVersion, d *schema.Column, values reflect.Value, defLevels, repLevels []int16, maxDef, maxRep int16, lvlsPerPage int, valuesPerPage []int, enc parquet.Encoding) []file.Page {
+	pages := make([]file.Page, 0, len(valuesPerPage))
+
+	offset := 0
+	for i, nvals := range valuesPerPage {
+		var defLo, defHi, repLo, repHi int
+		if maxDef > 0 {
+			defLo, defHi = i*lvlsPerPage, (i+1)*lvlsPerPage
+		}
+		if maxRep > 0 {
+			repLo, repHi = i*lvlsPerPage, (i+1)*lvlsPerPage
+		}
+
+		pageVals := values.Slice(offset, offset+nvals).Interface()
+		pages = append(pages, MakeDataPage(version, d, pageVals, nvals, enc, nil,
+			defLevels[defLo:defHi], repLevels[repLo:repHi], maxDef, maxRep))
+		offset += nvals
+	}
+	return pages
+}
+
+// PaginateDict builds a dictionary page from values followed by one data page
+// per entry of valuesPerPage, each data page carrying the index buffer that
+// MakeDictPage produced for that chunk. When maxDef (or maxRep) is positive,
+// data page i also receives the i-th window of lvlsPerPage definition (or
+// repetition) levels; otherwise it receives an empty level slice.
+//
+// The returned slice holds len(valuesPerPage)+1 pages, dictionary page first.
+func PaginateDict(version parquet.DataPageVersion, d *schema.Column, values reflect.Value, defLevels, repLevels []int16, maxDef, maxRep int16, lvlsPerPage int, valuesPerPage []int, enc parquet.Encoding) []file.Page {
+	npages := len(valuesPerPage)
+	// Reserve one extra slot for the dictionary page so the final append
+	// does not have to grow the slice.
+	pages := make([]file.Page, 0, npages+1)
+
+	dictPage, rleIndices := MakeDictPage(d, values.Interface(), valuesPerPage, enc)
+	pages = append(pages, dictPage)
+
+	for i, nvals := range valuesPerPage {
+		var defStart, defEnd, repStart, repEnd int
+		if maxDef > 0 {
+			defStart, defEnd = i*lvlsPerPage, (i+1)*lvlsPerPage
+		}
+		if maxRep > 0 {
+			repStart, repEnd = i*lvlsPerPage, (i+1)*lvlsPerPage
+		}
+
+		pages = append(pages, MakeDataPage(version, d, nil, nvals, enc, rleIndices[i],
+			defLevels[defStart:defEnd], repLevels[repStart:repEnd], maxDef, maxRep))
+	}
+	return pages
+}
diff --git a/go/parquet/reader_properties.go b/go/parquet/reader_properties.go
index 92abae5..7e99d9f 100644
--- a/go/parquet/reader_properties.go
+++ b/go/parquet/reader_properties.go
@@ -20,7 +20,6 @@ import (
"bytes"
"io"
- "github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
"golang.org/x/xerrors"
)
@@ -61,7 +60,7 @@ func (r *ReaderProperties) Allocator() memory.Allocator {
return r.alloc }
//
// If BufferedStreamEnabled is true, it creates an io.SectionReader, otherwise
it will read the entire section
// into a buffer in memory and return a bytes.NewReader for that buffer.
-func (r *ReaderProperties) GetStream(source io.ReaderAt, start, nbytes int64)
(ipc.ReadAtSeeker, error) {
+func (r *ReaderProperties) GetStream(source io.ReaderAt, start, nbytes int64)
(ReaderAtSeeker, error) {
if r.BufferedStreamEnabled {
return io.NewSectionReader(source, start, nbytes), nil
}
diff --git a/go/parquet/types.go b/go/parquet/types.go
index e568984..630244c 100644
--- a/go/parquet/types.go
+++ b/go/parquet/types.go
@@ -18,6 +18,7 @@ package parquet
import (
"encoding/binary"
+ "io"
"reflect"
"strings"
"time"
@@ -47,6 +48,15 @@ var (
FixedLenByteArraySizeBytes int =
int(reflect.TypeOf(FixedLenByteArray{}).Size())
)
+// ReaderAtSeeker combines the io.ReaderAt and io.ReadSeeker interfaces. It
+// defines the only functionality that is required in order for a parquet
+// file to be read by the file functions: ReadAt, Read, and Seek.
+// For example, *io.SectionReader satisfies it.
+type ReaderAtSeeker interface {
+ io.ReaderAt
+ io.ReadSeeker
+}
+
// NewInt96 creates a new Int96 from the given 3 uint32 values.
func NewInt96(v [3]uint32) (out Int96) {
binary.LittleEndian.PutUint32(out[0:], v[0])