[ https://issues.apache.org/jira/browse/SPARK-35985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-35985:
------------------------------------

    Assignee: Apache Spark

File source V2 ignores partition filters when empty readDataSchema
------------------------------------------------------------------

                Key: SPARK-35985
                URL: https://issues.apache.org/jira/browse/SPARK-35985
            Project: Spark
         Issue Type: Bug
         Components: SQL
   Affects Versions: 3.0.0
           Reporter: Steven Aerts
           Assignee: Apache Spark
           Priority: Major

A V2 datasource fails to rely on partition filters when it only wants to know how many entries there are and is not interested in their content. When the {{readDataSchema}} of the {{FileScan}} is empty, partition filters are not pushed down and all data is scanned.

Some examples where this happens:

{code:java}
scala> spark.sql("SELECT count(*) FROM parq WHERE day=20210702").explain
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#136]
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(1) Project
         +- *(1) Filter (isnotnull(day#68) AND (day#68 = 20210702))
            +- *(1) ColumnarToRow
               +- BatchScan[day#68] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilers: [IsNotNull(day), EqualTo(day,20210702)], ReadSchema: struct<>, PushedFilters: [IsNotNull(day), EqualTo(day,20210702)]

scala> spark.sql("SELECT input_file_name() FROM parq WHERE day=20210702").explain
== Physical Plan ==
*(1) Project [input_file_name() AS input_file_name()#131]
+- *(1) Filter (isnotnull(day#68) AND (day#68 = 20210702))
   +- *(1) ColumnarToRow
      +- BatchScan[day#68] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilers: [IsNotNull(day), EqualTo(day,20210702)], ReadSchema: struct<>, PushedFilters: [IsNotNull(day), EqualTo(day,20210702)]
{code}

Once the {{readDataSchema}} is not empty, it works correctly:

{code:java}
scala> spark.sql("SELECT header.tenant FROM parq WHERE day=20210702").explain
== Physical Plan ==
*(1) Project [header#51.tenant AS tenant#199]
+- BatchScan[header#51, day#68] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [isnotnull(day#68), (day#68 = 20210702)], PushedFilers: [IsNotNull(day), EqualTo(day,20210702)], ReadSchema: struct<header:struct<tenant:string>>, PushedFilters: [IsNotNull(day), EqualTo(day,20210702)]
{code}

In V1 this optimization is available:

{code:java}
scala> spark.sql("SELECT count(*) FROM parq WHERE day=20210702").explain
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#27]
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(1) Project
         +- *(1) ColumnarToRow
            +- FileScan parquet [year#15,month#16,day#17,hour#18] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [isnotnull(day#17), (day#17 = 20210702)], PushedFilters: [], ReadSchema: struct<>
{code}

The examples use {{ParquetScan}}, but the problem happens for all file-based V2 datasources. A minimal setup that reproduces these plans is sketched below.
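For illustration only, a minimal spark-shell sketch: the table name {{parq}} and the partition columns ({{year}}, {{month}}, {{day}}, {{hour}}) are taken from the plans above, while the data, the two-day split, and the {{/tmp/parq}} path are made up. Clearing {{spark.sql.sources.useV1SourceList}} routes parquet through the V2 file source, which defaults to V1.

{code:java}
import org.apache.spark.sql.functions._

// Route parquet through the V2 file source (by default it stays on V1).
spark.conf.set("spark.sql.sources.useV1SourceList", "")

// Write a small dataset partitioned like the table in the plans above;
// the values themselves are made up for illustration.
spark.range(0, 1000).toDF("id")
  .withColumn("year", lit(2021))
  .withColumn("month", lit(7))
  .withColumn("day", lit(20210701) + (col("id") % 2)) // two day partitions
  .withColumn("hour", lit(0))
  .write
  .partitionBy("year", "month", "day", "hour")
  .parquet("/tmp/parq")

spark.read.parquet("/tmp/parq").createOrReplaceTempView("parq")

// With an empty readDataSchema, the V2 plan shows PartitionFilters: [].
spark.sql("SELECT count(*) FROM parq WHERE day=20210702").explain()
{code}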
The fix for this issue feels very straightforward. In {{PruneFileSourcePartitions}}, queries with an empty {{readDataSchema}} are explicitly excluded from having their partition filters pushed down:

{code:java}
if filters.nonEmpty && scan.readDataSchema.nonEmpty =>
{code}

Removing that condition seems to fix the issue; however, this might be too naive. I am making a PR with tests where this change can be discussed.
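As a sketch for that discussion, assuming the rule's V2 arm looks roughly as paraphrased below (only the guard itself is quoted verbatim above; the surrounding pattern may differ across Spark versions), the change would be:

{code:java}
// V2 arm of PruneFileSourcePartitions, shape paraphrased for illustration;
// internal APIs (DataSourceV2ScanRelation, FileScan) vary per Spark version.
case op @ PhysicalOperation(projects, filters,
    v2Relation @ DataSourceV2ScanRelation(_, scan: FileScan, output))
    // Current guard skips pruning when the read schema is empty, which is
    // exactly the count(*) / input_file_name() case shown above:
    //   if filters.nonEmpty && scan.readDataSchema.nonEmpty =>
    // Proposed: prune whenever there are filters at all:
    if filters.nonEmpty =>
  // ... existing logic: split filters into partition and data filters,
  // and push the partition filters into the scan's file index ...
{code}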