This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new fa07ec6f fix: fix read parquert file when schema change (#1750)
fa07ec6f is described below
commit fa07ec6f26c785de332c75e4f19d87040d543dc3
Author: Dylan <[email protected]>
AuthorDate: Fri Oct 17 19:03:15 2025 +0800
fix: fix read parquert file when schema change (#1750)
---
crates/iceberg/src/arrow/reader.rs | 129 ++++++++++++++++++++++++++++++-------
1 file changed, 106 insertions(+), 23 deletions(-)
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 564b2811..ff4cff0a 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -583,35 +583,25 @@ impl ArrowReader {
true
});
- if column_map.len() != leaf_field_ids.len() {
- let missing_fields = leaf_field_ids
- .iter()
- .filter(|field_id| !column_map.contains_key(field_id))
- .collect::<Vec<_>>();
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!(
- "Parquet schema {} and Iceberg schema {} do not
match.",
- iceberg_schema, iceberg_schema_of_task
- ),
- )
- .with_context("column_map", format! {"{:?}", column_map})
- .with_context("field_ids", format! {"{:?}", leaf_field_ids})
- .with_context("missing_fields", format! {"{:?}",
missing_fields}));
- }
-
+ // Only project columns that exist in the Parquet file.
+ // Missing columns will be added by RecordBatchTransformer with
default/NULL values.
+ // This supports schema evolution where new columns are added to
the table schema
+ // but old Parquet files don't have them yet.
let mut indices = vec![];
for field_id in leaf_field_ids {
if let Some(col_idx) = column_map.get(&field_id) {
indices.push(*col_idx);
- } else {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!("Field {} is not found in Parquet schema.",
field_id),
- ));
}
+ // Skip fields that don't exist in the Parquet file - they
will be added later
+ }
+
+ if indices.is_empty() {
+ // If no columns from the projection exist in the file,
project all columns
+ // This can happen if all requested columns are new and need
to be added by the transformer
+ Ok(ProjectionMask::all())
+ } else {
+ Ok(ProjectionMask::leaves(parquet_schema, indices))
}
- Ok(ProjectionMask::leaves(parquet_schema, indices))
}
}
@@ -1958,4 +1948,97 @@ message schema {
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
}
+
+ /// Test schema evolution: reading old Parquet file (with only column 'a')
+ /// using a newer table schema (with columns 'a' and 'b').
+ /// This tests that:
+ /// 1. get_arrow_projection_mask allows missing columns
+ /// 2. RecordBatchTransformer adds missing column 'b' with NULL values
+ #[tokio::test]
+ async fn test_schema_evolution_add_column() {
+ use arrow_array::{Array, Int32Array};
+
+ // New table schema: columns 'a' and 'b' (b was added later, file only
has 'a')
+ let new_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(2)
+ .with_fields(vec![
+ NestedField::required(1, "a",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(2, "b",
Type::Primitive(PrimitiveType::Int)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ // Create Arrow schema for old Parquet file (only has column 'a')
+ let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
+ Field::new("a", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ ]));
+
+ // Write old Parquet file with only column 'a'
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ let file_io =
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+ let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+ let to_write = RecordBatch::try_new(arrow_schema_old.clone(),
vec![data_a]).unwrap();
+
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+ let file = File::create(format!("{}/old_file.parquet",
&table_location)).unwrap();
+ let mut writer = ArrowWriter::try_new(file, to_write.schema(),
Some(props)).unwrap();
+ writer.write(&to_write).expect("Writing batch");
+ writer.close().unwrap();
+
+ // Read the old Parquet file using the NEW schema (with column 'b')
+ let reader = ArrowReaderBuilder::new(file_io).build();
+ let tasks = Box::pin(futures::stream::iter(
+ vec![Ok(FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: format!("{}/old_file.parquet", table_location),
+ data_file_format: DataFileFormat::Parquet,
+ schema: new_schema.clone(),
+ project_field_ids: vec![1, 2], // Request both columns 'a' and
'b'
+ predicate: None,
+ deletes: vec![],
+ })]
+ .into_iter(),
+ )) as FileScanTaskStream;
+
+ let result = reader
+ .read(tasks)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ // Verify we got the correct data
+ assert_eq!(result.len(), 1);
+ let batch = &result[0];
+
+ // Should have 2 columns now
+ assert_eq!(batch.num_columns(), 2);
+ assert_eq!(batch.num_rows(), 3);
+
+ // Column 'a' should have the original data
+ let col_a = batch
+ .column(0)
+ .as_primitive::<arrow_array::types::Int32Type>();
+ assert_eq!(col_a.values(), &[1, 2, 3]);
+
+ // Column 'b' should be all NULLs (it didn't exist in the old file)
+ let col_b = batch
+ .column(1)
+ .as_primitive::<arrow_array::types::Int32Type>();
+ assert_eq!(col_b.null_count(), 3);
+ assert!(col_b.is_null(0));
+ assert!(col_b.is_null(1));
+ assert!(col_b.is_null(2));
+ }
}