JakeDern opened a new pull request, #8001:
URL: https://github.com/apache/arrow-rs/pull/8001

   # Which issue does this PR close?
   
   - Closes #6783.
   
   # Rationale for this change
   
   Delta dictionaries are not supported by either the arrow-ipc reader or 
writer. Other languages like Go have delta dictionary support, so IPC 
streams produced by those languages sometimes include delta dictionaries. 
   
   This PR adds just the reader support so that we can consume streams with 
those messages in Rust.
   
   # What changes are included in this PR?
   
   - Update `read_dictionary_impl` to support delta dictionaries by 
concatenating the dictionaries if `isDelta()` is true
   
   # Are these changes tested?
   
   I need some pointers on the best way to test this using only Rust, but am 
happy to implement any suggestions 🙂. The validation that I did so far involved 
using the Go ipc writer to dump stream data to a file which I then read from 
Rust:
   
   The Go code writing the stream:
   
   ```go
   dictType := &arrow.DictionaryType{
                IndexType: arrow.PrimitiveTypes.Int16,
                ValueType: arrow.BinaryTypes.String,
                Ordered:   false,
        }
   
        schema := arrow.NewSchema([]arrow.Field{
                {Name: "foo", Type: dictType},
        }, nil)
   
        buf := bytes.NewBuffer([]byte{})
        writer := ipc.NewWriter(buf, ipc.WithSchema(schema), 
ipc.WithDictionaryDeltas(true))
   
        allocator := memory.NewGoAllocator()
   
        dict_builder := array.NewDictionaryBuilder(allocator, dictType)
   
        builder := array.NewStringBuilder(allocator)
        builder.AppendStringValues([]string{"A", "B", "C"}, []bool{})
        dict_builder.AppendArray(array.NewStringData(builder.NewArray().Data()))
        record := array.NewRecord(schema, []arrow.Array{
                dict_builder.NewArray(),
        }, 3)
   
        if err := writer.Write(record); err != nil {
                panic(err)
        }
   
        builder.AppendStringValues([]string{"A", "B", "D"}, []bool{})
        dict_builder.AppendArray(array.NewStringData(builder.NewArray().Data()))
        record2 := array.NewRecord(schema, []arrow.Array{
                dict_builder.NewArray(),
        }, 3)
   
        if err := writer.Write(record2); err != nil {
                panic(err)
        }
   
        // write buf out to ~/delta_test/delta.arrow
        if err := os.WriteFile("/home/jakedern/delta_test/delta.arrow", 
buf.Bytes(), 0644); err != nil {
                panic(fmt.Errorf("failed to write delta file: %w", err))
        }
   ```
   
   Rust test reading the stream:
   
   ```rust
    #[test]
       fn test_delta_read() {
           let f = 
std::fs::File::open("/home/jakedern/delta_test/delta.arrow").unwrap();
           let reader = StreamReader::try_new(f, None).unwrap();
           for record in reader.into_iter() {
               let record = record.unwrap();
               dbg!(record);
           }
       }
   ```
   Rust test output:
   
   ```text
    Blocking waiting for file lock on build directory
      Compiling arrow-ipc v55.2.0 (/home/jakedern/repos/arrow-rs/arrow-ipc)
       Finished `test` profile [unoptimized + debuginfo] target(s) in 3.42s
        Running unittests src/lib.rs 
(/home/jakedern/repos/arrow-rs/target/debug/deps/arrow_ipc-8945b55df9ad9a79)
   
   running 1 test
   [arrow-ipc/src/reader.rs:717:9] batch.isDelta() = false
   [arrow-ipc/src/reader.rs:1633:13] record = RecordBatch {
       schema: Schema {
           fields: [
               Field {
                   name: "foo",
                   data_type: Dictionary(
                       Int16,
                       Utf8,
                   ),
                   nullable: false,
                   dict_id: 0,
                   dict_is_ordered: false,
                   metadata: {},
               },
           ],
           metadata: {},
       },
       columns: [
           DictionaryArray {keys: PrimitiveArray<Int16>
           [
             0,
             1,
             2,
           ] values: StringArray
           [
             "A",
             "B",
             "C",
           ]}
           ,
       ],
       row_count: 3,
   }
   [arrow-ipc/src/reader.rs:717:9] batch.isDelta() = true
   [arrow-ipc/src/reader.rs:1633:13] record = RecordBatch {
       schema: Schema {
           fields: [
               Field {
                   name: "foo",
                   data_type: Dictionary(
                       Int16,
                       Utf8,
                   ),
                   nullable: false,
                   dict_id: 0,
                   dict_is_ordered: false,
                   metadata: {},
               },
           ],
           metadata: {},
       },
       columns: [
           DictionaryArray {keys: PrimitiveArray<Int16>
           [
             0,
             1,
             3,
           ] values: StringArray
           [
             "A",
             "B",
             "C",
             "D",
           ]}
           ,
       ],
       row_count: 3,
   }
   test reader::tests::test_delta_read ... ok
   
   test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 69 filtered out; 
finished in 0.00s
   ```
   
   # Are there any user-facing changes?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to