This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 1384a4f2d feat(core): Add support for `_file` column (#1824)
1384a4f2d is described below

commit 1384a4f2d71ed16b73f3b1f139d5dbd4e5035428
Author: Gerald Berger <[email protected]>
AuthorDate: Tue Dec 9 11:46:49 2025 +0100

    feat(core): Add support for `_file` column (#1824)
    
    ## Which issue does this PR close?
    
    
    - Closes #1766.
    
    ## What changes are included in this PR?
    
    Integrates virtual field handling for the `_file` metadata column into
    `RecordBatchTransformer` using a pre-computed constants map, eliminating
    post-processing and duplicate lookups.
    
    ## Key Changes
    
    **New `metadata_columns.rs` module**: Centralized utilities for metadata
    columns
    - Constants: `RESERVED_FIELD_ID_FILE`, `RESERVED_COL_NAME_FILE`
    - Helper functions: `get_metadata_column_name()`,
    `get_metadata_field_id()`, `is_metadata_field()`,
    `is_metadata_column_name()`
    
    **Enhanced `RecordBatchTransformer`**:
    - Added `constant_fields: HashMap<i32, Datum>` (each `Datum` carries both
    the type and the value) - pre-computed during initialization
    - New `with_constant()` method - registers a constant `Datum` for a field ID during setup
    - Updated to use pre-computed types and values (avoids duplicate
    lookups)
    - Handles `DataType::RunEndEncoded` for constant strings (memory
    efficient)
    
    **Simplified `reader.rs`**:
    - Pass full `project_field_ids` (including virtual) to
    RecordBatchTransformer
    - Single `with_constant()` call to register `_file` column
    - Removed post-processing loop
    
    **Updated `scan/mod.rs`**:
    - Use `is_metadata_column_name()` and `get_metadata_field_id()` instead
    of hardcoded checks
    
    ## Are these changes tested?
    
    Yes, comprehensive tests have been added to verify the functionality:
    
    ### New Tests (7 tests added)
    
    #### Table Scan API Tests (7 tests)
    
    1. **`test_select_with_file_column`** - Verifies basic functionality of
    selecting `_file` with regular columns
    2. **`test_select_file_column_position`** - Verifies column ordering is
    preserved
    3. **`test_select_file_column_only`** - Tests selecting only the `_file`
    column
    4. **`test_file_column_with_multiple_files`** - Tests multiple data
    files scenario
    5. **`test_file_column_at_start`** - Tests `_file` at position 0
    6. **`test_file_column_at_end`** - Tests `_file` at the last position
    7. **`test_select_with_repeated_column_names`** - Tests repeated column
    selection
---
 crates/iceberg/src/arrow/reader.rs                 |  29 +-
 .../iceberg/src/arrow/record_batch_transformer.rs  | 476 ++++++++++++---------
 crates/iceberg/src/arrow/schema.rs                 |  54 +++
 crates/iceberg/src/arrow/value.rs                  | 216 +++++++++-
 crates/iceberg/src/lib.rs                          |   1 +
 crates/iceberg/src/metadata_columns.rs             | 127 ++++++
 crates/iceberg/src/scan/mod.rs                     | 341 ++++++++++++++-
 7 files changed, 1031 insertions(+), 213 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs 
b/crates/iceberg/src/arrow/reader.rs
index ab5a96f75..de8a1420e 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -54,6 +54,7 @@ use 
crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use 
crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
+use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, 
Type};
 use crate::utils::available_parallelism;
@@ -250,12 +251,20 @@ impl ArrowReader {
             initial_stream_builder
         };
 
+        // Filter out metadata fields for Parquet projection (they don't exist 
in files)
+        let project_field_ids_without_metadata: Vec<i32> = task
+            .project_field_ids
+            .iter()
+            .filter(|&&id| !is_metadata_field(id))
+            .copied()
+            .collect();
+
         // Create projection mask based on field IDs
         // - If file has embedded IDs: field-ID-based projection 
(missing_field_ids=false)
         // - If name mapping applied: field-ID-based projection 
(missing_field_ids=true but IDs now match)
         // - If fallback IDs: position-based projection 
(missing_field_ids=true)
         let projection_mask = Self::get_arrow_projection_mask(
-            &task.project_field_ids,
+            &project_field_ids_without_metadata,
             &task.schema,
             record_batch_stream_builder.parquet_schema(),
             record_batch_stream_builder.schema(),
@@ -266,16 +275,23 @@ impl ArrowReader {
             
record_batch_stream_builder.with_projection(projection_mask.clone());
 
         // RecordBatchTransformer performs any transformations required on the 
RecordBatches
-        // that come back from the file, such as type promotion, default 
column insertion
-        // and column re-ordering.
+        // that come back from the file, such as type promotion, default 
column insertion,
+        // column re-ordering, partition constants, and virtual field addition 
(like _file)
         let mut record_batch_transformer_builder =
             RecordBatchTransformerBuilder::new(task.schema_ref(), 
task.project_field_ids());
 
+        // Add the _file metadata column if it's in the projected fields
+        if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
+            let file_datum = Datum::string(task.data_file_path.clone());
+            record_batch_transformer_builder =
+                
record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, 
file_datum);
+        }
+
         if let (Some(partition_spec), Some(partition_data)) =
             (task.partition_spec.clone(), task.partition.clone())
         {
             record_batch_transformer_builder =
-                
record_batch_transformer_builder.with_partition(partition_spec, partition_data);
+                
record_batch_transformer_builder.with_partition(partition_spec, 
partition_data)?;
         }
 
         let mut record_batch_transformer = 
record_batch_transformer_builder.build();
@@ -416,7 +432,10 @@ impl ArrowReader {
             record_batch_stream_builder
                 .build()?
                 .map(move |batch| match batch {
-                    Ok(batch) => 
record_batch_transformer.process_record_batch(batch),
+                    Ok(batch) => {
+                        // Process the record batch (type promotion, column 
reordering, virtual fields, etc.)
+                        record_batch_transformer.process_record_batch(batch)
+                    }
                     Err(err) => Err(err.into()),
                 });
 
diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs 
b/crates/iceberg/src/arrow/record_batch_transformer.rs
index a20adb6a5..f30d4a09c 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -19,24 +19,23 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use arrow_array::{
-    Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, 
Float32Array,
-    Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, 
RecordBatchOptions, StringArray,
-    StructArray,
+    Array as ArrowArray, ArrayRef, Int32Array, RecordBatch, 
RecordBatchOptions, RunArray,
 };
-use arrow_buffer::NullBuffer;
 use arrow_cast::cast;
 use arrow_schema::{
-    DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, 
SchemaRef,
+    DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as 
ArrowSchemaRef, SchemaRef,
 };
 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
-use crate::arrow::schema_to_arrow_schema;
+use crate::arrow::value::{create_primitive_array_repeated, 
create_primitive_array_single_element};
+use crate::arrow::{datum_to_arrow_type_with_ree, schema_to_arrow_schema};
+use crate::metadata_columns::get_metadata_field;
 use crate::spec::{
-    Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, 
Transform,
+    Datum, Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, 
Struct, Transform,
 };
 use crate::{Error, ErrorKind, Result};
 
-/// Build a map of field ID to constant value for identity-partitioned fields.
+/// Build a map of field ID to constant value (as Datum) for 
identity-partitioned fields.
 ///
 /// Implements Iceberg spec "Column Projection" rule #1: use partition 
