[jira] [Comment Edited] (SPARK-18539) Cannot filter by nonexisting column in parquet file

2016-12-05 Thread Xin Wu (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723888#comment-15723888 ]

Xin Wu edited comment on SPARK-18539 at 12/6/16 12:46 AM:
--

I think we will still hit the issue if we use a user-specified schema. Here is
what I tried in a spark-shell built from the master branch:
{code}
import org.apache.spark.sql.types._

val df = spark.range(1).coalesce(1)
df.selectExpr("id AS a").write.parquet("/Users/xinwu/spark-test/data/spark-18539")

val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType)))
spark.read.option("mergeSchema", "true")
  .schema(schema)
  .parquet("/Users/xinwu/spark-test/data/spark-18539")
  .filter("b is null")
  .count()
{code}

The exception is:
{code}
Caused by: java.lang.IllegalArgumentException: Column [b] was not found in schema!
  at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:181)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:169)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:151)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:91)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:58)
  at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:121)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:58)
  at org.apache.parquet.filter2.predicate.Operators$And.accept(Operators.java:308)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:63)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
  at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
  at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
  at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:377)
{code}

Here I have a single Parquet file that is missing column {{b}}, and I query it
with the user-specified schema {{(a, b)}}.
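
To make the failure mode concrete, here is a hedged sketch of the Parquet-side
check named in the stack trace above, written against plain parquet-mr rather
than Spark internals (the exact predicate shape is my assumption, inferred from
the {{Operators$NotEq}} and {{Operators$And}} frames):
{code}
// Sketch only: reproduce the Parquet-side validation failure directly.
// The file schema below matches a Parquet file that contains only column `a`;
// the predicate references the missing column `b`, roughly what Spark pushes
// down for "b is null" (an assumption, not copied from Spark's ParquetFilters).
import org.apache.parquet.filter2.predicate.{FilterApi, SchemaCompatibilityValidator}
import org.apache.parquet.schema.MessageTypeParser

val fileSchema = MessageTypeParser.parseMessageType(
  "message spark_schema { optional int64 a; }")

val pred = FilterApi.eq(FilterApi.intColumn("b"), null.asInstanceOf[Integer])

// Throws IllegalArgumentException: Column [b] was not found in schema!
SchemaCompatibilityValidator.validate(pred, fileSchema)
{code}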




[jira] [Comment Edited] (SPARK-18539) Cannot filter by nonexisting column in parquet file

2016-12-05 Thread Cheng Lian (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723747#comment-15723747 ]

Cheng Lian edited comment on SPARK-18539 at 12/5/16 11:43 PM:
--

[~v-gerasimov], [~smilegator], and [~xwu0226], after some investigation, I no
longer think this is a bug.

Just tested the master branch using the following test cases:
{code}
  for {
    useVectorizedReader <- Seq(true, false)
    mergeSchema <- Seq(true, false)
  } {
    test(s"foo - mergeSchema: $mergeSchema - vectorized: $useVectorizedReader") {
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString) {
        withTempPath { dir =>
          val path = dir.getCanonicalPath
          val df = spark.range(1).coalesce(1)

          df.selectExpr("id AS a", "id AS b").write.parquet(path)
          df.selectExpr("id AS a").write.mode("append").parquet(path)

          assertResult(0) {
            spark.read
              .option("mergeSchema", mergeSchema.toString)
              .parquet(path)
              .filter("b < 0")
              .count()
          }
        }
      }
    }
  }
{code}
It turns out that this issue only happens when schema merging is turned off.
This also explains why PR #9940 doesn't prevent PARQUET-389: the trick PR #9940
employs happens during the schema merging phase. On the other hand, you can't
expect missing columns to be read properly when schema merging is turned off.
Therefore, I don't think it's a bug.

The fix for the snippet mentioned in the ticket description is easy: just add
{{.option("mergeSchema", "true")}} to enable schema merging.



[jira] [Comment Edited] (SPARK-18539) Cannot filter by nonexisting column in parquet file

2016-12-04 Thread Xiao Li (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15721499#comment-15721499 ]

Xiao Li edited comment on SPARK-18539 at 12/5/16 7:42 AM:
--

The error is from Parquet.
{noformat}
16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException: Column [b] was not found in schema!
at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
{noformat}

If users specify a schema, we do not check whether it actually matches the
underlying files; we always respect it. If the schema contains nonexistent
columns, we push the filters down to Parquet, and Parquet returns the above
error to us.

Below is a test case you can try:

{noformat}
Seq("parquet").foreach { format =>
  withTempPath { path =>
    Seq((1, "abc"), (2, "hello")).toDF("a", "b").write.format(format).save(path.toString)

    // user-specified schema contains nonexistent columns
    val schema = StructType(
      Seq(StructField("a", IntegerType),
        StructField("b", StringType),
        StructField("c", IntegerType)))
    val readDf = spark.read.schema(schema).format(format).load(path.toString)

    // Read the table without any filter
    checkAnswer(readDf, Row(1, "abc", null) :: Row(2, "hello", null) :: Nil)
    // Read the table with a filter on existing columns
    checkAnswer(readDf.filter("a < 2"), Row(1, "abc", null) :: Nil)

    val e = intercept[SparkException] {
      // Read the table with a filter on nonexistent columns
      readDf.filter("c < 2").show()
    }.getMessage
    assert(e.contains("Column [c] was not found in schema"))

    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "false") {
      checkAnswer(readDf.filter("c < 2"), Nil)
    }
  }
}
{noformat}
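
The last {{withSQLConf}} block above also doubles as a workaround: with Parquet
filter pushdown disabled, Spark evaluates the predicate itself instead of
handing it to Parquet. A minimal spark-shell sketch of the same toggle, using
the conf key behind {{SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED}}:
{noformat}
// Sketch: disable Parquet filter pushdown so a predicate on a column that is
// missing from the file is evaluated by Spark rather than rejected by Parquet.
spark.conf.set("spark.sql.parquet.filterPushdown", "false")

// With pushdown off, filtering on the nonexistent column `c` should return
// an empty result instead of throwing IllegalArgumentException.
readDf.filter("c < 2").count()
{noformat}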



[jira] [Comment Edited] (SPARK-18539) Cannot filter by nonexisting column in parquet file

2016-11-26 Thread Dongjoon Hyun (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15697793#comment-15697793 ]

Dongjoon Hyun edited comment on SPARK-18539 at 11/26/16 11:44 AM:
--

Interesting. Is this read valid when the specified column `b` does not exist in the data?
{code}
sc.read
  .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType, nullable = true))))
  .load("/tmp/test")
  .createOrReplaceTempView("table")
{code}



> Cannot filter by nonexisting column in parquet file
> ---
>
> Key: SPARK-18539
> URL: https://issues.apache.org/jira/browse/SPARK-18539
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.1, 2.0.2
>Reporter: Vitaly Gerasimov
>Priority: Critical
>
> {code}
>   import org.apache.spark.SparkConf
>   import org.apache.spark.sql.SparkSession
>   import org.apache.spark.sql.types.DataTypes._
>   import org.apache.spark.sql.types.{StructField, StructType}
>   val sc = SparkSession.builder().config(new 
> SparkConf().setMaster("local")).getOrCreate()
>   val jsonRDD = sc.sparkContext.parallelize(Seq("""{"a":1}"""))
>   sc.read
> .schema(StructType(Seq(StructField("a", IntegerType
> .json(jsonRDD)
> .write
> .parquet("/tmp/test")
>   sc.read
> .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", 
> IntegerType, nullable = true
> .load("/tmp/test")
> .createOrReplaceTempView("table")
>   sc.sql("select b from table where b is not null").show()
> {code}
> returns:
> {code}
> 16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> java.lang.IllegalArgumentException: Column [b] was not found in schema!
>   at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
>   at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
>   at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
>   at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160)
>   at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:100)
>   at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
>   at 
> org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
>   at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
>   at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
>   at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
>   at 
> org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
>   at 
> org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at 
>