This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch maint-10.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 113ccb2b4b458c8c83894b269521b34e16ff68a8
Author: Laurent Quérel <laurent.que...@gmail.com>
AuthorDate: Tue Nov 15 13:36:06 2022 -0800

    ARROW-18326: [Go] Add option to support dictionary deltas with IPC (#14639)
    
    Note: the new test does not pass yet because of a suspected issue in dictionary management.
    
    Lead-authored-by: Laurent Querel <l.que...@f5.com>
    Co-authored-by: Matt Topol <zotthewiz...@gmail.com>
    Signed-off-by: Matt Topol <zotthewiz...@gmail.com>
---
 go/arrow/ipc/ipc.go      |  8 ++++++
 go/arrow/ipc/ipc_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++---
 go/arrow/ipc/writer.go   | 22 +++++++++--------
 3 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go
index 71d810b136..44a19b8ae8 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/ipc/ipc.go
@@ -70,6 +70,7 @@ type config struct {
        compressNP         int
        ensureNativeEndian bool
        noAutoSchema       bool
+       emitDictDeltas     bool
 }
 
 func newConfig(opts ...Option) *config {
@@ -160,6 +161,13 @@ func WithDelayReadSchema(v bool) Option {
        }
 }
 
+// WithDictionaryDeltas specifies whether or not the Writer emits dictionary deltas for dictionary-encoded fields.
+func WithDictionaryDeltas(v bool) Option {
+       return func(cfg *config) {
+               cfg.emitDictDeltas = v // copied into the Writer by NewWriter and NewWriterWithPayloadWriter
+       }
+}
+
 var (
        _ arrio.Reader = (*Reader)(nil)
        _ arrio.Writer = (*Writer)(nil)
diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go
index d6d0abf225..1bde7192a5 100644
--- a/go/arrow/ipc/ipc_test.go
+++ b/go/arrow/ipc/ipc_test.go
@@ -24,12 +24,13 @@ import (
        "strconv"
        "testing"
 
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
        "github.com/apache/arrow/go/v10/arrow"
        "github.com/apache/arrow/go/v10/arrow/array"
        "github.com/apache/arrow/go/v10/arrow/ipc"
        "github.com/apache/arrow/go/v10/arrow/memory"
-       "github.com/stretchr/testify/assert"
-       "github.com/stretchr/testify/require"
 )
 
 func TestArrow12072(t *testing.T) {
@@ -345,7 +346,7 @@ func TestDictionary(t *testing.T) {
 
        // IPC writer and reader
        var bufWriter bytes.Buffer
-       ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
+       ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema), ipc.WithAllocator(pool), ipc.WithDictionaryDeltas(false))
        defer ipcWriter.Close()
 
        bufReader := bytes.NewReader([]byte{})
@@ -391,6 +392,63 @@ func TestDictionary(t *testing.T) {
        ipcReader.Release()
 }
 
+// TestDictionaryDeltas round-trips two records with differing dictionaries through a Writer built with WithDictionaryDeltas(true) (ARROW-18326).
+func TestDictionaryDeltas(t *testing.T) {
+       pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+       defer pool.AssertSize(t, 0)
+
+       // A schema with a single dictionary field
+       schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
+               IndexType: arrow.PrimitiveTypes.Uint16,
+               ValueType: arrow.BinaryTypes.String,
+               Ordered:   false,
+       }}}, nil)
+
+       // IPC writer and reader
+       var bufWriter bytes.Buffer
+       ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema), ipc.WithAllocator(pool), ipc.WithDictionaryDeltas(true))
+       defer ipcWriter.Close()
+
+       bufReader := bytes.NewReader([]byte{})
+       var ipcReader *ipc.Reader
+
+       bldr := array.NewBuilder(pool, schema.Field(0).Type)
+       defer bldr.Release()
+       require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_0"]`)))
+
+       arr := bldr.NewArray()
+       defer arr.Release()
+       // Create a first record with field = "value_0"
+       record := array.NewRecord(schema, []arrow.Array{arr}, 1)
+       defer record.Release()
+
+       expectedJSON, err := record.MarshalJSON()
+       require.NoError(t, err)
+       // Serialize and deserialize the record via an IPC stream
+       json, ipcReader, err := encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+       require.NoError(t, err)
+       // Compare the expected JSON with the actual JSON
+       require.JSONEq(t, string(expectedJSON), string(json))
+
+       // Create a second record with field = "value_1"
+       require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_1"]`)))
+       arr = bldr.NewArray()
+       defer arr.Release()
+       record = array.NewRecord(schema, []arrow.Array{arr}, 1)
+       defer record.Release()
+
+       expectedJSON, err = record.MarshalJSON()
+       require.NoError(t, err)
+       // Serialize and deserialize the record via an IPC stream
+       json, ipcReader, err = encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+       require.NoError(t, err)
+       // Compare the expected JSON with the actual JSON
+       // Expect "value_1"; "value_0" was observed here (suspected dictionary-management issue).
+       require.JSONEq(t, string(expectedJSON), string(json))
+       require.NoError(t, ipcReader.Err())
+       ipcReader.Release()
+}
+
 // Encode and decode a record over a tuple of IPC writer and reader.
 // IPC writer and reader are the same from one call to another.
 func encodeDecodeIpcStream(t *testing.T,
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index 9af88d9c2d..ef20e70815 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -99,11 +99,12 @@ type Writer struct {
 func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer {
        cfg := newConfig(opts...)
        return &Writer{
-               mem:        cfg.alloc,
-               pw:         pw,
-               schema:     cfg.schema,
-               codec:      cfg.codec,
-               compressNP: cfg.compressNP,
+               mem:            cfg.alloc,
+               pw:             pw,
+               schema:         cfg.schema,
+               codec:          cfg.codec,
+               compressNP:     cfg.compressNP,
+               emitDictDeltas: cfg.emitDictDeltas, // set via WithDictionaryDeltas
        }
 }
 
@@ -111,11 +112,13 @@ func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer {
 func NewWriter(w io.Writer, opts ...Option) *Writer {
        cfg := newConfig(opts...)
        return &Writer{
-               w:      w,
-               mem:    cfg.alloc,
-               pw:     &swriter{w: w},
-               schema: cfg.schema,
-               codec:  cfg.codec,
+               w:              w,
+               mem:            cfg.alloc,
+               pw:             &swriter{w: w},
+               schema:         cfg.schema,
+               codec:          cfg.codec,
+               compressNP:     cfg.compressNP, // mirror NewWriterWithPayloadWriter so the compressNP option is honored here too
+               emitDictDeltas: cfg.emitDictDeltas,
        }
 }
 

Reply via email to