metadata constants
 /// only for identity-transformed fields. Non-identity transforms (bucket, 
truncate, year, etc.)
@@ -53,20 +52,65 @@ use crate::{Error, ErrorKind, Result};
 fn constants_map(
     partition_spec: &PartitionSpec,
     partition_data: &Struct,
-) -> HashMap<i32, PrimitiveLiteral> {
+    schema: &IcebergSchema,
+) -> Result<HashMap<i32, Datum>> {
     let mut constants = HashMap::new();
 
     for (pos, field) in partition_spec.fields().iter().enumerate() {
         // Only identity transforms should use constant values from partition 
metadata
         if matches!(field.transform, Transform::Identity) {
+            // Get the field from schema to extract its type
+            let iceberg_field = 
schema.field_by_id(field.source_id).ok_or(Error::new(
+                ErrorKind::Unexpected,
+                format!("Field {} not found in schema", field.source_id),
+            ))?;
+
+            // Ensure the field type is primitive
+            let prim_type = match &*iceberg_field.field_type {
+                crate::spec::Type::Primitive(prim_type) => prim_type,
+                _ => {
+                    return Err(Error::new(
+                        ErrorKind::Unexpected,
+                        format!(
+                            "Partition field {} has non-primitive type {:?}",
+                            field.source_id, iceberg_field.field_type
+                        ),
+                    ));
+                }
+            };
+
             // Get the partition value for this field
-            if let Some(Literal::Primitive(value)) = &partition_data[pos] {
-                constants.insert(field.source_id, value.clone());
+            // Handle both None (null) and Some(Literal::Primitive) cases
+            match &partition_data[pos] {
+                None => {
+                    // TODO 
(https://github.com/apache/iceberg-rust/issues/1914): Add support for null 
datum values.
+                    return Err(Error::new(
+                        ErrorKind::Unexpected,
+                        format!(
+                            "Partition field {} has null value for identity 
transform",
+                            field.source_id
+                        ),
+                    ));
+                }
+                Some(Literal::Primitive(value)) => {
+                    // Create a Datum from the primitive type and value
+                    let datum = Datum::new(prim_type.clone(), value.clone());
+                    constants.insert(field.source_id, datum);
+                }
+                Some(literal) => {
+                    return Err(Error::new(
+                        ErrorKind::Unexpected,
+                        format!(
+                            "Partition field {} has non-primitive value: {:?}",
+                            field.source_id, literal
+                        ),
+                    ));
+                }
             }
         }
     }
 
-    constants
+    Ok(constants)
 }
 
 /// Indicates how a particular column in a processed RecordBatch should
@@ -146,13 +190,13 @@ enum SchemaComparison {
 
 /// Builder for RecordBatchTransformer to improve ergonomics when constructing 
with optional parameters.
 ///
-/// See [`RecordBatchTransformer`] for details on partition spec and partition 
data.
+/// Constant fields are pre-computed for both virtual/metadata fields (like 
_file) and
+/// identity-partitioned fields to avoid duplicate work during batch 
processing.
 #[derive(Debug)]
 pub(crate) struct RecordBatchTransformerBuilder {
     snapshot_schema: Arc<IcebergSchema>,
     projected_iceberg_field_ids: Vec<i32>,
-    partition_spec: Option<Arc<PartitionSpec>>,
-    partition_data: Option<Struct>,
+    constant_fields: HashMap<i32, Datum>,
 }
 
 impl RecordBatchTransformerBuilder {
@@ -163,32 +207,48 @@ impl RecordBatchTransformerBuilder {
         Self {
             snapshot_schema,
             projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(),
-            partition_spec: None,
-            partition_data: None,
+            constant_fields: HashMap::new(),
         }
     }
 
+    /// Add a constant value for a specific field ID.
+    /// This is used for virtual/metadata fields like _file that have constant 
values per batch.
+    ///
+    /// # Arguments
+    /// * `field_id` - The field ID to associate with the constant
+    /// * `datum` - The constant value (with type) for this field
+    pub(crate) fn with_constant(mut self, field_id: i32, datum: Datum) -> Self 
{
+        self.constant_fields.insert(field_id, datum);
+        self
+    }
+
     /// Set partition spec and data together for identifying 
identity-transformed partition columns.
     ///
     /// Both partition_spec and partition_data must be provided together since 
the spec defines
     /// which fields are identity-partitioned, and the data provides their 
constant values.
-    /// One without the other cannot produce a valid constants map.
+    /// This method computes the partition constants and merges them into 
constant_fields.
     pub(crate) fn with_partition(
         mut self,
         partition_spec: Arc<PartitionSpec>,
         partition_data: Struct,
-    ) -> Self {
-        self.partition_spec = Some(partition_spec);
-        self.partition_data = Some(partition_data);
-        self
+    ) -> Result<Self> {
+        // Compute partition constants for identity-transformed fields 
(already returns Datum)
+        let partition_constants =
+            constants_map(&partition_spec, &partition_data, 
&self.snapshot_schema)?;
+
+        // Add partition constants to constant_fields
+        for (field_id, datum) in partition_constants {
+            self.constant_fields.insert(field_id, datum);
+        }
+
+        Ok(self)
     }
 
     pub(crate) fn build(self) -> RecordBatchTransformer {
         RecordBatchTransformer {
             snapshot_schema: self.snapshot_schema,
             projected_iceberg_field_ids: self.projected_iceberg_field_ids,
-            partition_spec: self.partition_spec,
-            partition_data: self.partition_data,
+            constant_fields: self.constant_fields,
             batch_transform: None,
         }
     }
@@ -228,16 +288,10 @@ impl RecordBatchTransformerBuilder {
 pub(crate) struct RecordBatchTransformer {
     snapshot_schema: Arc<IcebergSchema>,
     projected_iceberg_field_ids: Vec<i32>,
-
-    /// Partition spec for identifying identity-transformed partition columns 
(spec rule #1).
-    /// Only fields with identity transforms use partition data constants; 
non-identity transforms
-    /// (bucket, truncate, etc.) must read source columns from data files.
-    partition_spec: Option<Arc<PartitionSpec>>,
-
-    /// Partition data providing constant values for identity-transformed 
partition columns (spec rule #1).
-    /// For example, in a file at path `dept=engineering/file.parquet`, this 
would contain
-    /// the value "engineering" for the dept field.
-    partition_data: Option<Struct>,
+    // Pre-computed constant field information: field_id -> Datum
+    // Includes both virtual/metadata fields (like _file) and 
identity-partitioned fields
+    // Datum holds both the Iceberg type and the value
+    constant_fields: HashMap<i32, Datum>,
 
     // BatchTransform gets lazily constructed based on the schema of
     // the first RecordBatch we receive from the file
@@ -279,8 +333,7 @@ impl RecordBatchTransformer {
                     record_batch.schema_ref(),
                     self.snapshot_schema.as_ref(),
                     &self.projected_iceberg_field_ids,
-                    self.partition_spec.as_ref().map(|s| s.as_ref()),
-                    self.partition_data.as_ref(),
+                    &self.constant_fields,
                 )?);
 
                 self.process_record_batch(record_batch)?
@@ -299,8 +352,7 @@ impl RecordBatchTransformer {
         source_schema: &ArrowSchemaRef,
         snapshot_schema: &IcebergSchema,
         projected_iceberg_field_ids: &[i32],
-        partition_spec: Option<&PartitionSpec>,
-        partition_data: Option<&Struct>,
+        constant_fields: &HashMap<i32, Datum>,
     ) -> Result<BatchTransform> {
         let mapped_unprojected_arrow_schema = 
Arc::new(schema_to_arrow_schema(snapshot_schema)?);
         let field_id_to_mapped_schema_map =
@@ -311,22 +363,54 @@ impl RecordBatchTransformer {
         let fields: Result<Vec<_>> = projected_iceberg_field_ids
             .iter()
             .map(|field_id| {
-                Ok(field_id_to_mapped_schema_map
-                    .get(field_id)
-                    .ok_or(Error::new(ErrorKind::Unexpected, "field not 
found"))?
-                    .0
-                    .clone())
+                // Check if this is a constant field
+                if constant_fields.contains_key(field_id) {
+                    // For metadata/virtual fields (like _file), get name from 
metadata_columns
+                    // For partition fields, get name from schema (they exist 
in schema)
+                    if let Ok(iceberg_field) = get_metadata_field(*field_id) {
+                        // This is a metadata/virtual field - convert Iceberg 
field to Arrow
+                        let datum = 
constant_fields.get(field_id).ok_or(Error::new(
+                            ErrorKind::Unexpected,
+                            "constant field not found",
+                        ))?;
+                        let arrow_type = datum_to_arrow_type_with_ree(datum);
+                        let arrow_field =
+                            Field::new(&iceberg_field.name, arrow_type, 
!iceberg_field.required)
+                                .with_metadata(HashMap::from([(
+                                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                                    iceberg_field.id.to_string(),
+                                )]));
+                        Ok(Arc::new(arrow_field))
+                    } else {
+                        // This is a partition constant field (exists in 
schema but uses constant value)
+                        let field = &field_id_to_mapped_schema_map
+                            .get(field_id)
+                            .ok_or(Error::new(ErrorKind::Unexpected, "field 
not found"))?
+                            .0;
+                        let datum = 
constant_fields.get(field_id).ok_or(Error::new(
+                            ErrorKind::Unexpected,
+                            "constant field not found",
+                        ))?;
+                        let arrow_type = datum_to_arrow_type_with_ree(datum);
+                        // Use the type from constant_fields (REE for 
constants)
+                        let constant_field =
+                            Field::new(field.name(), arrow_type, 
field.is_nullable())
+                                .with_metadata(field.metadata().clone());
+                        Ok(Arc::new(constant_field))
+                    }
+                } else {
+                    // Regular field - use schema as-is
+                    Ok(field_id_to_mapped_schema_map
+                        .get(field_id)
+                        .ok_or(Error::new(ErrorKind::Unexpected, "field not 
found"))?
+                        .0
+                        .clone())
+                }
             })
             .collect();
 
         let target_schema = Arc::new(ArrowSchema::new(fields?));
 
-        let constants_map = if let (Some(spec), Some(data)) = (partition_spec, 
partition_data) {
-            constants_map(spec, data)
-        } else {
-            HashMap::new()
-        };
-
         match Self::compare_schemas(source_schema, &target_schema) {
             SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough),
             SchemaComparison::NameChangesOnly => 
Ok(BatchTransform::ModifySchema { target_schema }),
@@ -336,8 +420,7 @@ impl RecordBatchTransformer {
                     snapshot_schema,
                     projected_iceberg_field_ids,
                     field_id_to_mapped_schema_map,
-                    constants_map,
-                    partition_spec,
+                    constant_fields,
                 )?,
                 target_schema,
             }),
@@ -394,8 +477,7 @@ impl RecordBatchTransformer {
         snapshot_schema: &IcebergSchema,
         projected_iceberg_field_ids: &[i32],
         field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
-        constants_map: HashMap<i32, PrimitiveLiteral>,
-        _partition_spec: Option<&PartitionSpec>,
+        constant_fields: &HashMap<i32, Datum>,
     ) -> Result<Vec<ColumnSource>> {
         let field_id_to_source_schema_map =
             Self::build_field_id_to_arrow_schema_map(source_schema)?;
@@ -403,6 +485,18 @@ impl RecordBatchTransformer {
         projected_iceberg_field_ids
             .iter()
             .map(|field_id| {
+                // Check if this is a constant field (metadata/virtual or 
identity-partitioned)
+                // Constant fields always use their pre-computed constant 
values, regardless of whether
+                // they exist in the Parquet file. This is per Iceberg spec 
rule #1: partition metadata
+                // is authoritative and should be preferred over file data.
+                if let Some(datum) = constant_fields.get(field_id) {
+                    let arrow_type = datum_to_arrow_type_with_ree(datum);
+                    return Ok(ColumnSource::Add {
+                        value: Some(datum.literal().clone()),
+                        target_type: arrow_type,
+                    });
+                }
+
                 let (target_field, _) =
                     field_id_to_mapped_schema_map
                         .get(field_id)
@@ -451,13 +545,8 @@ impl RecordBatchTransformer {
                 );
 
                 // Apply spec's fallback steps for "not present" fields.
-                let column_source = if let Some(constant_value) = 
constants_map.get(field_id) {
-                    // Rule #1: Identity partition constant
-                    ColumnSource::Add {
-                        value: Some(constant_value.clone()),
-                        target_type: target_type.clone(),
-                    }
-                } else if let Some(source) = field_by_id {
+                // Rule #1 (constants) is handled at the beginning of this 
function
+                let column_source = if let Some(source) = field_by_id {
                     source
                 } else {
                     // Rules #2, #3 and #4:
@@ -471,6 +560,7 @@ impl RecordBatchTransformer {
                             None
                         }
                     });
+
                     ColumnSource::Add {
                         value: default_value,
                         target_type: target_type.clone(),
@@ -539,86 +629,36 @@ impl RecordBatchTransformer {
         prim_lit: &Option<PrimitiveLiteral>,
         num_rows: usize,
     ) -> Result<ArrayRef> {
-        Ok(match (target_type, prim_lit) {
-            (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => {
-                Arc::new(BooleanArray::from(vec![*value; num_rows]))
-            }
-            (DataType::Boolean, None) => {
-                let vals: Vec<Option<bool>> = vec![None; num_rows];
-                Arc::new(BooleanArray::from(vals))
-            }
-            (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => {
-                Arc::new(Int32Array::from(vec![*value; num_rows]))
-            }
-            (DataType::Int32, None) => {
-                let vals: Vec<Option<i32>> = vec![None; num_rows];
-                Arc::new(Int32Array::from(vals))
-            }
-            (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
-                Arc::new(Date32Array::from(vec![*value; num_rows]))
-            }
-            (DataType::Date32, None) => {
-                let vals: Vec<Option<i32>> = vec![None; num_rows];
-                Arc::new(Date32Array::from(vals))
-            }
-            (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
-                Arc::new(Int64Array::from(vec![*value; num_rows]))
-            }
-            (DataType::Int64, None) => {
-                let vals: Vec<Option<i64>> = vec![None; num_rows];
-                Arc::new(Int64Array::from(vals))
-            }
-            (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
-                Arc::new(Float32Array::from(vec![value.0; num_rows]))
-            }
-            (DataType::Float32, None) => {
-                let vals: Vec<Option<f32>> = vec![None; num_rows];
-                Arc::new(Float32Array::from(vals))
-            }
-            (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => {
-                Arc::new(Float64Array::from(vec![value.0; num_rows]))
-            }
-            (DataType::Float64, None) => {
-                let vals: Vec<Option<f64>> = vec![None; num_rows];
-                Arc::new(Float64Array::from(vals))
-            }
-            (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
-                Arc::new(StringArray::from(vec![value.clone(); num_rows]))
-            }
-            (DataType::Utf8, None) => {
-                let vals: Vec<Option<String>> = vec![None; num_rows];
-                Arc::new(StringArray::from(vals))
-            }
-            (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => {
-                Arc::new(BinaryArray::from_vec(vec![value; num_rows]))
-            }
-            (DataType::Binary, None) => {
-                let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
-                Arc::new(BinaryArray::from_opt_vec(vals))
-            }
-            (DataType::Struct(fields), None) => {
-                // Create a StructArray filled with nulls. Per Iceberg spec, 
optional struct fields
-                // default to null when added to the schema. We defer non-null 
default struct values
-                // and leave them as not implemented yet.
-                let null_arrays: Vec<ArrayRef> = fields
-                    .iter()
-                    .map(|field| Self::create_column(field.data_type(), &None, 
num_rows))
-                    .collect::<Result<Vec<_>>>()?;
-
-                Arc::new(StructArray::new(
-                    fields.clone(),
-                    null_arrays,
-                    Some(NullBuffer::new_null(num_rows)),
+        // Check if this is a RunEndEncoded type (for constant fields)
+        if let DataType::RunEndEncoded(_, values_field) = target_type {
+            // Helper to create a Run-End Encoded array
+            let create_ree_array = |values_array: ArrayRef| -> 
Result<ArrayRef> {
+                let run_ends = if num_rows == 0 {
+                    Int32Array::from(Vec::<i32>::new())
+                } else {
+                    Int32Array::from(vec![num_rows as i32])
+                };
+                Ok(Arc::new(
+                    RunArray::try_new(&run_ends, &values_array).map_err(|e| {
+                        Error::new(
+                            ErrorKind::Unexpected,
+                            "Failed to create RunArray for constant value",
+                        )
+                        .with_source(e)
+                    })?,
                 ))
-            }
-            (DataType::Null, _) => Arc::new(NullArray::new(num_rows)),
-            (dt, _) => {
-                return Err(Error::new(
-                    ErrorKind::Unexpected,
-                    format!("unexpected target column type {}", dt),
-                ));
-            }
-        })
+            };
+
+            // Create the values array using the helper function
+            let values_array =
+                
create_primitive_array_single_element(values_field.data_type(), prim_lit)?;
+
+            // Wrap in Run-End Encoding
+            create_ree_array(values_array)
+        } else {
+            // Non-REE type (simple arrays for non-constant fields)
+            create_primitive_array_repeated(target_type, prim_lit, num_rows)
+        }
     }
 }
 
@@ -639,6 +679,54 @@ mod test {
     };
     use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct, 
Type};
 
+    /// Helper to extract string values from either StringArray or 
RunEndEncoded<StringArray>
+    /// Returns empty string for null values
+    fn get_string_value(array: &dyn Array, index: usize) -> String {
+        if let Some(string_array) = 
array.as_any().downcast_ref::<StringArray>() {
+            if string_array.is_null(index) {
+                String::new()
+            } else {
+                string_array.value(index).to_string()
+            }
+        } else if let Some(run_array) = array
+            .as_any()
+            
.downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
+        {
+            let values = run_array.values();
+            let string_values = values
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .expect("REE values should be StringArray");
+            // For REE, all rows have the same value (index 0 in the values 
array)
+            if string_values.is_null(0) {
+                String::new()
+            } else {
+                string_values.value(0).to_string()
+            }
+        } else {
+            panic!("Expected StringArray or RunEndEncoded<StringArray>");
+        }
+    }
+
+    /// Helper to extract int values from either Int32Array or 
RunEndEncoded<Int32Array>
+    fn get_int_value(array: &dyn Array, index: usize) -> i32 {
+        if let Some(int_array) = array.as_any().downcast_ref::<Int32Array>() {
+            int_array.value(index)
+        } else if let Some(run_array) = array
+            .as_any()
+            
.downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
+        {
+            let values = run_array.values();
+            let int_values = values
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("REE values should be Int32Array");
+            int_values.value(0)
+        } else {
+            panic!("Expected Int32Array or RunEndEncoded<Int32Array>");
+        }
+    }
+
     #[test]
     fn build_field_id_to_source_schema_map_works() {
         let arrow_schema = arrow_schema_already_same_as_target();
@@ -1137,6 +1225,7 @@ mod test {
         let mut transformer =
             RecordBatchTransformerBuilder::new(snapshot_schema, 
&projected_field_ids)
                 .with_partition(partition_spec, partition_data)
+                .expect("Failed to add partition constants")
                 .build();
 
         // Create a Parquet RecordBatch with actual data
@@ -1257,6 +1346,7 @@ mod test {
         let mut transformer =
             RecordBatchTransformerBuilder::new(snapshot_schema, 
&projected_field_ids)
                 .with_partition(partition_spec, partition_data)
+                .expect("Failed to add partition constants")
                 .build();
 
         let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
@@ -1271,30 +1361,23 @@ mod test {
         assert_eq!(result.num_columns(), 3);
         assert_eq!(result.num_rows(), 2);
 
-        let id_column = result
-            .column(0)
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        assert_eq!(id_column.value(0), 100);
-        assert_eq!(id_column.value(1), 200);
+        // Use helpers to handle both simple and REE arrays
+        assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100);
+        assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200);
 
-        let dept_column = result
-            .column(1)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-        // This value MUST come from partition metadata (constant)
-        assert_eq!(dept_column.value(0), "engineering");
-        assert_eq!(dept_column.value(1), "engineering");
+        // dept column comes from partition metadata (constant) - will be REE
+        assert_eq!(
+            get_string_value(result.column(1).as_ref(), 0),
+            "engineering"
+        );
+        assert_eq!(
+            get_string_value(result.column(1).as_ref(), 1),
+            "engineering"
+        );
 
-        let name_column = result
-            .column(2)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-        assert_eq!(name_column.value(0), "Alice");
-        assert_eq!(name_column.value(1), "Bob");
+        // name column comes from file
+        assert_eq!(get_string_value(result.column(2).as_ref(), 0), "Alice");
+        assert_eq!(get_string_value(result.column(2).as_ref(), 1), "Bob");
     }
 
     /// Test bucket partitioning with renamed source column.
@@ -1372,6 +1455,7 @@ mod test {
         let mut transformer =
             RecordBatchTransformerBuilder::new(snapshot_schema, 
&projected_field_ids)
                 .with_partition(partition_spec, partition_data)
+                .expect("Failed to add partition constants")
                 .build();
 
         // Create a Parquet RecordBatch with actual data
@@ -1476,6 +1560,7 @@ mod test {
         let mut transformer =
             RecordBatchTransformerBuilder::new(snapshot_schema, 
&projected_field_ids)
                 .with_partition(partition_spec, partition_data)
+                .expect("Failed to add partition constants")
                 .build();
 
         let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
@@ -1492,48 +1577,37 @@ mod test {
         // Verify each column demonstrates the correct spec rule:
 
         // Normal case: id from Parquet by field ID
-        let id_column = result
-            .column(0)
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        assert_eq!(id_column.value(0), 100);
-        assert_eq!(id_column.value(1), 200);
+        // Use helpers to handle both simple and REE arrays
+        assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100);
+        assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200);
+
+        // Rule #1: dept from partition metadata (identity transform) - will 
be REE
+        assert_eq!(
+            get_string_value(result.column(1).as_ref(), 0),
+            "engineering"
+        );
+        assert_eq!(
+            get_string_value(result.column(1).as_ref(), 1),
+            "engineering"
+        );
 
-        // Rule #1: dept from partition metadata (identity transform)
-        let dept_column = result
-            .column(1)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-        assert_eq!(dept_column.value(0), "engineering");
-        assert_eq!(dept_column.value(1), "engineering");
+        // Rule #2: data from Parquet via name mapping - will be regular array
+        assert_eq!(get_string_value(result.column(2).as_ref(), 0), "value1");
+        assert_eq!(get_string_value(result.column(2).as_ref(), 1), "value2");
 
-        // Rule #2: data from Parquet via name mapping
-        let data_column = result
-            .column(2)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-        assert_eq!(data_column.value(0), "value1");
-        assert_eq!(data_column.value(1), "value2");
-
-        // Rule #3: category from initial_default
-        let category_column = result
-            .column(3)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-        assert_eq!(category_column.value(0), "default_category");
-        assert_eq!(category_column.value(1), "default_category");
+        // Rule #3: category from initial_default - will be REE
+        assert_eq!(
+            get_string_value(result.column(3).as_ref(), 0),
+            "default_category"
+        );
+        assert_eq!(
+            get_string_value(result.column(3).as_ref(), 1),
+            "default_category"
+        );
 
-        // Rule #4: notes is null (no default, not in Parquet, not in 
partition)
-        let notes_column = result
-            .column(4)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-        assert!(notes_column.is_null(0));
-        assert!(notes_column.is_null(1));
+        // Rule #4: notes is null (no default, not in Parquet, not in 
partition) - will be REE with null
+        // For null REE arrays, we still use the helper which handles 
extraction
+        assert_eq!(get_string_value(result.column(4).as_ref(), 0), "");
+        assert_eq!(get_string_value(result.column(4).as_ref(), 1), "");
     }
 }
diff --git a/crates/iceberg/src/arrow/schema.rs 
b/crates/iceberg/src/arrow/schema.rs
index ec0135bd7..4f4f083c7 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -1019,6 +1019,60 @@ impl TryFrom<&crate::spec::Schema> for ArrowSchema {
     }
 }
 
+/// Maps a [`Datum`] to the Run-End Encoded (REE) Arrow [`DataType`] used for
+/// constant columns.
+///
+/// Only the datum's primitive type is inspected; its value is not read. The result
+/// is always `RunEndEncoded` with a non-nullable `Int32` `run_ends` field and a
+/// nullable `values` field whose type depends on the datum's type.
+///
+/// # Arguments
+/// * `datum` - The Datum whose type determines the `values` type
+///
+/// # Returns
+/// Arrow DataType with Run-End Encoding applied
+///
+/// # Example
+/// ```
+/// use iceberg::arrow::datum_to_arrow_type_with_ree;
+/// use iceberg::spec::Datum;
+///
+/// let datum = Datum::string("test_file.parquet");
+/// let ree_type = datum_to_arrow_type_with_ree(&datum);
+/// // Returns: RunEndEncoded(Int32, Utf8)
+/// ```
+pub fn datum_to_arrow_type_with_ree(datum: &Datum) -> DataType {
+    // Step 1: pick the Arrow type of the single stored value.
+    let values_type = match datum.data_type() {
+        PrimitiveType::Boolean => DataType::Boolean,
+        PrimitiveType::Int => DataType::Int32,
+        PrimitiveType::Float => DataType::Float32,
+        PrimitiveType::Double => DataType::Float64,
+        PrimitiveType::Date => DataType::Date32,
+        PrimitiveType::String => DataType::Utf8,
+        // Long and all time/timestamp flavours share the 64-bit integer representation.
+        PrimitiveType::Long
+        | PrimitiveType::Time
+        | PrimitiveType::Timestamp
+        | PrimitiveType::Timestamptz
+        | PrimitiveType::TimestampNs
+        | PrimitiveType::TimestamptzNs => DataType::Int64,
+        // Every byte-oriented type is carried as variable-length binary.
+        PrimitiveType::Uuid | PrimitiveType::Fixed(_) | PrimitiveType::Binary => {
+            DataType::Binary
+        }
+        PrimitiveType::Decimal { precision, scale } => {
+            DataType::Decimal128(*precision as u8, *scale as i8)
+        }
+    };
+
+    // Step 2: wrap it in the REE type. The values field is nullable as Arrow expects
+    // this when building the final Arrow schema with `RunArray::try_new`.
+    DataType::RunEndEncoded(
+        Arc::new(Field::new("run_ends", DataType::Int32, false)),
+        Arc::new(Field::new("values", values_type, true)),
+    )
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
diff --git a/crates/iceberg/src/arrow/value.rs 
b/crates/iceberg/src/arrow/value.rs
index f1cf225bb..0e0b85f07 100644
--- a/crates/iceberg/src/arrow/value.rs
+++ b/crates/iceberg/src/arrow/value.rs
@@ -15,18 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use arrow_array::{
     Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, 
FixedSizeBinaryArray,
     FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, 
LargeBinaryArray,
     LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, 
StructArray,
     Time64MicrosecondArray, TimestampMicrosecondArray, 
TimestampNanosecondArray,
 };
+use arrow_buffer::NullBuffer;
 use arrow_schema::{DataType, FieldRef};
 use uuid::Uuid;
 
 use super::get_field_id;
 use crate::spec::{
-    ListType, Literal, Map, MapType, NestedField, PartnerAccessor, 
PrimitiveType,
+    ListType, Literal, Map, MapType, NestedField, PartnerAccessor, 
PrimitiveLiteral, PrimitiveType,
     SchemaWithPartnerVisitor, Struct, StructType, Type, 
visit_struct_with_partner,
     visit_type_with_partner,
 };
@@ -617,6 +620,217 @@ pub fn arrow_primitive_to_literal(
     )
 }
 
+/// Create a single-element array from a primitive literal.
+///
+/// This is used for creating constant arrays (Run-End Encoded arrays) where we need
+/// a single value that represents all rows.
+///
+/// # Arguments
+/// * `data_type` - Arrow type of the array to build
+/// * `prim_lit` - The value to store; `None` produces a single null entry
+///
+/// # Returns
+/// A one-element array whose Arrow type is `data_type`
+///
+/// # Errors
+/// Returns an `ErrorKind::Unexpected` error when `data_type` is not supported or
+/// does not match the variant of `prim_lit`.
+pub(crate) fn create_primitive_array_single_element(
+    data_type: &DataType,
+    prim_lit: &Option<PrimitiveLiteral>,
+) -> Result<ArrayRef> {
+    match (data_type, prim_lit) {
+        (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => {
+            Ok(Arc::new(BooleanArray::from(vec![*v])))
+        }
+        (DataType::Boolean, None) => Ok(Arc::new(BooleanArray::from(vec![Option::<bool>::None]))),
+        (DataType::Int32, Some(PrimitiveLiteral::Int(v))) => {
+            Ok(Arc::new(Int32Array::from(vec![*v])))
+        }
+        (DataType::Int32, None) => Ok(Arc::new(Int32Array::from(vec![Option::<i32>::None]))),
+        (DataType::Date32, Some(PrimitiveLiteral::Int(v))) => {
+            Ok(Arc::new(Date32Array::from(vec![*v])))
+        }
+        (DataType::Date32, None) => Ok(Arc::new(Date32Array::from(vec![Option::<i32>::None]))),
+        (DataType::Int64, Some(PrimitiveLiteral::Long(v))) => {
+            Ok(Arc::new(Int64Array::from(vec![*v])))
+        }
+        (DataType::Int64, None) => Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None]))),
+        (DataType::Float32, Some(PrimitiveLiteral::Float(v))) => {
+            Ok(Arc::new(Float32Array::from(vec![v.0])))
+        }
+        (DataType::Float32, None) => Ok(Arc::new(Float32Array::from(vec![Option::<f32>::None]))),
+        (DataType::Float64, Some(PrimitiveLiteral::Double(v))) => {
+            Ok(Arc::new(Float64Array::from(vec![v.0])))
+        }
+        (DataType::Float64, None) => Ok(Arc::new(Float64Array::from(vec![Option::<f64>::None]))),
+        (DataType::Utf8, Some(PrimitiveLiteral::String(v))) => {
+            Ok(Arc::new(StringArray::from(vec![v.as_str()])))
+        }
+        (DataType::Utf8, None) => Ok(Arc::new(StringArray::from(vec![Option::<&str>::None]))),
+        (DataType::Binary, Some(PrimitiveLiteral::Binary(v))) => {
+            Ok(Arc::new(BinaryArray::from_vec(vec![v.as_slice()])))
+        }
+        (DataType::Binary, None) => Ok(Arc::new(BinaryArray::from_opt_vec(vec![
+            Option::<&[u8]>::None,
+        ]))),
+        // `Decimal128Array::from(Vec<_>)` produces the default precision/scale, so the
+        // array is re-tagged with the requested decimal type to match `data_type`.
+        (DataType::Decimal128(_, _), Some(PrimitiveLiteral::Int128(v))) => Ok(Arc::new(
+            Decimal128Array::from(vec![*v]).with_data_type(data_type.clone()),
+        )),
+        (DataType::Decimal128(_, _), Some(PrimitiveLiteral::UInt128(v))) => Ok(Arc::new(
+            Decimal128Array::from(vec![*v as i128]).with_data_type(data_type.clone()),
+        )),
+        (DataType::Decimal128(_, _), None) => Ok(Arc::new(
+            Decimal128Array::from(vec![Option::<i128>::None])
+                .with_data_type(data_type.clone()),
+        )),
+        (DataType::Struct(fields), None) => {
+            // A null struct still needs one (null) child entry per field. Recursing
+            // guarantees each child array has exactly its field's declared type
+            // (e.g. a Date32 field gets a Date32Array, which `StructArray::new`
+            // requires) and also covers decimal and nested struct fields.
+            // Children are plain null arrays, not REE.
+            let null_arrays = fields
+                .iter()
+                .map(|f| create_primitive_array_single_element(f.data_type(), &None))
+                .collect::<Result<Vec<_>>>()?;
+            Ok(Arc::new(StructArray::new(
+                fields.clone(),
+                null_arrays,
+                Some(NullBuffer::new_null(1)),
+            )))
+        }
+        _ => Err(Error::new(
+            ErrorKind::Unexpected,
+            format!(
+                "Unsupported constant type combination: {:?} with {:?}",
+                data_type, prim_lit
+            ),
+        )),
+    }
+}
+
+/// Create a repeated array from a primitive literal for a given number of rows.
+///
+/// This is used for creating plain (not Run-End Encoded) arrays where we need the
+/// same value materialized for each row.
+///
+/// # Arguments
+/// * `data_type` - Arrow type of the array to build
+/// * `prim_lit` - The value to repeat; `None` produces an all-null array
+/// * `num_rows` - Length of the resulting array
+///
+/// # Errors
+/// Returns an `ErrorKind::Unexpected` error when `data_type` is not supported or
+/// does not match the variant of `prim_lit`.
+pub(crate) fn create_primitive_array_repeated(
+    data_type: &DataType,
+    prim_lit: &Option<PrimitiveLiteral>,
+    num_rows: usize,
+) -> Result<ArrayRef> {
+    Ok(match (data_type, prim_lit) {
+        (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => {
+            Arc::new(BooleanArray::from(vec![*value; num_rows]))
+        }
+        (DataType::Boolean, None) => {
+            let vals: Vec<Option<bool>> = vec![None; num_rows];
+            Arc::new(BooleanArray::from(vals))
+        }
+        (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => {
+            Arc::new(Int32Array::from(vec![*value; num_rows]))
+        }
+        (DataType::Int32, None) => {
+            let vals: Vec<Option<i32>> = vec![None; num_rows];
+            Arc::new(Int32Array::from(vals))
+        }
+        (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
+            Arc::new(Date32Array::from(vec![*value; num_rows]))
+        }
+        (DataType::Date32, None) => {
+            let vals: Vec<Option<i32>> = vec![None; num_rows];
+            Arc::new(Date32Array::from(vals))
+        }
+        (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
+            Arc::new(Int64Array::from(vec![*value; num_rows]))
+        }
+        (DataType::Int64, None) => {
+            let vals: Vec<Option<i64>> = vec![None; num_rows];
+            Arc::new(Int64Array::from(vals))
+        }
+        (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
+            Arc::new(Float32Array::from(vec![value.0; num_rows]))
+        }
+        (DataType::Float32, None) => {
+            let vals: Vec<Option<f32>> = vec![None; num_rows];
+            Arc::new(Float32Array::from(vals))
+        }
+        (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => {
+            Arc::new(Float64Array::from(vec![value.0; num_rows]))
+        }
+        (DataType::Float64, None) => {
+            let vals: Vec<Option<f64>> = vec![None; num_rows];
+            Arc::new(Float64Array::from(vals))
+        }
+        (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
+            // Repeat the borrowed `&str`; the array copies the bytes itself, so no
+            // per-row `String` clone is needed.
+            Arc::new(StringArray::from(vec![value.as_str(); num_rows]))
+        }
+        (DataType::Utf8, None) => {
+            let vals: Vec<Option<String>> = vec![None; num_rows];
+            Arc::new(StringArray::from(vals))
+        }
+        (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => {
+            Arc::new(BinaryArray::from_vec(vec![value.as_slice(); num_rows]))
+        }
+        (DataType::Binary, None) => {
+            let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
+            Arc::new(BinaryArray::from_opt_vec(vals))
+        }
+        // `Decimal128Array::from(Vec<_>)` produces the default precision/scale, so the
+        // array is re-tagged with the requested decimal type to match `data_type`.
+        (DataType::Decimal128(_, _), Some(PrimitiveLiteral::Int128(value))) => Arc::new(
+            Decimal128Array::from(vec![*value; num_rows]).with_data_type(data_type.clone()),
+        ),
+        (DataType::Decimal128(_, _), Some(PrimitiveLiteral::UInt128(value))) => Arc::new(
+            Decimal128Array::from(vec![*value as i128; num_rows])
+                .with_data_type(data_type.clone()),
+        ),
+        (DataType::Decimal128(_, _), None) => {
+            let vals: Vec<Option<i128>> = vec![None; num_rows];
+            Arc::new(Decimal128Array::from(vals).with_data_type(data_type.clone()))
+        }
+        (DataType::Struct(fields), None) => {
+            // Create a StructArray filled with nulls; each child is an all-null
+            // array of its field's own type, built recursively.
+            let null_arrays: Vec<ArrayRef> = fields
+                .iter()
+                .map(|field| create_primitive_array_repeated(field.data_type(), &None, num_rows))
+                .collect::<Result<Vec<_>>>()?;
+
+            Arc::new(StructArray::new(
+                fields.clone(),
+                null_arrays,
+                Some(NullBuffer::new_null(num_rows)),
+            ))
+        }
+        (DataType::Null, _) => Arc::new(arrow_array::NullArray::new(num_rows)),
+        (dt, _) => {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("unexpected target column type {}", dt),
+            ));
+        }
+    })
+}
+
 #[cfg(test)]
 mod test {
     use std::collections::HashMap;
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index aae8efed7..8d8f40f72 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -96,4 +96,5 @@ mod utils;
 pub mod writer;
 
 mod delete_vector;
+pub mod metadata_columns;
 pub mod puffin;
diff --git a/crates/iceberg/src/metadata_columns.rs 
b/crates/iceberg/src/metadata_columns.rs
new file mode 100644
index 000000000..b11b5cadb
--- /dev/null
+++ b/crates/iceberg/src/metadata_columns.rs
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metadata columns (virtual/reserved fields) for Iceberg tables.
+//!
+//! This module defines metadata columns that can be requested in projections
+//! but are not stored in data files. Instead, they are computed on-the-fly
+//! during reading. Examples include the _file column (file path) and future
+//! columns like partition values or row numbers.
+
+use std::sync::Arc;
+
+use once_cell::sync::Lazy;
+
+use crate::spec::{NestedField, NestedFieldRef, PrimitiveType, Type};
+use crate::{Error, ErrorKind, Result};
+
+/// Reserved field ID for the file path (_file) column per Iceberg spec
+pub const RESERVED_FIELD_ID_FILE: i32 = i32::MAX - 1;
+
+/// Reserved column name for the file path metadata column
+pub const RESERVED_COL_NAME_FILE: &str = "_file";
+
+/// Documentation for the _file metadata column
+pub const RESERVED_COL_DOC_FILE: &str = "Path of the file in which a row is stored";
+
+/// Lazy-initialized Iceberg field definition for the `_file` metadata column.
+///
+/// Built on first access as a required string field with ID
+/// `RESERVED_FIELD_ID_FILE`, name `RESERVED_COL_NAME_FILE` and doc string
+/// `RESERVED_COL_DOC_FILE`, wrapped in an `Arc` so it can be shared cheaply.
+static FILE_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
+    Arc::new(
+        NestedField::required(
+            RESERVED_FIELD_ID_FILE,
+            RESERVED_COL_NAME_FILE,
+            Type::Primitive(PrimitiveType::String),
+        )
+        .with_doc(RESERVED_COL_DOC_FILE),
+    )
+});
+
+/// Gives access to the shared Iceberg field definition of the `_file` metadata column.
+///
+/// The definition is created on the first call and reused afterwards.
+///
+/// # Returns
+/// A `'static` reference to the `_file` field definition as an Iceberg NestedField
+pub fn file_field() -> &'static NestedFieldRef {
+    Lazy::force(&FILE_FIELD)
+}
+
+/// Looks up the Iceberg field definition that belongs to a metadata field ID.
+///
+/// # Arguments
+/// * `field_id` - The metadata field ID
+///
+/// # Returns
+/// The Iceberg field definition for the metadata column. An `ErrorKind::Unexpected`
+/// error is returned when the ID is not a metadata field, or when it is recognized
+/// as one but no field definition exists for it yet.
+pub fn get_metadata_field(field_id: i32) -> Result<NestedFieldRef> {
+    if field_id == RESERVED_FIELD_ID_FILE {
+        return Ok(Arc::clone(file_field()));
+    }
+
+    let message = if is_metadata_field(field_id) {
+        // Future metadata fields can be added above; until then this is an error.
+        format!(
+            "Metadata field ID {} recognized but field definition not implemented",
+            field_id
+        )
+    } else {
+        format!("Field ID {} is not a metadata field", field_id)
+    };
+
+    Err(Error::new(ErrorKind::Unexpected, message))
+}
+
+/// Resolves a metadata column name to its reserved field ID.
+///
+/// # Arguments
+/// * `column_name` - The metadata column name
+///
+/// # Returns
+/// The field ID of the metadata column, or an `ErrorKind::Unexpected` error if the
+/// column name is not recognized
+pub fn get_metadata_field_id(column_name: &str) -> Result<i32> {
+    if column_name == RESERVED_COL_NAME_FILE {
+        return Ok(RESERVED_FIELD_ID_FILE);
+    }
+
+    Err(Error::new(
+        ErrorKind::Unexpected,
+        format!("Unknown/unsupported metadata column name: {column_name}"),
+    ))
+}
+
+/// Tells whether a field ID belongs to a metadata field.
+///
+/// # Arguments
+/// * `field_id` - The field ID to check
+///
+/// # Returns
+/// `true` if the field ID is a (currently supported) metadata field, `false` otherwise
+pub fn is_metadata_field(field_id: i32) -> bool {
+    // Additional metadata field IDs can be added to this pattern in the future
+    matches!(field_id, RESERVED_FIELD_ID_FILE)
+}
+
+/// Checks if a column name is a metadata column.
+///
+/// The name is compared directly against the known reserved names, so checking an
+/// ordinary column name does not build (and immediately discard) an error value.
+///
+/// # Arguments
+/// * `column_name` - The column name to check
+///
+/// # Returns
+/// `true` if the column name is a (currently supported) metadata column, `false`
+/// otherwise
+pub fn is_metadata_column_name(column_name: &str) -> bool {
+    // Keep in sync with the names accepted by `get_metadata_field_id`.
+    column_name == RESERVED_COL_NAME_FILE
+}
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 3e319ca06..24c03b0b2 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -36,6 +36,7 @@ use crate::delete_file_index::DeleteFileIndex;
 use 
crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
 use crate::expr::{Bind, BoundPredicate, Predicate};
 use crate::io::FileIO;
+use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
 use crate::runtime::spawn;
 use crate::spec::{DataContentType, SnapshotRef};
 use crate::table::Table;
@@ -217,9 +218,13 @@ impl<'a> TableScanBuilder<'a> {
 
         let schema = snapshot.schema(self.table.metadata())?;
 
-        // Check that all column names exist in the schema.
+        // Check that all column names exist in the schema (skip reserved 
columns).
         if let Some(column_names) = self.column_names.as_ref() {
             for column_name in column_names {
+                // Skip reserved columns that don't exist in the schema
+                if is_metadata_column_name(column_name) {
+                    continue;
+                }
                 if schema.field_by_name(column_name).is_none() {
                     return Err(Error::new(
                         ErrorKind::DataInvalid,
@@ -240,6 +245,12 @@ impl<'a> TableScanBuilder<'a> {
         });
 
         for column_name in column_names.iter() {
+            // Handle metadata columns (like "_file")
+            if is_metadata_column_name(column_name) {
+                field_ids.push(get_metadata_field_id(column_name)?);
+                continue;
+            }
+
             let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                 Error::new(
                     ErrorKind::DataInvalid,
@@ -254,10 +265,10 @@ impl<'a> TableScanBuilder<'a> {
                     Error::new(
                         ErrorKind::FeatureUnsupported,
                         format!(
-                            "Column {column_name} is not a direct child of 
schema but a nested field, which is not supported now. Schema: {schema}"
-                        ),
-                    )
-                })?;
+                        "Column {column_name} is not a direct child of schema 
but a nested field, which is not supported now. Schema: {schema}"
+                    ),
+                )
+            })?;
 
             field_ids.push(field_id);
         }
@@ -559,8 +570,10 @@ pub mod tests {
     use std::fs::File;
     use std::sync::Arc;
 
+    use arrow_array::cast::AsArray;
     use arrow_array::{
-        ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, 
RecordBatch, StringArray,
+        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, 
RecordBatch,
+        StringArray,
     };
     use futures::{TryStreamExt, stream};
     use minijinja::value::Value;
@@ -575,6 +588,7 @@ pub mod tests {
     use crate::arrow::ArrowReaderBuilder;
     use crate::expr::{BoundPredicate, Reference};
     use crate::io::{FileIO, OutputFile};
+    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
     use crate::scan::FileScanTask;
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, 
ManifestEntry,
@@ -1800,4 +1814,319 @@ pub mod tests {
         };
         test_fn(task);
     }
+
+    #[tokio::test]
+    async fn test_select_with_file_column() {
+        use arrow_array::cast::AsArray;
+
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select regular columns plus the _file column
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", RESERVED_COL_NAME_FILE])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Verify we have 2 columns: x and _file
+        assert_eq!(batches[0].num_columns(), 2);
+
+        // Verify the x column exists and has correct data
+        let x_col = batches[0].column_by_name("x").unwrap();
+        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
+        assert_eq!(x_arr.value(0), 1);
+
+        // Verify the _file column exists
+        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
+        assert!(
+            file_col.is_some(),
+            "_file column should be present in the batch"
+        );
+
+        // Verify the _file column contains a file path
+        let file_col = file_col.unwrap();
+        assert!(
+            matches!(
+                file_col.data_type(),
+                arrow_schema::DataType::RunEndEncoded(_, _)
+            ),
+            "_file column should use RunEndEncoded type"
+        );
+
+        // Decode the RunArray to verify it contains the file path
+        let run_array = file_col
+            .as_any()
+            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
+            .expect("_file column should be a RunArray");
+
+        let values = run_array.values();
+        let string_values = values.as_string::<i32>();
+        assert_eq!(string_values.len(), 1, "Should have a single file path");
+
+        let file_path = string_values.value(0);
+        assert!(
+            file_path.ends_with(".parquet"),
+            "File path should end with .parquet, got: {}",
+            file_path
+        );
+    }
+
+    #[tokio::test]
+    async fn test_select_file_column_position() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select columns in specific order: x, _file, z
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", RESERVED_COL_NAME_FILE, "z"])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_columns(), 3);
+
+        // Verify column order: x at position 0, _file at position 1, z at position 2
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), "x");
+        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
+        assert_eq!(schema.field(2).name(), "z");
+
+        // Verify columns by name also works
+        assert!(batches[0].column_by_name("x").is_some());
+        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
+        assert!(batches[0].column_by_name("z").is_some());
+    }
+
+    #[tokio::test]
+    async fn test_select_file_column_only() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select only the _file column
+        let table_scan = fixture
+            .table
+            .scan()
+            .select([RESERVED_COL_NAME_FILE])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Should have exactly 1 column
+        assert_eq!(batches[0].num_columns(), 1);
+
+        // Verify it's the _file column
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
+
+        // Verify the batch has the correct number of rows
+        // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted)
+        // Each file has 1024 rows, so total is 2048 rows
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 2048);
+    }
+
+    #[tokio::test]
+    async fn test_file_column_with_multiple_files() {
+        use std::collections::HashSet;
+
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select x and _file columns
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", RESERVED_COL_NAME_FILE])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Collect all unique file paths from the batches
+        let mut file_paths = HashSet::new();
+        for batch in &batches {
+            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
+            let run_array = file_col
+                .as_any()
+                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
+                .expect("_file column should be a RunArray");
+
+            let values = run_array.values();
+            let string_values = values.as_string::<i32>();
+            for i in 0..string_values.len() {
+                file_paths.insert(string_values.value(i).to_string());
+            }
+        }
+
+        // We should have multiple files (the test creates 1.parquet and 3.parquet)
+        assert!(!file_paths.is_empty(), "Should have at least one file path");
+
+        // All paths should end with .parquet
+        for path in &file_paths {
+            assert!(
+                path.ends_with(".parquet"),
+                "All file paths should end with .parquet, got: {}",
+                path
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_file_column_at_start() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select _file at the start
+        let table_scan = fixture
+            .table
+            .scan()
+            .select([RESERVED_COL_NAME_FILE, "x", "y"])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_columns(), 3);
+
+        // Verify _file is at position 0
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
+        assert_eq!(schema.field(1).name(), "x");
+        assert_eq!(schema.field(2).name(), "y");
+    }
+
+    #[tokio::test]
+    async fn test_file_column_at_end() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select _file at the end
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", "y", RESERVED_COL_NAME_FILE])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_columns(), 3);
+
+        // Verify _file is at position 2 (the end)
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), "x");
+        assert_eq!(schema.field(1).name(), "y");
+        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
+    }
+
+    #[tokio::test]
+    async fn test_select_with_repeated_column_names() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select with repeated column names - both regular columns and virtual columns
+        // Repeated columns should appear multiple times in the result (duplicates are allowed)
+        let table_scan = fixture
+            .table
+            .scan()
+            .select([
+                "x",
+                RESERVED_COL_NAME_FILE,
+                "x", // x repeated
+                "y",
+                RESERVED_COL_NAME_FILE, // _file repeated
+                "y",                    // y repeated
+            ])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Verify we have exactly 6 columns (duplicates are allowed and preserved)
+        assert_eq!(
+            batches[0].num_columns(),
+            6,
+            "Should have exactly 6 columns with duplicates"
+        );
+
+        let schema = batches[0].schema();
+
+        // Verify columns appear in the exact order requested: x, _file, x, y, _file, y
+        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
+        assert_eq!(
+            schema.field(1).name(),
+            RESERVED_COL_NAME_FILE,
+            "Column 1 should be _file"
+        );
+        assert_eq!(
+            schema.field(2).name(),
+            "x",
+            "Column 2 should be x (duplicate)"
+        );
+        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
+        assert_eq!(
+            schema.field(4).name(),
+            RESERVED_COL_NAME_FILE,
+            "Column 4 should be _file (duplicate)"
+        );
+        assert_eq!(
+            schema.field(5).name(),
+            "y",
+            "Column 5 should be y (duplicate)"
+        );
+
+        // Verify all columns have correct data types
+        assert!(
+            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
+            "Column x should be Int64"
+        );
+        assert!(
+            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
+            "Column x (duplicate) should be Int64"
+        );
+        assert!(
+            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
+            "Column y should be Int64"
+        );
+        assert!(
+            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
+            "Column y (duplicate) should be Int64"
+        );
+        assert!(
+            matches!(
+                schema.field(1).data_type(),
+                arrow_schema::DataType::RunEndEncoded(_, _)
+            ),
+            "_file column should use RunEndEncoded type"
+        );
+        assert!(
+            matches!(
+                schema.field(4).data_type(),
+                arrow_schema::DataType::RunEndEncoded(_, _)
+            ),
+            "_file column (duplicate) should use RunEndEncoded type"
+        );
+    }
 }

Reply via email to