Can you open a JIRA?

On Thu, Nov 20, 2014 at 10:39 AM, Sadhan Sood <sadhan.s...@gmail.com> wrote:
I am running on master, pulled yesterday I believe, but I saw the same issue with 1.2.0.

On Thu, Nov 20, 2014 at 1:37 PM, Michael Armbrust <mich...@databricks.com> wrote:

Which version are you running on again?

On Thu, Nov 20, 2014 at 8:17 AM, Sadhan Sood <sadhan.s...@gmail.com> wrote:

Also attaching the Parquet file in case anyone wants to take a further look.

On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood <sadhan.s...@gmail.com> wrote:

I am seeing an issue where Spark SQL throws an exception when reading selected columns from a Thrift Parquet file, and also when caching them. On further digging, I was able to narrow it down to at least one particular column type causing the issue: map<string, set<string>>. To reproduce it, I created a test Thrift file with a very basic schema and stored some sample data in a Parquet file:

Test.thrift
===========
typedef binary SomeId

enum SomeExclusionCause {
  WHITELIST = 1,
  HAS_PURCHASE = 2,
}

struct SampleThriftObject {
  10: string col_a;
  20: string col_b;
  30: string col_c;
  40: optional map<SomeExclusionCause, set<SomeId>> col_d;
}
=============
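For reference, a file like this can be produced with parquet-thrift. The following is only a minimal sketch: it assumes the Thrift compiler has generated a SampleThriftObject Java class from Test.thrift above and that parquet-thrift's ThriftParquetWriter is on the classpath; the output path is illustrative.

import org.apache.hadoop.fs.Path
import parquet.hadoop.metadata.CompressionCodecName
import parquet.thrift.ThriftParquetWriter

// Write nine rows; col_d is optional and left unset, which matches the
// all-null col_d values in the parquet-tools dump further down.
val writer = new ThriftParquetWriter[SampleThriftObject](
  new Path("/tmp/xyz/part-r-00000.parquet"),
  classOf[SampleThriftObject],
  CompressionCodecName.UNCOMPRESSED)

for (i <- 1 to 9) {
  val row = new SampleThriftObject()
  row.setCol_a("a" + i)
  row.setCol_b("b" + i)
  row.setCol_c("c" + i)
  writer.write(row)
}
writer.close()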
And loading the data in Spark through SchemaRDD:

import org.apache.spark.sql.SchemaRDD

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquetFile = "/path/to/generated/parquet/file"
val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.printSchema
root
 |-- col_a: string (nullable = true)
 |-- col_b: string (nullable = true)
 |-- col_c: string (nullable = true)
 |-- col_d: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = false)

parquetFileRDD.registerTempTable("test")
sqlContext.cacheTable("test")
sqlContext.sql("select col_a from test").collect()  <-- see the exception stack here

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/xyz/part-r-00000.parquet
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
    at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
    at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
    at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at java.util.ArrayList.elementData(ArrayList.java:418)
    at java.util.ArrayList.get(ArrayList.java:431)
    at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
    at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
    at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
    at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
    at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
    at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
    at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
    at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
    ... 27 more

If you take col_d out of the Thrift file, the problem goes away. The problem also shows up when trying to read that particular column without caching the table first. The same file can be dumped and read with parquet-tools just fine. Here is the file dump using parquet-tools:

row group 0
--------------------------------------------------------------------------------
col_a:  BINARY UNCOMPRESSED DO:0 FPO:4 SZ:89/89/1.00 VC:9 ENC [more]...
col_b:  BINARY UNCOMPRESSED DO:0 FPO:93 SZ:89/89/1.00 VC:9 EN [more]...
col_c:  BINARY UNCOMPRESSED DO:0 FPO:182 SZ:89/89/1.00 VC:9 E [more]...
col_d:
.map:
..key:  BINARY UNCOMPRESSED DO:0 FPO:271 SZ:29/29/1.00 VC:9 E [more]...
..value:
...value_tuple: BINARY UNCOMPRESSED DO:0 FPO:300 SZ:29/29/1.00 VC:9 E [more]...
col_a TV=9 RL=0 DL=1
----------------------------------------------------------------------------
page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

col_b TV=9 RL=0 DL=1
----------------------------------------------------------------------------
page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

col_c TV=9 RL=0 DL=1
----------------------------------------------------------------------------
page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

col_d.map.key TV=9 RL=1 DL=2
----------------------------------------------------------------------------
page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9

col_d.map.value.value_tuple TV=9 RL=2 DL=4
----------------------------------------------------------------------------
page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9

BINARY col_a
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:a1
value 2: R:1 D:1 V:a2
value 3: R:1 D:1 V:a3
value 4: R:1 D:1 V:a4
value 5: R:1 D:1 V:a5
value 6: R:1 D:1 V:a6
value 7: R:1 D:1 V:a7
value 8: R:1 D:1 V:a8
value 9: R:1 D:1 V:a9

BINARY col_b
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:b1
value 2: R:1 D:1 V:b2
value 3: R:1 D:1 V:b3
value 4: R:1 D:1 V:b4
value 5: R:1 D:1 V:b5
value 6: R:1 D:1 V:b6
value 7: R:1 D:1 V:b7
value 8: R:1 D:1 V:b8
value 9: R:1 D:1 V:b9

BINARY col_c
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:c1
value 2: R:1 D:1 V:c2
value 3: R:1 D:1 V:c3
value 4: R:1 D:1 V:c4
value 5: R:1 D:1 V:c5
value 6: R:1 D:1 V:c6
value 7: R:1 D:1 V:c7
value 8: R:1 D:1 V:c8
value 9: R:1 D:1 V:c9

BINARY col_d.map.key
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:0 D:0 V:<null>
value 2: R:0 D:0 V:<null>
value 3: R:0 D:0 V:<null>
value 4: R:0 D:0 V:<null>
value 5: R:0 D:0 V:<null>
value 6: R:0 D:0 V:<null>
value 7: R:0 D:0 V:<null>
value 8: R:0 D:0 V:<null>
value 9: R:0 D:0 V:<null>

BINARY col_d.map.value.value_tuple
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:0 D:0 V:<null>
value 2: R:0 D:0 V:<null>
value 3: R:0 D:0 V:<null>
value 4: R:0 D:0 V:<null>
value 5: R:0 D:0 V:<null>
value 6: R:0 D:0 V:<null>
value 7: R:0 D:0 V:<null>
value 8: R:0 D:0 V:<null>
value 9: R:0 D:0 V:<null>

I am happy to provide more information, but any help is appreciated.
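The footer that parquet-tools dumps above can also be inspected programmatically. Below is a minimal sketch using parquet's ParquetFileReader, reusing the illustrative path from the exception above; it only reads metadata, much like parquet-tools does:

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import parquet.hadoop.ParquetFileReader

// Read only the file footer: the Parquet message type plus
// per-row-group metadata, roughly what parquet-tools prints above.
val footer = ParquetFileReader.readFooter(
  new Configuration(), new Path("/tmp/xyz/part-r-00000.parquet"))

println(footer.getFileMetaData.getSchema)
footer.getBlocks.asScala.foreach { block =>
  println("row group: " + block.getRowCount + " rows, " +
    block.getColumns.size + " column chunks")
}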
On Sun, Nov 16, 2014 at 7:40 PM, Sadhan Sood <sadhan.s...@gmail.com> wrote:

Hi Cheng,

I tried reading the Parquet file (on which we were getting the exception) through parquet-tools, and it is able to dump the file; I can read the metadata, etc. I also loaded the file through a Hive table and can run a table scan query on it as well. Let me know if I can do more to help resolve the problem; in the meantime I'll run it through a debugger and see if I can get more information.

Thanks,
Sadhan

On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian <lian.cs....@gmail.com> wrote:

(Forgot to cc user mail list)

On 11/16/14 4:59 PM, Cheng Lian wrote:

Hey Sadhan,

Thanks for the additional information; this is helpful. It seems some Parquet internal contract was broken, but I'm not sure whether the cause lies in Spark SQL or in Parquet, or whether the Parquet file itself was somehow damaged. I'm investigating. In the meantime, would you mind helping to narrow down the problem by scanning exactly the same Parquet file with some other system (e.g. Hive or Impala)? If other systems work, then there must be something wrong with Spark SQL.

Cheng

On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood <sadhan.s...@gmail.com> wrote:

Hi Cheng,

Thanks for your response. Here is the stack trace from the YARN logs:

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at java.util.ArrayList.elementData(ArrayList.java:418)
    at java.util.ArrayList.get(ArrayList.java:431)
    at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
    at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
    at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
    at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
    at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
    at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
    at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
    at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
    ... 26 more

On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian <lian.cs....@gmail.com> wrote:

Hi Sadhan,

Could you please provide the stack trace of the ArrayIndexOutOfBoundsException (if any)? The reason the first query succeeds is that Spark SQL doesn't bother reading all data from the table to answer COUNT(*). In the second case, however, the whole table is marked for lazy caching via the cacheTable call, so it gets scanned to build the in-memory columnar cache, and something went wrong while scanning this LZO-compressed Parquet file. Unfortunately, the stack trace at hand doesn't indicate the root cause.

Cheng
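To make the sequence Cheng describes concrete, here is a minimal sketch, assuming a spark-shell SparkContext sc; the table name and path are illustrative:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.parquetFile("/path/to/file.parquet").registerTempTable("t")

// COUNT(*) does not require materializing the column values, so this
// succeeds even though the file cannot be decoded column by column.
sqlContext.sql("SELECT count(*) FROM t").collect()

// cacheTable is lazy: it only marks the table for caching; nothing is read yet.
sqlContext.cacheTable("t")

// The first action after cacheTable scans the whole table to build the
// in-memory columnar cache; that scan is where the exception surfaces.
sqlContext.sql("SELECT count(*) FROM t").collect()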
On 11/15/14 5:28 AM, Sadhan Sood wrote:

While testing Spark SQL on a bunch of Parquet files (which together used to be a partition of one of our Hive tables), I encountered this error:

import org.apache.spark.sql.SchemaRDD
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.registerTempTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect()  <-- works fine
sqlContext.cacheTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect()  <-- fails with an exception

parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://xxxxxxxx:9000/event_logs/xyz/20141109/part-00009359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-00009.lzo.parquet
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ArrayIndexOutOfBoundsException