[ https://issues.apache.org/jira/browse/SPARK-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14721447#comment-14721447 ]

Cheng Lian commented on SPARK-10301:
------------------------------------

Updated the ticket description to provide a more general view of this issue. It 
should also be helpful for reviewing https://github.com/apache/spark/pull/8509

> For struct type, if parquet's global schema has less fields than a file's 
> schema, data reading will fail
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-10301
>                 URL: https://issues.apache.org/jira/browse/SPARK-10301
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.0
>            Reporter: Yin Huai
>            Assignee: Cheng Lian
>            Priority: Critical
>
> We hit this issue when reading a complex Parquet dataset without turning on 
> schema merging.  The dataset consists of Parquet files with different but 
> compatible schemas.  In this case, the schema of the dataset is defined by 
> either a summary file or, if no summary files are available, a randomly 
> picked physical Parquet file.  Such a schema may therefore not contain all 
> fields that appear in all physical files.
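> For reference, schema merging can be turned on explicitly when reading.  A 
> minimal {{spark-shell}} sketch (assuming {{path}} points at the dataset 
> directory used in the snippet further below):
> {noformat}
> // Unions the schemas of all part-files instead of picking a single one
> // (equivalent to setting spark.sql.parquet.mergeSchema=true), at the cost
> // of reading more footers.
> sqlContext.read.option("mergeSchema", "true").parquet(path).printSchema()
> {noformat}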
> Parquet was designed with schema evolution and column pruning in mind, so it 
> should be legal for a user to read the dataset with a tailored schema to 
> save disk IO.  For example, say we have a Parquet dataset consisting of two 
> physical Parquet files with the following two schemas:
> {noformat}
> message m0 {
>   optional group f0 {
>     optional int64 f00;
>     optional int64 f01;
>   }
> }
> message m1 {
>   optional group f0 {
>     optional int64 f00;
>     optional int64 f01;
>     optional int64 f02;
>   }
>   optional double f1;
> }
> {noformat}
> Users should be allowed to read the dataset with the following schema:
> {noformat}
> message tailored {
>   optional group f0 {
>     optional int64 f01;
>     optional int64 f02;
>   }
> }
> {noformat}
> so that {{f0.f00}} and {{f1}} are never touched.  The above case can be 
> expressed by the following {{spark-shell}} snippet:
> {noformat}
> import sqlContext._
> import sqlContext.implicits._
> import org.apache.spark.sql.types.{LongType, StructType}
> val path = "/tmp/spark/parquet"
> range(3).selectExpr("NAMED_STRUCT('f00', id, 'f01', id) AS f0").coalesce(1)
>         .write.mode("overwrite").parquet(path)
> range(3).selectExpr("NAMED_STRUCT('f00', id, 'f01', id, 'f02', id) AS f0", 
> "CAST(id AS DOUBLE) AS f1").coalesce(1)
>         .write.mode("append").parquet(path)
> val tailoredSchema =
>   new StructType()
>     .add(
>       "f0",
>       new StructType()
>         .add("f01", LongType, nullable = true)
>         .add("f02", LongType, nullable = true),
>       nullable = true)
> read.schema(tailoredSchema).parquet(path).show()
> {noformat}
> Expected output should be:
> {noformat}
> +--------+
> |      f0|
> +--------+
> |[0,null]|
> |[1,null]|
> |[2,null]|
> |   [0,0]|
> |   [1,1]|
> |   [2,2]|
> +--------+
> {noformat}
> However, the current 1.5-SNAPSHOT version throws the following exception:
> {noformat}
> org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://localhost:9000/tmp/spark/parquet/part-r-00000-56c4604e-c546-4f97-a316-05da8ab1a0bf.gz.parquet
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>         at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>         at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:168)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
>         at org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter.getConverter(CatalystRowConverter.scala:206)
>         at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:269)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
>         at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
>         at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
>         ... 25 more
> 15/08/30 16:42:59 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://localhost:9000/tmp/spark/parquet/part-r-00000-56c4604e-c546-4f97-a316-05da8ab1a0bf.gz.parquet
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>         at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>         at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:168)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
>         at org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter.getConverter(CatalystRowConverter.scala:206)
>         at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:269)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
>         at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
>         at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
>         ... 25 more
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1818)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
>         at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
>         at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
>         at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1403)
>         at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1403)
>         at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
>         at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1921)
>         at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1402)
>         at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1332)
>         at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1395)
>         at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
>         at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
>         at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
>         at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
>         at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
>         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
>         at $iwC$$iwC$$iwC.<init>(<console>:61)
>         at $iwC$$iwC.<init>(<console>:63)
>         at $iwC.<init>(<console>:65)
>         at <init>(<console>:67)
>         at .<init>(<console>:71)
>         at .<clinit>(<console>)
>         at .<init>(<console>:7)
>         at .<clinit>(<console>)
>         at $print(<console>)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>         at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
>         at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>         at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:825)
>         at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
>         at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
>         at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
>         at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
>         at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
>         at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:809)
>         at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
>         at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
>         at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
>         at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
>         at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
>         at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
>         at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>         at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
>         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
>         at org.apache.spark.repl.Main$.main(Main.scala:31)
>         at org.apache.spark.repl.Main.main(Main.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://localhost:9000/tmp/spark/parquet/part-r-00000-56c4604e-c546-4f97-a316-05da8ab1a0bf.gz.parquet
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>         at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>         at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:168)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
>         at org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter.getConverter(CatalystRowConverter.scala:206)
>         at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:269)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
>         at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
>         at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
>         ... 25 more
> {noformat}
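> The {{java.lang.ArrayIndexOutOfBoundsException: 2}} above matches the shape of 
> the mismatch: the tailored Catalyst struct has two fields ({{f01}}, {{f02}}), 
> while the physical file's {{f0}} group has three.  A purely illustrative 
> {{spark-shell}} sketch (not the actual {{CatalystRowConverter}} logic) of how 
> looking converters up by the file's field position overruns an array sized 
> from the requested schema:
> {noformat}
> // Hypothetical illustration only: converters are built from the requested
> // (tailored) schema, but then looked up by the field's position in the
> // file's physical schema.
> val requestedFields = Seq("f01", "f02")                  // 2 converters
> val fileFields      = Seq("f00", "f01", "f02")           // 3 physical fields
> val converters      = requestedFields.map(n => s"converter for $n").toArray
>
> // Asking for the converter of the file's third field (index 2) overruns the array:
> fileFields.indices.foreach(i => println(converters(i)))  // ArrayIndexOutOfBoundsException: 2
> {noformat}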
> This issue can be generalized a step further.  Taking interoperability into 
> consideration, we may have a Parquet dataset consisting of physical Parquet 
> files that share a compatible logical schema but were created by different 
> Parquet libraries.  Because of the historical nested type compatibility 
> issues, the physical Parquet schemas generated by those libraries may differ.  
> It would be nice to be able to operate on such datasets.
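> For example (an illustrative sketch, not schemas taken from any particular 
> dataset), the same logical {{array<int>}} column may be laid out with the 
> legacy 2-level structure emitted by older writers or with the standard 
> 3-level structure:
> {noformat}
> message legacy_writer {
>   optional group f (LIST) {
>     repeated int32 array;
>   }
> }
> message standard_writer {
>   optional group f (LIST) {
>     repeated group list {
>       optional int32 element;
>     }
>   }
> }
> {noformat}
> Both encodings describe the same logical type, so ideally a reader should be 
> able to reconcile them.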


