This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new fa07ec6f fix: fix read parquert file when schema change (#1750)
fa07ec6f is described below

commit fa07ec6f26c785de332c75e4f19d87040d543dc3
Author: Dylan <[email protected]>
AuthorDate: Fri Oct 17 19:03:15 2025 +0800

    fix: fix read parquert file when schema change (#1750)
---
 crates/iceberg/src/arrow/reader.rs | 129 ++++++++++++++++++++++++++++++-------
 1 file changed, 106 insertions(+), 23 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs 
b/crates/iceberg/src/arrow/reader.rs
index 564b2811..ff4cff0a 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -583,35 +583,25 @@ impl ArrowReader {
                 true
             });
 
-            if column_map.len() != leaf_field_ids.len() {
-                let missing_fields = leaf_field_ids
-                    .iter()
-                    .filter(|field_id| !column_map.contains_key(field_id))
-                    .collect::<Vec<_>>();
-                return Err(Error::new(
-                    ErrorKind::DataInvalid,
-                    format!(
-                        "Parquet schema {} and Iceberg schema {} do not 
match.",
-                        iceberg_schema, iceberg_schema_of_task
-                    ),
-                )
-                .with_context("column_map", format! {"{:?}", column_map})
-                .with_context("field_ids", format! {"{:?}", leaf_field_ids})
-                .with_context("missing_fields", format! {"{:?}", 
missing_fields}));
-            }
-
+            // Only project columns that exist in the Parquet file.
+            // Missing columns will be added by RecordBatchTransformer with 
default/NULL values.
+            // This supports schema evolution where new columns are added to 
the table schema
+            // but old Parquet files don't have them yet.
             let mut indices = vec![];
             for field_id in leaf_field_ids {
                 if let Some(col_idx) = column_map.get(&field_id) {
                     indices.push(*col_idx);
-                } else {
-                    return Err(Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Field {} is not found in Parquet schema.", 
field_id),
-                    ));
                 }
+                // Skip fields that don't exist in the Parquet file - they 
will be added later
+            }
+
+            if indices.is_empty() {
+                // If no columns from the projection exist in the file, 
project all columns
+                // This can happen if all requested columns are new and need 
to be added by the transformer
+                Ok(ProjectionMask::all())
+            } else {
+                Ok(ProjectionMask::leaves(parquet_schema, indices))
             }
-            Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
 
@@ -1958,4 +1948,97 @@ message schema {
 
         Arc::new(SchemaDescriptor::new(Arc::new(schema)))
     }
+
+    /// Test schema evolution: reading old Parquet file (with only column 'a')
+    /// using a newer table schema (with columns 'a' and 'b').
+    /// This tests that:
+    /// 1. get_arrow_projection_mask allows missing columns
+    /// 2. RecordBatchTransformer adds missing column 'b' with NULL values
+    #[tokio::test]
+    async fn test_schema_evolution_add_column() {
+        use arrow_array::{Array, Int32Array};
+
+        // New table schema: columns 'a' and 'b' (b was added later, file only 
has 'a')
+        let new_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(2)
+                .with_fields(vec![
+                    NestedField::required(1, "a", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(2, "b", 
Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Create Arrow schema for old Parquet file (only has column 'a')
+        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
+            Field::new("a", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        // Write old Parquet file with only column 'a'
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+        let to_write = RecordBatch::try_new(arrow_schema_old.clone(), 
vec![data_a]).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let file = File::create(format!("{}/old_file.parquet", 
&table_location)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        // Read the old Parquet file using the NEW schema (with column 'b')
+        let reader = ArrowReaderBuilder::new(file_io).build();
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/old_file.parquet", table_location),
+                data_file_format: DataFileFormat::Parquet,
+                schema: new_schema.clone(),
+                project_field_ids: vec![1, 2], // Request both columns 'a' and 
'b'
+                predicate: None,
+                deletes: vec![],
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        // Verify we got the correct data
+        assert_eq!(result.len(), 1);
+        let batch = &result[0];
+
+        // Should have 2 columns now
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 3);
+
+        // Column 'a' should have the original data
+        let col_a = batch
+            .column(0)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(col_a.values(), &[1, 2, 3]);
+
+        // Column 'b' should be all NULLs (it didn't exist in the old file)
+        let col_b = batch
+            .column(1)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(col_b.null_count(), 3);
+        assert!(col_b.is_null(0));
+        assert!(col_b.is_null(1));
+        assert!(col_b.is_null(2));
+    }
 }

Reply via email to