[jira] [Comment Edited] (SPARK-18539) Cannot filter by nonexisting column in parquet file
[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723888#comment-15723888 ]

Xin Wu edited comment on SPARK-18539 at 12/6/16 12:46 AM:
--

I think we will still hit the issue if we use a user-specified schema. Here is what I tried in a spark-shell built from the master branch:

{code}
import org.apache.spark.sql.types._

val df = spark.range(1).coalesce(1)
df.selectExpr("id AS a").write.parquet("/Users/xinwu/spark-test/data/spark-18539")
val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType)))
spark.read.option("mergeSchema", "true").schema(schema).parquet("/Users/xinwu/spark-test/data/spark-18539").filter("b is null").count()
{code}

The exception is:

{code}
Caused by: java.lang.IllegalArgumentException: Column [b] was not found in schema!
  at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:181)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:169)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:151)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:91)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:58)
  at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:121)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:58)
  at org.apache.parquet.filter2.predicate.Operators$And.accept(Operators.java:308)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:63)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
  at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
  at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
  at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:377)
{code}

Here the parquet file is missing column b, and I query with the user-specified schema (a, b).
[jira] [Comment Edited] (SPARK-18539) Cannot filter by nonexisting column in parquet file
[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723747#comment-15723747 ]

Cheng Lian edited comment on SPARK-18539 at 12/5/16 11:43 PM:
--

[~v-gerasimov], [~smilegator], and [~xwu0226], after some investigation, I don't think this is a bug now. I just tested the master branch using the following test cases:

{code}
for {
  useVectorizedReader <- Seq(true, false)
  mergeSchema <- Seq(true, false)
} {
  test(s"foo - mergeSchema: $mergeSchema - vectorized: $useVectorizedReader") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString) {
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        val df = spark.range(1).coalesce(1)
        df.selectExpr("id AS a", "id AS b").write.parquet(path)
        df.selectExpr("id AS a").write.mode("append").parquet(path)
        assertResult(0) {
          spark.read
            .option("mergeSchema", mergeSchema.toString)
            .parquet(path)
            .filter("b < 0")
            .count()
        }
      }
    }
  }
}
{code}

It turned out that this issue only happens when schema merging is turned off. This also explains why PR #9940 doesn't prevent PARQUET-389: the trick PR #9940 employs happens during the schema-merging phase. On the other hand, you can't expect missing columns to be read properly when schema merging is turned off. Therefore, I don't think it's a bug. The fix for the snippet mentioned in the ticket description is easy: just add {{.option("mergeSchema", "true")}} to enable schema merging.
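Applying that suggested fix to the snippet from the ticket description would look roughly like the sketch below. This is only a sketch to be pasted into spark-shell against the {{/tmp/test}} data written by the reporter's snippet; note that the comment from [~xwu0226] above reports the same error can still occur when a user-specified schema is combined with {{mergeSchema}}.

{code}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Enable schema merging so the footers of all part-files are merged;
// the column b missing from the written file is then resolved during
// the schema-merging phase instead of failing inside Parquet.
spark.read
  .option("mergeSchema", "true")
  .schema(StructType(Seq(
    StructField("a", IntegerType),
    StructField("b", IntegerType, nullable = true))))
  .parquet("/tmp/test")
  .createOrReplaceTempView("table")

spark.sql("select b from table where b is not null").show()
{code}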
> Cannot filter by nonexisting column in parquet file
> ---------------------------------------------------
>
> Key: SPARK-18539
> URL: https://issues.apache.org/jira/browse/SPARK-18539
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.1, 2.0.2
> Reporter: Vitaly Gerasimov
> Priority: Critical
>
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.types.DataTypes._
> import org.apache.spark.sql.types.{StructField, StructType}
>
> val sc = SparkSession.builder().config(new SparkConf().setMaster("local")).getOrCreate()
> val jsonRDD = sc.sparkContext.parallelize(Seq("""{"a":1}"""))
>
> sc.read
>   .schema(StructType(Seq(StructField("a", IntegerType))))
>   .json(jsonRDD)
>   .write
>   .parquet("/tmp/test")
>
> sc.read
>   .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType, nullable = true))))
>   .load("/tmp/test")
>   .createOrReplaceTempView("table")
>
> sc.sql("select b from table where b is not null").show()
> {code}
> returns:
> {code}
> 16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> java.lang.IllegalArgumentException: Column [b] was not found in schema!
>   at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
>   at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
> {code}
[jira] [Comment Edited] (SPARK-18539) Cannot filter by nonexisting column in parquet file
[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15721499#comment-15721499 ]

Xiao Li edited comment on SPARK-18539 at 12/5/16 7:42 AM:
--

The error is from Parquet.

{noformat}
16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException: Column [b] was not found in schema!
  at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
{noformat}

If users specify the schema, we do not check whether the user-specified schema is right or wrong; we always respect it. If the schema contains nonexistent columns, we push the filters down to Parquet, and Parquet returns the above error to us. Below is a test case you can try:

{noformat}
Seq("parquet").foreach { format =>
  withTempPath { path =>
    Seq((1, "abc"), (2, "hello")).toDF("a", "b").write.format(format).save(path.toString)

    // user-specified schema contains nonexistent columns
    val schema = StructType(
      Seq(StructField("a", IntegerType), StructField("b", StringType), StructField("c", IntegerType)))
    val readDf = spark.read.schema(schema).format(format).load(path.toString)

    // Read the table without any filter
    checkAnswer(readDf, Row(1, "abc", null) :: Row(2, "hello", null) :: Nil)

    // Read the table with a filter on existing columns
    checkAnswer(readDf.filter("a < 2"), Row(1, "abc", null) :: Nil)

    val e = intercept[SparkException] {
      // Read the table with a filter on nonexistent columns
      readDf.filter("c < 2").show()
    }.getMessage
    assert(e.contains("Column [c] was not found in schema"))

    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "false") {
      checkAnswer(readDf.filter("c < 2"), Nil)
    }
  }
}
{noformat}
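The last branch of the test above, which disables Parquet filter pushdown, can also be used as a session-level workaround in spark-shell. A sketch, assuming the {{table}} view registered by the snippet in the ticket description:

{code}
// With pushdown disabled, Spark evaluates the predicate itself instead of
// handing it to Parquet's row-group filter, and the column b that is
// missing from the file simply reads as null.
spark.conf.set("spark.sql.parquet.filterPushdown", "false")

spark.sql("select b from table where b is not null").show()
{code}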
[jira] [Comment Edited] (SPARK-18539) Cannot filter by nonexisting column in parquet file
[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15697793#comment-15697793 ]

Dongjoon Hyun edited comment on SPARK-18539 at 11/26/16 11:44 AM:
--

Interesting. Is it valid for the nonexistent column `b`?

{code}
sc.read
  .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType, nullable = true))))
  .load("/tmp/test")
  .createOrReplaceTempView("table")
{code}