Laurent Querel created ARROW-18317: -------------------------------------- Summary: [Go] Dictionary issue with IPC stream Key: ARROW-18317 URL: https://issues.apache.org/jira/browse/ARROW-18317 Project: Apache Arrow Issue Type: Bug Components: Go Affects Versions: 10.0.0 Reporter: Laurent Querel
Dictionaries are not correctly updated when sending a record on an IPC stream. The following example creates a first record with a single field named "field", initialized with the value "value_0". This record is then serialized with an ipc writer and deserialized with an ipc reader. A second record is then created with the value "value_1". After serialization and deserialization, the expected value for the field is "value_1" but I get "value_0". Based on a quick analysis via the debugger, I suspect an error in combining the dictionary from step 1 with the dictionary from step 2. Below is a code snippet to reproduce the issue. {code:java} // NOTE: Release methods are not managed in this test for simplicity. func TestDictionary(t *testing.T) { pool := memory.NewGoAllocator() // A schema with a single dictionary field schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{ IndexType: arrow.PrimitiveTypes.Uint16, ValueType: arrow.BinaryTypes.String, Ordered: false, }}}, nil) // IPC writer and reader var bufWriter bytes.Buffer ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema)) bufReader := bytes.NewReader([]byte{}) var ipcReader *ipc.Reader // Create a first record with field = "value_0" record := CreateRecord(t, pool, schema, 0) expectedJson, err := record.MarshalJSON() require.NoError(t, err) // Serialize and deserialize the record via an IPC stream json, ipcReader, err := EncodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader) require.NoError(t, err) // Compare the expected JSON with the actual JSON require.JSONEq(t, string(expectedJson), string(json)) // Create a second record with field = "value_1" record = CreateRecord(t, pool, schema, 1) expectedJson, err = record.MarshalJSON() require.NoError(t, err) // Serialize and deserialize the record via an IPC stream json, ipcReader, err = EncodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader) require.NoError(t, err) // Compare the expected 
JSON with the actual JSON // field = "value_0" but should be "value_1" require.JSONEq(t, string(expectedJson), string(json)) } func CreateRecord(t *testing.T, pool memory.Allocator, schema *arrow.Schema, value int) arrow.Record { rb := array.NewRecordBuilder(pool, schema) fieldB := rb.Field(0).(*array.BinaryDictionaryBuilder) err := fieldB.AppendString(fmt.Sprintf("value_%d", value)) if err != nil { t.Fatal(err) } return rb.NewRecord() } func EncodeDecodeIpcStream(t *testing.T, record arrow.Record, bufWriter *bytes.Buffer, ipcWriter *ipc.Writer, bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, error) { // Serialize the record via an ipc writer if err := ipcWriter.Write(record); err != nil { return nil, ipcReader, err } serializedRecord := bufWriter.Bytes() bufWriter.Reset() // Deserialize the record via an ipc reader bufReader.Reset(serializedRecord) if ipcReader == nil { newIpcReader, err := ipc.NewReader(bufReader) if err != nil { return nil, newIpcReader, err } ipcReader = newIpcReader } ipcReader.Next() record = ipcReader.Record() // Return the decoded record as a json string json, err := record.MarshalJSON() if err != nil { return nil, ipcReader, err } return json, ipcReader, nil } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)