This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new aecdc0b  ARROW-13984: [Go][Parquet] file handling for go parquet, just 
the readers
aecdc0b is described below

commit aecdc0bd75ef14095ec2a560885c3f4e059bc730
Author: Matthew Topol <[email protected]>
AuthorDate: Sat Oct 23 14:09:59 2021 -0400

    ARROW-13984: [Go][Parquet] file handling for go parquet, just the readers
    
    This implements the file/column and page readers for Parquet files. In 
order to keep this smaller, I've only included what was necessary for the 
readers and will make a separate PR for the file and column writers after this.
    
    Closes #11146 from zeroshade/goparquet-file
    
    Lead-authored-by: Matthew Topol <[email protected]>
    Co-authored-by: Matt Topol <[email protected]>
    Signed-off-by: Matthew Topol <[email protected]>
---
 go/parquet/file/column_reader.go                   | 498 +++++++++++++++++
 go/parquet/file/column_reader_test.go              | 450 +++++++++++++++
 go/parquet/file/column_reader_types.gen.go         | 299 ++++++++++
 go/parquet/file/column_reader_types.gen.go.tmpl    |  62 +++
 go/parquet/file/file_reader.go                     | 336 +++++++++++
 go/parquet/file/file_reader_test.go                | 304 ++++++++++
 go/parquet/file/level_conversion.go                | 262 +++++++++
 go/parquet/file/level_conversion_test.go           | 194 +++++++
 go/parquet/file/page_reader.go                     | 620 +++++++++++++++++++++
 go/parquet/file/row_group_reader.go                | 130 +++++
 go/parquet/go.sum                                  |   1 +
 go/parquet/internal/bmi/bitmap_bmi2_noasm.go       |  24 +
 go/parquet/internal/bmi/bmi.go                     |   2 +-
 go/parquet/internal/encoding/boolean_decoder.go    |   4 +-
 go/parquet/internal/encoding/boolean_encoder.go    |   3 +
 go/parquet/internal/encoding/typed_encoder.gen.go  | 158 +++++-
 .../internal/encoding/typed_encoder.gen.go.tmpl    |  46 +-
 go/parquet/internal/testutils/pagebuilder.go       | 297 ++++++++++
 go/parquet/reader_properties.go                    |   3 +-
 go/parquet/types.go                                |  10 +
 20 files changed, 3689 insertions(+), 14 deletions(-)

