This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6eb8d45c12 Let `FileScanConfig` own a list of `ProjectionExpr`s 
(#18253)
6eb8d45c12 is described below

commit 6eb8d45c122cf4c690bf3e8521c529ec720b549b
Author: Matthew Kim <[email protected]>
AuthorDate: Mon Oct 27 14:47:36 2025 -0400

    Let `FileScanConfig` own a list of `ProjectionExpr`s (#18253)
    
    ## Which issue does this PR close?
    
    - Related to https://github.com/apache/datafusion/issues/14993
    
    ## Rationale for this change
    
    To enable expression pushdown to file sources, we need to plumb
    expressions through the `FileScanConfig` layer. Currently,
    `FileScanConfig` only tracks column indices for projection, which limits
    us to plain column selection.
    
    This PR begins the expression pushdown implementation by having
    `FileScanConfig` own a list of `ProjectionExpr`s instead of column
    indices. This allows file sources to eventually receive and optimize
    based on the actual expressions being projected.
    
    
    ## Notes about this PR
    - The first commit is based on
      https://github.com/apache/datafusion/pull/18231
    - To avoid a very large diff and a harder review, I've split
      #14993 into two tasks:
      - Have the `DataSource` (`FileScanConfig`) actually hold projection
        expressions (this PR)
      - Flow the projection expressions from `DataSourceExec` all the way
        to the `FileSource`
    
    ---------
    
    Co-authored-by: Adrian Garcia Badaracco <[email protected]>
---
 .../examples/advanced_parquet_index.rs             |   2 +-
 datafusion-examples/examples/csv_json_opener.rs    |   4 +-
 .../examples/default_column_values.rs              |   2 +-
 datafusion-examples/examples/parquet_index.rs      |   2 +-
 datafusion/catalog-listing/src/table.rs            |   2 +-
 datafusion/core/src/datasource/file_format/mod.rs  |   2 +-
 .../core/src/datasource/physical_plan/avro.rs      |   6 +-
 .../core/src/datasource/physical_plan/csv.rs       |   6 +-
 .../core/src/datasource/physical_plan/json.rs      |   4 +-
 .../core/src/datasource/physical_plan/parquet.rs   |   4 +-
 datafusion/core/tests/parquet/schema_coercion.rs   |   2 +-
 .../physical_optimizer/filter_pushdown/util.rs     |   2 +-
 .../physical_optimizer/projection_pushdown.rs      |   6 +-
 datafusion/datasource/src/file_scan_config.rs      | 135 ++++++++++-----
 datafusion/datasource/src/table_schema.rs          |   4 +
 datafusion/physical-expr/src/projection.rs         | 186 ++++++++++++++++-----
 datafusion/physical-plan/src/projection.rs         |   8 +-
 datafusion/proto/src/physical_plan/from_proto.rs   |   2 +-
 datafusion/proto/src/physical_plan/to_proto.rs     |   5 +-
 .../proto/tests/cases/roundtrip_physical_plan.rs   |   4 +-
 datafusion/substrait/src/physical_plan/consumer.rs |   4 +-
 datafusion/substrait/src/physical_plan/producer.rs |   7 +-
 docs/source/library-user-guide/upgrading.md        |  51 ++++++
 23 files changed, 334 insertions(+), 116 deletions(-)

diff --git a/datafusion-examples/examples/advanced_parquet_index.rs 
b/datafusion-examples/examples/advanced_parquet_index.rs
index 55400e2192..1c560be6d0 100644
--- a/datafusion-examples/examples/advanced_parquet_index.rs
+++ b/datafusion-examples/examples/advanced_parquet_index.rs
@@ -502,7 +502,7 @@ impl TableProvider for IndexTableProvider {
         let file_scan_config =
             FileScanConfigBuilder::new(object_store_url, schema, file_source)
                 .with_limit(limit)
-                .with_projection(projection.cloned())
+                .with_projection_indices(projection.cloned())
                 .with_file(partitioned_file)
                 .build();
 
diff --git a/datafusion-examples/examples/csv_json_opener.rs 
b/datafusion-examples/examples/csv_json_opener.rs
index 1a2c2cbff4..8abed90238 100644
--- a/datafusion-examples/examples/csv_json_opener.rs
+++ b/datafusion-examples/examples/csv_json_opener.rs
@@ -60,7 +60,7 @@ async fn csv_opener() -> Result<()> {
         Arc::clone(&schema),
         Arc::new(CsvSource::default()),
     )
-    .with_projection(Some(vec![12, 0]))
+    .with_projection_indices(Some(vec![12, 0]))
     .with_limit(Some(5))
     .with_file(PartitionedFile::new(path.display().to_string(), 10))
     .build();
@@ -126,7 +126,7 @@ async fn json_opener() -> Result<()> {
         schema,
         Arc::new(JsonSource::default()),
     )
-    .with_projection(Some(vec![1, 0]))
+    .with_projection_indices(Some(vec![1, 0]))
     .with_limit(Some(5))
     .with_file(PartitionedFile::new(path.to_string(), 10))
     .build();
diff --git a/datafusion-examples/examples/default_column_values.rs 
b/datafusion-examples/examples/default_column_values.rs
index 43e2d4ca09..d3a7d2ec67 100644
--- a/datafusion-examples/examples/default_column_values.rs
+++ b/datafusion-examples/examples/default_column_values.rs
@@ -260,7 +260,7 @@ impl TableProvider for DefaultValueTableProvider {
             self.schema.clone(),
             Arc::new(parquet_source),
         )
-        .with_projection(projection.cloned())
+        .with_projection_indices(projection.cloned())
         .with_limit(limit)
         .with_file_group(file_group)
         
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));
diff --git a/datafusion-examples/examples/parquet_index.rs 
b/datafusion-examples/examples/parquet_index.rs
index afc3b279f4..127c55da98 100644
--- a/datafusion-examples/examples/parquet_index.rs
+++ b/datafusion-examples/examples/parquet_index.rs
@@ -246,7 +246,7 @@ impl TableProvider for IndexTableProvider {
         let source = 
Arc::new(ParquetSource::default().with_predicate(predicate));
         let mut file_scan_config_builder =
             FileScanConfigBuilder::new(object_store_url, self.schema(), source)
-                .with_projection(projection.cloned())
+                .with_projection_indices(projection.cloned())
                 .with_limit(limit);
 
         // Transform to the format needed to pass to DataSourceExec
diff --git a/datafusion/catalog-listing/src/table.rs 
b/datafusion/catalog-listing/src/table.rs
index e9ac1bf097..95f9523d44 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -499,7 +499,7 @@ impl TableProvider for ListingTable {
                 .with_file_groups(partitioned_file_lists)
                 .with_constraints(self.constraints.clone())
                 .with_statistics(statistics)
-                .with_projection(projection)
+                .with_projection_indices(projection)
                 .with_limit(limit)
                 .with_output_ordering(output_ordering)
                 .with_table_partition_cols(table_partition_cols)
diff --git a/datafusion/core/src/datasource/file_format/mod.rs 
b/datafusion/core/src/datasource/file_format/mod.rs
index e165707c2e..4881783eeb 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -90,7 +90,7 @@ pub(crate) mod test_util {
                 )
                 .with_file_groups(file_groups)
                 .with_statistics(statistics)
-                .with_projection(projection)
+                .with_projection_indices(projection)
                 .with_limit(limit)
                 .build(),
             )
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs 
b/datafusion/core/src/datasource/physical_plan/avro.rs
index 8a00af959c..9068c97581 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -88,7 +88,7 @@ mod tests {
             source,
         )
         .with_file(meta.into())
-        .with_projection(Some(vec![0, 1, 2]))
+        .with_projection_indices(Some(vec![0, 1, 2]))
         .build();
 
         let source_exec = DataSourceExec::from_data_source(conf);
@@ -160,7 +160,7 @@ mod tests {
         let source = Arc::new(AvroSource::new());
         let conf = FileScanConfigBuilder::new(object_store_url, file_schema, 
source)
             .with_file(meta.into())
-            .with_projection(projection)
+            .with_projection_indices(projection)
             .build();
 
         let source_exec = DataSourceExec::from_data_source(conf);
@@ -231,7 +231,7 @@ mod tests {
         let conf = FileScanConfigBuilder::new(object_store_url, file_schema, 
source)
             // select specific columns of the files as well as the partitioning
             // column which is supposed to be the last column in the table 
schema.
-            .with_projection(projection)
+            .with_projection_indices(projection)
             .with_file(partitioned_file)
             .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, 
false)])
             .build();
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs 
b/datafusion/core/src/datasource/physical_plan/csv.rs
index b2ef51a76f..4f46a57d8b 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -118,7 +118,7 @@ mod tests {
         ))
         .with_file_compression_type(file_compression_type)
         .with_newlines_in_values(false)
-        .with_projection(Some(vec![0, 2, 4]))
+        .with_projection_indices(Some(vec![0, 2, 4]))
         .build();
 
         assert_eq!(13, config.file_schema().fields().len());
@@ -183,7 +183,7 @@ mod tests {
         ))
         .with_newlines_in_values(false)
         .with_file_compression_type(file_compression_type.to_owned())
-        .with_projection(Some(vec![4, 0, 2]))
+        .with_projection_indices(Some(vec![4, 0, 2]))
         .build();
         assert_eq!(13, config.file_schema().fields().len());
         let csv = DataSourceExec::from_data_source(config);
@@ -373,7 +373,7 @@ mod tests {
         .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, 
false)])
         // We should be able to project on the partition column
         // Which is supposed to be after the file fields
-        .with_projection(Some(vec![0, num_file_schema_fields]))
+        .with_projection_indices(Some(vec![0, num_file_schema_fields]))
         .build();
 
         // we don't have `/date=xx/` in the path but that is ok because
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs 
b/datafusion/core/src/datasource/physical_plan/json.rs
index 0d45711c76..f7d5c710bf 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -297,7 +297,7 @@ mod tests {
         let source = Arc::new(JsonSource::new());
         let conf = FileScanConfigBuilder::new(object_store_url, file_schema, 
source)
             .with_file_groups(file_groups)
-            .with_projection(Some(vec![0, 2]))
+            .with_projection_indices(Some(vec![0, 2]))
             .with_file_compression_type(file_compression_type.to_owned())
             .build();
         let exec = DataSourceExec::from_data_source(conf);
@@ -345,7 +345,7 @@ mod tests {
         let source = Arc::new(JsonSource::new());
         let conf = FileScanConfigBuilder::new(object_store_url, file_schema, 
source)
             .with_file_groups(file_groups)
-            .with_projection(Some(vec![3, 0, 2]))
+            .with_projection_indices(Some(vec![3, 0, 2]))
             .with_file_compression_type(file_compression_type.to_owned())
             .build();
         let exec = DataSourceExec::from_data_source(conf);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs 
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 10a475c1cc..6df5cd7ac6 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -201,7 +201,7 @@ mod tests {
                 source,
             )
             .with_file_group(file_group)
-            .with_projection(self.projection.clone())
+            .with_projection_indices(self.projection.clone())
             .build();
             DataSourceExec::from_data_source(base_config)
         }
@@ -1655,7 +1655,7 @@ mod tests {
         let config = FileScanConfigBuilder::new(object_store_url, 
schema.clone(), source)
             .with_file(partitioned_file)
             // file has 10 cols so index 12 should be month and 13 should be 
day
-            .with_projection(Some(vec![0, 1, 2, 12, 13]))
+            .with_projection_indices(Some(vec![0, 1, 2, 12, 13]))
             .with_table_partition_cols(vec![
                 Field::new("year", DataType::Utf8, false),
                 Field::new("month", DataType::UInt8, false),
diff --git a/datafusion/core/tests/parquet/schema_coercion.rs 
b/datafusion/core/tests/parquet/schema_coercion.rs
index 59cbf4b087..9be391a910 100644
--- a/datafusion/core/tests/parquet/schema_coercion.rs
+++ b/datafusion/core/tests/parquet/schema_coercion.rs
@@ -126,7 +126,7 @@ async fn multi_parquet_coercion_projection() {
         Arc::new(ParquetSource::default()),
     )
     .with_file_group(file_group)
-    .with_projection(Some(vec![1, 0, 2]))
+    .with_projection_indices(Some(vec![1, 0, 2]))
     .build();
 
     let parquet_exec = DataSourceExec::from_data_source(config);
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs 
b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index f05f3f0028..54e8e7bf04 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -165,7 +165,7 @@ impl FileSource for TestSource {
 
     fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
         Arc::new(TestSource {
-            projection: config.projection.clone(),
+            projection: config.projection_exprs.as_ref().map(|p| 
p.column_indices()),
             ..self.clone()
         })
     }
diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs 
b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index c51a5e02c9..8631613c39 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -390,7 +390,7 @@ fn create_simple_csv_exec() -> Arc<dyn ExecutionPlan> {
         Arc::new(CsvSource::new(false, 0, 0)),
     )
     .with_file(PartitionedFile::new("x".to_string(), 100))
-    .with_projection(Some(vec![0, 1, 2, 3, 4]))
+    .with_projection_indices(Some(vec![0, 1, 2, 3, 4]))
     .build();
 
     DataSourceExec::from_data_source(config)
@@ -409,7 +409,7 @@ fn create_projecting_csv_exec() -> Arc<dyn ExecutionPlan> {
         Arc::new(CsvSource::new(false, 0, 0)),
     )
     .with_file(PartitionedFile::new("x".to_string(), 100))
-    .with_projection(Some(vec![3, 2, 1]))
+    .with_projection_indices(Some(vec![3, 2, 1]))
     .build();
 
     DataSourceExec::from_data_source(config)
@@ -1596,7 +1596,7 @@ fn partitioned_data_source() -> Arc<DataSourceExec> {
     )
     .with_file(PartitionedFile::new("x".to_string(), 100))
     .with_table_partition_cols(vec![Field::new("partition_col", 
DataType::Utf8, true)])
-    .with_projection(Some(vec![0, 1, 2]))
+    .with_projection_indices(Some(vec![0, 1, 2]))
     .build();
 
     DataSourceExec::from_data_source(config)
diff --git a/datafusion/datasource/src/file_scan_config.rs 
b/datafusion/datasource/src/file_scan_config.rs
index 695252803b..c52397d9a7 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -44,18 +44,20 @@ use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
 };
 use datafusion_expr::Operator;
-use datafusion_physical_expr::expressions::BinaryExpr;
-use datafusion_physical_expr::{expressions::Column, 
utils::reassign_expr_columns};
+use datafusion_physical_expr::expressions::{BinaryExpr, Column};
+use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr::utils::reassign_expr_columns;
 use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, 
Partitioning};
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
-use datafusion_physical_plan::projection::ProjectionExpr;
+use datafusion_physical_plan::projection::{
+    all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
+};
 use datafusion_physical_plan::{
     display::{display_orderings, ProjectSchemaDisplay},
     filter_pushdown::FilterPushdownPropagation,
     metrics::ExecutionPlanMetricsSet,
-    projection::{all_alias_free_columns, new_projections_for_columns},
     DisplayAs, DisplayFormatType,
 };
 use std::{
@@ -124,7 +126,7 @@ use log::{debug, warn};
 /// let file_source = Arc::new(ParquetSource::new());
 /// let config = FileScanConfigBuilder::new(object_store_url, file_schema, 
file_source)
 ///   .with_limit(Some(1000))            // read only the first 1000 records
-///   .with_projection(Some(vec![2, 3])) // project columns 2 and 3
+///   .with_projection_indices(Some(vec![2, 3])) // project columns 2 and 3
 ///    // Read /tmp/file1.parquet with known size of 1234 bytes in a single 
group
 ///   .with_file(PartitionedFile::new("file1.parquet", 1234))
 ///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
@@ -175,9 +177,12 @@ pub struct FileScanConfig {
     pub file_groups: Vec<FileGroup>,
     /// Table constraints
     pub constraints: Constraints,
-    /// Columns on which to project the data. Indexes that are higher than the
-    /// number of columns of `file_schema` refer to `table_partition_cols`.
-    pub projection: Option<Vec<usize>>,
+    /// Physical expressions defining the projection to apply when reading data.
+    ///
+    /// Each expression in the projection can reference columns from both the file
+    /// schema and table partition columns. If `None`, all columns from the table
+    /// schema are projected.
+    pub projection_exprs: Option<ProjectionExprs>,
     /// The maximum number of records to read from this plan. If `None`,
     /// all records after filtering are returned.
     pub limit: Option<usize>,
@@ -229,7 +234,7 @@ pub struct FileScanConfig {
 ///     // Set a limit of 1000 rows
 ///     .with_limit(Some(1000))
 ///     // Project only the first column
-///     .with_projection(Some(vec![0]))
+///     .with_projection_indices(Some(vec![0]))
 ///     // Add partition columns
 ///     .with_table_partition_cols(vec![
 ///         Field::new("date", DataType::Utf8, false),
@@ -261,7 +266,7 @@ pub struct FileScanConfigBuilder {
     table_schema: TableSchema,
     file_source: Arc<dyn FileSource>,
     limit: Option<usize>,
-    projection: Option<Vec<usize>>,
+    projection_indices: Option<Vec<usize>>,
     constraints: Option<Constraints>,
     file_groups: Vec<FileGroup>,
     statistics: Option<Statistics>,
@@ -294,7 +299,7 @@ impl FileScanConfigBuilder {
             file_compression_type: None,
             new_lines_in_values: None,
             limit: None,
-            projection: None,
+            projection_indices: None,
             constraints: None,
             batch_size: None,
             expr_adapter_factory: None,
@@ -317,10 +322,25 @@ impl FileScanConfigBuilder {
         self
     }
 
+    pub fn table_schema(&self) -> &SchemaRef {
+        self.table_schema.table_schema()
+    }
+
     /// Set the columns on which to project the data. Indexes that are higher 
than the
     /// number of columns of `file_schema` refer to `table_partition_cols`.
-    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
-        self.projection = projection;
+    ///
+    /// # Deprecated
+    /// Use [`Self::with_projection_indices`] instead. This method will be removed in a future release.
+    #[deprecated(since = "51.0.0", note = "Use with_projection_indices 
instead")]
+    pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
+        self.with_projection_indices(indices)
+    }
+
+    /// Set the columns on which to project the data using column indices.
+    ///
+    /// Indexes that are higher than the number of columns of `file_schema` refer to `table_partition_cols`.
+    pub fn with_projection_indices(mut self, indices: Option<Vec<usize>>) -> 
Self {
+        self.projection_indices = indices;
         self
     }
 
@@ -433,7 +453,7 @@ impl FileScanConfigBuilder {
             table_schema,
             file_source,
             limit,
-            projection,
+            projection_indices,
             constraints,
             file_groups,
             statistics,
@@ -455,12 +475,18 @@ impl FileScanConfigBuilder {
             file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
         let new_lines_in_values = new_lines_in_values.unwrap_or(false);
 
+        // Convert projection indices to ProjectionExprs using the final table schema
+        // (which now includes partition columns if they were added)
+        let projection_exprs = projection_indices.map(|indices| {
+            ProjectionExprs::from_indices(&indices, 
table_schema.table_schema())
+        });
+
         FileScanConfig {
             object_store_url,
             table_schema,
             file_source,
             limit,
-            projection,
+            projection_exprs,
             constraints,
             file_groups,
             output_ordering,
@@ -484,7 +510,9 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
             file_compression_type: Some(config.file_compression_type),
             new_lines_in_values: Some(config.new_lines_in_values),
             limit: config.limit,
-            projection: config.projection,
+            projection_indices: config
+                .projection_exprs
+                .map(|p| p.ordered_column_indices()),
             constraints: Some(config.constraints),
             batch_size: config.batch_size,
             expr_adapter_factory: config.expr_adapter_factory,
@@ -673,15 +701,16 @@ impl DataSource for FileScanConfig {
             let new_projections = new_projections_for_columns(
                 projection,
                 &file_scan
-                    .projection
-                    .clone()
+                    .projection_exprs
+                    .as_ref()
+                    .map(|p| p.ordered_column_indices())
                     .unwrap_or_else(|| 
(0..self.file_schema().fields().len()).collect()),
             );
 
             Arc::new(
                 FileScanConfigBuilder::from(file_scan)
                     // Assign projected statistics to source
-                    .with_projection(Some(new_projections))
+                    .with_projection_indices(Some(new_projections))
                     .with_source(source)
                     .build(),
             ) as _
@@ -727,8 +756,8 @@ impl FileScanConfig {
     }
 
     fn projection_indices(&self) -> Vec<usize> {
-        match &self.projection {
-            Some(proj) => proj.clone(),
+        match &self.projection_exprs {
+            Some(proj) => proj.ordered_column_indices(),
             None => (0..self.file_schema().fields().len()
                 + self.table_partition_cols().len())
                 .collect(),
@@ -825,7 +854,7 @@ impl FileScanConfig {
 
     /// Project the schema, constraints, and the statistics on the given 
column indices
     pub fn project(&self) -> (SchemaRef, Constraints, Statistics, 
Vec<LexOrdering>) {
-        if self.projection.is_none() && self.table_partition_cols().is_empty() 
{
+        if self.projection_exprs.is_none() && 
self.table_partition_cols().is_empty() {
             return (
                 Arc::clone(self.file_schema()),
                 self.constraints.clone(),
@@ -844,12 +873,17 @@ impl FileScanConfig {
     }
 
     pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
-        self.projection.as_ref().map(|p| {
-            p.iter()
-                .filter(|col_idx| **col_idx < 
self.file_schema().fields().len())
-                .map(|col_idx| self.file_schema().field(*col_idx).name())
+        let fields = self.file_schema().fields();
+
+        self.projection_exprs.as_ref().map(|p| {
+            let column_indices = p.ordered_column_indices();
+
+            column_indices
+                .iter()
+                .filter(|&&col_i| col_i < fields.len())
+                .map(|&col_i| self.file_schema().field(col_i).name())
                 .cloned()
-                .collect()
+                .collect::<Vec<_>>()
         })
     }
 
@@ -875,11 +909,11 @@ impl FileScanConfig {
     }
 
     pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
-        self.projection.as_ref().map(|p| {
-            p.iter()
-                .filter(|col_idx| **col_idx < 
self.file_schema().fields().len())
-                .copied()
-                .collect()
+        self.projection_exprs.as_ref().map(|p| {
+            p.ordered_column_indices()
+                .into_iter()
+                .filter(|&i| i < self.file_schema().fields().len())
+                .collect::<Vec<_>>()
         })
     }
 
@@ -1415,10 +1449,15 @@ fn get_projected_output_ordering(
                 return false;
             }
 
+            let indices = base_config
+                .projection_exprs
+                .as_ref()
+                .map(|p| p.ordered_column_indices());
+
             let statistics = match MinMaxStatistics::new_from_files(
                 &new_ordering,
                 projected_schema,
-                base_config.projection.as_deref(),
+                indices.as_deref(),
                 group.iter(),
             ) {
                 Ok(statistics) => statistics,
@@ -1479,7 +1518,7 @@ mod tests {
     use datafusion_common::{assert_batches_eq, internal_err};
     use datafusion_expr::{Operator, SortExpr};
     use datafusion_physical_expr::create_physical_sort_expr;
-    use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
+    use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
     use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
 
     /// Returns the column names on the schema
@@ -2143,7 +2182,7 @@ mod tests {
             file_schema,
             Arc::new(MockSource::default()),
         )
-        .with_projection(projection)
+        .with_projection_indices(projection)
         .with_statistics(statistics)
         .with_table_partition_cols(table_partition_cols)
         .build()
@@ -2196,7 +2235,7 @@ mod tests {
         // Build with various configurations
         let config = builder
             .with_limit(Some(1000))
-            .with_projection(Some(vec![0, 1]))
+            .with_projection_indices(Some(vec![0, 1]))
             .with_table_partition_cols(vec![Field::new(
                 "date",
                 wrap_partition_type_in_dict(DataType::Utf8),
@@ -2219,7 +2258,10 @@ mod tests {
         assert_eq!(config.object_store_url, object_store_url);
         assert_eq!(*config.file_schema(), file_schema);
         assert_eq!(config.limit, Some(1000));
-        assert_eq!(config.projection, Some(vec![0, 1]));
+        assert_eq!(
+            config.projection_exprs.as_ref().map(|p| p.column_indices()),
+            Some(vec![0, 1])
+        );
         assert_eq!(config.table_partition_cols().len(), 1);
         assert_eq!(config.table_partition_cols()[0].name(), "date");
         assert_eq!(config.file_groups.len(), 1);
@@ -2253,7 +2295,7 @@ mod tests {
             Arc::clone(&file_schema),
             Arc::clone(&file_source),
         )
-        .with_projection(Some(vec![0, 1, 2]))
+        .with_projection_indices(Some(vec![0, 1, 2]))
         .build();
 
         // Simulate projection being updated. Since the filter has already 
been pushed down,
@@ -2302,7 +2344,10 @@ mod tests {
         assert_eq!(config.object_store_url, object_store_url);
         assert_eq!(*config.file_schema(), file_schema);
         assert_eq!(config.limit, None);
-        assert_eq!(config.projection, None);
+        assert_eq!(
+            config.projection_exprs.as_ref().map(|p| p.column_indices()),
+            None
+        );
         assert!(config.table_partition_cols().is_empty());
         assert!(config.file_groups.is_empty());
         assert_eq!(
@@ -2357,7 +2402,7 @@ mod tests {
             Arc::clone(&schema),
             Arc::clone(&file_source),
         )
-        .with_projection(Some(vec![0, 2]))
+        .with_projection_indices(Some(vec![0, 2]))
         .with_limit(Some(10))
         .with_table_partition_cols(partition_cols.clone())
         .with_file(file.clone())
@@ -2375,7 +2420,13 @@ mod tests {
         let partition_cols = 
partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
         assert_eq!(new_config.object_store_url, object_store_url);
         assert_eq!(*new_config.file_schema(), schema);
-        assert_eq!(new_config.projection, Some(vec![0, 2]));
+        assert_eq!(
+            new_config
+                .projection_exprs
+                .as_ref()
+                .map(|p| p.column_indices()),
+            Some(vec![0, 2])
+        );
         assert_eq!(new_config.limit, Some(10));
         assert_eq!(*new_config.table_partition_cols(), partition_cols);
         assert_eq!(new_config.file_groups.len(), 1);
@@ -2594,7 +2645,7 @@ mod tests {
             Arc::clone(&schema),
             Arc::new(MockSource::default()),
         )
-        .with_projection(Some(vec![0, 2])) // Only project columns 0 and 2
+        .with_projection_indices(Some(vec![0, 2])) // Only project columns 0 
and 2
         .with_file_groups(vec![file_group])
         .build();
 
diff --git a/datafusion/datasource/src/table_schema.rs 
b/datafusion/datasource/src/table_schema.rs
index 8e95585ce8..863c123e3b 100644
--- a/datafusion/datasource/src/table_schema.rs
+++ b/datafusion/datasource/src/table_schema.rs
@@ -132,6 +132,10 @@ impl TableSchema {
         table_partition_cols: Vec<FieldRef>,
     ) -> TableSchema {
         self.table_partition_cols = table_partition_cols;
+        // Rebuild the table schema with the new partition columns
+        let mut builder = SchemaBuilder::from(self.file_schema.as_ref());
+        builder.extend(self.table_partition_cols.iter().cloned());
+        self.table_schema = Arc::new(builder.finish());
         self
     }
 
diff --git a/datafusion/physical-expr/src/projection.rs 
b/datafusion/physical-expr/src/projection.rs
index e35bfbb3a2..fc972d644e 100644
--- a/datafusion/physical-expr/src/projection.rs
+++ b/datafusion/physical-expr/src/projection.rs
@@ -100,24 +100,24 @@ impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, 
String) {
 /// representing a complete projection operation and provides
 /// methods to manipulate and analyze the projection as a whole.
 #[derive(Debug, Clone)]
-pub struct Projection {
+pub struct ProjectionExprs {
     exprs: Vec<ProjectionExpr>,
 }
 
-impl std::fmt::Display for Projection {
+impl std::fmt::Display for ProjectionExprs {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         let exprs: Vec<String> = self.exprs.iter().map(|e| 
e.to_string()).collect();
         write!(f, "Projection[{}]", exprs.join(", "))
     }
 }
 
-impl From<Vec<ProjectionExpr>> for Projection {
+impl From<Vec<ProjectionExpr>> for ProjectionExprs {
     fn from(value: Vec<ProjectionExpr>) -> Self {
         Self { exprs: value }
     }
 }
 
-impl From<&[ProjectionExpr]> for Projection {
+impl From<&[ProjectionExpr]> for ProjectionExprs {
     fn from(value: &[ProjectionExpr]) -> Self {
         Self {
             exprs: value.to_vec(),
@@ -125,15 +125,83 @@ impl From<&[ProjectionExpr]> for Projection {
     }
 }
 
-impl AsRef<[ProjectionExpr]> for Projection {
+impl FromIterator<ProjectionExpr> for ProjectionExprs {
+    fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(iter: T) -> Self {
+        // Materialize the expressions into the owned list, in iteration order.
+        let exprs: Vec<ProjectionExpr> = iter.into_iter().collect();
+        Self { exprs }
+    }
+}
+
+impl AsRef<[ProjectionExpr]> for ProjectionExprs {
     fn as_ref(&self) -> &[ProjectionExpr] {
         &self.exprs
     }
 }
 
-impl Projection {
-    pub fn new(exprs: Vec<ProjectionExpr>) -> Self {
-        Self { exprs }
+impl ProjectionExprs {
+    /// Creates a projection from any iterable of [`ProjectionExpr`]s, keeping
+    /// them in iteration order.
+    pub fn new<I>(exprs: I) -> Self
+    where
+        I: IntoIterator<Item = ProjectionExpr>,
+    {
+        exprs.into_iter().collect()
+    }
+
+    /// Creates a [`ProjectionExprs`] from a list of column indices.
+    ///
+    /// This is a convenience method for creating simple column-only projections, where each projection expression is a reference to a column
+    /// in the input schema, aliased with that column's field name.
+    ///
+    /// # Behavior
+    /// - Ordering: the output projection preserves the exact order of indices provided in the input slice.
+    ///   For example, `[2, 0, 1]` will produce projections for columns 2, 0, then 1 in that order.
+    /// - Duplicates: duplicate indices are allowed and will create multiple projection expressions referencing the same source column.
+    ///   For example, `[0, 0]` creates 2 separate projections both referencing column 0.
+    ///
+    /// # Panics
+    /// Panics if any index in `indices` is out of bounds for the provided schema.
+    ///
+    /// # Example
+    ///
+    /// ```rust
+    /// use std::sync::Arc;
+    /// use arrow::datatypes::{Schema, Field, DataType};
+    /// use datafusion_physical_expr::projection::ProjectionExprs;
+    ///
+    /// // Create a schema with three columns
+    /// let schema = Arc::new(Schema::new(vec![
+    ///     Field::new("a", DataType::Int32, false),
+    ///     Field::new("b", DataType::Utf8, false),
+    ///     Field::new("c", DataType::Float64, false),
+    /// ]));
+    ///
+    /// // Project columns at indices 2 and 0 (c and a) - ordering is preserved
+    /// let projection = ProjectionExprs::from_indices(&[2, 0], &schema);
+    ///
+    /// // This creates: SELECT c@2 AS c, a@0 AS a
+    /// assert_eq!(projection.as_ref().len(), 2);
+    /// assert_eq!(projection.as_ref()[0].alias, "c");
+    /// assert_eq!(projection.as_ref()[1].alias, "a");
+    ///
+    /// // Duplicate indices are allowed
+    /// let projection_with_dups = ProjectionExprs::from_indices(&[0, 0, 1], &schema);
+    /// assert_eq!(projection_with_dups.as_ref().len(), 3);
+    /// assert_eq!(projection_with_dups.as_ref()[0].alias, "a");
+    /// assert_eq!(projection_with_dups.as_ref()[1].alias, "a"); // duplicate
+    /// assert_eq!(projection_with_dups.as_ref()[2].alias, "b");
+    /// ```
+    pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Self {
+        let projection_exprs = indices.iter().map(|&i| {
+            let field = schema.field(i);
+            ProjectionExpr {
+                expr: Arc::new(Column::new(field.name(), i)),
+                alias: field.name().clone(),
+            }
+        });
+
+        Self::from_iter(projection_exprs)
     }
 
     /// Returns an iterator over the projection expressions
@@ -167,7 +235,7 @@ impl Projection {
     ///
     /// ```rust
     /// use std::sync::Arc;
-    /// use datafusion_physical_expr::projection::{Projection, ProjectionExpr};
+    /// use datafusion_physical_expr::projection::{ProjectionExprs, 
ProjectionExpr};
     /// use datafusion_physical_expr::expressions::{Column, BinaryExpr, 
Literal};
     /// use datafusion_common::{Result, ScalarValue};
     /// use datafusion_expr::Operator;
@@ -175,7 +243,7 @@ impl Projection {
     /// fn main() -> Result<()> {
     ///     // Example from the docstring:
     ///     // Base projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
-    ///     let base = Projection::new(vec![
+    ///     let base = ProjectionExprs::new(vec![
     ///         ProjectionExpr {
     ///             expr: Arc::new(Column::new("c", 2)),
     ///             alias: "x".to_string(),
@@ -191,7 +259,7 @@ impl Projection {
     ///     ]);
     ///
     ///     // Top projection: SELECT x@0 + 1 AS c1, y@1 + z@2 AS c2
-    ///     let top = Projection::new(vec![
+    ///     let top = ProjectionExprs::new(vec![
     ///         ProjectionExpr {
     ///             expr: Arc::new(BinaryExpr::new(
     ///                 Arc::new(Column::new("x", 0)),
@@ -224,7 +292,7 @@ impl Projection {
     /// # Errors
     /// This function returns an error if any expression in the `other` 
projection cannot be
     /// applied on top of this projection.
-    pub fn try_merge(&self, other: &Projection) -> Result<Projection> {
+    pub fn try_merge(&self, other: &ProjectionExprs) -> 
Result<ProjectionExprs> {
         let mut new_exprs = Vec::with_capacity(other.exprs.len());
         for proj_expr in &other.exprs {
             let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
@@ -240,7 +308,7 @@ impl Projection {
                 alias: proj_expr.alias.clone(),
             });
         }
-        Ok(Projection::new(new_exprs))
+        Ok(ProjectionExprs::new(new_exprs))
     }
 
     /// Extract the column indices used in this projection.
@@ -256,6 +324,46 @@ impl Projection {
             .collect_vec()
     }
 
+    /// Returns the ordered column indices for a column-only projection.
+    ///
+    /// This method assumes that all expressions in the projection are simple column references.
+    /// It returns the column indices in the order they appear in the projection.
+    ///
+    /// # Panics
+    ///
+    /// Panics (with "Expected column reference in projection") if any expression in the projection is not a simple column reference. This includes:
+    /// - Computed expressions (e.g., `a + 1`, `CAST(a AS INT)`)
+    /// - Function calls (e.g., `UPPER(name)`, `SUM(amount)`)
+    /// - Literals (e.g., `42`, `'hello'`)
+    /// - Complex nested expressions (e.g., `CASE WHEN ... THEN ... END`)
+    ///
+    /// # Returns
+    ///
+    /// A vector of column indices in projection order. Unlike [`column_indices()`](Self::column_indices),
+    /// this method:
+    /// - Preserves the projection order (does not sort)
+    /// - Preserves duplicates (does not deduplicate)
+    ///
+    /// # Example
+    ///
+    /// For a projection `SELECT c, a, c` where `a` is at index 0 and `c` is at index 2,
+    /// this method would return `[2, 0, 2]`.
+    ///
+    /// Use [`column_indices()`](Self::column_indices) instead if the projection may contain
+    /// non-column expressions or if you only need the deduplicated, sorted set of referenced columns.
+    pub fn ordered_column_indices(&self) -> Vec<usize> {
+        self.exprs
+            .iter()
+            .map(|e| {
+                e.expr
+                    .as_any()
+                    .downcast_ref::<Column>()
+                    .expect("Expected column reference in projection")
+                    .index()
+            })
+            .collect()
+    }
+
     /// Project a schema according to this projection.
     /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` 
is at index 0 and `b` is at index 1,
     /// if the input schema is `[a: Int32, b: Int32, c: Int32]`, the output 
schema would be `[x: Int32, y: Int32]`.
@@ -327,7 +435,7 @@ impl Projection {
     }
 }
 
-impl<'a> IntoIterator for &'a Projection {
+impl<'a> IntoIterator for &'a ProjectionExprs {
     type Item = &'a ProjectionExpr;
     type IntoIter = std::slice::Iter<'a, ProjectionExpr>;
 
@@ -336,7 +444,7 @@ impl<'a> IntoIterator for &'a Projection {
     }
 }
 
-impl IntoIterator for Projection {
+impl IntoIterator for ProjectionExprs {
     type Item = ProjectionExpr;
     type IntoIter = std::vec::IntoIter<ProjectionExpr>;
 
@@ -1570,7 +1678,7 @@ pub(crate) mod tests {
         let source = get_stats();
         let schema = get_schema();
 
-        let projection = Projection::new(vec![
+        let projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(Column::new("col1", 1)),
                 alias: "col1".to_string(),
@@ -1612,7 +1720,7 @@ pub(crate) mod tests {
         let source = get_stats();
         let schema = get_schema();
 
-        let projection = Projection::new(vec![
+        let projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(Column::new("col2", 2)),
                 alias: "col2".to_string(),
@@ -1663,7 +1771,7 @@ pub(crate) mod tests {
                 alias: "b".to_string(),
             },
         ];
-        let projection = Projection::new(exprs.clone());
+        let projection = ProjectionExprs::new(exprs.clone());
         assert_eq!(projection.as_ref().len(), 2);
         Ok(())
     }
@@ -1674,7 +1782,7 @@ pub(crate) mod tests {
             expr: Arc::new(Column::new("x", 0)),
             alias: "x".to_string(),
         }];
-        let projection: Projection = exprs.clone().into();
+        let projection: ProjectionExprs = exprs.clone().into();
         assert_eq!(projection.as_ref().len(), 1);
         Ok(())
     }
@@ -1691,7 +1799,7 @@ pub(crate) mod tests {
                 alias: "col2".to_string(),
             },
         ];
-        let projection = Projection::new(exprs);
+        let projection = ProjectionExprs::new(exprs);
         let as_ref: &[ProjectionExpr] = projection.as_ref();
         assert_eq!(as_ref.len(), 2);
         Ok(())
@@ -1700,7 +1808,7 @@ pub(crate) mod tests {
     #[test]
     fn test_column_indices_multiple_columns() -> Result<()> {
         // Test with reversed column order to ensure proper reordering
-        let projection = Projection::new(vec![
+        let projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(Column::new("c", 5)),
                 alias: "c".to_string(),
@@ -1722,7 +1830,7 @@ pub(crate) mod tests {
     #[test]
     fn test_column_indices_duplicates() -> Result<()> {
         // Test that duplicate column indices appear only once
-        let projection = Projection::new(vec![
+        let projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(Column::new("a", 1)),
                 alias: "a".to_string(),
@@ -1743,7 +1851,7 @@ pub(crate) mod tests {
     #[test]
     fn test_column_indices_unsorted() -> Result<()> {
         // Test that column indices are sorted in the output
-        let projection = Projection::new(vec![
+        let projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(Column::new("c", 5)),
                 alias: "c".to_string(),
@@ -1769,7 +1877,7 @@ pub(crate) mod tests {
             Operator::Plus,
             Arc::new(Column::new("b", 4)),
         ));
-        let projection = Projection::new(vec![
+        let projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr,
                 alias: "sum".to_string(),
@@ -1786,7 +1894,7 @@ pub(crate) mod tests {
 
     #[test]
     fn test_column_indices_empty() -> Result<()> {
-        let projection = Projection::new(vec![]);
+        let projection = ProjectionExprs::new(vec![]);
         assert_eq!(projection.column_indices(), Vec::<usize>::new());
         Ok(())
     }
@@ -1794,7 +1902,7 @@ pub(crate) mod tests {
     #[test]
     fn test_merge_simple_columns() -> Result<()> {
         // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
-        let base_projection = Projection::new(vec![
+        let base_projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(Column::new("c", 2)),
                 alias: "x".to_string(),
@@ -1810,7 +1918,7 @@ pub(crate) mod tests {
         ]);
 
         // Second projection: SELECT y@1 AS col2, x@0 AS col1
-        let top_projection = Projection::new(vec![
+        let top_projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(Column::new("y", 1)),
                 alias: "col2".to_string(),
@@ -1831,7 +1939,7 @@ pub(crate) mod tests {
     #[test]
     fn test_merge_with_expressions() -> Result<()> {
         // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
-        let base_projection = Projection::new(vec![
+        let base_projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(Column::new("c", 2)),
                 alias: "x".to_string(),
@@ -1847,7 +1955,7 @@ pub(crate) mod tests {
         ]);
 
         // Second projection: SELECT y@1 + z@2 AS c2, x@0 + 1 AS c1
-        let top_projection = Projection::new(vec![
+        let top_projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(BinaryExpr::new(
                     Arc::new(Column::new("y", 1)),
@@ -1876,7 +1984,7 @@ pub(crate) mod tests {
     #[test]
     fn try_merge_error() {
         // Create a base projection
-        let base = Projection::new(vec![
+        let base = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(Column::new("a", 0)),
                 alias: "x".to_string(),
@@ -1888,7 +1996,7 @@ pub(crate) mod tests {
         ]);
 
         // Create a top projection that references a non-existent column index
-        let top = Projection::new(vec![ProjectionExpr {
+        let top = ProjectionExprs::new(vec![ProjectionExpr {
             expr: Arc::new(Column::new("z", 5)), // Invalid index
             alias: "result".to_string(),
         }]);
@@ -1907,7 +2015,7 @@ pub(crate) mod tests {
         let input_schema = get_schema();
 
         // Projection: SELECT col2 AS c, col0 AS a
-        let projection = Projection::new(vec![
+        let projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(Column::new("col2", 2)),
                 alias: "c".to_string(),
@@ -1940,7 +2048,7 @@ pub(crate) mod tests {
         let input_schema = get_schema();
 
         // Projection: SELECT col0 + 1 AS incremented
-        let projection = Projection::new(vec![ProjectionExpr {
+        let projection = ProjectionExprs::new(vec![ProjectionExpr {
             expr: Arc::new(BinaryExpr::new(
                 Arc::new(Column::new("col0", 0)),
                 Operator::Plus,
@@ -1974,7 +2082,7 @@ pub(crate) mod tests {
         ]);
 
         // Projection: SELECT col0 AS renamed
-        let projection = Projection::new(vec![ProjectionExpr {
+        let projection = ProjectionExprs::new(vec![ProjectionExpr {
             expr: Arc::new(Column::new("col0", 0)),
             alias: "renamed".to_string(),
         }]);
@@ -1994,7 +2102,7 @@ pub(crate) mod tests {
     #[test]
     fn test_project_schema_empty() -> Result<()> {
         let input_schema = get_schema();
-        let projection = Projection::new(vec![]);
+        let projection = ProjectionExprs::new(vec![]);
 
         let output_schema = projection.project_schema(&input_schema)?;
 
@@ -2009,7 +2117,7 @@ pub(crate) mod tests {
         let input_schema = get_schema();
 
         // Projection: SELECT col1 AS text, col0 AS num
-        let projection = Projection::new(vec![
+        let projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(Column::new("col1", 1)),
                 alias: "text".to_string(),
@@ -2057,7 +2165,7 @@ pub(crate) mod tests {
         let input_schema = get_schema();
 
         // Projection with expression: SELECT col0 + 1 AS incremented, col1 AS 
text
-        let projection = Projection::new(vec![
+        let projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(BinaryExpr::new(
                     Arc::new(Column::new("col0", 0)),
@@ -2105,7 +2213,7 @@ pub(crate) mod tests {
         let input_schema = get_schema();
 
         // Projection with only primitive width columns: SELECT col2 AS f, 
col0 AS i
-        let projection = Projection::new(vec![
+        let projection = ProjectionExprs::new(vec![
             ProjectionExpr {
                 expr: Arc::new(Column::new("col2", 2)),
                 alias: "f".to_string(),
@@ -2136,7 +2244,7 @@ pub(crate) mod tests {
         let input_stats = get_stats();
         let input_schema = get_schema();
 
-        let projection = Projection::new(vec![]);
+        let projection = ProjectionExprs::new(vec![]);
 
         let output_stats = projection.project_statistics(input_stats, 
&input_schema)?;
 
diff --git a/datafusion/physical-plan/src/projection.rs 
b/datafusion/physical-plan/src/projection.rs
index 4dc88bc566..2c84570b33 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -53,7 +53,9 @@ use datafusion_physical_expr_common::physical_expr::{fmt_sql, 
PhysicalExprRef};
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 // Re-exported from datafusion-physical-expr for backwards compatibility
 // We recommend updating your imports to use datafusion-physical-expr directly
-pub use datafusion_physical_expr::projection::{update_expr, Projection, 
ProjectionExpr};
+pub use datafusion_physical_expr::projection::{
+    update_expr, ProjectionExpr, ProjectionExprs,
+};
 
 use futures::stream::{Stream, StreamExt};
 use log::trace;
@@ -65,7 +67,7 @@ use log::trace;
 #[derive(Debug, Clone)]
 pub struct ProjectionExec {
     /// The projection expressions stored as tuples of (expression, output 
column name)
-    projection: Projection,
+    projection: ProjectionExprs,
     /// The schema once the projection has been applied to the input
     schema: SchemaRef,
     /// The input plan
@@ -130,7 +132,7 @@ impl ProjectionExec {
         let input_schema = input.schema();
         // convert argument to Vec<ProjectionExpr>
         let expr_vec = expr.into_iter().map(Into::into).collect::<Vec<_>>();
-        let projection = Projection::new(expr_vec);
+        let projection = ProjectionExprs::new(expr_vec);
 
         let schema = Arc::new(projection.project_schema(&input_schema)?);
 
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs 
b/datafusion/proto/src/physical_plan/from_proto.rs
index 7c4b9e55b8..2a3906d493 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -545,7 +545,7 @@ pub fn parse_protobuf_file_scan_config(
         .with_file_groups(file_groups)
         .with_constraints(constraints)
         .with_statistics(statistics)
-        .with_projection(Some(projection))
+        .with_projection_indices(Some(projection))
         .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
         .with_table_partition_cols(table_partition_cols)
         .with_output_ordering(output_ordering)
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index 399c234191..dc0a78dbcc 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -532,9 +532,13 @@ pub fn serialize_file_scan_config(
         statistics: Some((&conf.file_source.statistics().unwrap()).into()),
         limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
         projection: conf
-            .projection
+            .projection_exprs
             .as_ref()
-            .unwrap_or(&(0..schema.fields().len()).collect::<Vec<_>>())
+            // Serialize in projection order, keeping duplicates (`column_indices()`
+            // sorts and dedups). Non-column expressions cannot be encoded in this
+            // index-based field and cause `ordered_column_indices()` to panic.
+            .map(|p| p.ordered_column_indices())
+            .unwrap_or_else(|| (0..schema.fields().len()).collect::<Vec<_>>())
             .iter()
             .map(|n| *n as u32)
             .collect(),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index a0456e2031..c8b2bc02e4 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -920,7 +920,7 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() 
-> Result<()> {
         schema,
         file_source,
     )
-    .with_projection(Some(vec![0, 1]))
+    .with_projection_indices(Some(vec![0, 1]))
     .with_file_group(FileGroup::new(vec![file_group]))
     .with_table_partition_cols(vec![Field::new(
         "part".to_string(),
@@ -1814,7 +1814,7 @@ async fn roundtrip_projection_source() -> Result<()> {
         1024,
     )])])
     .with_statistics(statistics)
-    .with_projection(Some(vec![0, 1, 2]))
+    .with_projection_indices(Some(vec![0, 1, 2]))
     .build();
 
     let filter = Arc::new(
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs 
b/datafusion/substrait/src/physical_plan/consumer.rs
index ecf465dd3f..45a19cea80 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -151,8 +151,8 @@ pub async fn from_substrait_rel(
                                 .iter()
                                 .map(|item| item.field as usize)
                                 .collect();
-                            base_config_builder =
-                                
base_config_builder.with_projection(Some(column_indices));
+                            base_config_builder = base_config_builder
+                                .with_projection_indices(Some(column_indices));
                         }
                     }
 
diff --git a/datafusion/substrait/src/physical_plan/producer.rs 
b/datafusion/substrait/src/physical_plan/producer.rs
index 63abd14d6f..20d41c2e61 100644
--- a/datafusion/substrait/src/physical_plan/producer.rs
+++ b/datafusion/substrait/src/physical_plan/producer.rs
@@ -92,11 +92,14 @@ pub fn to_substrait_rel(
             };
 
             let mut select_struct = None;
-            if let Some(projection) = file_config.projection.as_ref() {
+            if let Some(projection) = file_config.projection_exprs.as_ref() {
                 let struct_items = projection
-                    .iter()
+                    // Keep projection order and duplicates (`column_indices()` sorts and
+                    // dedups); the consumer rebuilds the projection from this order.
+                    .ordered_column_indices()
+                    .into_iter()
                     .map(|index| StructItem {
-                        field: *index as i32,
+                        field: index as i32,
                         // FIXME: duckdb sets this to None, but it's not clear 
why.
                         // 
https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1191
                         child: None,
diff --git a/docs/source/library-user-guide/upgrading.md 
b/docs/source/library-user-guide/upgrading.md
index 4174fef7a6..c568b8b28e 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -125,6 +125,57 @@ Users may need to update their paths to account for these 
changes.
 
 See [issue #17713] for more details.
 
+### `FileScanConfig::projection` renamed to `FileScanConfig::projection_exprs`
+
+The `projection` field in `FileScanConfig` has been renamed to `projection_exprs` and its type has changed from `Option<Vec<usize>>` to `Option<ProjectionExprs>`. This lays the groundwork for pushing arbitrary physical expressions, rather than just column indices, down into file scans.
+
+**Impact on direct field access:**
+
+If you directly access the `projection` field:
+
+```rust
+# /* comment to avoid running
+let config: FileScanConfig = ...;
+let projection = config.projection;
+# */
+```
+
+You should update to:
+
+```rust
+# /* comment to avoid running
+let config: FileScanConfig = ...;
+let projection_exprs = config.projection_exprs;
+# */
+```
+
+**Impact on builders:**
+
+The `FileScanConfigBuilder::with_projection()` method has been deprecated in favor of `with_projection_indices()`:
+
+```diff
+ let config = FileScanConfigBuilder::new(url, schema, file_source)
+-   .with_projection(Some(vec![0, 2, 3]))
++   .with_projection_indices(Some(vec![0, 2, 3]))
+    .build();
+```
+
+Note: `with_projection()` still works but is deprecated and will be removed in a future release.
+
+**What is `ProjectionExprs`?**
+
+`ProjectionExprs` is a new type that represents a list of physical expressions for projection. While it can be constructed from column indices (which is what `with_projection_indices` does internally, and what `ProjectionExprs::from_indices` offers directly), it also supports arbitrary physical expressions, enabling advanced features like expression evaluation during scanning.
+
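+You can access column indices from `ProjectionExprs` if needed. Note that `column_indices()` returns the sorted, deduplicated indices of every column referenced by the projection's expressions, while `ordered_column_indices()` returns indices in projection order with duplicates kept (and panics unless every expression is a plain column reference):
+
+```rust
+# /* comment to avoid running
+let projection_exprs: ProjectionExprs = ...;
+// Sorted, deduplicated indices; use `ordered_column_indices()` to keep projection order
+let indices = projection_exprs.column_indices();
+# */
+```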
+
 ### `DESCRIBE query` support
 
 `DESCRIBE query` was previously an alias for `EXPLAIN query`, which outputs the


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to