[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15720964#comment-15720964 ]
Xiao Li commented on SPARK-18539:
---------------------------------

The Parquet filter push-down in Spark 2.x differs from the one in Spark 1.x. Since Spark 2.0, the Parquet scan uses vectorization. Unfortunately, Spark 2.0.0 had a serious bug in filter push-down:
https://issues.apache.org/jira/browse/SPARK-15639
After the fix was merged into Spark 2.0.1, you hit the behavior difference.

We always respect user-specified schemas. When a user-specified schema does not match the actual data schema, the behavior is not well defined. You might hit different errors or unexpected results, and the behavior can differ depending on the format you choose.

Let me dig deeper into this specific issue; I might be able to provide a better answer later.

> Cannot filter by nonexisting column in parquet file
> ---------------------------------------------------
>
>                 Key: SPARK-18539
>                 URL: https://issues.apache.org/jira/browse/SPARK-18539
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.1, 2.0.2
>            Reporter: Vitaly Gerasimov
>            Priority: Critical
>
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.types.DataTypes._
> import org.apache.spark.sql.types.{StructField, StructType}
> val sc = SparkSession.builder().config(new SparkConf().setMaster("local")).getOrCreate()
> val jsonRDD = sc.sparkContext.parallelize(Seq("""{"a":1}"""))
> sc.read
>   .schema(StructType(Seq(StructField("a", IntegerType))))
>   .json(jsonRDD)
>   .write
>   .parquet("/tmp/test")
> sc.read
>   .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType, nullable = true))))
>   .load("/tmp/test")
>   .createOrReplaceTempView("table")
> sc.sql("select b from table where b is not null").show()
> {code}
> returns:
> {code}
> 16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> java.lang.IllegalArgumentException: Column [b] was not found in schema!
>     at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
>     at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
>     at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
>     at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160)
>     at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:100)
>     at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
>     at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
>     at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
>     at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
>     at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
>     at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
>     at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
>     at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
>     at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>     at org.apache.spark.scheduler.Task.run(Task.scala:86)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> expected result:
> {code}
> +---+
> |  b|
> +---+
> +---+
> {code}
> It works fine in 2.0.0 and 1.6.2. However, if I only select the nonexisting column (without filter) it also works fine.
> Query plan:
> {code}
> == Parsed Logical Plan ==
> 'Project ['b]
> +- 'Filter isnotnull('b)
>    +- 'UnresolvedRelation `table`
> == Analyzed Logical Plan ==
> b: int
> Project [b#8]
> +- Filter isnotnull(b#8)
>    +- SubqueryAlias table
>       +- Relation[a#7,b#8] parquet
> == Optimized Logical Plan ==
> Project [b#8]
> +- Filter isnotnull(b#8)
>    +- Relation[a#7,b#8] parquet
> == Physical Plan ==
> *Project [b#8]
> +- *Filter isnotnull(b#8)
>    +- *BatchedScan parquet [b#8] Format: ParquetFormat, InputPaths: file:/tmp/test, PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<b:int>
> {code}
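
Because the behavior is not well defined when the user-specified schema names a column the files do not contain, one way to sidestep the mismatch is to read the files with their own schema and add the missing columns as typed nulls afterwards. The following is only a sketch under that assumption, not a fix taken from this thread: the object name and the helpers `missingFields` and `padWithNulls` are hypothetical, and the name comparison is case-sensitive. It reuses the path, schema, and query from the report.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object MissingColumnWorkaround {
  // Hypothetical helper: fields of `requested` whose names are absent from `actual`.
  def missingFields(requested: StructType, actual: StructType): Seq[StructField] =
    requested.fields.filterNot(f => actual.fieldNames.contains(f.name)).toSeq

  // Hypothetical helper: append every missing field as an all-null column of its type.
  def padWithNulls(df: DataFrame, requested: StructType): DataFrame =
    missingFields(requested, df.schema).foldLeft(df) { (acc, f) =>
      acc.withColumn(f.name, lit(null).cast(f.dataType))
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()

    // The schema the reporter asked for; column "b" is not in the Parquet files.
    val requested = StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", IntegerType, nullable = true)))

    // Read with the schema stored in the files, so the scan only sees real columns.
    val actual = spark.read.parquet("/tmp/test")

    // "b" now exists only as a literal null column, not as a Parquet column.
    padWithNulls(actual, requested).createOrReplaceTempView("table")
    spark.sql("select b from table where b is not null").show()

    // Alternative (assumption, not confirmed in this thread): keep the
    // user-specified schema and turn off Parquet filter push-down instead.
    // spark.conf.set("spark.sql.parquet.filterPushdown", "false")

    spark.stop()
  }
}
```

The padding approach keeps push-down enabled for columns that really exist in the files; the commented-out setting trades that away for every Parquet scan in the session.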