[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15724110#comment-15724110 ]
Liang-Chi Hsieh commented on SPARK-18539: ----------------------------------------- That's cool. > Cannot filter by nonexisting column in parquet file > --------------------------------------------------- > > Key: SPARK-18539 > URL: https://issues.apache.org/jira/browse/SPARK-18539 > Project: Spark > Issue Type: Bug > Affects Versions: 2.0.1, 2.0.2 > Reporter: Vitaly Gerasimov > Priority: Critical > > {code} > import org.apache.spark.SparkConf > import org.apache.spark.sql.SparkSession > import org.apache.spark.sql.types.DataTypes._ > import org.apache.spark.sql.types.{StructField, StructType} > val sc = SparkSession.builder().config(new > SparkConf().setMaster("local")).getOrCreate() > val jsonRDD = sc.sparkContext.parallelize(Seq("""{"a":1}""")) > sc.read > .schema(StructType(Seq(StructField("a", IntegerType)))) > .json(jsonRDD) > .write > .parquet("/tmp/test") > sc.read > .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", > IntegerType, nullable = true)))) > .load("/tmp/test") > .createOrReplaceTempView("table") > sc.sql("select b from table where b is not null").show() > {code} > returns: > {code} > 16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) > java.lang.IllegalArgumentException: Column [b] was not found in schema! 
> at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:100) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59) > at > org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64) > at > org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59) > at > org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40) > at > org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126) > at > org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46) > at > org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110) > at > org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116) > at > 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > expected result: > {code} > +---+ > | b| > +---+ > +---+ > {code} > It works fine in 2.0.0 and 1.6.2. Selecting only the nonexistent > column (without the filter) also works fine; the failure occurs only when the filter is applied. 
> Query plan: > {code} > == Parsed Logical Plan == > 'Project ['b] > +- 'Filter isnotnull('b) > +- 'UnresolvedRelation `table` > == Analyzed Logical Plan == > b: int > Project [b#8] > +- Filter isnotnull(b#8) > +- SubqueryAlias table > +- Relation[a#7,b#8] parquet > == Optimized Logical Plan == > Project [b#8] > +- Filter isnotnull(b#8) > +- Relation[a#7,b#8] parquet > == Physical Plan == > *Project [b#8] > +- *Filter isnotnull(b#8) > +- *BatchedScan parquet [b#8] Format: ParquetFormat, InputPaths: > file:/tmp/test, PartitionFilters: [], PushedFilters: [IsNotNull(b)], > ReadSchema: struct<b:int> > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org