cloud-fan commented on code in PR #55252:
URL: https://github.com/apache/spark/pull/55252#discussion_r3049602382


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##########
@@ -197,7 +206,8 @@ case class DataSourceV2ScanRelation(
       ),
       ordering = ordering.map(
       _.map(o => o.copy(child = QueryPlan.normalizeExpressions(o.child, output)))
-      )
+      ),
+      pushedFilters = pushedFilters.map(QueryPlan.normalizeExpressions(_, output))

Review Comment:
   The comment in `pruneColumns` (lines 809-814 of V2ScanRelationPushDown.scala) 
says stale references are "acceptable while pushedFilters is informational 
only." However, this line normalizes `pushedFilters` against `output` via 
`normalizeExpressions`, which returns ordinal -1 for attributes not present in 
the output, leaving the original `exprId` intact. As a result, two equivalent 
plans can have different `exprId`s in their canonical forms, breaking 
`canonicalized ==` comparison (used by subquery deduplication, plan caching, 
etc.).
   
   Consider filtering out pushed filters with stale references in 
`pruneColumns`:
   ```scala
   val remappedPushedFilters = sHolder.pushedFilterExpressions.map(projectionFunc)
     .filter(_.references.subsetOf(AttributeSet(output)))
   ```
   This ensures only filters with valid references participate in 
canonicalization, and is consistent with the "drop filters with stale 
references" option already mentioned in the comment.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -95,6 +95,13 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
 
       val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery
 
+      // Compute the pushed filter expressions: the normalized filters that were fully pushed
+      // down (i.e., not in postScanFilters). These are stored on the scan relation for
+      // potential future use in constraint propagation.

Review Comment:
   `ExpressionSet.contains` only checks its internal `baseSet`, which excludes 
non-deterministic expressions (they go to `originals` only — see 
`ExpressionSet.add` at ExpressionSet.scala:97-103). So if a non-deterministic 
filter like `rand() > 0.5` is untranslatable and ends up in 
`postScanFiltersWithoutSubquery`, then `postScanFilterSet.contains` will 
return `false` for it, and it will be incorrectly included in 
`pushedFilterExpressions`.
   
   Trace: `SELECT * FROM t WHERE i > 3 AND rand() > 0.5`:
   1. `normalizedFiltersWithoutSubquery` = `[i > 3, rand() > 0.5]`
   2. `PushDownUtils.pushFilters` can't translate `rand() > 0.5` → 
`untranslatableExprs` → `postScanFiltersWithoutSubquery`
   3. `ExpressionSet(postScanFiltersWithoutSubquery)` → `rand() > 0.5` goes to 
`originals` only
   4. `.filterNot(postScanFilterSet.contains)` → `contains(rand() > 0.5)` → 
`false` → NOT filtered
   5. Result: `pushedFilterExpressions` incorrectly includes `rand() > 0.5`
   
   Consider filtering the result:
   ```suggestion
         val postScanFilterSet = ExpressionSet(postScanFiltersWithoutSubquery)
         sHolder.pushedFilterExpressions =
           normalizedFiltersWithoutSubquery.filterNot(postScanFilterSet.contains).filter(_.deterministic)
   ```
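   The trace above can be reproduced with a simplified stand-in for 
`ExpressionSet` (a toy `SimpleExprSet`, not Spark's actual class) whose 
`contains` only consults a deterministic-only base set:

   ```scala
   // Toy model: only deterministic expressions enter the canonical base set;
   // non-deterministic ones live in `originals` and are invisible to `contains`.
   object NonDeterministicContainsDemo extends App {
     case class Expr(sql: String, deterministic: Boolean)

     class SimpleExprSet(exprs: Seq[Expr]) {
       private val baseSet: Set[String] =
         exprs.filter(_.deterministic).map(_.sql).toSet
       val originals: Seq[Expr] = exprs
       def contains(e: Expr): Boolean = e.deterministic && baseSet(e.sql)
     }

     val iGt3 = Expr("i > 3", deterministic = true)
     val randFilter = Expr("rand() > 0.5", deterministic = false)

     // rand() > 0.5 is untranslatable, so it lands in the post-scan set...
     val postScanFilterSet = new SimpleExprSet(Seq(randFilter))

     // ...yet filterNot(contains) fails to exclude it from the pushed filters.
     val normalizedFilters = Seq(iGt3, randFilter)
     val pushed = normalizedFilters.filterNot(postScanFilterSet.contains)

     assert(pushed.contains(randFilter)) // bug: rand() > 0.5 slips through
     assert(pushed.filter(_.deterministic) == Seq(iGt3)) // proposed fix drops it
   }
   ```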



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

