This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch xuanwo/resolve-merge-conflicts
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git

commit c9dcf54c581050aec60b55721aa98a160271b458
Merge: 719d62f21 9fa3776cb
Author: Xuanwo <[email protected]>
AuthorDate: Fri Dec 5 18:24:22 2025 +0800

    Merge remote-tracking branch 'origin/main' into 
viktor/origin_limit_push_down
    
    Signed-off-by: Xuanwo <[email protected]>

 .asf.yaml                                          |    9 +-
 .github/workflows/audit.yml                        |    2 +-
 .github/workflows/bindings_python_ci.yml           |   37 +-
 .github/workflows/ci.yml                           |   41 +-
 .github/workflows/ci_typos.yml                     |    4 +-
 .github/workflows/publish.yml                      |    2 +-
 .github/workflows/release_python.yml               |   14 +-
 .github/workflows/release_python_nightly.yml       |   40 +-
 .github/workflows/stale.yml                        |    2 +-
 .github/workflows/website.yml                      |    2 +-
 .licenserc.yaml                                    |   19 +-
 CONTRIBUTING.md                                    |    4 +-
 Cargo.lock                                         | 2240 +++++------
 Cargo.toml                                         |   61 +-
 Makefile                                           |    5 +-
 README.md                                          |    1 +
 bindings/python/Cargo.lock                         | 1361 +++----
 bindings/python/Cargo.toml                         |   13 +-
 .licenserc.yaml => bindings/python/Makefile        |   26 +-
 bindings/python/README.md                          |   14 +-
 bindings/python/pyproject.toml                     |   18 +-
 bindings/python/src/datafusion_table_provider.rs   |    6 +-
 bindings/python/uv.lock                            |  767 ++++
 crates/catalog/glue/src/catalog.rs                 |  130 +-
 crates/catalog/glue/src/error.rs                   |    4 +-
 crates/catalog/glue/src/schema.rs                  |    6 +-
 crates/catalog/glue/src/utils.rs                   |    5 +-
 crates/catalog/glue/tests/glue_catalog_test.rs     |    4 +-
 crates/catalog/hms/src/catalog.rs                  |    4 +-
 crates/catalog/hms/src/error.rs                    |    4 +-
 crates/catalog/hms/src/schema.rs                   |    6 +-
 crates/catalog/hms/src/utils.rs                    |   14 +-
 crates/catalog/hms/testdata/hms_catalog/Dockerfile |   22 +-
 crates/catalog/hms/tests/hms_catalog_test.rs       |    2 +-
 crates/catalog/loader/Cargo.toml                   |    5 +
 crates/catalog/loader/src/lib.rs                   |   34 +
 crates/catalog/rest/src/catalog.rs                 |    6 +-
 crates/catalog/rest/tests/rest_catalog_test.rs     |    2 +-
 crates/catalog/s3tables/Cargo.toml                 |    1 +
 crates/catalog/s3tables/src/catalog.rs             |  213 +-
 crates/catalog/sql/Cargo.toml                      |    5 +-
 crates/catalog/sql/src/catalog.rs                  |  511 ++-
 crates/catalog/sql/src/error.rs                    |    6 +-
 crates/catalog/sql/src/lib.rs                      |   37 +
 crates/examples/src/rest_catalog_namespace.rs      |    9 +-
 crates/examples/src/rest_catalog_table.rs          |    2 +-
 crates/iceberg/Cargo.toml                          |    5 +-
 .../src/arrow/caching_delete_file_loader.rs        |  253 +-
 crates/iceberg/src/arrow/delete_file_loader.rs     |   29 +-
 crates/iceberg/src/arrow/delete_filter.rs          |   10 +-
 crates/iceberg/src/arrow/mod.rs                    |   10 +-
 .../src/arrow/partition_value_calculator.rs        |  254 ++
 crates/iceberg/src/arrow/reader.rs                 | 2331 +++++++++++-
 .../src/arrow/record_batch_partition_splitter.rs   |  390 +-
 crates/iceberg/src/arrow/record_batch_projector.rs |   71 +-
 .../iceberg/src/arrow/record_batch_transformer.rs  | 1064 +++++-
 crates/iceberg/src/arrow/schema.rs                 |   28 +-
 crates/iceberg/src/arrow/value.rs                  |    6 +-
 crates/iceberg/src/catalog/memory/catalog.rs       |   78 +-
 .../iceberg/src/catalog/memory/namespace_state.rs  |   14 +-
 crates/iceberg/src/catalog/metadata_location.rs    |   11 +-
 crates/iceberg/src/catalog/mod.rs                  |  229 +-
 crates/iceberg/src/delete_file_index.rs            |  292 +-
 crates/iceberg/src/expr/predicate.rs               |   16 +-
 .../src/expr/visitors/manifest_evaluator.rs        |    1 +
 .../src/expr/visitors/page_index_evaluator.rs      |    9 +-
 .../src/expr/visitors/strict_metrics_evaluator.rs  |   45 +-
 crates/iceberg/src/io/file_io.rs                   |   12 -
 crates/iceberg/src/io/object_cache.rs              |   26 +-
 crates/iceberg/src/io/storage.rs                   |    2 +-
 crates/iceberg/src/io/storage_s3.rs                |    5 +-
 crates/iceberg/src/puffin/metadata.rs              |   13 +-
 crates/iceberg/src/scan/cache.rs                   |    2 +-
 crates/iceberg/src/scan/context.rs                 |    7 +
 crates/iceberg/src/scan/mod.rs                     |   65 +-
 crates/iceberg/src/scan/task.rs                    |   54 +-
 crates/iceberg/src/spec/datatypes.rs               |   65 +-
 crates/iceberg/src/spec/encrypted_key.rs           |   16 +-
 crates/iceberg/src/spec/manifest/data_file.rs      |   14 +-
 crates/iceberg/src/spec/manifest/entry.rs          |   40 +-
 crates/iceberg/src/spec/manifest/mod.rs            |    3 +-
 crates/iceberg/src/spec/manifest/writer.rs         |   76 +-
 crates/iceberg/src/spec/manifest_list.rs           |  676 +++-
 crates/iceberg/src/spec/mod.rs                     |    2 +
 crates/iceberg/src/spec/partition.rs               |   51 +-
 crates/iceberg/src/spec/schema/id_reassigner.rs    |    4 +-
 crates/iceberg/src/spec/schema/mod.rs              |   11 +-
 crates/iceberg/src/spec/schema/utils.rs            |    5 +-
 crates/iceberg/src/spec/snapshot.rs                |  140 +-
 crates/iceberg/src/spec/snapshot_summary.rs        |    8 +-
 crates/iceberg/src/spec/table_metadata.rs          |  694 +++-
 crates/iceberg/src/spec/table_metadata_builder.rs  |  557 ++-
 crates/iceberg/src/spec/table_properties.rs        |  284 ++
 crates/iceberg/src/spec/transform.rs               |   26 +-
 crates/iceberg/src/spec/values.rs                  | 3957 --------------------
 crates/iceberg/src/spec/values/datum.rs            | 1225 ++++++
 crates/iceberg/src/spec/values/literal.rs          |  747 ++++
 crates/iceberg/src/spec/values/map.rs              |  145 +
 .../src/spec/values}/mod.rs                        |   25 +-
 crates/iceberg/src/spec/values/primitive.rs        |   59 +
 crates/iceberg/src/spec/values/serde.rs            |  719 ++++
 crates/iceberg/src/spec/values/struct_value.rs     |   79 +
 crates/iceberg/src/spec/values/temporal.rs         |  105 +
 crates/iceberg/src/spec/values/tests.rs            | 1334 +++++++
 crates/iceberg/src/spec/view_metadata.rs           |    4 +-
 crates/iceberg/src/spec/view_metadata_builder.rs   |    8 +-
 crates/iceberg/src/table.rs                        |   11 +-
 crates/iceberg/src/transaction/append.rs           |    6 +-
 crates/iceberg/src/transaction/mod.rs              |  191 +-
 crates/iceberg/src/transaction/snapshot.rs         |  124 +-
 .../iceberg/src/transaction/update_properties.rs   |    5 +-
 crates/iceberg/src/transform/mod.rs                |   14 +-
 crates/iceberg/src/transform/temporal.rs           |   10 +-
 crates/iceberg/src/transform/truncate.rs           |    4 +-
 .../src/writer/base_writer/data_file_writer.rs     |  163 +-
 .../writer/base_writer/equality_delete_writer.rs   |  162 +-
 .../src/writer/file_writer/location_generator.rs   |   10 +-
 crates/iceberg/src/writer/file_writer/mod.rs       |    6 +-
 .../src/writer/file_writer/parquet_writer.rs       |  258 +-
 .../src/writer/file_writer/rolling_writer.rs       |  218 +-
 crates/iceberg/src/writer/mod.rs                   |  192 +-
 .../src/writer/partitioning/clustered_writer.rs    |  517 +++
 .../src/writer/partitioning/fanout_writer.rs       |  384 ++
 crates/iceberg/src/writer/partitioning/mod.rs      |   56 +
 .../writer/partitioning/unpartitioned_writer.rs    |  198 +
 .../TableMetadataV3ValidMinimal.json               |   74 +
 crates/iceberg/tests/file_io_s3_test.rs            |    6 +-
 crates/integration_tests/src/lib.rs                |    2 +-
 crates/integration_tests/testdata/spark/Dockerfile |    2 +-
 .../tests/shared_tests/append_data_file_test.rs    |    9 +-
 .../append_partition_data_file_test.rs             |   82 +-
 .../tests/shared_tests/conflict_commit_test.rs     |    9 +-
 .../tests/shared_tests/datafusion.rs               |    4 +-
 .../tests/shared_tests/read_positional_deletes.rs  |    4 +-
 .../tests/shared_tests/scan_all_type.rs            |    9 +-
 crates/integrations/datafusion/src/error.rs        |    2 +-
 crates/integrations/datafusion/src/lib.rs          |    2 +
 .../datafusion/src/physical_plan/commit.rs         |  128 +-
 .../datafusion/src/physical_plan/mod.rs            |    4 +
 .../datafusion/src/physical_plan/project.rs        |  380 ++
 .../datafusion/src/physical_plan/repartition.rs    |  885 +++++
 .../datafusion/src/physical_plan/sort.rs           |  244 ++
 .../datafusion/src/physical_plan/write.rs          |  114 +-
 crates/integrations/datafusion/src/schema.rs       |    6 +-
 crates/integrations/datafusion/src/table/mod.rs    |  468 ++-
 .../datafusion/src/table/table_provider_factory.rs |   10 +-
 crates/integrations/datafusion/src/task_writer.rs  |  517 +++
 .../tests/integration_datafusion_test.rs           |  156 +-
 crates/integrations/playground/Cargo.toml          |    3 +-
 crates/integrations/playground/src/main.rs         |    2 +-
 crates/sqllogictest/Cargo.toml                     |   12 +
 crates/sqllogictest/src/engine/datafusion.rs       |   56 +-
 crates/sqllogictest/src/engine/mod.rs              |   73 +-
 crates/sqllogictest/src/error.rs                   |    6 +
 crates/sqllogictest/src/lib.rs                     |    7 +-
 crates/sqllogictest/src/schedule.rs                |   24 +-
 .../sqllogictest/testdata/schedules/df_test.toml   |   23 +-
 .../testdata/slts/df_test/show_tables.slt          |   45 +-
 crates/sqllogictest/tests/sqllogictests.rs         |   87 +
 .../0001_modularize_iceberg_implementations.md     |  120 +
 scripts/release.sh                                 |    2 +-
 website/src/download.md                            |    4 +-
 website/src/reference/orbstack.md                  |    4 +-
 website/src/release.md                             |   14 +-
 164 files changed, 20194 insertions(+), 8065 deletions(-)

