This is an automated email from the ASF dual-hosted git repository. kou pushed a commit to branch maint-10.0.x in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 113ccb2b4b458c8c83894b269521b34e16ff68a8 Author: Laurent Quérel <laurent.que...@gmail.com> AuthorDate: Tue Nov 15 13:36:06 2022 -0800 ARROW-18326: [Go] Add option to support dictionary deltas with IPC (#14639) Test not working because of a potential issue in dictionary management. Lead-authored-by: Laurent Querel <l.que...@f5.com> Co-authored-by: Matt Topol <zotthewiz...@gmail.com> Signed-off-by: Matt Topol <zotthewiz...@gmail.com> --- go/arrow/ipc/ipc.go | 8 ++++++ go/arrow/ipc/ipc_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++--- go/arrow/ipc/writer.go | 22 +++++++++-------- 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go index 71d810b136..44a19b8ae8 100644 --- a/go/arrow/ipc/ipc.go +++ b/go/arrow/ipc/ipc.go @@ -70,6 +70,7 @@ type config struct { compressNP int ensureNativeEndian bool noAutoSchema bool + emitDictDeltas bool } func newConfig(opts ...Option) *config { @@ -160,6 +161,13 @@ func WithDelayReadSchema(v bool) Option { } } +// WithDictionaryDeltas specifies whether or not to emit dictionary deltas. +func WithDictionaryDeltas(v bool) Option { + return func(cfg *config) { + cfg.emitDictDeltas = v + } +} + var ( _ arrio.Reader = (*Reader)(nil) _ arrio.Writer = (*Writer)(nil) diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go index d6d0abf225..1bde7192a5 100644 --- a/go/arrow/ipc/ipc_test.go +++ b/go/arrow/ipc/ipc_test.go @@ -24,12 +24,13 @@ import ( "strconv" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/apache/arrow/go/v10/arrow" "github.com/apache/arrow/go/v10/arrow/array" "github.com/apache/arrow/go/v10/arrow/ipc" "github.com/apache/arrow/go/v10/arrow/memory" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestArrow12072(t *testing.T) { @@ -345,7 +346,7 @@ func TestDictionary(t *testing.T) { // IPC writer and reader var bufWriter bytes.Buffer - ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema)) + ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema), ipc.WithAllocator(pool), ipc.WithDictionaryDeltas(false)) defer ipcWriter.Close() bufReader := bytes.NewReader([]byte{}) @@ -391,6 +392,63 @@ func TestDictionary(t *testing.T) { ipcReader.Release() } +// ARROW-18326 +func TestDictionaryDeltas(t *testing.T) { + pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer pool.AssertSize(t, 0) + + // A schema with a single dictionary field + schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint16, + ValueType: arrow.BinaryTypes.String, + Ordered: false, + }}}, nil) + + // IPC writer and reader + var bufWriter bytes.Buffer + ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema), ipc.WithAllocator(pool), ipc.WithDictionaryDeltas(true)) + defer ipcWriter.Close() + + bufReader := bytes.NewReader([]byte{}) + var ipcReader *ipc.Reader + + bldr := array.NewBuilder(pool, schema.Field(0).Type) + defer bldr.Release() + require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_0"]`))) + + arr := bldr.NewArray() + defer arr.Release() + // Create a first record with field = "value_0" + record := array.NewRecord(schema, []arrow.Array{arr}, 1) + defer record.Release() + + expectedJson, err := record.MarshalJSON() + require.NoError(t, err) + // Serialize and deserialize the record via an IPC stream + json, ipcReader, err := encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader) + require.NoError(t, err) + // Compare the expected JSON with the actual JSON + require.JSONEq(t, string(expectedJson), string(json)) + + // Create a second record with field = "value_1" + require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_1"]`))) + arr = bldr.NewArray() + defer arr.Release() + record = array.NewRecord(schema, []arrow.Array{arr}, 1) + defer record.Release() + + expectedJson, err = record.MarshalJSON() + require.NoError(t, err) + // Serialize and deserialize the record via an IPC stream + json, ipcReader, err = encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader) + require.NoError(t, err) + // Compare the expected JSON with the actual JSON + // field = "value_0" but should be "value_1" + require.JSONEq(t, string(expectedJson), string(json)) + require.NoError(t, ipcReader.Err()) + ipcReader.Release() +} + // Encode and decode a record over a tuple of IPC writer and reader. // IPC writer and reader are the same from one call to another. func encodeDecodeIpcStream(t *testing.T, diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go index 9af88d9c2d..ef20e70815 100644 --- a/go/arrow/ipc/writer.go +++ b/go/arrow/ipc/writer.go @@ -99,11 +99,12 @@ type Writer struct { func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer { cfg := newConfig(opts...) return &Writer{ - mem: cfg.alloc, - pw: pw, - schema: cfg.schema, - codec: cfg.codec, - compressNP: cfg.compressNP, + mem: cfg.alloc, + pw: pw, + schema: cfg.schema, + codec: cfg.codec, + compressNP: cfg.compressNP, + emitDictDeltas: cfg.emitDictDeltas, } } @@ -111,11 +112,12 @@ func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer { func NewWriter(w io.Writer, opts ...Option) *Writer { cfg := newConfig(opts...) return &Writer{ - w: w, - mem: cfg.alloc, - pw: &swriter{w: w}, - schema: cfg.schema, - codec: cfg.codec, + w: w, + mem: cfg.alloc, + pw: &swriter{w: w}, + schema: cfg.schema, + codec: cfg.codec, + emitDictDeltas: cfg.emitDictDeltas, } }