This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 70e4069fae [arrow-ipc]: dictionary builders for delta - doc fix and
integration tests for nested types (#9853)
70e4069fae is described below
commit 70e4069faeea69b5252c6145cf5600e3434a0852
Author: albertlockett <[email protected]>
AuthorDate: Sun May 3 08:56:02 2026 -0400
[arrow-ipc]: dictionary builders for delta - doc fix and integration tests
for nested types (#9853)
# Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax.
-->
- Closes #NNN.
# Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
In https://github.com/apache/arrow-rs/issues/9600 we added the capability to
call `finish_preserve_values` on any array builder; the call propagates the
choice to preserve dictionary values down into the nested builders. I thought
it would be good to extend the integration tests we have in
https://github.com/apache/arrow-rs/blob/main/arrow-ipc/tests/test_delta_dictionary.rs
to cover that use case, namely calling the method on builders that have
nested child builders (such as `StructBuilder` and `ListBuilder`).
While reviewing the PR for #9600 I also noticed a small issue in the
`StreamWriter` docs for delta dictionaries: the example configures
`IpcWriteOptions` to use delta dictionaries but never passes those options
to the `StreamWriter` constructor:
https://docs.rs/arrow-ipc/58.1.0/arrow_ipc/writer/struct.StreamWriter.html#example---efficient-delta-dictionaries
# What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
- adds cases to the integration tests for delta dictionaries covering
`ListBuilder` and `StructBuilder`
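- fixes the `StreamWriter` delta dictionary doc example so that it passes the configured `IpcWriteOptions` via `StreamWriter::try_new_with_options`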
# Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
This PR only changes documentation and tests, so no further tests are required.
# Are there any user-facing changes?
No
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
If there are any breaking changes to public APIs, please call them out.
-->
---
arrow-ipc/src/tests/delta_dictionary.rs | 166 ++++++++++++++++++++++++--------
arrow-ipc/src/writer.rs | 2 +-
2 files changed, 129 insertions(+), 39 deletions(-)
diff --git a/arrow-ipc/src/tests/delta_dictionary.rs
b/arrow-ipc/src/tests/delta_dictionary.rs
index dfd8cd33e5..60a6c945a3 100644
--- a/arrow-ipc/src/tests/delta_dictionary.rs
+++ b/arrow-ipc/src/tests/delta_dictionary.rs
@@ -24,9 +24,11 @@ use crate::{
writer::FileWriter,
};
use arrow_array::{
- Array, ArrayRef, DictionaryArray, RecordBatch, StringArray,
builder::StringDictionaryBuilder,
+ Array, ArrayRef, DictionaryArray, ListArray, RecordBatch, StringArray,
StructArray,
+ builder::{ArrayBuilder, ListBuilder, StringDictionaryBuilder,
StructBuilder},
types::Int32Type,
};
+
use arrow_schema::{DataType, Field, Schema};
use std::io::Cursor;
use std::sync::Arc;
@@ -35,7 +37,7 @@ use std::sync::Arc;
fn test_zero_row_dict() {
let batches: &[&[&str]] = &[&[], &["A"], &[], &["B", "C"], &[]];
run_delta_sequence_test(
- batches,
+ &build_batches(batches),
&[
MessageType::Dict(vec![]),
MessageType::RecordBatch,
@@ -48,7 +50,7 @@ fn test_zero_row_dict() {
);
run_resend_sequence_test(
- batches,
+ &build_batches(batches),
&[
MessageType::Dict(vec![]),
MessageType::RecordBatch,
@@ -72,7 +74,7 @@ fn test_mixed_delta() {
];
run_delta_sequence_test(
- batches,
+ &build_batches(batches),
&[
MessageType::Dict(str_vec(&["A"])),
MessageType::RecordBatch,
@@ -87,7 +89,7 @@ fn test_mixed_delta() {
);
run_resend_sequence_test(
- batches,
+ &build_batches(batches),
&[
MessageType::Dict(str_vec(&["A"])),
MessageType::RecordBatch,
@@ -106,7 +108,7 @@ fn test_mixed_delta() {
fn test_disjoint_delta() {
let batches: &[&[&str]] = &[&["A"], &["B"], &["C", "E"]];
run_delta_sequence_test(
- batches,
+ &build_batches(batches),
&[
MessageType::Dict(str_vec(&["A"])),
MessageType::RecordBatch,
@@ -118,7 +120,7 @@ fn test_disjoint_delta() {
);
run_resend_sequence_test(
- batches,
+ &build_batches(batches),
&[
MessageType::Dict(str_vec(&["A"])),
MessageType::RecordBatch,
@@ -134,7 +136,7 @@ fn test_disjoint_delta() {
fn test_increasing_delta() {
let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["A", "B", "C"]];
run_delta_sequence_test(
- batches,
+ &build_batches(batches),
&[
MessageType::Dict(str_vec(&["A"])),
MessageType::RecordBatch,
@@ -146,7 +148,7 @@ fn test_increasing_delta() {
);
run_resend_sequence_test(
- batches,
+ &build_batches(batches),
&[
MessageType::Dict(str_vec(&["A"])),
MessageType::RecordBatch,
@@ -162,7 +164,7 @@ fn test_increasing_delta() {
fn test_single_delta() {
let batches: &[&[&str]] = &[&["A", "B", "C"], &["D"]];
run_delta_sequence_test(
- batches,
+ &build_batches(batches),
&[
MessageType::Dict(str_vec(&["A", "B", "C"])),
MessageType::RecordBatch,
@@ -172,7 +174,7 @@ fn test_single_delta() {
);
run_resend_sequence_test(
- batches,
+ &build_batches(batches),
&[
MessageType::Dict(str_vec(&["A", "B", "C"])),
MessageType::RecordBatch,
@@ -186,7 +188,7 @@ fn test_single_delta() {
fn test_single_same_value_sequence() {
let batches: &[&[&str]] = &[&["A"], &["A"], &["A"], &["A"]];
run_delta_sequence_test(
- batches,
+ &build_batches(batches),
&[
MessageType::Dict(str_vec(&["A"])),
MessageType::RecordBatch,
@@ -197,7 +199,7 @@ fn test_single_same_value_sequence() {
);
run_resend_sequence_test(
- batches,
+ &build_batches(batches),
&[
MessageType::Dict(str_vec(&["A"])),
MessageType::RecordBatch,
@@ -216,7 +218,7 @@ fn str_vec(strings: &[&str]) -> Vec<String> {
fn test_multi_same_value_sequence() {
let batches: &[&[&str]] = &[&["A", "B", "C"], &["A", "B", "C"]];
run_delta_sequence_test(
- batches,
+ &build_batches(batches),
&[
MessageType::Dict(str_vec(&["A", "B", "C"])),
MessageType::RecordBatch,
@@ -232,17 +234,17 @@ enum MessageType {
RecordBatch,
}
-fn run_resend_sequence_test(batches: &[&[&str]], sequence: &[MessageType]) {
+fn run_resend_sequence_test(batches: &[RecordBatch], sequence: &[MessageType])
{
let opts =
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Resend);
run_sequence_test(batches, sequence, opts);
}
-fn run_delta_sequence_test(batches: &[&[&str]], sequence: &[MessageType]) {
+fn run_delta_sequence_test(batches: &[RecordBatch], sequence: &[MessageType]) {
let opts =
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
run_sequence_test(batches, sequence, opts);
}
-fn run_sequence_test(batches: &[&[&str]], sequence: &[MessageType], options:
IpcWriteOptions) {
+fn run_sequence_test(batches: &[RecordBatch], sequence: &[MessageType],
options: IpcWriteOptions) {
let stream_buf = write_all_to_stream(options.clone(), batches);
let ipc_stream = get_ipc_message_stream(stream_buf);
for (message, expected) in ipc_stream.iter().zip(sequence.iter()) {
@@ -310,7 +312,7 @@ fn test_replace_same_length() {
&["A", "B", "C", "D", "E", "F"],
&["A", "G", "H", "I", "J", "K"],
];
- run_parity_test(batches);
+ run_parity_test(&build_batches(batches));
}
#[test]
@@ -323,14 +325,14 @@ fn test_sparse_deltas() {
&["parquet", "B"],
&["123", "B", "C"],
];
- run_parity_test(batches);
+ run_parity_test(&build_batches(batches));
}
#[test]
fn test_deltas_with_reset() {
// Dictionary resets at ["C", "D"]
let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["C", "D"], &["A", "B",
"C", "D"]];
- run_parity_test(batches);
+ run_parity_test(&build_batches(batches));
}
/// FileWriter can only tolerate very specific patterns of delta dictionaries,
@@ -338,7 +340,19 @@ fn test_deltas_with_reset() {
#[test]
fn test_deltas_with_file() {
let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["A", "B", "C"], &["A",
"B", "C", "D"]];
- run_parity_test(batches);
+ run_parity_test(&build_batches(batches));
+}
+
+#[test]
+fn test_deltas_with_in_struct() {
+ let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["A", "B", "C"], &["A",
"B", "C", "D"]];
+ run_parity_test(&build_struct_batches(batches));
+}
+
+#[test]
+fn test_deltas_with_in_list() {
+ let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["A", "B", "C"], &["A",
"B", "C", "D"]];
+ run_parity_test(&build_list_batches(batches));
}
/// Encode all batches three times and compare all three for the same results
@@ -348,7 +362,7 @@ fn test_deltas_with_file() {
/// - Stream encoding without delta
/// - File encoding with delta (File format does not allow replacement
/// dictionaries)
-fn run_parity_test(batches: &[&[&str]]) {
+fn run_parity_test(batches: &[RecordBatch]) {
let delta_options =
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
let delta_stream_buf = write_all_to_stream(delta_options.clone(), batches);
@@ -368,16 +382,16 @@ fn run_parity_test(batches: &[&[&str]]) {
let (first_stream, other_streams) = streams.split_first_mut().unwrap();
for (idx, batch) in first_stream.by_ref().enumerate() {
- let first_dict = extract_dictionary(batch);
- let expected_values = batches[idx];
- assert_eq!(expected_values, &dict_to_vec(first_dict.clone()));
+ let first_dict = extract_dictionary(&batch);
+ let expected_values = dict_to_vec(&extract_dictionary(&batches[idx]));
+ assert_eq!(expected_values, dict_to_vec(&first_dict));
for stream in other_streams.iter_mut() {
let next_batch = stream
.next()
.expect("All streams should yield same number of elements");
- let next_dict = extract_dictionary(next_batch);
- assert_eq!(expected_values, &dict_to_vec(next_dict.clone()));
+ let next_dict = extract_dictionary(&next_batch);
+ assert_eq!(expected_values, dict_to_vec(&next_dict));
assert_eq!(first_dict, next_dict);
}
}
@@ -390,7 +404,7 @@ fn run_parity_test(batches: &[&[&str]]) {
}
}
-fn dict_to_vec(dict: DictionaryArray<Int32Type>) -> Vec<String> {
+fn dict_to_vec(dict: &DictionaryArray<Int32Type>) -> Vec<String> {
dict.downcast_dict::<StringArray>()
.unwrap()
.into_iter()
@@ -418,35 +432,43 @@ fn get_file_batches(buf: Vec<u8>) -> Box<dyn
Iterator<Item = RecordBatch>> {
)
}
-fn extract_dictionary(batch: RecordBatch) ->
DictionaryArray<arrow_array::types::Int32Type> {
- batch
- .column(0)
+fn extract_dictionary(batch: &RecordBatch) ->
DictionaryArray<arrow_array::types::Int32Type> {
+ let mut column = batch.column(0);
+
+ // if we've been passed a struct, assume the first column contains the dict
+ if let Some(struct_arr) = column.as_any().downcast_ref::<StructArray>() {
+ column = struct_arr.column(0);
+ }
+
+ // if we've been passed a list, assume the lists' values are the dict
+ if let Some(list_arr) = column.as_any().downcast_ref::<ListArray>() {
+ column = list_arr.values();
+ }
+
+ column
.as_any()
.downcast_ref::<DictionaryArray<arrow_array::types::Int32Type>>()
.unwrap()
.clone()
}
-fn write_all_to_file(options: IpcWriteOptions, vals: &[&[&str]]) -> Vec<u8> {
- let batches = build_batches(vals);
+fn write_all_to_file(options: IpcWriteOptions, batches: &[RecordBatch]) ->
Vec<u8> {
let mut buf: Vec<u8> = Vec::new();
let mut writer =
FileWriter::try_new_with_options(&mut buf, &batches[0].schema(),
options).unwrap();
for batch in batches {
- writer.write(&batch).unwrap();
+ writer.write(batch).unwrap();
}
writer.finish().unwrap();
buf
}
-fn write_all_to_stream(options: IpcWriteOptions, vals: &[&[&str]]) -> Vec<u8> {
- let batches = build_batches(vals);
-
+fn write_all_to_stream(options: IpcWriteOptions, batches: &[RecordBatch]) ->
Vec<u8> {
let mut buf: Vec<u8> = Vec::new();
let mut writer =
StreamWriter::try_new_with_options(&mut buf, &batches[0].schema(),
options).unwrap();
for batch in batches {
- writer.write(&batch).unwrap();
+ writer.write(batch).unwrap();
}
writer.finish().unwrap();
@@ -477,3 +499,71 @@ fn build_batch(
RecordBatch::try_new(schema.clone(), vec![Arc::new(array) as
ArrayRef]).unwrap()
}
+
+/// build batches where the dictionary array is nested within a struct array.
The dictionary array
+/// is the first field within the struct.
+fn build_struct_batches(vals: &[&[&str]]) -> Vec<RecordBatch> {
+ let total_vals = vals.iter().map(|v| v.len()).sum();
+ let mut struct_builder = StructBuilder::from_fields(
+ vec![Field::new(
+ "struct",
+ DataType::Dictionary(Box::new(DataType::Int32),
Box::new(DataType::Utf8)),
+ false,
+ )],
+ total_vals,
+ );
+
+ vals.iter()
+ .map(|v| build_struct_batch(v, &mut struct_builder))
+ .collect()
+}
+
+fn build_struct_batch(vals: &[&str], struct_builder: &mut StructBuilder) ->
RecordBatch {
+ for &val in vals {
+ let dict_builder = struct_builder
+
.field_builder::<StringDictionaryBuilder<arrow_array::types::Int32Type>>(0)
+ .unwrap();
+ dict_builder.append_value(val);
+ struct_builder.append(true);
+ }
+
+ let array = struct_builder.finish_preserve_values();
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "dict",
+ array.data_type().clone(),
+ true,
+ )]));
+
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(array) as
ArrayRef]).unwrap()
+}
+
+/// builds batches where the dictionary array is nested within a list array
+fn build_list_batches(vals: &[&[&str]]) -> Vec<RecordBatch> {
+ let mut list_builder =
ListBuilder::new(StringDictionaryBuilder::<Int32Type>::new());
+
+ vals.iter()
+ .map(|v| build_list_batch(v, &mut list_builder))
+ .collect()
+}
+
+fn build_list_batch(
+ vals: &[&str],
+ list_builder: &mut ListBuilder<StringDictionaryBuilder<Int32Type>>,
+) -> RecordBatch {
+ for &val in vals {
+ let vals_builder = list_builder.values();
+ vals_builder.append(val).unwrap();
+ list_builder.append(true);
+ }
+
+ let array = list_builder.finish_preserve_values();
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "dict",
+ array.data_type().clone(),
+ true,
+ )]));
+
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(array) as
ArrayRef]).unwrap()
+}
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index a05072a2c4..46e2dd7739 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -1351,7 +1351,7 @@ impl<W: Write> RecordBatchWriter for FileWriter<W> {
/// // You must set `.with_dictionary_handling(DictionaryHandling::Delta)` to
/// // enable delta dictionaries in the writer
/// let options =
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
-/// let mut writer = StreamWriter::try_new(&mut stream, &schema).unwrap();
+/// let mut writer = StreamWriter::try_new_with_options(&mut stream, &schema,
options).unwrap();
///
/// // When writing the first batch, a dictionary message with 'a' and 'b'
will be written
/// // prior to the record batch.