This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6eb8d45c12 Let `FileScanConfig` own a list of `ProjectionExpr`s (#18253)
6eb8d45c12 is described below
commit 6eb8d45c122cf4c690bf3e8521c529ec720b549b
Author: Matthew Kim <[email protected]>
AuthorDate: Mon Oct 27 14:47:36 2025 -0400
Let `FileScanConfig` own a list of `ProjectionExpr`s (#18253)
## Which issue does this PR close?
- Related to https://github.com/apache/datafusion/issues/14993
## Rationale for this change
To enable expression pushdown to file sources, we need to plumb expressions
through the `FileScanConfig` layer. Currently, `FileScanConfig` only tracks
column indices for projection, which limits us to simple column selection.
This PR begins the expression pushdown implementation by having
`FileScanConfig` own a list of `ProjectionExpr`s instead of column indices.
This allows file sources to eventually receive, and optimize based on, the
actual expressions being projected.
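To illustrate the shape of the change, here is a minimal sketch (based on the
types touched in this diff, not code taken verbatim from the PR): the old
`projection: Option<Vec<usize>>` field becomes
`projection_exprs: Option<ProjectionExprs>`, where each `ProjectionExpr` pairs
a physical expression with an output alias.

```rust
use std::sync::Arc;

use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};

fn main() {
    // A column-only projection equivalent to the old `Some(vec![2, 0])`
    let exprs = ProjectionExprs::new(vec![
        ProjectionExpr {
            expr: Arc::new(Column::new("c", 2)),
            alias: "c".to_string(),
        },
        ProjectionExpr {
            expr: Arc::new(Column::new("a", 0)),
            alias: "a".to_string(),
        },
    ]);
    // Projection order is preserved: `ordered_column_indices()` returns [2, 0]
    assert_eq!(exprs.ordered_column_indices(), vec![2, 0]);
}
```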
## Notes about this PR
- The first commit is based off of
https://github.com/apache/datafusion/pull/18231
- To keep the diff small and easier to review, I've broken #14993 into two
tasks (see the sketch after this list):
- Have the `DataSource` (`FileScanConfig`) actually hold projection
expressions (this PR)
- Flow the projection expressions from `DataSourceExec` all the way to
the `FileSource`
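For reference, a call site migrates roughly like this (a sketch assuming
`object_store_url`, `schema`, and `file_source` are set up as in the
`FileScanConfigBuilder` docs):

```rust
// Before (now deprecated): .with_projection(Some(vec![0, 2]))
let config = FileScanConfigBuilder::new(object_store_url, schema, file_source)
    .with_projection_indices(Some(vec![0, 2]))
    .build();
// `build()` converts the indices into `ProjectionExprs` against the final
// table schema (including any partition columns)
assert!(config.projection_exprs.is_some());
```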
---------
Co-authored-by: Adrian Garcia Badaracco <[email protected]>
---
.../examples/advanced_parquet_index.rs | 2 +-
datafusion-examples/examples/csv_json_opener.rs | 4 +-
.../examples/default_column_values.rs | 2 +-
datafusion-examples/examples/parquet_index.rs | 2 +-
datafusion/catalog-listing/src/table.rs | 2 +-
datafusion/core/src/datasource/file_format/mod.rs | 2 +-
.../core/src/datasource/physical_plan/avro.rs | 6 +-
.../core/src/datasource/physical_plan/csv.rs | 6 +-
.../core/src/datasource/physical_plan/json.rs | 4 +-
.../core/src/datasource/physical_plan/parquet.rs | 4 +-
datafusion/core/tests/parquet/schema_coercion.rs | 2 +-
.../physical_optimizer/filter_pushdown/util.rs | 2 +-
.../physical_optimizer/projection_pushdown.rs | 6 +-
datafusion/datasource/src/file_scan_config.rs | 135 ++++++++++-----
datafusion/datasource/src/table_schema.rs | 4 +
datafusion/physical-expr/src/projection.rs | 186 ++++++++++++++++-----
datafusion/physical-plan/src/projection.rs | 8 +-
datafusion/proto/src/physical_plan/from_proto.rs | 2 +-
datafusion/proto/src/physical_plan/to_proto.rs | 5 +-
.../proto/tests/cases/roundtrip_physical_plan.rs | 4 +-
datafusion/substrait/src/physical_plan/consumer.rs | 4 +-
datafusion/substrait/src/physical_plan/producer.rs | 7 +-
docs/source/library-user-guide/upgrading.md | 51 ++++++
23 files changed, 334 insertions(+), 116 deletions(-)
diff --git a/datafusion-examples/examples/advanced_parquet_index.rs
b/datafusion-examples/examples/advanced_parquet_index.rs
index 55400e2192..1c560be6d0 100644
--- a/datafusion-examples/examples/advanced_parquet_index.rs
+++ b/datafusion-examples/examples/advanced_parquet_index.rs
@@ -502,7 +502,7 @@ impl TableProvider for IndexTableProvider {
let file_scan_config =
FileScanConfigBuilder::new(object_store_url, schema, file_source)
.with_limit(limit)
- .with_projection(projection.cloned())
+ .with_projection_indices(projection.cloned())
.with_file(partitioned_file)
.build();
diff --git a/datafusion-examples/examples/csv_json_opener.rs
b/datafusion-examples/examples/csv_json_opener.rs
index 1a2c2cbff4..8abed90238 100644
--- a/datafusion-examples/examples/csv_json_opener.rs
+++ b/datafusion-examples/examples/csv_json_opener.rs
@@ -60,7 +60,7 @@ async fn csv_opener() -> Result<()> {
Arc::clone(&schema),
Arc::new(CsvSource::default()),
)
- .with_projection(Some(vec![12, 0]))
+ .with_projection_indices(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10))
.build();
@@ -126,7 +126,7 @@ async fn json_opener() -> Result<()> {
schema,
Arc::new(JsonSource::default()),
)
- .with_projection(Some(vec![1, 0]))
+ .with_projection_indices(Some(vec![1, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.to_string(), 10))
.build();
diff --git a/datafusion-examples/examples/default_column_values.rs
b/datafusion-examples/examples/default_column_values.rs
index 43e2d4ca09..d3a7d2ec67 100644
--- a/datafusion-examples/examples/default_column_values.rs
+++ b/datafusion-examples/examples/default_column_values.rs
@@ -260,7 +260,7 @@ impl TableProvider for DefaultValueTableProvider {
self.schema.clone(),
Arc::new(parquet_source),
)
- .with_projection(projection.cloned())
+ .with_projection_indices(projection.cloned())
.with_limit(limit)
.with_file_group(file_group)
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));
diff --git a/datafusion-examples/examples/parquet_index.rs
b/datafusion-examples/examples/parquet_index.rs
index afc3b279f4..127c55da98 100644
--- a/datafusion-examples/examples/parquet_index.rs
+++ b/datafusion-examples/examples/parquet_index.rs
@@ -246,7 +246,7 @@ impl TableProvider for IndexTableProvider {
let source =
Arc::new(ParquetSource::default().with_predicate(predicate));
let mut file_scan_config_builder =
FileScanConfigBuilder::new(object_store_url, self.schema(), source)
- .with_projection(projection.cloned())
+ .with_projection_indices(projection.cloned())
.with_limit(limit);
// Transform to the format needed to pass to DataSourceExec
diff --git a/datafusion/catalog-listing/src/table.rs
b/datafusion/catalog-listing/src/table.rs
index e9ac1bf097..95f9523d44 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -499,7 +499,7 @@ impl TableProvider for ListingTable {
.with_file_groups(partitioned_file_lists)
.with_constraints(self.constraints.clone())
.with_statistics(statistics)
- .with_projection(projection)
+ .with_projection_indices(projection)
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols)
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/mod.rs
index e165707c2e..4881783eeb 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -90,7 +90,7 @@ pub(crate) mod test_util {
)
.with_file_groups(file_groups)
.with_statistics(statistics)
- .with_projection(projection)
+ .with_projection_indices(projection)
.with_limit(limit)
.build(),
)
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs
b/datafusion/core/src/datasource/physical_plan/avro.rs
index 8a00af959c..9068c97581 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -88,7 +88,7 @@ mod tests {
source,
)
.with_file(meta.into())
- .with_projection(Some(vec![0, 1, 2]))
+ .with_projection_indices(Some(vec![0, 1, 2]))
.build();
let source_exec = DataSourceExec::from_data_source(conf);
@@ -160,7 +160,7 @@ mod tests {
let source = Arc::new(AvroSource::new());
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
.with_file(meta.into())
- .with_projection(projection)
+ .with_projection_indices(projection)
.build();
let source_exec = DataSourceExec::from_data_source(conf);
@@ -231,7 +231,7 @@ mod tests {
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
// select specific columns of the files as well as the partitioning
// column which is supposed to be the last column in the table schema.
- .with_projection(projection)
+ .with_projection_indices(projection)
.with_file(partitioned_file)
.with_table_partition_cols(vec![Field::new("date", DataType::Utf8,
false)])
.build();
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs
b/datafusion/core/src/datasource/physical_plan/csv.rs
index b2ef51a76f..4f46a57d8b 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -118,7 +118,7 @@ mod tests {
))
.with_file_compression_type(file_compression_type)
.with_newlines_in_values(false)
- .with_projection(Some(vec![0, 2, 4]))
+ .with_projection_indices(Some(vec![0, 2, 4]))
.build();
assert_eq!(13, config.file_schema().fields().len());
@@ -183,7 +183,7 @@ mod tests {
))
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
- .with_projection(Some(vec![4, 0, 2]))
+ .with_projection_indices(Some(vec![4, 0, 2]))
.build();
assert_eq!(13, config.file_schema().fields().len());
let csv = DataSourceExec::from_data_source(config);
@@ -373,7 +373,7 @@ mod tests {
.with_table_partition_cols(vec![Field::new("date", DataType::Utf8,
false)])
// We should be able to project on the partition column
// Which is supposed to be after the file fields
- .with_projection(Some(vec![0, num_file_schema_fields]))
+ .with_projection_indices(Some(vec![0, num_file_schema_fields]))
.build();
// we don't have `/date=xx/` in the path but that is ok because
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs
b/datafusion/core/src/datasource/physical_plan/json.rs
index 0d45711c76..f7d5c710bf 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -297,7 +297,7 @@ mod tests {
let source = Arc::new(JsonSource::new());
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
.with_file_groups(file_groups)
- .with_projection(Some(vec![0, 2]))
+ .with_projection_indices(Some(vec![0, 2]))
.with_file_compression_type(file_compression_type.to_owned())
.build();
let exec = DataSourceExec::from_data_source(conf);
@@ -345,7 +345,7 @@ mod tests {
let source = Arc::new(JsonSource::new());
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
.with_file_groups(file_groups)
- .with_projection(Some(vec![3, 0, 2]))
+ .with_projection_indices(Some(vec![3, 0, 2]))
.with_file_compression_type(file_compression_type.to_owned())
.build();
let exec = DataSourceExec::from_data_source(conf);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 10a475c1cc..6df5cd7ac6 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -201,7 +201,7 @@ mod tests {
source,
)
.with_file_group(file_group)
- .with_projection(self.projection.clone())
+ .with_projection_indices(self.projection.clone())
.build();
DataSourceExec::from_data_source(base_config)
}
@@ -1655,7 +1655,7 @@ mod tests {
let config = FileScanConfigBuilder::new(object_store_url, schema.clone(), source)
.with_file(partitioned_file)
// file has 10 cols so index 12 should be month and 13 should be day
- .with_projection(Some(vec![0, 1, 2, 12, 13]))
+ .with_projection_indices(Some(vec![0, 1, 2, 12, 13]))
.with_table_partition_cols(vec![
Field::new("year", DataType::Utf8, false),
Field::new("month", DataType::UInt8, false),
diff --git a/datafusion/core/tests/parquet/schema_coercion.rs
b/datafusion/core/tests/parquet/schema_coercion.rs
index 59cbf4b087..9be391a910 100644
--- a/datafusion/core/tests/parquet/schema_coercion.rs
+++ b/datafusion/core/tests/parquet/schema_coercion.rs
@@ -126,7 +126,7 @@ async fn multi_parquet_coercion_projection() {
Arc::new(ParquetSource::default()),
)
.with_file_group(file_group)
- .with_projection(Some(vec![1, 0, 2]))
+ .with_projection_indices(Some(vec![1, 0, 2]))
.build();
let parquet_exec = DataSourceExec::from_data_source(config);
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index f05f3f0028..54e8e7bf04 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -165,7 +165,7 @@ impl FileSource for TestSource {
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
Arc::new(TestSource {
- projection: config.projection.clone(),
+ projection: config.projection_exprs.as_ref().map(|p| p.column_indices()),
..self.clone()
})
}
diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index c51a5e02c9..8631613c39 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -390,7 +390,7 @@ fn create_simple_csv_exec() -> Arc<dyn ExecutionPlan> {
Arc::new(CsvSource::new(false, 0, 0)),
)
.with_file(PartitionedFile::new("x".to_string(), 100))
- .with_projection(Some(vec![0, 1, 2, 3, 4]))
+ .with_projection_indices(Some(vec![0, 1, 2, 3, 4]))
.build();
DataSourceExec::from_data_source(config)
@@ -409,7 +409,7 @@ fn create_projecting_csv_exec() -> Arc<dyn ExecutionPlan> {
Arc::new(CsvSource::new(false, 0, 0)),
)
.with_file(PartitionedFile::new("x".to_string(), 100))
- .with_projection(Some(vec![3, 2, 1]))
+ .with_projection_indices(Some(vec![3, 2, 1]))
.build();
DataSourceExec::from_data_source(config)
@@ -1596,7 +1596,7 @@ fn partitioned_data_source() -> Arc<DataSourceExec> {
)
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_table_partition_cols(vec![Field::new("partition_col",
DataType::Utf8, true)])
- .with_projection(Some(vec![0, 1, 2]))
+ .with_projection_indices(Some(vec![0, 1, 2]))
.build();
DataSourceExec::from_data_source(config)
diff --git a/datafusion/datasource/src/file_scan_config.rs
b/datafusion/datasource/src/file_scan_config.rs
index 695252803b..c52397d9a7 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -44,18 +44,20 @@ use datafusion_execution::{
object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
};
use datafusion_expr::Operator;
-use datafusion_physical_expr::expressions::BinaryExpr;
-use datafusion_physical_expr::{expressions::Column, utils::reassign_expr_columns};
+use datafusion_physical_expr::expressions::{BinaryExpr, Column};
+use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, Partitioning};
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
-use datafusion_physical_plan::projection::ProjectionExpr;
+use datafusion_physical_plan::projection::{
+ all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
+};
use datafusion_physical_plan::{
display::{display_orderings, ProjectSchemaDisplay},
filter_pushdown::FilterPushdownPropagation,
metrics::ExecutionPlanMetricsSet,
- projection::{all_alias_free_columns, new_projections_for_columns},
DisplayAs, DisplayFormatType,
};
use std::{
@@ -124,7 +126,7 @@ use log::{debug, warn};
/// let file_source = Arc::new(ParquetSource::new());
/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
/// .with_limit(Some(1000)) // read only the first 1000 records
-/// .with_projection(Some(vec![2, 3])) // project columns 2 and 3
+/// .with_projection_indices(Some(vec![2, 3])) // project columns 2 and 3
/// // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
/// .with_file(PartitionedFile::new("file1.parquet", 1234))
/// // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
@@ -175,9 +177,12 @@ pub struct FileScanConfig {
pub file_groups: Vec<FileGroup>,
/// Table constraints
pub constraints: Constraints,
- /// Columns on which to project the data. Indexes that are higher than the
- /// number of columns of `file_schema` refer to `table_partition_cols`.
- pub projection: Option<Vec<usize>>,
+ /// Physical expressions defining the projection to apply when reading data.
+ ///
+ /// Each expression in the projection can reference columns from both the file
+ /// schema and table partition columns. If `None`, all columns from the table
+ /// schema are projected.
+ pub projection_exprs: Option<ProjectionExprs>,
/// The maximum number of records to read from this plan. If `None`,
/// all records after filtering are returned.
pub limit: Option<usize>,
@@ -229,7 +234,7 @@ pub struct FileScanConfig {
/// // Set a limit of 1000 rows
/// .with_limit(Some(1000))
/// // Project only the first column
-/// .with_projection(Some(vec![0]))
+/// .with_projection_indices(Some(vec![0]))
/// // Add partition columns
/// .with_table_partition_cols(vec![
/// Field::new("date", DataType::Utf8, false),
@@ -261,7 +266,7 @@ pub struct FileScanConfigBuilder {
table_schema: TableSchema,
file_source: Arc<dyn FileSource>,
limit: Option<usize>,
- projection: Option<Vec<usize>>,
+ projection_indices: Option<Vec<usize>>,
constraints: Option<Constraints>,
file_groups: Vec<FileGroup>,
statistics: Option<Statistics>,
@@ -294,7 +299,7 @@ impl FileScanConfigBuilder {
file_compression_type: None,
new_lines_in_values: None,
limit: None,
- projection: None,
+ projection_indices: None,
constraints: None,
batch_size: None,
expr_adapter_factory: None,
@@ -317,10 +322,25 @@ impl FileScanConfigBuilder {
self
}
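+ /// Returns the table schema (the file schema combined with any table partition columns).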
+ pub fn table_schema(&self) -> &SchemaRef {
+ self.table_schema.table_schema()
+ }
+
/// Set the columns on which to project the data. Indexes that are higher than the
/// number of columns of `file_schema` refer to `table_partition_cols`.
- pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
- self.projection = projection;
+ ///
+ /// # Deprecated
+ /// Use [`Self::with_projection_indices`] instead. This method will be removed in a future release.
+ #[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")]
+ pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
+ self.with_projection_indices(indices)
+ }
+
+ /// Set the columns on which to project the data using column indices.
+ ///
+ /// Indexes that are higher than the number of columns of `file_schema` refer to `table_partition_cols`.
+ pub fn with_projection_indices(mut self, indices: Option<Vec<usize>>) -> Self {
+ self.projection_indices = indices;
self
}
@@ -433,7 +453,7 @@ impl FileScanConfigBuilder {
table_schema,
file_source,
limit,
- projection,
+ projection_indices,
constraints,
file_groups,
statistics,
@@ -455,12 +475,18 @@ impl FileScanConfigBuilder {
file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
let new_lines_in_values = new_lines_in_values.unwrap_or(false);
+ // Convert projection indices to ProjectionExprs using the final table schema
+ // (which now includes partition columns if they were added)
+ let projection_exprs = projection_indices.map(|indices| {
+ ProjectionExprs::from_indices(&indices, table_schema.table_schema())
+ });
+
FileScanConfig {
object_store_url,
table_schema,
file_source,
limit,
- projection,
+ projection_exprs,
constraints,
file_groups,
output_ordering,
@@ -484,7 +510,9 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
file_compression_type: Some(config.file_compression_type),
new_lines_in_values: Some(config.new_lines_in_values),
limit: config.limit,
- projection: config.projection,
+ projection_indices: config
+ .projection_exprs
+ .map(|p| p.ordered_column_indices()),
constraints: Some(config.constraints),
batch_size: config.batch_size,
expr_adapter_factory: config.expr_adapter_factory,
@@ -673,15 +701,16 @@ impl DataSource for FileScanConfig {
let new_projections = new_projections_for_columns(
projection,
&file_scan
- .projection
- .clone()
+ .projection_exprs
+ .as_ref()
+ .map(|p| p.ordered_column_indices())
.unwrap_or_else(|| (0..self.file_schema().fields().len()).collect()),
);
Arc::new(
FileScanConfigBuilder::from(file_scan)
// Assign projected statistics to source
- .with_projection(Some(new_projections))
+ .with_projection_indices(Some(new_projections))
.with_source(source)
.build(),
) as _
@@ -727,8 +756,8 @@ impl FileScanConfig {
}
fn projection_indices(&self) -> Vec<usize> {
- match &self.projection {
- Some(proj) => proj.clone(),
+ match &self.projection_exprs {
+ Some(proj) => proj.ordered_column_indices(),
None => (0..self.file_schema().fields().len()
+ self.table_partition_cols().len())
.collect(),
@@ -825,7 +854,7 @@ impl FileScanConfig {
/// Project the schema, constraints, and the statistics on the given column indices
pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
- if self.projection.is_none() && self.table_partition_cols().is_empty() {
+ if self.projection_exprs.is_none() && self.table_partition_cols().is_empty() {
return (
Arc::clone(self.file_schema()),
self.constraints.clone(),
@@ -844,12 +873,17 @@ impl FileScanConfig {
}
pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
- self.projection.as_ref().map(|p| {
- p.iter()
- .filter(|col_idx| **col_idx < self.file_schema().fields().len())
- .map(|col_idx| self.file_schema().field(*col_idx).name())
+ let fields = self.file_schema().fields();
+
+ self.projection_exprs.as_ref().map(|p| {
+ let column_indices = p.ordered_column_indices();
+
+ column_indices
+ .iter()
+ .filter(|&&col_i| col_i < fields.len())
+ .map(|&col_i| self.file_schema().field(col_i).name())
.cloned()
- .collect()
+ .collect::<Vec<_>>()
})
}
@@ -875,11 +909,11 @@ impl FileScanConfig {
}
pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
- self.projection.as_ref().map(|p| {
- p.iter()
- .filter(|col_idx| **col_idx < self.file_schema().fields().len())
- .copied()
- .collect()
+ self.projection_exprs.as_ref().map(|p| {
+ p.ordered_column_indices()
+ .into_iter()
+ .filter(|&i| i < self.file_schema().fields().len())
+ .collect::<Vec<_>>()
})
}
@@ -1415,10 +1449,15 @@ fn get_projected_output_ordering(
return false;
}
+ let indices = base_config
+ .projection_exprs
+ .as_ref()
+ .map(|p| p.ordered_column_indices());
+
let statistics = match MinMaxStatistics::new_from_files(
&new_ordering,
projected_schema,
- base_config.projection.as_deref(),
+ indices.as_deref(),
group.iter(),
) {
Ok(statistics) => statistics,
@@ -1479,7 +1518,7 @@ mod tests {
use datafusion_common::{assert_batches_eq, internal_err};
use datafusion_expr::{Operator, SortExpr};
use datafusion_physical_expr::create_physical_sort_expr;
- use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
+ use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
/// Returns the column names on the schema
@@ -2143,7 +2182,7 @@ mod tests {
file_schema,
Arc::new(MockSource::default()),
)
- .with_projection(projection)
+ .with_projection_indices(projection)
.with_statistics(statistics)
.with_table_partition_cols(table_partition_cols)
.build()
@@ -2196,7 +2235,7 @@ mod tests {
// Build with various configurations
let config = builder
.with_limit(Some(1000))
- .with_projection(Some(vec![0, 1]))
+ .with_projection_indices(Some(vec![0, 1]))
.with_table_partition_cols(vec![Field::new(
"date",
wrap_partition_type_in_dict(DataType::Utf8),
@@ -2219,7 +2258,10 @@ mod tests {
assert_eq!(config.object_store_url, object_store_url);
assert_eq!(*config.file_schema(), file_schema);
assert_eq!(config.limit, Some(1000));
- assert_eq!(config.projection, Some(vec![0, 1]));
+ assert_eq!(
+ config.projection_exprs.as_ref().map(|p| p.column_indices()),
+ Some(vec![0, 1])
+ );
assert_eq!(config.table_partition_cols().len(), 1);
assert_eq!(config.table_partition_cols()[0].name(), "date");
assert_eq!(config.file_groups.len(), 1);
@@ -2253,7 +2295,7 @@ mod tests {
Arc::clone(&file_schema),
Arc::clone(&file_source),
)
- .with_projection(Some(vec![0, 1, 2]))
+ .with_projection_indices(Some(vec![0, 1, 2]))
.build();
// Simulate projection being updated. Since the filter has already been pushed down,
@@ -2302,7 +2344,10 @@ mod tests {
assert_eq!(config.object_store_url, object_store_url);
assert_eq!(*config.file_schema(), file_schema);
assert_eq!(config.limit, None);
- assert_eq!(config.projection, None);
+ assert_eq!(
+ config.projection_exprs.as_ref().map(|p| p.column_indices()),
+ None
+ );
assert!(config.table_partition_cols().is_empty());
assert!(config.file_groups.is_empty());
assert_eq!(
@@ -2357,7 +2402,7 @@ mod tests {
Arc::clone(&schema),
Arc::clone(&file_source),
)
- .with_projection(Some(vec![0, 2]))
+ .with_projection_indices(Some(vec![0, 2]))
.with_limit(Some(10))
.with_table_partition_cols(partition_cols.clone())
.with_file(file.clone())
@@ -2375,7 +2420,13 @@ mod tests {
let partition_cols =
partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
assert_eq!(new_config.object_store_url, object_store_url);
assert_eq!(*new_config.file_schema(), schema);
- assert_eq!(new_config.projection, Some(vec![0, 2]));
+ assert_eq!(
+ new_config
+ .projection_exprs
+ .as_ref()
+ .map(|p| p.column_indices()),
+ Some(vec![0, 2])
+ );
assert_eq!(new_config.limit, Some(10));
assert_eq!(*new_config.table_partition_cols(), partition_cols);
assert_eq!(new_config.file_groups.len(), 1);
@@ -2594,7 +2645,7 @@ mod tests {
Arc::clone(&schema),
Arc::new(MockSource::default()),
)
- .with_projection(Some(vec![0, 2])) // Only project columns 0 and 2
+ .with_projection_indices(Some(vec![0, 2])) // Only project columns 0 and 2
.with_file_groups(vec![file_group])
.build();
diff --git a/datafusion/datasource/src/table_schema.rs
b/datafusion/datasource/src/table_schema.rs
index 8e95585ce8..863c123e3b 100644
--- a/datafusion/datasource/src/table_schema.rs
+++ b/datafusion/datasource/src/table_schema.rs
@@ -132,6 +132,10 @@ impl TableSchema {
table_partition_cols: Vec<FieldRef>,
) -> TableSchema {
self.table_partition_cols = table_partition_cols;
+ // Rebuild the table schema with the new partition columns
+ let mut builder = SchemaBuilder::from(self.file_schema.as_ref());
+ builder.extend(self.table_partition_cols.iter().cloned());
+ self.table_schema = Arc::new(builder.finish());
self
}
diff --git a/datafusion/physical-expr/src/projection.rs
b/datafusion/physical-expr/src/projection.rs
index e35bfbb3a2..fc972d644e 100644
--- a/datafusion/physical-expr/src/projection.rs
+++ b/datafusion/physical-expr/src/projection.rs
@@ -100,24 +100,24 @@ impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
/// representing a complete projection operation and provides
/// methods to manipulate and analyze the projection as a whole.
#[derive(Debug, Clone)]
-pub struct Projection {
+pub struct ProjectionExprs {
exprs: Vec<ProjectionExpr>,
}
-impl std::fmt::Display for Projection {
+impl std::fmt::Display for ProjectionExprs {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let exprs: Vec<String> = self.exprs.iter().map(|e| e.to_string()).collect();
write!(f, "Projection[{}]", exprs.join(", "))
}
}
-impl From<Vec<ProjectionExpr>> for Projection {
+impl From<Vec<ProjectionExpr>> for ProjectionExprs {
fn from(value: Vec<ProjectionExpr>) -> Self {
Self { exprs: value }
}
}
-impl From<&[ProjectionExpr]> for Projection {
+impl From<&[ProjectionExpr]> for ProjectionExprs {
fn from(value: &[ProjectionExpr]) -> Self {
Self {
exprs: value.to_vec(),
@@ -125,15 +125,83 @@ impl From<&[ProjectionExpr]> for Projection {
}
}
-impl AsRef<[ProjectionExpr]> for Projection {
+impl FromIterator<ProjectionExpr> for ProjectionExprs {
+ fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self {
+ Self {
+ exprs: exprs.into_iter().collect::<Vec<_>>(),
+ }
+ }
+}
+
+impl AsRef<[ProjectionExpr]> for ProjectionExprs {
fn as_ref(&self) -> &[ProjectionExpr] {
&self.exprs
}
}
-impl Projection {
- pub fn new(exprs: Vec<ProjectionExpr>) -> Self {
- Self { exprs }
+impl ProjectionExprs {
+ pub fn new<I>(exprs: I) -> Self
+ where
+ I: IntoIterator<Item = ProjectionExpr>,
+ {
+ Self {
+ exprs: exprs.into_iter().collect::<Vec<_>>(),
+ }
+ }
+
+ /// Creates a [`ProjectionExprs`] from a list of column indices.
+ ///
+ /// This is a convenience method for creating simple column-only projections,
+ /// where each projection expression is a reference to a column in the input schema.
+ ///
+ /// # Behavior
+ /// - Ordering: the output projection preserves the exact order of indices provided in the input slice.
+ ///   For example, `[2, 0, 1]` will produce projections for columns 2, 0, then 1 in that order.
+ /// - Duplicates: duplicate indices are allowed and will create multiple projection expressions
+ ///   referencing the same source column. For example, `[0, 0]` creates 2 separate projections
+ ///   both referencing column 0.
+ ///
+ /// # Panics
+ /// Panics if any index in `indices` is out of bounds for the provided schema.
+ ///
+ /// # Example
+ ///
+ /// ```rust
+ /// use std::sync::Arc;
+ /// use arrow::datatypes::{Schema, Field, DataType};
+ /// use datafusion_physical_expr::projection::ProjectionExprs;
+ ///
+ /// // Create a schema with three columns
+ /// let schema = Arc::new(Schema::new(vec![
+ /// Field::new("a", DataType::Int32, false),
+ /// Field::new("b", DataType::Utf8, false),
+ /// Field::new("c", DataType::Float64, false),
+ /// ]));
+ ///
+ /// // Project columns at indices 2 and 0 (c and a) - ordering is preserved
+ /// let projection = ProjectionExprs::from_indices(&[2, 0], &schema);
+ ///
+ /// // This creates: SELECT c@2 AS c, a@0 AS a
+ /// assert_eq!(projection.as_ref().len(), 2);
+ /// assert_eq!(projection.as_ref()[0].alias, "c");
+ /// assert_eq!(projection.as_ref()[1].alias, "a");
+ ///
+ /// // Duplicate indices are allowed
+ /// let projection_with_dups = ProjectionExprs::from_indices(&[0, 0, 1], &schema);
+ /// assert_eq!(projection_with_dups.as_ref().len(), 3);
+ /// assert_eq!(projection_with_dups.as_ref()[0].alias, "a");
+ /// assert_eq!(projection_with_dups.as_ref()[1].alias, "a"); // duplicate
+ /// assert_eq!(projection_with_dups.as_ref()[2].alias, "b");
+ /// ```
+ pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Self {
+ let projection_exprs = indices.iter().map(|&i| {
+ let field = schema.field(i);
+ ProjectionExpr {
+ expr: Arc::new(Column::new(field.name(), i)),
+ alias: field.name().clone(),
+ }
+ });
+
+ Self::from_iter(projection_exprs)
}
/// Returns an iterator over the projection expressions
@@ -167,7 +235,7 @@ impl Projection {
///
/// ```rust
/// use std::sync::Arc;
- /// use datafusion_physical_expr::projection::{Projection, ProjectionExpr};
+ /// use datafusion_physical_expr::projection::{ProjectionExprs, ProjectionExpr};
/// use datafusion_physical_expr::expressions::{Column, BinaryExpr, Literal};
/// use datafusion_common::{Result, ScalarValue};
/// use datafusion_expr::Operator;
@@ -175,7 +243,7 @@ impl Projection {
/// fn main() -> Result<()> {
/// // Example from the docstring:
/// // Base projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
- /// let base = Projection::new(vec![
+ /// let base = ProjectionExprs::new(vec![
/// ProjectionExpr {
/// expr: Arc::new(Column::new("c", 2)),
/// alias: "x".to_string(),
@@ -191,7 +259,7 @@ impl Projection {
/// ]);
///
/// // Top projection: SELECT x@0 + 1 AS c1, y@1 + z@2 AS c2
- /// let top = Projection::new(vec![
+ /// let top = ProjectionExprs::new(vec![
+ /// Returns the column indices referenced by this projection, in projection order.
+ ///
+ /// Unlike [`column_indices()`](Self::column_indices), this function:
+ /// - Preserves the projection order (does not sort)
+ /// - Preserves duplicates (does not deduplicate)
+ ///
+ /// # Example
+ ///
+ /// For a projection `SELECT c, a, c` where `a` is at index 0 and `c` is at index 2,
+ /// this function would return `[2, 0, 2]`.
+ ///
+ /// Use [`column_indices()`](Self::column_indices) instead if the projection may contain
+ /// non-column expressions or if you need a deduplicated, sorted list.
+ pub fn ordered_column_indices(&self) -> Vec<usize> {
+ self.exprs
+ .iter()
+ .map(|e| {
+ e.expr
+ .as_any()
+ .downcast_ref::<Column>()
+ .expect("Expected column reference in projection")
+ .index()
+ })
+ .collect()
+ }
+
/// Project a schema according to this projection.
/// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
/// if the input schema is `[a: Int32, b: Int32, c: Int32]`, the output schema would be `[x: Int32, y: Int32]`.
@@ -327,7 +435,7 @@ impl Projection {
}
}
-impl<'a> IntoIterator for &'a Projection {
+impl<'a> IntoIterator for &'a ProjectionExprs {
type Item = &'a ProjectionExpr;
type IntoIter = std::slice::Iter<'a, ProjectionExpr>;
@@ -336,7 +444,7 @@ impl<'a> IntoIterator for &'a Projection {
}
}
-impl IntoIterator for Projection {
+impl IntoIterator for ProjectionExprs {
type Item = ProjectionExpr;
type IntoIter = std::vec::IntoIter<ProjectionExpr>;
@@ -1570,7 +1678,7 @@ pub(crate) mod tests {
let source = get_stats();
let schema = get_schema();
- let projection = Projection::new(vec![
+ let projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(Column::new("col1", 1)),
alias: "col1".to_string(),
@@ -1612,7 +1720,7 @@ pub(crate) mod tests {
let source = get_stats();
let schema = get_schema();
- let projection = Projection::new(vec![
+ let projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(Column::new("col2", 2)),
alias: "col2".to_string(),
@@ -1663,7 +1771,7 @@ pub(crate) mod tests {
alias: "b".to_string(),
},
];
- let projection = Projection::new(exprs.clone());
+ let projection = ProjectionExprs::new(exprs.clone());
assert_eq!(projection.as_ref().len(), 2);
Ok(())
}
@@ -1674,7 +1782,7 @@ pub(crate) mod tests {
expr: Arc::new(Column::new("x", 0)),
alias: "x".to_string(),
}];
- let projection: Projection = exprs.clone().into();
+ let projection: ProjectionExprs = exprs.clone().into();
assert_eq!(projection.as_ref().len(), 1);
Ok(())
}
@@ -1691,7 +1799,7 @@ pub(crate) mod tests {
alias: "col2".to_string(),
},
];
- let projection = Projection::new(exprs);
+ let projection = ProjectionExprs::new(exprs);
let as_ref: &[ProjectionExpr] = projection.as_ref();
assert_eq!(as_ref.len(), 2);
Ok(())
@@ -1700,7 +1808,7 @@ pub(crate) mod tests {
#[test]
fn test_column_indices_multiple_columns() -> Result<()> {
// Test with reversed column order to ensure proper reordering
- let projection = Projection::new(vec![
+ let projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(Column::new("c", 5)),
alias: "c".to_string(),
@@ -1722,7 +1830,7 @@ pub(crate) mod tests {
#[test]
fn test_column_indices_duplicates() -> Result<()> {
// Test that duplicate column indices appear only once
- let projection = Projection::new(vec![
+ let projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(Column::new("a", 1)),
alias: "a".to_string(),
@@ -1743,7 +1851,7 @@ pub(crate) mod tests {
#[test]
fn test_column_indices_unsorted() -> Result<()> {
// Test that column indices are sorted in the output
- let projection = Projection::new(vec![
+ let projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(Column::new("c", 5)),
alias: "c".to_string(),
@@ -1769,7 +1877,7 @@ pub(crate) mod tests {
Operator::Plus,
Arc::new(Column::new("b", 4)),
));
- let projection = Projection::new(vec![
+ let projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr,
alias: "sum".to_string(),
@@ -1786,7 +1894,7 @@ pub(crate) mod tests {
#[test]
fn test_column_indices_empty() -> Result<()> {
- let projection = Projection::new(vec![]);
+ let projection = ProjectionExprs::new(vec![]);
assert_eq!(projection.column_indices(), Vec::<usize>::new());
Ok(())
}
@@ -1794,7 +1902,7 @@ pub(crate) mod tests {
#[test]
fn test_merge_simple_columns() -> Result<()> {
// First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
- let base_projection = Projection::new(vec![
+ let base_projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(Column::new("c", 2)),
alias: "x".to_string(),
@@ -1810,7 +1918,7 @@ pub(crate) mod tests {
]);
// Second projection: SELECT y@1 AS col2, x@0 AS col1
- let top_projection = Projection::new(vec![
+ let top_projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(Column::new("y", 1)),
alias: "col2".to_string(),
@@ -1831,7 +1939,7 @@ pub(crate) mod tests {
#[test]
fn test_merge_with_expressions() -> Result<()> {
// First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
- let base_projection = Projection::new(vec![
+ let base_projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(Column::new("c", 2)),
alias: "x".to_string(),
@@ -1847,7 +1955,7 @@ pub(crate) mod tests {
]);
// Second projection: SELECT y@1 + z@2 AS c2, x@0 + 1 AS c1
- let top_projection = Projection::new(vec![
+ let top_projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(BinaryExpr::new(
Arc::new(Column::new("y", 1)),
@@ -1876,7 +1984,7 @@ pub(crate) mod tests {
#[test]
fn try_merge_error() {
// Create a base projection
- let base = Projection::new(vec![
+ let base = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(Column::new("a", 0)),
alias: "x".to_string(),
@@ -1888,7 +1996,7 @@ pub(crate) mod tests {
]);
// Create a top projection that references a non-existent column index
- let top = Projection::new(vec![ProjectionExpr {
+ let top = ProjectionExprs::new(vec![ProjectionExpr {
expr: Arc::new(Column::new("z", 5)), // Invalid index
alias: "result".to_string(),
}]);
@@ -1907,7 +2015,7 @@ pub(crate) mod tests {
let input_schema = get_schema();
// Projection: SELECT col2 AS c, col0 AS a
- let projection = Projection::new(vec![
+ let projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(Column::new("col2", 2)),
alias: "c".to_string(),
@@ -1940,7 +2048,7 @@ pub(crate) mod tests {
let input_schema = get_schema();
// Projection: SELECT col0 + 1 AS incremented
- let projection = Projection::new(vec![ProjectionExpr {
+ let projection = ProjectionExprs::new(vec![ProjectionExpr {
expr: Arc::new(BinaryExpr::new(
Arc::new(Column::new("col0", 0)),
Operator::Plus,
@@ -1974,7 +2082,7 @@ pub(crate) mod tests {
]);
// Projection: SELECT col0 AS renamed
- let projection = Projection::new(vec![ProjectionExpr {
+ let projection = ProjectionExprs::new(vec![ProjectionExpr {
expr: Arc::new(Column::new("col0", 0)),
alias: "renamed".to_string(),
}]);
@@ -1994,7 +2102,7 @@ pub(crate) mod tests {
#[test]
fn test_project_schema_empty() -> Result<()> {
let input_schema = get_schema();
- let projection = Projection::new(vec![]);
+ let projection = ProjectionExprs::new(vec![]);
let output_schema = projection.project_schema(&input_schema)?;
@@ -2009,7 +2117,7 @@ pub(crate) mod tests {
let input_schema = get_schema();
// Projection: SELECT col1 AS text, col0 AS num
- let projection = Projection::new(vec![
+ let projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(Column::new("col1", 1)),
alias: "text".to_string(),
@@ -2057,7 +2165,7 @@ pub(crate) mod tests {
let input_schema = get_schema();
// Projection with expression: SELECT col0 + 1 AS incremented, col1 AS text
- let projection = Projection::new(vec![
+ let projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(BinaryExpr::new(
Arc::new(Column::new("col0", 0)),
@@ -2105,7 +2213,7 @@ pub(crate) mod tests {
let input_schema = get_schema();
// Projection with only primitive width columns: SELECT col2 AS f, col0 AS i
- let projection = Projection::new(vec![
+ let projection = ProjectionExprs::new(vec![
ProjectionExpr {
expr: Arc::new(Column::new("col2", 2)),
alias: "f".to_string(),
@@ -2136,7 +2244,7 @@ pub(crate) mod tests {
let input_stats = get_stats();
let input_schema = get_schema();
- let projection = Projection::new(vec![]);
+ let projection = ProjectionExprs::new(vec![]);
let output_stats = projection.project_statistics(input_stats, &input_schema)?;
diff --git a/datafusion/physical-plan/src/projection.rs
b/datafusion/physical-plan/src/projection.rs
index 4dc88bc566..2c84570b33 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -53,7 +53,9 @@ use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExprRef};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
// Re-exported from datafusion-physical-expr for backwards compatibility
// We recommend updating your imports to use datafusion-physical-expr directly
-pub use datafusion_physical_expr::projection::{update_expr, Projection, ProjectionExpr};
+pub use datafusion_physical_expr::projection::{
+ update_expr, ProjectionExpr, ProjectionExprs,
+};
use futures::stream::{Stream, StreamExt};
use log::trace;
@@ -65,7 +67,7 @@ use log::trace;
#[derive(Debug, Clone)]
pub struct ProjectionExec {
/// The projection expressions stored as tuples of (expression, output column name)
- projection: Projection,
+ projection: ProjectionExprs,
/// The schema once the projection has been applied to the input
schema: SchemaRef,
/// The input plan
@@ -130,7 +132,7 @@ impl ProjectionExec {
let input_schema = input.schema();
// convert argument to Vec<ProjectionExpr>
let expr_vec = expr.into_iter().map(Into::into).collect::<Vec<_>>();
- let projection = Projection::new(expr_vec);
+ let projection = ProjectionExprs::new(expr_vec);
let schema = Arc::new(projection.project_schema(&input_schema)?);
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs
b/datafusion/proto/src/physical_plan/from_proto.rs
index 7c4b9e55b8..2a3906d493 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -545,7 +545,7 @@ pub fn parse_protobuf_file_scan_config(
.with_file_groups(file_groups)
.with_constraints(constraints)
.with_statistics(statistics)
- .with_projection(Some(projection))
+ .with_projection_indices(Some(projection))
.with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
.with_table_partition_cols(table_partition_cols)
.with_output_ordering(output_ordering)
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index 399c234191..dc0a78dbcc 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -532,9 +532,10 @@ pub fn serialize_file_scan_config(
statistics: Some((&conf.file_source.statistics().unwrap()).into()),
limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
projection: conf
- .projection
+ .projection_exprs
.as_ref()
- .unwrap_or(&(0..schema.fields().len()).collect::<Vec<_>>())
+ .map(|p| p.column_indices())
+ .unwrap_or((0..schema.fields().len()).collect::<Vec<_>>())
.iter()
.map(|n| *n as u32)
.collect(),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index a0456e2031..c8b2bc02e4 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -920,7 +920,7 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
schema,
file_source,
)
- .with_projection(Some(vec![0, 1]))
+ .with_projection_indices(Some(vec![0, 1]))
.with_file_group(FileGroup::new(vec![file_group]))
.with_table_partition_cols(vec![Field::new(
"part".to_string(),
@@ -1814,7 +1814,7 @@ async fn roundtrip_projection_source() -> Result<()> {
1024,
)])])
.with_statistics(statistics)
- .with_projection(Some(vec![0, 1, 2]))
+ .with_projection_indices(Some(vec![0, 1, 2]))
.build();
let filter = Arc::new(
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs
b/datafusion/substrait/src/physical_plan/consumer.rs
index ecf465dd3f..45a19cea80 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -151,8 +151,8 @@ pub async fn from_substrait_rel(
.iter()
.map(|item| item.field as usize)
.collect();
- base_config_builder =
-     base_config_builder.with_projection(Some(column_indices));
+ base_config_builder = base_config_builder
+ .with_projection_indices(Some(column_indices));
}
}
diff --git a/datafusion/substrait/src/physical_plan/producer.rs
b/datafusion/substrait/src/physical_plan/producer.rs
index 63abd14d6f..20d41c2e61 100644
--- a/datafusion/substrait/src/physical_plan/producer.rs
+++ b/datafusion/substrait/src/physical_plan/producer.rs
@@ -92,11 +92,12 @@ pub fn to_substrait_rel(
};
let mut select_struct = None;
- if let Some(projection) = file_config.projection.as_ref() {
+ if let Some(projection) = file_config.projection_exprs.as_ref() {
let struct_items = projection
- .iter()
+ .column_indices()
+ .into_iter()
.map(|index| StructItem {
- field: *index as i32,
+ field: index as i32,
// FIXME: duckdb sets this to None, but it's not clear why.
// https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1191
child: None,
diff --git a/docs/source/library-user-guide/upgrading.md
b/docs/source/library-user-guide/upgrading.md
index 4174fef7a6..c568b8b28e 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -125,6 +125,57 @@ Users may need to update their paths to account for these changes.
See [issue #17713] for more details.
+### `FileScanConfig::projection` renamed to `FileScanConfig::projection_exprs`
+
+The `projection` field in `FileScanConfig` has been renamed to `projection_exprs` and its type has changed from `Option<Vec<usize>>` to `Option<ProjectionExprs>`. This change enables more powerful projection pushdown capabilities by supporting arbitrary physical expressions rather than just column indices.
+
+**Impact on direct field access:**
+
+If you directly access the `projection` field:
+
+```rust
+# /* comment to avoid running
+let config: FileScanConfig = ...;
+let projection = config.projection;
+# */
+```
+
+You should update to:
+
+```rust
+# /* comment to avoid running
+let config: FileScanConfig = ...;
+let projection_exprs = config.projection_exprs;
+# */
+```
+
+**Impact on builders:**
+
+The `FileScanConfigBuilder::with_projection()` method has been deprecated in favor of `with_projection_indices()`:
+
+```diff
+let config = FileScanConfigBuilder::new(url, schema, file_source)
+- .with_projection(Some(vec![0, 2, 3]))
++ .with_projection_indices(Some(vec![0, 2, 3]))
+ .build();
+```
+
+Note: `with_projection()` still works but is deprecated and will be removed in a future release.
+
+**What is `ProjectionExprs`?**
+
+`ProjectionExprs` is a new type that represents a list of physical expressions for projection. While it can be constructed from column indices (which is what `with_projection_indices` does internally), it also supports arbitrary physical expressions, enabling advanced features like expression evaluation during scanning.
+
+You can access column indices from `ProjectionExprs` using its methods if needed:
+
+```rust
+# /* comment to avoid running
+let projection_exprs: ProjectionExprs = ...;
+// Get the column indices if the projection only contains simple column references
+let indices = projection_exprs.column_indices();
+# */
+```
+
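+If you need to construct a `ProjectionExprs` from indices yourself, there is a
+constructor for that (a minimal sketch; the schema and index values here are
+illustrative):
+
+```rust
+# /* comment to avoid running
+use datafusion_physical_expr::projection::ProjectionExprs;
+// Project columns 2 and 0, in that order (equivalent to the old Some(vec![2, 0]))
+let projection_exprs = ProjectionExprs::from_indices(&[2, 0], &schema);
+// `column_indices()` returns a sorted, deduplicated list
+assert_eq!(projection_exprs.column_indices(), vec![0, 2]);
+# */
+```
+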
### `DESCRIBE query` support
`DESCRIBE query` was previously an alias for `EXPLAIN query`, which outputs the
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]