[ 
https://issues.apache.org/jira/browse/ARROW-18317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17634012#comment-17634012
 ] 

Laurent Querel commented on ARROW-18317:
----------------------------------------

[~zeroshade] Thanks. I will check this PR with my code today.

However, I have a question. The fix seems to replace the dictionaries at the 
receiver level rather than merge them. What happens if the sender then sends 
data that refers to values from the first dictionary?
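
To make the question concrete, here is a minimal sketch of replacement 
semantics on the receiving side. It uses only the standard library: the 
`receiver` type and its methods are hypothetical stand-ins, not Arrow APIs. It 
assumes that a non-delta dictionary batch replaces the dictionary previously 
stored under the same ID.

```go
package main

import "fmt"

// receiver is a toy stand-in for an IPC stream reader. It keeps one
// dictionary per dictionary ID. All names here are hypothetical.
type receiver struct {
	dicts map[int64][]string
}

func newReceiver() *receiver {
	return &receiver{dicts: map[int64][]string{}}
}

// onDictionaryBatch applies a non-delta dictionary batch: the new values
// replace whatever was stored under the same ID (no merge, no append).
func (r *receiver) onDictionaryBatch(id int64, values []string) {
	r.dicts[id] = append([]string(nil), values...)
}

// decode resolves the indices of a record batch against the dictionary
// currently stored under id.
func (r *receiver) decode(id int64, indices []int) ([]string, error) {
	dict := r.dicts[id]
	out := make([]string, 0, len(indices))
	for _, i := range indices {
		if i < 0 || i >= len(dict) {
			return nil, fmt.Errorf("index %d out of range for dictionary %d (len %d)", i, id, len(dict))
		}
		out = append(out, dict[i])
	}
	return out, nil
}

func main() {
	r := newReceiver()

	// First record: dictionary ["value_0"], index 0.
	r.onDictionaryBatch(0, []string{"value_0"})
	first, _ := r.decode(0, []int{0})

	// Second record: dictionary ["value_1"] replaces the first one, index 0.
	r.onDictionaryBatch(0, []string{"value_1"})
	second, _ := r.decode(0, []int{0})

	fmt.Println(first, second)
}
```

Under this model, indices always resolve against the most recently received 
dictionary, so a sender that wants to emit "value_0" again after the 
replacement would have to include it in a newly transmitted dictionary.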

This also leads me to the question of delta dictionary support. How can we 
enable it to minimize the number of dictionary transmissions? 
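
As an illustration of what delta dictionaries would save, here is a small 
standard-library sketch. The `deltaSender` and `deltaReceiver` types are 
hypothetical and do not reflect the Arrow Go internals. It assumes the sender 
remembers which values it has already transmitted and sends only the new ones, 
which the receiver appends to its existing dictionary. In the Go 
implementation, the writer option for this appears to be 
`ipc.WithDictionaryDeltas(true)`, though that should be checked against the 
version in use.

```go
package main

import "fmt"

// deltaSender tracks every value already sent, so that only unseen values
// are transmitted as a delta. Hypothetical type, for illustration only.
type deltaSender struct {
	index map[string]int // value -> position in the cumulative dictionary
}

func newDeltaSender() *deltaSender {
	return &deltaSender{index: map[string]int{}}
}

// encode returns the values not yet transmitted (the delta) together with
// the indices of the batch, expressed against the cumulative dictionary.
func (s *deltaSender) encode(values []string) (delta []string, indices []int) {
	for _, v := range values {
		i, ok := s.index[v]
		if !ok {
			i = len(s.index)
			s.index[v] = i
			delta = append(delta, v)
		}
		indices = append(indices, i)
	}
	return delta, indices
}

// deltaReceiver appends each delta to its dictionary instead of replacing it.
type deltaReceiver struct {
	dict []string
}

// apply appends the delta, then resolves the indices against the result.
func (r *deltaReceiver) apply(delta []string, indices []int) []string {
	r.dict = append(r.dict, delta...)
	out := make([]string, len(indices))
	for k, i := range indices {
		out[k] = r.dict[i]
	}
	return out
}

func main() {
	s, r := newDeltaSender(), &deltaReceiver{}
	batches := [][]string{{"value_0"}, {"value_1"}, {"value_0", "value_1"}}
	for _, batch := range batches {
		delta, indices := s.encode(batch)
		// Print how many dictionary values were transmitted and what was decoded.
		fmt.Println(len(delta), r.apply(delta, indices))
	}
}
```

In this sketch the third batch transmits no dictionary values at all, because 
both of its values were already sent in earlier deltas.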

> [Go] Problem of dictionary update during a communication via an IPC stream 
> ---------------------------------------------------------------------------
>
>                 Key: ARROW-18317
>                 URL: https://issues.apache.org/jira/browse/ARROW-18317
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Go
>    Affects Versions: 10.0.0
>            Reporter: Laurent Querel
>            Assignee: Matthew Topol
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 10.0.1, 11.0.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Dictionaries do not seem to be updated correctly when sending a record on an 
> IPC stream. 
> The following example creates a first record with a single field named 
> "field", initialized with the value "value_0". This record is then serialized 
> with an IPC writer and deserialized with an IPC reader.
> A second record is then created with the value "value_1". After serialization 
> and deserialization, the expected value for the field is "value_1" but I get 
> "value_0". 
> Based on a quick analysis via the debugger, I suspect an error in combining 
> the dictionary from step 1 with the dictionary from step 2. The resulting 
> dictionary contains the concatenation of the two dictionaries (i.e. 
> value_0value_1), but the offset values used to read the field (of the second 
> record) refer to "value_0". It may be that the offset arrays are not 
> correctly combined when the second record is received.
> Below is a code snippet to reproduce the issue.
> {code:go}
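> // Imports assumed for this snippet (Arrow Go v10 module paths, testify).
> import (
>    "bytes"
>    "fmt"
>    "testing"
>
>    "github.com/apache/arrow/go/v10/arrow"
>    "github.com/apache/arrow/go/v10/arrow/array"
>    "github.com/apache/arrow/go/v10/arrow/ipc"
>    "github.com/apache/arrow/go/v10/arrow/memory"
>    "github.com/stretchr/testify/require"
> )
>
> // NOTE: Release methods are not managed in this test for simplicity.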
> func TestDictionary(t *testing.T) {
>    pool := memory.NewGoAllocator()
>    // A schema with a single dictionary field
>    schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: 
> &arrow.DictionaryType{
>       IndexType: arrow.PrimitiveTypes.Uint16,
>       ValueType: arrow.BinaryTypes.String,
>       Ordered:   false,
>    }}}, nil)
>    // IPC writer and reader
>    var bufWriter bytes.Buffer
>    ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
>    bufReader := bytes.NewReader([]byte{})
>    var ipcReader *ipc.Reader
>    // Create a first record with field = "value_0" 
>    record := CreateRecord(t, pool, schema, 0)
>    expectedJson, err := record.MarshalJSON()
>    require.NoError(t, err)
>    // Serialize and deserialize the record via an IPC stream
>    json, ipcReader, err := EncodeDecodeIpcStream(t, record, &bufWriter, 
> ipcWriter, bufReader, ipcReader)
>    require.NoError(t, err)
>    // Compare the expected JSON with the actual JSON
>    require.JSONEq(t, string(expectedJson), string(json))
>    // Create a second record with field = "value_1" 
>    record = CreateRecord(t, pool, schema, 1)
>    expectedJson, err = record.MarshalJSON()
>    require.NoError(t, err)
>    // Serialize and deserialize the record via an IPC stream
>    json, ipcReader, err = EncodeDecodeIpcStream(t, record, &bufWriter, 
> ipcWriter, bufReader, ipcReader)
>    require.NoError(t, err)
>    // Compare the expected JSON with the actual JSON
>    // field = "value_0" but should be "value_1"
>    require.JSONEq(t, string(expectedJson), string(json))
> }
> // Create a record with a single field.
> // The value of field `field` depends on the value passed in parameter.
> func CreateRecord(t *testing.T, pool memory.Allocator, schema *arrow.Schema, 
> value int) arrow.Record {
>    rb := array.NewRecordBuilder(pool, schema)
>    fieldB := rb.Field(0).(*array.BinaryDictionaryBuilder)
>    err := fieldB.AppendString(fmt.Sprintf("value_%d", value))
>    if err != nil {
>       t.Fatal(err)
>    }
>    return rb.NewRecord()
> }
> // Encode and decode a record over a tuple of IPC writer and reader.
> // IPC writer and reader are the same from one call to another.
> func EncodeDecodeIpcStream(t *testing.T,
>    record arrow.Record,
>    bufWriter *bytes.Buffer, ipcWriter *ipc.Writer,
>    bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, 
> error) {
>    // Serialize the record via an ipc writer
>    if err := ipcWriter.Write(record); err != nil {
>       return nil, ipcReader, err
>    }
>    serializedRecord := bufWriter.Bytes()
>    bufWriter.Reset()
>    // Deserialize the record via an ipc reader
>    bufReader.Reset(serializedRecord)
>    if ipcReader == nil {
>       newIpcReader, err := ipc.NewReader(bufReader)
>       if err != nil {
>          return nil, newIpcReader, err
>       }
>       ipcReader = newIpcReader
>    }
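>    // Next reports false at end of stream or on error; do not ignore it.
>    if !ipcReader.Next() {
>       if err := ipcReader.Err(); err != nil {
>          return nil, ipcReader, err
>       }
>       return nil, ipcReader, fmt.Errorf("no record available from IPC reader")
>    }
>    record = ipcReader.Record()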
>    // Return the decoded record as a json string
>    json, err := record.MarshalJSON()
>    if err != nil {
>       return nil, ipcReader, err
>    }
>    return json, ipcReader, nil
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