diff --cc crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 9cf605680,192ca390a..d3e943200
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@@ -686,4 -701,215 +701,216 @@@ mod tests 
          let result = delete_filter.get_delete_vector(&file_scan_tasks[1]);
          assert!(result.is_none()); // no pos dels for file 3
      }
+ 
+     /// Verifies that evolve_schema on partial-schema equality deletes works 
correctly
+     /// when only equality_ids columns are evolved, not all table columns.
+     ///
+     /// Per the [Iceberg 
spec](https://iceberg.apache.org/spec/#equality-delete-files),
+     /// equality delete files can contain only a subset of columns.
+     #[tokio::test]
+     async fn test_partial_schema_equality_deletes_evolve_succeeds() {
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+ 
+         // Create table schema with REQUIRED fields
+         let table_schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(1)
+                 .with_fields(vec![
+                     crate::spec::NestedField::required(
+                         1,
+                         "id",
+                         
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Int),
+                     )
+                     .into(),
+                     crate::spec::NestedField::required(
+                         2,
+                         "data",
+                         
crate::spec::Type::Primitive(crate::spec::PrimitiveType::String),
+                     )
+                     .into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         // Write equality delete file with PARTIAL schema (only 'data' column)
+         let delete_file_path = {
+             let data_vals = vec!["a", "d", "g"];
+             let data_col = Arc::new(StringArray::from(data_vals)) as ArrayRef;
+ 
+             let delete_schema = 
Arc::new(arrow_schema::Schema::new(vec![simple_field(
+                 "data",
+                 DataType::Utf8,
+                 false,
+                 "2", // field ID
+             )]));
+ 
+             let delete_batch = RecordBatch::try_new(delete_schema.clone(), 
vec![data_col]).unwrap();
+ 
+             let path = format!("{}/partial-eq-deletes.parquet", 
&table_location);
+             let file = File::create(&path).unwrap();
+             let props = WriterProperties::builder()
+                 .set_compression(Compression::SNAPPY)
+                 .build();
+             let mut writer =
+                 ArrowWriter::try_new(file, delete_batch.schema(), 
Some(props)).unwrap();
+             writer.write(&delete_batch).expect("Writing batch");
+             writer.close().unwrap();
+             path
+         };
+ 
+         let file_io = 
FileIO::from_path(table_location).unwrap().build().unwrap();
+         let basic_delete_file_loader = 
BasicDeleteFileLoader::new(file_io.clone());
+ 
+         let batch_stream = basic_delete_file_loader
+             .parquet_to_batch_stream(&delete_file_path)
+             .await
+             .unwrap();
+ 
+         // Only evolve the equality_ids columns (field 2), not all table 
columns
+         let equality_ids = vec![2];
+         let evolved_stream =
+             BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema, 
&equality_ids)
+                 .await
+                 .unwrap();
+ 
+         let result = evolved_stream.try_collect::<Vec<_>>().await;
+ 
+         assert!(
+             result.is_ok(),
+             "Expected success when evolving only equality_ids columns, got 
error: {:?}",
+             result.err()
+         );
+ 
+         let batches = result.unwrap();
+         assert_eq!(batches.len(), 1);
+ 
+         let batch = &batches[0];
+         assert_eq!(batch.num_rows(), 3);
+         assert_eq!(batch.num_columns(), 1); // Only 'data' column
+ 
+         // Verify the actual values are preserved after schema evolution
+         let data_col = batch.column(0).as_string::<i32>();
+         assert_eq!(data_col.value(0), "a");
+         assert_eq!(data_col.value(1), "d");
+         assert_eq!(data_col.value(2), "g");
+     }
+ 
+     /// Test loading a FileScanTask with BOTH positional and equality deletes.
+     /// Verifies the fix for the inverted condition that caused "Missing 
predicate for equality delete file" errors.
+     #[tokio::test]
+     async fn test_load_deletes_with_mixed_types() {
+         use crate::scan::FileScanTask;
+         use crate::spec::{DataFileFormat, Schema};
+ 
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path();
+         let file_io = 
FileIO::from_path(table_location.as_os_str().to_str().unwrap())
+             .unwrap()
+             .build()
+             .unwrap();
+ 
+         // Create the data file schema
+         let data_file_schema = Arc::new(
+             Schema::builder()
+                 .with_fields(vec![
+                     crate::spec::NestedField::optional(
+                         2,
+                         "y",
+                         
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
+                     )
+                     .into(),
+                     crate::spec::NestedField::optional(
+                         3,
+                         "z",
+                         
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
+                     )
+                     .into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         // Write positional delete file
+         let positional_delete_schema = 
crate::arrow::delete_filter::tests::create_pos_del_schema();
+         let file_path_values =
+             vec![format!("{}/data-1.parquet", 
table_location.to_str().unwrap()); 4];
+         let file_path_col = 
Arc::new(StringArray::from_iter_values(&file_path_values));
+         let pos_col = Arc::new(Int64Array::from_iter_values(vec![0i64, 1, 2, 
3]));
+ 
+         let positional_deletes_to_write =
+             RecordBatch::try_new(positional_delete_schema.clone(), vec![
+                 file_path_col,
+                 pos_col,
+             ])
+             .unwrap();
+ 
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .build();
+ 
+         let pos_del_path = format!("{}/pos-del-mixed.parquet", 
table_location.to_str().unwrap());
+         let file = File::create(&pos_del_path).unwrap();
+         let mut writer = ArrowWriter::try_new(
+             file,
+             positional_deletes_to_write.schema(),
+             Some(props.clone()),
+         )
+         .unwrap();
+         writer.write(&positional_deletes_to_write).unwrap();
+         writer.close().unwrap();
+ 
+         // Write equality delete file
+         let eq_delete_path = 
setup_write_equality_delete_file_1(table_location.to_str().unwrap());
+ 
+         // Create FileScanTask with BOTH positional and equality deletes
+         let pos_del = FileScanTaskDeleteFile {
+             file_path: pos_del_path,
+             file_type: DataContentType::PositionDeletes,
+             partition_spec_id: 0,
+             equality_ids: None,
+         };
+ 
+         let eq_del = FileScanTaskDeleteFile {
+             file_path: eq_delete_path.clone(),
+             file_type: DataContentType::EqualityDeletes,
+             partition_spec_id: 0,
+             equality_ids: Some(vec![2, 3]), // Only use field IDs that exist 
in both schemas
+         };
+ 
+         let file_scan_task = FileScanTask {
+             start: 0,
+             length: 0,
+             record_count: None,
+             data_file_path: format!("{}/data-1.parquet", 
table_location.to_str().unwrap()),
+             data_file_format: DataFileFormat::Parquet,
+             schema: data_file_schema.clone(),
+             project_field_ids: vec![2, 3],
+             predicate: None,
+             deletes: vec![pos_del, eq_del],
+             partition: None,
+             partition_spec: None,
+             name_mapping: None,
++            limit: None,
+         };
+ 
+         // Load the deletes - should handle both types without error
+         let delete_file_loader = 
CachingDeleteFileLoader::new(file_io.clone(), 10);
+         let delete_filter = delete_file_loader
+             .load_deletes(&file_scan_task.deletes, 
file_scan_task.schema_ref())
+             .await
+             .unwrap()
+             .unwrap();
+ 
+         // Verify both delete types can be processed together
+         let result = delete_filter
+             .build_equality_delete_predicate(&file_scan_task)
+             .await;
+         assert!(
+             result.is_ok(),
+             "Failed to build equality delete predicate: {:?}",
+             result.err()
+         );
+     }
  }
diff --cc crates/iceberg/src/arrow/delete_filter.rs
index b29f80886,14b5124ee..03be7338b
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@@ -339,7 -341,9 +341,10 @@@ pub(crate) mod tests 
                  project_field_ids: vec![],
                  predicate: None,
                  deletes: vec![pos_del_1, pos_del_2.clone()],
+                 partition: None,
+                 partition_spec: None,
+                 name_mapping: None,
 +                limit: None,
              },
              FileScanTask {
                  start: 0,
@@@ -351,7 -355,9 +356,10 @@@
                  project_field_ids: vec![],
                  predicate: None,
                  deletes: vec![pos_del_3],
+                 partition: None,
+                 partition_spec: None,
+                 name_mapping: None,
 +                limit: None,
              },
          ];
  
diff --cc crates/iceberg/src/arrow/reader.rs
index c6f005474,ab5a96f75..81a9fa727
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@@ -176,13 -178,15 +178,16 @@@ impl ArrowReader 
          row_group_filtering_enabled: bool,
          row_selection_enabled: bool,
      ) -> Result<ArrowRecordBatchStream> {
 -        let should_load_page_index =
 -            (row_selection_enabled && task.predicate.is_some()) || 
!task.deletes.is_empty();
 +        let should_load_page_index = (row_selection_enabled && 
task.predicate.is_some())
 +            || !task.deletes.is_empty()
 +            || task.limit.is_some();
  
-         let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, 
task.schema.clone());
+         let delete_filter_rx =
+             delete_file_loader.load_deletes(&task.deletes, 
Arc::clone(&task.schema));
  
-         let mut record_batch_stream_builder = 
Self::create_parquet_record_batch_stream_builder(
+         // Migrated tables lack field IDs, requiring us to inspect the schema 
to choose
+         // between field-ID-based or position-based projection
+         let initial_stream_builder = 
Self::create_parquet_record_batch_stream_builder(
              &task.data_file_path,
              file_io.clone(),
              should_load_page_index,
@@@ -344,11 -440,9 +445,11 @@@
              .with_preload_page_index(should_load_page_index);
  
          // Create the record batch stream builder, which wraps the parquet 
file reader
-         let record_batch_stream_builder = 
ParquetRecordBatchStreamBuilder::new_with_options(
-             parquet_file_reader,
-             ArrowReaderOptions::new().with_page_index(should_load_page_index),
-         )
-         .await?;
 -        let options = arrow_reader_options.unwrap_or_default();
++        let options = arrow_reader_options
++            .unwrap_or_default()
++            .with_page_index(should_load_page_index);
+         let record_batch_stream_builder =
+             
ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, 
options).await?;
          Ok(record_batch_stream_builder)
      }
  
