[ https://issues.apache.org/jira/browse/ARROW-18317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17634080#comment-17634080 ]

Laurent Querel commented on ARROW-18317:
----------------------------------------

I created the following draft PR: [https://github.com/apache/arrow/pull/14639]

However, the test fails when the dictionary deltas option is set to true.
I'm not sure whether dictionary deltas are handled properly in the existing code.

Could you help me with this?
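For context on the deltas question: in the Arrow IPC format, a dictionary batch flagged `isDelta` is appended to the dictionary the reader already holds, so a delta is only valid when the new dictionary extends the one already sent. The sketch below is a hypothetical, stdlib-only model of that writer-side decision; `dictMessage`, `planDictionaryMessage` and `isPrefix` are illustrative names, not Arrow Go APIs, and it assumes the writer remembers the last dictionary it sent for the field.

```go
package main

import "fmt"

// dictMessage is a hypothetical, simplified stand-in for an IPC dictionary
// batch. It is NOT an Arrow Go type.
type dictMessage struct {
	IsDelta bool     // true: the reader appends Values to its dictionary
	Values  []string // dictionary values carried by the message
}

// isPrefix reports whether prev is a prefix of next.
func isPrefix(prev, next []string) bool {
	if len(prev) > len(next) {
		return false
	}
	for i := range prev {
		if prev[i] != next[i] {
			return false
		}
	}
	return true
}

// planDictionaryMessage (hypothetical) decides what a writer could emit for a
// dictionary field, given the dictionary it last sent (prev, empty if none)
// and the dictionary of the next record (next). send is false when the
// reader's current dictionary is still valid.
func planDictionaryMessage(prev, next []string, allowDeltas bool) (msg dictMessage, send bool) {
	if len(prev) == 0 {
		// Nothing sent yet: in this model the first dictionary is a full one.
		return dictMessage{Values: next}, true
	}
	if isPrefix(prev, next) {
		if len(prev) == len(next) {
			// Same dictionary: nothing to send.
			return dictMessage{}, false
		}
		if allowDeltas {
			// next only appends values: send just the new tail as a delta.
			return dictMessage{IsDelta: true, Values: next[len(prev):]}, true
		}
	}
	// next does not extend prev (or deltas are disabled): full replacement.
	return dictMessage{Values: next}, true
}

func main() {
	// The issue's scenario: record 1 has ["value_0"], record 2 has ["value_1"].
	msg, send := planDictionaryMessage([]string{"value_0"}, []string{"value_1"}, true)
	fmt.Println(send, msg.IsDelta, msg.Values) // true false [value_1]

	// A genuine extension can be sent as a delta.
	msg, send = planDictionaryMessage([]string{"value_0"}, []string{"value_0", "value_1"}, true)
	fmt.Println(send, msg.IsDelta, msg.Values) // true true [value_1]
}
```

In the repro, the second record's dictionary `["value_1"]` does not extend `["value_0"]`, so even with deltas enabled this model falls back to a full replacement. Sending it as a delta instead would leave the reader with `["value_0", "value_1"]` while the record's index 0 still points at "value_0".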

> [Go] Problem of dictionary update during a communication via an IPC stream 
> ---------------------------------------------------------------------------
>
>                 Key: ARROW-18317
>                 URL: https://issues.apache.org/jira/browse/ARROW-18317
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Go
>    Affects Versions: 10.0.0
>            Reporter: Laurent Querel
>            Assignee: Matthew Topol
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 10.0.1, 11.0.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Dictionaries do not seem to be updated correctly when records are sent over
> an IPC stream.
> The following example creates a first record with a single field named
> "field", initialized with the value "value_0". This record is then serialized
> with an IPC writer and deserialized with an IPC reader.
> A second record is then created with the value "value_1". After serialization
> and deserialization, the expected value for the field is "value_1", but I get
> "value_0".
> Based on a quick analysis with the debugger, I suspect an error when combining
> the dictionary from step 1 with the dictionary from step 2. The resulting
> dictionary contains the concatenation of the two dictionaries (i.e.
> value_0value_1), but the offset values used to read the field of the second
> record refer to "value_0". It may be that the offset arrays are not combined
> correctly when the second record is received.
> Below is a code snippet to reproduce the issue.
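> {code:java}
> // The package name is arbitrary; any name works for this test file.
> package dictionary_test
>
> import (
>    "bytes"
>    "fmt"
>    "testing"
>
>    "github.com/apache/arrow/go/v10/arrow"
>    "github.com/apache/arrow/go/v10/arrow/array"
>    "github.com/apache/arrow/go/v10/arrow/ipc"
>    "github.com/apache/arrow/go/v10/arrow/memory"
>    "github.com/stretchr/testify/require"
> )
>
> // NOTE: Release methods are not managed in this test for simplicity.
> func TestDictionary(t *testing.T) {
>    pool := memory.NewGoAllocator()
>    // A schema with a single dictionary field
>    schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
>       IndexType: arrow.PrimitiveTypes.Uint16,
>       ValueType: arrow.BinaryTypes.String,
>       Ordered:   false,
>    }}}, nil)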
>    // IPC writer and reader
>    var bufWriter bytes.Buffer
>    ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
>    bufReader := bytes.NewReader([]byte{})
>    var ipcReader *ipc.Reader
>    // Create a first record with field = "value_0" 
>    record := CreateRecord(t, pool, schema, 0)
>    expectedJson, err := record.MarshalJSON()
>    require.NoError(t, err)
>    // Serialize and deserialize the record via an IPC stream
>    json, ipcReader, err := EncodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
>    require.NoError(t, err)
>    // Compare the expected JSON with the actual JSON
>    require.JSONEq(t, string(expectedJson), string(json))
>    // Create a second record with field = "value_1" 
>    record = CreateRecord(t, pool, schema, 1)
>    expectedJson, err = record.MarshalJSON()
>    require.NoError(t, err)
>    // Serialize and deserialize the record via an IPC stream
>    json, ipcReader, err = EncodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
>    require.NoError(t, err)
>    // Compare the expected JSON with the actual JSON
>    // field = "value_0" but should be "value_1"
>    require.JSONEq(t, string(expectedJson), string(json))
> }
> // Create a record with a single field.
> // The value of field `field` depends on the value passed in parameter.
> func CreateRecord(t *testing.T, pool memory.Allocator, schema *arrow.Schema, value int) arrow.Record {
>    rb := array.NewRecordBuilder(pool, schema)
>    fieldB := rb.Field(0).(*array.BinaryDictionaryBuilder)
>    err := fieldB.AppendString(fmt.Sprintf("value_%d", value))
>    if err != nil {
>       t.Fatal(err)
>    }
>    return rb.NewRecord()
> }
> // Encode and decode a record over a pair of IPC writer and reader.
> // The same IPC writer and reader are reused from one call to the next.
> func EncodeDecodeIpcStream(t *testing.T,
>    record arrow.Record,
>    bufWriter *bytes.Buffer, ipcWriter *ipc.Writer,
>    bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, error) {
>    // Serialize the record via an ipc writer
>    if err := ipcWriter.Write(record); err != nil {
>       return nil, ipcReader, err
>    }
>    serializedRecord := bufWriter.Bytes()
>    bufWriter.Reset()
>    // Deserialize the record via an ipc reader
>    bufReader.Reset(serializedRecord)
>    if ipcReader == nil {
>       newIpcReader, err := ipc.NewReader(bufReader)
>       if err != nil {
>          return nil, newIpcReader, err
>       }
>       ipcReader = newIpcReader
>    }
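>    // Read the next record; Next returns false on error or end of stream
>    if !ipcReader.Next() {
>       if err := ipcReader.Err(); err != nil {
>          return nil, ipcReader, err
>       }
>       return nil, ipcReader, fmt.Errorf("no record available in the IPC stream")
>    }
>    record = ipcReader.Record()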
>    // Return the decoded record as a json string
>    json, err := record.MarshalJSON()
>    if err != nil {
>       return nil, ipcReader, err
>    }
>    return json, ipcReader, nil
> }
> {code}
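To make the reported symptom concrete, here is a toy, stdlib-only Go model of a stream reader's dictionary state. `dictBatch` and `dictMemo` are hypothetical names, not Arrow Go types. It assumes, as in the repro above, that each record is built with a fresh builder, so both records use index 0. It illustrates one way the observed result could arise, not a diagnosis of the actual Arrow Go code.

```go
package main

import "fmt"

// dictBatch is a hypothetical simplification of an IPC dictionary batch.
type dictBatch struct {
	IsDelta bool
	Values  []string
}

// dictMemo is a hypothetical model of the reader's current dictionary.
type dictMemo struct {
	values []string
}

// apply follows the IPC isDelta rule: a delta appends to the current
// dictionary, anything else replaces it.
func (m *dictMemo) apply(b dictBatch) {
	if b.IsDelta {
		m.values = append(m.values, b.Values...)
		return
	}
	m.values = append([]string(nil), b.Values...)
}

// decode resolves dictionary indices against the current dictionary.
func (m *dictMemo) decode(indices []uint16) []string {
	out := make([]string, len(indices))
	for i, idx := range indices {
		out[i] = m.values[idx]
	}
	return out
}

func main() {
	var memo dictMemo
	// Record 1: dictionary ["value_0"], index 0.
	memo.apply(dictBatch{Values: []string{"value_0"}})
	fmt.Println(memo.decode([]uint16{0})) // [value_0]

	// Record 2: dictionary ["value_1"], index 0, sent as a replacement.
	memo.apply(dictBatch{Values: []string{"value_1"}})
	fmt.Println(memo.decode([]uint16{0})) // [value_1]

	// The same batch wrongly flagged as a delta: the dictionary is
	// concatenated and index 0 still resolves to "value_0".
	var wrong dictMemo
	wrong.apply(dictBatch{Values: []string{"value_0"}})
	wrong.apply(dictBatch{IsDelta: true, Values: []string{"value_1"}})
	fmt.Println(wrong.values, wrong.decode([]uint16{0})) // [value_0 value_1] [value_0]
}
```

The last case matches what the issue describes: a concatenated dictionary ("value_0value_1") combined with indices that still select "value_0".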



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
