This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 6b810cbc23 GH-37102: [Go][Parquet] Encoding: Make BitWriter Reserve
when ReserveBytes (#37112)
6b810cbc23 is described below
commit 6b810cbc23fb4f73ab8684ff1457888cfa833c5d
Author: mwish <[email protected]>
AuthorDate: Tue Aug 15 22:56:52 2023 +0800
GH-37102: [Go][Parquet] Encoding: Make BitWriter Reserve when ReserveBytes
(#37112)
### Rationale for this change
See https://github.com/apache/arrow/issues/37102. Also fixes #35718
The Go BitWriter didn't reserve space when `ReserveBytes` was called, which
produced incorrect results.
### What changes are included in this PR?
Change the `ReserveBytes` implementation (renamed to `SkipBytes`) so that it also calls `Reserve` on the underlying writer.
### Are these changes tested?
Yes, a regression test for GH-37102 is added in `encoding_test.go`.
### Are there any user-facing changes?
bugfix
* Closes: #37102
Lead-authored-by: Matt Topol <[email protected]>
Co-authored-by: mwish <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/parquet/internal/encoding/delta_bit_packing.go | 2 +-
go/parquet/internal/encoding/encoding_test.go | 24 +++++++++++++++++++++++
go/parquet/internal/encoding/levels.go | 3 +--
go/parquet/internal/utils/bit_writer.go | 16 ++++++++++-----
go/parquet/internal/utils/rle.go | 8 +++++---
5 files changed, 42 insertions(+), 11 deletions(-)
diff --git a/go/parquet/internal/encoding/delta_bit_packing.go
b/go/parquet/internal/encoding/delta_bit_packing.go
index ab542eabb2..d327150673 100644
--- a/go/parquet/internal/encoding/delta_bit_packing.go
+++ b/go/parquet/internal/encoding/delta_bit_packing.go
@@ -355,7 +355,7 @@ func (enc *deltaBitPackEncoder) flushBlock() {
enc.bitWriter.WriteZigZagVlqInt(minDelta)
// reserve enough bytes to write out our miniblock deltas
- offset := enc.bitWriter.ReserveBytes(int(enc.numMiniBlocks))
+ offset, _ := enc.bitWriter.SkipBytes(int(enc.numMiniBlocks))
valuesToWrite := int64(len(enc.deltas))
for i := 0; i < int(enc.numMiniBlocks); i++ {
diff --git a/go/parquet/internal/encoding/encoding_test.go
b/go/parquet/internal/encoding/encoding_test.go
index de82494325..f82d9d75d3 100644
--- a/go/parquet/internal/encoding/encoding_test.go
+++ b/go/parquet/internal/encoding/encoding_test.go
@@ -646,6 +646,30 @@ func TestWriteDeltaBitPackedInt64(t *testing.T) {
assert.Equalf(t, values[i:j], valueBuf, "indexes
%d:%d", i, j)
}
})
+
+ t.Run("GH-37102", func(t *testing.T) {
+ values := []int64{
+ 0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0,
+ }
+
+ enc := encoding.NewEncoder(parquet.Types.Int64,
parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
+ enc.(encoding.Int64Encoder).Put(values)
+ buf, _ := enc.FlushValues()
+ defer buf.Release()
+
+ dec := encoding.NewDecoder(parquet.Types.Int64,
parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
+ dec.(encoding.Int64Decoder).SetData(len(values), buf.Bytes())
+
+ valueBuf := make([]int64, len(values))
+
+ decoded, _ := dec.(encoding.Int64Decoder).Decode(valueBuf)
+ assert.Equal(t, len(valueBuf), decoded)
+ assert.Equal(t, values, valueBuf)
+ })
}
func TestDeltaLengthByteArrayEncoding(t *testing.T) {
diff --git a/go/parquet/internal/encoding/levels.go
b/go/parquet/internal/encoding/levels.go
index f0767b4976..d6a47f3366 100644
--- a/go/parquet/internal/encoding/levels.go
+++ b/go/parquet/internal/encoding/levels.go
@@ -20,7 +20,6 @@ import (
"bytes"
"encoding/binary"
"fmt"
- "io"
"math/bits"
"github.com/JohnCGriffin/overflow"
@@ -75,7 +74,7 @@ func (l *LevelEncoder) Reset(maxLvl int16) {
// Init is called to set up the desired encoding type, max level and
underlying writer for a
// level encoder to control where the resulting encoded buffer will end up.
-func (l *LevelEncoder) Init(encoding parquet.Encoding, maxLvl int16, w
io.WriterAt) {
+func (l *LevelEncoder) Init(encoding parquet.Encoding, maxLvl int16, w
utils.WriterAtWithLen) {
l.bitWidth = bits.Len64(uint64(maxLvl))
l.encoding = format.Encoding(encoding)
switch l.encoding {
diff --git a/go/parquet/internal/utils/bit_writer.go
b/go/parquet/internal/utils/bit_writer.go
index 110f232641..837a85d554 100644
--- a/go/parquet/internal/utils/bit_writer.go
+++ b/go/parquet/internal/utils/bit_writer.go
@@ -56,16 +56,21 @@ func (w *WriterAtBuffer) WriteAt(p []byte, off int64) (n
int, err error) {
return
}
+func (w *WriterAtBuffer) Reserve(nbytes int) {
+ // no-op. We should not expand or otherwise modify the underlying buffer
+}
+
// WriterAtWithLen is an interface for an io.WriterAt with a Len function
type WriterAtWithLen interface {
io.WriterAt
Len() int
+ Reserve(int)
}
// BitWriter is a utility for writing values of specific bit widths to a stream
// using a uint64 as a buffer to build up between flushing for efficiency.
type BitWriter struct {
- wr io.WriterAt
+ wr WriterAtWithLen
buffer uint64
byteoffset int
bitoffset uint
@@ -74,18 +79,19 @@ type BitWriter struct {
// NewBitWriter initializes a new bit writer to write to the passed in
interface
// using WriteAt to write the appropriate offsets and values.
-func NewBitWriter(w io.WriterAt) *BitWriter {
+func NewBitWriter(w WriterAtWithLen) *BitWriter {
return &BitWriter{wr: w}
}
-// ReserveBytes reserves the next aligned nbytes, skipping them and returning
+// SkipBytes reserves the next aligned nbytes, skipping them and returning
// the offset to use with WriteAt to write to those reserved bytes. Used for
// RLE encoding to fill in the indicators after encoding.
-func (b *BitWriter) ReserveBytes(nbytes int) int {
+func (b *BitWriter) SkipBytes(nbytes int) (int, error) {
b.Flush(true)
ret := b.byteoffset
b.byteoffset += nbytes
- return ret
+ b.wr.Reserve(b.byteoffset)
+ return ret, nil
}
// WriteAt fulfills the io.WriterAt interface to write len(p) bytes from p
diff --git a/go/parquet/internal/utils/rle.go b/go/parquet/internal/utils/rle.go
index eed97b6d01..bcb69fa858 100644
--- a/go/parquet/internal/utils/rle.go
+++ b/go/parquet/internal/utils/rle.go
@@ -22,7 +22,6 @@ package utils
import (
"bytes"
"encoding/binary"
- "io"
"math"
"github.com/apache/arrow/go/v13/arrow/bitutil"
@@ -465,7 +464,7 @@ type RleEncoder struct {
indicatorBuffer [1]byte
}
-func NewRleEncoder(w io.WriterAt, width int) *RleEncoder {
+func NewRleEncoder(w WriterAtWithLen, width int) *RleEncoder {
return &RleEncoder{
w: NewBitWriter(w),
buffer: make([]uint64, 0, 8),
@@ -521,7 +520,10 @@ func (r *RleEncoder) flushBuffered(done bool) (err error) {
func (r *RleEncoder) flushLiteral(updateIndicator bool) (err error) {
if r.literalIndicatorOffset == -1 {
- r.literalIndicatorOffset = r.w.ReserveBytes(1)
+ r.literalIndicatorOffset, err = r.w.SkipBytes(1)
+ if err != nil {
+ return
+ }
}
for _, val := range r.buffer {