@@@ -1750,7 -2061,9 +2068,10 @@@ message schema 
                  project_field_ids: vec![1],
                  predicate: Some(predicate.bind(schema, true).unwrap()),
                  deletes: vec![],
+                 partition: None,
+                 partition_spec: None,
+                 name_mapping: None,
 +                limit: None,
              })]
              .into_iter(),
          )) as FileScanTaskStream;
@@@ -1964,4 -2277,1748 +2285,1762 @@@
  
          Arc::new(SchemaDescriptor::new(Arc::new(schema)))
      }
+ 
+     /// Verifies that file splits respect byte ranges and only read specific 
row groups.
+     #[tokio::test]
+     async fn test_file_splits_respect_byte_ranges() {
+         use arrow_array::Int32Array;
+         use parquet::file::reader::{FileReader, SerializedFileReader};
+ 
+         let schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(1)
+                 .with_fields(vec![
+                     NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         let arrow_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                 PARQUET_FIELD_ID_META_KEY.to_string(),
+                 "1".to_string(),
+             )])),
+         ]));
+ 
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().to_str().unwrap().to_string();
+         let file_path = format!("{}/multi_row_group.parquet", 
&table_location);
+ 
+         // Force each batch into its own row group for testing byte range 
filtering.
+         let batch1 = RecordBatch::try_new(arrow_schema.clone(), 
vec![Arc::new(Int32Array::from(
+             (0..100).collect::<Vec<i32>>(),
+         ))])
+         .unwrap();
+         let batch2 = RecordBatch::try_new(arrow_schema.clone(), 
vec![Arc::new(Int32Array::from(
+             (100..200).collect::<Vec<i32>>(),
+         ))])
+         .unwrap();
+         let batch3 = RecordBatch::try_new(arrow_schema.clone(), 
vec![Arc::new(Int32Array::from(
+             (200..300).collect::<Vec<i32>>(),
+         ))])
+         .unwrap();
+ 
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .set_max_row_group_size(100)
+             .build();
+ 
+         let file = File::create(&file_path).unwrap();
+         let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), 
Some(props)).unwrap();
+         writer.write(&batch1).expect("Writing batch 1");
+         writer.write(&batch2).expect("Writing batch 2");
+         writer.write(&batch3).expect("Writing batch 3");
+         writer.close().unwrap();
+ 
+         // Read the file metadata to get row group byte positions
+         let file = File::open(&file_path).unwrap();
+         let reader = SerializedFileReader::new(file).unwrap();
+         let metadata = reader.metadata();
+ 
+         println!("File has {} row groups", metadata.num_row_groups());
+         assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
+ 
+         // Get byte positions for each row group
+         let row_group_0 = metadata.row_group(0);
+         let row_group_1 = metadata.row_group(1);
+         let row_group_2 = metadata.row_group(2);
+ 
+         let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
+         let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
+         let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
+         let file_end = rg2_start + row_group_2.compressed_size() as u64;
+ 
+         println!(
+             "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
+             row_group_0.num_rows(),
+             rg0_start,
+             row_group_0.compressed_size()
+         );
+         println!(
+             "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
+             row_group_1.num_rows(),
+             rg1_start,
+             row_group_1.compressed_size()
+         );
+         println!(
+             "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
+             row_group_2.num_rows(),
+             rg2_start,
+             row_group_2.compressed_size()
+         );
+ 
+         let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+         let reader = ArrowReaderBuilder::new(file_io).build();
+ 
+         // Task 1: read only the first row group
+         let task1 = FileScanTask {
+             start: rg0_start,
+             length: row_group_0.compressed_size() as u64,
+             record_count: Some(100),
+             data_file_path: file_path.clone(),
+             data_file_format: DataFileFormat::Parquet,
+             schema: schema.clone(),
+             project_field_ids: vec![1],
+             predicate: None,
+             deletes: vec![],
+             partition: None,
+             partition_spec: None,
+             name_mapping: None,
++            limit: None,
+         };
+ 
+         // Task 2: read the second and third row groups
+         let task2 = FileScanTask {
+             start: rg1_start,
+             length: file_end - rg1_start,
+             record_count: Some(200),
+             data_file_path: file_path.clone(),
+             data_file_format: DataFileFormat::Parquet,
+             schema: schema.clone(),
+             project_field_ids: vec![1],
+             predicate: None,
+             deletes: vec![],
+             partition: None,
+             partition_spec: None,
+             name_mapping: None,
++            limit: None,
+         };
+ 
+         let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as 
FileScanTaskStream;
+         let result1 = reader
+             .clone()
+             .read(tasks1)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         let total_rows_task1: usize = result1.iter().map(|b| 
b.num_rows()).sum();
+         println!(
+             "Task 1 (bytes {}-{}) returned {} rows",
+             rg0_start,
+             rg0_start + row_group_0.compressed_size() as u64,
+             total_rows_task1
+         );
+ 
+         let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as 
FileScanTaskStream;
+         let result2 = reader
+             .read(tasks2)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         let total_rows_task2: usize = result2.iter().map(|b| 
b.num_rows()).sum();
+         println!("Task 2 (bytes {rg1_start}-{file_end}) returned 
{total_rows_task2} rows");
+ 
+         assert_eq!(
+             total_rows_task1, 100,
+             "Task 1 should read only the first row group (100 rows), but got 
{total_rows_task1} rows"
+         );
+ 
+         assert_eq!(
+             total_rows_task2, 200,
+             "Task 2 should read only the second+third row groups (200 rows), 
but got {total_rows_task2} rows"
+         );
+ 
+         // Verify the actual data values are correct (not just the row count)
+         if total_rows_task1 > 0 {
+             let first_batch = &result1[0];
+             let id_col = first_batch
+                 .column(0)
+                 .as_primitive::<arrow_array::types::Int32Type>();
+             let first_val = id_col.value(0);
+             let last_val = id_col.value(id_col.len() - 1);
+             println!("Task 1 data range: {first_val} to {last_val}");
+ 
+             assert_eq!(first_val, 0, "Task 1 should start with id=0");
+             assert_eq!(last_val, 99, "Task 1 should end with id=99");
+         }
+ 
+         if total_rows_task2 > 0 {
+             let first_batch = &result2[0];
+             let id_col = first_batch
+                 .column(0)
+                 .as_primitive::<arrow_array::types::Int32Type>();
+             let first_val = id_col.value(0);
+             println!("Task 2 first value: {first_val}");
+ 
+             assert_eq!(first_val, 100, "Task 2 should start with id=100, not 
id=0");
+         }
+     }
+ 
+     /// Test schema evolution: reading old Parquet file (with only column 'a')
+     /// using a newer table schema (with columns 'a' and 'b').
+     /// This tests that:
+     /// 1. get_arrow_projection_mask allows missing columns
+     /// 2. RecordBatchTransformer adds missing column 'b' with NULL values
+     #[tokio::test]
+     async fn test_schema_evolution_add_column() {
+         use arrow_array::{Array, Int32Array};
+ 
+         // New table schema: columns 'a' and 'b' (b was added later, file 
only has 'a')
+         let new_schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(2)
+                 .with_fields(vec![
+                     NestedField::required(1, "a", 
Type::Primitive(PrimitiveType::Int)).into(),
+                     NestedField::optional(2, "b", 
Type::Primitive(PrimitiveType::Int)).into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         // Create Arrow schema for old Parquet file (only has column 'a')
+         let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
+             Field::new("a", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                 PARQUET_FIELD_ID_META_KEY.to_string(),
+                 "1".to_string(),
+             )])),
+         ]));
+ 
+         // Write old Parquet file with only column 'a'
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().to_str().unwrap().to_string();
+         let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+ 
+         let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+         let to_write = RecordBatch::try_new(arrow_schema_old.clone(), 
vec![data_a]).unwrap();
+ 
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .build();
+         let file = File::create(format!("{}/old_file.parquet", 
&table_location)).unwrap();
+         let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+         writer.write(&to_write).expect("Writing batch");
+         writer.close().unwrap();
+ 
+         // Read the old Parquet file using the NEW schema (with column 'b')
+         let reader = ArrowReaderBuilder::new(file_io).build();
+         let tasks = Box::pin(futures::stream::iter(
+             vec![Ok(FileScanTask {
+                 start: 0,
+                 length: 0,
+                 record_count: None,
+                 data_file_path: format!("{table_location}/old_file.parquet"),
+                 data_file_format: DataFileFormat::Parquet,
+                 schema: new_schema.clone(),
+                 project_field_ids: vec![1, 2], // Request both columns 'a' 
and 'b'
+                 predicate: None,
+                 deletes: vec![],
+                 partition: None,
+                 partition_spec: None,
+                 name_mapping: None,
++                limit: None,
+             })]
+             .into_iter(),
+         )) as FileScanTaskStream;
+ 
+         let result = reader
+             .read(tasks)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         // Verify we got the correct data
+         assert_eq!(result.len(), 1);
+         let batch = &result[0];
+ 
+         // Should have 2 columns now
+         assert_eq!(batch.num_columns(), 2);
+         assert_eq!(batch.num_rows(), 3);
+ 
+         // Column 'a' should have the original data
+         let col_a = batch
+             .column(0)
+             .as_primitive::<arrow_array::types::Int32Type>();
+         assert_eq!(col_a.values(), &[1, 2, 3]);
+ 
+         // Column 'b' should be all NULLs (it didn't exist in the old file)
+         let col_b = batch
+             .column(1)
+             .as_primitive::<arrow_array::types::Int32Type>();
+         assert_eq!(col_b.null_count(), 3);
+         assert!(col_b.is_null(0));
+         assert!(col_b.is_null(1));
+         assert!(col_b.is_null(2));
+     }
+ 
+     /// Test for bug where position deletes in later row groups are not 
applied correctly.
+     ///
+     /// When a file has multiple row groups and a position delete targets a 
row in a later
+     /// row group, the `build_deletes_row_selection` function had a bug where 
it would
+     /// fail to increment `current_row_group_base_idx` when skipping row 
groups.
+     ///
+     /// This test creates:
+     /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
+     /// - A position delete file that deletes row 199 (last row in second row 
group)
+     ///
+     /// Expected behavior: Should return 199 rows (with id=200 deleted)
+     /// Bug behavior: Returns 200 rows (delete is not applied)
+     ///
+     /// This bug was discovered while running Apache Spark + Apache Iceberg 
integration tests
+     /// through DataFusion Comet. The following Iceberg Java tests failed due 
to this bug:
+     /// - 
`org.apache.iceberg.spark.extensions.TestMergeOnReadDelete::testDeleteWithMultipleRowGroupsParquet`
+     /// - 
`org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate::testUpdateWithMultipleRowGroupsParquet`
+     #[tokio::test]
+     async fn test_position_delete_across_multiple_row_groups() {
+         use arrow_array::{Int32Array, Int64Array};
+         use parquet::file::reader::{FileReader, SerializedFileReader};
+ 
+         // Field IDs for positional delete schema
+         const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
+         const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
+ 
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ 
+         // Create table schema with a single 'id' column
+         let table_schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(1)
+                 .with_fields(vec![
+                     NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         let arrow_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                 PARQUET_FIELD_ID_META_KEY.to_string(),
+                 "1".to_string(),
+             )])),
+         ]));
+ 
+         // Step 1: Create data file with 200 rows in 2 row groups
+         // Row group 0: rows 0-99 (ids 1-100)
+         // Row group 1: rows 100-199 (ids 101-200)
+         let data_file_path = format!("{}/data.parquet", &table_location);
+ 
+         let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+             Int32Array::from_iter_values(1..=100),
+         )])
+         .unwrap();
+ 
+         let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+             Int32Array::from_iter_values(101..=200),
+         )])
+         .unwrap();
+ 
+         // Force each batch into its own row group
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .set_max_row_group_size(100)
+             .build();
+ 
+         let file = File::create(&data_file_path).unwrap();
+         let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), 
Some(props)).unwrap();
+         writer.write(&batch1).expect("Writing batch 1");
+         writer.write(&batch2).expect("Writing batch 2");
+         writer.close().unwrap();
+ 
+         // Verify we created 2 row groups
+         let verify_file = File::open(&data_file_path).unwrap();
+         let verify_reader = SerializedFileReader::new(verify_file).unwrap();
+         assert_eq!(
+             verify_reader.metadata().num_row_groups(),
+             2,
+             "Should have 2 row groups"
+         );
+ 
+         // Step 2: Create position delete file that deletes row 199 (id=200, 
last row in row group 1)
+         let delete_file_path = format!("{}/deletes.parquet", &table_location);
+ 
+         let delete_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("file_path", DataType::Utf8, 
false).with_metadata(HashMap::from([(
+                 PARQUET_FIELD_ID_META_KEY.to_string(),
+                 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
+             )])),
+             Field::new("pos", DataType::Int64, 
false).with_metadata(HashMap::from([(
+                 PARQUET_FIELD_ID_META_KEY.to_string(),
+                 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
+             )])),
+         ]));
+ 
+         // Delete row at position 199 (0-indexed, so it's the last row: 
id=200)
+         let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
+             
Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
+             Arc::new(Int64Array::from_iter_values(vec![199i64])),
+         ])
+         .unwrap();
+ 
+         let delete_props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .build();
+ 
+         let delete_file = File::create(&delete_file_path).unwrap();
+         let mut delete_writer =
+             ArrowWriter::try_new(delete_file, delete_schema, 
Some(delete_props)).unwrap();
+         delete_writer.write(&delete_batch).unwrap();
+         delete_writer.close().unwrap();
+ 
+         // Step 3: Read the data file with the delete applied
+         let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+         let reader = ArrowReaderBuilder::new(file_io).build();
+ 
+         let task = FileScanTask {
+             start: 0,
+             length: 0,
+             record_count: Some(200),
+             data_file_path: data_file_path.clone(),
+             data_file_format: DataFileFormat::Parquet,
+             schema: table_schema.clone(),
+             project_field_ids: vec![1],
+             predicate: None,
+             deletes: vec![FileScanTaskDeleteFile {
+                 file_path: delete_file_path,
+                 file_type: DataContentType::PositionDeletes,
+                 partition_spec_id: 0,
+                 equality_ids: None,
+             }],
+             partition: None,
+             partition_spec: None,
+             name_mapping: None,
++            limit: None,
+         };
+ 
+         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as 
FileScanTaskStream;
+         let result = reader
+             .read(tasks)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         // Step 4: Verify we got 199 rows (not 200)
+         let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+ 
+         println!("Total rows read: {}", total_rows);
+         println!("Expected: 199 rows (deleted row 199 which had id=200)");
+ 
+         // This assertion will FAIL before the fix and PASS after the fix
+         assert_eq!(
+             total_rows, 199,
+             "Expected 199 rows after deleting row 199, but got {} rows. \
+              The bug causes position deletes in later row groups to be 
ignored.",
+             total_rows
+         );
+ 
+         // Verify the deleted row (id=200) is not present
+         let all_ids: Vec<i32> = result
+             .iter()
+             .flat_map(|batch| {
+                 batch
+                     .column(0)
+                     .as_primitive::<arrow_array::types::Int32Type>()
+                     .values()
+                     .iter()
+                     .copied()
+             })
+             .collect();
+ 
+         assert!(
+             !all_ids.contains(&200),
+             "Row with id=200 should be deleted but was found in results"
+         );
+ 
+         // Verify we have all other ids (1-199)
+         let expected_ids: Vec<i32> = (1..=199).collect();
+         assert_eq!(
+             all_ids, expected_ids,
+             "Should have ids 1-199 but got different values"
+         );
+     }
+ 
+     /// Test for bug where position deletes are lost when skipping unselected 
row groups.
+     ///
+     /// This is a variant of 
`test_position_delete_across_multiple_row_groups` that exercises
+     /// the row group selection code path (`selected_row_groups: 
Some([...])`).
+     ///
+     /// When a file has multiple row groups and only some are selected for 
reading,
+     /// the `build_deletes_row_selection` function must correctly skip over 
deletes in
+     /// unselected row groups WITHOUT consuming deletes that belong to 
selected row groups.
+     ///
+     /// This test creates:
+     /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
+     /// - A position delete file that deletes row 199 (last row in second row 
group)
+     /// - Row group selection that reads ONLY row group 1 (rows 100-199)
+     ///
+     /// Expected behavior: Should return 99 rows (with row 199 deleted)
+     /// Bug behavior: Returns 100 rows (delete is lost when skipping row 
group 0)
+     ///
+     /// The bug occurs when processing row group 0 (unselected):
+     /// ```rust
+     /// delete_vector_iter.advance_to(next_row_group_base_idx); // Position 
at first delete >= 100
+     /// next_deleted_row_idx_opt = delete_vector_iter.next(); // BUG: 
Consumes delete at 199!
+     /// ```
+     ///
+     /// The fix is to NOT call `next()` after `advance_to()` when skipping 
unselected row groups,
+     /// because `advance_to()` already positions the iterator correctly 
without consuming elements.
+     #[tokio::test]
+     async fn test_position_delete_with_row_group_selection() {
+         use arrow_array::{Int32Array, Int64Array};
+         use parquet::file::reader::{FileReader, SerializedFileReader};
+ 
+         // Field IDs for positional delete schema
+         const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
+         const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
+ 
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ 
+         // Create table schema with a single 'id' column
+         let table_schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(1)
+                 .with_fields(vec![
+                     NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         let arrow_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                 PARQUET_FIELD_ID_META_KEY.to_string(),
+                 "1".to_string(),
+             )])),
+         ]));
+ 
+         // Step 1: Create data file with 200 rows in 2 row groups
+         // Row group 0: rows 0-99 (ids 1-100)
+         // Row group 1: rows 100-199 (ids 101-200)
+         let data_file_path = format!("{}/data.parquet", &table_location);
+ 
+         let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+             Int32Array::from_iter_values(1..=100),
+         )])
+         .unwrap();
+ 
+         let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+             Int32Array::from_iter_values(101..=200),
+         )])
+         .unwrap();
+ 
+         // Force each batch into its own row group
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .set_max_row_group_size(100)
+             .build();
+ 
+         let file = File::create(&data_file_path).unwrap();
+         let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), 
Some(props)).unwrap();
+         writer.write(&batch1).expect("Writing batch 1");
+         writer.write(&batch2).expect("Writing batch 2");
+         writer.close().unwrap();
+ 
+         // Verify we created 2 row groups
+         let verify_file = File::open(&data_file_path).unwrap();
+         let verify_reader = SerializedFileReader::new(verify_file).unwrap();
+         assert_eq!(
+             verify_reader.metadata().num_row_groups(),
+             2,
+             "Should have 2 row groups"
+         );
+ 
+         // Step 2: Create position delete file that deletes row 199 (id=200, 
last row in row group 1)
+         let delete_file_path = format!("{}/deletes.parquet", &table_location);
+ 
+         let delete_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("file_path", DataType::Utf8, 
false).with_metadata(HashMap::from([(
+                 PARQUET_FIELD_ID_META_KEY.to_string(),
+                 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
+             )])),
+             Field::new("pos", DataType::Int64, 
false).with_metadata(HashMap::from([(
+                 PARQUET_FIELD_ID_META_KEY.to_string(),
+                 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
+             )])),
+         ]));
+ 
+         // Delete row at position 199 (0-indexed, so it's the last row: 
id=200)
+         let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
+             
Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
+             Arc::new(Int64Array::from_iter_values(vec![199i64])),
+         ])
+         .unwrap();
+ 
+         let delete_props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .build();
+ 
+         let delete_file = File::create(&delete_file_path).unwrap();
+         let mut delete_writer =
+             ArrowWriter::try_new(delete_file, delete_schema, 
Some(delete_props)).unwrap();
+         delete_writer.write(&delete_batch).unwrap();
+         delete_writer.close().unwrap();
+ 
+         // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
+         // This exercises the row group selection code path where row group 0 
is skipped
+         let metadata_file = File::open(&data_file_path).unwrap();
+         let metadata_reader = 
SerializedFileReader::new(metadata_file).unwrap();
+         let metadata = metadata_reader.metadata();
+ 
+         let row_group_0 = metadata.row_group(0);
+         let row_group_1 = metadata.row_group(1);
+ 
+         let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
+         let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
+         let rg1_length = row_group_1.compressed_size() as u64;
+ 
+         println!(
+             "Row group 0: starts at byte {}, {} bytes compressed",
+             rg0_start,
+             row_group_0.compressed_size()
+         );
+         println!(
+             "Row group 1: starts at byte {}, {} bytes compressed",
+             rg1_start,
+             row_group_1.compressed_size()
+         );
+ 
+         let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+         let reader = ArrowReaderBuilder::new(file_io).build();
+ 
+         // Create FileScanTask that reads ONLY row group 1 via byte range 
filtering
+         let task = FileScanTask {
+             start: rg1_start,
+             length: rg1_length,
+             record_count: Some(100), // Row group 1 has 100 rows
+             data_file_path: data_file_path.clone(),
+             data_file_format: DataFileFormat::Parquet,
+             schema: table_schema.clone(),
+             project_field_ids: vec![1],
+             predicate: None,
+             deletes: vec![FileScanTaskDeleteFile {
+                 file_path: delete_file_path,
+                 file_type: DataContentType::PositionDeletes,
+                 partition_spec_id: 0,
+                 equality_ids: None,
+             }],
+             partition: None,
+             partition_spec: None,
+             name_mapping: None,
++            limit: None,
+         };
+ 
+         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as 
FileScanTaskStream;
+         let result = reader
+             .read(tasks)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         // Step 4: Verify we got 99 rows (not 100)
+         // Row group 1 has 100 rows (ids 101-200), minus 1 delete (id=200) = 
99 rows
+         let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+ 
+         println!("Total rows read from row group 1: {}", total_rows);
+         println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at 
position 199)");
+ 
+         // This assertion will FAIL before the fix and PASS after the fix
+         assert_eq!(
+             total_rows, 99,
+             "Expected 99 rows from row group 1 after deleting position 199, 
but got {} rows. \
+              The bug causes position deletes to be lost when advance_to() is 
followed by next() \
+              when skipping unselected row groups.",
+             total_rows
+         );
+ 
+         // Verify the deleted row (id=200) is not present
+         let all_ids: Vec<i32> = result
+             .iter()
+             .flat_map(|batch| {
+                 batch
+                     .column(0)
+                     .as_primitive::<arrow_array::types::Int32Type>()
+                     .values()
+                     .iter()
+                     .copied()
+             })
+             .collect();
+ 
+         assert!(
+             !all_ids.contains(&200),
+             "Row with id=200 should be deleted but was found in results"
+         );
+ 
+         // Verify we have ids 101-199 (not 101-200)
+         let expected_ids: Vec<i32> = (101..=199).collect();
+         assert_eq!(
+             all_ids, expected_ids,
+             "Should have ids 101-199 but got different values"
+         );
+     }
+     /// Test for bug where stale cached delete causes infinite loop when 
skipping row groups.
+     ///
+     /// This test exposes the inverse scenario of 
`test_position_delete_with_row_group_selection`:
+     /// - Position delete targets a row in the SKIPPED row group (not the 
selected one)
+     /// - After calling advance_to(), the cached delete index is stale
+     /// - Without updating the cache, the code enters an infinite loop
+     ///
+     /// This test creates:
+     /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
+     /// - A position delete file that deletes row 0 (first row in SKIPPED row 
group 0)
+     /// - Row group selection that reads ONLY row group 1 (rows 100-199)
+     ///
+     /// The bug occurs when skipping row group 0:
+     /// ```rust
+     /// let mut next_deleted_row_idx_opt = delete_vector_iter.next(); // 
Some(0)
+     /// // ... skip to row group 1 ...
+     /// delete_vector_iter.advance_to(100); // Iterator advances past delete 
at 0
+     /// // BUG: next_deleted_row_idx_opt is still Some(0) - STALE!
+     /// // When processing row group 1:
+     /// //   current_idx = 100, next_deleted_row_idx = 0, 
next_row_group_base_idx = 200
+     /// //   Loop condition: 0 < 200 (true)
+     /// //   But: current_idx (100) > next_deleted_row_idx (0)
+     /// //   And: current_idx (100) != next_deleted_row_idx (0)
+     /// //   Neither branch executes -> INFINITE LOOP!
+     /// ```
+     ///
+     /// Expected behavior: Should return 100 rows (delete at 0 doesn't affect 
row group 1)
+     /// Bug behavior: Infinite loop in build_deletes_row_selection
+     #[tokio::test]
+     async fn test_position_delete_in_skipped_row_group() {
+         use arrow_array::{Int32Array, Int64Array};
+         use parquet::file::reader::{FileReader, SerializedFileReader};
+ 
+         // Field IDs for positional delete schema
+         const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
+         const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
+ 
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ 
+         // Create table schema with a single 'id' column
+         let table_schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(1)
+                 .with_fields(vec![
+                     NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         let arrow_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                 PARQUET_FIELD_ID_META_KEY.to_string(),
+                 "1".to_string(),
+             )])),
+         ]));
+ 
+         // Step 1: Create data file with 200 rows in 2 row groups
+         // Row group 0: rows 0-99 (ids 1-100)
+         // Row group 1: rows 100-199 (ids 101-200)
+         let data_file_path = format!("{}/data.parquet", &table_location);
+ 
+         let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+             Int32Array::from_iter_values(1..=100),
+         )])
+         .unwrap();
+ 
+         let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+             Int32Array::from_iter_values(101..=200),
+         )])
+         .unwrap();
+ 
+         // Force each batch into its own row group
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .set_max_row_group_size(100)
+             .build();
+ 
+         let file = File::create(&data_file_path).unwrap();
+         let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), 
Some(props)).unwrap();
+         writer.write(&batch1).expect("Writing batch 1");
+         writer.write(&batch2).expect("Writing batch 2");
+         writer.close().unwrap();
+ 
+         // Verify we created 2 row groups
+         let verify_file = File::open(&data_file_path).unwrap();
+         let verify_reader = SerializedFileReader::new(verify_file).unwrap();
+         assert_eq!(
+             verify_reader.metadata().num_row_groups(),
+             2,
+             "Should have 2 row groups"
+         );
+ 
+         // Step 2: Create position delete file that deletes row 0 (id=1, 
first row in row group 0)
+         let delete_file_path = format!("{}/deletes.parquet", &table_location);
+ 
+         let delete_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("file_path", DataType::Utf8, 
false).with_metadata(HashMap::from([(
+                 PARQUET_FIELD_ID_META_KEY.to_string(),
+                 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
+             )])),
+             Field::new("pos", DataType::Int64, 
false).with_metadata(HashMap::from([(
+                 PARQUET_FIELD_ID_META_KEY.to_string(),
+                 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
+             )])),
+         ]));
+ 
+         // Delete row at position 0 (0-indexed, so it's the first row: id=1)
+         let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
+             
Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
+             Arc::new(Int64Array::from_iter_values(vec![0i64])),
+         ])
+         .unwrap();
+ 
+         let delete_props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .build();
+ 
+         let delete_file = File::create(&delete_file_path).unwrap();
+         let mut delete_writer =
+             ArrowWriter::try_new(delete_file, delete_schema, 
Some(delete_props)).unwrap();
+         delete_writer.write(&delete_batch).unwrap();
+         delete_writer.close().unwrap();
+ 
+         // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
+         // This exercises the row group selection code path where row group 0 
is skipped
+         let metadata_file = File::open(&data_file_path).unwrap();
+         let metadata_reader = 
SerializedFileReader::new(metadata_file).unwrap();
+         let metadata = metadata_reader.metadata();
+ 
+         let row_group_0 = metadata.row_group(0);
+         let row_group_1 = metadata.row_group(1);
+ 
+         let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
+         let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
+         let rg1_length = row_group_1.compressed_size() as u64;
+ 
+         let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+         let reader = ArrowReaderBuilder::new(file_io).build();
+ 
+         // Create FileScanTask that reads ONLY row group 1 via byte range 
filtering
+         let task = FileScanTask {
+             start: rg1_start,
+             length: rg1_length,
+             record_count: Some(100), // Row group 1 has 100 rows
+             data_file_path: data_file_path.clone(),
+             data_file_format: DataFileFormat::Parquet,
+             schema: table_schema.clone(),
+             project_field_ids: vec![1],
+             predicate: None,
+             deletes: vec![FileScanTaskDeleteFile {
+                 file_path: delete_file_path,
+                 file_type: DataContentType::PositionDeletes,
+                 partition_spec_id: 0,
+                 equality_ids: None,
+             }],
+             partition: None,
+             partition_spec: None,
+             name_mapping: None,
++            limit: None,
+         };
+ 
+         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as 
FileScanTaskStream;
+         let result = reader
+             .read(tasks)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         // Step 4: Verify we got 100 rows (all of row group 1)
+         // The delete at position 0 is in row group 0, which is skipped, so 
it doesn't affect us
+         let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+ 
+         assert_eq!(
+             total_rows, 100,
+             "Expected 100 rows from row group 1 (delete at position 0 is in 
skipped row group 0). \
+              If this hangs or fails, it indicates the cached delete index was 
not updated after advance_to()."
+         );
+ 
+         // Verify we have all ids from row group 1 (101-200)
+         let all_ids: Vec<i32> = result
+             .iter()
+             .flat_map(|batch| {
+                 batch
+                     .column(0)
+                     .as_primitive::<arrow_array::types::Int32Type>()
+                     .values()
+                     .iter()
+                     .copied()
+             })
+             .collect();
+ 
+         let expected_ids: Vec<i32> = (101..=200).collect();
+         assert_eq!(
+             all_ids, expected_ids,
+             "Should have ids 101-200 (all of row group 1)"
+         );
+     }
+ 
+     /// Test reading Parquet files without field ID metadata (e.g., migrated 
tables).
+     /// This exercises the position-based fallback path.
+     ///
+     /// Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + 
pruneColumnsFallback()
+     /// in 
/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
+     #[tokio::test]
+     async fn test_read_parquet_file_without_field_ids() {
+         let schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(1)
+                 .with_fields(vec![
+                     NestedField::required(1, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                     NestedField::required(2, "age", 
Type::Primitive(PrimitiveType::Int)).into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         // Parquet file from a migrated table - no field ID metadata
+         let arrow_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("name", DataType::Utf8, false),
+             Field::new("age", DataType::Int32, false),
+         ]));
+ 
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().to_str().unwrap().to_string();
+         let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+ 
+         let name_data = vec!["Alice", "Bob", "Charlie"];
+         let age_data = vec![30, 25, 35];
+ 
+         use arrow_array::Int32Array;
+         let name_col = Arc::new(StringArray::from(name_data.clone())) as 
ArrayRef;
+         let age_col = Arc::new(Int32Array::from(age_data.clone())) as 
ArrayRef;
+ 
+         let to_write = RecordBatch::try_new(arrow_schema.clone(), 
vec![name_col, age_col]).unwrap();
+ 
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .build();
+ 
+         let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+         let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+ 
+         writer.write(&to_write).expect("Writing batch");
+         writer.close().unwrap();
+ 
+         let reader = ArrowReaderBuilder::new(file_io).build();
+ 
+         let tasks = Box::pin(futures::stream::iter(
+             vec![Ok(FileScanTask {
+                 start: 0,
+                 length: 0,
+                 record_count: None,
+                 data_file_path: format!("{}/1.parquet", table_location),
+                 data_file_format: DataFileFormat::Parquet,
+                 schema: schema.clone(),
+                 project_field_ids: vec![1, 2],
+                 predicate: None,
+                 deletes: vec![],
+                 partition: None,
+                 partition_spec: None,
+                 name_mapping: None,
++                limit: None,
+             })]
+             .into_iter(),
+         )) as FileScanTaskStream;
+ 
+         let result = reader
+             .read(tasks)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         assert_eq!(result.len(), 1);
+         let batch = &result[0];
+         assert_eq!(batch.num_rows(), 3);
+         assert_eq!(batch.num_columns(), 2);
+ 
+         // Verify position-based mapping: field_id 1 → position 0, field_id 2 
→ position 1
+         let name_array = batch.column(0).as_string::<i32>();
+         assert_eq!(name_array.value(0), "Alice");
+         assert_eq!(name_array.value(1), "Bob");
+         assert_eq!(name_array.value(2), "Charlie");
+ 
+         let age_array = batch
+             .column(1)
+             .as_primitive::<arrow_array::types::Int32Type>();
+         assert_eq!(age_array.value(0), 30);
+         assert_eq!(age_array.value(1), 25);
+         assert_eq!(age_array.value(2), 35);
+     }
+ 
+     /// Test reading Parquet files without field IDs with partial projection.
+     /// Only a subset of columns are requested, verifying position-based 
fallback
+     /// handles column selection correctly.
+     #[tokio::test]
+     async fn test_read_parquet_without_field_ids_partial_projection() {
+         use arrow_array::Int32Array;
+ 
+         let schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(1)
+                 .with_fields(vec![
+                     NestedField::required(1, "col1", 
Type::Primitive(PrimitiveType::String)).into(),
+                     NestedField::required(2, "col2", 
Type::Primitive(PrimitiveType::Int)).into(),
+                     NestedField::required(3, "col3", 
Type::Primitive(PrimitiveType::String)).into(),
+                     NestedField::required(4, "col4", 
Type::Primitive(PrimitiveType::Int)).into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         let arrow_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("col1", DataType::Utf8, false),
+             Field::new("col2", DataType::Int32, false),
+             Field::new("col3", DataType::Utf8, false),
+             Field::new("col4", DataType::Int32, false),
+         ]));
+ 
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().to_str().unwrap().to_string();
+         let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+ 
+         let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as 
ArrayRef;
+         let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
+         let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as 
ArrayRef;
+         let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
+ 
+         let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
+             col1_data, col2_data, col3_data, col4_data,
+         ])
+         .unwrap();
+ 
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .build();
+ 
+         let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+         let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+ 
+         writer.write(&to_write).expect("Writing batch");
+         writer.close().unwrap();
+ 
+         let reader = ArrowReaderBuilder::new(file_io).build();
+ 
+         let tasks = Box::pin(futures::stream::iter(
+             vec![Ok(FileScanTask {
+                 start: 0,
+                 length: 0,
+                 record_count: None,
+                 data_file_path: format!("{}/1.parquet", table_location),
+                 data_file_format: DataFileFormat::Parquet,
+                 schema: schema.clone(),
+                 project_field_ids: vec![1, 3],
+                 predicate: None,
+                 deletes: vec![],
+                 partition: None,
+                 partition_spec: None,
+                 name_mapping: None,
++                limit: None,
+             })]
+             .into_iter(),
+         )) as FileScanTaskStream;
+ 
+         let result = reader
+             .read(tasks)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         assert_eq!(result.len(), 1);
+         let batch = &result[0];
+         assert_eq!(batch.num_rows(), 2);
+         assert_eq!(batch.num_columns(), 2);
+ 
+         let col1_array = batch.column(0).as_string::<i32>();
+         assert_eq!(col1_array.value(0), "a");
+         assert_eq!(col1_array.value(1), "b");
+ 
+         let col3_array = batch.column(1).as_string::<i32>();
+         assert_eq!(col3_array.value(0), "c");
+         assert_eq!(col3_array.value(1), "d");
+     }
+ 
+     /// Test reading Parquet files without field IDs with schema evolution.
+     /// The Iceberg schema has more fields than the Parquet file, testing that
+     /// missing columns are filled with NULLs.
+     #[tokio::test]
+     async fn test_read_parquet_without_field_ids_schema_evolution() {
+         use arrow_array::{Array, Int32Array};
+ 
+         // Schema with field 3 added after the file was written
+         let schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(1)
+                 .with_fields(vec![
+                     NestedField::required(1, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                     NestedField::required(2, "age", 
Type::Primitive(PrimitiveType::Int)).into(),
+                     NestedField::optional(3, "city", 
Type::Primitive(PrimitiveType::String)).into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         let arrow_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("name", DataType::Utf8, false),
+             Field::new("age", DataType::Int32, false),
+         ]));
+ 
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().to_str().unwrap().to_string();
+         let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+ 
+         let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as 
ArrayRef;
+         let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
+ 
+         let to_write =
+             RecordBatch::try_new(arrow_schema.clone(), vec![name_data, 
age_data]).unwrap();
+ 
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .build();
+ 
+         let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+         let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+ 
+         writer.write(&to_write).expect("Writing batch");
+         writer.close().unwrap();
+ 
+         let reader = ArrowReaderBuilder::new(file_io).build();
+ 
+         let tasks = Box::pin(futures::stream::iter(
+             vec![Ok(FileScanTask {
+                 start: 0,
+                 length: 0,
+                 record_count: None,
+                 data_file_path: format!("{}/1.parquet", table_location),
+                 data_file_format: DataFileFormat::Parquet,
+                 schema: schema.clone(),
+                 project_field_ids: vec![1, 2, 3],
+                 predicate: None,
+                 deletes: vec![],
+                 partition: None,
+                 partition_spec: None,
+                 name_mapping: None,
++                limit: None,
+             })]
+             .into_iter(),
+         )) as FileScanTaskStream;
+ 
+         let result = reader
+             .read(tasks)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         assert_eq!(result.len(), 1);
+         let batch = &result[0];
+         assert_eq!(batch.num_rows(), 2);
+         assert_eq!(batch.num_columns(), 3);
+ 
+         let name_array = batch.column(0).as_string::<i32>();
+         assert_eq!(name_array.value(0), "Alice");
+         assert_eq!(name_array.value(1), "Bob");
+ 
+         let age_array = batch
+             .column(1)
+             .as_primitive::<arrow_array::types::Int32Type>();
+         assert_eq!(age_array.value(0), 30);
+         assert_eq!(age_array.value(1), 25);
+ 
+         // Verify missing column filled with NULLs
+         let city_array = batch.column(2).as_string::<i32>();
+         assert_eq!(city_array.null_count(), 2);
+         assert!(city_array.is_null(0));
+         assert!(city_array.is_null(1));
+     }
+ 
+     /// Test reading Parquet files without field IDs that have multiple row 
groups.
+     /// This ensures the position-based fallback works correctly across row 
group boundaries.
+     #[tokio::test]
+     async fn test_read_parquet_without_field_ids_multiple_row_groups() {
+         use arrow_array::Int32Array;
+ 
+         let schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(1)
+                 .with_fields(vec![
+                     NestedField::required(1, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                     NestedField::required(2, "value", 
Type::Primitive(PrimitiveType::Int)).into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         let arrow_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("name", DataType::Utf8, false),
+             Field::new("value", DataType::Int32, false),
+         ]));
+ 
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().to_str().unwrap().to_string();
+         let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+ 
+         // Small row group size to create multiple row groups
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .set_write_batch_size(2)
+             .set_max_row_group_size(2)
+             .build();
+ 
+         let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+         let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), 
Some(props)).unwrap();
+ 
+         // Write 6 rows in 3 batches (will create 3 row groups)
+         for batch_num in 0..3 {
+             let name_data = Arc::new(StringArray::from(vec![
+                 format!("name_{}", batch_num * 2),
+                 format!("name_{}", batch_num * 2 + 1),
+             ])) as ArrayRef;
+             let value_data =
+                 Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 
1])) as ArrayRef;
+ 
+             let batch =
+                 RecordBatch::try_new(arrow_schema.clone(), vec![name_data, 
value_data]).unwrap();
+             writer.write(&batch).expect("Writing batch");
+         }
+         writer.close().unwrap();
+ 
+         let reader = ArrowReaderBuilder::new(file_io).build();
+ 
+         let tasks = Box::pin(futures::stream::iter(
+             vec![Ok(FileScanTask {
+                 start: 0,
+                 length: 0,
+                 record_count: None,
+                 data_file_path: format!("{}/1.parquet", table_location),
+                 data_file_format: DataFileFormat::Parquet,
+                 schema: schema.clone(),
+                 project_field_ids: vec![1, 2],
+                 predicate: None,
+                 deletes: vec![],
+                 partition: None,
+                 partition_spec: None,
+                 name_mapping: None,
++                limit: None,
+             })]
+             .into_iter(),
+         )) as FileScanTaskStream;
+ 
+         let result = reader
+             .read(tasks)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         assert!(!result.is_empty());
+ 
+         let mut all_names = Vec::new();
+         let mut all_values = Vec::new();
+ 
+         for batch in &result {
+             let name_array = batch.column(0).as_string::<i32>();
+             let value_array = batch
+                 .column(1)
+                 .as_primitive::<arrow_array::types::Int32Type>();
+ 
+             for i in 0..batch.num_rows() {
+                 all_names.push(name_array.value(i).to_string());
+                 all_values.push(value_array.value(i));
+             }
+         }
+ 
+         assert_eq!(all_names.len(), 6);
+         assert_eq!(all_values.len(), 6);
+ 
+         for i in 0..6 {
+             assert_eq!(all_names[i], format!("name_{}", i));
+             assert_eq!(all_values[i], i as i32);
+         }
+     }
+ 
+     /// Test reading Parquet files without field IDs with nested types 
(struct).
+     /// Java's pruneColumnsFallback() projects entire top-level columns 
including nested content.
+     /// This test verifies that a top-level struct field is projected 
correctly with all its nested fields.
+     #[tokio::test]
+     async fn test_read_parquet_without_field_ids_with_struct() {
+         use arrow_array::{Int32Array, StructArray};
+         use arrow_schema::Fields;
+ 
+         let schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(1)
+                 .with_fields(vec![
+                     NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                     NestedField::required(
+                         2,
+                         "person",
+                         Type::Struct(crate::spec::StructType::new(vec![
+                             NestedField::required(
+                                 3,
+                                 "name",
+                                 Type::Primitive(PrimitiveType::String),
+                             )
+                             .into(),
+                             NestedField::required(4, "age", 
Type::Primitive(PrimitiveType::Int))
+                                 .into(),
+                         ])),
+                     )
+                     .into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         let arrow_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("id", DataType::Int32, false),
+             Field::new(
+                 "person",
+                 DataType::Struct(Fields::from(vec![
+                     Field::new("name", DataType::Utf8, false),
+                     Field::new("age", DataType::Int32, false),
+                 ])),
+                 false,
+             ),
+         ]));
+ 
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().to_str().unwrap().to_string();
+         let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+ 
+         let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
+         let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as 
ArrayRef;
+         let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
+         let person_data = Arc::new(StructArray::from(vec![
+             (
+                 Arc::new(Field::new("name", DataType::Utf8, false)),
+                 name_data,
+             ),
+             (
+                 Arc::new(Field::new("age", DataType::Int32, false)),
+                 age_data,
+             ),
+         ])) as ArrayRef;
+ 
+         let to_write =
+             RecordBatch::try_new(arrow_schema.clone(), vec![id_data, 
person_data]).unwrap();
+ 
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .build();
+ 
+         let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+         let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+ 
+         writer.write(&to_write).expect("Writing batch");
+         writer.close().unwrap();
+ 
+         let reader = ArrowReaderBuilder::new(file_io).build();
+ 
+         let tasks = Box::pin(futures::stream::iter(
+             vec![Ok(FileScanTask {
+                 start: 0,
+                 length: 0,
+                 record_count: None,
+                 data_file_path: format!("{}/1.parquet", table_location),
+                 data_file_format: DataFileFormat::Parquet,
+                 schema: schema.clone(),
+                 project_field_ids: vec![1, 2],
+                 predicate: None,
+                 deletes: vec![],
+                 partition: None,
+                 partition_spec: None,
+                 name_mapping: None,
++                limit: None,
+             })]
+             .into_iter(),
+         )) as FileScanTaskStream;
+ 
+         let result = reader
+             .read(tasks)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         assert_eq!(result.len(), 1);
+         let batch = &result[0];
+         assert_eq!(batch.num_rows(), 2);
+         assert_eq!(batch.num_columns(), 2);
+ 
+         let id_array = batch
+             .column(0)
+             .as_primitive::<arrow_array::types::Int32Type>();
+         assert_eq!(id_array.value(0), 1);
+         assert_eq!(id_array.value(1), 2);
+ 
+         let person_array = batch.column(1).as_struct();
+         assert_eq!(person_array.num_columns(), 2);
+ 
+         let name_array = person_array.column(0).as_string::<i32>();
+         assert_eq!(name_array.value(0), "Alice");
+         assert_eq!(name_array.value(1), "Bob");
+ 
+         let age_array = person_array
+             .column(1)
+             .as_primitive::<arrow_array::types::Int32Type>();
+         assert_eq!(age_array.value(0), 30);
+         assert_eq!(age_array.value(1), 25);
+     }
+ 
+     /// Test reading Parquet files without field IDs with schema evolution - 
column added in the middle.
+     /// When a new column is inserted between existing columns in the schema 
order,
+     /// the fallback projection must correctly map field IDs to output 
positions.
+     #[tokio::test]
+     async fn 
test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
+         use arrow_array::{Array, Int32Array};
+ 
+         let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
+             Field::new("col0", DataType::Int32, true),
+             Field::new("col1", DataType::Int32, true),
+         ]));
+ 
+         // New column added between existing columns: col0 (id=1), newCol 
(id=5), col1 (id=2)
+         let schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(1)
+                 .with_fields(vec![
+                     NestedField::optional(1, "col0", 
Type::Primitive(PrimitiveType::Int)).into(),
+                     NestedField::optional(5, "newCol", 
Type::Primitive(PrimitiveType::Int)).into(),
+                     NestedField::optional(2, "col1", 
Type::Primitive(PrimitiveType::Int)).into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().to_str().unwrap().to_string();
+         let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+ 
+         let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
+         let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
+ 
+         let to_write =
+             RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, 
col1_data]).unwrap();
+ 
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .build();
+ 
+         let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+         let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+         writer.write(&to_write).expect("Writing batch");
+         writer.close().unwrap();
+ 
+         let reader = ArrowReaderBuilder::new(file_io).build();
+ 
+         let tasks = Box::pin(futures::stream::iter(
+             vec![Ok(FileScanTask {
+                 start: 0,
+                 length: 0,
+                 record_count: None,
+                 data_file_path: format!("{}/1.parquet", table_location),
+                 data_file_format: DataFileFormat::Parquet,
+                 schema: schema.clone(),
+                 project_field_ids: vec![1, 5, 2],
+                 predicate: None,
+                 deletes: vec![],
+                 partition: None,
+                 partition_spec: None,
+                 name_mapping: None,
++                limit: None,
+             })]
+             .into_iter(),
+         )) as FileScanTaskStream;
+ 
+         let result = reader
+             .read(tasks)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         assert_eq!(result.len(), 1);
+         let batch = &result[0];
+         assert_eq!(batch.num_rows(), 2);
+         assert_eq!(batch.num_columns(), 3);
+ 
+         let result_col0 = batch
+             .column(0)
+             .as_primitive::<arrow_array::types::Int32Type>();
+         assert_eq!(result_col0.value(0), 1);
+         assert_eq!(result_col0.value(1), 2);
+ 
+         // New column should be NULL (doesn't exist in old file)
+         let result_newcol = batch
+             .column(1)
+             .as_primitive::<arrow_array::types::Int32Type>();
+         assert_eq!(result_newcol.null_count(), 2);
+         assert!(result_newcol.is_null(0));
+         assert!(result_newcol.is_null(1));
+ 
+         let result_col1 = batch
+             .column(2)
+             .as_primitive::<arrow_array::types::Int32Type>();
+         assert_eq!(result_col1.value(0), 10);
+         assert_eq!(result_col1.value(1), 20);
+     }
+ 
+     /// Test reading Parquet files without field IDs with a filter that 
eliminates all row groups.
+     /// During development of field ID mapping, we saw a panic when 
row_selection_enabled=true and
+     /// all row groups are filtered out.
+     #[tokio::test]
+     async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() 
{
+         use arrow_array::{Float64Array, Int32Array};
+ 
+         // Schema with fields that will use fallback IDs 1, 2, 3
+         let schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(1)
+                 .with_fields(vec![
+                     NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                     NestedField::required(2, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                     NestedField::required(3, "value", 
Type::Primitive(PrimitiveType::Double))
+                         .into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         let arrow_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("id", DataType::Int32, false),
+             Field::new("name", DataType::Utf8, false),
+             Field::new("value", DataType::Float64, false),
+         ]));
+ 
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().to_str().unwrap().to_string();
+         let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+ 
+         // Write data where all ids are >= 10
+         let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as 
ArrayRef;
+         let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as 
ArrayRef;
+         let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 
300.0])) as ArrayRef;
+ 
+         let to_write =
+             RecordBatch::try_new(arrow_schema.clone(), vec![id_data, 
name_data, value_data])
+                 .unwrap();
+ 
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .build();
+ 
+         let file = File::create(format!("{}/1.parquet", 
&table_location)).unwrap();
+         let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+         writer.write(&to_write).expect("Writing batch");
+         writer.close().unwrap();
+ 
+         // Filter that eliminates all row groups: id < 5
+         let predicate = Reference::new("id").less_than(Datum::int(5));
+ 
+         // Enable both row_group_filtering and row_selection - triggered the 
panic
+         let reader = ArrowReaderBuilder::new(file_io)
+             .with_row_group_filtering_enabled(true)
+             .with_row_selection_enabled(true)
+             .build();
+ 
+         let tasks = Box::pin(futures::stream::iter(
+             vec![Ok(FileScanTask {
+                 start: 0,
+                 length: 0,
+                 record_count: None,
+                 data_file_path: format!("{}/1.parquet", table_location),
+                 data_file_format: DataFileFormat::Parquet,
+                 schema: schema.clone(),
+                 project_field_ids: vec![1, 2, 3],
+                 predicate: Some(predicate.bind(schema, true).unwrap()),
+                 deletes: vec![],
+                 partition: None,
+                 partition_spec: None,
+                 name_mapping: None,
++                limit: None,
+             })]
+             .into_iter(),
+         )) as FileScanTaskStream;
+ 
+         // Should no longer panic
+         let result = reader
+             .read(tasks)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         // Should return empty results
+         assert!(result.is_empty() || result.iter().all(|batch| 
batch.num_rows() == 0));
+     }
+ 
+     /// Test bucket partitioning reads source column from data file (not 
partition metadata).
+     ///
+     /// This is an integration test verifying the complete ArrowReader 
pipeline with bucket partitioning.
+     /// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., 
testRenamedSourceColumnTable).
+     ///
+     /// # Iceberg Spec Requirements
+     ///
+     /// Per the Iceberg spec "Column Projection" section:
+     /// > "Return the value from partition metadata if an **Identity 
Transform** exists for the field"
+     ///
+     /// This means:
+     /// - Identity transforms (e.g., `identity(dept)`) use constants from 
partition metadata
+     /// - Non-identity transforms (e.g., `bucket(4, id)`) must read source 
columns from data files
+     /// - Partition metadata for bucket transforms stores bucket numbers 
(0-3), NOT source values
+     ///
+     /// Java's PartitionUtil.constantsMap() implements this via:
+     /// ```java
+     /// if (field.transform().isIdentity()) {
+     ///     idToConstant.put(field.sourceId(), converted);
+     /// }
+     /// ```
+     ///
+     /// # What This Test Verifies
+     ///
+     /// This test ensures the full ArrowReader → RecordBatchTransformer 
pipeline correctly handles
+     /// bucket partitioning when FileScanTask provides partition_spec and 
partition_data:
+     ///
+     /// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 
13]
+     /// - FileScanTask specifies partition_spec with bucket(4, id) and 
partition_data with bucket=1
+     /// - RecordBatchTransformer.constants_map() excludes bucket-partitioned 
field from constants
+     /// - ArrowReader correctly reads [1, 5, 9, 13] from the data file
+     /// - Values are NOT replaced with constant 1 from partition metadata
+     ///
+     /// # Why This Matters
+     ///
+     /// Without correct handling:
+     /// - Runtime filtering would break (e.g., `WHERE id = 5` would fail)
+     /// - Query results would be incorrect (all rows would have id=1)
+     /// - Bucket partitioning would be unusable for query optimization
+     ///
+     /// # References
+     /// - Iceberg spec: format/spec.md "Column Projection" + "Partition 
Transforms"
+     /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
+     /// - Java impl: 
core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+     #[tokio::test]
+     async fn test_bucket_partitioning_reads_source_column_from_file() {
+         use arrow_array::Int32Array;
+ 
+         use crate::spec::{Literal, PartitionSpec, Struct, Transform};
+ 
+         // Iceberg schema with id and name columns
+         let schema = Arc::new(
+             Schema::builder()
+                 .with_schema_id(0)
+                 .with_fields(vec![
+                     NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                     NestedField::optional(2, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                 ])
+                 .build()
+                 .unwrap(),
+         );
+ 
+         // Partition spec: bucket(4, id)
+         let partition_spec = Arc::new(
+             PartitionSpec::builder(schema.clone())
+                 .with_spec_id(0)
+                 .add_partition_field("id", "id_bucket", Transform::Bucket(4))
+                 .unwrap()
+                 .build()
+                 .unwrap(),
+         );
+ 
+         // Partition data: bucket value is 1
+         let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
+ 
+         // Create Arrow schema with field IDs for Parquet file
+         let arrow_schema = Arc::new(ArrowSchema::new(vec![
+             Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                 PARQUET_FIELD_ID_META_KEY.to_string(),
+                 "1".to_string(),
+             )])),
+             Field::new("name", DataType::Utf8, 
true).with_metadata(HashMap::from([(
+                 PARQUET_FIELD_ID_META_KEY.to_string(),
+                 "2".to_string(),
+             )])),
+         ]));
+ 
+         // Write Parquet file with data
+         let tmp_dir = TempDir::new().unwrap();
+         let table_location = tmp_dir.path().to_str().unwrap().to_string();
+         let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+ 
+         let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as 
ArrayRef;
+         let name_data =
+             Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", 
"Dave"])) as ArrayRef;
+ 
+         let to_write =
+             RecordBatch::try_new(arrow_schema.clone(), vec![id_data, 
name_data]).unwrap();
+ 
+         let props = WriterProperties::builder()
+             .set_compression(Compression::SNAPPY)
+             .build();
+         let file = File::create(format!("{}/data.parquet", 
&table_location)).unwrap();
+         let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+         writer.write(&to_write).expect("Writing batch");
+         writer.close().unwrap();
+ 
+         // Read the Parquet file with partition spec and data
+         let reader = ArrowReaderBuilder::new(file_io).build();
+         let tasks = Box::pin(futures::stream::iter(
+             vec![Ok(FileScanTask {
+                 start: 0,
+                 length: 0,
+                 record_count: None,
+                 data_file_path: format!("{}/data.parquet", table_location),
+                 data_file_format: DataFileFormat::Parquet,
+                 schema: schema.clone(),
+                 project_field_ids: vec![1, 2],
+                 predicate: None,
+                 deletes: vec![],
+                 partition: Some(partition_data),
+                 partition_spec: Some(partition_spec),
+                 name_mapping: None,
++                limit: None,
+             })]
+             .into_iter(),
+         )) as FileScanTaskStream;
+ 
+         let result = reader
+             .read(tasks)
+             .unwrap()
+             .try_collect::<Vec<RecordBatch>>()
+             .await
+             .unwrap();
+ 
+         // Verify we got the correct data
+         assert_eq!(result.len(), 1);
+         let batch = &result[0];
+ 
+         assert_eq!(batch.num_columns(), 2);
+         assert_eq!(batch.num_rows(), 4);
+ 
+         // The id column MUST contain actual values from the Parquet file [1, 
5, 9, 13],
+         // NOT the constant partition value 1
+         let id_col = batch
+             .column(0)
+             .as_primitive::<arrow_array::types::Int32Type>();
+         assert_eq!(id_col.value(0), 1);
+         assert_eq!(id_col.value(1), 5);
+         assert_eq!(id_col.value(2), 9);
+         assert_eq!(id_col.value(3), 13);
+ 
+         let name_col = batch.column(1).as_string::<i32>();
+         assert_eq!(name_col.value(0), "Alice");
+         assert_eq!(name_col.value(1), "Bob");
+         assert_eq!(name_col.value(2), "Charlie");
+         assert_eq!(name_col.value(3), "Dave");
+     }
  }
