Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-09 Thread via GitHub


cloud-fan closed pull request #55252: [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation
URL: https://github.com/apache/spark/pull/55252


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-09 Thread via GitHub


cloud-fan commented on PR #55252:
URL: https://github.com/apache/spark/pull/55252#issuecomment-4219500690

   the job failure looks flaky, thanks, merging to master!



Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-09 Thread via GitHub


yyanyy commented on code in PR #55252:
URL: https://github.com/apache/spark/pull/55252#discussion_r3060747839


##
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##
@@ -157,13 +157,22 @@ case class DataSourceV2Relation(
  * @param keyGroupedPartitioning if set, the partitioning expressions that are used to split the
  *   rows in the scan across different partitions
  * @param ordering if set, the ordering provided by the scan
+ * @param pushedFilters Catalyst expressions for filters that were fully pushed to the data
+ *  source and do not appear as post-scan filters
  */
 case class DataSourceV2ScanRelation(
 relation: DataSourceV2Relation,
 scan: Scan,
 output: Seq[AttributeReference],
 keyGroupedPartitioning: Option[Seq[Expression]] = None,
-ordering: Option[Seq[SortOrder]] = None) extends LeafNode with NamedRelation {
+ordering: Option[Seq[SortOrder]] = None,
+pushedFilters: Seq[Expression] = Seq.empty) extends LeafNode with NamedRelation {
+
+  // TODO: Override validConstraints to return ExpressionSet(pushedFilters) so that pushed

Review Comment:
   Hmm, I think in the long term it's still the right thing to do; it's just that we don't want to do it very soon. So from the standpoint of completeness, I think there's still value in keeping this comment?




Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-09 Thread via GitHub


anton5798 commented on code in PR #55252:
URL: https://github.com/apache/spark/pull/55252#discussion_r3057742783


##
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##
@@ -157,13 +157,22 @@ case class DataSourceV2Relation(
  * @param keyGroupedPartitioning if set, the partitioning expressions that are used to split the
  *   rows in the scan across different partitions
  * @param ordering if set, the ordering provided by the scan
+ * @param pushedFilters Catalyst expressions for filters that were fully pushed to the data
+ *  source and do not appear as post-scan filters
  */
 case class DataSourceV2ScanRelation(
 relation: DataSourceV2Relation,
 scan: Scan,
 output: Seq[AttributeReference],
 keyGroupedPartitioning: Option[Seq[Expression]] = None,
-ordering: Option[Seq[SortOrder]] = None) extends LeafNode with NamedRelation {
+ordering: Option[Seq[SortOrder]] = None,
+pushedFilters: Seq[Expression] = Seq.empty) extends LeafNode with NamedRelation {
+
+  // TODO: Override validConstraints to return ExpressionSet(pushedFilters) so that pushed

Review Comment:
   As discussed offline, let's remove this comment?




Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-08 Thread via GitHub


yyanyy commented on code in PR #55252:
URL: https://github.com/apache/spark/pull/55252#discussion_r3054403087


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -787,14 +800,22 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
 
   val wrappedScan = getWrappedScan(scan, sHolder)
 
-  val scanRelation = DataSourceV2ScanRelation(sHolder.relation, wrappedScan, output)
-
   val projectionOverSchema =
 ProjectionOverSchema(output.toStructType, AttributeSet(output))
   val projectionFunc = (expr: Expression) => expr transformDown {
 case projectionOverSchema(newExpr) => newExpr
   }
 
+  // Remap pushed filter attributes to the pruned output schema. Note: if a pushed filter

Review Comment:
   It actually could happen: in V2, a fully pushed filter can be removed from the plan and is then no longer referenced, which lets column pruning drop the column. Wenchen also pointed out a similar issue later; I'll share more details in that comment.




Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-08 Thread via GitHub


yyanyy commented on code in PR #55252:
URL: https://github.com/apache/spark/pull/55252#discussion_r3054397422


##
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##
@@ -157,13 +157,22 @@ case class DataSourceV2Relation(
  * @param keyGroupedPartitioning if set, the partitioning expressions that are used to split the
  *   rows in the scan across different partitions
  * @param ordering if set, the ordering provided by the scan
+ * @param pushedFilters Catalyst expressions for filters that were fully pushed to the data
+ *  source and do not appear as post-scan filters
  */
 case class DataSourceV2ScanRelation(
 relation: DataSourceV2Relation,
 scan: Scan,
 output: Seq[AttributeReference],
 keyGroupedPartitioning: Option[Seq[Expression]] = None,
-ordering: Option[Seq[SortOrder]] = None) extends LeafNode with NamedRelation {
+ordering: Option[Seq[SortOrder]] = None,
+pushedFilters: Seq[Expression] = Seq.empty) extends LeafNode with NamedRelation {
+
+  // TODO: Override validConstraints to return ExpressionSet(pushedFilters) so that pushed

Review Comment:
   Thank you both for the review! I'd like to defer that to a later PR to avoid blocking this one for too long on golden-file updates and other adjustments that might take quite some investigation time.




Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-08 Thread via GitHub


yyanyy commented on code in PR #55252:
URL: https://github.com/apache/spark/pull/55252#discussion_r3054405090


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -698,6 +705,8 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   assert(realOutput.length == holder.output.length,
 "The data source returns unexpected number of columns")
   val wrappedScan = getWrappedScan(scan, holder)
+  // Note: holder.pushedFilterExpressions is not propagated here because the output schema

Review Comment:
   In theory it's not, but the aggregate path replaces the output schema entirely (table columns -> aggregate columns), so the original filter expressions can't be remapped to the new output. I'd prefer to defer this and revisit it if there's a concrete use case.
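   A toy Scala sketch of the remapping problem described above. It assumes, as the comment says, that aggregate pushdown produces a brand-new set of output columns, here modeled as attributes with fresh expression IDs. `Attr`, `PushedFilter`, and `remap` are hypothetical stand-ins, not Spark classes, and attributes are matched purely by expression ID.

```scala
// Toy model, not Spark's API: an attribute is identified by its expression ID.
final case class Attr(name: String, exprId: Long)

// A pushed filter reduced to a display string plus the attributes it references.
final case class PushedFilter(sql: String, references: Set[Attr])

// Rebind every reference of `f` onto `output` by exprId; None if any reference
// has no counterpart in the new output.
def remap(f: PushedFilter, output: Seq[Attr]): Option[PushedFilter] = {
  val byId: Map[Long, Attr] = output.map(a => a.exprId -> a).toMap
  val rebound: Seq[Option[Attr]] = f.references.toSeq.map(a => byId.get(a.exprId))
  if (rebound.forall(_.isDefined)) Some(f.copy(references = rebound.flatten.toSet))
  else None
}

val i = Attr("i", 1L)
val j = Attr("j", 2L)
val filter = PushedFilter("i > 3", Set(i))

// Column pruning keeps (a subset of) the table attributes, so rebinding works.
val prunedScanOutput = Seq(i, j)
// Assumption: aggregate pushdown yields new attributes with fresh exprIds.
val aggregateOutput = Seq(Attr("j", 10L), Attr("max(i)", 11L))

val afterPruning = remap(filter, prunedScanOutput)
val afterAggregate = remap(filter, aggregateOutput)
```

   In this model `afterPruning` rebinds successfully, while `afterAggregate` is `None` because no aggregate column shares an expression ID with `i`.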




Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-08 Thread via GitHub


yyanyy commented on code in PR #55252:
URL: https://github.com/apache/spark/pull/55252#discussion_r3054406427


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -95,6 +95,13 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
 
   val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery
 
+  // Compute the pushed filter expressions: the normalized filters that were fully pushed
+  // down (i.e., not in postScanFilters). These are stored on the scan relation for
+  // potential future use in constraint propagation.

Review Comment:
   thanks for the info, TIL!




Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-08 Thread via GitHub


yyanyy commented on code in PR #55252:
URL: https://github.com/apache/spark/pull/55252#discussion_r3054447715


##
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##
@@ -197,7 +206,8 @@ case class DataSourceV2ScanRelation(
   ),
   ordering = ordering.map(
 _.map(o => o.copy(child = QueryPlan.normalizeExpressions(o.child, output)))
-  )
+  ),
+  pushedFilters = pushedFilters.map(QueryPlan.normalizeExpressions(_, output))

Review Comment:
   I debated with myself a bit on this and decided not to include pushed filters whose references are no longer in the pruned output.
   
   The reason I included them before is that I was worried a fully pushed filter could be lost from `pushedFilters` when the filtered column is not projected:
   
   > `SELECT j FROM t WHERE i > 3` with a connector that pushes both GreaterThan and IsNotNull:
   
   1. After analysis: `Project([j], Filter(i > 3, DataSourceV2Relation([i, j])))`
   2. InferFiltersFromConstraints (runs before V2ScanRelationPushDown) derives `IsNotNull(i)` → `Project([j], Filter(i > 3 AND IsNotNull(i), DataSourceV2Relation([i, j])))`
   3. pushDownFilters: both `i > 3` and `IsNotNull(i)` are fully pushed → Filter node removed → `Project([j], ScanBuilderHolder)`, `pushedFilterExpressions = [i > 3]`
   4. pruneColumns: only `j` is referenced → `output = [j]`; column `i` is pruned → pushed filter `i > 3` references the pruned `i` → dropped
   
   I was worried because part of the intention of this field was to record what guarantees the connector can make, and now we can't keep the complete information.
   
   But for now, I think this is acceptable because 1/ `pushedFilters` is informational only, and a potential future improvement could be to keep fully pushed filters as post-scan Filter nodes, which would prevent the column from being pruned in the first place; and 2/ it appears that the Spark plan validator will also catch and reject this case when it finds that some references in the expressions are not resolvable.
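   A toy Scala model of steps 3 and 4 of the trace above. `Attr` and `GreaterThan` are hypothetical stand-ins for Catalyst's `AttributeReference` and `GreaterThan`, and column pruning is reduced to "keep the columns referenced above the scan"; the sketch only illustrates why `i > 3` drops out of `pushedFilters`.

```scala
// Hypothetical stand-ins for Catalyst's AttributeReference and GreaterThan.
final case class Attr(name: String, exprId: Long)
final case class GreaterThan(attr: Attr, value: Int) {
  def references: Set[Attr] = Set(attr)
}

val i = Attr("i", 1L)
val j = Attr("j", 2L)
val tableOutput = Seq(i, j)

// Step 3: the Filter node is gone, so only Project([j], ...) references columns.
val pushedFilterExpressions = Seq(GreaterThan(i, 3))
val referencedAboveScan = Set(j)

// Step 4: column pruning keeps only the referenced columns.
val prunedOutput = tableOutput.filter(referencedAboveScan.contains)

// Keep only pushed filters whose references all survive pruning.
val keptPushedFilters =
  pushedFilterExpressions.filter(_.references.subsetOf(prunedOutput.toSet))
```

   With `prunedOutput == Seq(j)`, the only pushed filter references `i`, so `keptPushedFilters` ends up empty.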




Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-07 Thread via GitHub


cloud-fan commented on code in PR #55252:
URL: https://github.com/apache/spark/pull/55252#discussion_r3049621837


##
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##
@@ -157,13 +157,22 @@ case class DataSourceV2Relation(
  * @param keyGroupedPartitioning if set, the partitioning expressions that are used to split the
  *   rows in the scan across different partitions
  * @param ordering if set, the ordering provided by the scan
+ * @param pushedFilters Catalyst expressions for filters that were fully pushed to the data
+ *  source and do not appear as post-scan filters
  */
 case class DataSourceV2ScanRelation(
 relation: DataSourceV2Relation,
 scan: Scan,
 output: Seq[AttributeReference],
 keyGroupedPartitioning: Option[Seq[Expression]] = None,
-ordering: Option[Seq[SortOrder]] = None) extends LeafNode with NamedRelation {
+ordering: Option[Seq[SortOrder]] = None,
+pushedFilters: Seq[Expression] = Seq.empty) extends LeafNode with NamedRelation {
+
+  // TODO: Override validConstraints to return ExpressionSet(pushedFilters) so that pushed

Review Comment:
   I don't have a strong opinion. We can do it in this PR and update golden files if needed, or defer it to a followup PR.




Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-07 Thread via GitHub


cloud-fan commented on code in PR #55252:
URL: https://github.com/apache/spark/pull/55252#discussion_r3049602382


##
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##
@@ -197,7 +206,8 @@ case class DataSourceV2ScanRelation(
   ),
   ordering = ordering.map(
 _.map(o => o.copy(child = QueryPlan.normalizeExpressions(o.child, output)))
-  )
+  ),
+  pushedFilters = pushedFilters.map(QueryPlan.normalizeExpressions(_, output))

Review Comment:
   The comment in `pruneColumns` (lines 809-814 of V2ScanRelationPushDown.scala) says stale references are "acceptable while pushedFilters is informational only." However, this line normalizes `pushedFilters` against `output` via `normalizeExpressions`, which returns ordinal -1 for attributes not in the output, leaving the original `exprId` intact. Two equivalent plans would then have different `exprId`s in their canonical form, breaking `canonicalized ==` comparison (used by subquery dedup, plan caching, etc.).
   
   Consider filtering out pushed filters with stale references in `pruneColumns`:
   ```scala
   val remappedPushedFilters = sHolder.pushedFilterExpressions.map(projectionFunc)
     .filter(_.references.subsetOf(AttributeSet(output)))
   ```
   This ensures that only filters with valid references participate in canonicalization, and it is consistent with the "drop filters with stale references" option already mentioned in the comment.
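   A toy Scala model of the canonicalization problem and the suggested fix. It assumes, as the comment states, that normalization rewrites an attribute's ID to its ordinal in `output` and leaves the attribute unchanged when the ordinal is -1. `Attr`, `GreaterThan`, `ScanNode`, `normalize`, `canonicalize`, and `scan` are hypothetical simplifications, not `QueryPlan.normalizeExpressions` or real plan nodes.

```scala
// Toy model of canonicalization; not Spark's QueryPlan.normalizeExpressions.
final case class Attr(name: String, exprId: Long)
final case class GreaterThan(attr: Attr, value: Int) {
  def references: Set[Attr] = Set(attr)
}
final case class ScanNode(output: Seq[Attr], pushedFilters: Seq[GreaterThan])

// Replace the exprId with its ordinal in `output` (and erase the name); leave
// the attribute untouched when the ordinal is -1, as described above.
def normalize(a: Attr, output: Seq[Attr]): Attr = {
  val ordinal = output.indexWhere(_.exprId == a.exprId)
  if (ordinal == -1) a else Attr("none", ordinal.toLong)
}

def canonicalize(s: ScanNode): ScanNode = ScanNode(
  s.output.map(normalize(_, s.output)),
  s.pushedFilters.map(f => f.copy(attr = normalize(f.attr, s.output))))

// The same query analyzed twice gets different exprIds; `i` was pruned away.
def scan(iId: Long, jId: Long, dropStale: Boolean): ScanNode = {
  val i = Attr("i", iId)
  val j = Attr("j", jId)
  val output = Seq(j)
  val pushed = Seq(GreaterThan(i, 3))
  val kept =
    if (dropStale) pushed.filter(_.references.subsetOf(output.toSet)) else pushed
  ScanNode(output, kept)
}

val staleEqual =
  canonicalize(scan(1L, 2L, dropStale = false)) == canonicalize(scan(5L, 6L, dropStale = false))
val fixedEqual =
  canonicalize(scan(1L, 2L, dropStale = true)) == canonicalize(scan(5L, 6L, dropStale = true))
```

   In this model the stale `i` keeps its original ID, so `staleEqual` is `false`; dropping the stale filter before canonicalization makes `fixedEqual` `true`.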



##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -95,6 +95,13 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
 
   val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery
 
+  // Compute the pushed filter expressions: the normalized filters that were fully pushed
+  // down (i.e., not in postScanFilters). These are stored on the scan relation for
+  // potential future use in constraint propagation.

Review Comment:
   `ExpressionSet.contains` only checks its internal `baseSet`, which excludes non-deterministic expressions (they go to `originals` only; see `ExpressionSet.add` at ExpressionSet.scala:97-103). So if a non-deterministic filter like `rand() > 0.5` is untranslatable and ends up in `postScanFiltersWithoutSubquery`, `postScanFilterSet.contains` will return `false` for it, and it will be incorrectly included in `pushedFilterExpressions`.
   
   Trace: `SELECT * FROM t WHERE i > 3 AND rand() > 0.5`:
   1. `normalizedFiltersWithoutSubquery` = `[i > 3, rand() > 0.5]`
   2. `PushDownUtils.pushFilters` can't translate `rand() > 0.5` → `untranslatableExprs` → `postScanFiltersWithoutSubquery`
   3. `ExpressionSet(postScanFiltersWithoutSubquery)` → `rand() > 0.5` goes to `originals` only
   4. `.filterNot(postScanFilterSet.contains)` → `contains(rand() > 0.5)` → `false` → NOT filtered
   5. Result: `pushedFilterExpressions` incorrectly includes `rand() > 0.5`
   
   Consider filtering the result:
   ```suggestion
       val postScanFilterSet = ExpressionSet(postScanFiltersWithoutSubquery)
       sHolder.pushedFilterExpressions =
         normalizedFiltersWithoutSubquery.filterNot(postScanFilterSet.contains).filter(_.deterministic)
   ```
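   A toy Scala model of the `ExpressionSet` behavior described above and of the suggested `.filter(_.deterministic)` fix. `Expr` and `ToyExpressionSet` are hypothetical; the class mimics only the two properties the comment relies on (non-deterministic expressions go to `originals` only, and `contains` consults `baseSet`). The real `ExpressionSet` compares canonicalized expressions, which this sketch replaces with plain case-class equality.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Catalyst predicate: display text plus determinism.
final case class Expr(sql: String, deterministic: Boolean)

// Mimics only the two ExpressionSet properties the comment relies on.
final class ToyExpressionSet(exprs: Seq[Expr]) {
  private val baseSet = mutable.Set.empty[Expr]
  private val originals = mutable.ArrayBuffer.empty[Expr]

  exprs.foreach { e =>
    if (!e.deterministic) originals += e    // never indexed in baseSet
    else if (baseSet.add(e)) originals += e // first occurrence only
  }

  // Membership consults baseSet only, so non-deterministic exprs are never found.
  def contains(e: Expr): Boolean = baseSet.contains(e)
  def toSeq: Seq[Expr] = originals.toSeq
}

val iGt3 = Expr("i > 3", deterministic = true)
val randGt = Expr("rand() > 0.5", deterministic = false)

val normalizedFiltersWithoutSubquery = Seq(iGt3, randGt)
// Step 2 of the trace: rand() > 0.5 is untranslatable, so it stays post-scan.
val postScanFiltersWithoutSubquery = Seq(randGt)
val postScanFilterSet = new ToyExpressionSet(postScanFiltersWithoutSubquery)

// Original logic: rand() > 0.5 is not "contained", so it leaks through.
val buggy = normalizedFiltersWithoutSubquery.filterNot(postScanFilterSet.contains)
// Suggested fix: additionally drop non-deterministic expressions.
val fixed = buggy.filter(_.deterministic)
```

   Here `buggy` still contains `rand() > 0.5`, while `fixed` keeps only `i > 3`.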




Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-07 Thread via GitHub


aokolnychyi commented on code in PR #55252:
URL: https://github.com/apache/spark/pull/55252#discussion_r3048973714


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -698,6 +705,8 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   assert(realOutput.length == holder.output.length,
 "The data source returns unexpected number of columns")
   val wrappedScan = getWrappedScan(scan, holder)
+  // Note: holder.pushedFilterExpressions is not propagated here because the output schema

Review Comment:
   Is it hard to fix? 




Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-07 Thread via GitHub


aokolnychyi commented on code in PR #55252:
URL: https://github.com/apache/spark/pull/55252#discussion_r3048972978


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -787,14 +800,22 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
 
   val wrappedScan = getWrappedScan(scan, sHolder)
 
-  val scanRelation = DataSourceV2ScanRelation(sHolder.relation, wrappedScan, output)
-
   val projectionOverSchema =
 ProjectionOverSchema(output.toStructType, AttributeSet(output))
   val projectionFunc = (expr: Expression) => expr transformDown {
 case projectionOverSchema(newExpr) => newExpr
   }
 
+  // Remap pushed filter attributes to the pruned output schema. Note: if a pushed filter

Review Comment:
   I am not sure I follow. How can we drop the column if a filter references it? That seems like a violation of the contract?




Re: [PR] [SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation [spark]

2026-04-07 Thread via GitHub


aokolnychyi commented on code in PR #55252:
URL: https://github.com/apache/spark/pull/55252#discussion_r3048968128


##
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##
@@ -157,13 +157,22 @@ case class DataSourceV2Relation(
  * @param keyGroupedPartitioning if set, the partitioning expressions that are used to split the
  *   rows in the scan across different partitions
  * @param ordering if set, the ordering provided by the scan
+ * @param pushedFilters Catalyst expressions for filters that were fully pushed to the data
+ *  source and do not appear as post-scan filters
  */
 case class DataSourceV2ScanRelation(
 relation: DataSourceV2Relation,
 scan: Scan,
 output: Seq[AttributeReference],
 keyGroupedPartitioning: Option[Seq[Expression]] = None,
-ordering: Option[Seq[SortOrder]] = None) extends LeafNode with NamedRelation {
+ordering: Option[Seq[SortOrder]] = None,
+pushedFilters: Seq[Expression] = Seq.empty) extends LeafNode with NamedRelation {
+
+  // TODO: Override validConstraints to return ExpressionSet(pushedFilters) so that pushed

Review Comment:
   @cloud-fan, what do you think?


