LuciferYang opened a new pull request, #55579:
URL: https://github.com/apache/spark/pull/55579

   ### What changes were proposed in this pull request?
   
   Extends Dynamic Partition Pruning (DPP) and scalar-subquery partition 
pruning to V2 file sources (Parquet, ORC, CSV, JSON, Avro, Text). Before this 
change, V2 file scans were not recognized as DPP-eligible by 
`PartitionPruning.getFilterableTableScan` because they do not implement 
`SupportsRuntimeV2Filtering`, and scalar-subquery filters on partition columns 
were left as post-scan `FilterExec` without any file-level pruning.
   
   Implementation summary (4 files modified, ~50 lines):
   
   - `PartitionPruning.scala`: extend `getFilterableTableScan` with a case 
matching `DataSourceV2ScanRelation(scan: FileScan)`, resolving partition 
columns from `FileScan.readPartitionSchema`. The DPP rule then inserts the 
existing `DynamicPruningSubquery`; `PlanDynamicPruningFilters` and 
`CleanupDynamicPruningFilters` flow unchanged.
   - `FileScan.scala`: factor partition listing into 
`partitionsImpl(allPartitionFilters)`. Add a public 
`planInputPartitionsWithRuntimeFilters(extraFilters)` that re-lists files with 
`partitionFilters ++ extraFilters`. **Immutable** — no mutable state added on 
the scan.
   - `BatchScanExec.scala`: `filteredPartitions` special-cases `FileScan` with 
non-empty `runtimeFilters` by calling the new method. Non-file scans continue 
to use `PushDownUtils.pushRuntimeFilters` as before, **preserving the 
`SupportsRuntimeV2Filtering` iterative-pushdown contract for upstream 
connectors (Iceberg, Delta).**
   - `DataSourceV2Strategy.scala`: lift scalar-subquery filters whose 
references are a subset of partition columns into 
`BatchScanExec.runtimeFilters` via an `effectiveRuntimeFilterAttrs` 
computation. Filters stay in `postScanFilters` for correctness; the runtime layer 
applies them for pruning. **Scalar subqueries are NOT wrapped in 
`DynamicPruningExpression`** — that type remains reserved for join-derived 
pruning, so the runtime-filter classifier in 
`DataSourceV2EnhancedRuntimePartitionFilterSuite` stays correct.
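
The immutable re-listing described for `FileScan.scala` can be pictured with a small standalone model. This is a sketch in plain Python, not Spark code: `ToyFileScan`, `PartitionDir`, and the filters-as-callables representation are hypothetical stand-ins, and only the method names loosely echo `partitionsImpl` and `planInputPartitionsWithRuntimeFilters` from this PR. It assumes a filter is simply a predicate over partition values.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple

# Hypothetical stand-in: a filter is a predicate over partition column values.
PartitionFilter = Callable[[Dict[str, int]], bool]


@dataclass(frozen=True)
class PartitionDir:
    values: Tuple[Tuple[str, int], ...]  # e.g. (("part", 3),)
    files: Tuple[str, ...]


@dataclass(frozen=True)  # frozen: fields cannot be reassigned after construction
class ToyFileScan:
    dirs: Tuple[PartitionDir, ...]
    partition_filters: Tuple[PartitionFilter, ...] = ()

    def _partitions_impl(self, all_filters: Tuple[PartitionFilter, ...]) -> List[str]:
        # Idea behind partitionsImpl(allPartitionFilters): list only the files
        # whose partition values pass every filter.
        kept: List[str] = []
        for d in self.dirs:
            values = dict(d.values)
            if all(f(values) for f in all_filters):
                kept.extend(d.files)
        return kept

    def plan_input_partitions(self) -> List[str]:
        return self._partitions_impl(self.partition_filters)

    def plan_input_partitions_with_runtime_filters(self, extra_filters) -> List[str]:
        # Static filters plus runtime filters; the scan itself is not modified.
        combined = tuple(self.partition_filters) + tuple(extra_filters)
        return self._partitions_impl(combined)


scan = ToyFileScan(
    dirs=tuple(
        PartitionDir(values=(("part", p),), files=(f"part={p}/f0.parquet",))
        for p in range(5)
    ),
    partition_filters=(lambda v: v["part"] >= 1,),
)

# Runtime filter, e.g. join keys produced by the filtered dimension side.
runtime_filters = [lambda v: v["part"] in {2, 4}]
pruned = scan.plan_input_partitions_with_runtime_filters(runtime_filters)
unpruned = scan.plan_input_partitions()
```

In this model, calling the runtime-filter variant returns a narrower file list while `scan.partition_filters` is left as it was, which is the property the "no mutable state" note refers to.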
   
   This addresses the same gap as the stale [PR 
#37514](https://github.com/apache/spark/pull/37514) but with three deliberate 
design improvements:
   1. Immutable runtime-filter slot (no mutating `var` on `FileScan`).
   2. No new public trait — `PushDownUtils` ordering and the 
`SupportsRuntimeV2Filtering` contract for non-file V2 sources are untouched.
   3. Scalar-subquery routing without `DynamicPruningExpression` wrapping 
(preserves the test-classifier semantics in the SPARK-56521 / SPARK-55596 work).
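
The scalar-subquery routing rule (lift a filter only when its references are a subset of the partition columns, and keep it among the post-scan filters as well) can be sketched as below. This is a standalone Python model with hypothetical names (`ToyFilter`, `lift_scalar_subquery_filters`); it does not reproduce the actual `effectiveRuntimeFilterAttrs` computation in `DataSourceV2Strategy.scala`.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Tuple


@dataclass(frozen=True)
class ToyFilter:
    sql: str                    # text of the predicate, for display only
    references: FrozenSet[str]  # columns the predicate reads


def lift_scalar_subquery_filters(
    scalar_subquery_filters: List[ToyFilter],
    partition_cols: FrozenSet[str],
) -> Tuple[List[ToyFilter], List[ToyFilter]]:
    """Return (runtime_filters, post_scan_filters) for scalar-subquery filters."""
    # Lift only filters that read partition columns and nothing else.
    runtime = [f for f in scalar_subquery_filters if f.references <= partition_cols]
    # Lifted filters are NOT removed: they are still evaluated after the scan,
    # so results stay correct even when no pruning happens.
    post_scan = list(scalar_subquery_filters)
    return runtime, post_scan


filters = [
    ToyFilter("part_col = (SELECT max(x) FROM dim)", frozenset({"part_col"})),
    ToyFilter("value = (SELECT max(x) FROM dim)", frozenset({"value"})),
    ToyFilter("part_col + value = (SELECT max(x) FROM dim)", frozenset({"part_col", "value"})),
]
runtime, post_scan = lift_scalar_subquery_filters(filters, frozenset({"part_col"}))
```

Only the first filter is lifted in this example; the mixed-reference filter is left alone because it cannot be evaluated from partition values alone.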
   
   ### Why are the changes needed?
   
   V2 file sources are widely used (especially V2 Parquet), but they currently 
miss two important optimizations that V1 file sources have via 
`HadoopFsRelation`:
   
   1. **DPP**: a star-schema query like `SELECT * FROM fact JOIN dim ON 
fact.part = dim.id WHERE dim.x = 'k'` reads all partitions of `fact` even when 
`fact.part` matches only a few values from the filtered `dim`.
   
   2. **Scalar-subquery partition pruning**: `WHERE part_col = (SELECT max(x) 
FROM dim)` reads all partitions even when the subquery resolves to a single 
value.
   
   This PR closes both gaps using a minimal, immutable design.
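
To make the two cases concrete, here is a toy model in plain Python (not Spark code). The table contents and the helper `partitions_to_read` are invented for illustration; the point is only to show how a value set computed at runtime narrows the partitions a scan has to touch.

```python
# Toy data: `fact` is partitioned by `part` (keys 1..6); `dim` is a small table.
fact_partitions = {p: [f"row-{p}-{i}" for i in range(3)] for p in range(1, 7)}
dim = [
    {"id": 1, "x": "a"},
    {"id": 2, "x": "k"},
    {"id": 5, "x": "k"},
    {"id": 9, "x": "z"},
]
# Separate toy table for the scalar-subquery case, with a numeric column x.
dim_numeric = [{"x": 2}, {"x": 4}]


def partitions_to_read(partitions, allowed_values=None):
    """Partition keys a scan touches; None means no pruning is applied."""
    if allowed_values is None:
        return sorted(partitions)
    return sorted(p for p in partitions if p in allowed_values)


# Case 1, DPP: fact JOIN dim ON fact.part = dim.id WHERE dim.x = 'k'.
# The filtered dim side is evaluated first and its join keys are reused
# as a partition filter on fact.
dpp_keys = {row["id"] for row in dim if row["x"] == "k"}
without_pruning = partitions_to_read(fact_partitions)
with_dpp = partitions_to_read(fact_partitions, dpp_keys)

# Case 2, scalar subquery: WHERE part = (SELECT max(x) FROM dim_numeric).
# The subquery resolves to a single value, so a single partition remains.
scalar_value = max(row["x"] for row in dim_numeric)
with_scalar = partitions_to_read(fact_partitions, {scalar_value})
```

Without pruning the scan visits all six partitions; with the runtime value sets it visits two partitions in the join case and one in the scalar-subquery case.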
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. The change is internal to the optimizer/planner. Users see the same 
query results but, where applicable, with fewer files read at runtime.
   
   ### How was this patch tested?
   
   A new test suite `DataSourceV2FileSourceDPPSuite` covers seven cases:
   
   - DPP inserts `DynamicPruningSubquery` for V2 parquet partitioned table 
(positive)
   - DPP does NOT fire when join key is a non-partition column (negative)
   - DPP fires when partitioned fact is on the right side of the join
   - DPP filter reaches `BatchScanExec.runtimeFilters` as 
`DynamicPruningExpression` (physical-plan layer)
   - DPP prunes input partitions at runtime (verified via the `numOutputRows` 
metric on the fact scan)
   - Scalar subquery on partition column of V2 parquet prunes partitions
   - Scalar-subquery filter is NOT wrapped in `DynamicPruning` (regression guard)
   
   Existing test suites confirmed green:
   
   - `DataSourceV2EnhancedRuntimePartitionFilterSuite` (12 cases) — guards 
SPARK-56521 + SPARK-55596 iterative pushdown for non-file V2 sources
   - `OrcV2AggregatePushDownSuite` and `ParquetV2AggregatePushDownSuite` — V2 
aggregate pushdown unaffected
   - `DataSourceV2SQLSuiteV2Filter` (incl. SPARK-56467) — scalar-subquery 
classification unchanged for non-file V2 catalogs
   - `DynamicPartitionPruningV1SuiteAEOff/On` and 
`DynamicPartitionPruningV2SuiteAEOff/On` — V1 and in-memory V2 catalog DPP 
unaffected
   - `SubquerySuite` — scalar-subquery handling unchanged
   
   Total across the broader regression run: **437 passed, 0 failed**.
   
   Local validation:
   
   ```bash
   build/sbt 'sql/testOnly *DataSourceV2FileSourceDPPSuite 
*DataSourceV2EnhancedRuntimePartitionFilterSuite *OrcV2AggregatePushDownSuite 
*ParquetV2AggregatePushDownSuite *DataSourceV2SQLSuiteV2Filter 
*DynamicPartitionPruningV1SuiteAEOff *DynamicPartitionPruningV1SuiteAEOn 
*DynamicPartitionPruningV2SuiteAEOff *DynamicPartitionPruningV2SuiteAEOn 
*SubquerySuite'
   ```
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Opus 4.7


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