diff --cc crates/iceberg/src/scan/context.rs
index 0f39e1845,fe3f5c8f7..a3fbcb1f2
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@@ -133,7 -129,12 +133,14 @@@ impl ManifestEntryContext 
  
              deletes,
  
+             // Include partition data and spec from manifest entry
+             partition: Some(self.manifest_entry.data_file.partition.clone()),
+             // TODO: Pass actual PartitionSpec through context chain for native flow
+             partition_spec: None,
+             // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default"
+             name_mapping: None,
++
 +            limit: self.limit,
          })
      }
  }
diff --cc crates/iceberg/src/scan/mod.rs
index e9055bdd6,3e319ca06..b2a81249f
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@@ -1914,7 -1777,9 +1911,10 @@@ pub mod tests 
              record_count: Some(100),
              data_file_format: DataFileFormat::Parquet,
              deletes: vec![],
+             partition: None,
+             partition_spec: None,
+             name_mapping: None,
 +            limit: None,
          };
          test_fn(task);
  
@@@ -1929,7 -1794,9 +1929,10 @@@
              record_count: None,
              data_file_format: DataFileFormat::Avro,
              deletes: vec![],
+             partition: None,
+             partition_spec: None,
+             name_mapping: None,
 +            limit: None,
          };
          test_fn(task);
      }
