LuciferYang opened a new pull request, #55579: URL: https://github.com/apache/spark/pull/55579
### What changes were proposed in this pull request?

Extends Dynamic Partition Pruning (DPP) and scalar-subquery partition pruning to V2 file sources (Parquet, ORC, CSV, JSON, Avro, Text). Before this change, V2 file scans were not recognized as DPP-eligible by `PartitionPruning.getFilterableTableScan` because they do not implement `SupportsRuntimeV2Filtering`, and scalar-subquery filters on partition columns were left as post-scan `FilterExec` without any file-level pruning.

Implementation summary (4 files modified, ~50 lines):

- `PartitionPruning.scala`: extend `getFilterableTableScan` with a case matching `DataSourceV2ScanRelation(scan: FileScan)`, resolving partition columns from `FileScan.readPartitionSchema`. The DPP rule then inserts the existing `DynamicPruningSubquery`; `PlanDynamicPruningFilters` and `CleanupDynamicPruningFilters` flow unchanged.
- `FileScan.scala`: factor partition listing into `partitionsImpl(allPartitionFilters)`. Add public `planInputPartitionsWithRuntimeFilters(extraFilters)` that re-lists files with `partitionFilters ++ extraFilters`. **Immutable**: no mutable state added on the scan.
- `BatchScanExec.scala`: `filteredPartitions` special-cases `FileScan` with non-empty `runtimeFilters` by calling the new method. Non-file scans continue to use `PushDownUtils.pushRuntimeFilters` as before, **preserving the `SupportsRuntimeV2Filtering` iterative-pushdown contract for upstream connectors (Iceberg, Delta).**
- `DataSourceV2Strategy.scala`: lift scalar-subquery filters whose references are a subset of partition columns into `BatchScanExec.runtimeFilters` via an `effectiveRuntimeFilterAttrs` computation. Filters stay in `postScanFilters` for correctness; runtime layer applies them for pruning. **Scalar subqueries are NOT wrapped in `DynamicPruningExpression`**; that type remains reserved for join-derived pruning, so the runtime-filter classifier in `DataSourceV2EnhancedRuntimePartitionFilterSuite` stays correct.

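The immutable re-listing described for `FileScan.scala` can be sketched roughly as follows. This is a simplified, self-contained model and not the code in this PR: `RuntimeFilterSketch`, `FileScanModel`, `PartitionedFile`, and the `PartitionFilter` function type are hypothetical stand-ins, and plain Scala functions replace the Catalyst expressions that the real methods operate on. Only the method names `partitionsImpl`, `planInputPartitionsWithRuntimeFilters`, and the `partitionFilters ++ extraFilters` combination are taken from the summary above.

```scala
// Illustrative model only; these are NOT the actual Spark classes.
object RuntimeFilterSketch {
  // Hypothetical: one data file plus the partition values of its directory.
  final case class PartitionedFile(path: String, partitionValues: Map[String, String])

  // Hypothetical stand-in for a Catalyst predicate over partition columns.
  type PartitionFilter = Map[String, String] => Boolean

  final case class FileScanModel(
      files: Seq[PartitionedFile],
      partitionFilters: Seq[PartitionFilter]) {

    // Shared listing logic: keep files whose partition values satisfy every filter.
    private def partitionsImpl(allPartitionFilters: Seq[PartitionFilter]): Seq[PartitionedFile] =
      files.filter(f => allPartitionFilters.forall(p => p(f.partitionValues)))

    // Planning-time path: only the filters known when the scan was built.
    def planInputPartitions(): Seq[PartitionedFile] =
      partitionsImpl(partitionFilters)

    // Runtime path: re-list with static plus runtime filters.
    // Nothing on the scan is mutated; the extra filters live only in this call.
    def planInputPartitionsWithRuntimeFilters(
        extraFilters: Seq[PartitionFilter]): Seq[PartitionedFile] =
      partitionsImpl(partitionFilters ++ extraFilters)
  }

  // Example data: a fact table with three partitions and no static filters.
  val scan: FileScanModel = FileScanModel(
    files = Seq(
      PartitionedFile("fact/part=1/a.parquet", Map("part" -> "1")),
      PartitionedFile("fact/part=2/b.parquet", Map("part" -> "2")),
      PartitionedFile("fact/part=3/c.parquet", Map("part" -> "3"))),
    partitionFilters = Seq.empty)

  // A runtime filter such as DPP might derive from the dimension side: part IN ('2').
  val runtimeFilter: PartitionFilter =
    values => values.get("part").exists(Set("2").contains)

  def main(args: Array[String]): Unit = {
    println(scan.planInputPartitions().map(_.path))
    println(scan.planInputPartitionsWithRuntimeFilters(Seq(runtimeFilter)).map(_.path))
  }
}
```

Because the runtime filters are passed as an argument rather than stored on the scan, the same scan instance can still be planned without them, which is the property the "Immutable" note above refers to.
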
This addresses the same gap as the previously-stale [PR #37514](https://github.com/apache/spark/pull/37514) but with three deliberate design improvements:

1. Immutable runtime-filter slot (no mutating `var` on `FileScan`).
2. No new public trait: `PushDownUtils` ordering and the `SupportsRuntimeV2Filtering` contract for non-file V2 sources are untouched.
3. Scalar-subquery routing without `DynamicPruningExpression` wrapping (preserves the test-classifier semantics in the SPARK-56521 / SPARK-55596 work).

### Why are the changes needed?

V2 file sources are widely used (especially V2 Parquet), but they currently miss two important optimizations that V1 file sources have via `HadoopFsRelation`:

1. **DPP**: a star-schema query like `SELECT * FROM fact JOIN dim ON fact.part = dim.id WHERE dim.x = 'k'` reads all partitions of `fact` even when `fact.part` matches only a few values from the filtered `dim`.
2. **Scalar-subquery partition pruning**: `WHERE part_col = (SELECT max(x) FROM dim)` reads all partitions even when the subquery resolves to a single value.

This PR closes both gaps using a minimal, immutable design.

### Does this PR introduce _any_ user-facing change?

No. The change is internal to the optimizer/planner. Users see the same query results but, where applicable, with fewer files read at runtime.

### How was this patch tested?

A new test suite `DataSourceV2FileSourceDPPSuite` covers seven cases:

- DPP inserts `DynamicPruningSubquery` for V2 parquet partitioned table (positive)
- DPP does NOT fire when join key is a non-partition column (negative)
- DPP fires when partitioned fact is on the right side of the join
- DPP filter reaches `BatchScanExec.runtimeFilters` as `DynamicPruningExpression` (physical-plan layer)
- DPP prunes input partitions at runtime (verified via `numOutputRows` metric on the fact scan)
- Scalar subquery on partition column of V2 parquet prunes partitions
- Scalar-subquery filter does NOT wrap in `DynamicPruning` (regression guard)

Existing test suites confirmed green:

- `DataSourceV2EnhancedRuntimePartitionFilterSuite` (12 cases): guards SPARK-56521 + SPARK-55596 iterative pushdown for non-file V2 sources
- `OrcV2AggregatePushDownSuite` and `ParquetV2AggregatePushDownSuite`: V2 aggregate pushdown unaffected
- `DataSourceV2SQLSuiteV2Filter` (incl. SPARK-56467): scalar-subquery classification unchanged for non-file V2 catalogs
- `DynamicPartitionPruningV1SuiteAEOff/On` and `DynamicPartitionPruningV2SuiteAEOff/On`: V1 and in-memory V2 catalog DPP unaffected
- `SubquerySuite`: scalar-subquery handling unchanged

Total broader regression: **437 passed, 0 failed**.

Local validation:

```bash
build/sbt 'sql/testOnly *DataSourceV2FileSourceDPPSuite *DataSourceV2EnhancedRuntimePartitionFilterSuite *OrcV2AggregatePushDownSuite *ParquetV2AggregatePushDownSuite *DataSourceV2SQLSuiteV2Filter *DynamicPartitionPruningV1SuiteAEOff *DynamicPartitionPruningV1SuiteAEOn *DynamicPartitionPruningV2SuiteAEOff *DynamicPartitionPruningV2SuiteAEOn *SubquerySuite'
```

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