diff --git a/go/parquet/file/column_reader.go b/go/parquet/file/column_reader.go
new file mode 100644
index 0000000..79c6479
--- /dev/null
+++ b/go/parquet/file/column_reader.go
@@ -0,0 +1,498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/encoding"
+       "github.com/apache/arrow/go/parquet/internal/encryption"
+       format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/utils"
+       "github.com/apache/arrow/go/parquet/schema"
+       "golang.org/x/xerrors"
+)
+
+// Default size limits applied to page headers.
+const (
+       // defaultMaxPageHeaderSize is the default maximum page header size (4 MB).
+       defaultMaxPageHeaderSize = 4 * 1024 * 1024
+       // defaultPageHeaderSize is the default expected page header size (16 KB).
+       defaultPageHeaderSize = 16 * 1024
+)
+
+//go:generate go run ../../arrow/_tools/tmpl/main.go -i 
-data=../internal/encoding/physical_types.tmpldata 
column_reader_types.gen.go.tmpl
+
+// isDictIndexEncoding reports whether e is one of the two encodings used for
+// dictionary indices: RLE_DICTIONARY or PLAIN_DICTIONARY.
+func isDictIndexEncoding(e format.Encoding) bool {
+       switch e {
+       case format.Encoding_RLE_DICTIONARY, format.Encoding_PLAIN_DICTIONARY:
+               return true
+       }
+       return false
+}
+
+// CryptoContext is a context for keeping track of the current methods for
+// decrypting. It keeps track of the row group and column numbers along with
+// references to the decryptor objects.
+type CryptoContext struct {
+       // NOTE(review): presumably indicates that decryption starts at the
+       // dictionary page of the column chunk; confirm against the page reader.
+       StartDecryptWithDictionaryPage bool
+       // RowGroupOrdinal is the number of the row group being decrypted.
+       RowGroupOrdinal                int16
+       // ColumnOrdinal is the number of the column being decrypted.
+       ColumnOrdinal                  int16
+       // MetaDecryptor and DataDecryptor are the decryptor objects tracked by
+       // this context. NOTE(review): presumably one for page metadata (headers)
+       // and one for page data; confirm against the page reader.
+       MetaDecryptor                  encryption.Decryptor
+       DataDecryptor                  encryption.Decryptor
+}
+
+// ColumnChunkReader is the basic interface for all column readers. It will use
+// a page reader to read all the pages in a column chunk from a row group.
+//
+// To actually Read out the column data, you need to convert to the properly
+// typed ColumnChunkReader type such as *BooleanColumnReader etc.
+//
+// Some things to clarify when working with column readers:
+//
+// "Values" refers to the physical data values in a data page.
+//
+// This is separate from the number of "rows" in a column and the total number
+// of "elements" in a column because null values aren't stored physically in
+// the data page but are represented via definition levels, so the number of
+// values in a column can be less than the number of rows.
+//
+// The total number of "elements" in a column also differs because of potential
+// repeated fields, where you can have multiple values in the page which
+// together make up a single element (such as a list) or, depending on the
+// repetition level and definition level, could represent an entire null list
+// or just a null element inside of a list.
+type ColumnChunkReader interface {
+       // HasNext returns whether there is more data to be read in this column
+       // and row group.
+       HasNext() bool
+       // Type returns the underlying physical type of the column.
+       Type() parquet.Type
+       // Descriptor returns the column schema container.
+       Descriptor() *schema.Column
+       // Err returns the error encountered if HasNext returned false because
+       // of an error. Otherwise it is nil, meaning HasNext returned false
+       // simply because the end of the column was reached.
+       Err() error
+       // consumeBufferedValues marks the given number of buffered values as
+       // consumed, skipping over them.
+       consumeBufferedValues(int64)
+       // numAvailValues returns the number of available buffered values that
+       // have not been decoded yet. When this returns 0, you're at the end of
+       // a page.
+       numAvailValues() int64
+       // readDefinitionLevels reads the definition levels and returns the
+       // number of definition levels read and the number of values to be read
+       // (the number of levels equal to the max definition level). It also
+       // populates the passed in slice, which should be sized appropriately.
+       readDefinitionLevels(levels []int16) (int, int64)
+       // readRepetitionLevels reads the repetition levels and returns the
+       // number of repetition levels read. It also populates the passed in
+       // slice, which should be sized appropriately.
+       readRepetitionLevels(levels []int16) int
+       // pager returns the current PageReader.
+       //
+       // A column is made up of potentially multiple pages across potentially
+       // multiple row groups. A PageReader allows looping through the pages in
+       // a single row group. When moving to another row group for reading, use
+       // setPageReader to re-use the column reader for reading the pages of
+       // the new row group.
+       pager() PageReader
+       // setPageReader sets a page reader into the column reader so it can be
+       // reused.
+       //
+       // This will clear any current error in the reader but does not
+       // automatically read the first page of the page reader passed in until
+       // HasNext which will read in the next page.
+       setPageReader(PageReader)
+}
+
+// columnChunkReader holds the state shared by all of the typed column chunk
+// readers: the page source, the level decoders, the value decoder for the
+// current page and the bookkeeping of how much of that page was consumed.
+type columnChunkReader struct {
+       // descr is the schema descriptor of the column being read
+       descr             *schema.Column
+       // rdr supplies the pages of the column chunk
+       rdr               PageReader
+       // level decoders, (re)initialized for every data page that is read
+       repetitionDecoder encoding.LevelDecoder
+       definitionDecoder encoding.LevelDecoder
+
+       // curPage is the page most recently returned by rdr
+       curPage     Page
+       // curEncoding is the value encoding of the current data page, with both
+       // dictionary index encodings normalized to RLE_DICTIONARY
+       curEncoding format.Encoding
+       // curDecoder is the value decoder in use for the current page
+       curDecoder  encoding.TypedDecoder
+
+       // number of currently buffered values in the current page
+       numBuffered int64
+       // the number of values we've decoded so far
+       numDecoded int64
+       // mem is the allocator handed to decoders and used for scratch buffers
+       mem        memory.Allocator
+
+       // decoders caches the value decoders created so far, keyed by encoding;
+       // it is replaced with an empty map by setPageReader
+       decoders      map[format.Encoding]encoding.TypedDecoder
+       // decoderTraits creates value decoders for this column's physical type
+       decoderTraits encoding.DecoderTraits
+
+       // is set when an error is encountered
+       err          error
+       // defLvlBuffer is a reusable scratch buffer for definition levels,
+       // managed by getDefLvlBuffer
+       defLvlBuffer []int16
+}
+
+// NewColumnReader builds the typed column chunk reader matching the physical
+// type of descr. The reader pulls its pages from pageReader and passes mem to
+// the decoders it creates.
+//
+// nil is returned when the physical type is not one of the types handled
+// below.
+func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator) ColumnChunkReader {
+       rdr := columnChunkReader{
+               descr:    descr,
+               rdr:      pageReader,
+               mem:      mem,
+               decoders: make(map[format.Encoding]encoding.TypedDecoder),
+       }
+
+       // the traits are set before rdr is copied into the typed wrapper
+       switch descr.PhysicalType() {
+       case parquet.Types.Boolean:
+               rdr.decoderTraits = &encoding.BooleanDecoderTraits
+               return &BooleanColumnChunkReader{rdr}
+       case parquet.Types.Int32:
+               rdr.decoderTraits = &encoding.Int32DecoderTraits
+               return &Int32ColumnChunkReader{rdr}
+       case parquet.Types.Int64:
+               rdr.decoderTraits = &encoding.Int64DecoderTraits
+               return &Int64ColumnChunkReader{rdr}
+       case parquet.Types.Int96:
+               rdr.decoderTraits = &encoding.Int96DecoderTraits
+               return &Int96ColumnChunkReader{rdr}
+       case parquet.Types.Float:
+               rdr.decoderTraits = &encoding.Float32DecoderTraits
+               return &Float32ColumnChunkReader{rdr}
+       case parquet.Types.Double:
+               rdr.decoderTraits = &encoding.Float64DecoderTraits
+               return &Float64ColumnChunkReader{rdr}
+       case parquet.Types.ByteArray:
+               rdr.decoderTraits = &encoding.ByteArrayDecoderTraits
+               return &ByteArrayColumnChunkReader{rdr}
+       case parquet.Types.FixedLenByteArray:
+               rdr.decoderTraits = &encoding.FixedLenByteArrayDecoderTraits
+               return &FixedLenByteArrayColumnChunkReader{rdr}
+       }
+       return nil
+}
+
+// Err returns the error recorded by the reader, or nil if there is none.
+func (c *columnChunkReader) Err() error {
+       return c.err
+}
+
+// Type returns the physical type of the column being read.
+func (c *columnChunkReader) Type() parquet.Type {
+       return c.descr.PhysicalType()
+}
+
+// Descriptor returns the schema descriptor of the column being read.
+func (c *columnChunkReader) Descriptor() *schema.Column {
+       return c.descr
+}
+
+// consumeBufferedValues marks n more values of the current page as decoded.
+func (c *columnChunkReader) consumeBufferedValues(n int64) {
+       c.numDecoded += n
+}
+
+// numAvailValues returns how many buffered values of the current page have
+// not been decoded yet.
+func (c *columnChunkReader) numAvailValues() int64 {
+       return c.numBuffered - c.numDecoded
+}
+
+// pager returns the page reader currently feeding this column reader.
+func (c *columnChunkReader) pager() PageReader {
+       return c.rdr
+}
+// setPageReader points the column reader at a new page reader. It clears any
+// recorded error, drops the cached decoders and resets the buffered/decoded
+// counters. No page is read here.
+func (c *columnChunkReader) setPageReader(rdr PageReader) {
+       c.rdr = rdr
+       c.err = nil
+       c.decoders = make(map[format.Encoding]encoding.TypedDecoder)
+       c.numBuffered = 0
+       c.numDecoded = 0
+}
+
+// getDefLvlBuffer returns a scratch slice of exactly sz definition levels.
+// The reader's internal buffer is reused when it is long enough, otherwise it
+// is replaced by a freshly allocated one of length sz.
+func (c *columnChunkReader) getDefLvlBuffer(sz int64) []int16 {
+       if sz > int64(len(c.defLvlBuffer)) {
+               c.defLvlBuffer = make([]int16, sz)
+       }
+       return c.defLvlBuffer[:sz]
+}
+
+// HasNext reports whether there is more data to be read in this column and
+// row group. When the current page is empty or fully consumed it tries to
+// load the next data page.
+func (c *columnChunkReader) HasNext() bool {
+       pageHasData := c.numBuffered != 0 && c.numDecoded != c.numBuffered
+       if pageHasData {
+               return true
+       }
+       return c.readNewPage() && c.numBuffered != 0
+}
+
+func (c *columnChunkReader) configureDict(page *DictionaryPage) error {
+       enc := page.encoding
+       if enc == format.Encoding_PLAIN_DICTIONARY || enc == 
format.Encoding_PLAIN {
+               enc = format.Encoding_RLE_DICTIONARY
+       }
+
+       if _, ok := c.decoders[enc]; ok {
+               return xerrors.New("parquet: column chunk cannot have more than 
one dictionary.")
+       }
+
+       switch page.Encoding() {
+       case format.Encoding_PLAIN, format.Encoding_PLAIN_DICTIONARY:
+               dict := c.decoderTraits.Decoder(parquet.Encodings.Plain, 
c.descr, false, c.mem)
+               dict.SetData(int(page.NumValues()), page.Data())
+
+               decoder := c.decoderTraits.Decoder(parquet.Encodings.Plain, 
c.descr, true, c.mem).(encoding.DictDecoder)
+               decoder.SetDict(dict)
+               c.decoders[enc] = decoder
+       default:
+               return xerrors.New("parquet: dictionary index must be plain 
encoding")
+       }
+
+       c.curDecoder = c.decoders[enc]
+       return nil
+}
+
+// readNewPage advances the page reader until a data page has been loaded and
+// its level and value decoders are initialized.
+//
+// Dictionary pages met on the way are handed to configureDict, any other
+// non-data page is skipped. It returns true once a data page is ready. It
+// returns false when the page reader is exhausted or an error occurred; the
+// error, if any, is stored in c.err.
+func (c *columnChunkReader) readNewPage() bool {
+       for c.rdr.Next() { // keep going until we get a data page
+               c.curPage = c.rdr.Page()
+               if c.curPage == nil {
+                       break
+               }
+
+               // number of leading bytes of the page taken up by the levels
+               var lvlByteLen int64
+               switch p := c.curPage.(type) {
+               case *DictionaryPage:
+                       if err := c.configureDict(p); err != nil {
+                               c.err = err
+                               return false
+                       }
+                       continue
+               case *DataPageV1:
+                       lvlByteLen, c.err = c.initLevelDecodersV1(p, 
p.repLvlEncoding, p.defLvlEncoding)
+                       if c.err != nil {
+                               return false
+                       }
+               case *DataPageV2:
+                       lvlByteLen, c.err = c.initLevelDecodersV2(p)
+                       if c.err != nil {
+                               return false
+                       }
+               default:
+                       // we can skip non-data pages
+                       continue
+               }
+
+               // the values start right after the encoded levels
+               c.err = c.initDataDecoder(c.curPage, lvlByteLen)
+               return c.err == nil
+       }
+       // no more pages: surface whatever error the page reader hit (may be nil)
+       c.err = c.rdr.Err()
+       return false
+}
+
+func (c *columnChunkReader) initLevelDecodersV2(page *DataPageV2) (int64, 
error) {
+       c.numBuffered = int64(page.nvals)
+       c.numDecoded = 0
+       buf := page.Data()
+       totalLvlLen := int64(page.repLvlByteLen) + int64(page.defLvlByteLen)
+
+       if totalLvlLen > int64(len(buf)) {
+               return totalLvlLen, xerrors.New("parquet: data page too small 
for levels (corrupt header?)")
+       }
+
+       if c.descr.MaxRepetitionLevel() > 0 {
+               c.repetitionDecoder.SetDataV2(page.repLvlByteLen, 
c.descr.MaxRepetitionLevel(), int(c.numBuffered), buf)
+               buf = buf[page.repLvlByteLen:]
+       }
+
+       if c.descr.MaxDefinitionLevel() > 0 {
+               c.definitionDecoder.SetDataV2(page.defLvlByteLen, 
c.descr.MaxDefinitionLevel(), int(c.numBuffered), buf)
+       }
+
+       return totalLvlLen, nil
+}
+
+// initLevelDecodersV1 prepares the repetition and definition level decoders
+// for a V1 data page and resets the buffered/decoded counters for that page.
+//
+// Data page layout: Repetition Levels - Definition Levels - encoded values.
+// Levels are encoded as rle or bit-packed, as indicated by repLvlEncoding and
+// defLvlEncoding.
+//
+// It returns the number of bytes consumed by the levels, which is the offset
+// at which the encoded values start, or the error reported by a level
+// decoder.
+func (c *columnChunkReader) initLevelDecodersV1(page *DataPageV1, repLvlEncoding, defLvlEncoding format.Encoding) (int64, error) {
+       c.numBuffered = int64(page.nvals)
+       c.numDecoded = 0
+
+       buf := page.Data()
+       levelsByteLen := int64(0)
+
+       if c.descr.MaxRepetitionLevel() > 0 {
+               repBytes, err := c.repetitionDecoder.SetData(parquet.Encoding(repLvlEncoding), c.descr.MaxRepetitionLevel(), int(c.numBuffered), buf)
+               if err != nil {
+                       return levelsByteLen, err
+               }
+               // guard the slice below so a corrupt page yields an error
+               // instead of an out-of-range panic
+               if repBytes < 0 || repBytes > len(buf) {
+                       return levelsByteLen, xerrors.New("parquet: repetition levels larger than data page (corrupt page?)")
+               }
+               buf = buf[repBytes:]
+               levelsByteLen += int64(repBytes)
+       }
+
+       if c.descr.MaxDefinitionLevel() > 0 {
+               defBytes, err := c.definitionDecoder.SetData(parquet.Encoding(defLvlEncoding), c.descr.MaxDefinitionLevel(), int(c.numBuffered), buf)
+               if err != nil {
+                       return levelsByteLen, err
+               }
+               levelsByteLen += int64(defBytes)
+       }
+
+       return levelsByteLen, nil
+}
+
+func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error 
{
+       buf := page.Data()
+       if int64(len(buf)) < lvlByteLen {
+               return xerrors.New("parquet: page smaller than size of encoded 
levels")
+       }
+
+       buf = buf[lvlByteLen:]
+       encoding := page.Encoding()
+
+       if isDictIndexEncoding(encoding) {
+               encoding = format.Encoding_RLE_DICTIONARY
+       }
+
+       if decoder, ok := c.decoders[encoding]; ok {
+               c.curDecoder = decoder
+       } else {
+               switch encoding {
+               case format.Encoding_PLAIN,
+                       format.Encoding_DELTA_BYTE_ARRAY,
+                       format.Encoding_DELTA_LENGTH_BYTE_ARRAY,
+                       format.Encoding_DELTA_BINARY_PACKED:
+                       c.curDecoder = 
c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem)
+                       c.decoders[encoding] = c.curDecoder
+               case format.Encoding_RLE_DICTIONARY:
+                       return xerrors.New("parquet: dictionary page must be 
before data page")
+               case format.Encoding_BYTE_STREAM_SPLIT:
+                       return xerrors.Errorf("parquet: unsupported data 
encoding %s", encoding)
+               default:
+                       return xerrors.Errorf("parquet: unknown encoding type 
%s", encoding)
+               }
+       }
+
+       c.curEncoding = encoding
+       c.curDecoder.SetData(int(c.numBuffered), buf)
+       return nil
+}
+
+// readDefinitionLevels decodes definition levels from the current page into
+// levels. totalDecoded is the number of levels that were decoded (and thus
+// populated in the slice), valuesToRead is the number of physical values that
+// exist to read (the number of levels equal to the max definition level).
+//
+// If the max definition level is 0, the assumption is that there are no nulls
+// in the column and therefore no definition levels to read, so the result is
+// always 0, 0.
+func (c *columnChunkReader) readDefinitionLevels(levels []int16) (totalDecoded int, valuesToRead int64) {
+       if c.descr.MaxDefinitionLevel() != 0 {
+               totalDecoded, valuesToRead = c.definitionDecoder.Decode(levels)
+       }
+       return totalDecoded, valuesToRead
+}
+
+// readRepetitionLevels decodes the repetition levels from the page and returns
+// the total number of values decoded (and thus populated in the passed in 
levels
+// slice).
+//
+// If max repetition level is 0, it is assumed there are no repetition levels,
+// and thus will always return 0.
+func (c *columnChunkReader) readRepetitionLevels(levels []int16) int {
+       if c.descr.MaxRepetitionLevel() == 0 {
+               return 0
+       }
+
+       nlevels, _ := c.repetitionDecoder.Decode(levels)
+       return nlevels
+}
+
+// determineNumToRead reads the definition levels (and optionally populates 
the repetition levels)
+// in order to determine how many values need to be read to fulfill this batch 
read.
+//
+// batchLen is the number of values it is desired to read. defLvls must be 
either nil (in which case
+// a buffer will be used) or must be at least batchLen in length to be safe. 
repLvls should be either nil
+// (in which case it is ignored) or should be at least batchLen in length to 
be safe.
+//
+// In the return values: ndef is the number of definition levels that were 
actually read in which will
+// typically be the minimum of batchLen and numAvailValues.
+// toRead is the number of physical values that should be read in based on the 
definition levels (the number
+// of definition levels that were equal to maxDefinitionLevel). and err being 
either nil or any error encountered
+func (c *columnChunkReader) determineNumToRead(batchLen int64, defLvls, 
repLvls []int16) (ndefs int, toRead int64, err error) {
+       if !c.HasNext() {
+               return 0, 0, c.err
+       }
+
+       size := utils.Min(batchLen, c.numBuffered-c.numDecoded)
+
+       if c.descr.MaxDefinitionLevel() > 0 {
+               if defLvls == nil {
+                       defLvls = c.getDefLvlBuffer(size)
+               }
+               ndefs, toRead = c.readDefinitionLevels(defLvls[:size])
+       } else {
+               toRead = size
+       }
+
+       if c.descr.MaxRepetitionLevel() > 0 && repLvls != nil {
+               nreps := c.readRepetitionLevels(repLvls[:size])
+               if defLvls != nil && ndefs != nreps {
+                       err = xerrors.New("parquet: number of decoded rep/def 
levels did not match")
+               }
+       }
+       return
+}
+
+// skipValues skips up to nvalues values, using readFn as the function to read
+// the data and throw it away.
+//
+// If the number left to skip is larger than what remains in the current page,
+// the rest of the page is skipped through its counters alone. Otherwise the
+// values are read in batches of at most 1024 into a scratch buffer via readFn
+// until enough were skipped. readFn receives the batch size and the scratch
+// bytes and returns how many values it read.
+//
+// It returns the number of values skipped and the first error hit by readFn,
+// or the reader's own error if one is recorded.
+func (c *columnChunkReader) skipValues(nvalues int64, readFn func(batch int64, buf []byte) (int64, error)) (int64, error) {
+       const maxBatch int64 = 1024
+
+       var (
+               err     error
+               scratch *memory.Buffer
+       )
+       // The scratch buffer is allocated lazily, at most once, and released by
+       // this single deferred call. A defer placed inside the loop would keep
+       // one buffer alive per iteration until the function returns.
+       defer func() {
+               if scratch != nil {
+                       scratch.Release()
+               }
+       }()
+
+       toskip := nvalues
+       for c.HasNext() && toskip > 0 {
+               remaining := c.numBuffered - c.numDecoded
+               // if number to skip is more than the number of undecoded values, skip the page
+               if toskip > remaining {
+                       toskip -= remaining
+                       c.numDecoded = c.numBuffered
+                       continue
+               }
+
+               if scratch == nil {
+                       scratch = memory.NewResizableBuffer(c.mem)
+                       scratch.Reserve(c.decoderTraits.BytesRequired(int(maxBatch)))
+               }
+
+               batchSize := maxBatch
+               for {
+                       var valsRead int64
+                       batchSize = utils.Min(batchSize, toskip)
+                       valsRead, err = readFn(batchSize, scratch.Buf())
+                       toskip -= valsRead
+                       if valsRead <= 0 || toskip <= 0 || err != nil {
+                               break
+                       }
+               }
+               // stop at the first read error instead of retrying, which could
+               // overwrite the error or loop without making progress
+               if err != nil {
+                       break
+               }
+       }
+       if c.err != nil {
+               err = c.err
+       }
+       return nvalues - toskip, err
+}
+
+// readerFunc is the callback readBatch uses to decode physical values. It is
+// called with the number of values already read in this batch and the number
+// of values to read now, and returns how many values it read plus any error.
+type readerFunc func(int64, int64) (int, error)
+
+// readBatch is the base function for reading a batch of values. It reads until
+// it has either processed batchSize levels or hit the end of the column
+// chunk, reading across multiple pages if needed.
+//
+// defLvls and repLvls, when non-nil, are populated with the definition and
+// repetition levels that were read. readFn decodes the physical values: it is
+// called with the number of values read so far in this batch and the number
+// of values to read next.
+//
+// totalLvls is the total number of values which were processed, and thus
+// would be the total number of definition levels and repetition levels which
+// were populated (if they were non-nil). totalRead is the number of physical
+// values that were read in (ie: the number of non-null values).
+func (c *columnChunkReader) readBatch(batchSize int64, defLvls, repLvls 
[]int16, readFn readerFunc) (totalLvls int64, totalRead int, err error) {
+       var (
+               read   int
+               defs   []int16
+               reps   []int16
+               ndefs  int
+               toRead int64
+       )
+
+       for c.HasNext() && totalLvls < batchSize && err == nil {
+               // continue filling the level slices where the previous
+               // iteration stopped; nil slices stay nil
+               if defLvls != nil {
+                       defs = defLvls[totalLvls:]
+               }
+               if repLvls != nil {
+                       reps = repLvls[totalLvls:]
+               }
+               ndefs, toRead, err = c.determineNumToRead(batchSize-totalLvls, 
defs, reps)
+               if err != nil {
+                       return totalLvls, totalRead, err
+               }
+
+               read, err = readFn(int64(totalRead), toRead)
+               // the total number of values processed here is the maximum of
+               // the number of definition levels or the number of physical
+               // values read. If this is a required field, ndefs will be 0
+               // since there are no definition levels stored with it and
+               // `read` will be the number of values, otherwise we use ndefs
+               // since it will be equal to or greater than read.
+               totalVals := int64(utils.MaxInt(ndefs, read))
+               c.consumeBufferedValues(totalVals)
+
+               totalLvls += totalVals
+               totalRead += read
+       }
+       return totalLvls, totalRead, err
+}
diff --git a/go/parquet/file/column_reader_test.go 
b/go/parquet/file/column_reader_test.go
new file mode 100644
index 0000000..d22e365
--- /dev/null
+++ b/go/parquet/file/column_reader_test.go
@@ -0,0 +1,450 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file_test
+
+import (
+       "math"
+       "math/rand"
+       "reflect"
+       "testing"
+
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/file"
+       "github.com/apache/arrow/go/parquet/internal/testutils"
+       "github.com/apache/arrow/go/parquet/internal/utils"
+       "github.com/apache/arrow/go/parquet/schema"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/suite"
+)
+
+func initValues(values reflect.Value) {
+       if values.Kind() != reflect.Slice {
+               panic("must init values with slice")
+       }
+
+       r := rand.New(rand.NewSource(0))
+       typ := values.Type().Elem()
+       switch {
+       case typ.Bits() <= 32:
+               max := int64(math.MaxInt32)
+               min := int64(math.MinInt32)
+               for i := 0; i < values.Len(); i++ {
+                       values.Index(i).Set(reflect.ValueOf(r.Int63n(max-min+1) 
+ min).Convert(reflect.TypeOf(int32(0))))
+               }
+       case typ.Bits() <= 64:
+               max := int64(math.MaxInt64)
+               min := int64(math.MinInt64)
+               for i := 0; i < values.Len(); i++ {
+                       values.Index(i).Set(reflect.ValueOf(r.Int63n(max-min+1) 
+ min))
+               }
+       }
+}
+
+// initDictValues fills values with random data and then overwrites everything
+// after the first numDicts entries with repeats of those entries, so that the
+// slice holds at most numDicts distinct values.
+func initDictValues(values reflect.Value, numDicts int) {
+       repeatFactor := values.Len() / numDicts
+       initValues(values)
+
+       // repeat the first numDicts values for every further full group
+       for group := 1; group < repeatFactor; group++ {
+               offset := numDicts * group
+               for k := 0; k < numDicts; k++ {
+                       values.Index(offset + k).Set(values.Index(k))
+               }
+       }
+
+       // the full groups cover numDicts*repeatFactor entries; fill the
+       // remaining tail from the start of the slice as well
+       covered := numDicts * repeatFactor
+       for k := covered; k < values.Len(); k++ {
+               values.Index(k).Set(values.Index(k - covered))
+       }
+}
+
+// makePages builds npages test data pages of lvlsPerPage levels each for the
+// column d, holding values of element type typ.
+//
+// It returns the pages, the total number of physical values, the values
+// themselves (as a reflect.Value holding a slice of typ) and the generated
+// definition and repetition levels. The level slices are nil when the
+// corresponding max level of the column is 0.
+//
+// Only the Plain, PlainDict and RLEDict encodings are supported; any other
+// encoding panics. Both dictionary encodings are paginated as RLEDict.
+func makePages(version parquet.DataPageVersion, d *schema.Column, npages, 
lvlsPerPage int, typ reflect.Type, enc parquet.Encoding) ([]file.Page, int, 
reflect.Value, []int16, []int16) {
+       nlevels := lvlsPerPage * npages
+       nvalues := 0
+
+       maxDef := d.MaxDefinitionLevel()
+       maxRep := d.MaxRepetitionLevel()
+
+       var (
+               defLevels []int16
+               repLevels []int16
+       )
+
+       valuesPerPage := make([]int, npages)
+       if maxDef > 0 {
+               defLevels = make([]int16, nlevels)
+               // NOTE(review): presumably (seed, min, max, out); confirm
+               // against testutils.FillRandomInt16
+               testutils.FillRandomInt16(0, 0, maxDef, defLevels)
+               // a physical value exists only where the level equals maxDef,
+               // so count those per page
+               for idx := range valuesPerPage {
+                       numPerPage := 0
+                       for i := 0; i < lvlsPerPage; i++ {
+                               if defLevels[i+idx*lvlsPerPage] == maxDef {
+                                       numPerPage++
+                                       nvalues++
+                               }
+                       }
+                       valuesPerPage[idx] = numPerPage
+               }
+       } else {
+               // no definition levels: every level is a value
+               nvalues = nlevels
+               valuesPerPage[0] = lvlsPerPage
+               // fill the rest of the slice with lvlsPerPage by repeatedly
+               // copying the already filled prefix, doubling it each time
+               for i := 1; i < len(valuesPerPage); i *= 2 {
+                       copy(valuesPerPage[i:], valuesPerPage[:i])
+               }
+       }
+
+       if maxRep > 0 {
+               repLevels = make([]int16, nlevels)
+               testutils.FillRandomInt16(0, 0, maxRep, repLevels)
+       }
+
+       values := reflect.MakeSlice(reflect.SliceOf(typ), nvalues, nvalues)
+       if enc == parquet.Encodings.Plain {
+               initValues(values)
+               return testutils.PaginatePlain(version, d, values, defLevels, 
repLevels, maxDef, maxRep, lvlsPerPage, valuesPerPage, 
parquet.Encodings.Plain), nvalues, values, defLevels, repLevels
+       } else if enc == parquet.Encodings.PlainDict || enc == 
parquet.Encodings.RLEDict {
+               initDictValues(values, lvlsPerPage)
+               return testutils.PaginateDict(version, d, values, defLevels, 
repLevels, maxDef, maxRep, lvlsPerPage, valuesPerPage, 
parquet.Encodings.RLEDict), nvalues, values, defLevels, repLevels
+       }
+       panic("invalid encoding type for make pages")
+}
+
+// compareVectorWithDefLevels returns a comparison that checks the values in
+// left against those in right, using defLevels to decide how the two slices
+// line up.
+//
+// A level equal to maxDef compares the current elements of both slices and
+// advances both. A level of maxDef-1 advances only right. A lower level
+// advances right only when maxRep is 0. The comparison fails if either input
+// is not a slice, if the element types differ or if a compared pair is not
+// deeply equal.
+func compareVectorWithDefLevels(left, right reflect.Value, defLevels []int16, maxDef, maxRep int16) assert.Comparison {
+       return func() bool {
+               bothSlices := left.Kind() == reflect.Slice && right.Kind() == reflect.Slice
+               if !bothSlices || left.Type().Elem() != right.Type().Elem() {
+                       return false
+               }
+
+               l, r := 0, 0
+               for _, lvl := range defLevels {
+                       switch {
+                       case lvl == maxDef:
+                               if !reflect.DeepEqual(left.Index(l).Interface(), right.Index(r).Interface()) {
+                                       return false
+                               }
+                               l++
+                               r++
+                       case lvl == (maxDef - 1):
+                               // null entry on the lowest nested level
+                               r++
+                       case lvl < (maxDef - 1):
+                               // null entry on higher nesting level, only supported for non-repeating data
+                               if maxRep == 0 {
+                                       r++
+                               }
+                       }
+               }
+               return true
+       }
+}
+
+// mem is the allocator handed to the column readers created by these tests.
+var mem = memory.DefaultAllocator
+
+// PrimitiveReaderSuite is the test suite for the primitive column readers. It
+// keeps the generated pages together with the values and levels they were
+// built from, so that what a reader returns can be compared against them.
+type PrimitiveReaderSuite struct {
+       suite.Suite
+
+       dataPageVersion parquet.DataPageVersion
+       // pager is the (mock) page reader feeding reader
+       pager           file.PageReader
+       // reader is the column reader under test
+       reader          file.ColumnChunkReader
+       // pages are the pages served by pager
+       pages           []file.Page
+       // expected values and levels, compared against what the reader returns
+       values          reflect.Value
+       defLevels       []int16
+       repLevels       []int16
+       // expected total number of levels and of physical values
+       nlevels         int
+       nvalues         int
+       // max definition / repetition level of the column under test
+       maxDefLvl       int16
+       maxRepLvl       int16
+}
+
+// TearDownTest resets the suite's test data by calling clear.
+func (p *PrimitiveReaderSuite) TearDownTest() {
+       p.clear()
+}
+
+// initReader creates a MockPageReader bound to the current test, stores
+// p.pages in the mock's test data, makes its Err method return a nil error and
+// then builds the column reader for descriptor d on top of that mock.
+func (p *PrimitiveReaderSuite) initReader(d *schema.Column) {
+       m := new(testutils.MockPageReader)
+       m.Test(p.T())
+       m.TestData().Set("pages", p.pages)
+       m.On("Err").Return((error)(nil))
+       p.pager = m
+       p.reader = file.NewColumnReader(d, m, mem)
+}
+
+// checkResults drains the column through an Int32ColumnChunkReader with a
+// growing batch size and asserts that the level/value totals, the values and
+// (when the column has them) the definition and repetition levels match the
+// expected data stored on the suite. It finishes with one more read to check
+// that nothing is returned once the column is exhausted.
+func (p *PrimitiveReaderSuite) checkResults() {
+       vresult := make([]int32, p.nvalues)
+       dresult := make([]int16, p.nlevels)
+       rresult := make([]int16, p.nlevels)
+
+       // read/batchActual track levels, batch/totalRead track physical values.
+       var (
+               read        int64 = 0
+               totalRead   int   = 0
+               batchActual int   = 0
+               batchSize   int32 = 8
+               batch       int   = 0
+       )
+
+       rdr := p.reader.(*file.Int32ColumnChunkReader)
+       p.Require().NotNil(rdr)
+
+       // this will cover both cases:
+       // 1) batch size < page size (multiple ReadBatch from a single page)
+       // 2) batch size > page size (a single ReadBatch spans several pages)
+       // NOTE(review): the error returned by ReadBatch is discarded in this loop.
+       for {
+               read, batch, _ = rdr.ReadBatch(int64(batchSize), 
vresult[totalRead:], dresult[batchActual:], rresult[batchActual:])
+               totalRead += batch
+               batchActual += int(read)
+               // double the batch size each round, clamped to [4096, 1<<24]
+               batchSize = int32(utils.MinInt(1<<24, 
utils.MaxInt(int(batchSize*2), 4096)))
+               // stop once a call yields no more values
+               if batch <= 0 {
+                       break
+               }
+       }
+
+       p.Equal(p.nlevels, batchActual)
+       p.Equal(p.nvalues, totalRead)
+       p.Equal(p.values.Interface(), vresult)
+       if p.maxDefLvl > 0 {
+               p.Equal(p.defLevels, dresult)
+       }
+       if p.maxRepLvl > 0 {
+               p.Equal(p.repLevels, rresult)
+       }
+
+       // catch improper writes at EOS
+       read, batchActual, _ = rdr.ReadBatch(5, vresult, nil, nil)
+       p.Zero(batchActual)
+       p.Zero(read)
+}
+
+// clear resets the expected data, the pages and the reader objects so that no
+// state carries over into the next run. The counters and max levels are left
+// untouched.
+func (p *PrimitiveReaderSuite) clear() {
+       p.values = reflect.ValueOf(nil)
+       p.defLevels = nil
+       p.repLevels = nil
+       p.pages = nil
+       p.pager = nil
+       p.reader = nil
+}
+
+// testPlain generates npages Plain-encoded int32 pages with `levels` levels
+// each for column d, reads them back through a fresh reader, verifies the
+// results and finally clears the fixtures.
+func (p *PrimitiveReaderSuite) testPlain(npages, levels int, d *schema.Column) 
{
+       p.pages, p.nvalues, p.values, p.defLevels, p.repLevels = 
makePages(p.dataPageVersion, d, npages, levels, reflect.TypeOf(int32(0)), 
parquet.Encodings.Plain)
+       p.nlevels = npages * levels
+       p.initReader(d)
+       p.checkResults()
+       p.clear()
+}
+
+// testDict is the dictionary counterpart of testPlain: it generates npages
+// RLEDict-encoded int32 pages with `levels` levels each for column d, reads
+// them back through a fresh reader, verifies the results and clears the
+// fixtures.
+func (p *PrimitiveReaderSuite) testDict(npages, levels int, d *schema.Column) {
+       p.pages, p.nvalues, p.values, p.defLevels, p.repLevels = 
makePages(p.dataPageVersion, d, npages, levels, reflect.TypeOf(int32(0)), 
parquet.Encodings.RLEDict)
+       p.nlevels = npages * levels
+       p.initReader(d)
+       p.checkResults()
+       p.clear()
+}
+
+// TestInt32FlatRequired reads 50 pages of 100 levels from a required int32
+// column (max definition and repetition level 0), once with plain and once
+// with dictionary encoded pages.
+func (p *PrimitiveReaderSuite) TestInt32FlatRequired() {
+       const (
+               levelsPerPage int = 100
+               npages        int = 50
+       )
+
+       p.maxDefLvl = 0
+       p.maxRepLvl = 0
+
+       typ := schema.NewInt32Node("a", parquet.Repetitions.Required, -1)
+       d := schema.NewColumn(typ, p.maxDefLvl, p.maxRepLvl)
+       p.testPlain(npages, levelsPerPage, d)
+       p.testDict(npages, levelsPerPage, d)
+}
+
+// TestInt32FlatOptional reads 50 pages of 100 levels from an optional int32
+// column (max definition level 4, no repetition), once with plain and once
+// with dictionary encoded pages.
+func (p *PrimitiveReaderSuite) TestInt32FlatOptional() {
+       const (
+               levelsPerPage int = 100
+               npages        int = 50
+       )
+
+       p.maxDefLvl = 4
+       p.maxRepLvl = 0
+       typ := schema.NewInt32Node("b", parquet.Repetitions.Optional, -1)
+       d := schema.NewColumn(typ, p.maxDefLvl, p.maxRepLvl)
+       p.testPlain(npages, levelsPerPage, d)
+       p.testDict(npages, levelsPerPage, d)
+}
+
+// TestInt32FlatRepeated reads 50 pages of 100 levels from a repeated int32
+// column (max definition level 4, max repetition level 2), once with plain and
+// once with dictionary encoded pages.
+func (p *PrimitiveReaderSuite) TestInt32FlatRepeated() {
+       const (
+               levelsPerPage int = 100
+               npages        int = 50
+       )
+
+       p.maxDefLvl = 4
+       p.maxRepLvl = 2
+       typ := schema.NewInt32Node("c", parquet.Repetitions.Repeated, -1)
+       d := schema.NewColumn(typ, p.maxDefLvl, p.maxRepLvl)
+       p.testPlain(npages, levelsPerPage, d)
+       p.testDict(npages, levelsPerPage, d)
+}
+
+// TestReadBatchMultiPage verifies that a single ReadBatch call asking for
+// every level of a three page required column returns all of them, i.e. the
+// batch is not cut short at a page boundary.
+func (p *PrimitiveReaderSuite) TestReadBatchMultiPage() {
+       const (
+               levelsPerPage int = 100
+               npages        int = 3
+       )
+
+       p.maxDefLvl = 0
+       p.maxRepLvl = 0
+       typ := schema.NewInt32Node("a", parquet.Repetitions.Required, -1)
+       d := schema.NewColumn(typ, p.maxDefLvl, p.maxRepLvl)
+       p.pages, p.nvalues, p.values, p.defLevels, p.repLevels = 
makePages(p.dataPageVersion, d, npages, levelsPerPage, 
reflect.TypeOf(int32(0)), parquet.Encodings.Plain)
+       p.initReader(d)
+
+       // output buffers sized for the whole column
+       vresult := make([]int32, levelsPerPage*npages)
+       dresult := make([]int16, levelsPerPage*npages)
+       rresult := make([]int16, levelsPerPage*npages)
+
+       rdr := p.reader.(*file.Int32ColumnChunkReader)
+       total, read, err := rdr.ReadBatch(int64(levelsPerPage*npages), vresult, 
dresult, rresult)
+       p.NoError(err)
+       p.EqualValues(levelsPerPage*npages, total)
+       p.EqualValues(levelsPerPage*npages, read)
+}
+
+// TestInt32FlatRequiredSkip interleaves Skip and ReadBatch on a five page
+// required int32 column. The three subtests share one reader, so each one
+// continues from the position the previous one left off at and they must run
+// in the order written: after the first the reader is 2.5 pages in, after the
+// second 4 pages in, and the third consumes the remainder.
+// NOTE(review): the errors returned by Skip and ReadBatch are not checked.
+func (p *PrimitiveReaderSuite) TestInt32FlatRequiredSkip() {
+       const (
+               levelsPerPage int = 100
+               npages        int = 5
+       )
+
+       p.maxDefLvl = 0
+       p.maxRepLvl = 0
+       typ := schema.NewInt32Node("a", parquet.Repetitions.Required, -1)
+       d := schema.NewColumn(typ, p.maxDefLvl, p.maxRepLvl)
+       p.pages, p.nvalues, p.values, p.defLevels, p.repLevels = 
makePages(p.dataPageVersion, d, npages, levelsPerPage, 
reflect.TypeOf(int32(0)), parquet.Encodings.Plain)
+       p.initReader(d)
+
+       // every read below fetches half a page, so half-page buffers suffice
+       vresult := make([]int32, levelsPerPage/2)
+       dresult := make([]int16, levelsPerPage/2)
+       rresult := make([]int16, levelsPerPage/2)
+
+       rdr := p.reader.(*file.Int32ColumnChunkReader)
+
+       p.Run("skip_size > page_size", func() {
+               // Skip first 2 pages
+               skipped, _ := rdr.Skip(int64(2 * levelsPerPage))
+               p.Equal(int64(2*levelsPerPage), skipped)
+
+               rdr.ReadBatch(int64(levelsPerPage/2), vresult, dresult, rresult)
+               subVals := p.values.Slice(2*levelsPerPage, 
int(2.5*float64(levelsPerPage))).Interface().([]int32)
+               p.Equal(subVals, vresult)
+       })
+
+       p.Run("skip_size == page_size", func() {
+               // skip across two pages
+               skipped, _ := rdr.Skip(int64(levelsPerPage))
+               p.Equal(int64(levelsPerPage), skipped)
+               // read half a page
+               rdr.ReadBatch(int64(levelsPerPage/2), vresult, dresult, rresult)
+               subVals := p.values.Slice(int(3.5*float64(levelsPerPage)), 
4*levelsPerPage).Interface().([]int32)
+               p.Equal(subVals, vresult)
+       })
+
+       p.Run("skip_size < page_size", func() {
+               // skip limited to a single page
+               // Skip half a page
+               skipped, _ := rdr.Skip(int64(levelsPerPage / 2))
+               p.Equal(int64(0.5*float32(levelsPerPage)), skipped)
+               // Read half a page
+               rdr.ReadBatch(int64(levelsPerPage/2), vresult, dresult, rresult)
+               subVals := p.values.Slice(int(4.5*float64(levelsPerPage)), 
p.values.Len()).Interface().([]int32)
+               p.Equal(subVals, vresult)
+       })
+}
+
+// TestDictionaryEncodedPages feeds the reader hand-built (empty) dictionary
+// and data pages and checks how HasNext reacts to valid and invalid
+// combinations of page order and encoding. Every subtest builds its own page
+// list, creates a fresh reader and truncates the list again when done.
+//
+// Note that, despite some of the subtest names, malformed dictionary input is
+// expected to be reported through Err rather than by panicking; only the
+// unsupported data page encoding is expected to panic.
+func (p *PrimitiveReaderSuite) TestDictionaryEncodedPages() {
+       p.maxDefLvl = 0
+       p.maxRepLvl = 0
+       typ := schema.NewInt32Node("a", parquet.Repetitions.Required, -1)
+       descr := schema.NewColumn(typ, p.maxDefLvl, p.maxRepLvl)
+       dummy := memory.NewResizableBuffer(mem)
+
+       p.Run("Dict: Plain, Data: RLEDict", func() {
+               dictPage := file.NewDictionaryPage(dummy, 0, 
parquet.Encodings.Plain)
+               dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, 
nil, 0, parquet.Encodings.RLEDict, dummy, nil, nil, 0, 0)
+
+               p.pages = append(p.pages, dictPage, dataPage)
+               p.initReader(descr)
+               p.NotPanics(func() { p.reader.HasNext() })
+               p.NoError(p.reader.Err())
+               p.pages = p.pages[:0]
+       })
+
+       p.Run("Dict: Plain Dictionary, Data: Plain Dictionary", func() {
+               dictPage := file.NewDictionaryPage(dummy, 0, 
parquet.Encodings.PlainDict)
+               dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, 
nil, 0, parquet.Encodings.PlainDict, dummy, nil, nil, 0, 0)
+               p.pages = append(p.pages, dictPage, dataPage)
+               p.initReader(descr)
+               p.NotPanics(func() { p.reader.HasNext() })
+               p.NoError(p.reader.Err())
+               p.pages = p.pages[:0]
+       })
+
+       p.Run("Panic if dict page not first", func() {
+               // a dictionary encoded data page without a preceding dictionary page
+               dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, 
nil, 0, parquet.Encodings.RLEDict, dummy, nil, nil, 0, 0)
+               p.pages = append(p.pages, dataPage)
+               p.initReader(descr)
+               p.NotPanics(func() { p.False(p.reader.HasNext()) })
+               p.Error(p.reader.Err())
+               p.pages = p.pages[:0]
+       })
+
+       p.Run("Only RLE is supported", func() {
+               // a dictionary page carrying an encoding that is rejected
+               dictPage := file.NewDictionaryPage(dummy, 0, 
parquet.Encodings.DeltaByteArray)
+               p.pages = append(p.pages, dictPage)
+               p.initReader(descr)
+               p.NotPanics(func() { p.False(p.reader.HasNext()) })
+               p.Error(p.reader.Err())
+               p.pages = p.pages[:0]
+       })
+
+       p.Run("Cannot have more than one dict", func() {
+               dictPage1 := file.NewDictionaryPage(dummy, 0, 
parquet.Encodings.PlainDict)
+               dictPage2 := file.NewDictionaryPage(dummy, 0, 
parquet.Encodings.Plain)
+               p.pages = append(p.pages, dictPage1, dictPage2)
+               p.initReader(descr)
+               p.NotPanics(func() { p.False(p.reader.HasNext()) })
+               p.Error(p.reader.Err())
+               p.pages = p.pages[:0]
+       })
+
+       p.Run("Unsupported encoding", func() {
+               dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, 
nil, 0, parquet.Encodings.DeltaByteArray, dummy, nil, nil, 0, 0)
+               p.pages = append(p.pages, dataPage)
+               p.initReader(descr)
+               p.Panics(func() { p.reader.HasNext() })
+               // p.Error(p.reader.Err())
+               p.pages = p.pages[:0]
+       })
+
+       // Drop the page list outright. Re-slicing it to [:2] here would resurrect
+       // stale pages and panics when the backing array holds fewer than two
+       // elements, e.g. when the subtests above were filtered out with -run.
+       p.pages = nil
+}
+
+// TestPrimitiveReader runs PrimitiveReaderSuite twice: once with the zero
+// value of dataPageVersion (the "datapage v1" subtest) and once with
+// parquet.DataPageV2.
+func TestPrimitiveReader(t *testing.T) {
+       t.Parallel()
+       t.Run("datapage v1", func(t *testing.T) {
+               suite.Run(t, new(PrimitiveReaderSuite))
+       })
+       t.Run("datapage v2", func(t *testing.T) {
+               suite.Run(t, &PrimitiveReaderSuite{dataPageVersion: 
parquet.DataPageV2})
+       })
+}
diff --git a/go/parquet/file/column_reader_types.gen.go 
b/go/parquet/file/column_reader_types.gen.go
new file mode 100644
index 0000000..ab1fd53
--- /dev/null
+++ b/go/parquet/file/column_reader_types.gen.go
@@ -0,0 +1,299 @@
+// Code generated by column_reader_types.gen.go.tmpl. DO NOT EDIT.
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+       "unsafe"
+
+       "github.com/apache/arrow/go/arrow"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/encoding"
+)
+
+// Int32ColumnChunkReader is the Typed Column chunk reader instance for reading
+// Int32 column data.
+type Int32ColumnChunkReader struct {
+       columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values.
+//
+// The callback given to skipValues reads through ReadBatch, handing it buf
+// reinterpreted as the values slice and as both level slices, and reports only
+// the count that ReadBatch returns.
+func (cr *Int32ColumnChunkReader) Skip(nvalues int64) (int64, error) {
+       return cr.columnChunkReader.skipValues(nvalues,
+               func(batch int64, buf []byte) (int64, error) {
+                       vals, _, err := cr.ReadBatch(batch,
+                               arrow.Int32Traits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf))
+                       return vals, err
+               })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of 
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of levels that were read, valuesRead is the actual number 
of physical values
+// that were read excluding nulls
+func (cr *Int32ColumnChunkReader) ReadBatch(batchSize int64, values []int32, 
defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+       return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
+               return 
cr.curDecoder.(encoding.Int32Decoder).Decode(values[start : start+len])
+       })
+}
+
+// Int64ColumnChunkReader is the Typed Column chunk reader instance for reading
+// Int64 column data.
+type Int64ColumnChunkReader struct {
+       columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values.
+//
+// The callback given to skipValues reads through ReadBatch, handing it buf
+// reinterpreted as the values slice and as both level slices, and reports only
+// the count that ReadBatch returns.
+func (cr *Int64ColumnChunkReader) Skip(nvalues int64) (int64, error) {
+       return cr.columnChunkReader.skipValues(nvalues,
+               func(batch int64, buf []byte) (int64, error) {
+                       vals, _, err := cr.ReadBatch(batch,
+                               arrow.Int64Traits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf))
+                       return vals, err
+               })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of 
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of levels that were read, valuesRead is the actual number 
of physical values
+// that were read excluding nulls
+func (cr *Int64ColumnChunkReader) ReadBatch(batchSize int64, values []int64, 
defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+       return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
+               return 
cr.curDecoder.(encoding.Int64Decoder).Decode(values[start : start+len])
+       })
+}
+
+// Int96ColumnChunkReader is the Typed Column chunk reader instance for reading
+// Int96 column data.
+type Int96ColumnChunkReader struct {
+       columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values.
+//
+// The callback given to skipValues reads through ReadBatch, handing it buf
+// reinterpreted as the values slice and as both level slices, and reports only
+// the count that ReadBatch returns.
+func (cr *Int96ColumnChunkReader) Skip(nvalues int64) (int64, error) {
+       return cr.columnChunkReader.skipValues(nvalues,
+               func(batch int64, buf []byte) (int64, error) {
+                       vals, _, err := cr.ReadBatch(batch,
+                               parquet.Int96Traits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf))
+                       return vals, err
+               })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of 
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of levels that were read, valuesRead is the actual number 
of physical values
+// that were read excluding nulls
+func (cr *Int96ColumnChunkReader) ReadBatch(batchSize int64, values 
[]parquet.Int96, defLvls, repLvls []int16) (total int64, valuesRead int, err 
error) {
+       return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
+               return 
cr.curDecoder.(encoding.Int96Decoder).Decode(values[start : start+len])
+       })
+}
+
+// Float32ColumnChunkReader is the Typed Column chunk reader instance for 
reading
+// Float32 column data.
+type Float32ColumnChunkReader struct {
+       columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values.
+//
+// The callback given to skipValues reads through ReadBatch, handing it buf
+// reinterpreted as the values slice and as both level slices, and reports only
+// the count that ReadBatch returns.
+func (cr *Float32ColumnChunkReader) Skip(nvalues int64) (int64, error) {
+       return cr.columnChunkReader.skipValues(nvalues,
+               func(batch int64, buf []byte) (int64, error) {
+                       vals, _, err := cr.ReadBatch(batch,
+                               arrow.Float32Traits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf))
+                       return vals, err
+               })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of 
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of levels that were read, valuesRead is the actual number 
of physical values
+// that were read excluding nulls
+func (cr *Float32ColumnChunkReader) ReadBatch(batchSize int64, values 
[]float32, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+       return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
+               return 
cr.curDecoder.(encoding.Float32Decoder).Decode(values[start : start+len])
+       })
+}
+
+// Float64ColumnChunkReader is the Typed Column chunk reader instance for 
reading
+// Float64 column data.
+type Float64ColumnChunkReader struct {
+       columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values.
+//
+// The callback given to skipValues reads through ReadBatch, handing it buf
+// reinterpreted as the values slice and as both level slices, and reports only
+// the count that ReadBatch returns.
+func (cr *Float64ColumnChunkReader) Skip(nvalues int64) (int64, error) {
+       return cr.columnChunkReader.skipValues(nvalues,
+               func(batch int64, buf []byte) (int64, error) {
+                       vals, _, err := cr.ReadBatch(batch,
+                               arrow.Float64Traits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf))
+                       return vals, err
+               })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of 
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of levels that were read, valuesRead is the actual number 
of physical values
+// that were read excluding nulls
+func (cr *Float64ColumnChunkReader) ReadBatch(batchSize int64, values 
[]float64, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+       return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
+               return 
cr.curDecoder.(encoding.Float64Decoder).Decode(values[start : start+len])
+       })
+}
+
+// BooleanColumnChunkReader is the Typed Column chunk reader instance for 
reading
+// Boolean column data.
+type BooleanColumnChunkReader struct {
+       columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values.
+//
+// The callback given to skipValues reads through ReadBatch, handing it buf
+// reinterpreted as the values slice (via an unsafe cast of the slice header to
+// []bool) and as both level slices, and reports only the count that ReadBatch
+// returns.
+func (cr *BooleanColumnChunkReader) Skip(nvalues int64) (int64, error) {
+       return cr.columnChunkReader.skipValues(nvalues,
+               func(batch int64, buf []byte) (int64, error) {
+                       vals, _, err := cr.ReadBatch(batch,
+                               *(*[]bool)(unsafe.Pointer(&buf)),
+                               arrow.Int16Traits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf))
+                       return vals, err
+               })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of 
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of levels that were read, valuesRead is the actual number 
of physical values
+// that were read excluding nulls
+func (cr *BooleanColumnChunkReader) ReadBatch(batchSize int64, values []bool, 
defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+       return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
+               return 
cr.curDecoder.(encoding.BooleanDecoder).Decode(values[start : start+len])
+       })
+}
+
+// ByteArrayColumnChunkReader is the Typed Column chunk reader instance for 
reading
+// ByteArray column data.
+type ByteArrayColumnChunkReader struct {
+       columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values.
+//
+// The callback given to skipValues reads through ReadBatch, handing it buf
+// reinterpreted as the values slice and as both level slices, and reports only
+// the count that ReadBatch returns.
+func (cr *ByteArrayColumnChunkReader) Skip(nvalues int64) (int64, error) {
+       return cr.columnChunkReader.skipValues(nvalues,
+               func(batch int64, buf []byte) (int64, error) {
+                       vals, _, err := cr.ReadBatch(batch,
+                               parquet.ByteArrayTraits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf))
+                       return vals, err
+               })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of 
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of levels that were read, valuesRead is the actual number 
of physical values
+// that were read excluding nulls
+func (cr *ByteArrayColumnChunkReader) ReadBatch(batchSize int64, values 
[]parquet.ByteArray, defLvls, repLvls []int16) (total int64, valuesRead int, 
err error) {
+       return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
+               return 
cr.curDecoder.(encoding.ByteArrayDecoder).Decode(values[start : start+len])
+       })
+}
+
+// FixedLenByteArrayColumnChunkReader is the Typed Column chunk reader 
instance for reading
+// FixedLenByteArray column data.
+type FixedLenByteArrayColumnChunkReader struct {
+       columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values.
+//
+// The callback given to skipValues reads through ReadBatch, handing it buf
+// reinterpreted as the values slice and as both level slices, and reports only
+// the count that ReadBatch returns.
+func (cr *FixedLenByteArrayColumnChunkReader) Skip(nvalues int64) (int64, 
error) {
+       return cr.columnChunkReader.skipValues(nvalues,
+               func(batch int64, buf []byte) (int64, error) {
+                       vals, _, err := cr.ReadBatch(batch,
+                               
parquet.FixedLenByteArrayTraits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf),
+                               arrow.Int16Traits.CastFromBytes(buf))
+                       return vals, err
+               })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of 
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of levels that were read, valuesRead is the actual number 
of physical values
+// that were read excluding nulls
+func (cr *FixedLenByteArrayColumnChunkReader) ReadBatch(batchSize int64, 
values []parquet.FixedLenByteArray, defLvls, repLvls []int16) (total int64, 
valuesRead int, err error) {
+       return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
+               return 
cr.curDecoder.(encoding.FixedLenByteArrayDecoder).Decode(values[start : 
start+len])
+       })
+}
diff --git a/go/parquet/file/column_reader_types.gen.go.tmpl 
b/go/parquet/file/column_reader_types.gen.go.tmpl
new file mode 100644
index 0000000..23b7d3e
--- /dev/null
+++ b/go/parquet/file/column_reader_types.gen.go.tmpl
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+    "github.com/apache/arrow/go/parquet"
+    "github.com/apache/arrow/go/parquet/internal/encoding"
+)
+
+{{range .In}}
+// {{.Name}}ColumnChunkReader is the Typed Column chunk reader instance for 
reading
+// {{.Name}} column data.
+type {{.Name}}ColumnChunkReader struct {
+  columnChunkReader
+}
+
+// Skip skips the next nvalues so that the next call to ReadBatch
+// will start reading *after* the skipped values.
+//
+// The callback given to skipValues reads through ReadBatch, handing it buf
+// reinterpreted as the values slice and as both level slices, and reports only
+// the count that ReadBatch returns.
+func (cr *{{.Name}}ColumnChunkReader) Skip(nvalues int64) (int64, error) {
+  return cr.columnChunkReader.skipValues(nvalues,
+    func(batch int64, buf []byte) (int64, error) {
+      vals, _, err := cr.ReadBatch(batch,
+        {{- if ne .Name "Boolean"}}
+        {{.prefix}}.{{.Name}}Traits.CastFromBytes(buf),
+        {{- else}}
+        *(*[]bool)(unsafe.Pointer(&buf)),
+        {{- end}}
+        arrow.Int16Traits.CastFromBytes(buf),
+        arrow.Int16Traits.CastFromBytes(buf))
+      return vals, err
+    })
+}
+
+// ReadBatch reads batchSize values from the column.
+//
+// Returns error if values is not at least big enough to hold the number of 
values that will be read.
+//
+// defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
+// at least large enough to hold the number of values that will be read.
+//
+// total is the number of levels that were read, valuesRead is the actual number 
of physical values
+// that were read excluding nulls
+func (cr *{{.Name}}ColumnChunkReader) ReadBatch(batchSize int64, values 
[]{{.name}}, defLvls, repLvls []int16) (total int64, valuesRead int, err error) 
{
+  return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
+    return 
cr.curDecoder.(encoding.{{.Name}}Decoder).Decode(values[start:start+len])
+  })
+}
+{{end}}
diff --git a/go/parquet/file/file_reader.go b/go/parquet/file/file_reader.go
new file mode 100644
index 0000000..8b95223
--- /dev/null
+++ b/go/parquet/file/file_reader.go
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+       "bytes"
+       "encoding/binary"
+       "io"
+       "os"
+
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/encryption"
+       "github.com/apache/arrow/go/parquet/metadata"
+       "golang.org/x/exp/mmap"
+       "golang.org/x/xerrors"
+)
+
+// footerSize is the number of trailing bytes of a parquet file that hold the
+// 4-byte little-endian metadata length followed by the 4 magic bytes.
+const footerSize uint32 = 8
+
+// magicBytes terminates a file whose footer metadata is not encrypted.
+var magicBytes = []byte("PAR1")
+
+// magicEBytes terminates a file whose footer metadata is encrypted.
+var magicEBytes = []byte("PARE")
+
+// errInconsistentFileMetadata is returned when the metadata length recorded in
+// the footer does not fit inside the file.
+var errInconsistentFileMetadata = xerrors.New("parquet: file is smaller than indicated metadata size")
+
+// Reader is the main interface for reading a parquet file
+type Reader struct {
+       // r is the underlying source the file is read from.
+       r             parquet.ReaderAtSeeker
+       // props holds the reader properties; NewParquetReader installs defaults
+       // when none are supplied.
+       props         *parquet.ReaderProperties
+       // metadata is the file metadata, either supplied through WithMetadata
+       // or parsed from the footer by parseMetaData.
+       metadata      *metadata.FileMetaData
+       // footerOffset is the offset obtained by seeking to the end of r.
+       footerOffset  int64
+       // fileDecryptor is set by the metadata parsing code when decryption is
+       // configured for an encrypted file.
+       fileDecryptor encryption.FileDecryptor
+}
+
+// mmapAdapter is an adapter for mmap'd files: it layers a read cursor on top
+// of mmap.ReaderAt so that Read and Seek can be offered next to ReadAt.
+type mmapAdapter struct {
+       *mmap.ReaderAt
+
+       // pos is the current cursor used by Read and updated by Seek.
+       pos int64
+}
+
+// Close closes the embedded mmap.ReaderAt.
+func (m *mmapAdapter) Close() error {
+       return m.ReaderAt.Close()
+}
+
+// ReadAt forwards to the embedded mmap.ReaderAt; it does not touch the cursor
+// used by Read and Seek.
+func (m *mmapAdapter) ReadAt(p []byte, off int64) (int, error) {
+       return m.ReaderAt.ReadAt(p, off)
+}
+
+// Read fills p starting at the current cursor and advances the cursor by the
+// number of bytes that were actually read.
+func (m *mmapAdapter) Read(p []byte) (n int, err error) {
+       read, readErr := m.ReaderAt.ReadAt(p, m.pos)
+       m.pos += int64(read)
+       return read, readErr
+}
+
+func (m *mmapAdapter) Seek(offset int64, whence int) (int64, error) {
+       newPos, offs := int64(0), offset
+       switch whence {
+       case io.SeekStart:
+               newPos = offs
+       case io.SeekCurrent:
+               newPos = m.pos + offs
+       case io.SeekEnd:
+               newPos = int64(m.ReaderAt.Len()) + offs
+       }
+       if newPos < 0 {
+               return 0, xerrors.New("negative result pos")
+       }
+       if newPos > int64(m.ReaderAt.Len()) {
+               return 0, xerrors.New("new position exceeds size of file")
+       }
+       m.pos = newPos
+       return newPos, nil
+}
+
+// ReadOption is a functional option applied to a Reader while it is being
+// constructed by NewParquetReader.
+type ReadOption func(*Reader)
+
+// WithReadProps makes the reader use the given ReaderProperties instance
+// instead of the default ReaderProperties.
+func WithReadProps(props *parquet.ReaderProperties) ReadOption {
+       setProps := func(rdr *Reader) { rdr.props = props }
+       return setProps
+}
+
+// WithMetadata supplies a FileMetaData object to use instead of reading the
+// file metadata from the file itself.
+func WithMetadata(m *metadata.FileMetaData) ReadOption {
+       setMetadata := func(rdr *Reader) { rdr.metadata = m }
+       return setMetadata
+}
+
+// OpenParquetFile will return a Reader for the given parquet file on the
+// local file system.
+//
+// Optionally the file can be memory mapped for faster reading. If no read
+// properties are provided then the default ReaderProperties will be used. The
+// WithMetadata option can be used to provide a FileMetaData object rather than
+// reading the file metadata from the file.
+//
+// When constructing the Reader fails, the file (or mapping) opened here is
+// closed before returning and the returned Reader is nil, so a failed call
+// does not leak a file handle.
+func OpenParquetFile(filename string, memoryMap bool, opts ...ReadOption) (*Reader, error) {
+       var source parquet.ReaderAtSeeker
+       if memoryMap {
+               mapped, err := mmap.Open(filename)
+               if err != nil {
+                       return nil, err
+               }
+               source = &mmapAdapter{mapped, 0}
+       } else {
+               f, err := os.Open(filename)
+               if err != nil {
+                       return nil, err
+               }
+               source = f
+       }
+
+       rdr, err := NewParquetReader(source, opts...)
+       if err != nil {
+               // The source was opened by this function, so nobody else can
+               // release it. The close error is dropped on purpose: the
+               // construction error is the one worth reporting.
+               if c, ok := source.(io.Closer); ok {
+                       c.Close()
+               }
+               return nil, err
+       }
+       return rdr, nil
+}
+
+// NewParquetReader returns a FileReader instance that reads a parquet file 
which can be read from r.
+// This reader needs to support Read, ReadAt and Seeking.
+//
+// If no read properties are provided then the default ReaderProperties will 
be used. The WithMetadata
+// option can be used to provide a FileMetaData object rather than reading the 
file metadata from the file.
+func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, 
error) {
+       var err error
+       f := &Reader{r: r}
+       for _, o := range opts {
+               o(f)
+       }
+
+       if f.footerOffset <= 0 {
+               f.footerOffset, err = r.Seek(0, io.SeekEnd)
+               if err != nil {
+                       return nil, xerrors.Errorf("parquet: could not retrieve 
footer offset: %w", err)
+               }
+       }
+
+       if f.props == nil {
+               f.props = parquet.NewReaderProperties(memory.NewGoAllocator())
+       }
+
+       if f.metadata == nil {
+               return f, f.parseMetaData()
+       }
+
+       return f, nil
+}
+
+// Close closes the reader. When the underlying source implements io.Closer its
+// Close method is called and its result returned; otherwise Close is a no-op.
+func (f *Reader) Close() error {
+       closer, ok := f.r.(io.Closer)
+       if !ok {
+               return nil
+       }
+       return closer.Close()
+}
+
+// MetaData returns the FileMetaData object held by this reader.
+func (f *Reader) MetaData() *metadata.FileMetaData { return f.metadata }
+
+// parseMetaData handles parsing the metadata from the opened file.
+func (f *Reader) parseMetaData() error {
+       if f.footerOffset <= int64(footerSize) {
+               return xerrors.Errorf("parquet: file too small (size=%d)", 
f.footerOffset)
+       }
+
+       buf := make([]byte, footerSize)
+       // backup 8 bytes to read the footer size (first four bytes) and the 
magic bytes (last 4 bytes)
+       n, err := f.r.ReadAt(buf, f.footerOffset-int64(footerSize))
+       if err != nil {
+               return xerrors.Errorf("parquet: could not read footer: %w", err)
+       }
+       if n != len(buf) {
+               return xerrors.Errorf("parquet: could not read %d bytes from 
end of file", len(buf))
+       }
+
+       size := int64(binary.LittleEndian.Uint32(buf[:4]))
+       if size < 0 || size+int64(footerSize) > f.footerOffset {
+               return errInconsistentFileMetadata
+       }
+
+       fileDecryptProps := f.props.FileDecryptProps
+
+       switch {
+       case bytes.Equal(buf[4:], magicBytes): // non-encrypted metadata
+               buf = make([]byte, size)
+               if _, err := f.r.ReadAt(buf, 
f.footerOffset-int64(footerSize)-size); err != nil {
+                       return xerrors.Errorf("parquet: could not read footer: 
%w", err)
+               }
+
+               f.metadata, err = metadata.NewFileMetaData(buf, nil)
+               if err != nil {
+                       return xerrors.Errorf("parquet: could not read footer: 
%w", err)
+               }
+
+               if !f.metadata.IsSetEncryptionAlgorithm() {
+                       if fileDecryptProps != nil && 
!fileDecryptProps.PlaintextFilesAllowed() {
+                               return xerrors.Errorf("parquet: applying 
decryption properties on plaintext file")
+                       }
+               } else {
+                       if err := 
f.parseMetaDataEncryptedFilePlaintextFooter(fileDecryptProps, buf); err != nil {
+                               return err
+                       }
+               }
+       case bytes.Equal(buf[4:], magicEBytes): // encrypted metadata
+               buf = make([]byte, size)
+               if _, err := f.r.ReadAt(buf, 
f.footerOffset-int64(footerSize)-size); err != nil {
+                       return xerrors.Errorf("parquet: could not read footer: 
%w", err)
+               }
+
+               if fileDecryptProps == nil {
+                       return xerrors.New("could not read encrypted metadata, 
no decryption found in reader's properties")
+               }
+
+               fileCryptoMetadata, err := metadata.NewFileCryptoMetaData(buf)
+               if err != nil {
+                       return err
+               }
+               algo := fileCryptoMetadata.EncryptionAlgorithm()
+               fileAad, err := f.handleAadPrefix(fileDecryptProps, &algo)
+               if err != nil {
+                       return err
+               }
+               f.fileDecryptor = encryption.NewFileDecryptor(fileDecryptProps, 
fileAad, algo.Algo, string(fileCryptoMetadata.KeyMetadata()), 
f.props.Allocator())
+
+               f.metadata, err = 
metadata.NewFileMetaData(buf[fileCryptoMetadata.Len():], f.fileDecryptor)
+               if err != nil {
+                       return xerrors.Errorf("parquet: could not read footer: 
%w", err)
+               }
+       default:
+               return xerrors.Errorf("parquet: magic bytes not found in 
footer. Either the file is corrupted or this isn't a parquet file")
+       }
+
+       return nil
+}
+
+func (f *Reader) handleAadPrefix(fileDecrypt 
*parquet.FileDecryptionProperties, algo *parquet.Algorithm) (string, error) {
+       aadPrefixInProps := fileDecrypt.AadPrefix()
+       aadPrefix := []byte(aadPrefixInProps)
+       fileHasAadPrefix := algo.Aad.AadPrefix != nil && 
len(algo.Aad.AadPrefix) > 0
+       aadPrefixInFile := algo.Aad.AadPrefix
+
+       if algo.Aad.SupplyAadPrefix && aadPrefixInProps == "" {
+               return "", xerrors.New("AAD Prefix used for file encryption but 
not stored in file and not suppliedin decryption props")
+       }
+
+       if fileHasAadPrefix {
+               if aadPrefixInProps != "" {
+                       if aadPrefixInProps != string(aadPrefixInFile) {
+                               return "", xerrors.New("AAD prefix in file and 
in properties but not the same")
+                       }
+               }
+               aadPrefix = aadPrefixInFile
+               if fileDecrypt.Verifier != nil {
+                       fileDecrypt.Verifier.Verify(string(aadPrefix))
+               }
+       } else {
+               if !algo.Aad.SupplyAadPrefix && aadPrefixInProps != "" {
+                       return "", xerrors.New("AAD Prefix set in 
decryptionproperties but was not used for file encryption")
+               }
+               if fileDecrypt.Verifier != nil {
+                       return "", xerrors.New("AAD Prefix Verifier is set but 
AAD Prefix not found in file")
+               }
+       }
+       return string(append(aadPrefix, algo.Aad.AadFileUnique...)), nil
+}
+
+func (f *Reader) parseMetaDataEncryptedFilePlaintextFooter(decryptProps 
*parquet.FileDecryptionProperties, data []byte) error {
+       if decryptProps != nil {
+               algo := f.metadata.EncryptionAlgorithm()
+               fileAad, err := f.handleAadPrefix(decryptProps, &algo)
+               if err != nil {
+                       return err
+               }
+               f.fileDecryptor = encryption.NewFileDecryptor(decryptProps, 
fileAad, algo.Algo, string(f.metadata.GetFooterSigningKeyMetadata()), 
f.props.Allocator())
+               // set the InternalFileDecryptor in the metadata as well, as 
it's used
+               // for signature verification and for ColumnChunkMetaData 
creation.
+               f.metadata.FileDecryptor = f.fileDecryptor
+               if decryptProps.PlaintextFooterIntegrity() {
+                       if len(data)-f.metadata.Size() != 
encryption.GcmTagLength+encryption.NonceLength {
+                               return xerrors.New("failed reading metadata for 
encryption signature")
+                       }
+
+                       if 
!f.metadata.VerifySignature(data[f.metadata.Size():]) {
+                               return xerrors.New("parquet crypto signature 
verification failed")
+                       }
+               }
+       }
+       return nil
+}
+
+// WriterVersion returns the Application Version that was written in the file
+// metadata, as reported by the metadata's own WriterVersion method.
+func (f *Reader) WriterVersion() *metadata.AppVersion {
+       return f.metadata.WriterVersion()
+}
+
+// NumRows returns the total number of rows in this parquet file, as recorded
+// in the file metadata.
+func (f *Reader) NumRows() int64 {
+       return f.metadata.GetNumRows()
+}
+
+// NumRowGroups returns the total number of row groups in this file, i.e. the
+// length of the row group list in the file metadata.
+func (f *Reader) NumRowGroups() int {
+       return len(f.metadata.GetRowGroups())
+}
+
+// RowGroup returns a reader for the desired (0-based) row group. The index is
+// used directly on the metadata's row group slice, so an out-of-range i
+// panics.
+func (f *Reader) RowGroup(i int) *RowGroupReader {
+       rg := f.metadata.RowGroups[i]
+       rgMeta := metadata.NewRowGroupMetaData(rg, f.metadata.Schema, f.WriterVersion(), f.fileDecryptor)
+
+       rdr := &RowGroupReader{
+               fileMetadata:  f.metadata,
+               rgMetadata:    rgMeta,
+               props:         f.props,
+               r:             f.r,
+               sourceSz:      f.footerOffset,
+               fileDecryptor: f.fileDecryptor,
+       }
+       return rdr
+}
diff --git a/go/parquet/file/file_reader_test.go 
b/go/parquet/file/file_reader_test.go
new file mode 100644
index 0000000..6dfb1fa
--- /dev/null
+++ b/go/parquet/file/file_reader_test.go
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file_test
+
+import (
+       "bytes"
+       "encoding/binary"
+       "io"
+       "math/rand"
+       "testing"
+
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet/compress"
+       "github.com/apache/arrow/go/parquet/file"
+       "github.com/apache/arrow/go/parquet/internal/encoding"
+       format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/thrift"
+       "github.com/apache/arrow/go/parquet/metadata"
+       libthrift "github.com/apache/thrift/lib/go/thrift"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/suite"
+)
+
+// getDummyStats builds a Statistics object whose Max is a statSize-byte
+// payload prepared with memory.Set(payload, 1). With fillAll, Min shares that
+// payload and NullCount and DistinctCount are set to 42 and 1.
+func getDummyStats(statSize int, fillAll bool) *format.Statistics {
+       payload := make([]byte, statSize)
+       memory.Set(payload, 1)
+
+       stats := format.NewStatistics()
+       stats.Max = payload
+       if !fillAll {
+               return stats
+       }
+
+       stats.Min = payload
+       stats.NullCount = libthrift.Int64Ptr(42)
+       stats.DistinctCount = libthrift.Int64Ptr(1)
+       return stats
+}
+
+// checkStatistics asserts that every field that is set in stats has the same
+// value in actual; fields that are not set in stats are not compared.
+func checkStatistics(t *testing.T, stats format.Statistics, actual metadata.EncodedStatistics) {
+       checks := []struct {
+               isSet  bool
+               verify func()
+       }{
+               {stats.IsSetMax(), func() { assert.Equal(t, stats.Max, actual.Max) }},
+               {stats.IsSetMin(), func() { assert.Equal(t, stats.Min, actual.Min) }},
+               {stats.IsSetNullCount(), func() { assert.Equal(t, stats.GetNullCount(), actual.NullCount) }},
+               {stats.IsSetDistinctCount(), func() { assert.Equal(t, stats.GetDistinctCount(), actual.DistinctCount) }},
+       }
+       for _, c := range checks {
+               if c.isSet {
+                       c.verify()
+               }
+       }
+}
+
+// PageSerdeSuite is a testify suite that serializes page headers (and page
+// data) into an in-memory sink and reads them back through a file.PageReader.
+type PageSerdeSuite struct {
+       suite.Suite
+
+       // sink receives the serialized bytes; buffer is what sink.Finish
+       // returns once the stream is ended.
+       sink   *encoding.BufferWriter
+       buffer *memory.Buffer
+
+       // header structures that the Write* helpers fill in and serialize
+       pageHdr       format.PageHeader
+       dataPageHdr   format.DataPageHeader
+       dataPageHdrV2 format.DataPageHeaderV2
+
+       // pageReader is created by InitSerializedPageReader over buffer.
+       pageReader file.PageReader
+}
+
+// TestFileDeserializing runs the PageSerdeSuite, marked as parallel.
+func TestFileDeserializing(t *testing.T) {
+       t.Parallel()
+       suite.Run(t, new(PageSerdeSuite))
+}
+
+// ResetStream replaces the sink with a fresh, empty BufferWriter.
+func (p *PageSerdeSuite) ResetStream() {
+       p.sink = encoding.NewBufferWriter(0, memory.DefaultAllocator)
+}
+
+// EndStream finishes the sink and keeps the resulting buffer for reading.
+func (p *PageSerdeSuite) EndStream() {
+       p.buffer = p.sink.Finish()
+}
+
+// SetupTest runs before each test: it sets the v1 data page header to PLAIN
+// value encoding with RLE definition and repetition levels, then starts a
+// fresh output stream.
+func (p *PageSerdeSuite) SetupTest() {
+       hdr := &p.dataPageHdr
+       hdr.Encoding = format.Encoding_PLAIN
+       hdr.DefinitionLevelEncoding = format.Encoding_RLE
+       hdr.RepetitionLevelEncoding = format.Encoding_RLE
+
+       p.ResetStream()
+}
+
+// InitSerializedPageReader ends the current stream and creates the suite's
+// page reader over the bytes written so far.
+//
+// A failure to create the page reader stops the test immediately instead of
+// being discarded and surfacing later as a nil page reader.
+func (p *PageSerdeSuite) InitSerializedPageReader(nrows int64, codec compress.Compression) {
+       p.EndStream()
+
+       rdr, err := file.NewPageReader(bytes.NewReader(p.buffer.Bytes()), nrows, codec, memory.DefaultAllocator, nil)
+       p.Require().NoError(err)
+       p.pageReader = rdr
+}
+
+// WriteDataPageHeader serializes a v1 data page header with the given sizes
+// into the sink. It simplifies writing serialized data page headers which may
+// or may not have meaningful data associated with them. maxSerialized is not
+// used by the body.
+func (p *PageSerdeSuite) WriteDataPageHeader(maxSerialized int, uncompressed, compressed int32) {
+       hdr := &p.pageHdr
+       hdr.DataPageHeader = &p.dataPageHdr
+       hdr.UncompressedPageSize = uncompressed
+       hdr.CompressedPageSize = compressed
+       hdr.Type = format.PageType_DATA_PAGE
+
+       serializer := thrift.NewThriftSerializer()
+       write := func() {
+               serializer.Serialize(hdr, p.sink, nil)
+       }
+       p.NotPanics(write)
+}
+
+// WriteDataPageHeaderV2 serializes a v2 data page header with the given sizes
+// into the sink. maxSerialized is not used by the body.
+func (p *PageSerdeSuite) WriteDataPageHeaderV2(maxSerialized int, uncompressed, compressed int32) {
+       hdr := &p.pageHdr
+       hdr.DataPageHeaderV2 = &p.dataPageHdrV2
+       hdr.UncompressedPageSize = uncompressed
+       hdr.CompressedPageSize = compressed
+       hdr.Type = format.PageType_DATA_PAGE_V2
+
+       serializer := thrift.NewThriftSerializer()
+       write := func() {
+               serializer.Serialize(hdr, p.sink, nil)
+       }
+       p.NotPanics(write)
+}
+
+// CheckDataPageHeader asserts that page is a v1 data page whose value count,
+// encodings and statistics match the expected header.
+func (p *PageSerdeSuite) CheckDataPageHeader(expected format.DataPageHeader, page file.Page) {
+       p.Equal(format.PageType_DATA_PAGE, page.Type())
+       p.IsType(&file.DataPageV1{}, page)
+
+       p.Equal(expected.NumValues, page.NumValues())
+       p.Equal(expected.Encoding, page.Encoding())
+
+       v1 := page.(*file.DataPageV1)
+       p.EqualValues(expected.DefinitionLevelEncoding, v1.DefinitionLevelEncoding())
+       p.EqualValues(expected.RepetitionLevelEncoding, v1.RepetitionLevelEncoding())
+
+       dataPage := page.(file.DataPage)
+       checkStatistics(p.T(), *expected.Statistics, dataPage.Statistics())
+}
+
+// CheckDataPageHeaderV2 asserts that page is a v2 data page whose value count,
+// encoding, null count, level byte lengths, compression flag and statistics
+// match the expected header.
+func (p *PageSerdeSuite) CheckDataPageHeaderV2(expected format.DataPageHeaderV2, page file.Page) {
+       p.Equal(format.PageType_DATA_PAGE_V2, page.Type())
+       p.IsType(&file.DataPageV2{}, page)
+
+       p.Equal(expected.NumValues, page.NumValues())
+       p.Equal(expected.Encoding, page.Encoding())
+
+       v2 := page.(*file.DataPageV2)
+       p.Equal(expected.NumNulls, v2.NumNulls())
+       p.Equal(expected.DefinitionLevelsByteLength, v2.DefinitionLevelByteLen())
+       p.Equal(expected.RepetitionLevelsByteLength, v2.RepetitionLevelByteLen())
+       p.Equal(expected.IsCompressed, v2.IsCompressed())
+
+       dataPage := page.(file.DataPage)
+       checkStatistics(p.T(), *expected.Statistics, dataPage.Statistics())
+}
+
+// TestDataPageV1 round-trips a v1 data page header carrying fully populated
+// dummy statistics.
+func (p *PageSerdeSuite) TestDataPageV1() {
+       const statsSize = 512
+       const nrows = 4444
+
+       p.dataPageHdr.Statistics = getDummyStats(statsSize, true)
+       p.dataPageHdr.NumValues = nrows
+       p.WriteDataPageHeader(1024, 0, 0)
+
+       p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
+       p.True(p.pageReader.Next())
+       p.CheckDataPageHeader(p.dataPageHdr, p.pageReader.Page())
+}
+
+// TestDataPageV2 round-trips a v2 data page header carrying fully populated
+// dummy statistics.
+func (p *PageSerdeSuite) TestDataPageV2() {
+       const statsSize = 512
+       const nrows = 4444
+
+       p.dataPageHdrV2.Statistics = getDummyStats(statsSize, true)
+       p.dataPageHdrV2.NumValues = nrows
+       p.WriteDataPageHeaderV2(1024, 0, 0)
+
+       p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
+       p.True(p.pageReader.Next())
+
+       currentPage := p.pageReader.Page()
+       p.CheckDataPageHeaderV2(p.dataPageHdrV2, currentPage)
+}
+
+// TestLargePageHeaders writes a header with 256KB of statistics, checks the
+// serialized size against its expected bounds and reads the header back.
+func (p *PageSerdeSuite) TestLargePageHeaders() {
+       const statsSize = 256 * 1024     // 256KB
+       const nrows = 4141
+       const maxHeaderSize = 512 * 1024 // 512KB
+
+       p.dataPageHdr.Statistics = getDummyStats(statsSize, false)
+       p.dataPageHdr.NumValues = nrows
+       p.WriteDataPageHeader(maxHeaderSize, 0, 0)
+
+       // the current sink position is the number of bytes serialized
+       written, err := p.sink.Seek(0, io.SeekCurrent)
+       p.NoError(err)
+       serialized := int(written)
+       p.GreaterOrEqual(maxHeaderSize, serialized)
+       p.LessOrEqual(statsSize, serialized)
+       p.GreaterOrEqual(16*1024*1024, serialized)
+
+       p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
+       p.True(p.pageReader.Next())
+       p.CheckDataPageHeader(p.dataPageHdr, p.pageReader.Page())
+}
+
+// TestFailLargePageHeaders writes a header larger than the limit configured
+// on the page reader and expects Next to report false, without panicking, and
+// Err to be set.
+func (p *PageSerdeSuite) TestFailLargePageHeaders() {
+       const statsSize = 256 * 1024      // 256KB
+       const nrows = 1337                // dummy value
+       const maxHeaderSize = 512 * 1024  // 512 KB
+       const smallerMaxSize = 128 * 1024 // 128KB
+
+       p.dataPageHdr.Statistics = getDummyStats(statsSize, false)
+       p.WriteDataPageHeader(maxHeaderSize, 0, 0)
+
+       written, err := p.sink.Seek(0, io.SeekCurrent)
+       p.NoError(err)
+       serialized := int(written)
+       p.GreaterOrEqual(maxHeaderSize, serialized)
+       p.LessOrEqual(smallerMaxSize, serialized)
+
+       p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
+       p.pageReader.SetMaxPageHeaderSize(smallerMaxSize)
+       advance := func() { p.False(p.pageReader.Next()) }
+       p.NotPanics(advance)
+       p.Error(p.pageReader.Err())
+}
+
+// TestCompression writes npages pages of random data, compressed with each
+// supported codec in turn, and checks that the page reader hands back the
+// original data for every page.
+//
+// A codec that cannot be obtained fails the subtest immediately rather than
+// being used while nil.
+func (p *PageSerdeSuite) TestCompression() {
+       codecs := []compress.Compression{
+               compress.Codecs.Snappy,
+               compress.Codecs.Brotli,
+               compress.Codecs.Gzip,
+               // compress.Codecs.Lz4, // not yet implemented
+               compress.Codecs.Zstd,
+       }
+
+       const (
+               nrows  = 32 // dummy value
+               npages = 10
+       )
+       p.dataPageHdr.NumValues = nrows
+
+       fauxData := make([][]byte, npages)
+       for idx := range fauxData {
+               // each page is larger
+               fauxData[idx] = make([]byte, (idx+1)*64)
+               rand.Read(fauxData[idx])
+       }
+       for _, c := range codecs {
+               p.Run(c.String(), func() {
+                       codec, err := compress.GetCodec(c)
+                       p.Require().NoError(err)
+
+                       for _, data := range fauxData {
+                               maxCompressed := codec.CompressBound(int64(len(data)))
+                               buffer := make([]byte, maxCompressed)
+                               buffer = codec.Encode(buffer, data)
+                               p.WriteDataPageHeader(1024, int32(len(data)), int32(len(buffer)))
+                               _, err := p.sink.Write(buffer)
+                               p.NoError(err)
+                       }
+
+                       p.InitSerializedPageReader(nrows*npages, c)
+
+                       for _, data := range fauxData {
+                               p.True(p.pageReader.Next())
+                               page := p.pageReader.Page()
+                               p.IsType(&file.DataPageV1{}, page)
+                               p.Equal(data, page.Data())
+                       }
+                       // start the next codec from an empty stream
+                       p.ResetStream()
+               })
+       }
+}
+
+// TestInvalidHeaders expects an error when opening a 4-byte input that is not
+// a parquet file.
+func TestInvalidHeaders(t *testing.T) {
+       src := bytes.NewReader([]byte("PAR2"))
+       _, err := file.NewParquetReader(src)
+       assert.Error(t, err)
+}
+
+// TestInvalidFooter expects an error for inputs whose footer cannot be valid.
+func TestInvalidFooter(t *testing.T) {
+       badFiles := [][]byte{
+               []byte("PAR1PAR"),  // file is smaller than FOOTER_SIZE
+               []byte("PAR1PAR2"), // Magic Number Incorrect
+       }
+       for _, contents := range badFiles {
+               _, err := file.NewParquetReader(bytes.NewReader(contents))
+               assert.Error(t, err)
+       }
+}
+
+// TestIncompleteMetadata builds a file whose footer announces 24 bytes of
+// metadata although only 10 bytes sit between the leading and trailing magic,
+// and expects opening it to fail.
+func TestIncompleteMetadata(t *testing.T) {
+       const metadataLen = 24
+       magic := []byte("PAR1")
+
+       out := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+       out.Write(magic)
+       out.Write(make([]byte, 10))
+       binary.Write(out, binary.LittleEndian, uint32(metadataLen))
+       out.Write(magic)
+
+       contents := out.Finish()
+       defer contents.Release()
+
+       _, err := file.NewParquetReader(bytes.NewReader(contents.Bytes()))
+       assert.Error(t, err)
+}
diff --git a/go/parquet/file/level_conversion.go 
b/go/parquet/file/level_conversion.go
new file mode 100644
index 0000000..6c56c13
--- /dev/null
+++ b/go/parquet/file/level_conversion.go
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+       "math"
+       "math/bits"
+       "unsafe"
+
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/bmi"
+       "github.com/apache/arrow/go/parquet/internal/utils"
+       "github.com/apache/arrow/go/parquet/schema"
+       "golang.org/x/xerrors"
+)
+
+// LevelInfo holds the definition and repetition level thresholds that describe
+// how the levels of a field are to be interpreted.
+type LevelInfo struct {
+       // How many slots an undefined but present (i.e. null) element in
+       // parquet consumes when decoding to Arrow.
+       // "Slot" is used in the same context as the Arrow specification
+       // (i.e. a value holder).
+       // This is only ever >1 for descendants of FixedSizeList.
+       NullSlotUsage int32
+       // The definition level at which the value for the field
+       // is considered not null (definition levels greater than
+       // or equal to this value indicate a not-null
+       // value for the field). For list fields definition levels
+       // greater than or equal to this field indicate a present,
+       // possibly null, child value.
+       DefLevel int16
+       // The repetition level corresponding to this element
+       // or the closest repeated ancestor.  Any repetition
+       // level less than this indicates either a new list OR
+       // an empty list (which is determined in conjunction
+       // with definition levels).
+       RepLevel int16
+       // The definition level indicating the level at which the closest
+       // repeated ancestor is not empty.  This is used to discriminate
+       // between a value less than |def_level| being null or excluded
+       // entirely.
+       // For instance if we have an arrow schema like:
+       // list(struct(f0: int)).  Then there are the following
+       // definition levels:
+       //   0 = null list
+       //   1 = present but empty list.
+       //   2 = a null value in the list
+       //   3 = a non null struct but null integer.
+       //   4 = a present integer.
+       // When reconstructing, the struct and integer arrays'
+       // repeated_ancestor_def_level would be 2.  Any
+       // def_level < 2 indicates that there isn't a corresponding
+       // child value in the list.
+       // i.e. [null, [], [null], [{f0: null}], [{f0: 1}]]
+       // has the def levels [0, 1, 2, 3, 4].  The actual
+       // struct array is only of length 3: [not-set, set, set] and
+       // the int array is also of length 3: [N/A, null, 1].
+       RepeatedAncestorDefLevel int16
+}
+
+// newDefaultLevelInfo returns a LevelInfo with NullSlotUsage set to 1 and all
+// levels at zero.
+func newDefaultLevelInfo() *LevelInfo {
+       return &LevelInfo{NullSlotUsage: 1}
+}
+
+// Equal reports whether l and rhs agree on NullSlotUsage, DefLevel, RepLevel
+// and RepeatedAncestorDefLevel.
+func (l *LevelInfo) Equal(rhs *LevelInfo) bool {
+       switch {
+       case l.NullSlotUsage != rhs.NullSlotUsage:
+               return false
+       case l.DefLevel != rhs.DefLevel:
+               return false
+       case l.RepLevel != rhs.RepLevel:
+               return false
+       case l.RepeatedAncestorDefLevel != rhs.RepeatedAncestorDefLevel:
+               return false
+       }
+       return true
+}
+
+// HasNullableValues reports whether DefLevel is greater than
+// RepeatedAncestorDefLevel.
+func (l *LevelInfo) HasNullableValues() bool {
+       return l.RepeatedAncestorDefLevel < l.DefLevel
+}
+
+// IncrementOptional accounts for an optional node by adding one definition
+// level.
+func (l *LevelInfo) IncrementOptional() {
+       l.DefLevel++
+}
+
+// IncrementRepeated accounts for a repeated node and returns the value
+// RepeatedAncestorDefLevel had before the call.
+func (l *LevelInfo) IncrementRepeated() int16 {
+       previous := l.RepeatedAncestorDefLevel
+
+       // Repeated fields add both a repetition and definition level. This is
+       // used to distinguish between an empty list and a list with an item in
+       // it.
+       l.RepLevel, l.DefLevel = l.RepLevel+1, l.DefLevel+1
+
+       // For levels >= repeated_ancestor_def_level it indicates the list was
+       // non-null and had at least one element.  This is important
+       // for later decoding because we need to add a slot for these
+       // values.  for levels < current_def_level no slots are added
+       // to arrays.
+       l.RepeatedAncestorDefLevel = l.DefLevel
+       return previous
+}
+
+// Increment updates the levels for node n according to its repetition type:
+// repeated nodes go through IncrementRepeated, optional nodes through
+// IncrementOptional, and any other repetition type leaves l untouched.
+func (l *LevelInfo) Increment(n schema.Node) {
+       rep := n.RepetitionType()
+       if rep == parquet.Repetitions.Repeated {
+               l.IncrementRepeated()
+       } else if rep == parquet.Repetitions.Optional {
+               l.IncrementOptional()
+       }
+}
+
+// ValidityBitmapInputOutput is the input/output structure for reconstructed
+// validity bitmaps.
+type ValidityBitmapInputOutput struct {
+       // Input only.
+       // The maximum number of values_read expected (actual
+       // values read must be less than or equal to this value).
+       // Exceeding this limit indicates either a corrupt or
+       // incorrectly written file; defLevelsBatchToBitmap panics
+       // when it is exceeded.
+       ReadUpperBound int64
+       // Output only. The number of values added to the encountered
+       // (this is logically the count of the number of elements
+       // for an Arrow array).
+       Read int64
+       // Input/Output. The number of nulls encountered.
+       NullCount int64
+       // Output only. The validity bitmap to populate. May be nil only
+       // for DefRepLevelsToListInfo (if all that is needed is list offsets).
+       ValidBits []byte
+       // Input only, offset into valid_bits to start at.
+       ValidBitsOffset int64
+}
+
+// extractBitsSize is the number of bits in a uint64 (8 bits per byte times the
+// size of a uint64 in bytes).
+const extractBitsSize int64 = 8 * int64(unsafe.Sizeof(uint64(0)))
+
+// defLevelsBatchToBitmap turns one batch of definition levels into a bitmap
+// word, appends it to wr and returns the number of set (non-null) bits.
+//
+// Without a repeated parent every level yields one bit. With a repeated
+// parent only levels >= info.RepeatedAncestorDefLevel yield a bit. In both
+// cases the function panics when the number of bits to append is larger than
+// remainingUpperBound.
+func defLevelsBatchToBitmap(defLevels []int16, remainingUpperBound int64, info LevelInfo, wr utils.BitmapWriter, hasRepeatedParent bool) uint64 {
+       definedBitmap := bmi.GreaterThanBitmap(defLevels, info.DefLevel-1)
+
+       if !hasRepeatedParent {
+               count := int64(len(defLevels))
+               if count > remainingUpperBound {
+                       panic("values read exceed upper bound")
+               }
+
+               wr.AppendWord(definedBitmap, count)
+               return uint64(bits.OnesCount64(definedBitmap))
+       }
+
+       // Greater than level_info.repeated_ancestor_def_level - 1 implies >=
+       // the repeated_ancestor_def_level
+       presentBitmap := bmi.GreaterThanBitmap(defLevels, info.RepeatedAncestorDefLevel-1)
+       // keep only the defined bits at positions that are actually present
+       selectedBits := bmi.ExtractBits(definedBitmap, presentBitmap)
+       selectedCount := int64(bits.OnesCount64(presentBitmap))
+       if selectedCount > remainingUpperBound {
+               panic("values read exceeded upper bound")
+       }
+
+       wr.AppendWord(selectedBits, selectedCount)
+       return uint64(bits.OnesCount64(selectedBits))
+}
+
+// defLevelsToBitmapInternal writes the validity bits derived from defLevels
+// into out.ValidBits starting at out.ValidBitsOffset. Afterwards out.Read
+// holds the writer position and out.NullCount has been increased by the
+// number of unset bits.
+func defLevelsToBitmapInternal(defLevels []int16, info LevelInfo, out *ValidityBitmapInputOutput, hasRepeatedParent bool) {
+       writer := utils.NewFirstTimeBitmapWriter(out.ValidBits, out.ValidBitsOffset, int64(len(defLevels)))
+       defer writer.Finish()
+
+       nonNull := int64(defLevelsBatchToBitmap(defLevels, out.ReadUpperBound, info, writer, hasRepeatedParent))
+       out.Read = int64(writer.Pos())
+       out.NullCount += out.Read - nonNull
+}
+
+// DefLevelsToBitmap builds a validity bitmap in out from the given
+// definition levels and level info. The column is treated as having a
+// repeated parent whenever info.RepLevel is greater than zero.
+func DefLevelsToBitmap(defLevels []int16, info LevelInfo, out *ValidityBitmapInputOutput) {
+       defLevelsToBitmapInternal(defLevels, info, out, info.RepLevel > 0)
+}
+
+// DefRepLevelsToListInfo walks the definition and repetition levels in order
+// to populate the validity bitmap in out and, when offsets is non-empty, the
+// cumulative list offsets.
+//
+// Entries whose definition level is below info.RepeatedAncestorDefLevel or
+// whose repetition level is above info.RepLevel are skipped. An entry whose
+// repetition level equals info.RepLevel continues the current list and bumps
+// the current offset. Any other entry starts a new list slot: the next offset
+// is seeded from the previous one (and incremented when the definition level
+// is >= info.DefLevel), and a validity bit is written that is set when the
+// definition level is >= info.DefLevel-1.
+//
+// out.ValidBits may be nil, in which case no bitmap is written. On return
+// out.Read holds the number of offsets advanced (or the bitmap writer
+// position when no offsets were supplied). An error is returned when an
+// offset would overflow int32, when out.ReadUpperBound is reached, or when
+// nulls were seen and info.NullSlotUsage is greater than 1.
+func DefRepLevelsToListInfo(defLevels, repLevels []int16, info LevelInfo, out 
*ValidityBitmapInputOutput, offsets []int32) error {
+       var wr utils.BitmapWriter
+       if out.ValidBits != nil {
+               wr = utils.NewFirstTimeBitmapWriter(out.ValidBits, 
out.ValidBitsOffset, out.ReadUpperBound)
+               defer wr.Finish()
+       }
+       offsetPos := 0
+       for idx := range defLevels {
+               // skip items that belong to empty or null ancestor lists
+               // and further nested lists
+               if defLevels[idx] < info.RepeatedAncestorDefLevel || 
repLevels[idx] > info.RepLevel {
+                       continue
+               }
+
+               if repLevels[idx] == info.RepLevel {
+                       // continuation of an existing list.
+                       // offsets can be nil for structs with repeated children
+                       if offsetPos < len(offsets) {
+                               if offsets[offsetPos] == math.MaxInt32 {
+                                       return xerrors.New("list index 
overflow")
+                               }
+                               offsets[offsetPos]++
+                       }
+               } else {
+                       if (wr != nil && int64(wr.Pos()) >= out.ReadUpperBound) 
|| (offsetPos >= int(out.ReadUpperBound)) {
+                               return xerrors.Errorf("definition levels 
exceeded upper bound: %d", out.ReadUpperBound)
+                       }
+
+                       // current rep level < list RepLevel, i.e. start of a list
+                       // (ancestor empty lists are filtered out above).
+                       // offsets can be nil for structs with repeated children
+                       if offsetPos+1 < len(offsets) {
+                               offsetPos++
+                               // use cumulative offsets because variable size lists
+                               // are more common than fixed size lists so it should
+                               // be cheaper to make these cumulative and subtract
+                               // when validating fixed size lists
+                               offsets[offsetPos] = offsets[offsetPos-1]
+                               if defLevels[idx] >= info.DefLevel {
+                                       if offsets[offsetPos] == math.MaxInt32 {
+                                               return xerrors.New("list index 
overflow")
+                                       }
+                                       offsets[offsetPos]++
+                               }
+                       }
+
+                       if wr != nil {
+                               // the level info def level for lists reflects the
+                               // element present level; the prior level
+                               // distinguishes between empty lists
+                               if defLevels[idx] >= info.DefLevel-1 {
+                                       wr.Set()
+                               } else {
+                                       out.NullCount++
+                                       wr.Clear()
+                               }
+                               wr.Next()
+                       }
+               }
+       }
+
+       // report progress in terms of offsets when they were requested,
+       // otherwise in terms of bitmap positions written
+       if len(offsets) > 0 {
+               out.Read = int64(offsetPos)
+       } else if wr != nil {
+               out.Read = int64(wr.Pos())
+       }
+
+       if out.NullCount > 0 && info.NullSlotUsage > 1 {
+               return xerrors.New("null values with null_slot_usage > 1 not 
supported.")
+       }
+       return nil
+}
+
+// DefRepLevelsToBitmap constructs a full validity bitmap out of the
+// definition and repetition levels, properly handling nested lists and
+// parents.
+//
+// It delegates to DefRepLevelsToListInfo without an offsets slice, passing a
+// copy of info whose DefLevel and RepLevel have each been raised by one.
+func DefRepLevelsToBitmap(defLevels, repLevels []int16, info LevelInfo, out *ValidityBitmapInputOutput) error {
+       shifted := info
+       shifted.DefLevel++
+       shifted.RepLevel++
+       return DefRepLevelsToListInfo(defLevels, repLevels, shifted, out, nil)
+}
diff --git a/go/parquet/file/level_conversion_test.go 
b/go/parquet/file/level_conversion_test.go
new file mode 100644
index 0000000..08d2fe3
--- /dev/null
+++ b/go/parquet/file/level_conversion_test.go
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+       "strings"
+       "testing"
+
+       "github.com/apache/arrow/go/arrow/bitutil"
+       "github.com/apache/arrow/go/parquet/internal/bmi"
+       "github.com/apache/arrow/go/parquet/internal/utils"
+       "github.com/stretchr/testify/assert"
+)
+
+// bitmapToString renders the first bitCount bits of bitmap as a string of
+// '1' and '0' characters, one character per bit in bit-index order.
+func bitmapToString(bitmap []byte, bitCount int64) string {
+       nbits := int(bitCount)
+
+       var sb strings.Builder
+       sb.Grow(nbits)
+       for pos := 0; pos < nbits; pos++ {
+               ch := byte('0')
+               if bitutil.BitIsSet(bitmap, pos) {
+                       ch = '1'
+               }
+               sb.WriteByte(ch)
+       }
+       return sb.String()
+}
+
+// TestDefLevelsToBitmap checks the read/null counts for a nine level batch
+// with a single null, then verifies that an empty batch reads nothing and
+// leaves the second validity byte untouched.
+func TestDefLevelsToBitmap(t *testing.T) {
+       var (
+               defLevels = []int16{3, 3, 3, 2, 3, 3, 3, 3, 3}
+               validBits = []byte{2, 0}
+       )
+
+       info := LevelInfo{DefLevel: 3, RepLevel: 1}
+       io := ValidityBitmapInputOutput{
+               ReadUpperBound: int64(len(defLevels)),
+               Read:           -1,
+               ValidBits:      validBits,
+       }
+
+       DefLevelsToBitmap(defLevels, info, &io)
+       assert.Equal(t, int64(9), io.Read)
+       assert.Equal(t, int64(1), io.NullCount)
+
+       // call again with 0 definition levels and make sure that the valid
+       // bits are unmodified
+       before := validBits[1]
+       io.NullCount = 0
+       DefLevelsToBitmap(defLevels[:0], info, &io)
+
+       assert.Zero(t, io.Read)
+       assert.Zero(t, io.NullCount)
+       assert.Equal(t, before, validBits[1])
+}
+
+// TestDefLevelstToBitmapPowerOf2 converts a four level slice (all defined)
+// and expects four values read with no nulls.
+func TestDefLevelstToBitmapPowerOf2(t *testing.T) {
+       defLevels := []int16{3, 3, 3, 2, 3, 3, 3, 3}
+
+       info := LevelInfo{RepLevel: 1, DefLevel: 3}
+       io := ValidityBitmapInputOutput{
+               Read:           -1,
+               ReadUpperBound: int64(len(defLevels)),
+               ValidBits:      []byte{1, 0},
+       }
+
+       DefLevelsToBitmap(defLevels[4:8], info, &io)
+       assert.Equal(t, int64(4), io.Read)
+       assert.Zero(t, io.NullCount)
+}
+
+// TestGreaterThanBitmapGeneratesExpectedBitmasks checks the 64-bit mask
+// produced by bmi.GreaterThanBitmap for several prefix lengths and
+// thresholds over the repeating level pattern 0..7.
+func TestGreaterThanBitmapGeneratesExpectedBitmasks(t *testing.T) {
+       // 64 levels cycling through 0, 1, ..., 7
+       defLevels := make([]int16, 64)
+       for i := range defLevels {
+               defLevels[i] = int16(i % 8)
+       }
+
+       cases := []struct {
+               name     string
+               num      int
+               rhs      int16
+               expected uint64
+       }{
+               {name: "no levels", num: 0, rhs: 0, expected: 0},
+               {name: "64 and 8", num: 64, rhs: 8, expected: 0},
+               {name: "64 and -1", num: 64, rhs: -1, expected: 0xFFFFFFFFFFFFFFFF},
+               // should be zero padded
+               {name: "zero pad 47, -1", num: 47, rhs: -1, expected: 0x7FFFFFFFFFFF},
+               {name: "zero pad 64 and 6", num: 64, rhs: 6, expected: 0x8080808080808080},
+       }
+
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       got := bmi.GreaterThanBitmap(defLevels[:tc.num], tc.rhs)
+                       assert.Equal(t, tc.expected, got)
+               })
+       }
+}
+
+// TestWithRepetitionlevelFiltersOutEmptyListValues verifies that levels
+// below the repeated ancestor's definition level produce no validity bit:
+// only four of the eight levels are written (starting at bit offset 1) and
+// one additional null is counted.
+func TestWithRepetitionlevelFiltersOutEmptyListValues(t *testing.T) {
+       validityBitmap := make([]byte, 8)
+       io := ValidityBitmapInputOutput{
+               ReadUpperBound:  64,
+               Read:            1,
+               NullCount:       5,
+               ValidBits:       validityBitmap,
+               ValidBitsOffset: 1,
+       }
+
+       info := LevelInfo{
+               RepeatedAncestorDefLevel: 1,
+               DefLevel:                 2,
+               RepLevel:                 1,
+       }
+
+       defLevels := []int16{0, 0, 0, 2, 2, 1, 0, 2}
+       DefLevelsToBitmap(defLevels, info, &io)
+
+       // assert.Equal takes (expected, actual); keeping that order makes
+       // failure output label the two values correctly
+       assert.Equal(t, "01101000", bitmapToString(validityBitmap, 8))
+       for _, x := range validityBitmap[1:] {
+               assert.Zero(t, x)
+       }
+       assert.EqualValues(t, 6, io.NullCount)
+       assert.EqualValues(t, 4, io.Read)
+}
+
+// MultiLevelTestData bundles parallel definition and repetition level
+// slices used as test fixtures.
+type MultiLevelTestData struct {
+       defLevels []int16
+       repLevels []int16
+}
+
+// TriplNestedList returns the definition and repetition levels for a triply
+// nested list fixture (values borrowed from write_path):
+//
+//     [null, [[1, null, 3], []], []],
+//     [[[]], [[], [1, 2]], null, [[3]]],
+//     null,
+//     []
+func TriplNestedList() MultiLevelTestData {
+       var data MultiLevelTestData
+       data.defLevels = []int16{
+               2, 7, 6, 7, 5, 3, // first row
+               5, 5, 7, 7, 2, 7, // second row
+               0, // third row
+               1,
+       }
+       data.repLevels = []int16{
+               0, 1, 3, 3, 2, 1, // first row
+               0, 1, 2, 3, 1, 1, // second row
+               0, 0,
+       }
+       return data
+}
+
+// TestActualCase runs defLevelsBatchToBitmap over 64 levels (eight of them
+// zero) with a repeated parent and checks the returned non-null count and
+// the first four output bytes.
+func TestActualCase(t *testing.T) {
+       defs := make([]int16, 64)
+       for i := range defs {
+               defs[i] = 3
+       }
+       for _, zeroIdx := range []int{0, 25, 33, 49, 58, 59, 60, 61} {
+               defs[zeroIdx] = 0
+       }
+
+       info := LevelInfo{
+               NullSlotUsage:            0,
+               DefLevel:                 3,
+               RepLevel:                 1,
+               RepeatedAncestorDefLevel: 2,
+       }
+
+       out := make([]byte, 512)
+       remaining := int64(4096)
+       wr := utils.NewFirstTimeBitmapWriter(out, 0, 4096)
+
+       nonNull := defLevelsBatchToBitmap(defs, remaining, info, wr, true)
+       assert.EqualValues(t, 56, nonNull)
+       assert.Equal(t, []byte{255, 255, 255, 255}, out[:4])
+}
diff --git a/go/parquet/file/page_reader.go b/go/parquet/file/page_reader.go
new file mode 100644
index 0000000..251499a
--- /dev/null
+++ b/go/parquet/file/page_reader.go
@@ -0,0 +1,620 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+       "bytes"
+       "io"
+       "sync"
+
+       "github.com/JohnCGriffin/overflow"
+       "github.com/apache/arrow/go/arrow/ipc"
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/compress"
+       "github.com/apache/arrow/go/parquet/internal/debug"
+       "github.com/apache/arrow/go/parquet/internal/encryption"
+       format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/thrift"
+       "github.com/apache/arrow/go/parquet/metadata"
+       "golang.org/x/xerrors"
+)
+
+// PageReader is the interface used by the column reader in order to read
+// and handle data pages and loop through them.
+type PageReader interface {
+       // SetMaxPageHeaderSize sets the maximum page header size allowed to be read.
+       SetMaxPageHeaderSize(int)
+       // Page returns the current page, or nil if there are no more.
+       Page() Page
+       // Next fetches the next page and returns false if there are no more pages.
+       Next() bool
+       // Err returns the error encountered if Next returned false, or nil if
+       // there was no error and the end of the pages was simply reached.
+       Err() error
+       // Reset allows reusing a page reader with a new source, row count,
+       // compression type and crypto context.
+       Reset(r parquet.ReaderAtSeeker, nrows int64, compressType 
compress.Compression, ctx *CryptoContext)
+}
+
+// Page is an interface for handling data pages or dictionary pages.
+type Page interface {
+       // Type returns which kind of page this is.
+       Type() format.PageType
+       // Data returns the raw bytes of this page.
+       Data() []byte
+       // Encoding returns the encoding used for this page, Plain/RLE, etc.
+       Encoding() format.Encoding
+       // NumValues returns the number of values in this page.
+       NumValues() int32
+       // Release puts this page object back into the page pool for re-use.
+       Release()
+}
+
+// page holds the state shared by every concrete page type: the data buffer,
+// the page type, the number of values and the value encoding.
+type page struct {
+       buf *memory.Buffer
+       typ format.PageType
+
+       nvals    int32
+       encoding format.Encoding
+}
+
+// Type reports which kind of page this is.
+func (p *page) Type() format.PageType {
+       return p.typ
+}
+
+// Data returns the bytes of the page's underlying buffer.
+func (p *page) Data() []byte {
+       return p.buf.Bytes()
+}
+
+// NumValues returns the number of values recorded for this page.
+func (p *page) NumValues() int32 {
+       return p.nvals
+}
+
+// Encoding returns the encoding recorded for this page.
+func (p *page) Encoding() format.Encoding {
+       return p.encoding
+}
+
+// DataPage is the base interface for both DataPageV1 and DataPageV2 of the
+// parquet spec.
+type DataPage interface {
+       Page
+       // UncompressedSize returns the size of the page data when uncompressed.
+       UncompressedSize() int64
+       // Statistics returns the encoded statistics attached to the page.
+       Statistics() metadata.EncodedStatistics
+}
+
+// Pools used for reusing the page objects themselves so that we can avoid
+// tight loops that are creating and destroying tons of individual objects.
+// This combined with a Release function on the pages themselves, which puts
+// them back into the pool, yields significant memory reduction and
+// performance benefits.
+//
+// Each New function returns a typed nil pointer, so Get on an empty pool
+// yields nil and the constructors allocate a fresh object in that case.
+var (
+       dataPageV1Pool = sync.Pool{
+               New: func() interface{} { return (*DataPageV1)(nil) },
+       }
+
+       dataPageV2Pool = sync.Pool{
+               New: func() interface{} { return (*DataPageV2)(nil) },
+       }
+
+       dictPagePool = sync.Pool{
+               New: func() interface{} { return (*DictionaryPage)(nil) },
+       }
+)
+
+// DataPageV1 represents a DataPage version 1 from the parquet.thrift file.
+// In addition to the common page state it records the encodings of the
+// definition and repetition levels, the uncompressed size and the encoded
+// statistics.
+type DataPageV1 struct {
+       page
+
+       defLvlEncoding   format.Encoding
+       repLvlEncoding   format.Encoding
+       uncompressedSize int64
+       statistics       metadata.EncodedStatistics
+}
+
+// NewDataPageV1 returns a V1 data page with the given buffer as its data and
+// the specified encoding information.
+//
+// Will utilize objects that have been released back into the data page pool
+// and re-use them if available as opposed to creating new objects. Calling
+// Release on the data page object will release it back to the pool for
+// re-use.
+//
+// Every field, including the page type, is (re)initialized so that a
+// recycled object never leaks state from its previous use. Only the
+// statistics "Has*" flags are cleared; the statistics storage itself is kept.
+func NewDataPageV1(buffer *memory.Buffer, num int32, encoding, defEncoding, repEncoding parquet.Encoding, uncompressedSize int64) *DataPageV1 {
+       dp := dataPageV1Pool.Get().(*DataPageV1)
+       if dp == nil {
+               // the pool hands out a typed nil when it has nothing to recycle
+               dp = &DataPageV1{}
+       }
+
+       dp.buf, dp.typ = buffer, format.PageType_DATA_PAGE
+       dp.nvals, dp.encoding = num, format.Encoding(encoding)
+       dp.defLvlEncoding, dp.repLvlEncoding = format.Encoding(defEncoding), format.Encoding(repEncoding)
+       dp.statistics.HasMax, dp.statistics.HasMin = false, false
+       dp.statistics.HasNullCount, dp.statistics.HasDistinctCount = false, false
+       dp.uncompressedSize = uncompressedSize
+       return dp
+}
+
+// NewDataPageV1WithStats is the same as NewDataPageV1, but additionally
+// stores the provided encoded statistics in the created page.
+func NewDataPageV1WithStats(buffer *memory.Buffer, num int32, encoding, defEncoding, repEncoding parquet.Encoding, uncompressedSize int64, stats metadata.EncodedStatistics) *DataPageV1 {
+       dp := NewDataPageV1(buffer, num, encoding, defEncoding, repEncoding, uncompressedSize)
+       dp.statistics = stats
+       return dp
+}
+
+// Release releases the page's buffer and puts this page back into the
+// DataPageV1 object pool so that it can be reused.
+//
+// After calling this function, the object should not be utilized anymore,
+// otherwise conflicts can arise.
+func (d *DataPageV1) Release() {
+       d.buf.Release()
+       d.buf = nil
+       dataPageV1Pool.Put(d)
+}
+
+// UncompressedSize returns the size of the data in this data page when
+// uncompressed.
+func (d *DataPageV1) UncompressedSize() int64 {
+       return d.uncompressedSize
+}
+
+// Statistics returns the encoded statistics on this data page.
+func (d *DataPageV1) Statistics() metadata.EncodedStatistics {
+       return d.statistics
+}
+
+// DefinitionLevelEncoding returns the encoding utilized for the definition
+// levels.
+func (d *DataPageV1) DefinitionLevelEncoding() parquet.Encoding {
+       enc := d.defLvlEncoding
+       return parquet.Encoding(enc)
+}
+
+// RepetitionLevelEncoding returns the encoding utilized for the repetition
+// levels.
+func (d *DataPageV1) RepetitionLevelEncoding() parquet.Encoding {
+       enc := d.repLvlEncoding
+       return parquet.Encoding(enc)
+}
+
+// DataPageV2 is the representation of the V2 data page from the
+// parquet.thrift spec. In addition to the common page state it records the
+// null and row counts, the byte lengths of the definition and repetition
+// levels, whether the data is compressed, the uncompressed size and the
+// encoded statistics.
+type DataPageV2 struct {
+       page
+
+       nulls            int32
+       nrows            int32
+       defLvlByteLen    int32
+       repLvlByteLen    int32
+       compressed       bool
+       uncompressedSize int64
+       statistics       metadata.EncodedStatistics
+}
+
+// NewDataPageV2 constructs a new V2 data page with the provided information
+// and a buffer of the raw data.
+//
+// A page object is taken from the V2 page pool when one is available;
+// otherwise a new one is allocated. Every field, including the page type, is
+// (re)initialized so that a recycled object never leaks state from its
+// previous use. Only the statistics "Has*" flags are cleared; the statistics
+// storage itself is kept.
+func NewDataPageV2(buffer *memory.Buffer, numValues, numNulls, numRows int32, encoding parquet.Encoding, defLvlsByteLen, repLvlsByteLen int32, uncompressed int64, isCompressed bool) *DataPageV2 {
+       dp := dataPageV2Pool.Get().(*DataPageV2)
+       if dp == nil {
+               // the pool hands out a typed nil when it has nothing to recycle
+               dp = &DataPageV2{}
+       }
+
+       dp.buf, dp.typ = buffer, format.PageType_DATA_PAGE_V2
+       dp.nvals, dp.encoding = numValues, format.Encoding(encoding)
+       dp.nulls, dp.nrows = numNulls, numRows
+       dp.defLvlByteLen, dp.repLvlByteLen = defLvlsByteLen, repLvlsByteLen
+       dp.compressed, dp.uncompressedSize = isCompressed, uncompressed
+       dp.statistics.HasMax, dp.statistics.HasMin = false, false
+       dp.statistics.HasNullCount, dp.statistics.HasDistinctCount = false, false
+       return dp
+}
+
+// NewDataPageV2WithStats is the same as NewDataPageV2 but additionally
+// stores the provided encoded statistics in the created page.
+func NewDataPageV2WithStats(buffer *memory.Buffer, numValues, numNulls, numRows int32, encoding parquet.Encoding, defLvlsByteLen, repLvlsByteLen int32, uncompressed int64, isCompressed bool, stats metadata.EncodedStatistics) *DataPageV2 {
+       dp := NewDataPageV2(buffer, numValues, numNulls, numRows, encoding, defLvlsByteLen, repLvlsByteLen, uncompressed, isCompressed)
+       dp.statistics = stats
+       return dp
+}
+
+// Release releases the page's buffer and puts this page back into the
+// DataPageV2 object pool so that it can be reused.
+//
+// After calling this function, the object should not be utilized anymore,
+// otherwise conflicts can arise.
+func (d *DataPageV2) Release() {
+       d.buf.Release()
+       d.buf = nil
+       dataPageV2Pool.Put(d)
+}
+
+// UncompressedSize is the size of the raw page when uncompressed. If
+// `IsCompressed` is true, then the raw data in the buffer is expected to be
+// compressed.
+func (d *DataPageV2) UncompressedSize() int64 {
+       return d.uncompressedSize
+}
+
+// Statistics are the encoded statistics in the data page.
+func (d *DataPageV2) Statistics() metadata.EncodedStatistics {
+       return d.statistics
+}
+
+// NumNulls is the reported number of nulls in this data page.
+func (d *DataPageV2) NumNulls() int32 {
+       return d.nulls
+}
+
+// DefinitionLevelByteLen is the number of bytes in the buffer that are used
+// to represent the definition levels.
+func (d *DataPageV2) DefinitionLevelByteLen() int32 {
+       return d.defLvlByteLen
+}
+
+// RepetitionLevelByteLen is the number of bytes in the buffer which are used
+// to represent the repetition levels.
+func (d *DataPageV2) RepetitionLevelByteLen() int32 {
+       return d.repLvlByteLen
+}
+
+// IsCompressed returns true if the data of this page is compressed.
+func (d *DataPageV2) IsCompressed() bool {
+       return d.compressed
+}
+
+// DictionaryPage represents a dictionary page; in addition to the common
+// page state it records whether the dictionary is sorted.
+type DictionaryPage struct {
+       page
+
+       sorted bool
+}
+
+// NewDictionaryPage constructs a new dictionary page with the provided data
+// buffer, number of values and encoding. The page is created unsorted.
+//
+// A page object is taken from the dictionary page pool when one is
+// available; otherwise a new one is allocated. Every field, including the
+// page type, is (re)initialized so that a recycled object never leaks state
+// from its previous use.
+func NewDictionaryPage(buffer *memory.Buffer, nvals int32, encoding parquet.Encoding) *DictionaryPage {
+       dp := dictPagePool.Get().(*DictionaryPage)
+       if dp == nil {
+               // the pool hands out a typed nil when it has nothing to recycle
+               dp = &DictionaryPage{}
+       }
+
+       dp.buf, dp.typ = buffer, format.PageType_DICTIONARY_PAGE
+       dp.nvals, dp.encoding = nvals, format.Encoding(encoding)
+       dp.sorted = false
+       return dp
+}
+
+// Release releases the page's buffer and puts this page back into the
+// dictionary page object pool so that it can be reused.
+//
+// After calling this function, the object should not be utilized anymore,
+// otherwise conflicts can arise.
+func (d *DictionaryPage) Release() {
+       d.buf.Release()
+       d.buf = nil
+       dictPagePool.Put(d)
+}
+
+// IsSorted reports whether the dictionary itself is sorted.
+func (d *DictionaryPage) IsSorted() bool {
+       return d.sorted
+}
+
+// serializedPageReader is the PageReader implementation returned by
+// NewPageReader. It reads page headers and page data sequentially from r,
+// decrypting and decompressing them as configured.
+type serializedPageReader struct {
+       // source the pages are read from
+       r ipc.ReadAtSeeker
+       // total number of rows expected and the running count used by Next
+       // to decide when to stop
+       nrows    int64
+       rowsSeen int64
+       mem      memory.Allocator
+       codec    compress.Codec
+
+       // header of the page currently being processed
+       curPageHdr *format.PageHeader
+       // resizable buffer that decompress output is written into
+       buf *memory.Buffer
+       // page ordinal used when updating the page AAD for decryption
+       pageOrd           int16
+       maxPageHeaderSize int
+
+       curPage           Page
+       cryptoCtx         CryptoContext
+       dataPageAad       string
+       dataPageHeaderAad string
+
+       // scratch buffer holding raw header bytes and raw (still compressed)
+       // page bytes read from r
+       decompressBuffer bytes.Buffer
+       // error reported through Err
+       err error
+}
+
+// NewPageReader returns a page reader for the data which can be read from
+// the provided reader using the given compression.
+//
+// A Go allocator is used when mem is nil. An error is returned when no codec
+// is available for compressType. When ctx is non-nil it is copied into the
+// reader and the decryption AADs are initialized from it.
+func NewPageReader(r parquet.ReaderAtSeeker, nrows int64, compressType compress.Compression, mem memory.Allocator, ctx *CryptoContext) (PageReader, error) {
+       if mem == nil {
+               mem = memory.NewGoAllocator()
+       }
+
+       codec, err := compress.GetCodec(compressType)
+       if err != nil {
+               return nil, err
+       }
+
+       rdr := new(serializedPageReader)
+       rdr.r = r
+       rdr.maxPageHeaderSize = defaultMaxPageHeaderSize
+       rdr.nrows = nrows
+       rdr.mem = mem
+       rdr.codec = codec
+       rdr.buf = memory.NewResizableBuffer(mem)
+       rdr.decompressBuffer.Grow(defaultPageHeaderSize)
+
+       if ctx == nil {
+               return rdr, nil
+       }
+
+       rdr.cryptoCtx = *ctx
+       rdr.initDecryption()
+       return rdr, nil
+}
+
+// Reset re-targets the reader at a new source so that it can be reused:
+// counters, current header/page and the error are cleared, the codec is
+// looked up again, the buffers are emptied and the crypto state is replaced.
+//
+// If no codec is available for compressType the error is stored (see Err)
+// and the buffers and crypto state are left as they were.
+func (p *serializedPageReader) Reset(r parquet.ReaderAtSeeker, nrows int64, compressType compress.Compression, ctx *CryptoContext) {
+       p.r = r
+       p.nrows = nrows
+       p.rowsSeen = 0
+       p.pageOrd = 0
+       p.curPageHdr = nil
+       p.curPage = nil
+
+       // p.err ends up nil on success, which also clears any previous error
+       if p.codec, p.err = compress.GetCodec(compressType); p.err != nil {
+               return
+       }
+
+       p.buf.ResizeNoShrink(0)
+       p.decompressBuffer.Reset()
+
+       if ctx == nil {
+               p.cryptoCtx = CryptoContext{}
+               p.dataPageAad, p.dataPageHeaderAad = "", ""
+               return
+       }
+
+       p.cryptoCtx = *ctx
+       p.initDecryption()
+}
+
+// Err returns the error currently stored on the reader, if any.
+func (p *serializedPageReader) Err() error {
+       return p.err
+}
+
+// SetMaxPageHeaderSize sets the maximum page header size allowed to be read.
+func (p *serializedPageReader) SetMaxPageHeaderSize(sz int) {
+       p.maxPageHeaderSize = sz
+}
+
+// initDecryption precomputes the module AAD strings for data pages and data
+// page headers from the crypto context. Each AAD is only computed when the
+// corresponding decryptor is set; otherwise the existing value is left alone.
+func (p *serializedPageReader) initDecryption() {
+       rowGroup, column := p.cryptoCtx.RowGroupOrdinal, p.cryptoCtx.ColumnOrdinal
+
+       if dec := p.cryptoCtx.DataDecryptor; dec != nil {
+               p.dataPageAad = encryption.CreateModuleAad(dec.FileAad(), encryption.DataPageModule, rowGroup, column, -1)
+       }
+       if dec := p.cryptoCtx.MetaDecryptor; dec != nil {
+               p.dataPageHeaderAad = encryption.CreateModuleAad(dec.FileAad(), encryption.DataPageHeaderModule, rowGroup, column, -1)
+       }
+}
+
+// updateDecryption sets the AAD on decrypt for the page about to be read.
+//
+// While the crypto context still has StartDecryptWithDictionaryPage set, a
+// fresh module AAD is built for moduleType. Otherwise a copy of pageAad is
+// patched with the current page ordinal and used instead.
+func (p *serializedPageReader) updateDecryption(decrypt encryption.Decryptor, moduleType int8, pageAad string) {
+       if !p.cryptoCtx.StartDecryptWithDictionaryPage {
+               patched := []byte(pageAad)
+               encryption.QuickUpdatePageAad(patched, p.pageOrd)
+               decrypt.UpdateAad(string(patched))
+               return
+       }
+
+       moduleAad := encryption.CreateModuleAad(decrypt.FileAad(), moduleType, p.cryptoCtx.RowGroupOrdinal, p.cryptoCtx.ColumnOrdinal, -1)
+       decrypt.UpdateAad(moduleAad)
+}
+
+// Page returns the reader's current page, which may be nil.
+func (p *serializedPageReader) Page() Page {
+       current := p.curPage
+       return current
+}
+
+// decompress reads lenCompressed bytes from the underlying reader into the
+// scratch buffer, decrypts them when a data decryptor is configured, and
+// returns the result of decoding them with the reader's codec into buf.
+// A read failure is returned as the error.
+func (p *serializedPageReader) decompress(lenCompressed int, buf []byte) ([]byte, error) {
+       p.decompressBuffer.Reset()
+       p.decompressBuffer.Grow(lenCompressed)
+
+       _, err := io.CopyN(&p.decompressBuffer, p.r, int64(lenCompressed))
+       if err != nil {
+               return nil, err
+       }
+
+       raw := p.decompressBuffer.Bytes()
+       if dec := p.cryptoCtx.DataDecryptor; dec != nil {
+               raw = dec.Decrypt(raw)
+       }
+       return p.codec.Decode(buf, raw), nil
+}
+
+// dataheader is the subset of a data page header needed by extractStats:
+// whether statistics are present and access to them.
+type dataheader interface {
+       IsSetStatistics() bool
+       GetStatistics() *format.Statistics
+}
+
+// extractStats copies the statistics present in dataHeader into an
+// EncodedStatistics value. For max and min the MaxValue/MinValue fields take
+// precedence over Max/Min when both are set. The zero value is returned
+// when the header carries no statistics.
+func extractStats(dataHeader dataheader) (pageStats metadata.EncodedStatistics) {
+       if !dataHeader.IsSetStatistics() {
+               return pageStats
+       }
+
+       stats := dataHeader.GetStatistics()
+
+       switch {
+       case stats.IsSetMaxValue():
+               pageStats.SetMax(stats.GetMaxValue())
+       case stats.IsSetMax():
+               pageStats.SetMax(stats.GetMax())
+       }
+
+       switch {
+       case stats.IsSetMinValue():
+               pageStats.SetMin(stats.GetMinValue())
+       case stats.IsSetMin():
+               pageStats.SetMin(stats.GetMin())
+       }
+
+       if stats.IsSetNullCount() {
+               pageStats.SetNullCount(stats.GetNullCount())
+       }
+       if stats.IsSetDistinctCount() {
+               pageStats.SetDistinctCount(stats.GetDistinctCount())
+       }
+       return pageStats
+}
+
+func (p *serializedPageReader) Next() bool {
+       // Loop here because there may be unhandled page types that we skip 
until
+       // finding a page that we do know what to do with
+       if p.curPage != nil {
+               p.curPage.Release()
+       }
+       p.curPage = nil
+       p.curPageHdr = format.NewPageHeader()
+       p.err = nil
+
+       for p.rowsSeen < p.nrows {
+               // headerSize := 0
+               allowedPgSz := defaultPageHeaderSize
+
+               start, _ := p.r.Seek(0, io.SeekCurrent)
+               p.decompressBuffer.Reset()
+               // Page headers can be very large because of page statistics
+               // We try to deserialize a larger buffer progressively
+               // until a maximum allowed header limit
+               for {
+                       n, err := io.CopyN(&p.decompressBuffer, p.r, 
int64(allowedPgSz))
+                       // view, err := p.r.Peek(allowedPgSz)
+                       if err != nil && err != io.EOF {
+                               p.err = err
+                               return false
+                       }
+
+                       if n == 0 {
+                               return false
+                       }
+
+                       view := p.decompressBuffer.Bytes()
+
+                       extra := 0
+                       if p.cryptoCtx.MetaDecryptor != nil {
+                               p.updateDecryption(p.cryptoCtx.MetaDecryptor, 
encryption.DictPageHeaderModule, p.dataPageHeaderAad)
+                               view = p.cryptoCtx.MetaDecryptor.Decrypt(view)
+                               extra = 
p.cryptoCtx.MetaDecryptor.CiphertextSizeDelta()
+                       }
+
+                       remaining, err := 
thrift.DeserializeThrift(p.curPageHdr, view)
+                       if err != nil {
+                               allowedPgSz *= 2
+                               if allowedPgSz > p.maxPageHeaderSize {
+                                       p.err = xerrors.New("parquet: 
deserializing page header failed")
+                                       return false
+                               }
+                               continue
+                       }
+
+                       p.r.Seek(start+int64(len(view)-int(remaining)+extra), 
io.SeekStart)
+                       break
+               }
+
+               lenCompressed := int(p.curPageHdr.GetCompressedPageSize())
+               lenUncompressed := int(p.curPageHdr.GetUncompressedPageSize())
+               if lenCompressed < 0 || lenUncompressed < 0 {
+                       p.err = xerrors.New("parquet: invalid page header")
+                       return false
+               }
+
+               if p.cryptoCtx.DataDecryptor != nil {
+                       p.updateDecryption(p.cryptoCtx.DataDecryptor, 
encryption.DictPageModule, p.dataPageAad)
+               }
+
+               p.buf.ResizeNoShrink(lenUncompressed)
+
+               switch p.curPageHdr.GetType() {
+               case format.PageType_DICTIONARY_PAGE:
+                       p.cryptoCtx.StartDecryptWithDictionaryPage = false
+                       dictHeader := p.curPageHdr.GetDictionaryPageHeader()
+                       if dictHeader.GetNumValues() < 0 {
+                               p.err = xerrors.New("parquet: invalid page 
header (negative number of values)")
+                               return false
+                       }
+
+                       data, err := p.decompress(lenCompressed, p.buf.Bytes())
+                       if err != nil {
+                               p.err = err
+                               return false
+                       }
+                       debug.Assert(len(data) == lenUncompressed, "len(data) 
!= lenUncompressed")
+
+                       // p.buf.Resize(lenUncompressed)
+                       // make dictionary page
+                       p.curPage = &DictionaryPage{
+                               page: page{
+                                       buf:      memory.NewBufferBytes(data),
+                                       typ:      p.curPageHdr.Type,
+                                       nvals:    dictHeader.GetNumValues(),
+                                       encoding: dictHeader.GetEncoding(),
+                               },
+                               sorted: dictHeader.IsSetIsSorted() && 
dictHeader.GetIsSorted(),
+                       }
+
+               case format.PageType_DATA_PAGE:
+                       p.pageOrd++
+                       dataHeader := p.curPageHdr.GetDataPageHeader()
+                       if dataHeader.GetNumValues() < 0 {
+                               p.err = xerrors.New("parquet: invalid page 
header (negative number of values)")
+                               return false
+                       }
+
+                       p.rowsSeen += int64(dataHeader.GetNumValues())
+                       data, err := p.decompress(lenCompressed, p.buf.Bytes())
+                       if err != nil {
+                               p.err = err
+                               return false
+                       }
+                       debug.Assert(len(data) == lenUncompressed, "len(data) 
!= lenUncompressed")
+
+                       // make datapagev1
+                       p.curPage = &DataPageV1{
+                               page: page{
+                                       buf:      memory.NewBufferBytes(data),
+                                       typ:      p.curPageHdr.Type,
+                                       nvals:    dataHeader.GetNumValues(),
+                                       encoding: dataHeader.GetEncoding(),
+                               },
+                               defLvlEncoding:   
dataHeader.GetDefinitionLevelEncoding(),
+                               repLvlEncoding:   
dataHeader.GetRepetitionLevelEncoding(),
+                               uncompressedSize: int64(lenUncompressed),
+                               statistics:       extractStats(dataHeader),
+                       }
+               case format.PageType_DATA_PAGE_V2:
+                       p.pageOrd++
+                       dataHeader := p.curPageHdr.GetDataPageHeaderV2()
+                       if dataHeader.GetNumValues() < 0 {
+                               p.err = xerrors.New("parquet: invalid page 
header (negative number of values)")
+                               return false
+                       }
+
+                       if dataHeader.GetDefinitionLevelsByteLength() < 0 || 
dataHeader.GetRepetitionLevelsByteLength() < 0 {
+                               p.err = xerrors.New("parquet: invalid page 
header (negative levels byte length)")
+                               return false
+                       }
+
+                       compressed := dataHeader.GetIsCompressed()
+                       // extract stats
+                       p.rowsSeen += int64(dataHeader.GetNumValues())
+                       levelsBytelen, ok := 
overflow.Add(int(dataHeader.GetDefinitionLevelsByteLength()), 
int(dataHeader.GetRepetitionLevelsByteLength()))
+                       if !ok {
+                               p.err = xerrors.New("parquet: levels size too 
large (corrupt file?)")
+                               return false
+                       }
+
+                       var data []byte
+                       if compressed {
+                               if levelsBytelen > 0 {
+                                       io.ReadFull(p.r, 
p.buf.Bytes()[:levelsBytelen])
+                               }
+                               if data, p.err = 
p.decompress(lenCompressed-levelsBytelen, p.buf.Bytes()[levelsBytelen:]); p.err 
!= nil {
+                                       return false
+                               }
+                       } else {
+                               io.ReadFull(p.r, p.buf.Bytes())
+                               data = p.buf.Bytes()
+                       }
+                       debug.Assert(len(data) == lenUncompressed, "len(data) 
!= lenUncompressed")
+
+                       // make datapage v2
+                       p.curPage = &DataPageV2{
+                               page: page{
+                                       buf:      memory.NewBufferBytes(data),
+                                       typ:      p.curPageHdr.Type,
+                                       nvals:    dataHeader.GetNumValues(),
+                                       encoding: dataHeader.GetEncoding(),
+                               },
+                               nulls:            dataHeader.GetNumNulls(),
+                               nrows:            dataHeader.GetNumRows(),
+                               defLvlByteLen:    
dataHeader.GetDefinitionLevelsByteLength(),
+                               repLvlByteLen:    
dataHeader.GetRepetitionLevelsByteLength(),
+                               compressed:       compressed,
+                               uncompressedSize: int64(lenUncompressed),
+                               statistics:       extractStats(dataHeader),
+                       }
+               default:
+                       // we don't know this page type, we're allowed to skip 
non-data pages
+                       continue
+               }
+
+               p.buf = memory.NewResizableBuffer(p.mem)
+               return true
+       }
+
+       return false
+}
diff --git a/go/parquet/file/row_group_reader.go 
b/go/parquet/file/row_group_reader.go
new file mode 100644
index 0000000..9c74a25
--- /dev/null
+++ b/go/parquet/file/row_group_reader.go
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+       "github.com/apache/arrow/go/arrow/ipc"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/encryption"
+       "github.com/apache/arrow/go/parquet/internal/utils"
+       "github.com/apache/arrow/go/parquet/metadata"
+       "golang.org/x/xerrors"
+)
+
+const (
+       maxDictHeaderSize int64 = 100 // upper bound on the padding GetColumnPageReader adds to a column chunk's length for writers older than metadata.Parquet816FixedVersion
+)
+
+// RowGroupReader is the primary interface for reading a single row group
+type RowGroupReader struct {
+       r             ipc.ReadAtSeeker           // source handed to props.GetStream to read column chunk bytes
+       sourceSz      int64                      // size of the source in bytes (presumably the whole file — confirm); used to compute the bytes remaining after a column chunk
+       fileMetadata  *metadata.FileMetaData     // file-level metadata; consulted for the schema and the writer version
+       rgMetadata    *metadata.RowGroupMetaData // metadata of this row group
+       props         *parquet.ReaderProperties  // supplies the allocator and GetStream
+       fileDecryptor encryption.FileDecryptor   // must be non-nil to read columns that carry crypto metadata
+}
+
+// MetaData returns the row group metadata held by this reader.
+func (r *RowGroupReader) MetaData() *metadata.RowGroupMetaData {
+       return r.rgMetadata
+}
+
+// NumColumns reports how many columns the metadata of this row group defines.
+func (r *RowGroupReader) NumColumns() int {
+       return r.rgMetadata.NumColumns()
+}
+
+// NumRows reports the row count recorded in the metadata of this row group only.
+func (r *RowGroupReader) NumRows() int64 {
+       return r.rgMetadata.NumRows()
+}
+
+// ByteSize reports the total byte size of this row group as recorded in its metadata.
+func (r *RowGroupReader) ByteSize() int64 {
+       return r.rgMetadata.TotalByteSize()
+}
+
+// Column returns a column reader for the requested (0-indexed) column
+//
+// panics if passed a column not in the range [0, NumColumns)
+func (r *RowGroupReader) Column(i int) ColumnChunkReader {
+       if i >= r.NumColumns() || i < 0 {
+               panic(xerrors.Errorf("parquet: trying to read column index %d 
but row group metadata only has %d columns", i, r.rgMetadata.NumColumns()))
+       }
+
+       descr := r.fileMetadata.Schema.Column(i)
+       pageRdr, err := r.GetColumnPageReader(i)
+       if err != nil {
+               panic(xerrors.Errorf("parquet: unable to initialize page 
reader: %w", err))
+       }
+       return NewColumnReader(descr, pageRdr, r.props.Allocator())
+}
+
+// GetColumnPageReader returns a PageReader over the pages of the column chunk
+// at index i of this row group.
+//
+// The stream given to the page reader begins at the dictionary page when the
+// chunk has one positioned before its first data page, and at the first data
+// page otherwise. For a column that carries crypto metadata a CryptoContext
+// holding the matching meta/data decryptors is attached; an error is returned
+// when no file decryptor is available or when i is greater than 32767.
+func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
+       col, err := r.rgMetadata.ColumnChunk(i)
+       if err != nil {
+               return nil, err
+       }
+
+       // start at whichever comes first: the dictionary page or the first data page
+       colStart := col.DataPageOffset()
+       if col.HasDictionaryPage() {
+               if dictOffset := col.DictionaryPageOffset(); dictOffset > 0 && dictOffset < colStart {
+                       colStart = dictOffset
+               }
+       }
+
+       colLen := col.TotalCompressedSize()
+       if r.fileMetadata.WriterVersion().LessThan(metadata.Parquet816FixedVersion) {
+               // writers older than Parquet816FixedVersion get up to maxDictHeaderSize
+               // extra bytes, limited by what is left in the source after the chunk
+               bytesLeft := r.sourceSz - (colStart + colLen)
+               colLen += utils.Min(maxDictHeaderSize, bytesLeft)
+       }
+
+       stream, err := r.props.GetStream(r.r, colStart, colLen)
+       if err != nil {
+               return nil, err
+       }
+
+       cryptoMetadata := col.CryptoMetadata()
+       if cryptoMetadata == nil {
+               // no crypto metadata: the page reader gets no crypto context
+               return NewPageReader(stream, col.NumValues(), col.Compression(), r.props.Allocator(), nil)
+       }
+
+       if r.fileDecryptor == nil {
+               return nil, xerrors.New("column in rowgroup is encrypted, but no file decryptor")
+       }
+
+       // the column ordinal is stored in the crypto context as an int16
+       const maxEncryptedColumnOrdinal = 32767
+       if i > maxEncryptedColumnOrdinal {
+               return nil, xerrors.New("encrypted files cannot contain more than 32767 column chunks")
+       }
+
+       ctx := CryptoContext{
+               StartDecryptWithDictionaryPage: col.HasDictionaryPage(),
+               RowGroupOrdinal:                r.rgMetadata.Ordinal(),
+               ColumnOrdinal:                  int16(i),
+       }
+       if cryptoMetadata.IsSetENCRYPTION_WITH_FOOTER_KEY() {
+               ctx.MetaDecryptor = r.fileDecryptor.GetFooterDecryptorForColumnMeta("")
+               ctx.DataDecryptor = r.fileDecryptor.GetFooterDecryptorForColumnData("")
+       } else {
+               // column encrypted with its own key
+               withColumnKey := cryptoMetadata.GetENCRYPTION_WITH_COLUMN_KEY()
+               columnPath := parquet.ColumnPath(withColumnKey.PathInSchema).String()
+               columnKeyMeta := string(withColumnKey.KeyMetadata)
+               ctx.MetaDecryptor = r.fileDecryptor.GetColumnMetaDecryptor(columnPath, columnKeyMeta, "")
+               ctx.DataDecryptor = r.fileDecryptor.GetColumnDataDecryptor(columnPath, columnKeyMeta, "")
+       }
+       return NewPageReader(stream, col.NumValues(), col.Compression(), r.props.Allocator(), &ctx)
+}
diff --git a/go/parquet/go.sum b/go/parquet/go.sum
index cf7b678..46b4f5a 100644
--- a/go/parquet/go.sum
+++ b/go/parquet/go.sum
@@ -75,6 +75,7 @@ github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
 github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod 
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod 
h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
+github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
 github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.5.1/go.mod 
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
 github.com/stretchr/testify v1.7.0 
h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
diff --git a/go/parquet/internal/bmi/bitmap_bmi2_noasm.go 
b/go/parquet/internal/bmi/bitmap_bmi2_noasm.go
new file mode 100644
index 0000000..6dc4a39
--- /dev/null
+++ b/go/parquet/internal/bmi/bitmap_bmi2_noasm.go
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build noasm
+
+package bmi
+
+// init points funclist at the Go implementations (extractBitsGo and
+// greaterThanBitmapGo); this file is only built with the noasm tag.
+func init() {
+       funclist.gtbitmap = greaterThanBitmapGo
+       funclist.extractBits = extractBitsGo
+}
diff --git a/go/parquet/internal/bmi/bmi.go b/go/parquet/internal/bmi/bmi.go
index ea0f6e3..a12af3e 100644
--- a/go/parquet/internal/bmi/bmi.go
+++ b/go/parquet/internal/bmi/bmi.go
@@ -254,7 +254,7 @@ func extractBitsGo(bitmap, selectBitmap uint64) uint64 {
        for selectBitmap != 0 {
                maskLen := bits.OnesCount32(uint32(selectBitmap & lookupMask))
                value := pextTable[selectBitmap&lookupMask][bitmap&lookupMask]
-               bitValue |= uint64(value << bitLen)
+               bitValue |= uint64(value) << bitLen
                bitLen += maskLen
                bitmap >>= lookupBits
                selectBitmap >>= lookupBits
diff --git a/go/parquet/internal/encoding/boolean_decoder.go 
b/go/parquet/internal/encoding/boolean_decoder.go
index a33b21a..bdf1fd5 100644
--- a/go/parquet/internal/encoding/boolean_decoder.go
+++ b/go/parquet/internal/encoding/boolean_decoder.go
@@ -45,7 +45,7 @@ func (dec *PlainBooleanDecoder) Decode(out []bool) (int, 
error) {
 
        unalignedExtract := func(start, end, curBitOffset int) int {
                i := start
-               for ; curBitOffset < end; i, curBitOffset = i+1, curBitOffset+1 
{
+               for ; curBitOffset < end && i < max; i, curBitOffset = i+1, 
curBitOffset+1 {
                        out[i] = (dec.data[0] & byte(1<<curBitOffset)) != 0
                }
                return i // return the number of bits we extracted
@@ -56,7 +56,7 @@ func (dec *PlainBooleanDecoder) Decode(out []bool) (int, 
error) {
        i := 0
        if dec.bitOffset != 0 {
                i = unalignedExtract(0, 8, dec.bitOffset)
-               dec.bitOffset = 0
+               dec.bitOffset = (dec.bitOffset + i) % 8
        }
 
        // determine the number of full bytes worth of bits we can decode
diff --git a/go/parquet/internal/encoding/boolean_encoder.go 
b/go/parquet/internal/encoding/boolean_encoder.go
index ba06d4c..81f523b 100644
--- a/go/parquet/internal/encoding/boolean_encoder.go
+++ b/go/parquet/internal/encoding/boolean_encoder.go
@@ -47,6 +47,9 @@ func (enc *PlainBooleanEncoder) Put(in []bool) {
        if enc.wr == nil {
                enc.wr = utils.NewBitmapWriter(enc.bitsBuffer, 0, boolsInBuf)
        }
+       if len(in) == 0 {
+               return
+       }
 
        n := enc.wr.AppendBools(in)
        for n < len(in) {
diff --git a/go/parquet/internal/encoding/typed_encoder.gen.go 
b/go/parquet/internal/encoding/typed_encoder.gen.go
index 211f062..54f0f56 100644
--- a/go/parquet/internal/encoding/typed_encoder.gen.go
+++ b/go/parquet/internal/encoding/typed_encoder.gen.go
@@ -495,10 +495,9 @@ type int96EncoderTraits struct{}
 
 // Encoder returns an encoder for int96 type data, using the specified 
encoding type and whether or not
 // it should be dictionary encoded.
-// dictionary encoding does not exist for this type and Encoder will panic if 
useDict is true
 func (int96EncoderTraits) Encoder(e format.Encoding, useDict bool, descr 
*schema.Column, mem memory.Allocator) TypedEncoder {
        if useDict {
-               panic("parquet: no parquet.Int96 dictionary encoding")
+               return &DictInt96Encoder{newDictEncoderBase(descr, 
NewBinaryDictionary(mem), mem)}
        }
 
        switch e {
@@ -521,7 +520,7 @@ func (int96DecoderTraits) BytesRequired(n int) int {
 // Decoder returns a decoder for int96 typed data of the requested encoding 
type if available
 func (int96DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column, 
useDict bool, mem memory.Allocator) TypedDecoder {
        if useDict {
-               panic("dictionary decoding unimplemented for int96")
+               return &DictInt96Decoder{dictDecoder{decoder: 
newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
        }
 
        switch e {
@@ -532,6 +531,157 @@ func (int96DecoderTraits) Decoder(e parquet.Encoding, 
descr *schema.Column, useD
        }
 }
 
+// DictInt96Encoder is an encoder for parquet.Int96 data using dictionary encoding
+type DictInt96Encoder struct {
+       dictEncoder // embedded; provides the memo, dictEncodedSize and addIndex used by this type's methods
+}
+
+// Type reports parquet.Types.Int96, the physical type this encoder handles.
+func (enc *DictInt96Encoder) Type() parquet.Type {
+       return parquet.Types.Int96
+}
+
+// WriteDict writes the memo table's values into out through
+// CopyFixedWidthValues, using a start of 0 and a width of
+// parquet.Int96SizeBytes. It panics if the memo table is not a BinaryMemoTable.
+func (enc *DictInt96Encoder) WriteDict(out []byte) {
+       table := enc.memo.(BinaryMemoTable)
+       table.CopyFixedWidthValues(0, parquet.Int96SizeBytes, out)
+}
+
+// Put looks up (or inserts) every value of in in the memo table and records
+// the resulting dictionary index. It panics if the memo table reports an error.
+func (enc *DictInt96Encoder) Put(in []parquet.Int96) {
+       for i := range in {
+               idx, seen, err := enc.memo.GetOrInsert(in[i])
+               if err != nil {
+                       panic(err)
+               }
+               if !seen {
+                       // a newly inserted entry grows the encoded dictionary by one Int96
+                       enc.dictEncodedSize += parquet.Int96SizeBytes
+               }
+               enc.addIndex(idx)
+       }
+}
+
+// PutSpaced is like Put but assumes in has slots for nulls: only the runs
+// reported by utils.VisitSetBitRuns over validBits (starting at
+// validBitsOffset) are passed on to Put.
+func (enc *DictInt96Encoder) PutSpaced(in []parquet.Int96, validBits []byte, validBitsOffset int64) {
+       putRun := func(start, n int64) error {
+               enc.Put(in[start : start+n])
+               return nil
+       }
+       utils.VisitSetBitRuns(validBits, validBitsOffset, int64(len(in)), putRun)
+}
+
+// DictInt96Decoder is a decoder for decoding dictionary encoded data for parquet.Int96 columns
+type DictInt96Decoder struct {
+       dictDecoder // embedded; provides nvals, decode and decodeSpaced used by this type's methods
+}
+
+// Type reports parquet.Types.Int96, the physical type this decoder produces.
+func (DictInt96Decoder) Type() parquet.Type {
+       return parquet.Types.Int96
+}
+
+// Decode fills out with min(len(out), remaining values) values, using the
+// dictionary to obtain the actual values. It returns the number of values
+// decoded and any error encountered; decoding fewer values than requested is
+// reported as an error.
+func (d *DictInt96Decoder) Decode(out []parquet.Int96) (int, error) {
+       want := utils.MinInt(len(out), d.nvals)
+       got, err := d.decode(out[:want])
+       switch {
+       case err != nil:
+               return got, err
+       case got != want:
+               return got, xerrors.New("parquet: dict eof exception")
+       }
+       d.nvals -= want
+       return want, nil
+}
+
+// Decode spaced is like Decode but will space out the data leaving slots for 
null values
+// based on the provided bitmap.
+func (d *DictInt96Decoder) DecodeSpaced(out []parquet.Int96, nullCount int, 
validBits []byte, validBitsOffset int64) (int, error) {
+       vals := utils.MinInt(len(out), d.nvals)
+       decoded, err := d.decodeSpaced(out[:vals], nullCount, validBits, 
validBitsOffset)
+       if err != nil {
+               return decoded, err
+       }
+       if vals != decoded {
+               return decoded, xerrors.New("parquet: dict spaced eof 
exception")
+       }
+       d.nvals -= vals
+       return vals, nil
+}
+
+// Int96DictConverter is a helper for dictionary handling which is used for converting
+// run length encoded indexes into the actual values that are stored in the dictionary index page.
+type Int96DictConverter struct {
+       valueDecoder Int96Decoder    // source of dictionary values; read on demand by ensure
+       dict         []parquet.Int96 // dictionary values decoded so far
+       zeroVal      parquet.Int96   // value written by FillZero
+}
+
+// ensure decodes further dictionary values, when needed, so that index idx is
+// covered; this lets the dictionary be decoded lazily rather than up front.
+// Only an error from the value decoder is returned: if the decoder yields
+// fewer values than requested, dict may still be too short afterwards.
+func (dc *Int96DictConverter) ensure(idx utils.IndexType) error {
+       have := len(dc.dict)
+       if int(idx) < have {
+               // already decoded far enough
+               return nil
+       }
+
+       if int(idx) < cap(dc.dict) {
+               // enough spare capacity: decode straight into the tail of dict
+               n, err := dc.valueDecoder.Decode(dc.dict[have : idx+1])
+               if err != nil {
+                       return err
+               }
+               dc.dict = dc.dict[:have+n]
+               return nil
+       }
+
+       // not enough capacity: decode into a scratch slice and append
+       scratch := make([]parquet.Int96, int(idx+1)-have)
+       n, err := dc.valueDecoder.Decode(scratch)
+       if err != nil {
+               return err
+       }
+       dc.dict = append(dc.dict, scratch[:n]...)
+       return nil
+}
+
+// IsValid reports whether every index in idxes refers to an entry of the
+// dictionary, decoding dictionary values up to the largest requested index
+// first if necessary.
+func (dc *Int96DictConverter) IsValid(idxes ...utils.IndexType) bool {
+       lo, hi := utils.GetMinMaxInt32(*(*[]int32)(unsafe.Pointer(&idxes)))
+       // an error from ensure leaves dict unchanged, so the range check below
+       // still fails for hi in that case
+       _ = dc.ensure(utils.IndexType(hi))
+
+       size := len(dc.dict)
+       if lo < 0 || int(lo) >= size {
+               return false
+       }
+       return int(hi) >= 0 && int(hi) < size
+}
+
+// Fill populates the slice passed in entirely with the value at the
+// dictionary index indicated by val.
+//
+// out must be a []parquet.Int96 (the type assertion panics otherwise). An
+// error is returned if decoding the dictionary up to val fails or if val does
+// not refer to a dictionary entry. An empty out is left untouched.
+func (dc *Int96DictConverter) Fill(out interface{}, val utils.IndexType) error {
+       o := out.([]parquet.Int96)
+       if err := dc.ensure(val); err != nil {
+               return err
+       }
+       // ensure can succeed while decoding fewer values than requested, and it
+       // ignores negative indexes, so check the index before using it
+       if int(val) < 0 || int(val) >= len(dc.dict) {
+               return xerrors.New("parquet: dictionary index out of range")
+       }
+       if len(o) == 0 {
+               return nil
+       }
+
+       o[0] = dc.dict[val]
+       // double the filled prefix on each pass until the slice is full
+       for i := 1; i < len(o); i *= 2 {
+               copy(o[i:], o[:i])
+       }
+       return nil
+}
+
+// FillZero populates the entire slice of out with the zero value for
+// parquet.Int96.
+//
+// out must be a []parquet.Int96 (the type assertion panics otherwise). An
+// empty out is left untouched.
+func (dc *Int96DictConverter) FillZero(out interface{}) {
+       o := out.([]parquet.Int96)
+       if len(o) == 0 {
+               return
+       }
+
+       o[0] = dc.zeroVal
+       // double the filled prefix on each pass until the slice is full
+       for i := 1; i < len(o); i *= 2 {
+               copy(o[i:], o[:i])
+       }
+}
+
+// Copy writes, for each position of vals, the dictionary value at that index
+// into the same position of out. out must be a []parquet.Int96. The indexes
+// are not checked here, so an index beyond the decoded dictionary panics.
+// The returned error is always nil.
+func (dc *Int96DictConverter) Copy(out interface{}, vals []utils.IndexType) error {
+       dst := out.([]parquet.Int96)
+       for i := range vals {
+               dst[i] = dc.dict[vals[i]]
+       }
+       return nil
+}
+
 // Float32Encoder is the interface for all encoding types that implement 
encoding
 // float32 values.
 type Float32Encoder interface {
@@ -1385,6 +1535,8 @@ func NewDictConverter(dict TypedDecoder) 
utils.DictionaryConverter {
                return &Int32DictConverter{valueDecoder: dict.(Int32Decoder), 
dict: make([]int32, 0, dict.ValuesLeft())}
        case parquet.Types.Int64:
                return &Int64DictConverter{valueDecoder: dict.(Int64Decoder), 
dict: make([]int64, 0, dict.ValuesLeft())}
+       case parquet.Types.Int96:
+               return &Int96DictConverter{valueDecoder: dict.(Int96Decoder), 
dict: make([]parquet.Int96, 0, dict.ValuesLeft())}
        case parquet.Types.Float:
                return &Float32DictConverter{valueDecoder: 
dict.(Float32Decoder), dict: make([]float32, 0, dict.ValuesLeft())}
        case parquet.Types.Double:
diff --git a/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl 
b/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
index d2ebbe4..14c1e9a 100644
--- a/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
+++ b/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
@@ -56,15 +56,15 @@ type {{.lower}}EncoderTraits struct{}
 
 // Encoder returns an encoder for {{.lower}} type data, using the specified 
encoding type and whether or not
 // it should be dictionary encoded.
-{{- if or (eq .Name "Boolean") (eq .Name "Int96")}}
+{{- if or (eq .Name "Boolean") }}
 // dictionary encoding does not exist for this type and Encoder will panic if 
useDict is true
 {{- end }}
 func ({{.lower}}EncoderTraits) Encoder(e format.Encoding, useDict bool, descr 
*schema.Column, mem memory.Allocator) TypedEncoder {
   if useDict {
-{{- if or (eq .Name "Boolean") (eq .Name "Int96")}}
+{{- if or (eq .Name "Boolean") }}
     panic("parquet: no {{.name}} dictionary encoding")
 {{- else}}
-    return &Dict{{.Name}}Encoder{newDictEncoderBase(descr, New{{if and (ne 
.Name "ByteArray") (ne .Name 
"FixedLenByteArray")}}{{.Name}}Dictionary(){{else}}BinaryDictionary(mem){{end}},
 mem)}
+    return &Dict{{.Name}}Encoder{newDictEncoderBase(descr, New{{if and (ne 
.Name "Int96") (ne .Name "ByteArray") (ne .Name 
"FixedLenByteArray")}}{{.Name}}Dictionary(){{else}}BinaryDictionary(mem){{end}},
 mem)}
 {{- end}}
   }
 
@@ -105,7 +105,7 @@ func ({{.lower}}DecoderTraits) BytesRequired(n int) int {
 // Decoder returns a decoder for {{.lower}} typed data of the requested 
encoding type if available
 func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding, descr 
*schema.Column, useDict bool, mem memory.Allocator) TypedDecoder {
   if useDict {
-{{- if and (ne .Name "Boolean") (ne .Name "Int96")}}
+{{- if and (ne .Name "Boolean") }}
     return &Dict{{.Name}}Decoder{dictDecoder{decoder: 
newDecoderBase(format.Encoding_RLE_DICTIONARY, descr), mem: mem}}
 {{- else}}
     panic("dictionary decoding unimplemented for {{.lower}}")
@@ -150,7 +150,7 @@ func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding, 
descr *schema.Column,
   }
 }
 
-{{if and (ne .Name "Boolean") (ne .Name "Int96")}}
+{{if and (ne .Name "Boolean") }}
 // Dict{{.Name}}Encoder is an encoder for {{.name}} data using dictionary 
encoding
 type Dict{{.Name}}Encoder struct {
   dictEncoder
@@ -162,6 +162,12 @@ func (enc *Dict{{.Name}}Encoder) Type() parquet.Type {
 }
 
 {{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}}
+{{if (ne .Name "Int96")}}
+// WriteDict populates the byte slice with the dictionary index
+func (enc *Dict{{.Name}}Encoder) WriteDict(out []byte) {
+  enc.memo.CopyValues({{.prefix}}.{{.Name}}Traits.CastFromBytes(out))
+}
+
 // Put encodes the values passed in, adding to the index as needed.
 func (enc *Dict{{.Name}}Encoder) Put(in []{{.name}}) {
   for _, val := range in {
@@ -179,6 +185,34 @@ func (enc *Dict{{.Name}}Encoder) PutSpaced(in []{{.name}}, 
validBits []byte, val
     return nil
   })
 }
+{{else}}
+// WriteDict populates the byte slice with the dictionary index
+func (enc *DictInt96Encoder) WriteDict(out []byte) {
+  enc.memo.(BinaryMemoTable).CopyFixedWidthValues(0, parquet.Int96SizeBytes, 
out)
+}
+
+// Put encodes the values passed in, adding to the index as needed
+func (enc *DictInt96Encoder) Put(in []parquet.Int96) {
+  for _, v := range in {
+    memoIdx, found, err := enc.memo.GetOrInsert(v)
+    if err != nil {
+      panic(err)
+    }
+    if !found {
+      enc.dictEncodedSize += parquet.Int96SizeBytes
+    }
+    enc.addIndex(memoIdx)
+  }
+}
+
+// PutSpaced is like Put but assumes space for nulls
+func (enc *DictInt96Encoder) PutSpaced(in []parquet.Int96, validBits []byte, 
validBitsOffset int64) {
+  utils.VisitSetBitRuns(validBits, validBitsOffset, int64(len(in)), func(pos, 
length int64) error {
+    enc.Put(in[pos : pos+length])
+    return nil
+  })
+}
+{{end}}
 {{end}}
 
 // Dict{{.Name}}Decoder is a decoder for decoding dictionary encoded data for 
{{.name}} columns
@@ -302,7 +336,7 @@ func (dc *{{.Name}}DictConverter) Copy(out interface{}, 
vals []utils.IndexType)
 // decoder as the decoder to decode the dictionary index.
 func NewDictConverter(dict TypedDecoder) utils.DictionaryConverter {
   switch dict.Type() {
-  {{ range .In }}{{ if and (ne .Name "Boolean") (ne .Name "Int96") -}}
+  {{ range .In }}{{ if and (ne .Name "Boolean") -}}
   case parquet.Types.{{if .physical }}{{.physical}}{{else}}{{.Name}}{{end}}:
     return &{{.Name}}DictConverter{valueDecoder: dict.({{.Name}}Decoder), 
dict: make([]{{.name}}, 0, dict.ValuesLeft())}
   {{ end }}{{ end -}}
diff --git a/go/parquet/internal/testutils/pagebuilder.go 
b/go/parquet/internal/testutils/pagebuilder.go
new file mode 100644
index 0000000..f742f1a
--- /dev/null
+++ b/go/parquet/internal/testutils/pagebuilder.go
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package testutils
+
+import (
+       "encoding/binary"
+       "io"
+       "reflect"
+
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/compress"
+       "github.com/apache/arrow/go/parquet/file"
+       "github.com/apache/arrow/go/parquet/internal/encoding"
+       "github.com/apache/arrow/go/parquet/internal/utils"
+       "github.com/apache/arrow/go/parquet/schema"
+       "github.com/stretchr/testify/mock"
+)
+
+// DataPageBuilder is a test helper that streams the parts of a data page
+// (levels and values) into sink and remembers what was written so that the
+// matching page object can be constructed afterwards.
+type DataPageBuilder struct {
+       sink    io.Writer               // receives every encoded byte
+       version parquet.DataPageVersion // V1 pages get a length prefix before each level run
+
+       nvals int // largest count of levels or values appended so far
+
+       encoding, defLvlEncoding, repLvlEncoding parquet.Encoding
+       defLvlBytesLen, repLvlBytesLen           int
+       hasDefLvls, hasRepLvls, hasValues        bool
+}
+
+var mem = memory.NewGoAllocator()
+
+// appendLevels encodes lvls with a level encoder initialised for e and maxLvl,
+// writes the encoded bytes to the sink and returns how many bytes that was.
+// For V1 data pages the bytes are preceded by their length as a little-endian
+// int32. Panics for any encoding other than RLE and on any write error.
+func (d *DataPageBuilder) appendLevels(lvls []int16, maxLvl int16, e parquet.Encoding) int {
+       if e != parquet.Encodings.RLE {
+               panic("parquet: only rle encoding currently implemented")
+       }
+
+       maxSize := encoding.LevelEncodingMaxBufferSize(e, maxLvl, len(lvls))
+       out := encoding.NewBufferWriter(maxSize, memory.DefaultAllocator)
+
+       var lvlEnc encoding.LevelEncoder
+       lvlEnc.Init(e, maxLvl, out)
+       lvlEnc.Encode(lvls)
+       nbytes := lvlEnc.Len()
+
+       // only the V1 layout carries the level length inline
+       if d.version == parquet.DataPageV1 {
+               err := binary.Write(d.sink, binary.LittleEndian, int32(nbytes))
+               if err != nil {
+                       panic(err)
+               }
+       }
+
+       _, err := d.sink.Write(out.Bytes()[:nbytes])
+       if err != nil {
+               panic(err)
+       }
+       return nbytes
+}
+
+// AppendDefLevels writes the definition levels to the sink using RLE, then
+// records their encoded byte length and encoding on the builder. nvals is
+// raised to len(lvls) when that is larger.
+func (d *DataPageBuilder) AppendDefLevels(lvls []int16, maxLvl int16) {
+       rle := parquet.Encodings.RLE
+
+       d.defLvlBytesLen = d.appendLevels(lvls, maxLvl, rle)
+       d.defLvlEncoding = rle
+       d.hasDefLvls = true
+       d.nvals = utils.MaxInt(len(lvls), d.nvals)
+}
+
+// AppendRepLevels writes the repetition levels to the sink using RLE, then
+// records their encoded byte length and encoding on the builder. nvals is
+// raised to len(lvls) when that is larger.
+func (d *DataPageBuilder) AppendRepLevels(lvls []int16, maxLvl int16) {
+       rle := parquet.Encodings.RLE
+
+       d.repLvlBytesLen = d.appendLevels(lvls, maxLvl, rle)
+       d.repLvlEncoding = rle
+       d.hasRepLvls = true
+       d.nvals = utils.MaxInt(len(lvls), d.nvals)
+}
+
+// AppendValues encodes values with a freshly created encoder for the physical
+// type of desc and the encoding e, then writes the flushed bytes to the sink.
+//
+// values must be one of []int32, []int64, []parquet.Int96, []float32,
+// []float64 or []parquet.ByteArray. Any other dynamic type is not passed to
+// the encoder, so only whatever the untouched encoder flushes is written and
+// the value count contributed by this call is 0.
+//
+// Panics if the encoder does not implement the typed encoder interface that
+// matches values, if flushing the encoder fails, or if the sink rejects the
+// write.
+func (d *DataPageBuilder) AppendValues(desc *schema.Column, values interface{}, e parquet.Encoding) {
+       enc := encoding.NewEncoder(desc.PhysicalType(), e, false, desc, mem)
+       var sz int
+       switch v := values.(type) {
+       case []int32:
+               enc.(encoding.Int32Encoder).Put(v)
+               sz = len(v)
+       case []int64:
+               enc.(encoding.Int64Encoder).Put(v)
+               sz = len(v)
+       case []parquet.Int96:
+               enc.(encoding.Int96Encoder).Put(v)
+               sz = len(v)
+       case []float32:
+               enc.(encoding.Float32Encoder).Put(v)
+               sz = len(v)
+       case []float64:
+               enc.(encoding.Float64Encoder).Put(v)
+               sz = len(v)
+       case []parquet.ByteArray:
+               enc.(encoding.ByteArrayEncoder).Put(v)
+               sz = len(v)
+       }
+
+       // Surface a flush failure instead of dereferencing a possibly invalid
+       // buffer; panicking matches how write failures are reported here.
+       buf, err := enc.FlushValues()
+       if err != nil {
+               panic(err)
+       }
+       if _, err = d.sink.Write(buf.Bytes()); err != nil {
+               panic(err)
+       }
+
+       d.nvals = utils.MaxInt(sz, d.nvals)
+       d.encoding = e
+       d.hasValues = true
+}
+
+// DictionaryPageBuilder is a test helper wrapping a dictionary encoder so that
+// callers can feed it values and later extract the dictionary contents.
+type DictionaryPageBuilder struct {
+       traits        encoding.DictEncoder // encoder that accumulates the dictionary
+       numDictValues int32                // NumEntries() of traits as of the last AppendValues
+       hasValues     bool                 // set once AppendValues has been called
+}
+
+// NewDictionaryPageBuilder returns a builder whose encoder is created for the
+// physical type of d with the Plain encoding and the third NewEncoder argument
+// set to true (presumably the "use dictionary" switch). Panics if the
+// resulting encoder is not an encoding.DictEncoder.
+func NewDictionaryPageBuilder(d *schema.Column) *DictionaryPageBuilder {
+       enc := encoding.NewEncoder(d.PhysicalType(), parquet.Encodings.Plain, true, d, mem)
+       return &DictionaryPageBuilder{
+               traits: enc.(encoding.DictEncoder),
+       }
+}
+
+// AppendValues feeds values to the dictionary encoder, refreshes the cached
+// dictionary entry count and returns the buffer produced by flushing the
+// encoder (MakeDictPage collects these as the per-page index buffers).
+//
+// values must be one of []int32, []int64, []parquet.Int96, []float32,
+// []float64 or []parquet.ByteArray; any other dynamic type is not passed to
+// the encoder.
+//
+// Panics if the encoder does not implement the typed encoder interface that
+// matches values, or if flushing the encoder fails.
+func (d *DictionaryPageBuilder) AppendValues(values interface{}) encoding.Buffer {
+       switch v := values.(type) {
+       case []int32:
+               d.traits.(encoding.Int32Encoder).Put(v)
+       case []int64:
+               d.traits.(encoding.Int64Encoder).Put(v)
+       case []parquet.Int96:
+               d.traits.(encoding.Int96Encoder).Put(v)
+       case []float32:
+               d.traits.(encoding.Float32Encoder).Put(v)
+       case []float64:
+               d.traits.(encoding.Float64Encoder).Put(v)
+       case []parquet.ByteArray:
+               d.traits.(encoding.ByteArrayEncoder).Put(v)
+       }
+
+       d.numDictValues = int32(d.traits.NumEntries())
+       d.hasValues = true
+
+       // Do not hand a possibly invalid buffer back to the caller when the
+       // flush fails; report the failure the same way the page builder does.
+       buf, err := d.traits.FlushValues()
+       if err != nil {
+               panic(err)
+       }
+       return buf
+}
+
+// WriteDict allocates a buffer of the encoder's DictEncodedSize, lets the
+// encoder's WriteDict fill its bytes and returns that buffer.
+func (d *DictionaryPageBuilder) WriteDict() *memory.Buffer {
+       size := d.traits.DictEncodedSize()
+       dict := memory.NewBufferBytes(make([]byte, size))
+       d.traits.WriteDict(dict.Bytes())
+       return dict
+}
+
+// NumValues reports the dictionary entry count captured by the most recent
+// AppendValues call (0 before any call).
+func (d *DictionaryPageBuilder) NumValues() int32 { return d.numDictValues }
+
+func MakeDataPage(dataPageVersion parquet.DataPageVersion, d *schema.Column, 
values interface{}, nvals int, e parquet.Encoding, indexBuffer encoding.Buffer, 
defLvls, repLvls []int16, maxDef, maxRep int16) file.Page {
+       num := 0
+
+       stream := encoding.NewBufferWriter(1024, mem)
+       builder := DataPageBuilder{sink: stream, version: dataPageVersion}
+
+       if len(repLvls) > 0 {
+               builder.AppendRepLevels(repLvls, maxRep)
+       }
+       if len(defLvls) > 0 {
+               builder.AppendDefLevels(defLvls, maxDef)
+       }
+
+       if e == parquet.Encodings.Plain {
+               builder.AppendValues(d, values, e)
+               num = builder.nvals
+       } else {
+               stream.Write(indexBuffer.Bytes())
+               num = utils.MaxInt(builder.nvals, nvals)
+       }
+
+       buf := stream.Finish()
+       if dataPageVersion == parquet.DataPageV1 {
+               return file.NewDataPageV1(buf, int32(num), e, 
builder.defLvlEncoding, builder.repLvlEncoding, int64(buf.Len()))
+       }
+       return file.NewDataPageV2(buf, int32(num), 0, int32(num), e, 
int32(builder.defLvlBytesLen), int32(builder.repLvlBytesLen), int64(buf.Len()), 
false)
+}
+
+func MakeDictPage(d *schema.Column, values interface{}, valuesPerPage []int, e 
parquet.Encoding) (*file.DictionaryPage, []encoding.Buffer) {
+       bldr := NewDictionaryPageBuilder(d)
+       npages := len(valuesPerPage)
+
+       ref := reflect.ValueOf(values)
+       valStart := 0
+
+       rleIndices := make([]encoding.Buffer, 0, npages)
+       for _, nvals := range valuesPerPage {
+               rleIndices = append(rleIndices, 
bldr.AppendValues(ref.Slice(valStart, valStart+nvals).Interface()))
+               valStart += nvals
+       }
+
+       buffer := bldr.WriteDict()
+       return file.NewDictionaryPage(buffer, bldr.NumValues(), 
parquet.Encodings.Plain), rleIndices
+}
+
+// MockPageReader is a testify mock page reader. Its Page and Next methods
+// serve the []file.Page stored in the mock's test data under the key "pages".
+type MockPageReader struct {
+       mock.Mock
+
+       // curpage is the 1-based position of the current page; Next advances
+       // it and Page reads the entry at curpage-1.
+       curpage int
+}
+
+// Err records the call on the mock and returns the error configured as its
+// first return value.
+func (m *MockPageReader) Err() error {
+       args := m.Called()
+       return args.Error(0)
+}
+
+// Reset does nothing; all arguments are ignored.
+func (m *MockPageReader) Reset(parquet.ReaderAtSeeker, int64, compress.Compression, *file.CryptoContext) {}
+
+// SetMaxPageHeaderSize does nothing; the size is ignored.
+func (m *MockPageReader) SetMaxPageHeaderSize(int) {
+}
+
+// Page returns the page at position curpage-1 of the "pages" test data, i.e.
+// the page selected by the latest successful Next call. It panics with an
+// index-out-of-range error if Next has not been called yet or has already run
+// past the end of the list.
+func (m *MockPageReader) Page() file.Page {
+       pages := m.TestData().Get("pages").Data().([]file.Page)
+       return pages[m.curpage-1]
+}
+
+// Next advances to the following page of the "pages" test data and reports
+// whether such a page exists.
+func (m *MockPageReader) Next() bool {
+       pages := m.TestData().Get("pages").Data().([]file.Page)
+       m.curpage++
+       return m.curpage <= len(pages)
+}
+
+// PaginatePlain splits values (a reflected slice) into len(valuesPerPage) data
+// pages built with MakeDataPage, page i receiving the next valuesPerPage[i]
+// values and a nil index buffer. When maxDef (respectively maxRep) is
+// positive, page i also receives the definition (repetition) levels in
+// [i*lvlsPerPage, (i+1)*lvlsPerPage); otherwise an empty level slice is
+// passed.
+func PaginatePlain(version parquet.DataPageVersion, d *schema.Column, values reflect.Value, defLevels, repLevels []int16,
+       maxDef, maxRep int16, lvlsPerPage int, valuesPerPage []int, enc parquet.Encoding) []file.Page {
+
+       var (
+               defStart, defEnd int
+               repStart, repEnd int
+               valStart         int
+       )
+
+       pages := make([]file.Page, 0, len(valuesPerPage))
+       for i, nvals := range valuesPerPage {
+               if maxDef > 0 {
+                       defStart, defEnd = i*lvlsPerPage, (i+1)*lvlsPerPage
+               }
+               if maxRep > 0 {
+                       repStart, repEnd = i*lvlsPerPage, (i+1)*lvlsPerPage
+               }
+
+               valEnd := valStart + nvals
+               pageVals := values.Slice(valStart, valEnd).Interface()
+               pages = append(pages, MakeDataPage(version, d, pageVals, nvals, enc, nil,
+                       defLevels[defStart:defEnd], repLevels[repStart:repEnd], maxDef, maxRep))
+               valStart = valEnd
+       }
+       return pages
+}
+
+// PaginateDict builds a dictionary page for values via MakeDictPage followed
+// by one data page per entry of valuesPerPage, and returns them in that order
+// (dictionary page first). Data page i is created by MakeDataPage with nil
+// values, valuesPerPage[i] as its value count and the i-th index buffer
+// returned by MakeDictPage. When maxDef (respectively maxRep) is positive,
+// page i also receives the definition (repetition) levels in
+// [i*lvlsPerPage, (i+1)*lvlsPerPage); otherwise an empty level slice is
+// passed.
+func PaginateDict(version parquet.DataPageVersion, d *schema.Column, values reflect.Value, defLevels, repLevels []int16, maxDef, maxRep int16, lvlsPerPage int, valuesPerPage []int, enc parquet.Encoding) []file.Page {
+       var (
+               npages = len(valuesPerPage)
+               // one extra slot for the leading dictionary page, so the
+               // final append never has to grow the slice
+               pages    = make([]file.Page, 0, npages+1)
+               defStart = 0
+               defEnd   = 0
+               repStart = 0
+               repEnd   = 0
+       )
+
+       dictPage, rleIndices := MakeDictPage(d, values.Interface(), valuesPerPage, enc)
+       pages = append(pages, dictPage)
+       for i := 0; i < npages; i++ {
+               if maxDef > 0 {
+                       defStart = i * lvlsPerPage
+                       defEnd = (i + 1) * lvlsPerPage
+               }
+               if maxRep > 0 {
+                       repStart = i * lvlsPerPage
+                       repEnd = (i + 1) * lvlsPerPage
+               }
+               page := MakeDataPage(version, d, nil, valuesPerPage[i], enc, rleIndices[i],
+                       defLevels[defStart:defEnd], repLevels[repStart:repEnd], maxDef, maxRep)
+               pages = append(pages, page)
+       }
+       return pages
+}
diff --git a/go/parquet/reader_properties.go b/go/parquet/reader_properties.go
index 92abae5..7e99d9f 100644
--- a/go/parquet/reader_properties.go
+++ b/go/parquet/reader_properties.go
@@ -20,7 +20,6 @@ import (
        "bytes"
        "io"
 
-       "github.com/apache/arrow/go/arrow/ipc"
        "github.com/apache/arrow/go/arrow/memory"
        "golang.org/x/xerrors"
 )
@@ -61,7 +60,7 @@ func (r *ReaderProperties) Allocator() memory.Allocator { 
return r.alloc }
 //
 // If BufferedStreamEnabled is true, it creates an io.SectionReader, otherwise 
it will read the entire section
 // into a buffer in memory and return a bytes.NewReader for that buffer.
-func (r *ReaderProperties) GetStream(source io.ReaderAt, start, nbytes int64) 
(ipc.ReadAtSeeker, error) {
+func (r *ReaderProperties) GetStream(source io.ReaderAt, start, nbytes int64) 
(ReaderAtSeeker, error) {
        if r.BufferedStreamEnabled {
                return io.NewSectionReader(source, start, nbytes), nil
        }
diff --git a/go/parquet/types.go b/go/parquet/types.go
index e568984..630244c 100644
--- a/go/parquet/types.go
+++ b/go/parquet/types.go
@@ -18,6 +18,7 @@ package parquet
 
 import (
        "encoding/binary"
+       "io"
        "reflect"
        "strings"
        "time"
@@ -47,6 +48,15 @@ var (
        FixedLenByteArraySizeBytes int = 
int(reflect.TypeOf(FixedLenByteArray{}).Size())
 )
 
+// ReaderAtSeeker groups the io capabilities the file functions need from a
+// parquet source: sequential reads (Read), positional reads (ReadAt) and
+// repositioning (Seek).
+type ReaderAtSeeker interface {
+       io.Reader
+       io.ReaderAt
+       io.Seeker
+}
+
 // NewInt96 creates a new Int96 from the given 3 uint32 values.
 func NewInt96(v [3]uint32) (out Int96) {
        binary.LittleEndian.PutUint32(out[0:], v[0])

Reply via email to