diff --cc crates/iceberg/src/scan/task.rs
index 17116ef0b,e1ef241a5..ef2136360
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@@ -55,8 -78,32 +78,35 @@@ pub struct FileScanTask 
      /// The list of delete files that may need to be applied to this data file
      pub deletes: Vec<FileScanTaskDeleteFile>,
  
+     /// Partition data from the manifest entry, used to identify which columns can use
+     /// constant values from partition metadata vs. reading from the data file.
+     /// Per the Iceberg spec, only identity-transformed partition fields should use constants.
+     #[serde(default)]
+     #[serde(skip_serializing_if = "Option::is_none")]
+     #[serde(serialize_with = "serialize_not_implemented")]
+     #[serde(deserialize_with = "deserialize_not_implemented")]
+     pub partition: Option<Struct>,
+ 
+     /// The partition spec for this file, used to distinguish identity transforms
+     /// (which use partition metadata constants) from non-identity transforms like
+     /// bucket/truncate (which must read source columns from the data file).
+     #[serde(default)]
+     #[serde(skip_serializing_if = "Option::is_none")]
+     #[serde(serialize_with = "serialize_not_implemented")]
+     #[serde(deserialize_with = "deserialize_not_implemented")]
+     pub partition_spec: Option<Arc<PartitionSpec>>,
+ 
+     /// Name mapping from table metadata (property: schema.name-mapping.default),
+     /// used to resolve field IDs from column names when Parquet files lack field IDs
+     /// or have field ID conflicts.
+     #[serde(default)]
+     #[serde(skip_serializing_if = "Option::is_none")]
+     #[serde(serialize_with = "serialize_not_implemented")]
+     #[serde(deserialize_with = "deserialize_not_implemented")]
+     pub name_mapping: Option<Arc<NameMapping>>,
++
 +    /// Maximum number of records to return, None means no limit
 +    pub limit: Option<usize>,
  }
  
  impl FileScanTask {
diff --cc crates/integrations/datafusion/src/table/mod.rs
index 80f070c01,8527668d6..f4bd40e43
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@@ -84,12 -96,138 +96,139 @@@ impl IcebergTableProvider 
          })
      }
  
