[ 
https://issues.apache.org/jira/browse/ARROW-18317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Topol reassigned ARROW-18317:
-------------------------------------

    Assignee: Matthew Topol

> [Go] Problem of dictionary update during a communication via an IPC stream 
> ---------------------------------------------------------------------------
>
>                 Key: ARROW-18317
>                 URL: https://issues.apache.org/jira/browse/ARROW-18317
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Go
>    Affects Versions: 10.0.0
>            Reporter: Laurent Querel
>            Assignee: Matthew Topol
>            Priority: Major
>
> Dictionaries do not seem to be updated correctly when sending a record on an 
> IPC stream. 
> The following example creates a 1st record with a single field named "field" 
> and initialized with the value "value_0. This record is then serialized with 
> an ipc writer and deserialized with an ipc reader.
> A second record is then created with the value "value_1". After serialization 
> and deserialization, the expected value for the field is "value_1" but I get 
> "value_0". 
> Based on a quick analysis via the debugger, I suspect an error in combining 
> the dictionary from step 1 with the dictionary from step 2. The resulting 
> dictionary contains the concatenation of the two dictionaries (i.e. 
> value_0value_1), but the offsets values used to read the field (of the second 
> record) refer "value_0". It may be that the offset arrays are not correctly 
> combined or something like that when the second record is received.
> Below a code snippet to reproduce the issue.
> {code:java}
> // NOTE: Release methods are not managed in this test for simplicity.
> func TestDictionary(t *testing.T) {
>    pool := memory.NewGoAllocator()
>    // A schema with a single dictionary field
>    schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: 
> &arrow.DictionaryType{
>       IndexType: arrow.PrimitiveTypes.Uint16,
>       ValueType: arrow.BinaryTypes.String,
>       Ordered:   false,
>    }}}, nil)
>    // IPC writer and reader
>    var bufWriter bytes.Buffer
>    ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
>    bufReader := bytes.NewReader([]byte{})
>    var ipcReader *ipc.Reader
>    // Create a first record with field = "value_0" 
>    record := CreateRecord(t, pool, schema, 0)
>    expectedJson, err := record.MarshalJSON()
>    require.NoError(t, err)
>    // Serialize and deserialize the record via an IPC stream
>    json, ipcReader, err := EncodeDecodeIpcStream(t, record, &bufWriter, 
> ipcWriter, bufReader, ipcReader)
>    require.NoError(t, err)
>    // Compare the expected JSON with the actual JSON
>    require.JSONEq(t, string(expectedJson), string(json))
>    // Create a second record with field = "value_1" 
>    record = CreateRecord(t, pool, schema, 1)
>    expectedJson, err = record.MarshalJSON()
>    require.NoError(t, err)
>    // Serialize and deserialize the record via an IPC stream
>    json, ipcReader, err = EncodeDecodeIpcStream(t, record, &bufWriter, 
> ipcWriter, bufReader, ipcReader)
>    require.NoError(t, err)
>    // Compare the expected JSON with the actual JSON
>    // field = "value_0" but should be "value_1"
>    require.JSONEq(t, string(expectedJson), string(json))
> }
> // Create a record with a single field.
> // The value of field `field` depends on the value passed in parameter.
> func CreateRecord(t *testing.T, pool memory.Allocator, schema *arrow.Schema, 
> value int) arrow.Record {
>    rb := array.NewRecordBuilder(pool, schema)
>    fieldB := rb.Field(0).(*array.BinaryDictionaryBuilder)
>    err := fieldB.AppendString(fmt.Sprintf("value_%d", value))
>    if err != nil {
>       t.Fatal(err)
>    }
>    return rb.NewRecord()
> }
> // Encode and decode a record over a tuple of IPC writer and reader.
> // IPC writer and reader are the same from one call to another.
> func EncodeDecodeIpcStream(t *testing.T,
>    record arrow.Record,
>    bufWriter *bytes.Buffer, ipcWriter *ipc.Writer,
>    bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, 
> error) {
>    // Serialize the record via an ipc writer
>    if err := ipcWriter.Write(record); err != nil {
>       return nil, ipcReader, err
>    }
>    serializedRecord := bufWriter.Bytes()
>    bufWriter.Reset()
>    // Deserialize the record via an ipc reader
>    bufReader.Reset(serializedRecord)
>    if ipcReader == nil {
>       newIpcReader, err := ipc.NewReader(bufReader)
>       if err != nil {
>          return nil, newIpcReader, err
>       }
>       ipcReader = newIpcReader
>    }
>    ipcReader.Next()
>    record = ipcReader.Record()
>    // Return the decoded record as a json string
>    json, err := record.MarshalJSON()
>    if err != nil {
>       return nil, ipcReader, err
>    }
>    return json, ipcReader, nil
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to