This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b810cbc23 GH-37102: [Go][Parquet] Encoding: Make BitWriter Reserve 
when ReserveBytes (#37112)
6b810cbc23 is described below

commit 6b810cbc23fb4f73ab8684ff1457888cfa833c5d
Author: mwish <[email protected]>
AuthorDate: Tue Aug 15 22:56:52 2023 +0800

    GH-37102: [Go][Parquet] Encoding: Make BitWriter Reserve when ReserveBytes 
(#37112)
    
    
    
    ### Rationale for this change
    
    See https://github.com/apache/arrow/issues/37102. Also fixes #35718.
    
    The Go BitWriter did not reserve space in the underlying writer when
    `ReserveBytes` was called, which could produce incorrect encoded output.
    
    ### What changes are included in this PR?
    
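    Rename `BitWriter.ReserveBytes` to `SkipBytes` (now returning an error as
    well as the offset) and make it call a new `Reserve` method on the
    underlying `WriterAtWithLen`, so the skipped bytes are actually reserved.
    `NewBitWriter`, `NewRleEncoder`, and `LevelEncoder.Init` now take a
    `WriterAtWithLen` instead of an `io.WriterAt`.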
    
    ### Are these changes tested?
    
    Yes. A regression subtest (`GH-37102`) was added to
    `TestWriteDeltaBitPackedInt64` in `encoding_test.go`.
    
    ### Are there any user-facing changes?
    
    This is a bug fix.
    
    * Closes: #37102
    
    Lead-authored-by: Matt Topol <[email protected]>
    Co-authored-by: mwish <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 go/parquet/internal/encoding/delta_bit_packing.go |  2 +-
 go/parquet/internal/encoding/encoding_test.go     | 24 +++++++++++++++++++++++
 go/parquet/internal/encoding/levels.go            |  3 +--
 go/parquet/internal/utils/bit_writer.go           | 16 ++++++++++-----
 go/parquet/internal/utils/rle.go                  |  8 +++++---
 5 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/go/parquet/internal/encoding/delta_bit_packing.go 
b/go/parquet/internal/encoding/delta_bit_packing.go
index ab542eabb2..d327150673 100644
--- a/go/parquet/internal/encoding/delta_bit_packing.go
+++ b/go/parquet/internal/encoding/delta_bit_packing.go
@@ -355,7 +355,7 @@ func (enc *deltaBitPackEncoder) flushBlock() {
 
        enc.bitWriter.WriteZigZagVlqInt(minDelta)
        // reserve enough bytes to write out our miniblock deltas
-       offset := enc.bitWriter.ReserveBytes(int(enc.numMiniBlocks))
+       offset, _ := enc.bitWriter.SkipBytes(int(enc.numMiniBlocks))
 
        valuesToWrite := int64(len(enc.deltas))
        for i := 0; i < int(enc.numMiniBlocks); i++ {
diff --git a/go/parquet/internal/encoding/encoding_test.go 
b/go/parquet/internal/encoding/encoding_test.go
index de82494325..f82d9d75d3 100644
--- a/go/parquet/internal/encoding/encoding_test.go
+++ b/go/parquet/internal/encoding/encoding_test.go
@@ -646,6 +646,30 @@ func TestWriteDeltaBitPackedInt64(t *testing.T) {
                        assert.Equalf(t, values[i:j], valueBuf, "indexes 
%d:%d", i, j)
                }
        })
+
+       t.Run("GH-37102", func(t *testing.T) {
+               values := []int64{
+                       0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                       0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                       0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                       0, 3000000000000000000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                       0, 0,
+               }
+
+               enc := encoding.NewEncoder(parquet.Types.Int64, 
parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
+               enc.(encoding.Int64Encoder).Put(values)
+               buf, _ := enc.FlushValues()
+               defer buf.Release()
+
+               dec := encoding.NewDecoder(parquet.Types.Int64, 
parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
+               dec.(encoding.Int64Decoder).SetData(len(values), buf.Bytes())
+
+               valueBuf := make([]int64, len(values))
+
+               decoded, _ := dec.(encoding.Int64Decoder).Decode(valueBuf)
+               assert.Equal(t, len(valueBuf), decoded)
+               assert.Equal(t, values, valueBuf)
+       })
 }
 
 func TestDeltaLengthByteArrayEncoding(t *testing.T) {
diff --git a/go/parquet/internal/encoding/levels.go 
b/go/parquet/internal/encoding/levels.go
index f0767b4976..d6a47f3366 100644
--- a/go/parquet/internal/encoding/levels.go
+++ b/go/parquet/internal/encoding/levels.go
@@ -20,7 +20,6 @@ import (
        "bytes"
        "encoding/binary"
        "fmt"
-       "io"
        "math/bits"
 
        "github.com/JohnCGriffin/overflow"
@@ -75,7 +74,7 @@ func (l *LevelEncoder) Reset(maxLvl int16) {
 
 // Init is called to set up the desired encoding type, max level and 
underlying writer for a
 // level encoder to control where the resulting encoded buffer will end up.
-func (l *LevelEncoder) Init(encoding parquet.Encoding, maxLvl int16, w 
io.WriterAt) {
+func (l *LevelEncoder) Init(encoding parquet.Encoding, maxLvl int16, w 
utils.WriterAtWithLen) {
        l.bitWidth = bits.Len64(uint64(maxLvl))
        l.encoding = format.Encoding(encoding)
        switch l.encoding {
diff --git a/go/parquet/internal/utils/bit_writer.go 
b/go/parquet/internal/utils/bit_writer.go
index 110f232641..837a85d554 100644
--- a/go/parquet/internal/utils/bit_writer.go
+++ b/go/parquet/internal/utils/bit_writer.go
@@ -56,16 +56,21 @@ func (w *WriterAtBuffer) WriteAt(p []byte, off int64) (n 
int, err error) {
        return
 }
 
+func (w *WriterAtBuffer) Reserve(nbytes int) {
+       // no-op. We should not expand or otherwise modify the underlying buffer
+}
+
 // WriterAtWithLen is an interface for an io.WriterAt with a Len function
 type WriterAtWithLen interface {
        io.WriterAt
        Len() int
+       Reserve(int)
 }
 
 // BitWriter is a utility for writing values of specific bit widths to a stream
 // using a uint64 as a buffer to build up between flushing for efficiency.
 type BitWriter struct {
-       wr         io.WriterAt
+       wr         WriterAtWithLen
        buffer     uint64
        byteoffset int
        bitoffset  uint
@@ -74,18 +79,19 @@ type BitWriter struct {
 
 // NewBitWriter initializes a new bit writer to write to the passed in 
interface
 // using WriteAt to write the appropriate offsets and values.
-func NewBitWriter(w io.WriterAt) *BitWriter {
+func NewBitWriter(w WriterAtWithLen) *BitWriter {
        return &BitWriter{wr: w}
 }
 
-// ReserveBytes reserves the next aligned nbytes, skipping them and returning
+// SkipBytes reserves the next aligned nbytes, skipping them and returning
 // the offset to use with WriteAt to write to those reserved bytes. Used for
 // RLE encoding to fill in the indicators after encoding.
-func (b *BitWriter) ReserveBytes(nbytes int) int {
+func (b *BitWriter) SkipBytes(nbytes int) (int, error) {
        b.Flush(true)
        ret := b.byteoffset
        b.byteoffset += nbytes
-       return ret
+       b.wr.Reserve(b.byteoffset)
+       return ret, nil
 }
 
 // WriteAt fulfills the io.WriterAt interface to write len(p) bytes from p
diff --git a/go/parquet/internal/utils/rle.go b/go/parquet/internal/utils/rle.go
index eed97b6d01..bcb69fa858 100644
--- a/go/parquet/internal/utils/rle.go
+++ b/go/parquet/internal/utils/rle.go
@@ -22,7 +22,6 @@ package utils
 import (
        "bytes"
        "encoding/binary"
-       "io"
        "math"
 
        "github.com/apache/arrow/go/v13/arrow/bitutil"
@@ -465,7 +464,7 @@ type RleEncoder struct {
        indicatorBuffer [1]byte
 }
 
-func NewRleEncoder(w io.WriterAt, width int) *RleEncoder {
+func NewRleEncoder(w WriterAtWithLen, width int) *RleEncoder {
        return &RleEncoder{
                w:                      NewBitWriter(w),
                buffer:                 make([]uint64, 0, 8),
@@ -521,7 +520,10 @@ func (r *RleEncoder) flushBuffered(done bool) (err error) {
 
 func (r *RleEncoder) flushLiteral(updateIndicator bool) (err error) {
        if r.literalIndicatorOffset == -1 {
-               r.literalIndicatorOffset = r.w.ReserveBytes(1)
+               r.literalIndicatorOffset, err = r.w.SkipBytes(1)
+               if err != nil {
+                       return
+               }
        }
 
        for _, val := range r.buffer {

Reply via email to