This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 23b62a441e GH-38462: [Go][Parquet] Handle Boolean RLE encoding/decoding (#38367)
23b62a441e is described below

commit 23b62a441ee73ef1643a28e992335b9029f0ebe4
Author: Matt Topol <[email protected]>
AuthorDate: Mon Oct 30 11:11:41 2023 -0400

    GH-38462: [Go][Parquet] Handle Boolean RLE encoding/decoding (#38367)
    
    
    
    ### Rationale for this change
    The parquet-testing repository files have been updated and now include
    boolean columns that use the RLE encoding type. As a result, the Go
    parquet library fails its verification tests when it pulls the most
    recent commits of the parquet-testing repository. The fix is to
    implement the RLE boolean encoder and decoder.
    
    ### What changes are included in this PR?
    This PR adds `RleBooleanEncoder` and `RleBooleanDecoder`, and updates the
    `parquet-testing` repo.
    
    ### Are these changes tested?
    Unit tests are added, and this is also tested via the `parquet-testing`
    golden files.
    
    * Closes: #38345
    * Closes: #38462
    
    Lead-authored-by: Matt Topol <[email protected]>
    Co-authored-by: Sutou Kouhei <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 go/parquet/file/column_reader.go                   |  8 ++-
 go/parquet/file/file_reader_test.go                | 61 ++++++++++++++++
 go/parquet/internal/encoding/boolean_decoder.go    | 82 +++++++++++++++++++++-
 go/parquet/internal/encoding/boolean_encoder.go    | 55 +++++++++++++++
 go/parquet/internal/encoding/encoder.go            |  2 +-
 go/parquet/internal/encoding/encoding_test.go      | 12 ++++
 go/parquet/internal/encoding/levels.go             |  2 +-
 go/parquet/internal/encoding/typed_encoder.gen.go  |  4 ++
 .../internal/encoding/typed_encoder.gen.go.tmpl    |  8 +++
 go/parquet/internal/testutils/random.go            | 13 ++--
 go/parquet/internal/utils/bit_reader_test.go       |  2 +-
 go/parquet/internal/utils/rle.go                   |  4 +-
 12 files changed, 239 insertions(+), 14 deletions(-)

diff --git a/go/parquet/file/column_reader.go b/go/parquet/file/column_reader.go
index 1ebceaca23..b623bd5074 100644
--- a/go/parquet/file/column_reader.go
+++ b/go/parquet/file/column_reader.go
@@ -17,6 +17,7 @@
 package file
 
 import (
+       "errors"
        "fmt"
        "sync"
 
@@ -345,6 +346,11 @@ func (c *columnChunkReader) initDataDecoder(page Page, 
lvlByteLen int64) error {
                c.curDecoder = decoder
        } else {
                switch encoding {
+               case format.Encoding_RLE:
+                       if c.descr.PhysicalType() != parquet.Types.Boolean {
+                               return fmt.Errorf("parquet: only boolean 
supports RLE encoding, got %s", c.descr.PhysicalType())
+                       }
+                       fallthrough
                case format.Encoding_PLAIN,
                        format.Encoding_DELTA_BYTE_ARRAY,
                        format.Encoding_DELTA_LENGTH_BYTE_ARRAY,
@@ -352,7 +358,7 @@ func (c *columnChunkReader) initDataDecoder(page Page, 
lvlByteLen int64) error {
                        c.curDecoder = 
c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem)
                        c.decoders[encoding] = c.curDecoder
                case format.Encoding_RLE_DICTIONARY:
-                       return xerrors.New("parquet: dictionary page must be 
before data page")
+                       return errors.New("parquet: dictionary page must be 
before data page")
                case format.Encoding_BYTE_STREAM_SPLIT:
                        return fmt.Errorf("parquet: unsupported data encoding 
%s", encoding)
                default:
diff --git a/go/parquet/file/file_reader_test.go 
b/go/parquet/file/file_reader_test.go
index eccb572b30..2a9b097139 100644
--- a/go/parquet/file/file_reader_test.go
+++ b/go/parquet/file/file_reader_test.go
@@ -21,6 +21,8 @@ import (
        "crypto/rand"
        "encoding/binary"
        "io"
+       "os"
+       "path"
        "testing"
 
        "github.com/apache/arrow/go/v14/arrow/memory"
@@ -385,3 +387,62 @@ func TestDeltaLengthByteArrayPackingWithNulls(t 
*testing.T) {
                assert.NotNil(t, readData[0])
        }
 }
+
+func TestRleBooleanEncodingFileRead(t *testing.T) {
+       dir := os.Getenv("PARQUET_TEST_DATA")
+       if dir == "" {
+               t.Skip("no path supplied with PARQUET_TEST_DATA")
+       }
+       assert.DirExists(t, dir)
+
+       props := parquet.NewReaderProperties(memory.DefaultAllocator)
+       fileReader, err := file.OpenParquetFile(path.Join(dir, 
"rle_boolean_encoding.parquet"),
+               false, file.WithReadProps(props))
+       require.NoError(t, err)
+       defer fileReader.Close()
+
+       assert.Equal(t, 1, fileReader.NumRowGroups())
+       rgr := fileReader.RowGroup(0)
+       assert.EqualValues(t, 68, rgr.NumRows())
+
+       rdr, err := rgr.Column(0)
+       require.NoError(t, err)
+       brdr := rdr.(*file.BooleanColumnChunkReader)
+
+       values := make([]bool, 68)
+       defLvls, repLvls := make([]int16, 68), make([]int16, 68)
+       total, read, err := brdr.ReadBatch(68, values, defLvls, repLvls)
+       require.NoError(t, err)
+
+       assert.EqualValues(t, 68, total)
+       md, err := rgr.MetaData().ColumnChunk(0)
+       require.NoError(t, err)
+       stats, err := md.Statistics()
+       require.NoError(t, err)
+       assert.EqualValues(t, total-stats.NullCount(), read)
+
+       expected := []bool{
+               true, false, true, true, false, false,
+               true, true, true, false, false, true, true,
+               false, true, true, false, false, true, true,
+               false, true, true, false, false, true, true,
+               true, false, false, false, false, true, true,
+               false, true, true, false, false, true, true,
+               true, false, false, true, true, false, false,
+               true, true, true, false, true, true, false,
+               true, true, false, false, true, true, true,
+       }
+       expectedNulls := []int{2, 15, 23, 38, 48, 60}
+
+       expectedNullIdx := 0
+       for i, v := range defLvls {
+               if expectedNullIdx < len(expectedNulls) && i == 
expectedNulls[expectedNullIdx] {
+                       assert.Zero(t, v)
+                       expectedNullIdx++
+               } else {
+                       assert.EqualValues(t, 1, v)
+               }
+       }
+
+       assert.Equal(t, expected, values[:len(expected)])
+}
diff --git a/go/parquet/internal/encoding/boolean_decoder.go 
b/go/parquet/internal/encoding/boolean_decoder.go
index dd213395d6..337a6db967 100644
--- a/go/parquet/internal/encoding/boolean_decoder.go
+++ b/go/parquet/internal/encoding/boolean_decoder.go
@@ -17,11 +17,16 @@
 package encoding
 
 import (
+       "bytes"
+       "encoding/binary"
+       "errors"
+       "fmt"
+       "io"
+
        "github.com/apache/arrow/go/v14/arrow/bitutil"
        shared_utils "github.com/apache/arrow/go/v14/internal/utils"
        "github.com/apache/arrow/go/v14/parquet"
        "github.com/apache/arrow/go/v14/parquet/internal/utils"
-       "golang.org/x/xerrors"
 )
 
 // PlainBooleanDecoder is for the Plain Encoding type, there is no
@@ -103,7 +108,80 @@ func (dec *PlainBooleanDecoder) DecodeSpaced(out []bool, 
nullCount int, validBit
                        return 0, err
                }
                if valuesRead != toRead {
-                       return valuesRead, xerrors.New("parquet: boolean 
decoder: number of values / definition levels read did not match")
+                       return valuesRead, errors.New("parquet: boolean 
decoder: number of values / definition levels read did not match")
+               }
+               return spacedExpand(out, nullCount, validBits, 
validBitsOffset), nil
+       }
+       return dec.Decode(out)
+}
+
+type RleBooleanDecoder struct {
+       decoder
+
+       rleDec *utils.RleDecoder
+}
+
+func (RleBooleanDecoder) Type() parquet.Type {
+       return parquet.Types.Boolean
+}
+
+func (dec *RleBooleanDecoder) SetData(nvals int, data []byte) error {
+       dec.nvals = nvals
+
+       if len(data) < 4 {
+               return fmt.Errorf("invalid length - %d (corrupt data page?)", 
len(data))
+       }
+
+       // load the first 4 bytes in little-endian which indicates the length
+       nbytes := binary.LittleEndian.Uint32(data[:4])
+       if nbytes > uint32(len(data)-4) {
+               return fmt.Errorf("received invalid number of bytes - %d 
(corrupt data page?)", nbytes)
+       }
+
+       dec.data = data[4:]
+       if dec.rleDec == nil {
+               dec.rleDec = utils.NewRleDecoder(bytes.NewReader(dec.data), 1)
+       } else {
+               dec.rleDec.Reset(bytes.NewReader(dec.data), 1)
+       }
+       return nil
+}
+
+func (dec *RleBooleanDecoder) Decode(out []bool) (int, error) {
+       max := shared_utils.MinInt(len(out), dec.nvals)
+
+       var (
+               buf [1024]uint64
+               n   = max
+       )
+
+       for n > 0 {
+               batch := shared_utils.MinInt(len(buf), n)
+               decoded := dec.rleDec.GetBatch(buf[:batch])
+               if decoded != batch {
+                       return max - n, io.ErrUnexpectedEOF
+               }
+
+               for i := 0; i < batch; i++ {
+                       out[i] = buf[i] != 0
+               }
+               n -= batch
+               out = out[batch:]
+       }
+
+       dec.nvals -= max
+       return max, nil
+}
+
+func (dec *RleBooleanDecoder) DecodeSpaced(out []bool, nullCount int, 
validBits []byte, validBitsOffset int64) (int, error) {
+       if nullCount > 0 {
+               toRead := len(out) - nullCount
+               valuesRead, err := dec.Decode(out[:toRead])
+               if err != nil {
+                       return 0, err
+               }
+               if valuesRead != toRead {
+                       return valuesRead, errors.New("parquet: rle boolean 
decoder: number of values / definition levels read did not match")
                }
                return spacedExpand(out, nullCount, validBits, 
validBitsOffset), nil
        }
diff --git a/go/parquet/internal/encoding/boolean_encoder.go 
b/go/parquet/internal/encoding/boolean_encoder.go
index 65ba2658b0..3970e05fca 100644
--- a/go/parquet/internal/encoding/boolean_encoder.go
+++ b/go/parquet/internal/encoding/boolean_encoder.go
@@ -17,8 +17,11 @@
 package encoding
 
 import (
+       "encoding/binary"
+
        "github.com/apache/arrow/go/v14/arrow/bitutil"
        "github.com/apache/arrow/go/v14/parquet"
+       "github.com/apache/arrow/go/v14/parquet/internal/debug"
        "github.com/apache/arrow/go/v14/parquet/internal/utils"
 )
 
@@ -87,3 +90,55 @@ func (enc *PlainBooleanEncoder) FlushValues() (Buffer, 
error) {
 
        return enc.sink.Finish(), nil
 }
+
+const rleLengthInBytes = 4
+
+type RleBooleanEncoder struct {
+       encoder
+
+       bufferedValues []bool
+}
+
+func (RleBooleanEncoder) Type() parquet.Type {
+       return parquet.Types.Boolean
+}
+
+func (enc *RleBooleanEncoder) Put(in []bool) {
+       enc.bufferedValues = append(enc.bufferedValues, in...)
+}
+
+func (enc *RleBooleanEncoder) PutSpaced(in []bool, validBits []byte, 
validBitsOffset int64) {
+       bufferOut := make([]bool, len(in))
+       nvalid := spacedCompress(in, bufferOut, validBits, validBitsOffset)
+       enc.Put(bufferOut[:nvalid])
+}
+
+func (enc *RleBooleanEncoder) EstimatedDataEncodedSize() int64 {
+       return rleLengthInBytes + int64(enc.maxRleBufferSize())
+}
+
+func (enc *RleBooleanEncoder) maxRleBufferSize() int {
+       return utils.MaxRLEBufferSize(1, len(enc.bufferedValues)) +
+               utils.MinRLEBufferSize(1)
+}
+
+func (enc *RleBooleanEncoder) FlushValues() (Buffer, error) {
+       rleBufferSizeMax := enc.maxRleBufferSize()
+       enc.sink.SetOffset(rleLengthInBytes)
+       enc.sink.Reserve(rleBufferSizeMax)
+
+       rleEncoder := utils.NewRleEncoder(enc.sink, 1)
+       for _, v := range enc.bufferedValues {
+               if v {
+                       rleEncoder.Put(1)
+               } else {
+                       rleEncoder.Put(0)
+               }
+       }
+       n := rleEncoder.Flush()
+       debug.Assert(n <= rleBufferSizeMax, "num encoded bytes larger than 
expected max")
+       buf := enc.sink.Finish()
+       binary.LittleEndian.PutUint32(buf.Bytes(), uint32(n))
+
+       return buf, nil
+}
diff --git a/go/parquet/internal/encoding/encoder.go 
b/go/parquet/internal/encoding/encoder.go
index 9626e4e9ff..f6b57fe63c 100644
--- a/go/parquet/internal/encoding/encoder.go
+++ b/go/parquet/internal/encoding/encoder.go
@@ -244,7 +244,7 @@ func (d *dictEncoder) FlushValues() (Buffer, error) {
 // EstimatedDataEncodedSize returns the maximum number of bytes needed to 
store the RLE encoded indexes, not including the
 // dictionary index in the computation.
 func (d *dictEncoder) EstimatedDataEncodedSize() int64 {
-       return 1 + int64(utils.MaxBufferSize(d.BitWidth(), 
len(d.idxValues))+utils.MinBufferSize(d.BitWidth()))
+       return 1 + int64(utils.MaxRLEBufferSize(d.BitWidth(), 
len(d.idxValues))+utils.MinRLEBufferSize(d.BitWidth()))
 }
 
 // NumEntries returns the number of entries in the dictionary index for this 
encoder.
diff --git a/go/parquet/internal/encoding/encoding_test.go 
b/go/parquet/internal/encoding/encoding_test.go
index 50e72de004..b0d86321e0 100644
--- a/go/parquet/internal/encoding/encoding_test.go
+++ b/go/parquet/internal/encoding/encoding_test.go
@@ -363,6 +363,16 @@ func (b *BaseEncodingTestSuite) TestBasicRoundTrip() {
        b.checkRoundTrip(parquet.Encodings.Plain)
 }
 
+func (b *BaseEncodingTestSuite) TestRleBooleanEncodingRoundTrip() {
+       switch b.typ {
+       case reflect.TypeOf(true):
+               b.initData(2000, 200)
+               b.checkRoundTrip(parquet.Encodings.RLE)
+       default:
+               b.T().SkipNow()
+       }
+}
+
 func (b *BaseEncodingTestSuite) TestDeltaEncodingRoundTrip() {
        b.initData(10000, 1)
 
@@ -408,6 +418,8 @@ func (b *BaseEncodingTestSuite) TestSpacedRoundTrip() {
                        if validBits != nil {
                                b.checkRoundTripSpaced(parquet.Encodings.Plain, 
validBits, validBitsOffset)
                                switch b.typ {
+                               case reflect.TypeOf(false):
+                                       
b.checkRoundTripSpaced(parquet.Encodings.RLE, validBits, validBitsOffset)
                                case reflect.TypeOf(int32(0)), 
reflect.TypeOf(int64(0)):
                                        
b.checkRoundTripSpaced(parquet.Encodings.DeltaBinaryPacked, validBits, 
validBitsOffset)
                                case reflect.TypeOf(parquet.ByteArray{}):
diff --git a/go/parquet/internal/encoding/levels.go 
b/go/parquet/internal/encoding/levels.go
index c5622519b0..e04ec19d54 100644
--- a/go/parquet/internal/encoding/levels.go
+++ b/go/parquet/internal/encoding/levels.go
@@ -48,7 +48,7 @@ func LevelEncodingMaxBufferSize(encoding parquet.Encoding, 
maxLvl int16, nbuffer
        nbytes := 0
        switch encoding {
        case parquet.Encodings.RLE:
-               nbytes = utils.MaxBufferSize(bitWidth, nbuffered) + 
utils.MinBufferSize(bitWidth)
+               nbytes = utils.MaxRLEBufferSize(bitWidth, nbuffered) + 
utils.MinRLEBufferSize(bitWidth)
        case parquet.Encodings.BitPacked:
                nbytes = int(bitutil.BytesForBits(int64(nbuffered * bitWidth)))
        default:
diff --git a/go/parquet/internal/encoding/typed_encoder.gen.go 
b/go/parquet/internal/encoding/typed_encoder.gen.go
index 7c1f954632..411e87c17e 100644
--- a/go/parquet/internal/encoding/typed_encoder.gen.go
+++ b/go/parquet/internal/encoding/typed_encoder.gen.go
@@ -1225,6 +1225,8 @@ func (boolEncoderTraits) Encoder(e format.Encoding, 
useDict bool, descr *schema.
        switch e {
        case format.Encoding_PLAIN:
                return &PlainBooleanEncoder{encoder: newEncoderBase(e, descr, 
mem)}
+       case format.Encoding_RLE:
+               return &RleBooleanEncoder{encoder: newEncoderBase(e, descr, 
mem)}
        default:
                panic("unimplemented encoding type")
        }
@@ -1248,6 +1250,8 @@ func (boolDecoderTraits) Decoder(e parquet.Encoding, 
descr *schema.Column, useDi
        switch e {
        case parquet.Encodings.Plain:
                return &PlainBooleanDecoder{decoder: 
newDecoderBase(format.Encoding(e), descr)}
+       case parquet.Encodings.RLE:
+               return &RleBooleanDecoder{decoder: 
newDecoderBase(format.Encoding(e), descr)}
        default:
                panic("unimplemented encoding type")
        }
diff --git a/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl 
b/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
index f0d4fb50ae..69415ccca4 100644
--- a/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
+++ b/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
@@ -73,6 +73,10 @@ func ({{.lower}}EncoderTraits) Encoder(e format.Encoding, 
useDict bool, descr *s
   switch e {
   case format.Encoding_PLAIN:
     return &Plain{{.Name}}Encoder{encoder: newEncoderBase(e, descr, mem)}
+{{- if eq .Name "Boolean" }}
+  case format.Encoding_RLE:
+    return &RleBooleanEncoder{encoder: newEncoderBase(e, descr, mem)}
+{{- end}}
 {{- if or (eq .Name "Int32") (eq .Name "Int64")}}
   case format.Encoding_DELTA_BINARY_PACKED:
     return DeltaBitPack{{.Name}}Encoder{&deltaBitPackEncoder{
@@ -117,6 +121,10 @@ func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding, 
descr *schema.Column,
   switch e {
   case parquet.Encodings.Plain:
     return &Plain{{.Name}}Decoder{decoder: newDecoderBase(format.Encoding(e), 
descr)}
+{{- if eq .Name "Boolean" }}
+  case parquet.Encodings.RLE:
+    return &RleBooleanDecoder{decoder: newDecoderBase(format.Encoding(e), 
descr)}
+{{- end}}
 {{- if or (eq .Name "Int32") (eq .Name "Int64")}}
   case parquet.Encodings.DeltaBinaryPacked:
     if mem == nil {
diff --git a/go/parquet/internal/testutils/random.go 
b/go/parquet/internal/testutils/random.go
index 2c8a2809dc..d9a06da43b 100644
--- a/go/parquet/internal/testutils/random.go
+++ b/go/parquet/internal/testutils/random.go
@@ -438,15 +438,16 @@ func fillRandomIsValid(seed uint64, pctNull float64, out 
[]bool) {
 // If the type is parquet.ByteArray or parquet.FixedLenByteArray, heap must 
not be null.
 //
 // The default values are:
-//  []bool uses the current time as the seed with only values of 1 being 
false, for use
-//   of creating validity boolean slices.
-//  all other types use 0 as the seed
-//  a []parquet.ByteArray is populated with lengths between 2 and 12
-//  a []parquet.FixedLenByteArray is populated with fixed size random byte 
arrays of length 12.
+//
+//     []bool uses the current time as the seed with only values of 1 being 
false, for use
+//      of creating validity boolean slices.
+//     all other types use 0 as the seed
+//     a []parquet.ByteArray is populated with lengths between 2 and 12
+//     a []parquet.FixedLenByteArray is populated with fixed size random byte 
arrays of length 12.
 func InitValues(values interface{}, heap *memory.Buffer) {
        switch arr := values.(type) {
        case []bool:
-               fillRandomIsValid(uint64(time.Now().Unix()), 1.0, arr)
+               fillRandomIsValid(uint64(time.Now().Unix()), 0.5, arr)
        case []int32:
                FillRandomInt32(0, arr)
        case []int64:
diff --git a/go/parquet/internal/utils/bit_reader_test.go 
b/go/parquet/internal/utils/bit_reader_test.go
index 317cc4960a..c285f5165c 100644
--- a/go/parquet/internal/utils/bit_reader_test.go
+++ b/go/parquet/internal/utils/bit_reader_test.go
@@ -494,7 +494,7 @@ func (r *RLERandomSuite) checkRoundTrip(vals []uint64, 
width int) bool {
 
 func (r *RLERandomSuite) checkRoundTripSpaced(vals arrow.Array, width int) {
        nvalues := vals.Len()
-       bufsize := utils.MaxBufferSize(width, nvalues)
+       bufsize := utils.MaxRLEBufferSize(width, nvalues)
 
        buffer := make([]byte, bufsize)
        encoder := utils.NewRleEncoder(utils.NewWriterAtBuffer(buffer), width)
diff --git a/go/parquet/internal/utils/rle.go b/go/parquet/internal/utils/rle.go
index 866d7c61b4..fef322c6fd 100644
--- a/go/parquet/internal/utils/rle.go
+++ b/go/parquet/internal/utils/rle.go
@@ -37,13 +37,13 @@ const (
        MaxValuesPerLiteralRun = (1 << 6) * 8
 )
 
-func MinBufferSize(bitWidth int) int {
+func MinRLEBufferSize(bitWidth int) int {
        maxLiteralRunSize := 1 + 
bitutil.BytesForBits(int64(MaxValuesPerLiteralRun*bitWidth))
        maxRepeatedRunSize := binary.MaxVarintLen32 + 
bitutil.BytesForBits(int64(bitWidth))
        return int(utils.Max(maxLiteralRunSize, maxRepeatedRunSize))
 }
 
-func MaxBufferSize(width, numValues int) int {
+func MaxRLEBufferSize(width, numValues int) int {
        bytesPerRun := width
        numRuns := int(bitutil.BytesForBits(int64(numValues)))
        literalMaxSize := numRuns + (numRuns * bytesPerRun)

Reply via email to