[ 
https://issues.apache.org/jira/browse/SPARK-39393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huaxin Gao resolved SPARK-39393.
--------------------------------
    Fix Version/s: 3.1.3
                   3.3.0
                   3.2.2
                   3.4.0
         Assignee: Amin Borjian
       Resolution: Fixed

> Parquet data source only supports push-down predicate filters for 
> non-repeated primitive types
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-39393
>                 URL: https://issues.apache.org/jira/browse/SPARK-39393
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.2.1
>            Reporter: Amin Borjian
>            Assignee: Amin Borjian
>            Priority: Major
>              Labels: parquet
>             Fix For: 3.1.3, 3.3.0, 3.2.2, 3.4.0
>
>
> I will use an example to illustrate the problem; the cause and a proposed fix are described below.
> Assume the following Protocol Buffers schema:
> {code:java}
> message Model {
>      string name = 1;
>      repeated string keywords = 2;
> }
> {code}
> Suppose a parquet file is created from a set of records in the above format 
> with the help of the {{parquet-protobuf}} library.
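> For context, such a file can be produced with the {{ProtoParquetWriter}} from {{parquet-protobuf}}. A minimal sketch, assuming a {{Model}} class generated by protoc from the schema above (the path and record values are illustrative):
> {code:java}
> import org.apache.hadoop.fs.Path
> import org.apache.parquet.proto.ProtoParquetWriter
>
> // `Model` is the protoc-generated class for the schema above (assumption).
> // The deprecated convenience constructor is used here for brevity.
> val writer = new ProtoParquetWriter[Model](new Path("/path/to/parquet"), classOf[Model])
> // Note: parquet-protobuf stores `repeated string keywords` as a repeated
> // primitive column, not as the three-level LIST structure Spark itself writes.
> writer.write(Model.newBuilder().setName("m1").addKeywords("X").build())
> writer.close()
> {code}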
> Using Spark version 3.0.2 or older, we could run the following query in {{spark-shell}}:
> {code:java}
> val data = spark.read.parquet("/path/to/parquet")
> data.createOrReplaceTempView("models")
> spark.sql("select * from models where array_contains(keywords, 'X')").show(false)
> {code}
> But after upgrading Spark (to 3.1.0 or newer), we get the following error:
> {code:java}
> Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
>   at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
>   at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
>   at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
>   at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
>   at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
>   at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)
>   at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:789)
>   at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
>   at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
>   at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
>   at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
> ...
> {code}
> At first it seems the problem is in the Parquet library. But in fact, the check that fails has been there since 2014 (based on Git history):
> [Parquet Schema Compatibility Validator|https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java#L194]
> After some checking, I noticed that the cause of the problem is a change in the data filtering conditions that Spark generates:
> {code:java}
> spark.sql("select * from log where array_contains(keywords, 'Google')").explain(true)
> // Spark 3.0.2 and older
> == Physical Plan ==
> ...
> +- FileScan parquet [link#0,keywords#1]
>   DataFilters: [array_contains(keywords#1, Google)]
>   PushedFilters: []
>   ...
> // Spark 3.1.0 and newer
> == Physical Plan ==
> ...
> +- FileScan parquet [link#0,keywords#1]
>   DataFilters: [isnotnull(keywords#1), array_contains(keywords#1, Google)]
>   PushedFilters: [IsNotNull(keywords)]
>   ...
> {code}
> It is good that the filtering has become smarter. Unfortunately, because of my unfamiliarity with the code base, I could not find the exact location of the change or the related pull request. In general this change is fine for non-repeated Parquet fields, but for repeated fields it causes an error from the Parquet library, as the example above shows.
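> Conceptually, the fix is for Spark to treat only non-repeated primitive fields as pushable when building Parquet predicates. A minimal sketch of such a guard using the parquet-mr {{Type}} API (the helper name is illustrative, not the actual patch):
> {code:java}
> import org.apache.parquet.schema.MessageType
> import org.apache.parquet.schema.Type.Repetition
>
> // Hypothetical helper: a predicate may be pushed down only for fields that
> // are primitive and not repeated; repeated columns are rejected by
> // parquet-mr's SchemaCompatibilityValidator, as in the stack trace above.
> def isPushableField(schema: MessageType, name: String): Boolean =
>   schema.containsField(name) && {
>     val field = schema.getType(name)
>     field.isPrimitive && !field.isRepetition(Repetition.REPEATED)
>   }
> {code}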
> The only temporary workaround, in my opinion, is to disable the following setting, which in general greatly reduces performance:
> {code:java}
> SET spark.sql.parquet.filterPushdown=false
> {code}
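> The same flag can also be toggled per session from code instead of SQL:
> {code:java}
> // Disables Parquet predicate push-down for the current session only.
> // This affects all Parquet scans, so queries may read far more data.
> spark.conf.set("spark.sql.parquet.filterPushdown", "false")
> {code}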
> I created a patch for this bug and a pull request will be sent soon.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