-     /// Asynchronously tries to construct a new [`IcebergTableProvider`]
-     /// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
+     pub(crate) async fn metadata_table(
+         &self,
+         r#type: MetadataTableType,
+     ) -> Result<IcebergMetadataTableProvider> {
+         // Load fresh table metadata for metadata table access
+         let table = self.catalog.load_table(&self.table_ident).await?;
+         Ok(IcebergMetadataTableProvider { table, r#type })
+     }
+ }
+ 
+ #[async_trait]
+ impl TableProvider for IcebergTableProvider {
+     fn as_any(&self) -> &dyn Any {
+         self
+     }
+ 
+     fn schema(&self) -> ArrowSchemaRef {
+         self.schema.clone()
+     }
+ 
+     fn table_type(&self) -> TableType {
+         TableType::Base
+     }
+ 
+     async fn scan(
+         &self,
+         _state: &dyn Session,
+         projection: Option<&Vec<usize>>,
+         filters: &[Expr],
 -        _limit: Option<usize>,
++        limit: Option<usize>,
+     ) -> DFResult<Arc<dyn ExecutionPlan>> {
+         // Load fresh table metadata from catalog
+         let table = self
+             .catalog
+             .load_table(&self.table_ident)
+             .await
+             .map_err(to_datafusion_error)?;
+ 
+         // Create scan with fresh metadata (always use current snapshot)
+         Ok(Arc::new(IcebergTableScan::new(
+             table,
+             None, // Always use current snapshot for catalog-backed provider
+             self.schema.clone(),
+             projection,
+             filters,
++            limit,
+         )))
+     }
+ 
+     fn supports_filters_pushdown(
+         &self,
+         filters: &[&Expr],
+     ) -> DFResult<Vec<TableProviderFilterPushDown>> {
+         // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down
+         Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
+     }
+ 
+     async fn insert_into(
+         &self,
+         state: &dyn Session,
+         input: Arc<dyn ExecutionPlan>,
+         _insert_op: InsertOp,
+     ) -> DFResult<Arc<dyn ExecutionPlan>> {
+         // Load fresh table metadata from catalog
+         let table = self
+             .catalog
+             .load_table(&self.table_ident)
+             .await
+             .map_err(to_datafusion_error)?;
+ 
+         let partition_spec = table.metadata().default_partition_spec();
+ 
+         // Step 1: Project partition values for partitioned tables
+         let plan_with_partition = if !partition_spec.is_unpartitioned() {
+             project_with_partition(input, &table)?
+         } else {
+             input
+         };
+ 
+         // Step 2: Repartition for parallel processing
+         let target_partitions =
+             NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
+                 DataFusionError::Configuration(
+                     "target_partitions must be greater than 0".to_string(),
+                 )
+             })?;
+ 
+         let repartitioned_plan =
+             repartition(plan_with_partition, table.metadata_ref(), target_partitions)?;
+ 
+         let write_plan = Arc::new(IcebergWriteExec::new(
+             table.clone(),
+             repartitioned_plan,
+             self.schema.clone(),
+         ));
+ 
+         // Merge the outputs of write_plan into one so we can commit all files together
+         let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));
+ 
+         Ok(Arc::new(IcebergCommitExec::new(
+             table,
+             self.catalog.clone(),
+             coalesce_partitions,
+             self.schema.clone(),
+         )))
+     }
+ }
+ 
+ /// Static table provider for read-only snapshot access.
+ ///
+ /// This provider holds a cached table instance and does not refresh metadata or support
+ /// write operations. Use this for consistent analytical queries, time-travel scenarios,
+ /// or when you want to avoid catalog overhead.
+ ///
+ /// For catalog-backed tables with write support and automatic refresh, use
+ /// [`IcebergTableProvider`] instead.
+ #[derive(Debug, Clone)]
+ pub struct IcebergStaticTableProvider {
+     /// The static table instance (never refreshed)
+     table: Table,
+     /// Optional snapshot ID for this static view
+     snapshot_id: Option<i64>,
+     /// A reference-counted arrow `Schema`
+     schema: ArrowSchemaRef,
+ }
+ 
+ impl IcebergStaticTableProvider {
+     /// Creates a static provider from a table instance.
+     ///
+     /// Uses the table's current snapshot for all queries. Does not support write operations.
      pub async fn try_new_from_table(table: Table) -> Result<Self> {
          let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
-         Ok(IcebergTableProvider {
+         Ok(IcebergStaticTableProvider {
              table,
              snapshot_id: None,
              schema,
@@@ -149,8 -280,9 +281,9 @@@ impl TableProvider for IcebergStaticTab
          _state: &dyn Session,
          projection: Option<&Vec<usize>>,
          filters: &[Expr],
 -        _limit: Option<usize>,
 +        limit: Option<usize>,
      ) -> DFResult<Arc<dyn ExecutionPlan>> {
+         // Use cached table (no refresh)
          Ok(Arc::new(IcebergTableScan::new(
              self.table.clone(),
              self.snapshot_id,

Reply via email to