Borjianamin98 opened a new pull request, #36781:
URL: https://github.com/apache/spark/pull/36781

   ### What changes were proposed in this pull request?
   
   In Spark 3.1.0 and newer, Spark creates extra filter predicate conditions 
for repeated Parquet columns.
   Filter predicates are not supported on such fields, according to the 
[PARQUET-34](https://issues.apache.org/jira/browse/PARQUET-34) issue in the 
Parquet library.
   
   This PR works around the problem until the Parquet library provides the 
appropriate functionality.
   
   Before this PR:
   
   Assume the following Protocol Buffers schema:
   
   ```
   message Model {
       string name = 1;
       repeated string keywords = 2;
   }
   ```
   
   Suppose a Parquet file is created from a set of records in the above format 
with the help of the parquet-protobuf library.
   With Spark 3.1.0 or newer, running the following query in spark-shell throws 
the exception shown below:
   
   ```
   val data = spark.read.parquet("/path/to/parquet")
   // registerTempTable is deprecated; createOrReplaceTempView is its replacement
   data.createOrReplaceTempView("models")
   spark.sql("select * from models where array_contains(keywords, 'X')").show(false)
   ```
   
   ```
   Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.
     at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)
     at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
     at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
     at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
     at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
     at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
     at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
     at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
     at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
     at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
     at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)
     at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:789)
     at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
     at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
     at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
     at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)
     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
   ...
   ```
   
   The problem is caused by a change in the data filtering conditions:
   
   ```
   spark.sql("select * from log where array_contains(keywords, 'X')").explain(true);
   
   // Spark 3.0.2 and older
   == Physical Plan ==
   ... 
   +- FileScan parquet [link#0,keywords#1]
     DataFilters: [array_contains(keywords#1, Google)]
     PushedFilters: []
     ...
   
   // Spark 3.1.0 and newer
   == Physical Plan ==
   ...
   +- FileScan parquet [link#0,keywords#1]
     DataFilters: [isnotnull(keywords#1),  array_contains(keywords#1, Google)]
     PushedFilters: [IsNotNull(keywords)]
     ...
   ```
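   
   One way to check that the pushed `IsNotNull(keywords)` filter is what 
triggers the exception is to disable Parquet filter pushdown and rerun the 
query. This is only a diagnostic sketch, not part of this PR; it assumes the 
`models` view from the example above and uses the standard 
`spark.sql.parquet.filterPushdown` setting:
   
   ```scala
   // Diagnostic only: turn off Parquet filter pushdown for this session.
   spark.conf.set("spark.sql.parquet.filterPushdown", "false")
   // The predicate is now evaluated by Spark after the scan instead of by Parquet.
   spark.sql("select * from models where array_contains(keywords, 'X')").show(false)
   ```
   
   With pushdown disabled, no predicate reaches the Parquet validator, so the 
query is expected to succeed, at the cost of losing row-group filtering for 
every column.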
   
   Pushing filters down for repeated Parquet columns is pointless because the 
Parquet library does not support it yet. We can therefore exclude these columns 
from the pushed predicate filters, which resolves the issue.
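   
   The idea can be sketched with a simplified, dependency-free model. All types 
and the `pushableFilters` helper below are hypothetical stand-ins for 
illustration; they are not Spark's or Parquet's real classes and not the code 
changed in this PR:
   
   ```scala
   // Hypothetical model of a Parquet field's repetition level.
   sealed trait Repetition
   case object Required extends Repetition
   case object Optional extends Repetition
   case object Repeated extends Repetition

   // Hypothetical stand-in for a top-level Parquet field.
   final case class Field(name: String, repetition: Repetition)

   // Hypothetical stand-ins for pushed-down data source filters.
   sealed trait Filter { def column: String }
   final case class IsNotNull(column: String) extends Filter
   final case class EqualTo(column: String, value: Any) extends Filter

   // Keep only the filters whose column is not a repeated field.
   def pushableFilters(schema: Seq[Field], candidates: Seq[Filter]): Seq[Filter] = {
     val repeated = schema.collect { case Field(name, Repeated) => name }.toSet
     candidates.filterNot(f => repeated.contains(f.column))
   }

   // Schema mirroring the Model message above (repetition of `name` is assumed).
   val schema = Seq(Field("name", Optional), Field("keywords", Repeated))
   val candidates = Seq(IsNotNull("keywords"), IsNotNull("name"), EqualTo("name", "X"))
   val pushed = pushableFilters(schema, candidates)
   ```
   
   In this model, `IsNotNull("keywords")` is dropped from the pushed filters 
while the filters on `name` are kept. Spark would still evaluate the dropped 
condition itself after the scan, so query results are unaffected.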
   
   ### Why are the changes needed?
   
   Predicate filters pushed down to Parquet must not be created on repeated 
fields; otherwise the Parquet reader rejects them with the 
`IllegalArgumentException` shown above.
   
   ### Does this PR introduce any user-facing change?
   
   No. This only fixes a bug; before this change, the limitation of the Parquet 
library made such queries impossible to run.
   
   ### How was this patch tested?
   
   No new tests were added; the change was verified only by running the 
existing test suite.
   



