This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 8d4851f0 feat(reader): Add PartitionSpec support to FileScanTask and RecordBatchTransformer (#1821)
8d4851f0 is described below
commit 8d4851f0db824050c0419bd20d939e5dd3a5a80b
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Nov 13 04:35:12 2025 -0500
feat(reader): Add PartitionSpec support to FileScanTask and RecordBatchTransformer (#1821)
## Which issue does this PR close?
Partially addresses #1749.
## What changes are included in this PR?
This PR adds partition spec handling to `FileScanTask` and
`RecordBatchTransformer` to correctly implement the Iceberg spec's
"Column Projection" rules for fields "not present" in data files.
### Problem Statement
Prior to this PR, `iceberg-rust`'s `FileScanTask` had no mechanism to
pass partition information to `RecordBatchTransformer`, causing two
issues:
1. **Incorrect handling of bucket partitioning**: The reader couldn't
distinguish identity transforms (which should use partition metadata
constants) from non-identity transforms like bucket/truncate/year/month
(which must read the source column from the data file). For example,
`bucket(4, id)` stores `id_bucket = 2` (the bucket number) in partition
metadata, but the actual `id` values (100, 200, 300) exist only in the
data file. iceberg-rust was incorrectly treating bucket-partitioned
source columns as constants, breaking runtime filtering and returning
incorrect query results.
2. **Field ID conflicts in add_files scenarios**: When importing Hive
tables via `add_files`, partition columns could have field IDs
conflicting with Parquet data columns. Example: Parquet has
field_id=1→"name", but Iceberg expects field_id=1→"id" (partition). Per
the spec, the correct field is "not present" and must be resolved
through the name mapping fallback.
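A minimal sketch of the name mapping fallback mentioned above, assuming a simplified mapping from one column name to one field ID. The function `assign_ids_by_name` and the `HashMap`-based mapping are hypothetical stand-ins for illustration; the real `NameMapping` type allows several names per field.

```rust
use std::collections::HashMap;

/// Assign Iceberg field IDs to file columns by looking up each column name.
/// Columns that are absent from the mapping keep `None`, so a later
/// projection step can treat them as "not present".
/// (Hypothetical helper, not part of iceberg-rust.)
fn assign_ids_by_name(
    file_columns: &[&str],
    name_mapping: &HashMap<&str, i32>,
) -> Vec<(String, Option<i32>)> {
    file_columns
        .iter()
        .map(|name| (name.to_string(), name_mapping.get(name).copied()))
        .collect()
}
```

With a mapping of `id → 1` and `name → 2`, a file containing the columns `name` and `extra` would yield `("name", Some(2))` and `("extra", None)`.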
### Iceberg Specification Requirements
Per the Iceberg spec
(https://iceberg.apache.org/spec/#column-projection), when a field ID is
"not present" in a data file, it must be resolved using these rules:
1. Return the value from partition metadata if an **Identity Transform**
exists
2. Use `schema.name-mapping.default` metadata to map field id to columns
without field id
3. Return the default value if it has a defined `initial-default`
4. Return null in all other cases
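The precedence of these four rules can be sketched as a simple chain of checks. This is an illustration only: `MissingField`, `Resolution`, and `resolve_missing_field` are hypothetical names, and values are modelled as strings rather than Iceberg literals.

```rust
/// What is known about a projected field whose ID is absent from the data file.
/// (Hypothetical, simplified model.)
struct MissingField<'a> {
    /// Partition value, set only when the field has an identity transform.
    identity_partition_value: Option<&'a str>,
    /// File column found through `schema.name-mapping.default`, if any.
    name_mapped_column: Option<&'a str>,
    /// The field's `initial-default`, if defined.
    initial_default: Option<&'a str>,
}

#[derive(Debug, PartialEq)]
enum Resolution<'a> {
    PartitionConstant(&'a str), // rule 1
    ReadColumn(&'a str),        // rule 2
    Default(&'a str),           // rule 3
    Null,                       // rule 4
}

/// Apply the rules in order; the first one that matches wins.
fn resolve_missing_field<'a>(field: &MissingField<'a>) -> Resolution<'a> {
    if let Some(value) = field.identity_partition_value {
        Resolution::PartitionConstant(value)
    } else if let Some(column) = field.name_mapped_column {
        Resolution::ReadColumn(column)
    } else if let Some(default) = field.initial_default {
        Resolution::Default(default)
    } else {
        Resolution::Null
    }
}
```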
**Why this matters:**
- **Identity transforms** (e.g., `identity(dept)`) store actual column
values in partition metadata that can be used as constants without
reading the data file
- **Non-identity transforms** (e.g., `bucket(4, id)`, `day(timestamp)`)
store transformed values in partition metadata (e.g., bucket number 2,
not the actual `id` values 100, 200, 300) and must read source columns
from the data file
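The distinction can be illustrated with a standalone sketch of an identity-only constants map. The `Transform` and `PartitionField` types below are simplified, hypothetical stand-ins for the crate's own types, and partition values are modelled as strings.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the crate's partition transform enum.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Transform {
    Identity,
    Bucket(u32),
    Day,
}

/// Simplified stand-in for a partition field: source column ID plus transform.
#[derive(Debug, Clone)]
struct PartitionField {
    source_id: i32,
    transform: Transform,
}

/// Build a map of source field ID to constant value, keeping only
/// identity-transformed fields. `partition_values[i]` belongs to `fields[i]`.
fn constants_map(
    fields: &[PartitionField],
    partition_values: &[Option<String>],
) -> HashMap<i32, String> {
    fields
        .iter()
        .zip(partition_values)
        // Bucket, day, etc. hold derived values, so they are skipped.
        .filter(|(field, _)| field.transform == Transform::Identity)
        // Null partition values contribute no constant.
        .filter_map(|(field, value)| value.clone().map(|v| (field.source_id, v)))
        .collect()
}
```

For a spec of `identity(dept)`, `bucket(4, id)`, and `day(timestamp)`, only the `dept` source ID ends up in the map; the other source columns still have to be read from the data file.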
### Changes Made
1. **Added partition fields to `FileScanTask`** (`scan/task.rs`):
- `partition: Option<Struct>` - Partition data from manifest entry
- `partition_spec: Option<Arc<PartitionSpec>>` - For transform-aware
constant detection
- `name_mapping: Option<Arc<NameMapping>>` - Name mapping from table
metadata
2. **Implemented `constants_map()` function**
(`arrow/record_batch_transformer.rs`):
- Replicates Java's `PartitionUtil.constantsMap()` behavior
- Only includes fields where transform is `Transform::Identity`
- Used to determine which fields use partition metadata constants vs.
reading from data files
3. **Enhanced `RecordBatchTransformer`**
(`arrow/record_batch_transformer.rs`):
- Added `RecordBatchTransformerBuilder` with a `with_partition()` method
to accept the partition spec and partition data
- Implements all 4 spec rules for column resolution with
identity-transform awareness
- Detects field ID conflicts by verifying both field ID AND name match
- Falls back to name mapping when field IDs are missing/conflicting
(spec rule #2)
4. **Updated `ArrowReader`** (`arrow/reader.rs`):
- Calls `with_partition()` on the builder when both the partition spec
and partition data are available
- Otherwise builds the transformer without partition information
5. **Updated manifest entry processing** (`scan/context.rs`):
- Populates partition fields in `FileScanTask` from manifest entry data
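The code comments in `arrow/reader.rs` describe a three-branch choice of field ID strategy made before any data is read. A rough sketch of that decision, using a hypothetical `FieldIdStrategy` enum and `choose_field_id_strategy` function that do not exist in the crate:

```rust
/// How field IDs are obtained for a Parquet file (hypothetical illustration).
#[derive(Debug, PartialEq)]
enum FieldIdStrategy {
    /// Branch 1: the file carries embedded field IDs, so trust them.
    TrustEmbeddedIds,
    /// Branch 2: no embedded IDs, but a name mapping is available.
    ApplyNameMapping,
    /// Branch 3: no embedded IDs and no name mapping; use position-based IDs.
    PositionFallback,
}

/// Pick the strategy from two facts about the scan task and the file.
fn choose_field_id_strategy(file_has_field_ids: bool, has_name_mapping: bool) -> FieldIdStrategy {
    if file_has_field_ids {
        FieldIdStrategy::TrustEmbeddedIds
    } else if has_name_mapping {
        FieldIdStrategy::ApplyNameMapping
    } else {
        FieldIdStrategy::PositionFallback
    }
}
```

Embedded IDs take priority even when a name mapping exists, which mirrors the branch order described for Java's `ReadConf` in the comments.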
### Tests Added
1. **`bucket_partitioning_reads_source_column_from_file`** - Verifies
that bucket-partitioned source columns are read from data files (not
treated as constants from partition metadata)
2. **`identity_partition_uses_constant_from_metadata`** - Verifies that
identity-transformed fields correctly use partition metadata constants
3. **`test_bucket_partitioning_with_renamed_source_column`** - Verifies
field-ID-based mapping works despite column rename
4. **`add_files_partition_columns_without_field_ids`** - Verifies name
mapping resolution for Hive table imports without field IDs (spec rule
#2)
5. **`add_files_with_true_field_id_conflict`** - Verifies correct field
ID conflict detection with name mapping fallback (spec rule #2)
6. **`test_all_four_spec_rules`** - Integration test verifying all 4
spec rules work together
## Are these changes tested?
Yes, there are 6 new unit tests covering all 4 Iceberg spec rules. This
change also resolved approximately 50 Iceberg Java test failures when
run with DataFusion Comet's experimental PR
(https://github.com/apache/datafusion-comet/pull/2528).
---------
Co-authored-by: Renjie Liu <[email protected]>
---
.../src/arrow/caching_delete_file_loader.rs | 3 +
crates/iceberg/src/arrow/delete_file_loader.rs | 4 +-
crates/iceberg/src/arrow/delete_filter.rs | 6 +
crates/iceberg/src/arrow/reader.rs | 339 +++++++-
.../iceberg/src/arrow/record_batch_transformer.rs | 894 +++++++++++++++++++--
crates/iceberg/src/scan/context.rs | 7 +
crates/iceberg/src/scan/mod.rs | 6 +
crates/iceberg/src/scan/task.rs | 54 +-
8 files changed, 1222 insertions(+), 91 deletions(-)
diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 8a3ab3a9..f1c4f86f 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -881,6 +881,9 @@ mod tests {
project_field_ids: vec![2, 3],
predicate: None,
deletes: vec![pos_del, eq_del],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
};
// Load the deletes - should handle both types without error
diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs
index c0b1392d..e12daf53 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
use futures::{StreamExt, TryStreamExt};
use crate::arrow::ArrowReader;
-use crate::arrow::record_batch_transformer::RecordBatchTransformer;
+use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::spec::{Schema, SchemaRef};
@@ -82,7 +82,7 @@ impl BasicDeleteFileLoader {
equality_ids: &[i32],
) -> Result<ArrowRecordBatchStream> {
let mut record_batch_transformer =
- RecordBatchTransformer::build(target_schema.clone(), equality_ids);
+ RecordBatchTransformerBuilder::new(target_schema.clone(), equality_ids).build();
let record_batch_stream = record_batch_stream.map(move |record_batch| {
record_batch.and_then(|record_batch| {
diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs
index 4250974b..14b5124e 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -341,6 +341,9 @@ pub(crate) mod tests {
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_1, pos_del_2.clone()],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
},
FileScanTask {
start: 0,
@@ -352,6 +355,9 @@ pub(crate) mod tests {
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_3],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
},
];
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index fed8f19c..ab5a96f7 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -45,7 +45,7 @@ use parquet::file::metadata::{
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
-use crate::arrow::record_batch_transformer::RecordBatchTransformer;
+use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::delete_vector::DeleteVector;
use crate::error::Result;
@@ -55,7 +55,7 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
-use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
+use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};
@@ -181,7 +181,8 @@ impl ArrowReader {
let should_load_page_index =
(row_selection_enabled && task.predicate.is_some()) ||
!task.deletes.is_empty();
- let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, task.schema.clone());
+ let delete_filter_rx =
+ delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
// Migrated tables lack field IDs, requiring us to inspect the schema to choose
// between field-ID-based or position-based projection
@@ -193,7 +194,9 @@ impl ArrowReader {
)
.await?;
- // Parquet files from Hive/Spark migrations lack field IDs in their metadata
+ // Check if Parquet file has embedded field IDs
+ // Corresponds to Java's ParquetSchemaUtil.hasIds()
+ // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
let missing_field_ids = initial_stream_builder
.schema()
.fields()
@@ -201,11 +204,38 @@ impl ArrowReader {
.next()
.is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
- // Adding position-based fallback IDs at schema level (not per-batch) enables projection
- // on files that lack embedded field IDs. We recreate the builder to apply the modified schema.
+ // Three-branch schema resolution strategy matching Java's ReadConf constructor
+ //
+ // Per Iceberg spec Column Projection rules:
+ // "Columns in Iceberg data files are selected by field id. The table schema's column
+ // names and order may change after a data file is written, and projection must be done
+ // using field ids."
+ // https://iceberg.apache.org/spec/#column-projection
+ //
+ // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files),
+ // we must assign field IDs BEFORE reading data to enable correct projection.
+ //
+ // Java's ReadConf determines field ID strategy:
+ // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
+ // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
+ // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
let mut record_batch_stream_builder = if missing_field_ids {
- let arrow_schema =
- add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema());
+ // Parquet file lacks field IDs - must assign them before reading
+ let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
+ // Branch 2: Apply name mapping to assign correct Iceberg field IDs
+ // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
+ // to columns without field id"
+ // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
+ apply_name_mapping_to_arrow_schema(
+ Arc::clone(initial_stream_builder.schema()),
+ name_mapping,
+ )?
+ } else {
+ // Branch 3: No name mapping - use position-based fallback IDs
+ // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
+ add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
+ };
+
let options = ArrowReaderOptions::new().with_schema(arrow_schema);
Self::create_parquet_record_batch_stream_builder(
@@ -216,11 +246,14 @@ impl ArrowReader {
)
.await?
} else {
+ // Branch 1: File has embedded field IDs - trust them
initial_stream_builder
};
- // Fallback IDs don't match Parquet's embedded field IDs (since they don't exist),
- // so we must use position-based projection instead of field-ID matching
+ // Create projection mask based on field IDs
+ // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
+ // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
+ // - If fallback IDs: position-based projection (missing_field_ids=true)
let projection_mask = Self::get_arrow_projection_mask(
&task.project_field_ids,
&task.schema,
@@ -234,9 +267,18 @@ impl ArrowReader {
// RecordBatchTransformer performs any transformations required on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
- // and column re-ordering
- let mut record_batch_transformer =
- RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
+ // and column re-ordering.
+ let mut record_batch_transformer_builder =
+ RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
+
+ if let (Some(partition_spec), Some(partition_data)) =
+ (task.partition_spec.clone(), task.partition.clone())
+ {
+ record_batch_transformer_builder =
+ record_batch_transformer_builder.with_partition(partition_spec, partition_data);
+ }
+
+ let mut record_batch_transformer = record_batch_transformer_builder.build();
if let Some(batch_size) = batch_size {
record_batch_stream_builder =
record_batch_stream_builder.with_batch_size(batch_size);
@@ -919,6 +961,77 @@ fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32
column_map
}
+/// Apply name mapping to Arrow schema for Parquet files lacking field IDs.
+///
+/// Assigns Iceberg field IDs based on column names using the name mapping,
+/// enabling correct projection on migrated files (e.g., from Hive/Spark via add_files).
+///
+/// Per Iceberg spec Column Projection rule #2:
+/// "Use schema.name-mapping.default metadata to map field id to columns without field id"
+/// https://iceberg.apache.org/spec/#column-projection
+///
+/// Corresponds to Java's ParquetSchemaUtil.applyNameMapping() and ApplyNameMapping visitor.
+/// The key difference is Java operates on Parquet MessageType, while we operate on Arrow Schema.
+///
+/// # Arguments
+/// * `arrow_schema` - Arrow schema from Parquet file (without field IDs)
+/// * `name_mapping` - Name mapping from table metadata (TableProperties.DEFAULT_NAME_MAPPING)
+///
+/// # Returns
+/// Arrow schema with field IDs assigned based on name mapping
+fn apply_name_mapping_to_arrow_schema(
+ arrow_schema: ArrowSchemaRef,
+ name_mapping: &NameMapping,
+) -> Result<Arc<ArrowSchema>> {
+ debug_assert!(
+ arrow_schema
+ .fields()
+ .iter()
+ .next()
+ .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
+ "Schema already has field IDs - name mapping should not be applied"
+ );
+
+ use arrow_schema::Field;
+
+ let fields_with_mapped_ids: Vec<_> = arrow_schema
+ .fields()
+ .iter()
+ .map(|field| {
+ // Look up this column name in name mapping to get the Iceberg field ID.
+ // Corresponds to Java's ApplyNameMapping visitor which calls
+ // nameMapping.find(currentPath()) and returns field.withId() if found.
+ //
+ // If the field isn't in the mapping, leave it WITHOUT assigning an ID
+ // (matching Java's behavior of returning the field unchanged).
+ // Later, during projection, fields without IDs are filtered out.
+ let mapped_field_opt = name_mapping
+ .fields()
+ .iter()
+ .find(|f| f.names().contains(&field.name().to_string()));
+
+ let mut metadata = field.metadata().clone();
+
+ if let Some(mapped_field) = mapped_field_opt {
+ if let Some(field_id) = mapped_field.field_id() {
+ // Field found in mapping with a field_id → assign it
+ metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
+ }
+ // If field_id is None, leave the field without an ID (will be filtered by projection)
+ }
+ // If field not found in mapping, leave it without an ID (will be filtered by projection)
+
+ Field::new(field.name(), field.data_type().clone(), field.is_nullable())
+ .with_metadata(metadata)
+ })
+ .collect();
+
+ Ok(Arc::new(ArrowSchema::new_with_metadata(
+ fields_with_mapped_ids,
+ arrow_schema.metadata().clone(),
+ )))
+}
+
/// Add position-based fallback field IDs to Arrow schema for Parquet files lacking them.
/// Enables projection on migrated files (e.g., from Hive/Spark).
///
@@ -1948,6 +2061,9 @@ message schema {
project_field_ids: vec![1],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -2266,6 +2382,9 @@ message schema {
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
};
// Task 2: read the second and third row groups
@@ -2279,6 +2398,9 @@ message schema {
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
};
let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
@@ -2403,6 +2525,9 @@ message schema {
project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
predicate: None,
deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -2571,6 +2696,9 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
};
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -2786,6 +2914,9 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
};
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -2994,6 +3125,9 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
};
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3094,6 +3228,9 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3188,6 +3325,9 @@ message schema {
project_field_ids: vec![1, 3],
predicate: None,
deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3271,6 +3411,9 @@ message schema {
project_field_ids: vec![1, 2, 3],
predicate: None,
deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3368,6 +3511,9 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3494,6 +3640,9 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3587,6 +3736,9 @@ message schema {
project_field_ids: vec![1, 5, 2],
predicate: None,
deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3693,6 +3845,9 @@ message schema {
project_field_ids: vec![1, 2, 3],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3708,4 +3863,162 @@ message schema {
// Should return empty results
assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
}
+
+ /// Test bucket partitioning reads source column from data file (not partition metadata).
+ ///
+ /// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning.
+ /// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., testRenamedSourceColumnTable).
+ ///
+ /// # Iceberg Spec Requirements
+ ///
+ /// Per the Iceberg spec "Column Projection" section:
+ /// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
+ ///
+ /// This means:
+ /// - Identity transforms (e.g., `identity(dept)`) use constants from partition metadata
+ /// - Non-identity transforms (e.g., `bucket(4, id)`) must read source columns from data files
+ /// - Partition metadata for bucket transforms stores bucket numbers (0-3), NOT source values
+ ///
+ /// Java's PartitionUtil.constantsMap() implements this via:
+ /// ```java
+ /// if (field.transform().isIdentity()) {
+ /// idToConstant.put(field.sourceId(), converted);
+ /// }
+ /// ```
+ ///
+ /// # What This Test Verifies
+ ///
+ /// This test ensures the full ArrowReader → RecordBatchTransformer pipeline correctly handles
+ /// bucket partitioning when FileScanTask provides partition_spec and partition_data:
+ ///
+ /// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 13]
+ /// - FileScanTask specifies partition_spec with bucket(4, id) and partition_data with bucket=1
+ /// - RecordBatchTransformer.constants_map() excludes bucket-partitioned field from constants
+ /// - ArrowReader correctly reads [1, 5, 9, 13] from the data file
+ /// - Values are NOT replaced with constant 1 from partition metadata
+ ///
+ /// # Why This Matters
+ ///
+ /// Without correct handling:
+ /// - Runtime filtering would break (e.g., `WHERE id = 5` would fail)
+ /// - Query results would be incorrect (all rows would have id=1)
+ /// - Bucket partitioning would be unusable for query optimization
+ ///
+ /// # References
+ /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
+ /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
+ /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+ #[tokio::test]
+ async fn test_bucket_partitioning_reads_source_column_from_file() {
+ use arrow_array::Int32Array;
+
+ use crate::spec::{Literal, PartitionSpec, Struct, Transform};
+
+ // Iceberg schema with id and name columns
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ // Partition spec: bucket(4, id)
+ let partition_spec = Arc::new(
+ PartitionSpec::builder(schema.clone())
+ .with_spec_id(0)
+ .add_partition_field("id", "id_bucket", Transform::Bucket(4))
+ .unwrap()
+ .build()
+ .unwrap(),
+ );
+
+ // Partition data: bucket value is 1
+ let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
+
+ // Create Arrow schema with field IDs for Parquet file
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "2".to_string(),
+ )])),
+ ]));
+
+ // Write Parquet file with data
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+ let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
+ let name_data =
+ Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;
+
+ let to_write =
+ RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();
+
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+ let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
+ let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
+ writer.write(&to_write).expect("Writing batch");
+ writer.close().unwrap();
+
+ // Read the Parquet file with partition spec and data
+ let reader = ArrowReaderBuilder::new(file_io).build();
+ let tasks = Box::pin(futures::stream::iter(
+ vec![Ok(FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: format!("{}/data.parquet", table_location),
+ data_file_format: DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![1, 2],
+ predicate: None,
+ deletes: vec![],
+ partition: Some(partition_data),
+ partition_spec: Some(partition_spec),
+ name_mapping: None,
+ })]
+ .into_iter(),
+ )) as FileScanTaskStream;
+
+ let result = reader
+ .read(tasks)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ // Verify we got the correct data
+ assert_eq!(result.len(), 1);
+ let batch = &result[0];
+
+ assert_eq!(batch.num_columns(), 2);
+ assert_eq!(batch.num_rows(), 4);
+
+ // The id column MUST contain actual values from the Parquet file [1, 5, 9, 13],
+ // NOT the constant partition value 1
+ let id_col = batch
+ .column(0)
+ .as_primitive::<arrow_array::types::Int32Type>();
+ assert_eq!(id_col.value(0), 1);
+ assert_eq!(id_col.value(1), 5);
+ assert_eq!(id_col.value(2), 9);
+ assert_eq!(id_col.value(3), 13);
+
+ let name_col = batch.column(1).as_string::<i32>();
+ assert_eq!(name_col.value(0), "Alice");
+ assert_eq!(name_col.value(1), "Bob");
+ assert_eq!(name_col.value(2), "Charlie");
+ assert_eq!(name_col.value(3), "Dave");
+ }
}
diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs
index 5fbbbb10..e7d8b8f0 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -29,9 +29,44 @@ use arrow_schema::{
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use crate::arrow::schema_to_arrow_schema;
-use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema};
+use crate::spec::{
+ Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform,
+};
use crate::{Error, ErrorKind, Result};
+/// Build a map of field ID to constant value for identity-partitioned fields.
+///
+/// Implements Iceberg spec "Column Projection" rule #1: use partition metadata constants
+/// only for identity-transformed fields. Non-identity transforms (bucket, truncate, year, etc.)
+/// store derived values in partition metadata, so source columns must be read from data files.
+///
+/// Example: For `bucket(4, id)`, partition metadata has `id_bucket = 2` (bucket number),
+/// but the actual `id` values (100, 200, 300) are only in the data file.
+///
+/// Matches Java's `PartitionUtil.constantsMap()` which filters `if (field.transform().isIdentity())`.
+///
+/// # References
+/// - Spec: https://iceberg.apache.org/spec/#column-projection
+/// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java:constantsMap()
+fn constants_map(
+ partition_spec: &PartitionSpec,
+ partition_data: &Struct,
+) -> HashMap<i32, PrimitiveLiteral> {
+ let mut constants = HashMap::new();
+
+ for (pos, field) in partition_spec.fields().iter().enumerate() {
+ // Only identity transforms should use constant values from partition metadata
+ if matches!(field.transform, Transform::Identity) {
+ // Get the partition value for this field
+ if let Some(Literal::Primitive(value)) = &partition_data[pos] {
+ constants.insert(field.source_id, value.clone());
+ }
+ }
+ }
+
+ constants
+}
+
/// Indicates how a particular column in a processed RecordBatch should
/// be sourced.
#[derive(Debug)]
@@ -107,32 +142,107 @@ enum SchemaComparison {
Different,
}
+/// Builder for RecordBatchTransformer to improve ergonomics when constructing with optional parameters.
+///
+/// See [`RecordBatchTransformer`] for details on partition spec and partition data.
#[derive(Debug)]
-pub(crate) struct RecordBatchTransformer {
+pub(crate) struct RecordBatchTransformerBuilder {
snapshot_schema: Arc<IcebergSchema>,
projected_iceberg_field_ids: Vec<i32>,
-
- // BatchTransform gets lazily constructed based on the schema of
- // the first RecordBatch we receive from the file
- batch_transform: Option<BatchTransform>,
+ partition_spec: Option<Arc<PartitionSpec>>,
+ partition_data: Option<Struct>,
}
-impl RecordBatchTransformer {
- /// Build a RecordBatchTransformer for a given
- /// Iceberg snapshot schema and list of projected field ids.
- pub(crate) fn build(
+impl RecordBatchTransformerBuilder {
+ pub(crate) fn new(
snapshot_schema: Arc<IcebergSchema>,
projected_iceberg_field_ids: &[i32],
) -> Self {
- let projected_iceberg_field_ids = projected_iceberg_field_ids.to_vec();
-
Self {
snapshot_schema,
- projected_iceberg_field_ids,
+ projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(),
+ partition_spec: None,
+ partition_data: None,
+ }
+ }
+
+ /// Set partition spec and data together for identifying identity-transformed partition columns.
+ ///
+ /// Both partition_spec and partition_data must be provided together since the spec defines
+ /// which fields are identity-partitioned, and the data provides their constant values.
+ /// One without the other cannot produce a valid constants map.
+ pub(crate) fn with_partition(
+ mut self,
+ partition_spec: Arc<PartitionSpec>,
+ partition_data: Struct,
+ ) -> Self {
+ self.partition_spec = Some(partition_spec);
+ self.partition_data = Some(partition_data);
+ self
+ }
+
+ pub(crate) fn build(self) -> RecordBatchTransformer {
+ RecordBatchTransformer {
+ snapshot_schema: self.snapshot_schema,
+ projected_iceberg_field_ids: self.projected_iceberg_field_ids,
+ partition_spec: self.partition_spec,
+ partition_data: self.partition_data,
batch_transform: None,
}
}
+}
+
+/// Transforms RecordBatches from Parquet files to match the Iceberg table schema.
+///
+/// Handles schema evolution, column reordering, type promotion, and implements the Iceberg spec's
+/// "Column Projection" rules for resolving field IDs "not present" in data files:
+/// 1. Return the value from partition metadata if an Identity Transform exists
+/// 2. Use schema.name-mapping.default metadata to map field id to columns without field id (applied in ArrowReader)
+/// 3. Return the default value if it has a defined initial-default
+/// 4. Return null in all other cases
+///
+/// # Field ID Resolution
+///
+/// Field ID resolution happens in ArrowReader before data is read (matching Java's ReadConf):
+/// - If file has embedded field IDs: trust them (ParquetSchemaUtil.hasIds() = true)
+/// - If file lacks IDs and name_mapping exists: apply name mapping (ParquetSchemaUtil.applyNameMapping())
+/// - If file lacks IDs and no name_mapping: use position-based fallback (ParquetSchemaUtil.addFallbackIds())
+///
+/// By the time RecordBatchTransformer processes data, all field IDs are trustworthy.
+/// This transformer only handles remaining projection rules (#1, #3, #4) for fields still "not present".
+///
+/// # Partition Spec and Data
+///
+/// **Bucket partitioning**: Distinguish identity transforms (use partition metadata constants)
+/// from non-identity transforms like bucket (read from data file) to enable runtime filtering on
+/// bucket-partitioned columns. For example, `bucket(4, id)` stores only the bucket number in
+/// partition metadata, so actual `id` values must be read from the data file.
+///
+/// # References
+/// - Spec: https://iceberg.apache.org/spec/#column-projection
+/// - Java: parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java (field ID resolution)
+/// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java (partition constants)
+#[derive(Debug)]
+pub(crate) struct RecordBatchTransformer {
+ snapshot_schema: Arc<IcebergSchema>,
+ projected_iceberg_field_ids: Vec<i32>,
+
+ /// Partition spec for identifying identity-transformed partition columns (spec rule #1).
+ /// Only fields with identity transforms use partition data constants; non-identity transforms
+ /// (bucket, truncate, etc.) must read source columns from data files.
+ partition_spec: Option<Arc<PartitionSpec>>,
+ /// Partition data providing constant values for identity-transformed partition columns (spec rule #1).
+ /// For example, in a file at path `dept=engineering/file.parquet`, this would contain
+ /// the value "engineering" for the dept field.
+ partition_data: Option<Struct>,
+
+ // BatchTransform gets lazily constructed based on the schema of
+ // the first RecordBatch we receive from the file
+ batch_transform: Option<BatchTransform>,
+}
+
+impl RecordBatchTransformer {
pub(crate) fn process_record_batch(
&mut self,
record_batch: RecordBatch,
@@ -147,7 +257,7 @@ impl RecordBatchTransformer {
.with_match_field_names(false)
.with_row_count(Some(record_batch.num_rows()));
RecordBatch::try_new_with_options(
- target_schema.clone(),
+ Arc::clone(target_schema),
self.transform_columns(record_batch.columns(),
operations)?,
&options,
)?
@@ -157,7 +267,7 @@ impl RecordBatchTransformer {
.with_match_field_names(false)
.with_row_count(Some(record_batch.num_rows()));
RecordBatch::try_new_with_options(
- target_schema.clone(),
+ Arc::clone(target_schema),
record_batch.columns().to_vec(),
&options,
)?
@@ -167,6 +277,8 @@ impl RecordBatchTransformer {
record_batch.schema_ref(),
self.snapshot_schema.as_ref(),
&self.projected_iceberg_field_ids,
+ self.partition_spec.as_ref().map(|s| s.as_ref()),
+ self.partition_data.as_ref(),
)?);
self.process_record_batch(record_batch)?
@@ -185,6 +297,8 @@ impl RecordBatchTransformer {
source_schema: &ArrowSchemaRef,
snapshot_schema: &IcebergSchema,
projected_iceberg_field_ids: &[i32],
+ partition_spec: Option<&PartitionSpec>,
+ partition_data: Option<&Struct>,
) -> Result<BatchTransform> {
let mapped_unprojected_arrow_schema =
Arc::new(schema_to_arrow_schema(snapshot_schema)?);
let field_id_to_mapped_schema_map =
@@ -205,6 +319,12 @@ impl RecordBatchTransformer {
let target_schema = Arc::new(ArrowSchema::new(fields?));
+ let constants_map = if let (Some(spec), Some(data)) = (partition_spec,
partition_data) {
+ constants_map(spec, data)
+ } else {
+ HashMap::new()
+ };
+
match Self::compare_schemas(source_schema, &target_schema) {
SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough),
SchemaComparison::NameChangesOnly =>
Ok(BatchTransform::ModifySchema { target_schema }),
@@ -214,6 +334,8 @@ impl RecordBatchTransformer {
snapshot_schema,
projected_iceberg_field_ids,
field_id_to_mapped_schema_map,
+ constants_map,
+ partition_spec,
)?,
target_schema,
}),
@@ -270,57 +392,92 @@ impl RecordBatchTransformer {
snapshot_schema: &IcebergSchema,
projected_iceberg_field_ids: &[i32],
field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
+ constants_map: HashMap<i32, PrimitiveLiteral>,
+ _partition_spec: Option<&PartitionSpec>,
) -> Result<Vec<ColumnSource>> {
let field_id_to_source_schema_map =
Self::build_field_id_to_arrow_schema_map(source_schema)?;
- projected_iceberg_field_ids.iter().map(|field_id|{
- let (target_field, _) =
field_id_to_mapped_schema_map.get(field_id).ok_or(
- Error::new(ErrorKind::Unexpected, "could not find field in
schema")
- )?;
- let target_type = target_field.data_type();
-
- Ok(if let Some((source_field, source_index)) =
field_id_to_source_schema_map.get(field_id) {
- // column present in source
+ projected_iceberg_field_ids
+ .iter()
+ .map(|field_id| {
+ let (target_field, _) =
+ field_id_to_mapped_schema_map
+ .get(field_id)
+ .ok_or(Error::new(
+ ErrorKind::Unexpected,
+ "could not find field in schema",
+ ))?;
+ let target_type = target_field.data_type();
- if source_field.data_type().equals_datatype(target_type) {
- // no promotion required
- ColumnSource::PassThrough {
- source_index: *source_index
+ let iceberg_field =
snapshot_schema.field_by_id(*field_id).ok_or(Error::new(
+ ErrorKind::Unexpected,
+ "Field not found in snapshot schema",
+ ))?;
+
+ // Iceberg spec's "Column Projection" rules (https://iceberg.apache.org/spec/#column-projection).
+ // For fields "not present" in data files:
+ // 1. Use partition metadata (identity transforms only)
+ // 2. Use name mapping
+ // 3. Use initial_default
+ // 4. Return null
+ //
+ // Why check partition constants before Parquet field IDs (Java: BaseParquetReaders.java:299):
+ // In add_files scenarios, partition columns may exist in BOTH Parquet AND partition metadata.
+ // Partition metadata is authoritative - it defines which partition this file belongs to.
+
+ // Field ID resolution now happens in ArrowReader via:
+ // 1. Embedded field IDs (ParquetSchemaUtil.hasIds() = true) - trust them
+ // 2. Name mapping (ParquetSchemaUtil.applyNameMapping()) - applied upfront
+ // 3. Position-based fallback (ParquetSchemaUtil.addFallbackIds()) - applied upfront
+ //
+ // At this point, all field IDs in the source schema are trustworthy.
+ // No conflict detection needed - schema resolution happened in reader.rs.
+ let field_by_id =
field_id_to_source_schema_map.get(field_id).map(
+ |(source_field, source_index)| {
+ if
source_field.data_type().equals_datatype(target_type) {
+ ColumnSource::PassThrough {
+ source_index: *source_index,
+ }
+ } else {
+ ColumnSource::Promote {
+ target_type: target_type.clone(),
+ source_index: *source_index,
+ }
+ }
+ },
+ );
+
+ // Apply spec's fallback steps for "not present" fields.
+ let column_source = if let Some(constant_value) =
constants_map.get(field_id) {
+ // Rule #1: Identity partition constant
+ ColumnSource::Add {
+ value: Some(constant_value.clone()),
+ target_type: target_type.clone(),
}
+ } else if let Some(source) = field_by_id {
+ source
} else {
- // promotion required
- ColumnSource::Promote {
+ // Rules #2, #3 and #4:
+ // Rule #2 (name mapping) was already applied in reader.rs if needed.
+ // If field_id is still not found, the column doesn't exist in the Parquet file.
+ // Fall through to rule #3 (initial_default) or rule #4 (null).
+ let default_value =
iceberg_field.initial_default.as_ref().and_then(|lit| {
+ if let Literal::Primitive(prim) = lit {
+ Some(prim.clone())
+ } else {
+ None
+ }
+ });
+ ColumnSource::Add {
+ value: default_value,
target_type: target_type.clone(),
- source_index: *source_index,
}
- }
- } else {
- // column must be added
- let iceberg_field =
snapshot_schema.field_by_id(*field_id).ok_or(
- Error::new(ErrorKind::Unexpected, "Field not found in
snapshot schema")
- )?;
-
- let default_value = if let Some(iceberg_default_value) =
- &iceberg_field.initial_default
- {
- let Literal::Primitive(primitive_literal) =
iceberg_default_value else {
- return Err(Error::new(
- ErrorKind::Unexpected,
- format!("Default value for column must be
primitive type, but encountered {iceberg_default_value:?}")
- ));
- };
- Some(primitive_literal.clone())
- } else {
- None
};
- ColumnSource::Add {
- value: default_value,
- target_type: target_type.clone(),
- }
+ Ok(column_source)
})
- }).collect()
+ .collect()
}
fn build_field_id_to_arrow_schema_map(
@@ -328,25 +485,19 @@ impl RecordBatchTransformer {
) -> Result<HashMap<i32, (FieldRef, usize)>> {
let mut field_id_to_source_schema = HashMap::new();
for (source_field_idx, source_field) in
source_schema.fields.iter().enumerate() {
- let this_field_id = source_field
- .metadata()
- .get(PARQUET_FIELD_ID_META_KEY)
- .ok_or_else(|| {
+ // Check if field has a field ID in metadata
+ if let Some(field_id_str) =
source_field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
+ let this_field_id = field_id_str.parse().map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
- "field ID not present in parquet metadata",
- )
- })?
- .parse()
- .map_err(|e| {
- Error::new(
- ErrorKind::DataInvalid,
- format!("field id not parseable as an i32: {e}"),
+ format!("field id not parseable as an i32: {}", e),
)
})?;
- field_id_to_source_schema
- .insert(this_field_id, (source_field.clone(),
source_field_idx));
+ field_id_to_source_schema
+ .insert(this_field_id, (source_field.clone(),
source_field_idx));
+ }
+ // If field doesn't have a field ID, skip it - name mapping will
handle it
}
Ok(field_id_to_source_schema)
@@ -447,7 +598,7 @@ impl RecordBatchTransformer {
(dt, _) => {
return Err(Error::new(
ErrorKind::Unexpected,
- format!("unexpected target column type {dt}"),
+ format!("unexpected target column type {}", dt),
));
}
})
@@ -466,8 +617,10 @@ mod test {
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
- use crate::arrow::record_batch_transformer::RecordBatchTransformer;
- use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Type};
+ use crate::arrow::record_batch_transformer::{
+ RecordBatchTransformer, RecordBatchTransformerBuilder,
+ };
+ use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct,
Type};
#[test]
fn build_field_id_to_source_schema_map_works() {
@@ -492,7 +645,9 @@ mod test {
let snapshot_schema = Arc::new(iceberg_table_schema());
let projected_iceberg_field_ids = [13, 14];
- let mut inst = RecordBatchTransformer::build(snapshot_schema,
&projected_iceberg_field_ids);
+ let mut inst =
+ RecordBatchTransformerBuilder::new(snapshot_schema,
&projected_iceberg_field_ids)
+ .build();
let result = inst
.process_record_batch(source_record_batch_no_migration_required())
@@ -508,7 +663,9 @@ mod test {
let snapshot_schema = Arc::new(iceberg_table_schema());
let projected_iceberg_field_ids = [10, 11, 12, 14, 15]; // a, b, c, e,
f
- let mut inst = RecordBatchTransformer::build(snapshot_schema,
&projected_iceberg_field_ids);
+ let mut inst =
+ RecordBatchTransformerBuilder::new(snapshot_schema,
&projected_iceberg_field_ids)
+ .build();
let result = inst.process_record_batch(source_record_batch()).unwrap();
@@ -537,7 +694,8 @@ mod test {
let projected_iceberg_field_ids = [1, 2, 3];
let mut transformer =
- RecordBatchTransformer::build(snapshot_schema,
&projected_iceberg_field_ids);
+ RecordBatchTransformerBuilder::new(snapshot_schema,
&projected_iceberg_field_ids)
+ .build();
let file_schema = Arc::new(ArrowSchema::new(vec![
simple_field("id", DataType::Int32, false, "1"),
@@ -696,4 +854,592 @@ mod test {
value.to_string(),
)]))
}
+
+ /// Test for add_files with Parquet files that have NO field IDs (Hive tables).
+ ///
+ /// This reproduces the scenario from the Iceberg spec where:
+ /// - Hive-style partitioned Parquet files are imported via the add_files procedure
+ /// - Parquet files originally DO NOT have field IDs (typical for Hive tables)
+ /// - ArrowReader applies name mapping to assign correct Iceberg field IDs
+ /// - Iceberg schema assigns field IDs: id (1), name (2), dept (3), subdept (4)
+ /// - Partition columns (id, dept) have initial_default values
+ ///
+ /// Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection),
+ /// this scenario requires `schema.name-mapping.default` from table metadata
+ /// to correctly map Parquet columns by name to Iceberg field IDs.
+ /// This mapping is now applied in ArrowReader before data is processed.
+ ///
+ /// Expected behavior:
+ /// 1. id=1 (from initial_default) - spec rule #3
+ /// 2. name="John Doe" (from Parquet with field_id=2 assigned by reader) - found by field ID
+ /// 3. dept="hr" (from initial_default) - spec rule #3
+ /// 4. subdept="communications" (from Parquet with field_id=4 assigned by reader) - found by field ID
+ #[test]
+ fn add_files_with_name_mapping_applied_in_reader() {
+ // Iceberg schema after add_files: id (partition), name, dept
(partition), subdept
+ let snapshot_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::optional(1, "id",
Type::Primitive(PrimitiveType::Int))
+ .with_initial_default(Literal::int(1))
+ .into(),
+ NestedField::optional(2, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::optional(3, "dept",
Type::Primitive(PrimitiveType::String))
+ .with_initial_default(Literal::string("hr"))
+ .into(),
+ NestedField::optional(4, "subdept",
Type::Primitive(PrimitiveType::String))
+ .into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ // Simulate ArrowReader having applied name mapping:
+ // Original Parquet: name, subdept (NO field IDs)
+ // After reader.rs applies name mapping: name (field_id=2), subdept
(field_id=4)
+ //
+ // Note: Partition columns (id, dept) are NOT in the Parquet file -
they're in directory paths
+ use std::collections::HashMap;
+ let parquet_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("name", DataType::Utf8,
true).with_metadata(HashMap::from([(
+ "PARQUET:field_id".to_string(),
+ "2".to_string(),
+ )])),
+ Field::new("subdept", DataType::Utf8,
true).with_metadata(HashMap::from([(
+ "PARQUET:field_id".to_string(),
+ "4".to_string(),
+ )])),
+ ]));
+
+ let projected_field_ids = [1, 2, 3, 4]; // id, name, dept, subdept
+
+ let mut transformer =
+ RecordBatchTransformerBuilder::new(snapshot_schema,
&projected_field_ids).build();
+
+ // Create a Parquet RecordBatch with data for: name="John Doe",
subdept="communications"
+ let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
+ Arc::new(StringArray::from(vec!["John Doe"])),
+ Arc::new(StringArray::from(vec!["communications"])),
+ ])
+ .unwrap();
+
+ let result = transformer.process_record_batch(parquet_batch).unwrap();
+
+ // Verify the transformed RecordBatch has:
+ // - id=1 (from initial_default, not from Parquet)
+ // - name="John Doe" (from Parquet with correct field_id=2)
+ // - dept="hr" (from initial_default, not from Parquet)
+ // - subdept="communications" (from Parquet with correct field_id=4)
+ assert_eq!(result.num_columns(), 4);
+ assert_eq!(result.num_rows(), 1);
+
+ let id_column = result
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(id_column.value(0), 1);
+
+ let name_column = result
+ .column(1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(name_column.value(0), "John Doe");
+
+ let dept_column = result
+ .column(2)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(dept_column.value(0), "hr");
+
+ let subdept_column = result
+ .column(3)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(subdept_column.value(0), "communications");
+ }
+
+ /// Test for bucket partitioning where source columns must be read from data files.
+ ///
+ /// This test verifies correct implementation of the Iceberg spec's "Column Projection" rules:
+ /// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
+ ///
+ /// # Why this test is critical
+ ///
+ /// The key insight is that partition metadata stores TRANSFORMED values, not source values:
+ /// - For `bucket(4, id)`, partition metadata has `id_bucket = 2` (the bucket number)
+ /// - The actual `id` column values (100, 200, 300) are ONLY in the data file
+ ///
+ /// If iceberg-rust incorrectly treated bucket-partitioned fields as constants, it would:
+ /// 1. Replace all `id` values with the constant `2` from partition metadata
+ /// 2. Break runtime filtering (e.g., `WHERE id = 100` would match no rows)
+ /// 3. Return incorrect query results
+ ///
+ /// # What this test verifies
+ ///
+ /// - Bucket-partitioned fields (e.g., `bucket(4, id)`) are read from the data file
+ /// - The source column `id` contains actual values (100, 200, 300), not constants
+ /// - Java's `PartitionUtil.constantsMap()` behavior is correctly replicated:
+ /// ```java
+ /// if (field.transform().isIdentity()) { // FALSE for bucket transforms
+ /// idToConstant.put(field.sourceId(), converted);
+ /// }
+ /// ```
+ ///
+ /// # Real-world impact
+ ///
+ /// This reproduces the failure scenario from Iceberg Java's TestRuntimeFiltering:
+ /// - Tables partitioned by `bucket(N, col)` are common for load balancing
+ /// - Queries filter on the source column: `SELECT * FROM tbl WHERE col = value`
+ /// - Runtime filtering pushes predicates down to Iceberg file scans
+ /// - Without this fix, the filter would match against constant partition values instead of data
+ ///
+ /// # References
+ /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
+ /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+ /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
+ #[test]
+ fn bucket_partitioning_reads_source_column_from_file() {
+ use crate::spec::{Struct, Transform};
+
+ // Table schema: id (data column), name (data column); id_bucket is a partition field only, not a schema column
+ let snapshot_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(2, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ // Partition spec: bucket(4, id) - the id field is bucketed
+ let partition_spec = Arc::new(
+ crate::spec::PartitionSpec::builder(snapshot_schema.clone())
+ .with_spec_id(0)
+ .add_partition_field("id", "id_bucket", Transform::Bucket(4))
+ .unwrap()
+ .build()
+ .unwrap(),
+ );
+
+ // Partition data: bucket value is 2
+ // In Iceberg, partition data is a Struct where each field corresponds
to a partition field
+ let partition_data = Struct::from_iter(vec![Some(Literal::int(2))]);
+
+ // Parquet file contains both id and name columns
+ let parquet_schema = Arc::new(ArrowSchema::new(vec![
+ simple_field("id", DataType::Int32, false, "1"),
+ simple_field("name", DataType::Utf8, true, "2"),
+ ]));
+
+ let projected_field_ids = [1, 2]; // id, name
+
+ let mut transformer =
+ RecordBatchTransformerBuilder::new(snapshot_schema,
&projected_field_ids)
+ .with_partition(partition_spec, partition_data)
+ .build();
+
+ // Create a Parquet RecordBatch with actual data
+ // The id column MUST be read from here, not treated as a constant
+ let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
+ Arc::new(Int32Array::from(vec![100, 200, 300])),
+ Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
+ ])
+ .unwrap();
+
+ let result = transformer.process_record_batch(parquet_batch).unwrap();
+
+ // Verify the transformed RecordBatch correctly reads id from the file
+ // (NOT as a constant from partition metadata)
+ assert_eq!(result.num_columns(), 2);
+ assert_eq!(result.num_rows(), 3);
+
+ let id_column = result
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ // These values MUST come from the Parquet file, not be replaced by
constants
+ assert_eq!(id_column.value(0), 100);
+ assert_eq!(id_column.value(1), 200);
+ assert_eq!(id_column.value(2), 300);
+
+ let name_column = result
+ .column(1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(name_column.value(0), "Alice");
+ assert_eq!(name_column.value(1), "Bob");
+ assert_eq!(name_column.value(2), "Charlie");
+ }
+
+ /// Test that identity-transformed partition fields ARE treated as constants.
+ ///
+ /// This is the complement to `bucket_partitioning_reads_source_column_from_file`,
+ /// verifying that constants_map() correctly identifies identity-transformed
+ /// partition fields per the Iceberg spec.
+ ///
+ /// # Spec requirement (format/spec.md "Column Projection")
+ ///
+ /// > "Return the value from partition metadata if an Identity Transform exists for the field
+ /// > and the partition value is present in the `partition` struct on `data_file` object
+ /// > in the manifest. This allows for metadata only migrations of Hive tables."
+ ///
+ /// # Why identity transforms use constants
+ ///
+ /// Unlike bucket/truncate/year/etc., identity transforms don't modify the value:
+ /// - `identity(dept)` stores the actual `dept` value in partition metadata
+ /// - Partition metadata has `dept = "engineering"` (the real value, not a hash/bucket)
+ /// - This value can be used directly without reading the data file
+ ///
+ /// # Performance benefit
+ ///
+ /// For Hive migrations where partition columns aren't in data files:
+ /// - Partition metadata provides the column values
+ /// - No need to read from data files (metadata-only query optimization)
+ /// - Common pattern: `dept=engineering/subdept=backend/file.parquet`
+ /// - `dept` and `subdept` are in directory structure, not in `file.parquet`
+ /// - Iceberg populates these from partition metadata as constants
+ ///
+ /// # What this test verifies
+ ///
+ /// - Identity-partitioned fields use constants from partition metadata
+ /// - The `dept` column is populated with `"engineering"` (not read from file)
+ /// - Java's `PartitionUtil.constantsMap()` behavior is matched:
+ /// ```java
+ /// if (field.transform().isIdentity()) { // TRUE for identity
+ /// idToConstant.put(field.sourceId(), converted);
+ /// }
+ /// ```
+ ///
+ /// # References
+ /// - Iceberg spec: format/spec.md "Column Projection"
+ /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+ #[test]
+ fn identity_partition_uses_constant_from_metadata() {
+ use crate::spec::{Struct, Transform};
+
+ // Table schema: id (data column), dept (partition column), name (data
column)
+ let snapshot_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "dept",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::optional(3, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ // Partition spec: identity(dept) - the dept field uses identity
transform
+ let partition_spec = Arc::new(
+ crate::spec::PartitionSpec::builder(snapshot_schema.clone())
+ .with_spec_id(0)
+ .add_partition_field("dept", "dept", Transform::Identity)
+ .unwrap()
+ .build()
+ .unwrap(),
+ );
+
+ // Partition data: dept="engineering"
+ let partition_data =
Struct::from_iter(vec![Some(Literal::string("engineering"))]);
+
+ // Parquet file contains only id and name (dept is in partition path)
+ let parquet_schema = Arc::new(ArrowSchema::new(vec![
+ simple_field("id", DataType::Int32, false, "1"),
+ simple_field("name", DataType::Utf8, true, "3"),
+ ]));
+
+ let projected_field_ids = [1, 2, 3]; // id, dept, name
+
+ let mut transformer =
+ RecordBatchTransformerBuilder::new(snapshot_schema,
&projected_field_ids)
+ .with_partition(partition_spec, partition_data)
+ .build();
+
+ let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
+ Arc::new(Int32Array::from(vec![100, 200])),
+ Arc::new(StringArray::from(vec!["Alice", "Bob"])),
+ ])
+ .unwrap();
+
+ let result = transformer.process_record_batch(parquet_batch).unwrap();
+
+ // Verify the dept column is populated with the constant from
partition metadata
+ assert_eq!(result.num_columns(), 3);
+ assert_eq!(result.num_rows(), 2);
+
+ let id_column = result
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(id_column.value(0), 100);
+ assert_eq!(id_column.value(1), 200);
+
+ let dept_column = result
+ .column(1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ // This value MUST come from partition metadata (constant)
+ assert_eq!(dept_column.value(0), "engineering");
+ assert_eq!(dept_column.value(1), "engineering");
+
+ let name_column = result
+ .column(2)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(name_column.value(0), "Alice");
+ assert_eq!(name_column.value(1), "Bob");
+ }
+
+ /// Test bucket partitioning with renamed source column.
+ ///
+ /// This verifies correct behavior for TestRuntimeFiltering.testRenamedSourceColumnTable() in Iceberg Java.
+ /// When a source column is renamed after partitioning is established, field-ID-based mapping
+ /// must still correctly identify the column in Parquet files.
+ ///
+ /// # Scenario
+ ///
+ /// 1. Table created with `bucket(4, id)` partitioning
+ /// 2. Data written to Parquet files (field_id=1, name="id")
+ /// 3. Column renamed: `ALTER TABLE ... RENAME COLUMN id TO row_id`
+ /// 4. Iceberg schema now has: field_id=1, name="row_id"
+ /// 5. Parquet files still have: field_id=1, name="id"
+ ///
+ /// # Expected Behavior Per Iceberg Spec
+ ///
+ /// Per the Iceberg spec "Column Projection" section and Java's PartitionUtil.constantsMap():
+ /// - Bucket transforms are NON-identity, so partition metadata stores bucket numbers (0-3), not source values
+ /// - Source columns for non-identity transforms MUST be read from data files
+ /// - Field-ID-based mapping should find the column by field_id=1 (ignoring name mismatch)
+ /// - Runtime filtering on `row_id` should work correctly
+ ///
+ /// # What This Tests
+ ///
+ /// This test ensures that when FileScanTask provides partition_spec and partition_data:
+ /// - constants_map() correctly identifies that bucket(4, row_id) is NOT an identity transform
+ /// - The source column (field_id=1) is NOT added to constants_map
+ /// - Field-ID-based mapping reads actual values from the Parquet file
+ /// - Values [100, 200, 300] are read, not replaced with bucket constant 2
+ ///
+ /// # References
+ /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java::testRenamedSourceColumnTable
+ /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java::constantsMap()
+ /// - Iceberg spec: format/spec.md "Column Projection" section
+ #[test]
+ fn test_bucket_partitioning_with_renamed_source_column() {
+ use crate::spec::{Struct, Transform};
+
+ // Iceberg schema after rename: row_id (was id), name
+ let snapshot_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "row_id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(2, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ // Partition spec: bucket(4, row_id) - but source_id still points to
field_id=1
+ let partition_spec = Arc::new(
+ crate::spec::PartitionSpec::builder(snapshot_schema.clone())
+ .with_spec_id(0)
+ .add_partition_field("row_id", "row_id_bucket",
Transform::Bucket(4))
+ .unwrap()
+ .build()
+ .unwrap(),
+ );
+
+ // Partition data: bucket value is 2
+ let partition_data = Struct::from_iter(vec![Some(Literal::int(2))]);
+
+ // Parquet file has OLD column name "id" but SAME field_id=1
+ // Field-ID-based mapping should find this despite name mismatch
+ let parquet_schema = Arc::new(ArrowSchema::new(vec![
+ simple_field("id", DataType::Int32, false, "1"),
+ simple_field("name", DataType::Utf8, true, "2"),
+ ]));
+
+ let projected_field_ids = [1, 2]; // row_id (field_id=1), name
(field_id=2)
+
+ let mut transformer =
+ RecordBatchTransformerBuilder::new(snapshot_schema,
&projected_field_ids)
+ .with_partition(partition_spec, partition_data)
+ .build();
+
+ // Create a Parquet RecordBatch with actual data
+ // Despite column rename, data should be read via field_id=1
+ let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
+ Arc::new(Int32Array::from(vec![100, 200, 300])),
+ Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
+ ])
+ .unwrap();
+
+ let result = transformer.process_record_batch(parquet_batch).unwrap();
+
+ // Verify the transformed RecordBatch correctly reads data despite
name mismatch
+ assert_eq!(result.num_columns(), 2);
+ assert_eq!(result.num_rows(), 3);
+
+ let row_id_column = result
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ // These values MUST come from the Parquet file via field_id=1,
+ // not be replaced by the bucket constant (2)
+ assert_eq!(row_id_column.value(0), 100);
+ assert_eq!(row_id_column.value(1), 200);
+ assert_eq!(row_id_column.value(2), 300);
+
+ let name_column = result
+ .column(1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(name_column.value(0), "Alice");
+ assert_eq!(name_column.value(1), "Bob");
+ assert_eq!(name_column.value(2), "Charlie");
+ }
+
+ /// Comprehensive integration test that verifies all 4 Iceberg spec rules work correctly.
+ ///
+ /// Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection),
+ /// "Values for field ids which are not present in a data file must be resolved
+ /// according the following rules:"
+ ///
+ /// This test creates a scenario where each rule is exercised:
+ /// - Rule #1: dept (identity-partitioned) -> constant from partition metadata
+ /// - Rule #2: data (via name mapping) -> read from Parquet using the field ID that ArrowReader assigned by name
+ /// - Rule #3: category (initial_default) -> use default value
+ /// - Rule #4: notes (no default) -> return null
+ ///
+ /// # References
+ /// - Iceberg spec: format/spec.md "Column Projection" section
+ #[test]
+ fn test_all_four_spec_rules() {
+ use crate::spec::Transform;
+
+ // Iceberg schema with columns designed to exercise each spec rule
+ let snapshot_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ // Field in Parquet by field ID (normal case)
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ // Rule #1: Identity-partitioned field - should use
partition metadata
+ NestedField::required(2, "dept",
Type::Primitive(PrimitiveType::String)).into(),
+ // Rule #2: Field resolved by name mapping (ArrowReader
already applied)
+ NestedField::required(3, "data",
Type::Primitive(PrimitiveType::String)).into(),
+ // Rule #3: Field with initial_default
+ NestedField::optional(4, "category",
Type::Primitive(PrimitiveType::String))
+
.with_initial_default(Literal::string("default_category"))
+ .into(),
+ // Rule #4: Field with no default - should be null
+ NestedField::optional(5, "notes",
Type::Primitive(PrimitiveType::String))
+ .into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ // Partition spec: identity transform on dept
+ let partition_spec = Arc::new(
+ crate::spec::PartitionSpec::builder(snapshot_schema.clone())
+ .with_spec_id(0)
+ .add_partition_field("dept", "dept", Transform::Identity)
+ .unwrap()
+ .build()
+ .unwrap(),
+ );
+
+ // Partition data: dept="engineering"
+ let partition_data =
Struct::from_iter(vec![Some(Literal::string("engineering"))]);
+
+ // Parquet schema: simulates post-ArrowReader state where name mapping
already applied
+ // Has id (field_id=1) and data (field_id=3, assigned by ArrowReader
via name mapping)
+ // Missing: dept (in partition), category (has default), notes (no
default)
+ let parquet_schema = Arc::new(ArrowSchema::new(vec![
+ simple_field("id", DataType::Int32, false, "1"),
+ simple_field("data", DataType::Utf8, false, "3"),
+ ]));
+
+ let projected_field_ids = [1, 2, 3, 4, 5]; // id, dept, data,
category, notes
+
+ let mut transformer =
+ RecordBatchTransformerBuilder::new(snapshot_schema,
&projected_field_ids)
+ .with_partition(partition_spec, partition_data)
+ .build();
+
+ let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
+ Arc::new(Int32Array::from(vec![100, 200])),
+ Arc::new(StringArray::from(vec!["value1", "value2"])),
+ ])
+ .unwrap();
+
+ let result = transformer.process_record_batch(parquet_batch).unwrap();
+
+ assert_eq!(result.num_columns(), 5);
+ assert_eq!(result.num_rows(), 2);
+
+ // Verify each column demonstrates the correct spec rule:
+
+ // Normal case: id from Parquet by field ID
+ let id_column = result
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(id_column.value(0), 100);
+ assert_eq!(id_column.value(1), 200);
+
+ // Rule #1: dept from partition metadata (identity transform)
+ let dept_column = result
+ .column(1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(dept_column.value(0), "engineering");
+ assert_eq!(dept_column.value(1), "engineering");
+
+ // Rule #2: data from Parquet via name mapping
+ let data_column = result
+ .column(2)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(data_column.value(0), "value1");
+ assert_eq!(data_column.value(1), "value2");
+
+ // Rule #3: category from initial_default
+ let category_column = result
+ .column(3)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(category_column.value(0), "default_category");
+ assert_eq!(category_column.value(1), "default_category");
+
+ // Rule #4: notes is null (no default, not in Parquet, not in partition)
+ let notes_column = result
+ .column(4)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert!(notes_column.is_null(0));
+ assert!(notes_column.is_null(1));
+ }
}
diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs
index 3f7c29db..fe3f5c8f 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -128,6 +128,13 @@ impl ManifestEntryContext {
.map(|x| x.as_ref().snapshot_bound_predicate.clone()),
deletes,
+
+ // Include partition data from the manifest entry; spec and name mapping are not wired up yet
+ partition: Some(self.manifest_entry.data_file.partition.clone()),
+ // TODO: Pass actual PartitionSpec through context chain for native flow
+ partition_spec: None,
+ // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default"
+ name_mapping: None,
})
}
}
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 6884e00b..3e319ca0 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -1777,6 +1777,9 @@ pub mod tests {
record_count: Some(100),
data_file_format: DataFileFormat::Parquet,
deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
};
test_fn(task);
@@ -1791,6 +1794,9 @@ pub mod tests {
record_count: None,
data_file_format: DataFileFormat::Avro,
deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
};
test_fn(task);
}
diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs
index 32fe3ae3..e1ef241a 100644
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@ -15,16 +15,39 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::Arc;
+
use futures::stream::BoxStream;
-use serde::{Deserialize, Serialize};
+use serde::{Deserialize, Serialize, Serializer};
use crate::Result;
use crate::expr::BoundPredicate;
-use crate::spec::{DataContentType, DataFileFormat, ManifestEntryRef, Schema, SchemaRef};
+use crate::spec::{
+ DataContentType, DataFileFormat, ManifestEntryRef, NameMapping, PartitionSpec, Schema,
+ SchemaRef, Struct,
+};
/// A stream of [`FileScanTask`].
pub type FileScanTaskStream = BoxStream<'static, Result<FileScanTask>>;
+/// Serialization helper that always fails with a custom serde error.
+/// Used for fields that should not be serialized but we want to be explicit about it.
+fn serialize_not_implemented<S, T>(_: &T, _: S) -> std::result::Result<S::Ok, S::Error>
+where S: Serializer {
+ Err(serde::ser::Error::custom(
+ "Serialization not implemented for this field",
+ ))
+}
+
+/// Deserialization helper that always fails with a custom serde error.
+/// Used for fields that should not be deserialized but we want to be explicit about it.
+fn deserialize_not_implemented<'de, D, T>(_: D) -> std::result::Result<T, D::Error>
+where D: serde::Deserializer<'de> {
+ Err(serde::de::Error::custom(
+ "Deserialization not implemented for this field",
+ ))
+}
+
/// A task to scan part of file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FileScanTask {
@@ -54,6 +77,33 @@ pub struct FileScanTask {
/// The list of delete files that may need to be applied to this data file
pub deletes: Vec<FileScanTaskDeleteFile>,
+
+ /// Partition data from the manifest entry, used to identify which columns can use
+ /// constant values from partition metadata vs. reading from the data file.
+ /// Per the Iceberg spec, only identity-transformed partition fields should use constants.
+ #[serde(default)]
+ #[serde(skip_serializing_if = "Option::is_none")]
+ #[serde(serialize_with = "serialize_not_implemented")]
+ #[serde(deserialize_with = "deserialize_not_implemented")]
+ pub partition: Option<Struct>,
+
+ /// The partition spec for this file, used to distinguish identity transforms
+ /// (which use partition metadata constants) from non-identity transforms like
+ /// bucket/truncate (which must read source columns from the data file).
+ #[serde(default)]
+ #[serde(skip_serializing_if = "Option::is_none")]
+ #[serde(serialize_with = "serialize_not_implemented")]
+ #[serde(deserialize_with = "deserialize_not_implemented")]
+ pub partition_spec: Option<Arc<PartitionSpec>>,
+
+ /// Name mapping from table metadata (property: schema.name-mapping.default),
+ /// used to resolve field IDs from column names when Parquet files lack field IDs
+ /// or have field ID conflicts.
+ #[serde(default)]
+ #[serde(skip_serializing_if = "Option::is_none")]
+ #[serde(serialize_with = "serialize_not_implemented")]
+ #[serde(deserialize_with = "deserialize_not_implemented")]
+ pub name_mapping: Option<Arc<NameMapping>>,
}
impl FileScanTask {