adriangb commented on code in PR #20061:
URL: https://github.com/apache/datafusion/pull/20061#discussion_r2741776013
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1671,6 +1654,217 @@ impl DefaultPhysicalPlanner {
Ok(exec_node)
}
+ /// Plan a TableScan node, handling filter pushdown classification and
+ /// wrapping with FilterExec/ProjectionExec as needed.
+ async fn plan_table_scan(
+ &self,
+ scan: &TableScan,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+        use datafusion_expr::TableProviderFilterPushDown;
+
+        let provider = source_as_provider(&scan.source)?;
+        let source_schema = scan.source.schema();
+
+        // Remove all qualifiers from filters as the provider doesn't know
+        // (nor should care) how the relation was referred to in the query
+        let filters: Vec<Expr> = unnormalize_cols(scan.filters.iter().cloned());
+
+        // Separate volatile filters (they should never be pushed down)
+        let (volatile_filters, non_volatile_filters): (Vec<Expr>, Vec<Expr>) = filters
+            .into_iter()
+            .partition(|pred: &Expr| pred.is_volatile());
+
+        // Classify filters using supports_filters_pushdown
+        let filter_refs: Vec<&Expr> = non_volatile_filters.iter().collect();
+        let supported = provider.supports_filters_pushdown(&filter_refs)?;
+
+        assert_eq_or_internal_err!(
+            non_volatile_filters.len(),
+            supported.len(),
+            "supports_filters_pushdown returned {} results for {} filters",
+            supported.len(),
+            non_volatile_filters.len()
+        );
+
+        // Separate filters into:
+        // - pushable_filters: Exact or Inexact filters to pass to the provider
+        // - post_scan_filters: Inexact, Unsupported, and volatile filters for FilterExec
+        let mut pushable_filters = Vec::new();
+        let mut post_scan_filters = Vec::new();
+
+        for (filter, support) in non_volatile_filters.into_iter().zip(supported.iter()) {
+            match support {
+                TableProviderFilterPushDown::Exact => {
+                    pushable_filters.push(filter);
+                }
+                TableProviderFilterPushDown::Inexact => {
+                    pushable_filters.push(filter.clone());
+                    post_scan_filters.push(filter);
+                }
+                TableProviderFilterPushDown::Unsupported => {
+                    post_scan_filters.push(filter);
+                }
+            }
+        }
+
+        // Add volatile filters to post_scan_filters
+        post_scan_filters.extend(volatile_filters);
+
+        // Compute required column indices for the scan
+        // We need columns from both projection expressions and post-scan filters
+        let scan_projection = self.compute_scan_projection(
+            &scan.projection,
+            &post_scan_filters,
+            &source_schema,
+        )?;
+
+        // Check if we have inexact filters - if so, we can't push limit
+        let has_inexact = supported.contains(&TableProviderFilterPushDown::Inexact);
+        let scan_limit = if has_inexact || !post_scan_filters.is_empty() {
+            None // Can't push limit when post-filtering is needed
+        } else {
+            scan.fetch
+        };
+
+        // Create the scan
+        let scan_args = ScanArgs::default()
+            .with_projection(scan_projection.as_deref())
+            .with_filters(if pushable_filters.is_empty() {
+                None
+            } else {
+                Some(&pushable_filters)
+            })
+            .with_limit(scan_limit);
+
+        let scan_result = provider.scan_with_args(session_state, scan_args).await?;
+        let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());
+
+        // Create a DFSchema from the scan output for filter and projection creation
+        // The scan output schema is the physical plan's schema
+        let scan_output_schema = plan.schema();
+        let scan_df_schema = DFSchema::try_from(scan_output_schema.as_ref().clone())?;
+
+        // Wrap with FilterExec if needed
+        if !post_scan_filters.is_empty()
+            && let Some(filter_expr) = conjunction(post_scan_filters)
+        {
+            let physical_filter =
+                self.create_physical_expr(&filter_expr, &scan_df_schema, session_state)?;
+            plan = Arc::new(FilterExecBuilder::new(physical_filter, plan).build()?);
+        }
+
+        // Wrap with ProjectionExec if projection is present and differs from scan output
+        // (either non-identity, or fewer columns due to filter-only columns)
+        if let Some(ref proj_exprs) = scan.projection {
+            let needs_projection = !self
+                .is_identity_column_projection(proj_exprs, &source_schema)
+                || scan_output_schema.fields().len() != proj_exprs.len();
+
+            if needs_projection {
+                // Unnormalize projection expressions to match the scan output schema
+                let unnormalized_proj_exprs =
+                    unnormalize_cols(proj_exprs.iter().cloned());
+                let physical_exprs: Vec<ProjectionExpr> = unnormalized_proj_exprs
+                    .iter()
+                    .map(|e| {
+                        let physical =
+                            self.create_physical_expr(e, &scan_df_schema, session_state)?;
+                        // Use the original qualified name for the output alias
+                        let name = e.schema_name().to_string();
+                        Ok(ProjectionExpr {
+                            expr: physical,
+                            alias: name,
+                        })
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                plan = Arc::new(ProjectionExec::try_new(physical_exprs, plan)?);
+            }
+        }
+
+        // Apply limit if it wasn't pushed to scan
+        if scan.fetch.is_some() && scan_limit.is_none() {
+            // Limit wasn't pushed, need to apply it here
+            // Note: This is typically handled by other optimizer rules
Review Comment:
Need to double-check this. If this is handled by other optimizer rules, why
have this clause here at all? Maybe we should give the limit the same
treatment (handle it here instead of deferring to other optimizer rules)?
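For reference, a minimal sketch of what handling the limit right here might
look like, assuming `GlobalLimitExec::new(input, skip, fetch)` from
`datafusion::physical_plan::limit` (untested, just to illustrate the idea):

```rust
use datafusion::physical_plan::limit::GlobalLimitExec;

// Sketch: apply the limit locally when it was not pushed into the scan,
// mirroring how post-scan filters are wrapped above.
if scan.fetch.is_some() && scan_limit.is_none() {
    plan = Arc::new(GlobalLimitExec::new(plan, 0, scan.fetch));
}
```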
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1968,9 +2162,16 @@ fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
let mut filters = Vec::new();
input.apply(|node| {
-        if let LogicalPlan::Filter(filter) = node {
-            // Split AND predicates into individual expressions
-            filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
+        match node {
+            LogicalPlan::Filter(filter) => {
+                // Split AND predicates into individual expressions
+                filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
+            }
Review Comment:
Maybe we can drop this since filters are effectively pushed into TableScan
now?
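If some variant of this is still needed, a hypothetical sketch of reading the
predicates off the scan directly instead of walking `Filter` nodes (assuming
the DML input keeps its `TableScan`):

```rust
// Sketch: collect filters stored on the TableScan itself, since the
// planner now pushes predicates into the scan node.
if let LogicalPlan::TableScan(scan) = node {
    filters.extend(scan.filters.iter().cloned());
}
```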
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1671,6 +1654,217 @@ impl DefaultPhysicalPlanner {
Ok(exec_node)
}
+ /// Plan a TableScan node, handling filter pushdown classification and
+ /// wrapping with FilterExec/ProjectionExec as needed.
+ async fn plan_table_scan(
+ &self,
+ scan: &TableScan,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+        use datafusion_expr::TableProviderFilterPushDown;
+
+        let provider = source_as_provider(&scan.source)?;
+        let source_schema = scan.source.schema();
+
+        // Remove all qualifiers from filters as the provider doesn't know
+        // (nor should care) how the relation was referred to in the query
+        let filters: Vec<Expr> = unnormalize_cols(scan.filters.iter().cloned());
+
+        // Separate volatile filters (they should never be pushed down)
+        let (volatile_filters, non_volatile_filters): (Vec<Expr>, Vec<Expr>) = filters
+            .into_iter()
+            .partition(|pred: &Expr| pred.is_volatile());
+
+        // Classify filters using supports_filters_pushdown
+        let filter_refs: Vec<&Expr> = non_volatile_filters.iter().collect();
+        let supported = provider.supports_filters_pushdown(&filter_refs)?;
+
+        assert_eq_or_internal_err!(
+            non_volatile_filters.len(),
+            supported.len(),
+            "supports_filters_pushdown returned {} results for {} filters",
+            supported.len(),
+            non_volatile_filters.len()
+        );
+
+        // Separate filters into:
+        // - pushable_filters: Exact or Inexact filters to pass to the provider
+        // - post_scan_filters: Inexact, Unsupported, and volatile filters for FilterExec
+        let mut pushable_filters = Vec::new();
+        let mut post_scan_filters = Vec::new();
+
+        for (filter, support) in non_volatile_filters.into_iter().zip(supported.iter()) {
+            match support {
+                TableProviderFilterPushDown::Exact => {
+                    pushable_filters.push(filter);
+                }
+                TableProviderFilterPushDown::Inexact => {
+                    pushable_filters.push(filter.clone());
+                    post_scan_filters.push(filter);
+                }
+                TableProviderFilterPushDown::Unsupported => {
+                    post_scan_filters.push(filter);
+                }
+            }
+        }
+
+        // Add volatile filters to post_scan_filters
+        post_scan_filters.extend(volatile_filters);
+
+        // Compute required column indices for the scan
+        // We need columns from both projection expressions and post-scan filters
+        let scan_projection = self.compute_scan_projection(
+            &scan.projection,
+            &post_scan_filters,
+            &source_schema,
+        )?;
+
+        // Check if we have inexact filters - if so, we can't push limit
+        let has_inexact = supported.contains(&TableProviderFilterPushDown::Inexact);
+        let scan_limit = if has_inexact || !post_scan_filters.is_empty() {
+            None // Can't push limit when post-filtering is needed
+        } else {
+            scan.fetch
+        };
+
+        // Create the scan
+        let scan_args = ScanArgs::default()
+            .with_projection(scan_projection.as_deref())
+            .with_filters(if pushable_filters.is_empty() {
+                None
+            } else {
+                Some(&pushable_filters)
+            })
+            .with_limit(scan_limit);
+
+        let scan_result = provider.scan_with_args(session_state, scan_args).await?;
+        let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());
+
+        // Create a DFSchema from the scan output for filter and projection creation
+        // The scan output schema is the physical plan's schema
+        let scan_output_schema = plan.schema();
+        let scan_df_schema = DFSchema::try_from(scan_output_schema.as_ref().clone())?;
+
+        // Wrap with FilterExec if needed
+        if !post_scan_filters.is_empty()
+            && let Some(filter_expr) = conjunction(post_scan_filters)
+        {
+            let physical_filter =
+                self.create_physical_expr(&filter_expr, &scan_df_schema, session_state)?;
+            plan = Arc::new(FilterExecBuilder::new(physical_filter, plan).build()?);
+        }
+
+        // Wrap with ProjectionExec if projection is present and differs from scan output
+        // (either non-identity, or fewer columns due to filter-only columns)
Review Comment:
The idea for #19387 is that we might be able to push down trivial
expressions here, thus avoiding the need for any physical optimizer
changes/rules.
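As a rough illustration of the direction (the helper name is hypothetical,
not existing API):

```rust
// Sketch: treat a projection as pushable when every expression is a plain
// column (or an alias of one); anything else still needs a ProjectionExec.
fn is_pushable_projection(exprs: &[Expr]) -> bool {
    exprs.iter().all(|e| match e {
        Expr::Column(_) => true,
        Expr::Alias(alias) => matches!(alias.expr.as_ref(), Expr::Column(_)),
        _ => false,
    })
}
```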
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
Review Comment:
Note: check that we are correctly handling functional dependencies in the
builder. How do we handle them if the projection contains expressions? Do we
lose functional dependencies? Maybe check how LogicalPlan::Projection or
ProjectionExec handles this sort of thing.
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -4070,11 +4074,8 @@ async fn right_semi_with_alias_filter() -> Result<()> {
actual,
@r"
RightSemi Join: t1.a = t2.a [a:UInt32, b:Utf8, c:Int32]
- Projection: t1.a [a:UInt32]
- Filter: t1.c > Int32(1) [a:UInt32, c:Int32]
- TableScan: t1 projection=[a, c] [a:UInt32, c:Int32]
- Filter: t2.c > Int32(1) [a:UInt32, b:Utf8, c:Int32]
- TableScan: t2 projection=[a, b, c] [a:UInt32, b:Utf8, c:Int32]
+      TableScan: t1 projection=[a], unsupported_filters=[t1.c > Int32(1)] [a:UInt32]
Review Comment:
Same note about `filters` vs `unsupported_filters` in the debug output. I
think just `filters=[...]` would be fine, and I assume it's a simpler
implementation.
##########
datafusion/sqllogictest/test_files/spark/collection/size.slt:
##########
@@ -129,4 +129,3 @@ SELECT size(column1) FROM VALUES (map(['a'], [1])), (map(['a','b'], [1,2])), (NU
1
2
-1
-
Review Comment:
Need to go through the diff and restore the originals for slt files with
whitespace-only changes.
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -2807,12 +2831,199 @@ impl TableScan {
Ok(Self {
table_name,
source: table_source,
- projection,
+ projection: projection_exprs,
projected_schema,
filters,
fetch,
})
}
+
+    /// Returns the column indices if the projection is a simple column projection.
+ ///
+ /// Returns `None` if the projection contains non-column expressions or
+ /// if columns cannot be resolved to indices in the source schema.
+ pub fn projection_indices(&self) -> Option<Vec<usize>> {
+ let projection = self.projection.as_ref()?;
+ let schema = self.source.schema();
+
+ let indices: Option<Vec<usize>> = projection
+ .iter()
+ .map(|expr| {
+ if let Expr::Column(col) = expr {
+ schema.index_of(col.name()).ok()
+ } else {
+ None
+ }
+ })
+ .collect();
+
+ indices
+ }
+
+    /// Returns true if the projection is an identity projection (all columns in order)
+ /// or if there is no projection.
+ pub fn is_identity_projection(&self) -> bool {
+ match &self.projection {
+ None => true,
+ Some(exprs) => {
+ let schema = self.source.schema();
+ if exprs.len() != schema.fields().len() {
+ return false;
+ }
+ exprs.iter().enumerate().all(|(i, expr)| {
+ if let Expr::Column(col) = expr {
+ schema.index_of(col.name()).ok() == Some(i)
+ } else {
+ false
+ }
+ })
+ }
+ }
+ }
+}
+
+/// Builder for constructing [`TableScan`] with expression-based projections.
+///
+/// This builder provides more flexibility than [`TableScan::try_new`] by
+/// allowing arbitrary expressions in the projection, not just column references.
+///
+/// # Example
+/// ```ignore
+/// let scan = TableScanBuilder::new("my_table", table_source)
+/// .with_projection(vec![col("a"), col("b") + lit(1)])
+/// .with_filters(vec![col("c").gt(lit(10))])
+/// .build()?;
+/// ```
+#[derive(Clone)]
+pub struct TableScanBuilder {
+ table_name: TableReference,
+ table_source: Arc<dyn TableSource>,
+ projection: Option<Vec<Expr>>,
+ filters: Vec<Expr>,
+ fetch: Option<usize>,
+}
+
+impl TableScanBuilder {
+ /// Create a new TableScanBuilder with the given table name and source.
+ pub fn new(
+ table_name: impl Into<TableReference>,
+ table_source: Arc<dyn TableSource>,
+ ) -> Self {
+ Self {
+ table_name: table_name.into(),
+ table_source,
+ projection: None,
+ filters: vec![],
+ fetch: None,
+ }
+ }
+
+ /// Set the projection expressions.
+ ///
+ /// If `None`, all columns from the source are selected.
+ pub fn with_projection(mut self, projection: Option<Vec<Expr>>) -> Self {
+ self.projection = projection;
+ self
+ }
+
+ /// Set the filter expressions.
+ ///
+ /// These filters will be stored on the TableScan and the physical planner
+ /// will decide which filters to push down to the source based on
+ /// `supports_filters_pushdown`.
+ pub fn with_filters(mut self, filters: Vec<Expr>) -> Self {
+ self.filters = filters;
+ self
+ }
+
+ /// Set the fetch limit.
+ pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+ self.fetch = fetch;
+ self
+ }
+
+ /// Build the TableScan.
+ ///
+ /// Computes the output schema based on the projection expressions.
+ pub fn build(self) -> Result<TableScan> {
+ if self.table_name.table().is_empty() {
+ return plan_err!("table_name cannot be empty");
+ }
+
+ let schema = self.table_source.schema();
+ let func_dependencies = FunctionalDependencies::new_from_constraints(
+ self.table_source.constraints(),
+ schema.fields.len(),
+ );
+
+ // Build the projected schema from projection expressions
+ let projected_schema = match &self.projection {
+ Some(exprs) => {
+ // Create a qualified schema for expression evaluation
+ let qualified_schema = DFSchema::try_from_qualified_schema(
+ self.table_name.clone(),
+ &schema,
+ )?;
+
+ // Derive output fields from projection expressions
+ let fields: Vec<(Option<TableReference>, FieldRef)> = exprs
+ .iter()
+ .map(|expr| {
+                        let (qualifier, field) = expr.to_field(&qualified_schema)?;
+ Ok((qualifier.clone(), field))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+                // Try to compute functional dependencies for simple column projections
+                let projected_func_dependencies = if let Some(indices) =
+                    self.projection_indices_internal(exprs, &schema)
+                {
+                    func_dependencies
+                        .project_functional_dependencies(&indices, indices.len())
+ } else {
+ FunctionalDependencies::empty()
Review Comment:
Confirming from above: if the projection is anything but a simple column
projection we lose functional dependencies. Is this okay? Is this what would
have happened anyway if we had `Projection -> TableScan`? Does this have a
negative impact on e.g. filter tricks we can do when we know dependencies
between columns?
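A quick way to pin down the current behavior, as a hypothetical test sketch
(assumes a `source` with declared constraints and the builder API from this
PR; the schema accessor names are my assumption):

```rust
use datafusion_expr::{col, lit};

// Sketch: a non-column expression in the projection should currently yield
// a projected schema with no functional dependencies.
let scan = TableScanBuilder::new("t", source)
    .with_projection(Some(vec![col("a"), col("b") + lit(1)]))
    .build()?;
assert!(scan.projected_schema.functional_dependencies().is_empty());
```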
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -4284,6 +4493,10 @@ digraph {
}
#[tokio::test]
+ // With the new physical planner that adds ProjectionExec to handle schema
+ // mismatches, this now proceeds past the schema check and hits the
+ // unimplemented NoOpExecutionPlan.
+ #[should_panic(expected = "NoOpExecutionPlan")]
Review Comment:
Should consider rewriting / improving this test
##########
datafusion/core/tests/sql/explain_analyze.rs:
##########
@@ -438,23 +434,15 @@ async fn csv_explain_plans() {
{
graph[label="LogicalPlan"]
2[shape=box label="Explain"]
- 3[shape=box label="Projection: aggregate_test_100.c1"]
+    3[shape=box label="TableScan: aggregate_test_100 projection=[c1], partial_filters=[aggregate_test_100.c2 > Int8(10)]"]
Review Comment:
same note about `partial_filters` vs. just `filters`
##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -520,15 +520,20 @@ impl LogicalPlanBuilder {
{
let sub_plan = p.into_owned();
- if let Some(proj) = table_scan.projection {
- let projection_exprs = proj
+ if let Some(proj_exprs) = table_scan.projection {
+                // Strip qualifiers from projection expressions since the sub_plan
+ // may have unqualified column names
+ let unqualified_exprs: Vec<Expr> = proj_exprs
Review Comment:
This matches what we do with filters. When projections get pushed down into
a scan, there is no point in keeping qualifiers on them.
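For illustration, a minimal sketch of the stripping itself (assuming the
usual `TreeNode::transform` machinery; not necessarily how this PR
implements it):

```rust
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Column;

// Sketch: rewrite every column reference to its unqualified form,
// analogous to what unnormalize_cols does for filter expressions.
let unqualified_exprs: Vec<Expr> = proj_exprs
    .into_iter()
    .map(|expr| {
        expr.transform(|e| match e {
            Expr::Column(c) => Ok(Transformed::yes(Expr::Column(
                Column::new_unqualified(c.name),
            ))),
            other => Ok(Transformed::no(other)),
        })
        .map(|t| t.data)
    })
    .collect::<Result<Vec<_>>>()?;
```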
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -3137,18 +3141,18 @@ async fn test_count_wildcard_on_where_exist() -> Result<()> {
assert_snapshot!(
pretty_format_batches(&sql_results).unwrap(),
@r"
- +---------------+-----------------------------------------------------+
- | plan_type | plan |
- +---------------+-----------------------------------------------------+
- | logical_plan | LeftSemi Join: |
- | | TableScan: t1 projection=[a, b] |
- | | SubqueryAlias: __correlated_sq_1 |
- | | EmptyRelation: rows=1 |
- | physical_plan | NestedLoopJoinExec: join_type=RightSemi |
- | | PlaceholderRowExec |
- | | DataSourceExec: partitions=1, partition_sizes=[1] |
- | | |
- +---------------+-----------------------------------------------------+
+        +---------------+------------------------------------------------------------------------+
+        | plan_type     | plan                                                                   |
+        +---------------+------------------------------------------------------------------------+
+        | logical_plan  | LeftSemi Join:                                                         |
+        |               |   TableScan: t1 projection=[a, b], unsupported_filters=[Boolean(true)] |
Review Comment:
Hmm, this debug output is a bit odd:
1. Do we want `unsupported_filters` or just `filters`? Does the formatter
call `TableProvider::supports_filter_pushdown` just to include debug output?
2. Why is there a `true` filter? It will be optimized away at some point
(probably in logical -> physical conversion). At the very least we should
check whether the test is adding a pointless filter (and consider updating
the test unless it is actually part of what's being tested), or otherwise
figure out why we end up with `WHERE true`.
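On point 2, if we wanted to guard against it at plan-construction time, a
hypothetical tweak (using `lit` from `datafusion_expr`):

```rust
use datafusion_expr::lit;

// Sketch: drop literally-true predicates before storing them on the
// TableScan, so `WHERE true` never shows up in the plan output.
filters.retain(|f| f != &lit(true));
```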
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -2807,12 +2831,199 @@ impl TableScan {
Ok(Self {
table_name,
source: table_source,
- projection,
+ projection: projection_exprs,
projected_schema,
filters,
fetch,
})
}
+
+    /// Returns the column indices if the projection is a simple column projection.
+ ///
+ /// Returns `None` if the projection contains non-column expressions or
+ /// if columns cannot be resolved to indices in the source schema.
+ pub fn projection_indices(&self) -> Option<Vec<usize>> {
+ let projection = self.projection.as_ref()?;
+ let schema = self.source.schema();
+
+ let indices: Option<Vec<usize>> = projection
+ .iter()
+ .map(|expr| {
+ if let Expr::Column(col) = expr {
+ schema.index_of(col.name()).ok()
+ } else {
+ None
+ }
+ })
+ .collect();
+
+ indices
+ }
+
+    /// Returns true if the projection is an identity projection (all columns in order)
+ /// or if there is no projection.
+ pub fn is_identity_projection(&self) -> bool {
+ match &self.projection {
+ None => true,
+ Some(exprs) => {
+ let schema = self.source.schema();
+ if exprs.len() != schema.fields().len() {
+ return false;
+ }
+ exprs.iter().enumerate().all(|(i, expr)| {
+ if let Expr::Column(col) = expr {
+ schema.index_of(col.name()).ok() == Some(i)
+ } else {
+ false
+ }
+ })
+ }
+ }
+ }
+}
+
+/// Builder for constructing [`TableScan`] with expression-based projections.
+///
+/// This builder provides more flexibility than [`TableScan::try_new`] by
+/// allowing arbitrary expressions in the projection, not just column references.
+///
+/// # Example
+/// ```ignore
+/// let scan = TableScanBuilder::new("my_table", table_source)
+/// .with_projection(vec![col("a"), col("b") + lit(1)])
+/// .with_filters(vec![col("c").gt(lit(10))])
+/// .build()?;
+/// ```
+#[derive(Clone)]
+pub struct TableScanBuilder {
+ table_name: TableReference,
+ table_source: Arc<dyn TableSource>,
+ projection: Option<Vec<Expr>>,
+ filters: Vec<Expr>,
+ fetch: Option<usize>,
+}
+
+impl TableScanBuilder {
+ /// Create a new TableScanBuilder with the given table name and source.
+ pub fn new(
+ table_name: impl Into<TableReference>,
+ table_source: Arc<dyn TableSource>,
+ ) -> Self {
+ Self {
+ table_name: table_name.into(),
+ table_source,
+ projection: None,
+ filters: vec![],
+ fetch: None,
+ }
+ }
+
+ /// Set the projection expressions.
+ ///
+ /// If `None`, all columns from the source are selected.
+ pub fn with_projection(mut self, projection: Option<Vec<Expr>>) -> Self {
+ self.projection = projection;
+ self
+ }
+
+ /// Set the filter expressions.
+ ///
+ /// These filters will be stored on the TableScan and the physical planner
+ /// will decide which filters to push down to the source based on
+ /// `supports_filters_pushdown`.
+ pub fn with_filters(mut self, filters: Vec<Expr>) -> Self {
+ self.filters = filters;
+ self
+ }
+
+ /// Set the fetch limit.
+ pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+ self.fetch = fetch;
+ self
+ }
+
+ /// Build the TableScan.
+ ///
+ /// Computes the output schema based on the projection expressions.
+ pub fn build(self) -> Result<TableScan> {
+ if self.table_name.table().is_empty() {
+ return plan_err!("table_name cannot be empty");
+ }
+
+ let schema = self.table_source.schema();
+ let func_dependencies = FunctionalDependencies::new_from_constraints(
+ self.table_source.constraints(),
+ schema.fields.len(),
+ );
+
+ // Build the projected schema from projection expressions
+ let projected_schema = match &self.projection {
+ Some(exprs) => {
+ // Create a qualified schema for expression evaluation
+ let qualified_schema = DFSchema::try_from_qualified_schema(
+ self.table_name.clone(),
+ &schema,
+ )?;
+
+ // Derive output fields from projection expressions
+ let fields: Vec<(Option<TableReference>, FieldRef)> = exprs
+ .iter()
+ .map(|expr| {
+                        let (qualifier, field) = expr.to_field(&qualified_schema)?;
+ Ok((qualifier.clone(), field))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+                // Try to compute functional dependencies for simple column projections
+                let projected_func_dependencies = if let Some(indices) =
+                    self.projection_indices_internal(exprs, &schema)
+                {
+                    func_dependencies
+                        .project_functional_dependencies(&indices, indices.len())
+ } else {
+ FunctionalDependencies::empty()
+ };
+
+                let df_schema =
+                    DFSchema::new_with_metadata(fields, schema.metadata.clone())?;
+                df_schema.with_functional_dependencies(projected_func_dependencies)?
+ }
+ None => {
+ let df_schema = DFSchema::try_from_qualified_schema(
+ self.table_name.clone(),
+ &schema,
+ )?;
+ df_schema.with_functional_dependencies(func_dependencies)?
+ }
+ };
+
+ Ok(TableScan {
+ table_name: self.table_name,
+ source: self.table_source,
+ projection: self.projection,
+ projected_schema: Arc::new(projected_schema),
+ filters: self.filters,
+ fetch: self.fetch,
+ })
+ }
+
+ /// Helper to get column indices from projection expressions
+ fn projection_indices_internal(
Review Comment:
This duplicates `TableScan::projection_indices`. Maybe we should just make
this helper public in `utils.rs` and remove the method from `TableScan`?
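Something like this, as a rough sketch of the shared helper (the name and
`utils.rs` location are suggestions):

```rust
use arrow::datatypes::Schema;

// Sketch: resolve a projection to source-schema column indices, returning
// None if any expression is not a plain column reference.
pub fn projection_column_indices(
    exprs: &[Expr],
    schema: &Schema,
) -> Option<Vec<usize>> {
    exprs
        .iter()
        .map(|expr| match expr {
            Expr::Column(col) => schema.index_of(col.name()).ok(),
            _ => None,
        })
        .collect()
}
```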
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -2807,12 +2831,199 @@ impl TableScan {
Ok(Self {
table_name,
source: table_source,
- projection,
+ projection: projection_exprs,
projected_schema,
filters,
fetch,
})
}
+
+    /// Returns the column indices if the projection is a simple column projection.
+ ///
+ /// Returns `None` if the projection contains non-column expressions or
+ /// if columns cannot be resolved to indices in the source schema.
+ pub fn projection_indices(&self) -> Option<Vec<usize>> {
+ let projection = self.projection.as_ref()?;
+ let schema = self.source.schema();
+
+ let indices: Option<Vec<usize>> = projection
+ .iter()
+ .map(|expr| {
+ if let Expr::Column(col) = expr {
+ schema.index_of(col.name()).ok()
+ } else {
+ None
+ }
+ })
+ .collect();
+
+ indices
+ }
+
+    /// Returns true if the projection is an identity projection (all columns in order)
+ /// or if there is no projection.
+ pub fn is_identity_projection(&self) -> bool {
Review Comment:
Unused function. Should remove.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]