Re: SparkSQL exception on cached parquet table
Also attaching the parquet file if anyone wants to take a further look.

On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood sadhan.s...@gmail.com wrote:

So, I am seeing this issue with Spark SQL throwing an exception when trying to read selected columns from a thrift parquet file, and also when caching them. On some further digging, I was able to narrow it down to at least one particular column type, map<string, set<string>>, causing this issue. To reproduce it, I created a test thrift file with a very basic schema and stored some sample data in a parquet file:

Test.thrift
===========
typedef binary SomeId

enum SomeExclusionCause {
  WHITELIST = 1,
  HAS_PURCHASE = 2,
}

struct SampleThriftObject {
  10: string col_a;
  20: string col_b;
  30: string col_c;
  40: optional map<SomeExclusionCause, set<SomeId>> col_d;
}
===========

And loading the data in Spark through SchemaRDD:

import org.apache.spark.sql.SchemaRDD

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquetFile = "/path/to/generated/parquet/file"
val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.printSchema

root
 |-- col_a: string (nullable = true)
 |-- col_b: string (nullable = true)
 |-- col_c: string (nullable = true)
 |-- col_d: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = false)

parquetFileRDD.registerTempTable("test")
sqlContext.cacheTable("test")
sqlContext.sql("select col_a from test").collect()  // <-- see the exception stack here

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/xyz/part-r-0.parquet
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
    at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
    at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
    at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at java.util.ArrayList.elementData(ArrayList.java:418)
    at java.util.ArrayList.get(ArrayList.java:431)
    at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
    at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
    at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
    at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
    at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
    at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
    at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
    at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
    ... 27 more

If you take out col_d from the thrift file, the problem goes away.
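For anyone trying to reproduce this from the IDL above, here is a rough sketch of how such a sample file can be written with parquet-thrift. It is not taken from the original report: the generated class, output path, codec, and the exact ThriftParquetWriter constructor are assumptions and may need adjusting for your parquet-mr version.

// Hypothetical writer for the sample file, assuming Test.thrift has been run
// through the Thrift Java generator (producing SampleThriftObject) and that
// parquet-thrift (parquet.hadoop.thrift.ThriftParquetWriter) is on the classpath.
import org.apache.hadoop.fs.Path
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.thrift.ThriftParquetWriter

val writer = new ThriftParquetWriter[SampleThriftObject](
  new Path("/tmp/xyz/part-r-0.parquet"),   // illustrative path, echoing the one in the stack trace
  classOf[SampleThriftObject],
  CompressionCodecName.UNCOMPRESSED)

val row = new SampleThriftObject()
row.setCol_a("a")
row.setCol_b("b")
row.setCol_c("c")
// col_d is the map<SomeExclusionCause, set<SomeId>> column the failure was
// narrowed down to; the schema merely containing it is what matters here.
writer.write(row)
writer.close()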
Re: SparkSQL exception on cached parquet table
Which version are you running on again?

On Thu, Nov 20, 2014 at 8:17 AM, Sadhan Sood sadhan.s...@gmail.com wrote: Also attaching the parquet file if anyone wants to take a further look.
Re: SparkSQL exception on cached parquet table
I am running on master, pulled yesterday I believe, but I saw the same issue with 1.2.0.

On Thu, Nov 20, 2014 at 1:37 PM, Michael Armbrust mich...@databricks.com wrote: Which version are you running on again?
Re: SparkSQL exception on cached parquet table
Thanks Michael, opened this: https://issues.apache.org/jira/browse/SPARK-4520

On Thu, Nov 20, 2014 at 2:59 PM, Michael Armbrust mich...@databricks.com wrote: Can you open a JIRA?
Re: SparkSQL exception on cached parquet table
(Forgot to cc user mail list)

On 11/16/14 4:59 PM, Cheng Lian wrote:

Hey Sadhan,

Thanks for the additional information, this is helpful. It seems that some Parquet internal contract was broken, but I'm not sure whether it was caused by Spark SQL or Parquet, or whether the Parquet file itself was somehow damaged. I'm investigating this. In the meantime, would you mind helping to narrow down the problem by trying to scan exactly the same Parquet file with some other system (e.g. Hive or Impala)? If other systems work, then there must be something wrong with Spark SQL.

Cheng

On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood sadhan.s...@gmail.com wrote:

Hi Cheng,

Thanks for your response. Here is the stack trace from the yarn logs:

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at java.util.ArrayList.elementData(ArrayList.java:418)
    at java.util.ArrayList.get(ArrayList.java:431)
    at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
    at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
    at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
    at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
    at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
    at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
    at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
    at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
    ... 26 more

On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian lian.cs@gmail.com wrote: Hi Sadhan, could you please provide the stack trace of the ArrayIndexOutOfBoundsException (if any)?
Re: SparkSQL exception on cached parquet table
Hi Cheng,

I tried reading the parquet file (on which we were getting the exception) through parquet-tools; it is able to dump the file, and I can read the metadata, etc. I also loaded the file through a Hive table and can run a table scan query on it as well. Let me know if I can do more to help resolve the problem. I'll run it through a debugger in the meantime and see if I can get more information.

Thanks,
Sadhan

On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian lian.cs@gmail.com wrote: (Forgot to cc user mail list) Hey Sadhan, thanks for the additional information, this is helpful.
Re: SparkSQL exception on cached parquet table
Hi Sadhan,

Could you please provide the stack trace of the ArrayIndexOutOfBoundsException (if any)?

The reason why the first query succeeds is that Spark SQL doesn't bother reading all of the data from the table to answer COUNT(*). In the second case, however, the whole table is asked to be cached lazily via the cacheTable call, so it is scanned to build the in-memory columnar cache. Something then went wrong while scanning this LZO compressed Parquet file. Unfortunately, the stack trace at hand doesn't indicate the root cause.

Cheng

On 11/15/14 5:28 AM, Sadhan Sood wrote: While testing SparkSQL on a bunch of parquet files (basically used to be a partition for one of our hive tables), I encountered this error.
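To make the COUNT(*)-versus-cacheTable distinction above concrete, here is a minimal sketch using the same SchemaRDD-era API as the report; the path and table name are placeholders, not the reporter's actual ones.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val data = sqlContext.parquetFile("/path/to/partition")   // placeholder path
data.registerTempTable("events")                          // placeholder table name

// COUNT(*) does not require Spark SQL to read every column of every row,
// so this query succeeds even if one column cannot be decoded.
sqlContext.sql("SELECT count(*) FROM events").collect()

// cacheTable is lazy: it only marks the table for caching. The next query
// scans all columns to build the in-memory columnar cache, and that scan
// is where the ParquetDecodingException surfaces.
sqlContext.cacheTable("events")
sqlContext.sql("SELECT count(*) FROM events").collect()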
Re: SparkSQL exception on cached parquet table
Hi Cheng,

Thanks for your response. Here is the stack trace from the yarn logs:
SparkSQL exception on cached parquet table
While testing SparkSQL on a bunch of parquet files (basically used to be a partition for one of our hive tables), I encountered this error:

import org.apache.spark.sql.SchemaRDD
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.registerTempTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect()  // <-- works fine

sqlContext.cacheTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect()  // <-- fails with an exception

parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://::9000/event_logs/xyz/20141109/part-9359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-9.lzo.parquet
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException