Peter Toth created SPARK-26893:
----------------------------------

             Summary: Allow pushdown of partition pruning subquery filters to file source
                 Key: SPARK-26893
                 URL: https://issues.apache.org/jira/browse/SPARK-26893
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Peter Toth


The file source currently doesn't use subquery filters for partition pruning, but with a minor improvement it could.

This query is an example:
{noformat}
CREATE TABLE a (id INT, p INT) USING PARQUET PARTITIONED BY (p)
CREATE TABLE b (id INT) USING PARQUET
SELECT * FROM a WHERE p <= (SELECT MIN(id) FROM b){noformat}
Where the executed plan of the SELECT currently is:
{noformat}
*(1) Filter (p#252L <= Subquery subquery250)
:  +- Subquery subquery250
:     +- *(2) HashAggregate(keys=[], functions=[min(id#253L)], output=[min(id)#255L])
:        +- Exchange SinglePartition
:           +- *(1) HashAggregate(keys=[], functions=[partial_min(id#253L)], output=[min#259L])
:              +- *(1) FileScan parquet default.b[id#253L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/b], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- *(1) FileScan parquet default.a[id#251L,p#252L] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/a/p=0, file:..., PartitionCount: 2, PartitionFilters: [isnotnull(p#252L)], PushedFilters: [], ReadSchema: struct<id:bigint>
{noformat}
But it could be:
{noformat}
*(1) FileScan parquet default.a[id#251L,p#252L] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/a/p=0, file:..., PartitionFilters: [isnotnull(p#252L), (p#252L <= Subquery subquery250)], PushedFilters: [], ReadSchema: struct<id:bigint>
+- Subquery subquery250
   +- *(2) HashAggregate(keys=[], functions=[min(id#253L)], output=[min(id)#255L])
      +- Exchange SinglePartition
         +- *(1) HashAggregate(keys=[], functions=[partial_min(id#253L)], output=[min#259L])
            +- *(1) FileScan parquet default.b[id#253L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/b], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
{noformat}
so that partition pruning could work in {{FileSourceScanExec}}.
Please note that the {{PartitionCount}} metadata can't be computed before execution, so in this case it is no longer part of the plan.
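The idea can be illustrated outside Spark with a minimal sketch (plain Python with hypothetical in-memory tables, not Spark internals): the scalar subquery is executed first, and its result parameterizes the partition filter, so pruning still happens before the main table is scanned.

```python
# Conceptual sketch: partition pruning driven by a subquery result that is
# only known at execution time. Table `a` is partitioned by `p`; the filter
# is p <= (SELECT MIN(id) FROM b). Table contents are made up for the example.

table_a_partitions = {        # partition value p -> rows of (id, p)
    0: [(1, 0), (2, 0)],
    1: [(3, 1), (4, 1)],
    2: [(5, 2), (6, 2)],
}
table_b = [3, 1, 7]           # ids in table b

def scan_a(partition_filter):
    """Scan only the partitions that pass the filter (partition pruning).

    Returns the surviving rows and how many partitions were actually read.
    """
    pruned = {p: rows for p, rows in table_a_partitions.items()
              if partition_filter(p)}
    rows = [row for part in pruned.values() for row in part]
    return rows, len(pruned)

# Execute the "subquery" first; its scalar result becomes part of the
# partition filter, so partition 2 is never touched by the scan.
min_b = min(table_b)                           # SELECT MIN(id) FROM b
rows, partitions_read = scan_a(lambda p: p <= min_b)
```

Spark's current behavior corresponds to calling `scan_a(lambda p: True)` and applying `p <= min_b` afterwards: all three partitions are read even though only two can contribute rows.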



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
