This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 23b62a441e GH-38462: [Go][Parquet] Handle Boolean RLE
encoding/decoding (#38367)
23b62a441e is described below
commit 23b62a441ee73ef1643a28e992335b9029f0ebe4
Author: Matt Topol <[email protected]>
AuthorDate: Mon Oct 30 11:11:41 2023 -0400
GH-38462: [Go][Parquet] Handle Boolean RLE encoding/decoding (#38367)
### Rationale for this change
The parquet-testing repository files have been updated and now include
boolean columns that use the RLE encoding type. This causes the Go parquet
library to fail its verification tests when it pulls the most recent commits
of the parquet-testing repository. The fix is to implement the RleBoolean
encoder and decoder.
### What changes are included in this PR?
This PR adds `RleBooleanEncoder` and `RleBooleanDecoder` and updates the
`parquet-testing` repo.
### Are these changes tested?
Unit tests are added, and this is also tested via the `parquet-testing`
golden files.
* Closes: #38345
* Closes: #38462
Lead-authored-by: Matt Topol <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/parquet/file/column_reader.go | 8 ++-
go/parquet/file/file_reader_test.go | 61 ++++++++++++++++
go/parquet/internal/encoding/boolean_decoder.go | 82 +++++++++++++++++++++-
go/parquet/internal/encoding/boolean_encoder.go | 55 +++++++++++++++
go/parquet/internal/encoding/encoder.go | 2 +-
go/parquet/internal/encoding/encoding_test.go | 12 ++++
go/parquet/internal/encoding/levels.go | 2 +-
go/parquet/internal/encoding/typed_encoder.gen.go | 4 ++
.../internal/encoding/typed_encoder.gen.go.tmpl | 8 +++
go/parquet/internal/testutils/random.go | 13 ++--
go/parquet/internal/utils/bit_reader_test.go | 2 +-
go/parquet/internal/utils/rle.go | 4 +-
12 files changed, 239 insertions(+), 14 deletions(-)
diff --git a/go/parquet/file/column_reader.go b/go/parquet/file/column_reader.go
index 1ebceaca23..b623bd5074 100644
--- a/go/parquet/file/column_reader.go
+++ b/go/parquet/file/column_reader.go
@@ -17,6 +17,7 @@
package file
import (
+ "errors"
"fmt"
"sync"
@@ -345,6 +346,11 @@ func (c *columnChunkReader) initDataDecoder(page Page,
lvlByteLen int64) error {
c.curDecoder = decoder
} else {
switch encoding {
+ case format.Encoding_RLE:
+ if c.descr.PhysicalType() != parquet.Types.Boolean {
+ return fmt.Errorf("parquet: only boolean
supports RLE encoding, got %s", c.descr.PhysicalType())
+ }
+ fallthrough
case format.Encoding_PLAIN,
format.Encoding_DELTA_BYTE_ARRAY,
format.Encoding_DELTA_LENGTH_BYTE_ARRAY,
@@ -352,7 +358,7 @@ func (c *columnChunkReader) initDataDecoder(page Page,
lvlByteLen int64) error {
c.curDecoder =
c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem)
c.decoders[encoding] = c.curDecoder
case format.Encoding_RLE_DICTIONARY:
- return xerrors.New("parquet: dictionary page must be
before data page")
+ return errors.New("parquet: dictionary page must be
before data page")
case format.Encoding_BYTE_STREAM_SPLIT:
return fmt.Errorf("parquet: unsupported data encoding
%s", encoding)
default:
diff --git a/go/parquet/file/file_reader_test.go
b/go/parquet/file/file_reader_test.go
index eccb572b30..2a9b097139 100644
--- a/go/parquet/file/file_reader_test.go
+++ b/go/parquet/file/file_reader_test.go
@@ -21,6 +21,8 @@ import (
"crypto/rand"
"encoding/binary"
"io"
+ "os"
+ "path"
"testing"
"github.com/apache/arrow/go/v14/arrow/memory"
@@ -385,3 +387,62 @@ func TestDeltaLengthByteArrayPackingWithNulls(t
*testing.T) {
assert.NotNil(t, readData[0])
}
}
+
+func TestRleBooleanEncodingFileRead(t *testing.T) {
+ dir := os.Getenv("PARQUET_TEST_DATA")
+ if dir == "" {
+ t.Skip("no path supplied with PARQUET_TEST_DATA")
+ }
+ assert.DirExists(t, dir)
+
+ props := parquet.NewReaderProperties(memory.DefaultAllocator)
+ fileReader, err := file.OpenParquetFile(path.Join(dir,
"rle_boolean_encoding.parquet"),
+ false, file.WithReadProps(props))
+ require.NoError(t, err)
+ defer fileReader.Close()
+
+ assert.Equal(t, 1, fileReader.NumRowGroups())
+ rgr := fileReader.RowGroup(0)
+ assert.EqualValues(t, 68, rgr.NumRows())
+
+ rdr, err := rgr.Column(0)
+ require.NoError(t, err)
+ brdr := rdr.(*file.BooleanColumnChunkReader)
+
+ values := make([]bool, 68)
+ defLvls, repLvls := make([]int16, 68), make([]int16, 68)
+ total, read, err := brdr.ReadBatch(68, values, defLvls, repLvls)
+ require.NoError(t, err)
+
+ assert.EqualValues(t, 68, total)
+ md, err := rgr.MetaData().ColumnChunk(0)
+ require.NoError(t, err)
+ stats, err := md.Statistics()
+ require.NoError(t, err)
+ assert.EqualValues(t, total-stats.NullCount(), read)
+
+ expected := []bool{
+ true, false, true, true, false, false,
+ true, true, true, false, false, true, true,
+ false, true, true, false, false, true, true,
+ false, true, true, false, false, true, true,
+ true, false, false, false, false, true, true,
+ false, true, true, false, false, true, true,
+ true, false, false, true, true, false, false,
+ true, true, true, false, true, true, false,
+ true, true, false, false, true, true, true,
+ }
+ expectedNulls := []int{2, 15, 23, 38, 48, 60}
+
+ expectedNullIdx := 0
+ for i, v := range defLvls {
+ if expectedNullIdx < len(expectedNulls) && i ==
expectedNulls[expectedNullIdx] {
+ assert.Zero(t, v)
+ expectedNullIdx++
+ } else {
+ assert.EqualValues(t, 1, v)
+ }
+ }
+
+ assert.Equal(t, expected, values[:len(expected)])
+}
diff --git a/go/parquet/internal/encoding/boolean_decoder.go
b/go/parquet/internal/encoding/boolean_decoder.go
index dd213395d6..337a6db967 100644
--- a/go/parquet/internal/encoding/boolean_decoder.go
+++ b/go/parquet/internal/encoding/boolean_decoder.go
@@ -17,11 +17,16 @@
package encoding
import (
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+
"github.com/apache/arrow/go/v14/arrow/bitutil"
shared_utils "github.com/apache/arrow/go/v14/internal/utils"
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/internal/utils"
- "golang.org/x/xerrors"
)
// PlainBooleanDecoder is for the Plain Encoding type, there is no
@@ -103,7 +108,80 @@ func (dec *PlainBooleanDecoder) DecodeSpaced(out []bool,
nullCount int, validBit
return 0, err
}
if valuesRead != toRead {
- return valuesRead, xerrors.New("parquet: boolean
decoder: number of values / definition levels read did not match")
+ return valuesRead, errors.New("parquet: boolean
decoder: number of values / definition levels read did not match")
+ }
+ return spacedExpand(out, nullCount, validBits,
validBitsOffset), nil
+ }
+ return dec.Decode(out)
+}
+
+type RleBooleanDecoder struct {
+ decoder
+
+ rleDec *utils.RleDecoder
+}
+
+func (RleBooleanDecoder) Type() parquet.Type {
+ return parquet.Types.Boolean
+}
+
+func (dec *RleBooleanDecoder) SetData(nvals int, data []byte) error {
+ dec.nvals = nvals
+
+ if len(data) < 4 {
+ return fmt.Errorf("invalid length - %d (corrupt data page?)",
len(data))
+ }
+
+ // load the first 4 bytes in little-endian which indicates the length
+ nbytes := binary.LittleEndian.Uint32(data[:4])
+ if nbytes > uint32(len(data)-4) {
+ return fmt.Errorf("received invalid number of bytes - %d
(corrupt data page?)", nbytes)
+ }
+
+ dec.data = data[4:]
+ if dec.rleDec == nil {
+ dec.rleDec = utils.NewRleDecoder(bytes.NewReader(dec.data), 1)
+ } else {
+ dec.rleDec.Reset(bytes.NewReader(dec.data), 1)
+ }
+ return nil
+}
+
+func (dec *RleBooleanDecoder) Decode(out []bool) (int, error) {
+ max := shared_utils.MinInt(len(out), dec.nvals)
+
+ var (
+ buf [1024]uint64
+ n = max
+ )
+
+ for n > 0 {
+ batch := shared_utils.MinInt(len(buf), n)
+ decoded := dec.rleDec.GetBatch(buf[:batch])
+ if decoded != batch {
+ return max - n, io.ErrUnexpectedEOF
+ }
+
+ for i := 0; i < batch; i++ {
+ out[i] = buf[i] != 0
+ }
+ n -= batch
+ out = out[batch:]
+ }
+
+ dec.nvals -= max
+ return max, nil
+}
+
+func (dec *RleBooleanDecoder) DecodeSpaced(out []bool, nullCount int,
validBits []byte, validBitsOffset int64) (int, error) {
+ if nullCount > 0 {
+ toRead := len(out) - nullCount
+ valuesRead, err := dec.Decode(out[:toRead])
+ if err != nil {
+ return 0, err
+ }
+ if valuesRead != toRead {
+ return valuesRead, errors.New("parquet: rle boolean
decoder: number of values / definition levels read did not match")
}
return spacedExpand(out, nullCount, validBits,
validBitsOffset), nil
}
diff --git a/go/parquet/internal/encoding/boolean_encoder.go
b/go/parquet/internal/encoding/boolean_encoder.go
index 65ba2658b0..3970e05fca 100644
--- a/go/parquet/internal/encoding/boolean_encoder.go
+++ b/go/parquet/internal/encoding/boolean_encoder.go
@@ -17,8 +17,11 @@
package encoding
import (
+ "encoding/binary"
+
"github.com/apache/arrow/go/v14/arrow/bitutil"
"github.com/apache/arrow/go/v14/parquet"
+ "github.com/apache/arrow/go/v14/parquet/internal/debug"
"github.com/apache/arrow/go/v14/parquet/internal/utils"
)
@@ -87,3 +90,55 @@ func (enc *PlainBooleanEncoder) FlushValues() (Buffer,
error) {
return enc.sink.Finish(), nil
}
+
+const rleLengthInBytes = 4
+
+type RleBooleanEncoder struct {
+ encoder
+
+ bufferedValues []bool
+}
+
+func (RleBooleanEncoder) Type() parquet.Type {
+ return parquet.Types.Boolean
+}
+
+func (enc *RleBooleanEncoder) Put(in []bool) {
+ enc.bufferedValues = append(enc.bufferedValues, in...)
+}
+
+func (enc *RleBooleanEncoder) PutSpaced(in []bool, validBits []byte,
validBitsOffset int64) {
+ bufferOut := make([]bool, len(in))
+ nvalid := spacedCompress(in, bufferOut, validBits, validBitsOffset)
+ enc.Put(bufferOut[:nvalid])
+}
+
+func (enc *RleBooleanEncoder) EstimatedDataEncodedSize() int64 {
+ return rleLengthInBytes + int64(enc.maxRleBufferSize())
+}
+
+func (enc *RleBooleanEncoder) maxRleBufferSize() int {
+ return utils.MaxRLEBufferSize(1, len(enc.bufferedValues)) +
+ utils.MinRLEBufferSize(1)
+}
+
+func (enc *RleBooleanEncoder) FlushValues() (Buffer, error) {
+ rleBufferSizeMax := enc.maxRleBufferSize()
+ enc.sink.SetOffset(rleLengthInBytes)
+ enc.sink.Reserve(rleBufferSizeMax)
+
+ rleEncoder := utils.NewRleEncoder(enc.sink, 1)
+ for _, v := range enc.bufferedValues {
+ if v {
+ rleEncoder.Put(1)
+ } else {
+ rleEncoder.Put(0)
+ }
+ }
+ n := rleEncoder.Flush()
+ debug.Assert(n <= rleBufferSizeMax, "num encoded bytes larger than
expected max")
+ buf := enc.sink.Finish()
+ binary.LittleEndian.PutUint32(buf.Bytes(), uint32(n))
+
+ return buf, nil
+}
diff --git a/go/parquet/internal/encoding/encoder.go
b/go/parquet/internal/encoding/encoder.go
index 9626e4e9ff..f6b57fe63c 100644
--- a/go/parquet/internal/encoding/encoder.go
+++ b/go/parquet/internal/encoding/encoder.go
@@ -244,7 +244,7 @@ func (d *dictEncoder) FlushValues() (Buffer, error) {
// EstimatedDataEncodedSize returns the maximum number of bytes needed to
store the RLE encoded indexes, not including the
// dictionary index in the computation.
func (d *dictEncoder) EstimatedDataEncodedSize() int64 {
- return 1 + int64(utils.MaxBufferSize(d.BitWidth(),
len(d.idxValues))+utils.MinBufferSize(d.BitWidth()))
+ return 1 + int64(utils.MaxRLEBufferSize(d.BitWidth(),
len(d.idxValues))+utils.MinRLEBufferSize(d.BitWidth()))
}
// NumEntries returns the number of entires in the dictionary index for this
encoder.
diff --git a/go/parquet/internal/encoding/encoding_test.go
b/go/parquet/internal/encoding/encoding_test.go
index 50e72de004..b0d86321e0 100644
--- a/go/parquet/internal/encoding/encoding_test.go
+++ b/go/parquet/internal/encoding/encoding_test.go
@@ -363,6 +363,16 @@ func (b *BaseEncodingTestSuite) TestBasicRoundTrip() {
b.checkRoundTrip(parquet.Encodings.Plain)
}
+func (b *BaseEncodingTestSuite) TestRleBooleanEncodingRoundTrip() {
+ switch b.typ {
+ case reflect.TypeOf(true):
+ b.initData(2000, 200)
+ b.checkRoundTrip(parquet.Encodings.RLE)
+ default:
+ b.T().SkipNow()
+ }
+}
+
func (b *BaseEncodingTestSuite) TestDeltaEncodingRoundTrip() {
b.initData(10000, 1)
@@ -408,6 +418,8 @@ func (b *BaseEncodingTestSuite) TestSpacedRoundTrip() {
if validBits != nil {
b.checkRoundTripSpaced(parquet.Encodings.Plain,
validBits, validBitsOffset)
switch b.typ {
+ case reflect.TypeOf(false):
+
b.checkRoundTripSpaced(parquet.Encodings.RLE, validBits, validBitsOffset)
case reflect.TypeOf(int32(0)),
reflect.TypeOf(int64(0)):
b.checkRoundTripSpaced(parquet.Encodings.DeltaBinaryPacked, validBits,
validBitsOffset)
case reflect.TypeOf(parquet.ByteArray{}):
diff --git a/go/parquet/internal/encoding/levels.go
b/go/parquet/internal/encoding/levels.go
index c5622519b0..e04ec19d54 100644
--- a/go/parquet/internal/encoding/levels.go
+++ b/go/parquet/internal/encoding/levels.go
@@ -48,7 +48,7 @@ func LevelEncodingMaxBufferSize(encoding parquet.Encoding,
maxLvl int16, nbuffer
nbytes := 0
switch encoding {
case parquet.Encodings.RLE:
- nbytes = utils.MaxBufferSize(bitWidth, nbuffered) +
utils.MinBufferSize(bitWidth)
+ nbytes = utils.MaxRLEBufferSize(bitWidth, nbuffered) +
utils.MinRLEBufferSize(bitWidth)
case parquet.Encodings.BitPacked:
nbytes = int(bitutil.BytesForBits(int64(nbuffered * bitWidth)))
default:
diff --git a/go/parquet/internal/encoding/typed_encoder.gen.go
b/go/parquet/internal/encoding/typed_encoder.gen.go
index 7c1f954632..411e87c17e 100644
--- a/go/parquet/internal/encoding/typed_encoder.gen.go
+++ b/go/parquet/internal/encoding/typed_encoder.gen.go
@@ -1225,6 +1225,8 @@ func (boolEncoderTraits) Encoder(e format.Encoding,
useDict bool, descr *schema.
switch e {
case format.Encoding_PLAIN:
return &PlainBooleanEncoder{encoder: newEncoderBase(e, descr,
mem)}
+ case format.Encoding_RLE:
+ return &RleBooleanEncoder{encoder: newEncoderBase(e, descr,
mem)}
default:
panic("unimplemented encoding type")
}
@@ -1248,6 +1250,8 @@ func (boolDecoderTraits) Decoder(e parquet.Encoding,
descr *schema.Column, useDi
switch e {
case parquet.Encodings.Plain:
return &PlainBooleanDecoder{decoder:
newDecoderBase(format.Encoding(e), descr)}
+ case parquet.Encodings.RLE:
+ return &RleBooleanDecoder{decoder:
newDecoderBase(format.Encoding(e), descr)}
default:
panic("unimplemented encoding type")
}
diff --git a/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
b/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
index f0d4fb50ae..69415ccca4 100644
--- a/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
+++ b/go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
@@ -73,6 +73,10 @@ func ({{.lower}}EncoderTraits) Encoder(e format.Encoding,
useDict bool, descr *s
switch e {
case format.Encoding_PLAIN:
return &Plain{{.Name}}Encoder{encoder: newEncoderBase(e, descr, mem)}
+{{- if eq .Name "Boolean" }}
+ case format.Encoding_RLE:
+ return &RleBooleanEncoder{encoder: newEncoderBase(e, descr, mem)}
+{{- end}}
{{- if or (eq .Name "Int32") (eq .Name "Int64")}}
case format.Encoding_DELTA_BINARY_PACKED:
return DeltaBitPack{{.Name}}Encoder{&deltaBitPackEncoder{
@@ -117,6 +121,10 @@ func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding,
descr *schema.Column,
switch e {
case parquet.Encodings.Plain:
return &Plain{{.Name}}Decoder{decoder: newDecoderBase(format.Encoding(e),
descr)}
+{{- if eq .Name "Boolean" }}
+ case parquet.Encodings.RLE:
+ return &RleBooleanDecoder{decoder: newDecoderBase(format.Encoding(e),
descr)}
+{{- end}}
{{- if or (eq .Name "Int32") (eq .Name "Int64")}}
case parquet.Encodings.DeltaBinaryPacked:
if mem == nil {
diff --git a/go/parquet/internal/testutils/random.go
b/go/parquet/internal/testutils/random.go
index 2c8a2809dc..d9a06da43b 100644
--- a/go/parquet/internal/testutils/random.go
+++ b/go/parquet/internal/testutils/random.go
@@ -438,15 +438,16 @@ func fillRandomIsValid(seed uint64, pctNull float64, out
[]bool) {
// If the type is parquet.ByteArray or parquet.FixedLenByteArray, heap must
not be null.
//
// The default values are:
-// []bool uses the current time as the seed with only values of 1 being
false, for use
-// of creating validity boolean slices.
-// all other types use 0 as the seed
-// a []parquet.ByteArray is populated with lengths between 2 and 12
-// a []parquet.FixedLenByteArray is populated with fixed size random byte
arrays of length 12.
+//
+// []bool uses the current time as the seed with only values of 1 being
false, for use
+// of creating validity boolean slices.
+// all other types use 0 as the seed
+// a []parquet.ByteArray is populated with lengths between 2 and 12
+// a []parquet.FixedLenByteArray is populated with fixed size random byte
arrays of length 12.
func InitValues(values interface{}, heap *memory.Buffer) {
switch arr := values.(type) {
case []bool:
- fillRandomIsValid(uint64(time.Now().Unix()), 1.0, arr)
+ fillRandomIsValid(uint64(time.Now().Unix()), 0.5, arr)
case []int32:
FillRandomInt32(0, arr)
case []int64:
diff --git a/go/parquet/internal/utils/bit_reader_test.go
b/go/parquet/internal/utils/bit_reader_test.go
index 317cc4960a..c285f5165c 100644
--- a/go/parquet/internal/utils/bit_reader_test.go
+++ b/go/parquet/internal/utils/bit_reader_test.go
@@ -494,7 +494,7 @@ func (r *RLERandomSuite) checkRoundTrip(vals []uint64,
width int) bool {
func (r *RLERandomSuite) checkRoundTripSpaced(vals arrow.Array, width int) {
nvalues := vals.Len()
- bufsize := utils.MaxBufferSize(width, nvalues)
+ bufsize := utils.MaxRLEBufferSize(width, nvalues)
buffer := make([]byte, bufsize)
encoder := utils.NewRleEncoder(utils.NewWriterAtBuffer(buffer), width)
diff --git a/go/parquet/internal/utils/rle.go b/go/parquet/internal/utils/rle.go
index 866d7c61b4..fef322c6fd 100644
--- a/go/parquet/internal/utils/rle.go
+++ b/go/parquet/internal/utils/rle.go
@@ -37,13 +37,13 @@ const (
MaxValuesPerLiteralRun = (1 << 6) * 8
)
-func MinBufferSize(bitWidth int) int {
+func MinRLEBufferSize(bitWidth int) int {
maxLiteralRunSize := 1 +
bitutil.BytesForBits(int64(MaxValuesPerLiteralRun*bitWidth))
maxRepeatedRunSize := binary.MaxVarintLen32 +
bitutil.BytesForBits(int64(bitWidth))
return int(utils.Max(maxLiteralRunSize, maxRepeatedRunSize))
}
-func MaxBufferSize(width, numValues int) int {
+func MaxRLEBufferSize(width, numValues int) int {
bytesPerRun := width
numRuns := int(bitutil.BytesForBits(int64(numValues)))
literalMaxSize := numRuns + (numRuns * bytesPerRun)