This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 12c4c211 feat(reader): null struct default values in create_column
(#1847)
12c4c211 is described below
commit 12c4c2110174b76eb82e584cb7933efdc9afcecc
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Nov 13 05:21:02 2025 -0500
feat(reader): null struct default values in create_column (#1847)
Fixes
`TestSparkReaderDeletes.testPosDeletesOnParquetFileWithMultipleRowGroups`
in Iceberg Java 1.10 with DataFusion Comet.
## Which issue does this PR close?
- Partially addresses #1749.
## What changes are included in this PR?
- While `RecordBatchTransformer` does not have exhaustive nested type
support yet, this adds logic to `create_column` in the specific scenario
for a schema evolution with a new struct column that uses the default
NULL value.
- If the column has a default value other than NULL defined, it will
fall into the existing match arm and say it is unsupported.
## Are these changes tested?
New test to reflect what happens with Iceberg Java 1.10's
`TestSparkReaderDeletes.testPosDeletesOnParquetFileWithMultipleRowGroups`.
The test name is misleading: I assumed that testing positional deletes would
involve just a delete vector and be schema-agnostic, but [it includes schema
changes with binary and struct types, so we need default NULL
values](https://github.com/apache/iceberg/blob/53c046efda5d6c6ac67caf7de29849ab7ac6d406/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java#L65).
---
.../iceberg/src/arrow/record_batch_transformer.rs | 93 ++++++++++++++++++++++
1 file changed, 93 insertions(+)
diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs
b/crates/iceberg/src/arrow/record_batch_transformer.rs
index e7d8b8f0..07ec4391 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -21,7 +21,9 @@ use std::sync::Arc;
use arrow_array::{
Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array,
Float32Array,
Float64Array, Int32Array, Int64Array, NullArray, RecordBatch,
RecordBatchOptions, StringArray,
+ StructArray,
};
+use arrow_buffer::NullBuffer;
use arrow_cast::cast;
use arrow_schema::{
DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
SchemaRef,
@@ -594,6 +596,21 @@ impl RecordBatchTransformer {
let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
Arc::new(BinaryArray::from_opt_vec(vals))
}
+ (DataType::Struct(fields), None) => {
+ // Create a StructArray filled with nulls. Per Iceberg spec,
optional struct fields
+ // default to null when added to the schema. We defer non-null
default struct values
+ // and leave them as not implemented yet.
+ let null_arrays: Vec<ArrayRef> = fields
+ .iter()
+ .map(|field| Self::create_column(field.data_type(), &None,
num_rows))
+ .collect::<Result<Vec<_>>>()?;
+
+ Arc::new(StructArray::new(
+ fields.clone(),
+ null_arrays,
+ Some(NullBuffer::new_null(num_rows)),
+ ))
+ }
(DataType::Null, _) => Arc::new(NullArray::new(num_rows)),
(dt, _) => {
return Err(Error::new(
@@ -743,6 +760,82 @@ mod test {
assert!(date_column.is_null(2));
}
+ #[test]
+ fn schema_evolution_adds_struct_column_with_nulls() {
+ // Test that when a struct column is added after data files are
written,
+ // the transformer can materialize the missing struct column with null
values.
+ // This reproduces the scenario from Iceberg 1.10.0
TestSparkReaderDeletes tests
+ // where binaryData and structData columns were added to the schema.
+ let snapshot_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "data",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::optional(
+ 3,
+ "struct_col",
+ Type::Struct(crate::spec::StructType::new(vec![
+ NestedField::optional(
+ 100,
+ "inner_field",
+ Type::Primitive(PrimitiveType::String),
+ )
+ .into(),
+ ])),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+ let projected_iceberg_field_ids = [1, 2, 3];
+
+ let mut transformer =
+ RecordBatchTransformer::build(snapshot_schema,
&projected_iceberg_field_ids);
+
+ let file_schema = Arc::new(ArrowSchema::new(vec![
+ simple_field("id", DataType::Int32, false, "1"),
+ simple_field("data", DataType::Utf8, false, "2"),
+ ]));
+
+ let file_batch = RecordBatch::try_new(file_schema, vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ Arc::new(StringArray::from(vec!["a", "b", "c"])),
+ ])
+ .unwrap();
+
+ let result = transformer.process_record_batch(file_batch).unwrap();
+
+ assert_eq!(result.num_columns(), 3);
+ assert_eq!(result.num_rows(), 3);
+
+ let id_column = result
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(id_column.values(), &[1, 2, 3]);
+
+ let data_column = result
+ .column(1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(data_column.value(0), "a");
+ assert_eq!(data_column.value(1), "b");
+ assert_eq!(data_column.value(2), "c");
+
+ let struct_column = result
+ .column(2)
+ .as_any()
+ .downcast_ref::<arrow_array::StructArray>()
+ .unwrap();
+ assert!(struct_column.is_null(0));
+ assert!(struct_column.is_null(1));
+ assert!(struct_column.is_null(2));
+ }
+
pub fn source_record_batch() -> RecordBatch {
RecordBatch::try_new(
arrow_schema_promotion_addition_and_renaming_required(),