kosiew commented on code in PR #19884:
URL: https://github.com/apache/datafusion/pull/19884#discussion_r2716262153
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1907,24 +1907,125 @@ fn get_physical_expr_pair(
}
/// Extract filter predicates from a DML input plan (DELETE/UPDATE).
-/// Walks the logical plan tree and collects Filter predicates,
-/// splitting AND conjunctions into individual expressions.
-/// Column qualifiers are stripped so expressions can be evaluated against
-/// the TableProvider's schema.
///
-fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
+/// Walks the logical plan tree and collects Filter predicates and any filters
+/// pushed down into TableScan nodes, splitting AND conjunctions into individual expressions.
+///
+/// For UPDATE...FROM queries involving multiple tables, this function only extracts predicates
+/// that reference the target table. Filters from source table scans are excluded to prevent
+/// incorrect filter semantics.
+///
+/// Column qualifiers are stripped so expressions can be evaluated against the TableProvider's
+/// schema. Deduplication is performed because filters may appear in both Filter nodes and
+/// TableScan.filters when the optimizer performs partial (Inexact) filter pushdown.
+///
+/// # Parameters
+/// - `input`: The logical plan tree to extract filters from (typically a DELETE or UPDATE plan)
+/// - `target`: The target table reference to scope filter extraction (prevents multi-table filter leakage)
+///
+/// # Returns
+/// A vector of unqualified filter expressions that can be passed to the TableProvider for execution.
+/// Returns an empty vector if no applicable filters are found.
+///
+fn extract_dml_filters(
+ input: &Arc<LogicalPlan>,
+ target: &TableReference,
+) -> Result<Vec<Expr>> {
let mut filters = Vec::new();
input.apply(|node| {
- if let LogicalPlan::Filter(filter) = node {
- // Split AND predicates into individual expressions
-
filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
+ match node {
+ LogicalPlan::Filter(filter) => {
+ // Split AND predicates into individual expressions
+ for predicate in split_conjunction(&filter.predicate) {
+ if predicate_is_on_target(predicate, target)? {
+ filters.push(predicate.clone());
+ }
+ }
+ }
+ LogicalPlan::TableScan(TableScan {
+ table_name,
+ filters: scan_filters,
+ ..
+ }) => {
+ // Only extract filters from the target table scan.
+ // This prevents incorrect filter extraction in UPDATE...FROM
scenarios
+ // where multiple table scans may have filters.
+ if table_name.resolved_eq(target) {
+ for filter in scan_filters {
+
filters.extend(split_conjunction(filter).into_iter().cloned());
+ }
+ }
+ }
+ // Plans without filter information
+ LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::Values(_)
+ | LogicalPlan::DescribeTable(_)
+ | LogicalPlan::Explain(_)
+ | LogicalPlan::Analyze(_)
+ | LogicalPlan::Distinct(_)
+ | LogicalPlan::Extension(_)
+ | LogicalPlan::Statement(_)
+ | LogicalPlan::Dml(_)
+ | LogicalPlan::Ddl(_)
+ | LogicalPlan::Copy(_)
+ | LogicalPlan::Unnest(_)
+ | LogicalPlan::RecursiveQuery(_) => {
+ // No filters to extract from leaf/meta plans
+ }
+ // Plans with inputs (may contain filters in children)
+ LogicalPlan::Projection(_)
+ | LogicalPlan::SubqueryAlias(_)
+ | LogicalPlan::Limit(_)
+ | LogicalPlan::Sort(_)
+ | LogicalPlan::Union(_)
+ | LogicalPlan::Join(_)
+ | LogicalPlan::Repartition(_)
+ | LogicalPlan::Aggregate(_)
+ | LogicalPlan::Window(_)
+ | LogicalPlan::Subquery(_) => {
+ // Filter information may appear in child nodes; continue
traversal
+ // to extract filters from Filter/TableScan nodes deeper in
the plan
+ }
}
Ok(TreeNodeRecursion::Continue)
})?;
- // Strip table qualifiers from column references
- filters.into_iter().map(strip_column_qualifiers).collect()
+ // Strip qualifiers and deduplicate. This ensures:
+ // 1. Only target-table predicates are retained from Filter nodes
+ // 2. Qualifiers stripped for TableProvider compatibility
+ // 3. Duplicates removed (from Filter nodes + TableScan.filters)
+ //
+ // Deduplication is necessary because filters may appear in both Filter
nodes
+ // and TableScan.filters when the optimizer performs partial (Inexact)
pushdown.
+ let mut seen_filters = HashSet::new();
+ filters
+ .into_iter()
+ .try_fold(Vec::new(), |mut deduped, filter| {
+ let unqualified = strip_column_qualifiers(filter).map_err(|e| {
+ e.context(format!(
+ "Failed to strip column qualifiers for DML filter on table
'{target}'"
+ ))
+ })?;
+ if seen_filters.insert(unqualified.clone()) {
+ deduped.push(unqualified);
+ }
+ Ok(deduped)
+ })
+}
+
+/// Determine whether a predicate references only columns from the target table.
+fn predicate_is_on_target(expr: &Expr, target: &TableReference) ->
Result<bool> {
Review Comment:
Failing closed causes these slt tests to fail:
https://github.com/apache/datafusion/blob/d90d0746d64bf6e91a81b3ec6954369bd0851bb2/datafusion/sqllogictest/test_files/update.slt#L70-L105
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]