[ 
https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15721376#comment-15721376
 ] 

Xiao Li commented on SPARK-18539:
---------------------------------

Basically, we have to know whether a column exists or not before 
executing/processing a query. Otherwise, we are unable to know whether a filter 
can be pushed down or not. 

To completely resolve the issue, we need to infer the schema every time when we 
need to read the external file. Schema inference is very expensive when we need 
to scan many many (small) files. Thus, it does not make sense for Spark to do 
it. Right?

> Cannot filter by nonexisting column in parquet file
> ---------------------------------------------------
>
>                 Key: SPARK-18539
>                 URL: https://issues.apache.org/jira/browse/SPARK-18539
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.1, 2.0.2
>            Reporter: Vitaly Gerasimov
>            Priority: Critical
>
> {code}
>   import org.apache.spark.SparkConf
>   import org.apache.spark.sql.SparkSession
>   import org.apache.spark.sql.types.DataTypes._
>   import org.apache.spark.sql.types.{StructField, StructType}
>   val sc = SparkSession.builder().config(new 
> SparkConf().setMaster("local")).getOrCreate()
>   val jsonRDD = sc.sparkContext.parallelize(Seq("""{"a":1}"""))
>   sc.read
>     .schema(StructType(Seq(StructField("a", IntegerType))))
>     .json(jsonRDD)
>     .write
>     .parquet("/tmp/test")
>   sc.read
>     .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", 
> IntegerType, nullable = true))))
>     .load("/tmp/test")
>     .createOrReplaceTempView("table")
>   sc.sql("select b from table where b is not null").show()
> {code}
> returns:
> {code}
> 16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> java.lang.IllegalArgumentException: Column [b] was not found in schema!
>       at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:100)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
>       at 
> org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
>       at 
> org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
>       at 
> org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
>       at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
>       at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367)
>       at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341)
>       at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
>       at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
>  Source)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>       at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>       at org.apache.spark.scheduler.Task.run(Task.scala:86)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> expected result:
> {code}
> +---+
> |  b|
> +---+
> +---+
> {code}
> It works fine in 2.0.0 and 1.6.2. However, if I only select the nonexisting 
> column (without filter) it also works fine.
> Query plan:
> {code}
> == Parsed Logical Plan ==
> 'Project ['b]
> +- 'Filter isnotnull('b)
>    +- 'UnresolvedRelation `table`
> == Analyzed Logical Plan ==
> b: int
> Project [b#8]
> +- Filter isnotnull(b#8)
>    +- SubqueryAlias table
>       +- Relation[a#7,b#8] parquet
> == Optimized Logical Plan ==
> Project [b#8]
> +- Filter isnotnull(b#8)
>    +- Relation[a#7,b#8] parquet
> == Physical Plan ==
> *Project [b#8]
> +- *Filter isnotnull(b#8)
>    +- *BatchedScan parquet [b#8] Format: ParquetFormat, InputPaths: 
> file:/tmp/test, PartitionFilters: [], PushedFilters: [IsNotNull(b)], 
> ReadSchema: struct<b:int>
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to