This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch maint-10.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 60e1d81ed9c7ff409d96f1c658698df19ce79748
Author: Matt Topol <zotthewiz...@gmail.com>
AuthorDate: Tue Nov 15 10:55:56 2022 -0500

    ARROW-18317: [Go] Dictionary replacement during IPC stream (#14636)
    
    Fix dictionary replacement for IPC streams. Currently, non-delta
    dictionary batches are incorrectly concatenated onto the existing
    dictionary instead of replacing it. This change properly replaces the
    dictionary when a non-delta dictionary message is encountered.
    
    Authored-by: Matt Topol <zotthewiz...@gmail.com>
    Signed-off-by: Matt Topol <zotthewiz...@gmail.com>
---
 go/arrow/internal/dictutils/dict.go | 10 +++-
 go/arrow/ipc/ipc_test.go            | 94 +++++++++++++++++++++++++++++++++++++
 2 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/go/arrow/internal/dictutils/dict.go b/go/arrow/internal/dictutils/dict.go
index 9dd17c6540..561923f626 100644
--- a/go/arrow/internal/dictutils/dict.go
+++ b/go/arrow/internal/dictutils/dict.go
@@ -344,10 +344,18 @@ func (memo *Memo) AddDelta(id int64, v arrow.ArrayData) {
        memo.id2dict[id] = append(d, v)
 }
 
+// AddOrReplace puts the provided dictionary into the memo table. If a
+// dictionary already exists for the given id, the new data replaces it.
+// Otherwise it is added to the memo table.
 func (memo *Memo) AddOrReplace(id int64, v arrow.ArrayData) bool {
        d, ok := memo.id2dict[id]
        if ok {
-               d = append(d, v)
+               // replace the dictionary and release any existing ones
+               for _, dict := range d {
+                       dict.Release()
+               }
+               d[0] = v
+               d = d[:1]
        } else {
                d = []arrow.ArrayData{v}
        }
diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go
index 2f0816c0d0..d6d0abf225 100644
--- a/go/arrow/ipc/ipc_test.go
+++ b/go/arrow/ipc/ipc_test.go
@@ -330,3 +330,97 @@ func TestIPCTable(t *testing.T) {
                n++
        }
 }
+
+// ARROW-18317
+func TestDictionary(t *testing.T) {
+       pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+       defer pool.AssertSize(t, 0)
+
+       // A schema with a single dictionary field
+       schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
+               IndexType: arrow.PrimitiveTypes.Uint16,
+               ValueType: arrow.BinaryTypes.String,
+               Ordered:   false,
+       }}}, nil)
+
+       // IPC writer and reader
+       var bufWriter bytes.Buffer
+       ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
+       defer ipcWriter.Close()
+
+       bufReader := bytes.NewReader([]byte{})
+       var ipcReader *ipc.Reader
+
+       bldr := array.NewBuilder(pool, schema.Field(0).Type)
+       defer bldr.Release()
+       require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_0"]`)))
+
+       arr := bldr.NewArray()
+       defer arr.Release()
+       // Create a first record with field = "value_0"
+       record := array.NewRecord(schema, []arrow.Array{arr}, 1)
+       defer record.Release()
+
+       expectedJson, err := record.MarshalJSON()
+       require.NoError(t, err)
+       // Serialize and deserialize the record via an IPC stream
+       json, ipcReader, err := encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+       require.NoError(t, err)
+       // Compare the expected JSON with the actual JSON
+       require.JSONEq(t, string(expectedJson), string(json))
+
+       // Create a second record with field = "value_1"
+       require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_1"]`)))
+       arr = bldr.NewArray()
+       defer arr.Release()
+       record = array.NewRecord(schema, []arrow.Array{arr}, 1)
+
+       // record, _, err = array.RecordFromJSON(pool, schema, strings.NewReader(`[{"field": ["value_1"]}]`))
+       // require.NoError(t, err)
+       defer record.Release()
+
+       expectedJson, err = record.MarshalJSON()
+       require.NoError(t, err)
+       // Serialize and deserialize the record via an IPC stream
+       json, ipcReader, err = encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+       require.NoError(t, err)
+       // Compare the expected JSON with the actual JSON
+       // Without the fix, the field decodes as "value_0" instead of "value_1"
+       require.JSONEq(t, string(expectedJson), string(json))
+       require.NoError(t, ipcReader.Err())
+       ipcReader.Release()
+}
+
+// encodeDecodeIpcStream serializes the record with the given IPC writer and
+// deserializes it with the given IPC reader; the same writer and reader are reused across calls.
+func encodeDecodeIpcStream(t *testing.T,
+       record arrow.Record,
+       bufWriter *bytes.Buffer, ipcWriter *ipc.Writer,
+       bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, error) {
+
+       // Serialize the record via an ipc writer
+       if err := ipcWriter.Write(record); err != nil {
+               return nil, ipcReader, err
+       }
+       serializedRecord := bufWriter.Bytes()
+       bufWriter.Reset()
+
+       // Deserialize the record via an ipc reader
+       bufReader.Reset(serializedRecord)
+       if ipcReader == nil {
+               newIpcReader, err := ipc.NewReader(bufReader)
+               if err != nil {
+                       return nil, newIpcReader, err
+               }
+               ipcReader = newIpcReader
+       }
+       if !ipcReader.Next() {
+               return nil, ipcReader, ipcReader.Err()
+       }
+       record = ipcReader.Record()
+
+       // Return the decoded record as a json string
+       json, err := record.MarshalJSON()
+       if err != nil {
+               return nil, ipcReader, err
+       }
+       return json, ipcReader, nil
+}
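
For readers following along, here is a minimal standalone sketch (not part of
the commit) of the round trip the new test exercises: two records carrying
different dictionaries for the same field are written to a single IPC stream
and read back. The github.com/apache/arrow/go/v10 import path is assumed from
the maint-10.0.x branch; all of the calls used below appear in the test above.

    package main

    import (
        "bytes"
        "fmt"

        "github.com/apache/arrow/go/v10/arrow"
        "github.com/apache/arrow/go/v10/arrow/array"
        "github.com/apache/arrow/go/v10/arrow/ipc"
        "github.com/apache/arrow/go/v10/arrow/memory"
    )

    func main() {
        pool := memory.NewGoAllocator()

        // Same dictionary-encoded schema as the new test.
        schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
            IndexType: arrow.PrimitiveTypes.Uint16,
            ValueType: arrow.BinaryTypes.String,
        }}}, nil)

        var buf bytes.Buffer
        w := ipc.NewWriter(&buf, ipc.WithSchema(schema))

        // Write two single-row records whose dictionaries differ; the second
        // write is expected to emit a non-delta (replacement) dictionary message.
        for _, row := range []string{`["value_0"]`, `["value_1"]`} {
            bldr := array.NewBuilder(pool, schema.Field(0).Type)
            if err := bldr.UnmarshalJSON([]byte(row)); err != nil {
                panic(err)
            }
            arr := bldr.NewArray()
            rec := array.NewRecord(schema, []arrow.Array{arr}, 1)
            if err := w.Write(rec); err != nil {
                panic(err)
            }
            rec.Release()
            arr.Release()
            bldr.Release()
        }
        if err := w.Close(); err != nil {
            panic(err)
        }

        // Read both records back. With this fix the second record decodes as
        // "value_1"; previously the replacement dictionary was concatenated
        // onto the first one, so it still decoded as "value_0".
        r, err := ipc.NewReader(&buf)
        if err != nil {
            panic(err)
        }
        defer r.Release()
        for r.Next() {
            j, _ := r.Record().MarshalJSON()
            fmt.Println(string(j))
        }
    }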
