[ https://issues.apache.org/jira/browse/SPARK-39393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Huaxin Gao resolved SPARK-39393.
--------------------------------
    Fix Version/s: 3.1.3
                   3.3.0
                   3.2.2
                   3.4.0
         Assignee: Amin Borjian
       Resolution: Fixed

> Parquet data source only supports push-down predicate filters for non-repeated primitive types
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-39393
>                 URL: https://issues.apache.org/jira/browse/SPARK-39393
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.2.1
>            Reporter: Amin Borjian
>            Assignee: Amin Borjian
>            Priority: Major
>              Labels: parquet
>             Fix For: 3.1.3, 3.3.0, 3.2.2, 3.4.0
>
>
> I use an example to illustrate the problem; the cause of the problem and the approach to solving it are described below.
> Assume the following Protocol Buffers schema:
> {code:java}
> message Model {
>   string name = 1;
>   repeated string keywords = 2;
> }
> {code}
> Suppose a Parquet file is created from a set of records in the above format with the help of the {{parquet-protobuf}} library.
> Using Spark 3.0.2 or older, we could run the following query in {{spark-shell}}:
> {code:java}
> val data = spark.read.parquet("/path/to/parquet")
> data.registerTempTable("models")
> spark.sql("select * from models where array_contains(keywords, 'X')").show(false)
> {code}
> But after updating Spark, we get the following error:
> {code:java}
> Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
>   at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
>   at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
>   at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
>   at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
>   at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
>   at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)
>   at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:789)
>   at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
>   at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
>   at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
>   at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
>   ...
> {code}
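> For reference, the same failure shows up when the filter is expressed through the DataFrame API instead of SQL. This is only a minimal sketch against the same file as above; {{array_contains}} here is the function from {{org.apache.spark.sql.functions}}:
> {code:java}
> // Sketch: same repeated-column pushdown failure via the DataFrame API.
> // On Spark 3.1+, the implicit isnotnull(keywords) data filter is pushed
> // down to Parquet as IsNotNull(keywords) and rejected for the repeated column.
> import org.apache.spark.sql.functions.array_contains
>
> val data = spark.read.parquet("/path/to/parquet")   // same file as above
> data.filter(array_contains(data("keywords"), "X")).show(false)
> {code}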
> At first it seems the problem is in the Parquet library. But in fact, the problem is caused by this line, which has been around since 2014 (based on Git history):
> [Parquet Schema Compatibility Validator|https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java#L194]
> After some checking, I noticed that the cause of the problem is a change in the data filtering conditions:
> {code:java}
> spark.sql("select * from log where array_contains(keywords, 'X')").explain(true);
>
> // Spark 3.0.2 and older
> == Physical Plan ==
> ...
> +- FileScan parquet [link#0,keywords#1]
>      DataFilters: [array_contains(keywords#1, Google)]
>      PushedFilters: []
> ...
>
> // Spark 3.1.0 and newer
> == Physical Plan ==
> ...
> +- FileScan parquet [link#0,keywords#1]
>      DataFilters: [isnotnull(keywords#1), array_contains(keywords#1, Google)]
>      PushedFilters: [IsNotNull(keywords)]
> ...
> {code}
> It is good that the filtering has become smarter. Unfortunately, due to unfamiliarity with the code base, I could not find the exact location of the change and the related pull request. In general, this change is fine for non-repeated Parquet fields, but for repeated fields it causes an error from the Parquet library (like the example given).
> In my opinion, the only temporary workaround is to disable the following setting, which in general greatly reduces performance:
> {code:java}
> SET spark.sql.parquet.filterPushdown=false
> {code}
> I created a patch for this bug and a pull request will be sent soon.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org