askalt commented on code in PR #20003:
URL: https://github.com/apache/datafusion/pull/20003#discussion_r2731044365
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -1154,34 +1160,109 @@ impl OptimizerRule for PushDownFilter {
.map(|(pred, _)| pred);
// Add new scan filters
- let new_scan_filters: Vec<Expr> = scan
- .filters
- .iter()
- .chain(new_scan_filters)
- .unique()
- .cloned()
- .collect();
+ let new_scan_filters: Vec<Expr> =
+ new_scan_filters.unique().cloned().collect();
+
+ let source_schema = scan.source.schema();
+ let mut additional_projection = HashSet::new();
// Compose predicates to be of `Unsupported` or `Inexact`
pushdown type, and also include volatile filters
let new_predicate: Vec<Expr> = zip
- .filter(|(_, res)| res !=
&TableProviderFilterPushDown::Exact)
+ .filter(|(expr, res)| {
+ if *res == TableProviderFilterPushDown::Exact {
+ return false;
+ }
+ // For each not exactly supported filter we must
ensure that all columns are projected,
+ // so we collect all columns which are not currently
projected.
+ expr.apply(|expr| {
+ if let Expr::Column(column) = expr
+ && let Ok(idx) =
source_schema.index_of(column.name())
+ && scan
+ .projection
+ .as_ref()
+ .is_some_and(|p| !p.contains(&idx))
+ {
+ additional_projection.insert(idx);
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })
+ .unwrap();
+ true
+ })
.map(|(pred, _)| pred)
.chain(volatile_filters)
.cloned()
.collect();
- let new_scan = LogicalPlan::TableScan(TableScan {
- filters: new_scan_filters,
- ..scan
- });
-
- Transformed::yes(new_scan).transform_data(|new_scan| {
- if let Some(predicate) = conjunction(new_predicate) {
- make_filter(predicate,
Arc::new(new_scan)).map(Transformed::yes)
+ // Wraps with a filter if some filters are not supported
exactly.
+ let filtered = move |plan| {
+ if let Some(new_predicate) = conjunction(new_predicate) {
+ Filter::try_new(new_predicate, Arc::new(plan))
+ .map(LogicalPlan::Filter)
} else {
- Ok(Transformed::no(new_scan))
+ Ok(plan)
}
- })
+ };
+
+ if additional_projection.is_empty() {
+ // No additional projection is required.
+ let new_scan = LogicalPlan::TableScan(TableScan {
+ filters: new_scan_filters,
+ ..scan
+ });
+ return filtered(new_scan).map(Transformed::yes);
+ }
+
+ let scan_table_name = &scan.table_name;
+ let new_scan = filtered(
+ LogicalPlanBuilder::scan_with_filters_fetch(
+ scan_table_name.clone(),
+ Arc::clone(&scan.source),
+ scan.projection.clone().map(|mut projection| {
+ // Extend a projection.
+ projection.extend(additional_projection);
Review Comment:
It seems for me that the change only touches correctness of the rule (not
performance), here we must extend projection (because filter is explicitly not
supported by the provider).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]