[ https://issues.apache.org/jira/browse/ARROW-18317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthew Topol reassigned ARROW-18317: ------------------------------------- Assignee: Matthew Topol > [Go] Problem of dictionary update during a communication via an IPC stream > --------------------------------------------------------------------------- > > Key: ARROW-18317 > URL: https://issues.apache.org/jira/browse/ARROW-18317 > Project: Apache Arrow > Issue Type: Bug > Components: Go > Affects Versions: 10.0.0 > Reporter: Laurent Querel > Assignee: Matthew Topol > Priority: Major > > Dictionaries do not seem to be updated correctly when sending a record on an > IPC stream. > The following example creates a 1st record with a single field named "field" > and initialized with the value "value_0". This record is then serialized with > an ipc writer and deserialized with an ipc reader. > A second record is then created with the value "value_1". After serialization > and deserialization, the expected value for the field is "value_1" but I get > "value_0". > Based on a quick analysis via the debugger, I suspect an error in combining > the dictionary from step 1 with the dictionary from step 2. The resulting > dictionary contains the concatenation of the two dictionaries (i.e. > value_0value_1), but the offset values used to read the field (of the second > record) refer to "value_0". It may be that the offset arrays are not correctly > combined or something like that when the second record is received. > Below is a code snippet to reproduce the issue. > {code:java} > // NOTE: Release methods are not managed in this test for simplicity. 
> func TestDictionary(t *testing.T) { > pool := memory.NewGoAllocator() > // A schema with a single dictionary field > schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: > &arrow.DictionaryType{ > IndexType: arrow.PrimitiveTypes.Uint16, > ValueType: arrow.BinaryTypes.String, > Ordered: false, > }}}, nil) > // IPC writer and reader > var bufWriter bytes.Buffer > ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema)) > bufReader := bytes.NewReader([]byte{}) > var ipcReader *ipc.Reader > // Create a first record with field = "value_0" > record := CreateRecord(t, pool, schema, 0) > expectedJson, err := record.MarshalJSON() > require.NoError(t, err) > // Serialize and deserialize the record via an IPC stream > json, ipcReader, err := EncodeDecodeIpcStream(t, record, &bufWriter, > ipcWriter, bufReader, ipcReader) > require.NoError(t, err) > // Compare the expected JSON with the actual JSON > require.JSONEq(t, string(expectedJson), string(json)) > // Create a second record with field = "value_1" > record = CreateRecord(t, pool, schema, 1) > expectedJson, err = record.MarshalJSON() > require.NoError(t, err) > // Serialize and deserialize the record via an IPC stream > json, ipcReader, err = EncodeDecodeIpcStream(t, record, &bufWriter, > ipcWriter, bufReader, ipcReader) > require.NoError(t, err) > // Compare the expected JSON with the actual JSON > // field = "value_0" but should be "value_1" > require.JSONEq(t, string(expectedJson), string(json)) > } > // Create a record with a single field. > // The value of field `field` depends on the value passed in parameter. 
> func CreateRecord(t *testing.T, pool memory.Allocator, schema *arrow.Schema, > value int) arrow.Record { > rb := array.NewRecordBuilder(pool, schema) > fieldB := rb.Field(0).(*array.BinaryDictionaryBuilder) > err := fieldB.AppendString(fmt.Sprintf("value_%d", value)) > if err != nil { > t.Fatal(err) > } > return rb.NewRecord() > } > // Encode and decode a record over a tuple of IPC writer and reader. > // IPC writer and reader are the same from one call to another. > func EncodeDecodeIpcStream(t *testing.T, > record arrow.Record, > bufWriter *bytes.Buffer, ipcWriter *ipc.Writer, > bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, > error) { > // Serialize the record via an ipc writer > if err := ipcWriter.Write(record); err != nil { > return nil, ipcReader, err > } > serializedRecord := bufWriter.Bytes() > bufWriter.Reset() > // Deserialize the record via an ipc reader > bufReader.Reset(serializedRecord) > if ipcReader == nil { > newIpcReader, err := ipc.NewReader(bufReader) > if err != nil { > return nil, newIpcReader, err > } > ipcReader = newIpcReader > } > ipcReader.Next() > record = ipcReader.Record() > // Return the decoded record as a json string > json, err := record.MarshalJSON() > if err != nil { > return nil, ipcReader, err > } > return json, ipcReader, nil > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)