This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1384a4f2d feat(core): Add support for `_file` column (#1824)
1384a4f2d is described below
commit 1384a4f2d71ed16b73f3b1f139d5dbd4e5035428
Author: Gerald Berger <[email protected]>
AuthorDate: Tue Dec 9 11:46:49 2025 +0100
feat(core): Add support for `_file` column (#1824)
## Which issue does this PR close?
- Closes #1766.
## What changes are included in this PR?
Integrates virtual field handling for the `_file` metadata column into
`RecordBatchTransformer` using a pre-computed constants map, eliminating
post-processing and duplicate lookups.
## Key Changes
**New `metadata_columns.rs` module**: Centralized utilities for metadata
columns
- Constants: `RESERVED_FIELD_ID_FILE`, `RESERVED_COL_NAME_FILE`
- Helper functions: `get_metadata_field()`, `get_metadata_field_id()`,
`is_metadata_field()`, `is_metadata_column_name()` (see the sketch below)
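A minimal sketch of the new helpers, assuming the public surface added in
this commit (only `_file` is reserved so far):

```rust
use iceberg::metadata_columns::{
    RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE, get_metadata_field,
    get_metadata_field_id, is_metadata_column_name, is_metadata_field,
};

fn main() -> iceberg::Result<()> {
    // Column name -> reserved field ID ("_file" -> i32::MAX - 1).
    assert!(is_metadata_column_name(RESERVED_COL_NAME_FILE));
    assert_eq!(
        get_metadata_field_id(RESERVED_COL_NAME_FILE)?,
        RESERVED_FIELD_ID_FILE
    );

    // Reserved field ID -> Iceberg field definition (required string "_file").
    assert!(is_metadata_field(RESERVED_FIELD_ID_FILE));
    let file_field = get_metadata_field(RESERVED_FIELD_ID_FILE)?;
    println!("{file_field:?}");
    Ok(())
}
```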
**Enhanced `RecordBatchTransformer`**:
- Added `constant_fields: HashMap<i32, Datum>`, pre-computed during
initialization (a `Datum` carries both the Iceberg type and the value)
- New `with_constant()` method registers a constant `Datum` for a field ID
(see the sketch below)
- Uses the pre-computed types and values during batch processing (avoids
duplicate lookups)
- Materializes constant columns as `DataType::RunEndEncoded` arrays (memory
efficient)
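The resulting builder flow is roughly the following (crate-internal API;
`task`, `partition_spec`, and `partition_data` stand for values taken from
the file scan task, as in `reader.rs` below):

```rust
// Sketch of the crate-internal builder flow; identifiers mirror reader.rs below.
let transformer =
    RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids())
        // Constant value for a virtual/metadata field such as _file.
        .with_constant(RESERVED_FIELD_ID_FILE, Datum::string(task.data_file_path.clone()))
        // Identity-partition constants are merged into the same constant_fields
        // map, which is why with_partition() now returns Result.
        .with_partition(partition_spec, partition_data)?
        .build();
```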
**Simplified `reader.rs`**:
- Pass the full `project_field_ids` (including virtual fields) to the
RecordBatchTransformer; metadata field IDs are filtered out of the Parquet
projection mask since they have no physical column (see the excerpt below)
- Single `with_constant()` call to register the `_file` column
- Removed the post-processing loop
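Excerpt from the `reader.rs` change below:

```rust
// Metadata fields have no physical column in the Parquet file, so they are
// excluded from the projection mask and materialized by the transformer instead.
let project_field_ids_without_metadata: Vec<i32> = task
    .project_field_ids
    .iter()
    .filter(|&&id| !is_metadata_field(id))
    .copied()
    .collect();
```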
**Updated `scan/mod.rs`**:
- Use `is_metadata_column_name()` and `get_metadata_field_id()` instead of
hardcoded checks when resolving selected column names (sketch below)
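Condensed sketch of the column-name resolution in `TableScanBuilder::build()`
(full change in the `scan/mod.rs` diff below):

```rust
for column_name in column_names.iter() {
    // Reserved columns such as "_file" resolve to their reserved field ID and
    // skip the schema lookup (and the schema existence check) entirely.
    if is_metadata_column_name(column_name) {
        field_ids.push(get_metadata_field_id(column_name)?);
        continue;
    }
    // Regular columns still resolve through schema.field_id_by_name() as before.
    // ...
}
```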
## Are these changes tested?
Yes, comprehensive tests have been added to verify the functionality; a
sketch of how they decode the Run-End Encoded `_file` column follows the
test list:
### New Tests (7 tests added)
#### Table Scan API Tests (7 tests)
1. **`test_select_with_file_column`** - Verifies basic functionality of
selecting `_file` with regular columns
2. **`test_select_file_column_position`** - Verifies column ordering is
preserved
3. **`test_select_file_column_only`** - Tests selecting only the `_file`
column
4. **`test_file_column_with_multiple_files`** - Tests multiple data
files scenario
5. **`test_file_column_at_start`** - Tests `_file` at position 0
6. **`test_file_column_at_end`** - Tests `_file` at the last position
7. **`test_select_with_repeated_column_names`** - Tests repeated column
selection
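Because `_file` is materialized as a Run-End Encoded string column, the tests
unwrap it roughly like this before asserting on the path (condensed from
`test_select_with_file_column` below):

```rust
use arrow_array::RunArray;
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;

// `batch` is a RecordBatch produced by the scan.
let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
let run_array = file_col
    .as_any()
    .downcast_ref::<RunArray<Int32Type>>()
    .expect("_file column should be a RunArray");
// A constant column carries a single value; its run end spans the whole batch.
let file_path = run_array.values().as_string::<i32>().value(0);
assert!(file_path.ends_with(".parquet"));
```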
---
crates/iceberg/src/arrow/reader.rs | 29 +-
.../iceberg/src/arrow/record_batch_transformer.rs | 476 ++++++++++++---------
crates/iceberg/src/arrow/schema.rs | 54 +++
crates/iceberg/src/arrow/value.rs | 216 +++++++++-
crates/iceberg/src/lib.rs | 1 +
crates/iceberg/src/metadata_columns.rs | 127 ++++++
crates/iceberg/src/scan/mod.rs | 341 ++++++++++++++-
7 files changed, 1031 insertions(+), 213 deletions(-)
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index ab5a96f75..de8a1420e 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -54,6 +54,7 @@ use
crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use
crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
+use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema,
Type};
use crate::utils::available_parallelism;
@@ -250,12 +251,20 @@ impl ArrowReader {
initial_stream_builder
};
+ // Filter out metadata fields for Parquet projection (they don't exist
in files)
+ let project_field_ids_without_metadata: Vec<i32> = task
+ .project_field_ids
+ .iter()
+ .filter(|&&id| !is_metadata_field(id))
+ .copied()
+ .collect();
+
// Create projection mask based on field IDs
// - If file has embedded IDs: field-ID-based projection
(missing_field_ids=false)
// - If name mapping applied: field-ID-based projection
(missing_field_ids=true but IDs now match)
// - If fallback IDs: position-based projection
(missing_field_ids=true)
let projection_mask = Self::get_arrow_projection_mask(
- &task.project_field_ids,
+ &project_field_ids_without_metadata,
&task.schema,
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
@@ -266,16 +275,23 @@ impl ArrowReader {
record_batch_stream_builder.with_projection(projection_mask.clone());
// RecordBatchTransformer performs any transformations required on the
RecordBatches
- // that come back from the file, such as type promotion, default
column insertion
- // and column re-ordering.
+ // that come back from the file, such as type promotion, default
column insertion,
+ // column re-ordering, partition constants, and virtual field addition
(like _file)
let mut record_batch_transformer_builder =
RecordBatchTransformerBuilder::new(task.schema_ref(),
task.project_field_ids());
+ // Add the _file metadata column if it's in the projected fields
+ if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
+ let file_datum = Datum::string(task.data_file_path.clone());
+ record_batch_transformer_builder =
+
record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE,
file_datum);
+ }
+
if let (Some(partition_spec), Some(partition_data)) =
(task.partition_spec.clone(), task.partition.clone())
{
record_batch_transformer_builder =
-
record_batch_transformer_builder.with_partition(partition_spec, partition_data);
+
record_batch_transformer_builder.with_partition(partition_spec,
partition_data)?;
}
let mut record_batch_transformer =
record_batch_transformer_builder.build();
@@ -416,7 +432,10 @@ impl ArrowReader {
record_batch_stream_builder
.build()?
.map(move |batch| match batch {
- Ok(batch) =>
record_batch_transformer.process_record_batch(batch),
+ Ok(batch) => {
+ // Process the record batch (type promotion, column
reordering, virtual fields, etc.)
+ record_batch_transformer.process_record_batch(batch)
+ }
Err(err) => Err(err.into()),
});
diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs
index a20adb6a5..f30d4a09c 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -19,24 +19,23 @@ use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::{
- Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array,
Float32Array,
- Float64Array, Int32Array, Int64Array, NullArray, RecordBatch,
RecordBatchOptions, StringArray,
- StructArray,
+ Array as ArrowArray, ArrayRef, Int32Array, RecordBatch,
RecordBatchOptions, RunArray,
};
-use arrow_buffer::NullBuffer;
use arrow_cast::cast;
use arrow_schema::{
- DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
SchemaRef,
+ DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as
ArrowSchemaRef, SchemaRef,
};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
-use crate::arrow::schema_to_arrow_schema;
+use crate::arrow::value::{create_primitive_array_repeated,
create_primitive_array_single_element};
+use crate::arrow::{datum_to_arrow_type_with_ree, schema_to_arrow_schema};
+use crate::metadata_columns::get_metadata_field;
use crate::spec::{
- Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct,
Transform,
+ Datum, Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema,
Struct, Transform,
};
use crate::{Error, ErrorKind, Result};
-/// Build a map of field ID to constant value for identity-partitioned fields.
+/// Build a map of field ID to constant value (as Datum) for
identity-partitioned fields.
///
/// Implements Iceberg spec "Column Projection" rule #1: use partition
metadata constants
/// only for identity-transformed fields. Non-identity transforms (bucket,
truncate, year, etc.)
@@ -53,20 +52,65 @@ use crate::{Error, ErrorKind, Result};
fn constants_map(
partition_spec: &PartitionSpec,
partition_data: &Struct,
-) -> HashMap<i32, PrimitiveLiteral> {
+ schema: &IcebergSchema,
+) -> Result<HashMap<i32, Datum>> {
let mut constants = HashMap::new();
for (pos, field) in partition_spec.fields().iter().enumerate() {
// Only identity transforms should use constant values from partition
metadata
if matches!(field.transform, Transform::Identity) {
+ // Get the field from schema to extract its type
+ let iceberg_field =
schema.field_by_id(field.source_id).ok_or(Error::new(
+ ErrorKind::Unexpected,
+ format!("Field {} not found in schema", field.source_id),
+ ))?;
+
+ // Ensure the field type is primitive
+ let prim_type = match &*iceberg_field.field_type {
+ crate::spec::Type::Primitive(prim_type) => prim_type,
+ _ => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Partition field {} has non-primitive type {:?}",
+ field.source_id, iceberg_field.field_type
+ ),
+ ));
+ }
+ };
+
// Get the partition value for this field
- if let Some(Literal::Primitive(value)) = &partition_data[pos] {
- constants.insert(field.source_id, value.clone());
+ // Handle both None (null) and Some(Literal::Primitive) cases
+ match &partition_data[pos] {
+ None => {
+ // TODO
(https://github.com/apache/iceberg-rust/issues/1914): Add support for null
datum values.
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Partition field {} has null value for identity
transform",
+ field.source_id
+ ),
+ ));
+ }
+ Some(Literal::Primitive(value)) => {
+ // Create a Datum from the primitive type and value
+ let datum = Datum::new(prim_type.clone(), value.clone());
+ constants.insert(field.source_id, datum);
+ }
+ Some(literal) => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Partition field {} has non-primitive value: {:?}",
+ field.source_id, literal
+ ),
+ ));
+ }
}
}
}
- constants
+ Ok(constants)
}
/// Indicates how a particular column in a processed RecordBatch should
@@ -146,13 +190,13 @@ enum SchemaComparison {
/// Builder for RecordBatchTransformer to improve ergonomics when constructing
with optional parameters.
///
-/// See [`RecordBatchTransformer`] for details on partition spec and partition
data.
+/// Constant fields are pre-computed for both virtual/metadata fields (like
_file) and
+/// identity-partitioned fields to avoid duplicate work during batch
processing.
#[derive(Debug)]
pub(crate) struct RecordBatchTransformerBuilder {
snapshot_schema: Arc<IcebergSchema>,
projected_iceberg_field_ids: Vec<i32>,
- partition_spec: Option<Arc<PartitionSpec>>,
- partition_data: Option<Struct>,
+ constant_fields: HashMap<i32, Datum>,
}
impl RecordBatchTransformerBuilder {
@@ -163,32 +207,48 @@ impl RecordBatchTransformerBuilder {
Self {
snapshot_schema,
projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(),
- partition_spec: None,
- partition_data: None,
+ constant_fields: HashMap::new(),
}
}
+ /// Add a constant value for a specific field ID.
+ /// This is used for virtual/metadata fields like _file that have constant
values per batch.
+ ///
+ /// # Arguments
+ /// * `field_id` - The field ID to associate with the constant
+ /// * `datum` - The constant value (with type) for this field
+ pub(crate) fn with_constant(mut self, field_id: i32, datum: Datum) -> Self
{
+ self.constant_fields.insert(field_id, datum);
+ self
+ }
+
/// Set partition spec and data together for identifying
identity-transformed partition columns.
///
/// Both partition_spec and partition_data must be provided together since
the spec defines
/// which fields are identity-partitioned, and the data provides their
constant values.
- /// One without the other cannot produce a valid constants map.
+ /// This method computes the partition constants and merges them into
constant_fields.
pub(crate) fn with_partition(
mut self,
partition_spec: Arc<PartitionSpec>,
partition_data: Struct,
- ) -> Self {
- self.partition_spec = Some(partition_spec);
- self.partition_data = Some(partition_data);
- self
+ ) -> Result<Self> {
+ // Compute partition constants for identity-transformed fields
(already returns Datum)
+ let partition_constants =
+ constants_map(&partition_spec, &partition_data,
&self.snapshot_schema)?;
+
+ // Add partition constants to constant_fields
+ for (field_id, datum) in partition_constants {
+ self.constant_fields.insert(field_id, datum);
+ }
+
+ Ok(self)
}
pub(crate) fn build(self) -> RecordBatchTransformer {
RecordBatchTransformer {
snapshot_schema: self.snapshot_schema,
projected_iceberg_field_ids: self.projected_iceberg_field_ids,
- partition_spec: self.partition_spec,
- partition_data: self.partition_data,
+ constant_fields: self.constant_fields,
batch_transform: None,
}
}
@@ -228,16 +288,10 @@ impl RecordBatchTransformerBuilder {
pub(crate) struct RecordBatchTransformer {
snapshot_schema: Arc<IcebergSchema>,
projected_iceberg_field_ids: Vec<i32>,
-
- /// Partition spec for identifying identity-transformed partition columns
(spec rule #1).
- /// Only fields with identity transforms use partition data constants;
non-identity transforms
- /// (bucket, truncate, etc.) must read source columns from data files.
- partition_spec: Option<Arc<PartitionSpec>>,
-
- /// Partition data providing constant values for identity-transformed
partition columns (spec rule #1).
- /// For example, in a file at path `dept=engineering/file.parquet`, this
would contain
- /// the value "engineering" for the dept field.
- partition_data: Option<Struct>,
+ // Pre-computed constant field information: field_id -> Datum
+ // Includes both virtual/metadata fields (like _file) and
identity-partitioned fields
+ // Datum holds both the Iceberg type and the value
+ constant_fields: HashMap<i32, Datum>,
// BatchTransform gets lazily constructed based on the schema of
// the first RecordBatch we receive from the file
@@ -279,8 +333,7 @@ impl RecordBatchTransformer {
record_batch.schema_ref(),
self.snapshot_schema.as_ref(),
&self.projected_iceberg_field_ids,
- self.partition_spec.as_ref().map(|s| s.as_ref()),
- self.partition_data.as_ref(),
+ &self.constant_fields,
)?);
self.process_record_batch(record_batch)?
@@ -299,8 +352,7 @@ impl RecordBatchTransformer {
source_schema: &ArrowSchemaRef,
snapshot_schema: &IcebergSchema,
projected_iceberg_field_ids: &[i32],
- partition_spec: Option<&PartitionSpec>,
- partition_data: Option<&Struct>,
+ constant_fields: &HashMap<i32, Datum>,
) -> Result<BatchTransform> {
let mapped_unprojected_arrow_schema =
Arc::new(schema_to_arrow_schema(snapshot_schema)?);
let field_id_to_mapped_schema_map =
@@ -311,22 +363,54 @@ impl RecordBatchTransformer {
let fields: Result<Vec<_>> = projected_iceberg_field_ids
.iter()
.map(|field_id| {
- Ok(field_id_to_mapped_schema_map
- .get(field_id)
- .ok_or(Error::new(ErrorKind::Unexpected, "field not
found"))?
- .0
- .clone())
+ // Check if this is a constant field
+ if constant_fields.contains_key(field_id) {
+ // For metadata/virtual fields (like _file), get name from
metadata_columns
+ // For partition fields, get name from schema (they exist
in schema)
+ if let Ok(iceberg_field) = get_metadata_field(*field_id) {
+ // This is a metadata/virtual field - convert Iceberg
field to Arrow
+ let datum =
constant_fields.get(field_id).ok_or(Error::new(
+ ErrorKind::Unexpected,
+ "constant field not found",
+ ))?;
+ let arrow_type = datum_to_arrow_type_with_ree(datum);
+ let arrow_field =
+ Field::new(&iceberg_field.name, arrow_type,
!iceberg_field.required)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ iceberg_field.id.to_string(),
+ )]));
+ Ok(Arc::new(arrow_field))
+ } else {
+ // This is a partition constant field (exists in
schema but uses constant value)
+ let field = &field_id_to_mapped_schema_map
+ .get(field_id)
+ .ok_or(Error::new(ErrorKind::Unexpected, "field
not found"))?
+ .0;
+ let datum =
constant_fields.get(field_id).ok_or(Error::new(
+ ErrorKind::Unexpected,
+ "constant field not found",
+ ))?;
+ let arrow_type = datum_to_arrow_type_with_ree(datum);
+ // Use the type from constant_fields (REE for
constants)
+ let constant_field =
+ Field::new(field.name(), arrow_type,
field.is_nullable())
+ .with_metadata(field.metadata().clone());
+ Ok(Arc::new(constant_field))
+ }
+ } else {
+ // Regular field - use schema as-is
+ Ok(field_id_to_mapped_schema_map
+ .get(field_id)
+ .ok_or(Error::new(ErrorKind::Unexpected, "field not
found"))?
+ .0
+ .clone())
+ }
})
.collect();
let target_schema = Arc::new(ArrowSchema::new(fields?));
- let constants_map = if let (Some(spec), Some(data)) = (partition_spec,
partition_data) {
- constants_map(spec, data)
- } else {
- HashMap::new()
- };
-
match Self::compare_schemas(source_schema, &target_schema) {
SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough),
SchemaComparison::NameChangesOnly =>
Ok(BatchTransform::ModifySchema { target_schema }),
@@ -336,8 +420,7 @@ impl RecordBatchTransformer {
snapshot_schema,
projected_iceberg_field_ids,
field_id_to_mapped_schema_map,
- constants_map,
- partition_spec,
+ constant_fields,
)?,
target_schema,
}),
@@ -394,8 +477,7 @@ impl RecordBatchTransformer {
snapshot_schema: &IcebergSchema,
projected_iceberg_field_ids: &[i32],
field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
- constants_map: HashMap<i32, PrimitiveLiteral>,
- _partition_spec: Option<&PartitionSpec>,
+ constant_fields: &HashMap<i32, Datum>,
) -> Result<Vec<ColumnSource>> {
let field_id_to_source_schema_map =
Self::build_field_id_to_arrow_schema_map(source_schema)?;
@@ -403,6 +485,18 @@ impl RecordBatchTransformer {
projected_iceberg_field_ids
.iter()
.map(|field_id| {
+ // Check if this is a constant field (metadata/virtual or
identity-partitioned)
+ // Constant fields always use their pre-computed constant
values, regardless of whether
+ // they exist in the Parquet file. This is per Iceberg spec
rule #1: partition metadata
+ // is authoritative and should be preferred over file data.
+ if let Some(datum) = constant_fields.get(field_id) {
+ let arrow_type = datum_to_arrow_type_with_ree(datum);
+ return Ok(ColumnSource::Add {
+ value: Some(datum.literal().clone()),
+ target_type: arrow_type,
+ });
+ }
+
let (target_field, _) =
field_id_to_mapped_schema_map
.get(field_id)
@@ -451,13 +545,8 @@ impl RecordBatchTransformer {
);
// Apply spec's fallback steps for "not present" fields.
- let column_source = if let Some(constant_value) =
constants_map.get(field_id) {
- // Rule #1: Identity partition constant
- ColumnSource::Add {
- value: Some(constant_value.clone()),
- target_type: target_type.clone(),
- }
- } else if let Some(source) = field_by_id {
+ // Rule #1 (constants) is handled at the beginning of this
function
+ let column_source = if let Some(source) = field_by_id {
source
} else {
// Rules #2, #3 and #4:
@@ -471,6 +560,7 @@ impl RecordBatchTransformer {
None
}
});
+
ColumnSource::Add {
value: default_value,
target_type: target_type.clone(),
@@ -539,86 +629,36 @@ impl RecordBatchTransformer {
prim_lit: &Option<PrimitiveLiteral>,
num_rows: usize,
) -> Result<ArrayRef> {
- Ok(match (target_type, prim_lit) {
- (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => {
- Arc::new(BooleanArray::from(vec![*value; num_rows]))
- }
- (DataType::Boolean, None) => {
- let vals: Vec<Option<bool>> = vec![None; num_rows];
- Arc::new(BooleanArray::from(vals))
- }
- (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => {
- Arc::new(Int32Array::from(vec![*value; num_rows]))
- }
- (DataType::Int32, None) => {
- let vals: Vec<Option<i32>> = vec![None; num_rows];
- Arc::new(Int32Array::from(vals))
- }
- (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
- Arc::new(Date32Array::from(vec![*value; num_rows]))
- }
- (DataType::Date32, None) => {
- let vals: Vec<Option<i32>> = vec![None; num_rows];
- Arc::new(Date32Array::from(vals))
- }
- (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
- Arc::new(Int64Array::from(vec![*value; num_rows]))
- }
- (DataType::Int64, None) => {
- let vals: Vec<Option<i64>> = vec![None; num_rows];
- Arc::new(Int64Array::from(vals))
- }
- (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
- Arc::new(Float32Array::from(vec![value.0; num_rows]))
- }
- (DataType::Float32, None) => {
- let vals: Vec<Option<f32>> = vec![None; num_rows];
- Arc::new(Float32Array::from(vals))
- }
- (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => {
- Arc::new(Float64Array::from(vec![value.0; num_rows]))
- }
- (DataType::Float64, None) => {
- let vals: Vec<Option<f64>> = vec![None; num_rows];
- Arc::new(Float64Array::from(vals))
- }
- (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
- Arc::new(StringArray::from(vec![value.clone(); num_rows]))
- }
- (DataType::Utf8, None) => {
- let vals: Vec<Option<String>> = vec![None; num_rows];
- Arc::new(StringArray::from(vals))
- }
- (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => {
- Arc::new(BinaryArray::from_vec(vec![value; num_rows]))
- }
- (DataType::Binary, None) => {
- let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
- Arc::new(BinaryArray::from_opt_vec(vals))
- }
- (DataType::Struct(fields), None) => {
- // Create a StructArray filled with nulls. Per Iceberg spec,
optional struct fields
- // default to null when added to the schema. We defer non-null
default struct values
- // and leave them as not implemented yet.
- let null_arrays: Vec<ArrayRef> = fields
- .iter()
- .map(|field| Self::create_column(field.data_type(), &None,
num_rows))
- .collect::<Result<Vec<_>>>()?;
-
- Arc::new(StructArray::new(
- fields.clone(),
- null_arrays,
- Some(NullBuffer::new_null(num_rows)),
+ // Check if this is a RunEndEncoded type (for constant fields)
+ if let DataType::RunEndEncoded(_, values_field) = target_type {
+ // Helper to create a Run-End Encoded array
+ let create_ree_array = |values_array: ArrayRef| ->
Result<ArrayRef> {
+ let run_ends = if num_rows == 0 {
+ Int32Array::from(Vec::<i32>::new())
+ } else {
+ Int32Array::from(vec![num_rows as i32])
+ };
+ Ok(Arc::new(
+ RunArray::try_new(&run_ends, &values_array).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Failed to create RunArray for constant value",
+ )
+ .with_source(e)
+ })?,
))
- }
- (DataType::Null, _) => Arc::new(NullArray::new(num_rows)),
- (dt, _) => {
- return Err(Error::new(
- ErrorKind::Unexpected,
- format!("unexpected target column type {}", dt),
- ));
- }
- })
+ };
+
+ // Create the values array using the helper function
+ let values_array =
+
create_primitive_array_single_element(values_field.data_type(), prim_lit)?;
+
+ // Wrap in Run-End Encoding
+ create_ree_array(values_array)
+ } else {
+ // Non-REE type (simple arrays for non-constant fields)
+ create_primitive_array_repeated(target_type, prim_lit, num_rows)
+ }
}
}
@@ -639,6 +679,54 @@ mod test {
};
use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct,
Type};
+ /// Helper to extract string values from either StringArray or
RunEndEncoded<StringArray>
+ /// Returns empty string for null values
+ fn get_string_value(array: &dyn Array, index: usize) -> String {
+ if let Some(string_array) =
array.as_any().downcast_ref::<StringArray>() {
+ if string_array.is_null(index) {
+ String::new()
+ } else {
+ string_array.value(index).to_string()
+ }
+ } else if let Some(run_array) = array
+ .as_any()
+
.downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
+ {
+ let values = run_array.values();
+ let string_values = values
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("REE values should be StringArray");
+ // For REE, all rows have the same value (index 0 in the values
array)
+ if string_values.is_null(0) {
+ String::new()
+ } else {
+ string_values.value(0).to_string()
+ }
+ } else {
+ panic!("Expected StringArray or RunEndEncoded<StringArray>");
+ }
+ }
+
+ /// Helper to extract int values from either Int32Array or
RunEndEncoded<Int32Array>
+ fn get_int_value(array: &dyn Array, index: usize) -> i32 {
+ if let Some(int_array) = array.as_any().downcast_ref::<Int32Array>() {
+ int_array.value(index)
+ } else if let Some(run_array) = array
+ .as_any()
+
.downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
+ {
+ let values = run_array.values();
+ let int_values = values
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("REE values should be Int32Array");
+ int_values.value(0)
+ } else {
+ panic!("Expected Int32Array or RunEndEncoded<Int32Array>");
+ }
+ }
+
#[test]
fn build_field_id_to_source_schema_map_works() {
let arrow_schema = arrow_schema_already_same_as_target();
@@ -1137,6 +1225,7 @@ mod test {
let mut transformer =
RecordBatchTransformerBuilder::new(snapshot_schema,
&projected_field_ids)
.with_partition(partition_spec, partition_data)
+ .expect("Failed to add partition constants")
.build();
// Create a Parquet RecordBatch with actual data
@@ -1257,6 +1346,7 @@ mod test {
let mut transformer =
RecordBatchTransformerBuilder::new(snapshot_schema,
&projected_field_ids)
.with_partition(partition_spec, partition_data)
+ .expect("Failed to add partition constants")
.build();
let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
@@ -1271,30 +1361,23 @@ mod test {
assert_eq!(result.num_columns(), 3);
assert_eq!(result.num_rows(), 2);
- let id_column = result
- .column(0)
- .as_any()
- .downcast_ref::<Int32Array>()
- .unwrap();
- assert_eq!(id_column.value(0), 100);
- assert_eq!(id_column.value(1), 200);
+ // Use helpers to handle both simple and REE arrays
+ assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100);
+ assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200);
- let dept_column = result
- .column(1)
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
- // This value MUST come from partition metadata (constant)
- assert_eq!(dept_column.value(0), "engineering");
- assert_eq!(dept_column.value(1), "engineering");
+ // dept column comes from partition metadata (constant) - will be REE
+ assert_eq!(
+ get_string_value(result.column(1).as_ref(), 0),
+ "engineering"
+ );
+ assert_eq!(
+ get_string_value(result.column(1).as_ref(), 1),
+ "engineering"
+ );
- let name_column = result
- .column(2)
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
- assert_eq!(name_column.value(0), "Alice");
- assert_eq!(name_column.value(1), "Bob");
+ // name column comes from file
+ assert_eq!(get_string_value(result.column(2).as_ref(), 0), "Alice");
+ assert_eq!(get_string_value(result.column(2).as_ref(), 1), "Bob");
}
/// Test bucket partitioning with renamed source column.
@@ -1372,6 +1455,7 @@ mod test {
let mut transformer =
RecordBatchTransformerBuilder::new(snapshot_schema,
&projected_field_ids)
.with_partition(partition_spec, partition_data)
+ .expect("Failed to add partition constants")
.build();
// Create a Parquet RecordBatch with actual data
@@ -1476,6 +1560,7 @@ mod test {
let mut transformer =
RecordBatchTransformerBuilder::new(snapshot_schema,
&projected_field_ids)
.with_partition(partition_spec, partition_data)
+ .expect("Failed to add partition constants")
.build();
let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
@@ -1492,48 +1577,37 @@ mod test {
// Verify each column demonstrates the correct spec rule:
// Normal case: id from Parquet by field ID
- let id_column = result
- .column(0)
- .as_any()
- .downcast_ref::<Int32Array>()
- .unwrap();
- assert_eq!(id_column.value(0), 100);
- assert_eq!(id_column.value(1), 200);
+ // Use helpers to handle both simple and REE arrays
+ assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100);
+ assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200);
+
+ // Rule #1: dept from partition metadata (identity transform) - will
be REE
+ assert_eq!(
+ get_string_value(result.column(1).as_ref(), 0),
+ "engineering"
+ );
+ assert_eq!(
+ get_string_value(result.column(1).as_ref(), 1),
+ "engineering"
+ );
- // Rule #1: dept from partition metadata (identity transform)
- let dept_column = result
- .column(1)
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
- assert_eq!(dept_column.value(0), "engineering");
- assert_eq!(dept_column.value(1), "engineering");
+ // Rule #2: data from Parquet via name mapping - will be regular array
+ assert_eq!(get_string_value(result.column(2).as_ref(), 0), "value1");
+ assert_eq!(get_string_value(result.column(2).as_ref(), 1), "value2");
- // Rule #2: data from Parquet via name mapping
- let data_column = result
- .column(2)
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
- assert_eq!(data_column.value(0), "value1");
- assert_eq!(data_column.value(1), "value2");
-
- // Rule #3: category from initial_default
- let category_column = result
- .column(3)
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
- assert_eq!(category_column.value(0), "default_category");
- assert_eq!(category_column.value(1), "default_category");
+ // Rule #3: category from initial_default - will be REE
+ assert_eq!(
+ get_string_value(result.column(3).as_ref(), 0),
+ "default_category"
+ );
+ assert_eq!(
+ get_string_value(result.column(3).as_ref(), 1),
+ "default_category"
+ );
- // Rule #4: notes is null (no default, not in Parquet, not in
partition)
- let notes_column = result
- .column(4)
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
- assert!(notes_column.is_null(0));
- assert!(notes_column.is_null(1));
+ // Rule #4: notes is null (no default, not in Parquet, not in
partition) - will be REE with null
+ // For null REE arrays, we still use the helper which handles
extraction
+ assert_eq!(get_string_value(result.column(4).as_ref(), 0), "");
+ assert_eq!(get_string_value(result.column(4).as_ref(), 1), "");
}
}
diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs
index ec0135bd7..4f4f083c7 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -1019,6 +1019,60 @@ impl TryFrom<&crate::spec::Schema> for ArrowSchema {
}
}
+/// Converts a Datum (Iceberg type + primitive literal) to its corresponding
Arrow DataType
+/// with Run-End Encoding (REE).
+///
+/// This function is used for constant fields in record batches, where all
values are the same.
+/// Run-End Encoding provides efficient storage for such constant columns.
+///
+/// # Arguments
+/// * `datum` - The Datum to convert, which contains both type and value
information
+///
+/// # Returns
+/// Arrow DataType with Run-End Encoding applied
+///
+/// # Example
+/// ```
+/// use iceberg::arrow::datum_to_arrow_type_with_ree;
+/// use iceberg::spec::Datum;
+///
+/// let datum = Datum::string("test_file.parquet");
+/// let ree_type = datum_to_arrow_type_with_ree(&datum);
+/// // Returns: RunEndEncoded(Int32, Utf8)
+/// ```
+pub fn datum_to_arrow_type_with_ree(datum: &Datum) -> DataType {
+ // Helper to create REE type with the given values type.
+ // Note: values field is nullable as Arrow expects this when building the
+ // final Arrow schema with `RunArray::try_new`.
+ let make_ree = |values_type: DataType| -> DataType {
+ let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32,
false));
+ let values_field = Arc::new(Field::new("values", values_type, true));
+ DataType::RunEndEncoded(run_ends_field, values_field)
+ };
+
+ // Match on the PrimitiveType from the Datum to determine the Arrow type
+ match datum.data_type() {
+ PrimitiveType::Boolean => make_ree(DataType::Boolean),
+ PrimitiveType::Int => make_ree(DataType::Int32),
+ PrimitiveType::Long => make_ree(DataType::Int64),
+ PrimitiveType::Float => make_ree(DataType::Float32),
+ PrimitiveType::Double => make_ree(DataType::Float64),
+ PrimitiveType::Date => make_ree(DataType::Date32),
+ PrimitiveType::Time => make_ree(DataType::Int64),
+ PrimitiveType::Timestamp => make_ree(DataType::Int64),
+ PrimitiveType::Timestamptz => make_ree(DataType::Int64),
+ PrimitiveType::TimestampNs => make_ree(DataType::Int64),
+ PrimitiveType::TimestamptzNs => make_ree(DataType::Int64),
+ PrimitiveType::String => make_ree(DataType::Utf8),
+ PrimitiveType::Uuid => make_ree(DataType::Binary),
+ PrimitiveType::Fixed(_) => make_ree(DataType::Binary),
+ PrimitiveType::Binary => make_ree(DataType::Binary),
+ PrimitiveType::Decimal { precision, scale } => {
+ make_ree(DataType::Decimal128(*precision as u8, *scale as i8))
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use std::collections::HashMap;
diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs
index f1cf225bb..0e0b85f07 100644
--- a/crates/iceberg/src/arrow/value.rs
+++ b/crates/iceberg/src/arrow/value.rs
@@ -15,18 +15,21 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::Arc;
+
use arrow_array::{
Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array,
FixedSizeBinaryArray,
FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
LargeBinaryArray,
LargeListArray, LargeStringArray, ListArray, MapArray, StringArray,
StructArray,
Time64MicrosecondArray, TimestampMicrosecondArray,
TimestampNanosecondArray,
};
+use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, FieldRef};
use uuid::Uuid;
use super::get_field_id;
use crate::spec::{
- ListType, Literal, Map, MapType, NestedField, PartnerAccessor,
PrimitiveType,
+ ListType, Literal, Map, MapType, NestedField, PartnerAccessor,
PrimitiveLiteral, PrimitiveType,
SchemaWithPartnerVisitor, Struct, StructType, Type,
visit_struct_with_partner,
visit_type_with_partner,
};
@@ -617,6 +620,217 @@ pub fn arrow_primitive_to_literal(
)
}
+/// Create a single-element array from a primitive literal.
+///
+/// This is used for creating constant arrays (Run-End Encoded arrays) where
we need
+/// a single value that represents all rows.
+pub(crate) fn create_primitive_array_single_element(
+ data_type: &DataType,
+ prim_lit: &Option<PrimitiveLiteral>,
+) -> Result<ArrayRef> {
+ match (data_type, prim_lit) {
+ (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => {
+ Ok(Arc::new(BooleanArray::from(vec![*v])))
+ }
+ (DataType::Boolean, None) =>
Ok(Arc::new(BooleanArray::from(vec![Option::<bool>::None]))),
+ (DataType::Int32, Some(PrimitiveLiteral::Int(v))) => {
+ Ok(Arc::new(Int32Array::from(vec![*v])))
+ }
+ (DataType::Int32, None) =>
Ok(Arc::new(Int32Array::from(vec![Option::<i32>::None]))),
+ (DataType::Date32, Some(PrimitiveLiteral::Int(v))) => {
+ Ok(Arc::new(Date32Array::from(vec![*v])))
+ }
+ (DataType::Date32, None) =>
Ok(Arc::new(Date32Array::from(vec![Option::<i32>::None]))),
+ (DataType::Int64, Some(PrimitiveLiteral::Long(v))) => {
+ Ok(Arc::new(Int64Array::from(vec![*v])))
+ }
+ (DataType::Int64, None) =>
Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None]))),
+ (DataType::Float32, Some(PrimitiveLiteral::Float(v))) => {
+ Ok(Arc::new(Float32Array::from(vec![v.0])))
+ }
+ (DataType::Float32, None) =>
Ok(Arc::new(Float32Array::from(vec![Option::<f32>::None]))),
+ (DataType::Float64, Some(PrimitiveLiteral::Double(v))) => {
+ Ok(Arc::new(Float64Array::from(vec![v.0])))
+ }
+ (DataType::Float64, None) =>
Ok(Arc::new(Float64Array::from(vec![Option::<f64>::None]))),
+ (DataType::Utf8, Some(PrimitiveLiteral::String(v))) => {
+ Ok(Arc::new(StringArray::from(vec![v.as_str()])))
+ }
+ (DataType::Utf8, None) =>
Ok(Arc::new(StringArray::from(vec![Option::<&str>::None]))),
+ (DataType::Binary, Some(PrimitiveLiteral::Binary(v))) => {
+ Ok(Arc::new(BinaryArray::from_vec(vec![v.as_slice()])))
+ }
+ (DataType::Binary, None) => Ok(Arc::new(BinaryArray::from_opt_vec(vec![
+ Option::<&[u8]>::None,
+ ]))),
+ (DataType::Decimal128(_, _), Some(PrimitiveLiteral::Int128(v))) => {
+ Ok(Arc::new(arrow_array::Decimal128Array::from(vec![{ *v }])))
+ }
+ (DataType::Decimal128(_, _), Some(PrimitiveLiteral::UInt128(v))) => {
+ Ok(Arc::new(arrow_array::Decimal128Array::from(vec![
+ *v as i128,
+ ])))
+ }
+ (DataType::Decimal128(_, _), None) => {
+ Ok(Arc::new(arrow_array::Decimal128Array::from(vec![
+ Option::<i128>::None,
+ ])))
+ }
+ (DataType::Struct(fields), None) => {
+ // Create a single-element StructArray with nulls
+ let null_arrays: Vec<ArrayRef> = fields
+ .iter()
+ .map(|f| {
+ // Recursively create null arrays for struct fields
+ // For primitive fields in structs, use simple null arrays
(not REE within struct)
+ match f.data_type() {
+ DataType::Boolean => {
+
Ok(Arc::new(BooleanArray::from(vec![Option::<bool>::None]))
+ as ArrayRef)
+ }
+ DataType::Int32 | DataType::Date32 => {
+
Ok(Arc::new(Int32Array::from(vec![Option::<i32>::None])) as ArrayRef)
+ }
+ DataType::Int64 => {
+
Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None])) as ArrayRef)
+ }
+ DataType::Float32 => {
+
Ok(Arc::new(Float32Array::from(vec![Option::<f32>::None])) as ArrayRef)
+ }
+ DataType::Float64 => {
+
Ok(Arc::new(Float64Array::from(vec![Option::<f64>::None])) as ArrayRef)
+ }
+ DataType::Utf8 => {
+
Ok(Arc::new(StringArray::from(vec![Option::<&str>::None])) as ArrayRef)
+ }
+ DataType::Binary => {
+ Ok(
+
Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None]))
+ as ArrayRef,
+ )
+ }
+ _ => Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("Unsupported struct field type: {:?}",
f.data_type()),
+ )),
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+ Ok(Arc::new(arrow_array::StructArray::new(
+ fields.clone(),
+ null_arrays,
+ Some(arrow_buffer::NullBuffer::new_null(1)),
+ )))
+ }
+ _ => Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Unsupported constant type combination: {:?} with {:?}",
+ data_type, prim_lit
+ ),
+ )),
+ }
+}
+
+/// Create a repeated array from a primitive literal for a given number of
rows.
+///
+/// This is used for creating non-constant arrays where we need the same value
+/// repeated for each row.
+pub(crate) fn create_primitive_array_repeated(
+ data_type: &DataType,
+ prim_lit: &Option<PrimitiveLiteral>,
+ num_rows: usize,
+) -> Result<ArrayRef> {
+ Ok(match (data_type, prim_lit) {
+ (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => {
+ Arc::new(BooleanArray::from(vec![*value; num_rows]))
+ }
+ (DataType::Boolean, None) => {
+ let vals: Vec<Option<bool>> = vec![None; num_rows];
+ Arc::new(BooleanArray::from(vals))
+ }
+ (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => {
+ Arc::new(Int32Array::from(vec![*value; num_rows]))
+ }
+ (DataType::Int32, None) => {
+ let vals: Vec<Option<i32>> = vec![None; num_rows];
+ Arc::new(Int32Array::from(vals))
+ }
+ (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
+ Arc::new(Date32Array::from(vec![*value; num_rows]))
+ }
+ (DataType::Date32, None) => {
+ let vals: Vec<Option<i32>> = vec![None; num_rows];
+ Arc::new(Date32Array::from(vals))
+ }
+ (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
+ Arc::new(Int64Array::from(vec![*value; num_rows]))
+ }
+ (DataType::Int64, None) => {
+ let vals: Vec<Option<i64>> = vec![None; num_rows];
+ Arc::new(Int64Array::from(vals))
+ }
+ (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
+ Arc::new(Float32Array::from(vec![value.0; num_rows]))
+ }
+ (DataType::Float32, None) => {
+ let vals: Vec<Option<f32>> = vec![None; num_rows];
+ Arc::new(Float32Array::from(vals))
+ }
+ (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => {
+ Arc::new(Float64Array::from(vec![value.0; num_rows]))
+ }
+ (DataType::Float64, None) => {
+ let vals: Vec<Option<f64>> = vec![None; num_rows];
+ Arc::new(Float64Array::from(vals))
+ }
+ (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
+ Arc::new(StringArray::from(vec![value.clone(); num_rows]))
+ }
+ (DataType::Utf8, None) => {
+ let vals: Vec<Option<String>> = vec![None; num_rows];
+ Arc::new(StringArray::from(vals))
+ }
+ (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => {
+ Arc::new(BinaryArray::from_vec(vec![value; num_rows]))
+ }
+ (DataType::Binary, None) => {
+ let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
+ Arc::new(BinaryArray::from_opt_vec(vals))
+ }
+ (DataType::Decimal128(_, _), Some(PrimitiveLiteral::Int128(value))) =>
{
+ Arc::new(Decimal128Array::from(vec![*value; num_rows]))
+ }
+ (DataType::Decimal128(_, _), Some(PrimitiveLiteral::UInt128(value)))
=> {
+ Arc::new(Decimal128Array::from(vec![*value as i128; num_rows]))
+ }
+ (DataType::Decimal128(_, _), None) => {
+ let vals: Vec<Option<i128>> = vec![None; num_rows];
+ Arc::new(Decimal128Array::from(vals))
+ }
+ (DataType::Struct(fields), None) => {
+ // Create a StructArray filled with nulls
+ let null_arrays: Vec<ArrayRef> = fields
+ .iter()
+ .map(|field|
create_primitive_array_repeated(field.data_type(), &None, num_rows))
+ .collect::<Result<Vec<_>>>()?;
+
+ Arc::new(StructArray::new(
+ fields.clone(),
+ null_arrays,
+ Some(NullBuffer::new_null(num_rows)),
+ ))
+ }
+ (DataType::Null, _) => Arc::new(arrow_array::NullArray::new(num_rows)),
+ (dt, _) => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("unexpected target column type {}", dt),
+ ));
+ }
+ })
+}
+
#[cfg(test)]
mod test {
use std::collections::HashMap;
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index aae8efed7..8d8f40f72 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -96,4 +96,5 @@ mod utils;
pub mod writer;
mod delete_vector;
+pub mod metadata_columns;
pub mod puffin;
diff --git a/crates/iceberg/src/metadata_columns.rs b/crates/iceberg/src/metadata_columns.rs
new file mode 100644
index 000000000..b11b5cadb
--- /dev/null
+++ b/crates/iceberg/src/metadata_columns.rs
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metadata columns (virtual/reserved fields) for Iceberg tables.
+//!
+//! This module defines metadata columns that can be requested in projections
+//! but are not stored in data files. Instead, they are computed on-the-fly
+//! during reading. Examples include the _file column (file path) and future
+//! columns like partition values or row numbers.
+
+use std::sync::Arc;
+
+use once_cell::sync::Lazy;
+
+use crate::spec::{NestedField, NestedFieldRef, PrimitiveType, Type};
+use crate::{Error, ErrorKind, Result};
+
+/// Reserved field ID for the file path (_file) column per Iceberg spec
+pub const RESERVED_FIELD_ID_FILE: i32 = i32::MAX - 1;
+
+/// Reserved column name for the file path metadata column
+pub const RESERVED_COL_NAME_FILE: &str = "_file";
+
+/// Documentation for the _file metadata column
+pub const RESERVED_COL_DOC_FILE: &str = "Path of the file in which a row is
stored";
+
+/// Lazy-initialized Iceberg field definition for the _file metadata column.
+/// This field represents the file path as a required string field.
+static FILE_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
+ Arc::new(
+ NestedField::required(
+ RESERVED_FIELD_ID_FILE,
+ RESERVED_COL_NAME_FILE,
+ Type::Primitive(PrimitiveType::String),
+ )
+ .with_doc(RESERVED_COL_DOC_FILE),
+ )
+});
+
+/// Returns the Iceberg field definition for the _file metadata column.
+///
+/// # Returns
+/// A reference to the _file field definition as an Iceberg NestedField
+pub fn file_field() -> &'static NestedFieldRef {
+ &FILE_FIELD
+}
+
+/// Returns the Iceberg field definition for a metadata field ID.
+///
+/// # Arguments
+/// * `field_id` - The metadata field ID
+///
+/// # Returns
+/// The Iceberg field definition for the metadata column, or an error if not a
metadata field
+pub fn get_metadata_field(field_id: i32) -> Result<NestedFieldRef> {
+ match field_id {
+ RESERVED_FIELD_ID_FILE => Ok(Arc::clone(file_field())),
+ _ if is_metadata_field(field_id) => {
+ // Future metadata fields can be added here
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Metadata field ID {} recognized but field definition not
implemented",
+ field_id
+ ),
+ ))
+ }
+ _ => Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("Field ID {} is not a metadata field", field_id),
+ )),
+ }
+}
+
+/// Returns the field ID for a metadata column name.
+///
+/// # Arguments
+/// * `column_name` - The metadata column name
+///
+/// # Returns
+/// The field ID of the metadata column, or an error if the column name is not
recognized
+pub fn get_metadata_field_id(column_name: &str) -> Result<i32> {
+ match column_name {
+ RESERVED_COL_NAME_FILE => Ok(RESERVED_FIELD_ID_FILE),
+ _ => Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("Unknown/unsupported metadata column name: {column_name}"),
+ )),
+ }
+}
+
+/// Checks if a field ID is a metadata field.
+///
+/// # Arguments
+/// * `field_id` - The field ID to check
+///
+/// # Returns
+/// `true` if the field ID is a (currently supported) metadata field, `false`
otherwise
+pub fn is_metadata_field(field_id: i32) -> bool {
+ field_id == RESERVED_FIELD_ID_FILE
+ // Additional metadata fields can be checked here in the future
+}
+
+/// Checks if a column name is a metadata column.
+///
+/// # Arguments
+/// * `column_name` - The column name to check
+///
+/// # Returns
+/// `true` if the column name is a metadata column, `false` otherwise
+pub fn is_metadata_column_name(column_name: &str) -> bool {
+ get_metadata_field_id(column_name).is_ok()
+}
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 3e319ca06..24c03b0b2 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -36,6 +36,7 @@ use crate::delete_file_index::DeleteFileIndex;
use
crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
+use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
@@ -217,9 +218,13 @@ impl<'a> TableScanBuilder<'a> {
let schema = snapshot.schema(self.table.metadata())?;
- // Check that all column names exist in the schema.
+ // Check that all column names exist in the schema (skip reserved
columns).
if let Some(column_names) = self.column_names.as_ref() {
for column_name in column_names {
+ // Skip reserved columns that don't exist in the schema
+ if is_metadata_column_name(column_name) {
+ continue;
+ }
if schema.field_by_name(column_name).is_none() {
return Err(Error::new(
ErrorKind::DataInvalid,
@@ -240,6 +245,12 @@ impl<'a> TableScanBuilder<'a> {
});
for column_name in column_names.iter() {
+ // Handle metadata columns (like "_file")
+ if is_metadata_column_name(column_name) {
+ field_ids.push(get_metadata_field_id(column_name)?);
+ continue;
+ }
+
let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
@@ -254,10 +265,10 @@ impl<'a> TableScanBuilder<'a> {
Error::new(
ErrorKind::FeatureUnsupported,
format!(
- "Column {column_name} is not a direct child of
schema but a nested field, which is not supported now. Schema: {schema}"
- ),
- )
- })?;
+ "Column {column_name} is not a direct child of schema
but a nested field, which is not supported now. Schema: {schema}"
+ ),
+ )
+ })?;
field_ids.push(field_id);
}
@@ -559,8 +570,10 @@ pub mod tests {
use std::fs::File;
use std::sync::Arc;
+ use arrow_array::cast::AsArray;
use arrow_array::{
- ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array,
RecordBatch, StringArray,
+ Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array,
RecordBatch,
+ StringArray,
};
use futures::{TryStreamExt, stream};
use minijinja::value::Value;
@@ -575,6 +588,7 @@ pub mod tests {
use crate::arrow::ArrowReaderBuilder;
use crate::expr::{BoundPredicate, Reference};
use crate::io::{FileIO, OutputFile};
+ use crate::metadata_columns::RESERVED_COL_NAME_FILE;
use crate::scan::FileScanTask;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal,
ManifestEntry,
@@ -1800,4 +1814,319 @@ pub mod tests {
};
test_fn(task);
}
+
+ #[tokio::test]
+ async fn test_select_with_file_column() {
+ use arrow_array::cast::AsArray;
+
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
+
+ // Select regular columns plus the _file column
+ let table_scan = fixture
+ .table
+ .scan()
+ .select(["x", RESERVED_COL_NAME_FILE])
+ .with_row_selection_enabled(true)
+ .build()
+ .unwrap();
+
+ let batch_stream = table_scan.to_arrow().await.unwrap();
+ let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+ // Verify we have 2 columns: x and _file
+ assert_eq!(batches[0].num_columns(), 2);
+
+ // Verify the x column exists and has correct data
+ let x_col = batches[0].column_by_name("x").unwrap();
+ let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
+ assert_eq!(x_arr.value(0), 1);
+
+ // Verify the _file column exists
+ let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
+ assert!(
+ file_col.is_some(),
+ "_file column should be present in the batch"
+ );
+
+ // Verify the _file column contains a file path
+ let file_col = file_col.unwrap();
+ assert!(
+ matches!(
+ file_col.data_type(),
+ arrow_schema::DataType::RunEndEncoded(_, _)
+ ),
+ "_file column should use RunEndEncoded type"
+ );
+
+ // Decode the RunArray to verify it contains the file path
+ let run_array = file_col
+ .as_any()
+
.downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
+ .expect("_file column should be a RunArray");
+
+ let values = run_array.values();
+ let string_values = values.as_string::<i32>();
+ assert_eq!(string_values.len(), 1, "Should have a single file path");
+
+ let file_path = string_values.value(0);
+ assert!(
+ file_path.ends_with(".parquet"),
+ "File path should end with .parquet, got: {}",
+ file_path
+ );
+ }
+
+ #[tokio::test]
+ async fn test_select_file_column_position() {
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
+
+ // Select columns in specific order: x, _file, z
+ let table_scan = fixture
+ .table
+ .scan()
+ .select(["x", RESERVED_COL_NAME_FILE, "z"])
+ .with_row_selection_enabled(true)
+ .build()
+ .unwrap();
+
+ let batch_stream = table_scan.to_arrow().await.unwrap();
+ let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+ assert_eq!(batches[0].num_columns(), 3);
+
+ // Verify column order: x at position 0, _file at position 1, z at
position 2
+ let schema = batches[0].schema();
+ assert_eq!(schema.field(0).name(), "x");
+ assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
+ assert_eq!(schema.field(2).name(), "z");
+
+ // Verify columns by name also works
+ assert!(batches[0].column_by_name("x").is_some());
+ assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
+ assert!(batches[0].column_by_name("z").is_some());
+ }
+
+ #[tokio::test]
+ async fn test_select_file_column_only() {
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
+
+ // Select only the _file column
+ let table_scan = fixture
+ .table
+ .scan()
+ .select([RESERVED_COL_NAME_FILE])
+ .with_row_selection_enabled(true)
+ .build()
+ .unwrap();
+
+ let batch_stream = table_scan.to_arrow().await.unwrap();
+ let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+ // Should have exactly 1 column
+ assert_eq!(batches[0].num_columns(), 1);
+
+ // Verify it's the _file column
+ let schema = batches[0].schema();
+ assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
+
+ // Verify the batch has the correct number of rows
+ // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted)
+ // Each file has 1024 rows, so total is 2048 rows
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 2048);
+ }
+
+ #[tokio::test]
+ async fn test_file_column_with_multiple_files() {
+ use std::collections::HashSet;
+
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
+
+ // Select x and _file columns
+ let table_scan = fixture
+ .table
+ .scan()
+ .select(["x", RESERVED_COL_NAME_FILE])
+ .with_row_selection_enabled(true)
+ .build()
+ .unwrap();
+
+ let batch_stream = table_scan.to_arrow().await.unwrap();
+ let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+ // Collect all unique file paths from the batches
+ let mut file_paths = HashSet::new();
+ for batch in &batches {
+ let file_col =
batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
+ let run_array = file_col
+ .as_any()
+
.downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
+ .expect("_file column should be a RunArray");
+
+ let values = run_array.values();
+ let string_values = values.as_string::<i32>();
+ for i in 0..string_values.len() {
+ file_paths.insert(string_values.value(i).to_string());
+ }
+ }
+
+ // We should have multiple files (the test creates 1.parquet and
3.parquet)
+ assert!(!file_paths.is_empty(), "Should have at least one file path");
+
+ // All paths should end with .parquet
+ for path in &file_paths {
+ assert!(
+ path.ends_with(".parquet"),
+ "All file paths should end with .parquet, got: {}",
+ path
+ );
+ }
+ }
+
+ #[tokio::test]
+ async fn test_file_column_at_start() {
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
+
+ // Select _file at the start
+ let table_scan = fixture
+ .table
+ .scan()
+ .select([RESERVED_COL_NAME_FILE, "x", "y"])
+ .with_row_selection_enabled(true)
+ .build()
+ .unwrap();
+
+ let batch_stream = table_scan.to_arrow().await.unwrap();
+ let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+ assert_eq!(batches[0].num_columns(), 3);
+
+ // Verify _file is at position 0
+ let schema = batches[0].schema();
+ assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
+ assert_eq!(schema.field(1).name(), "x");
+ assert_eq!(schema.field(2).name(), "y");
+ }
+
+ #[tokio::test]
+ async fn test_file_column_at_end() {
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
+
+ // Select _file at the end
+ let table_scan = fixture
+ .table
+ .scan()
+ .select(["x", "y", RESERVED_COL_NAME_FILE])
+ .with_row_selection_enabled(true)
+ .build()
+ .unwrap();
+
+ let batch_stream = table_scan.to_arrow().await.unwrap();
+ let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+ assert_eq!(batches[0].num_columns(), 3);
+
+ // Verify _file is at position 2 (the end)
+ let schema = batches[0].schema();
+ assert_eq!(schema.field(0).name(), "x");
+ assert_eq!(schema.field(1).name(), "y");
+ assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
+ }
+
+ #[tokio::test]
+ async fn test_select_with_repeated_column_names() {
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
+
+ // Select with repeated column names - both regular columns and
virtual columns
+ // Repeated columns should appear multiple times in the result
(duplicates are allowed)
+ let table_scan = fixture
+ .table
+ .scan()
+ .select([
+ "x",
+ RESERVED_COL_NAME_FILE,
+ "x", // x repeated
+ "y",
+ RESERVED_COL_NAME_FILE, // _file repeated
+ "y", // y repeated
+ ])
+ .with_row_selection_enabled(true)
+ .build()
+ .unwrap();
+
+ let batch_stream = table_scan.to_arrow().await.unwrap();
+ let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+ // Verify we have exactly 6 columns (duplicates are allowed and
preserved)
+ assert_eq!(
+ batches[0].num_columns(),
+ 6,
+ "Should have exactly 6 columns with duplicates"
+ );
+
+ let schema = batches[0].schema();
+
+ // Verify columns appear in the exact order requested: x, _file, x, y,
_file, y
+ assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
+ assert_eq!(
+ schema.field(1).name(),
+ RESERVED_COL_NAME_FILE,
+ "Column 1 should be _file"
+ );
+ assert_eq!(
+ schema.field(2).name(),
+ "x",
+ "Column 2 should be x (duplicate)"
+ );
+ assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
+ assert_eq!(
+ schema.field(4).name(),
+ RESERVED_COL_NAME_FILE,
+ "Column 4 should be _file (duplicate)"
+ );
+ assert_eq!(
+ schema.field(5).name(),
+ "y",
+ "Column 5 should be y (duplicate)"
+ );
+
+ // Verify all columns have correct data types
+ assert!(
+ matches!(schema.field(0).data_type(),
arrow_schema::DataType::Int64),
+ "Column x should be Int64"
+ );
+ assert!(
+ matches!(schema.field(2).data_type(),
arrow_schema::DataType::Int64),
+ "Column x (duplicate) should be Int64"
+ );
+ assert!(
+ matches!(schema.field(3).data_type(),
arrow_schema::DataType::Int64),
+ "Column y should be Int64"
+ );
+ assert!(
+ matches!(schema.field(5).data_type(),
arrow_schema::DataType::Int64),
+ "Column y (duplicate) should be Int64"
+ );
+ assert!(
+ matches!(
+ schema.field(1).data_type(),
+ arrow_schema::DataType::RunEndEncoded(_, _)
+ ),
+ "_file column should use RunEndEncoded type"
+ );
+ assert!(
+ matches!(
+ schema.field(4).data_type(),
+ arrow_schema::DataType::RunEndEncoded(_, _)
+ ),
+ "_file column (duplicate) should use RunEndEncoded type"
+ );
+ }
}