This is an automated email from the ASF dual-hosted git repository. kou pushed a commit to branch maint-10.0.x in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 60e1d81ed9c7ff409d96f1c658698df19ce79748 Author: Matt Topol <zotthewiz...@gmail.com> AuthorDate: Tue Nov 15 10:55:56 2022 -0500 ARROW-18317: [Go] Dictionary replacement during IPC stream (#14636) Fix dictionary replacement for IPC streams. Currently they incorrectly get concatenated together instead of replaced when not using deltas. This will properly replace dictionaries when encountering a non-delta dictionary message. Authored-by: Matt Topol <zotthewiz...@gmail.com> Signed-off-by: Matt Topol <zotthewiz...@gmail.com> --- go/arrow/internal/dictutils/dict.go | 10 +++- go/arrow/ipc/ipc_test.go | 94 +++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/go/arrow/internal/dictutils/dict.go b/go/arrow/internal/dictutils/dict.go index 9dd17c6540..561923f626 100644 --- a/go/arrow/internal/dictutils/dict.go +++ b/go/arrow/internal/dictutils/dict.go @@ -344,10 +344,18 @@ func (memo *Memo) AddDelta(id int64, v arrow.ArrayData) { memo.id2dict[id] = append(d, v) } +// AddOrReplace puts the provided dictionary into the memo table. If it +// already exists, then the new data will replace it. Otherwise it is added +// to the memo table. func (memo *Memo) AddOrReplace(id int64, v arrow.ArrayData) bool { d, ok := memo.id2dict[id] if ok { - d = append(d, v) + // replace the dictionary and release any existing ones + for _, dict := range d { + dict.Release() + } + d[0] = v + d = d[:1] } else { d = []arrow.ArrayData{v} } diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go index 2f0816c0d0..d6d0abf225 100644 --- a/go/arrow/ipc/ipc_test.go +++ b/go/arrow/ipc/ipc_test.go @@ -330,3 +330,97 @@ func TestIPCTable(t *testing.T) { n++ } } + +// ARROW-18317 +func TestDictionary(t *testing.T) { + pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer pool.AssertSize(t, 0) + + // A schema with a single dictionary field + schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint16, + ValueType: arrow.BinaryTypes.String, + Ordered: false, + }}}, nil) + + // IPC writer and reader + var bufWriter bytes.Buffer + ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema)) + defer ipcWriter.Close() + + bufReader := bytes.NewReader([]byte{}) + var ipcReader *ipc.Reader + + bldr := array.NewBuilder(pool, schema.Field(0).Type) + defer bldr.Release() + require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_0"]`))) + + arr := bldr.NewArray() + defer arr.Release() + // Create a first record with field = "value_0" + record := array.NewRecord(schema, []arrow.Array{arr}, 1) + defer record.Release() + + expectedJson, err := record.MarshalJSON() + require.NoError(t, err) + // Serialize and deserialize the record via an IPC stream + json, ipcReader, err := encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader) + require.NoError(t, err) + // Compare the expected JSON with the actual JSON + require.JSONEq(t, string(expectedJson), string(json)) + + // Create a second record with field = "value_1" + require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_1"]`))) + arr = bldr.NewArray() + defer arr.Release() + record = array.NewRecord(schema, []arrow.Array{arr}, 1) + + // record, _, err = array.RecordFromJSON(pool, schema, strings.NewReader(`[{"field": ["value_1"]}]`)) + // require.NoError(t, err) + defer record.Release() + + expectedJson, err = record.MarshalJSON() + require.NoError(t, err) + // Serialize and deserialize the record via an IPC stream + json, ipcReader, err = encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader) + require.NoError(t, err) + // Compare the expected JSON with the actual JSON + // field = "value_0" but should be "value_1" + require.JSONEq(t, string(expectedJson), string(json)) + require.NoError(t, ipcReader.Err()) + ipcReader.Release() +} + +// Encode and decode a record over a tuple of IPC writer and reader. +// IPC writer and reader are the same from one call to another. +func encodeDecodeIpcStream(t *testing.T, + record arrow.Record, + bufWriter *bytes.Buffer, ipcWriter *ipc.Writer, + bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, error) { + + // Serialize the record via an ipc writer + if err := ipcWriter.Write(record); err != nil { + return nil, ipcReader, err + } + serializedRecord := bufWriter.Bytes() + bufWriter.Reset() + + // Deserialize the record via an ipc reader + bufReader.Reset(serializedRecord) + if ipcReader == nil { + newIpcReader, err := ipc.NewReader(bufReader) + if err != nil { + return nil, newIpcReader, err + } + ipcReader = newIpcReader + } + ipcReader.Next() + record = ipcReader.Record() + + // Return the decoded record as a json string + json, err := record.MarshalJSON() + if err != nil { + return nil, ipcReader, err + } + return json, ipcReader, nil +}