JakeDern commented on PR #8001:
URL: https://github.com/apache/arrow-rs/pull/8001#issuecomment-3134275403
@asubiotto @alamb I have an initial implementation of both the writer and the reader
ready for feedback. Thanks @asubiotto for the starter code; some of the
additional tests were especially helpful!
After digging through both the Go and Rust writer code, I'm curious to get
thoughts on one thing: the ability of the Rust IPC writers to produce delta
dictionaries is very limited.
Both the Go and Rust IPC writers look at the dictionaries on incoming
RecordBatches one at a time, comparing each new dictionary with the old one to
determine whether the new one is a superset of the old. I thought this was
odd because the following test I wrote in Rust produces 0 delta dictionaries,
whereas I expected it to produce 2:
```rust
#[test]
fn test_deltas() {
    // Dictionary resets at ["C", "D"]
    let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["C", "D"], &["A", "B", "E"]];
    // `run_test` is a test helper defined elsewhere in the PR (not shown here).
    run_test(batches, false);
}
```
However, since the writer only compares against the dictionary values of the
previous RecordBatch, it has to completely reset when `&["C", "D"]` comes in,
and again when `&["A", "B", "E"]` comes in.
This was inconsistent with what I'd seen in Go, where similar code _did_
produce two delta dictionaries despite seemingly following the same
algorithm.
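For reference, the shared per-batch comparison can be sketched in plain Rust over `Vec<String>` instead of Arrow arrays. This is an illustrative sketch, not the arrow-rs or arrow-go code. `DictUpdate`, `classify` and `strs` are hypothetical names. The sketch also assumes that "superset" means the previously written values form a prefix of the new ones. The reason is that an IPC delta dictionary batch is appended to the existing dictionary, so already-sent indices must keep pointing at the same values.

```rust
/// What a writer would send for one dictionary field, given what it last
/// wrote. Hypothetical type for illustration, not an arrow-rs/arrow-go API.
#[derive(Debug, PartialEq)]
enum DictUpdate {
    /// Same values as last time: nothing to send.
    Unchanged,
    /// The old values are a prefix of the new ones: send only the new tail.
    Delta(Vec<String>),
    /// First dictionary, or the new values don't extend the old ones:
    /// send everything as a non-delta (replacement) dictionary batch.
    Replace(Vec<String>),
}

/// Hypothetical helper: classify `next` against the last written dictionary.
fn classify(last_written: Option<&[String]>, next: &[String]) -> DictUpdate {
    match last_written {
        Some(prev) if prev == next => DictUpdate::Unchanged,
        Some(prev) if next.starts_with(prev) => {
            DictUpdate::Delta(next[prev.len()..].to_vec())
        }
        _ => DictUpdate::Replace(next.to_vec()),
    }
}

/// Convenience: turn string literals into owned values.
fn strs(values: &[&str]) -> Vec<String> {
    values.iter().map(|s| s.to_string()).collect()
}

fn main() {
    let (ab, cd, abe) = (strs(&["A", "B"]), strs(&["C", "D"]), strs(&["A", "B", "E"]));
    // Each batch is compared only with the batch before it.
    println!("{:?}", classify(Some(ab.as_slice()), &cd)); // prints: Replace(["C", "D"])
    println!("{:?}", classify(Some(cd.as_slice()), &abe)); // prints: Replace(["A", "B", "E"])
}
```

Each batch is compared only with the one before it. As a result, both resets in the test above (`["C", "D"]` and `["A", "B", "E"]`) come out as `Replace`, i.e. a full, non-delta dictionary.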
So I dug a bit more and wrote some Go code that feeds batches from two
different dictionary builders into a single IPC writer, and then read the
output from Rust. Batches 0, 2, and 3 are written using one dictionary builder
and batch 1 is written using another:
```go
package main

import (
	"bytes"
	"fmt"
	"os"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/ipc"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	dictType := &arrow.DictionaryType{
		IndexType: arrow.PrimitiveTypes.Int16,
		ValueType: arrow.BinaryTypes.String,
		Ordered:   false,
	}
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "foo", Type: dictType},
	}, nil)

	// A single IPC writer with delta dictionaries enabled.
	buf := bytes.NewBuffer([]byte{})
	writer := ipc.NewWriter(buf, ipc.WithSchema(schema),
		ipc.WithDictionaryDeltas(true))
	allocator := memory.NewGoAllocator()
	dictBuilder := array.NewDictionaryBuilder(allocator, dictType)
	builder := array.NewStringBuilder(allocator)

	// Batch 0: first dictionary builder. NewStringArray also resets builder.
	builder.AppendStringValues([]string{"A", "B", "C"}, nil)
	if err := dictBuilder.AppendArray(builder.NewStringArray()); err != nil {
		panic(err)
	}
	record := array.NewRecord(schema, []arrow.Array{
		dictBuilder.NewArray(),
	}, 3)
	if err := writer.Write(record); err != nil {
		panic(err)
	}

	// Batch 1: a second, independent dictionary builder.
	dictBuilder2 := array.NewDictionaryBuilder(allocator, dictType)
	builder.AppendStringValues([]string{"A", "B", "D"}, nil)
	if err := dictBuilder2.AppendArray(builder.NewStringArray()); err != nil {
		panic(err)
	}
	record2 := array.NewRecord(schema, []arrow.Array{
		dictBuilder2.NewArray(),
	}, 3)
	if err := writer.Write(record2); err != nil {
		panic(err)
	}

	// Batch 2: back to the first dictionary builder.
	builder.AppendStringValues([]string{"A", "B", "E"}, nil)
	if err := dictBuilder.AppendArray(builder.NewStringArray()); err != nil {
		panic(err)
	}
	record3 := array.NewRecord(schema, []arrow.Array{
		dictBuilder.NewArray(),
	}, 3)
	if err := writer.Write(record3); err != nil {
		panic(err)
	}

	// Batch 3: first dictionary builder again.
	builder.AppendStringValues([]string{"A", "B", "D"}, nil)
	if err := dictBuilder.AppendArray(builder.NewStringArray()); err != nil {
		panic(err)
	}
	record4 := array.NewRecord(schema, []arrow.Array{
		dictBuilder.NewArray(),
	}, 3)
	if err := writer.Write(record4); err != nil {
		panic(err)
	}

	// Close writes the end-of-stream marker; do it before reading buf.
	if err := writer.Close(); err != nil {
		panic(err)
	}
	// Write buf out to ~/delta_test/delta2.arrow
	if err := os.WriteFile("/home/jakedern/delta_test/delta2.arrow",
		buf.Bytes(), 0644); err != nil {
		panic(fmt.Errorf("failed to write delta file: %w", err))
	}
}
```
Reading that file back from Rust, with `dbg!` output added to `arrow-ipc/src/reader.rs`, prints:
```text
[arrow-ipc/src/reader.rs:717:9] batch.isDelta() = false
[arrow-ipc/src/reader.rs:722:9] &dictionary_values = StringArray
[
"A",
"B",
"C",
]
[arrow-ipc/src/reader.rs:717:9] batch.isDelta() = false
[arrow-ipc/src/reader.rs:722:9] &dictionary_values = StringArray
[
"A",
"B",
"D",
]
[arrow-ipc/src/reader.rs:717:9] batch.isDelta() = false
[arrow-ipc/src/reader.rs:722:9] &dictionary_values = StringArray
[
"A",
"B",
"C",
"E",
]
[arrow-ipc/src/reader.rs:717:9] batch.isDelta() = true
[arrow-ipc/src/reader.rs:725:5] &dictionary_values = StringArray
[
"D",
]
```
What we see is that writing a RecordBatch from a different builder effectively
resets the `ipc.Writer`, because the writer doesn't know about the values from
the first builder. The insight I missed the first time is that in Go there is a
[dictionary](https://github.com/apache/arrow-go/blob/679d97b05d9aa2279cbe56a4ef5cfa94aa657db8/arrow/array/data.go#L42)
field attached to each array's data by the dictionary builder, which contains
all of the values that builder has produced so far. The IPC writer compares
that dictionary rather than only the values in the RecordBatch.
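That explanation can be checked with a small std-only Rust model (not Arrow code). The hypothetical `MemoBuilder` plays the role of a Go dictionary builder that keeps its memo table across batches and attaches its full dictionary to each batch. The hypothetical `Writer` compares each attached dictionary against the last one it wrote. It uses the same prefix-based reading of "superset", which is an assumption.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a Go dictionary builder: it keeps every value
/// it has seen (in insertion order) across batches and attaches the full
/// dictionary to each batch it produces.
struct MemoBuilder {
    values: Vec<String>,
    seen: HashSet<String>,
}

impl MemoBuilder {
    fn new() -> Self {
        MemoBuilder { values: Vec::new(), seen: HashSet::new() }
    }

    /// Append one batch's values and return the dictionary attached to it.
    fn append_batch(&mut self, batch: &[&str]) -> Vec<String> {
        for v in batch {
            // `insert` returns true only for values not seen before.
            if self.seen.insert(v.to_string()) {
                self.values.push(v.to_string());
            }
        }
        self.values.clone()
    }
}

/// Hypothetical stand-in for the IPC writer: remembers the last dictionary
/// it wrote for the field and compares each new one against it.
struct Writer {
    last_written: Option<Vec<String>>,
}

impl Writer {
    /// Returns `Some((is_delta, values_sent))`, or `None` if unchanged.
    fn write(&mut self, dict: Vec<String>) -> Option<(bool, Vec<String>)> {
        let sent = match &self.last_written {
            Some(prev) if *prev == dict => None,
            Some(prev) if dict.starts_with(prev) => Some((true, dict[prev.len()..].to_vec())),
            _ => Some((false, dict.clone())),
        };
        self.last_written = Some(dict);
        sent
    }
}

fn main() {
    let mut writer = Writer { last_written: None };
    let (mut builder1, mut builder2) = (MemoBuilder::new(), MemoBuilder::new());

    // Same order as the Go program: batches 0, 2 and 3 use builder1,
    // batch 1 uses builder2.
    let dicts = vec![
        builder1.append_batch(&["A", "B", "C"]),
        builder2.append_batch(&["A", "B", "D"]),
        builder1.append_batch(&["A", "B", "E"]),
        builder1.append_batch(&["A", "B", "D"]),
    ];
    for (i, dict) in dicts.into_iter().enumerate() {
        println!("batch {i}: {:?}", writer.write(dict));
    }
}
```

Run as written, it reports three non-delta dictionaries (`["A", "B", "C"]`, `["A", "B", "D"]`, `["A", "B", "C", "E"]`) and then `Some((true, ["D"]))`. That is the same sequence the Rust reader printed for the Go file.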
The takeaway is that, in Go, we need some cooperation between the dictionary
builder and the IPC writer to get delta dictionaries a reasonable amount of the
time. If we shovel RecordBatches from different builders into the same IPC
writer, the writer keeps falling back to full dictionary replacements instead
of deltas.
In Rust, we don't have that `dictionary` field, so our ability to write delta
dictionaries is always quite limited: unless each batch's dictionary values are
a superset of the previous batch's, we get no deltas and just waste time
comparing dictionaries. Additionally, the Rust dictionary builder [resets its
internal dictionary](https://github.com/apache/arrow-rs/blob/94230402c2d31e7da5dc73d1a284cf17940c093c/arrow-array/src/builder/generic_bytes_dictionary_builder.rs#L435)
after every batch it builds, presumably because nothing downstream could use it
anyway.
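The cost of that reset shows up in the same kind of std-only model. The hypothetical `DictBuilder` below has two modes. It can clear its deduplication state on every `finish`, mimicking the Rust builder reset described above. Or it can keep that state, as the Go builder evidently does in the output earlier. This is a sketch, not the arrow-rs builder API.

```rust
use std::collections::HashSet;

/// Hypothetical model of a dictionary builder. With `reset_on_finish` it
/// clears its deduplication state after every batch (like the Rust builder
/// reset described above); without it, the state carries over (like Go).
struct DictBuilder {
    reset_on_finish: bool,
    values: Vec<String>,
    seen: HashSet<String>,
}

impl DictBuilder {
    fn new(reset_on_finish: bool) -> Self {
        DictBuilder { reset_on_finish, values: Vec::new(), seen: HashSet::new() }
    }

    fn append(&mut self, v: &str) {
        if self.seen.insert(v.to_string()) {
            self.values.push(v.to_string());
        }
    }

    /// Finish a batch and return the dictionary values attached to it.
    fn finish(&mut self) -> Vec<String> {
        let dict = self.values.clone();
        if self.reset_on_finish {
            self.values.clear();
            self.seen.clear();
        }
        dict
    }
}

/// Build ["A", "B", "C"] then ["A", "B", "E"] with one builder.
fn two_batches(reset_on_finish: bool) -> (Vec<String>, Vec<String>) {
    let mut b = DictBuilder::new(reset_on_finish);
    for v in ["A", "B", "C"] {
        b.append(v);
    }
    let first = b.finish();
    for v in ["A", "B", "E"] {
        b.append(v);
    }
    let second = b.finish();
    (first, second)
}

fn main() {
    for reset in [true, false] {
        let (first, second) = two_batches(reset);
        // A delta is only possible when the second dictionary extends the first.
        let extends = second.starts_with(&first);
        println!("reset={reset}: {first:?} -> {second:?}, extends={extends}");
    }
}
```

With the reset, the second dictionary is `["A", "B", "E"]`, which does not extend `["A", "B", "C"]`, so a writer can only send a full replacement. Without the reset, the second dictionary is `["A", "B", "C", "E"]`, and only `["E"]` would need to go out as a delta.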
My questions are:
1. Is there a better way to handle this than requiring cooperation between
the dictionary builder and the IPC writers? This was difficult for me to figure
out, and I imagine it would surprise a lot of people who expect delta
dictionaries to "just work" and then find they aren't getting them.
2. Is there a straightforward way (and the appetite) to add a similar
`dictionary` field to Rust `ArrayData`? Then we could at least produce delta
dictionaries under similar conditions to the Go writer.
Curious to hear any thoughts, thank you both for the help so far!