Re: SparkSQL exception on cached parquet table

2014-11-20 Thread Sadhan Sood
Thanks Michael, opened this https://issues.apache.org/jira/browse/SPARK-4520

On Thu, Nov 20, 2014 at 2:59 PM, Michael Armbrust wrote:

> Can you open a JIRA?

Re: SparkSQL exception on cached parquet table

2014-11-20 Thread Michael Armbrust
Can you open a JIRA?

On Thu, Nov 20, 2014 at 10:39 AM, Sadhan Sood wrote:

> I am running on master, pulled yesterday I believe but saw the same issue
> with 1.2.0

Re: SparkSQL exception on cached parquet table

2014-11-20 Thread Sadhan Sood
I am running on master, pulled yesterday I believe but saw the same issue
with 1.2.0

On Thu, Nov 20, 2014 at 1:37 PM, Michael Armbrust wrote:

> Which version are you running on again?

Re: SparkSQL exception on cached parquet table

2014-11-20 Thread Michael Armbrust
Which version are you running on again?

On Thu, Nov 20, 2014 at 8:17 AM, Sadhan Sood wrote:

> Also attaching the parquet file if anyone wants to take a further look.

Re: SparkSQL exception on cached parquet table

2014-11-20 Thread Sadhan Sood
Also attaching the parquet file if anyone wants to take a further look.

On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood wrote:

> So, I am seeing an issue where Spark SQL throws an exception when trying to
> read selected columns from a Parquet file written from a Thrift schema, and
> also when caching such a table.
> On some further digging, I was able to narrow it down to at least one
> particular column type, map<SomeId, set<SomeExclusionCause>>, as the cause
> of the issue.

Re: SparkSQL exception on cached parquet table

2014-11-20 Thread Sadhan Sood
So, I am seeing an issue where Spark SQL throws an exception when trying to
read selected columns from a Parquet file written from a Thrift schema, and
also when caching such a table.
On some further digging, I was able to narrow it down to at least one
particular column type, map<SomeId, set<SomeExclusionCause>>, as the cause of
the issue.
To reproduce this, I created a test Thrift definition with a very basic schema
and stored some sample data in a Parquet file:

Test.thrift
===
typedef binary SomeId

enum SomeExclusionCause {
  WHITELIST = 1,
  HAS_PURCHASE = 2,
}

struct SampleThriftObject {
  10: string col_a;
  20: string col_b;
  30: string col_c;
  40: optional map<SomeId, set<SomeExclusionCause>> col_d;
}
=
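
The thread does not show how the sample data was written; a minimal sketch of
one way to produce such a file with parquet-thrift is below (assumptions: the
Thrift-generated Java class SampleThriftObject is on the classpath, the
parquet-mr 1.x ThriftParquetWriter API from the parquet.* packages seen in the
stack traces is available, and the output path is a placeholder):

// Hedged sketch, not from the original thread: write a SampleThriftObject
// record to a local Parquet file using parquet-thrift.
import org.apache.hadoop.fs.Path
import parquet.hadoop.metadata.CompressionCodecName
import parquet.thrift.ThriftParquetWriter

val writer = new ThriftParquetWriter[SampleThriftObject](
  new Path("/tmp/test-sample.parquet"),      // placeholder output path
  classOf[SampleThriftObject],
  CompressionCodecName.UNCOMPRESSED)

val record = new SampleThriftObject()
record.setCol_a("a")
record.setCol_b("b")
record.setCol_c("c")
// col_d (the map<SomeId, set<SomeExclusionCause>> field) can be left unset or
// populated; the failure described below was observed with this column present
// in the schema.
writer.write(record)
writer.close()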

And loading the data in Spark through a SchemaRDD:

import org.apache.spark.sql.SchemaRDD
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val parquetFile = "/path/to/generated/parquet/file"
val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.printSchema
root
 |-- col_a: string (nullable = true)
 |-- col_b: string (nullable = true)
 |-- col_c: string (nullable = true)
 |-- col_d: map (nullable = true)
 ||-- key: string
 ||-- value: array (valueContainsNull = true)
 |||-- element: string (containsNull = false)

parquetFileRDD.registerTempTable("test")
sqlContext.cacheTable("test")
sqlContext.sql("select col_a from test").collect() <-- see the exception
stack here

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0
(TID 0, localhost): parquet.io.ParquetDecodingException: Can not read value
at 0 in block -1 in file file:/tmp/xyz/part-r-0.parquet
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
... 27 more

If you take col_d out of the Thrift schema, the problem goes away. The problem
also shows up when reading that particular column without caching the table
first. The same file can be dumped/read using parquet-tools just fine. Here is
the file dump using parquet-tools:

row group 0
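
For reference, the "without caching" case mentioned above can be written out as
a short sketch using only the calls already shown in this message:

// Hedged sketch of the non-cached read described above: register the file
// under a fresh table name and select the map column directly, without
// calling cacheTable first. Per the description, this hits the same
// ParquetDecodingException / ArrayIndexOutOfBoundsException.
val freshRDD = sqlContext.parquetFile(parquetFile)
freshRDD.registerTempTable("test_nocache")
sqlContext.sql("select col_d from test_nocache").collect()  // fails the same way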

Re: SparkSQL exception on cached parquet table

2014-11-16 Thread Sadhan Sood
Hi Cheng,

I tried reading the parquet file (on which we were getting the exception)
through parquet-tools, and it is able to dump the file and I can read the
metadata, etc. I also loaded the file into a Hive table and can run a table
scan query on it as well. Let me know if I can do more to help resolve the
problem; in the meantime I'll run it through a debugger and see if I can get
more information on it.

Thanks,
Sadhan
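
The metadata check described above can also be done programmatically; a hedged
sketch using Parquet's own footer reader follows (API names assumed from the
parquet-mr 1.x parquet.* packages seen in the stack traces; the path is a
placeholder):

// Hedged sketch: read the file's footer and schema with parquet-mr directly,
// bypassing Spark SQL's scan path.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import parquet.hadoop.ParquetFileReader

val footer = ParquetFileReader.readFooter(
  new Configuration(), new Path("/path/to/generated/parquet/file"))
println(footer.getFileMetaData.getSchema)  // Parquet schema derived from the Thrift definition
println(footer.getBlocks.size)             // number of row groups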


Re: SparkSQL exception on cached parquet table

2014-11-16 Thread Cheng Lian

(Forgot to cc user mail list)

On 11/16/14 4:59 PM, Cheng Lian wrote:

Hey Sadhan,

Thanks for the additional information, this is helpful. It seems that some
Parquet internal contract was broken, but I'm not sure whether it's caused by
Spark SQL or by Parquet, or whether the Parquet file itself was somehow
damaged. I'm investigating this. In the meantime, would you mind helping to
narrow down the problem by trying to scan exactly the same Parquet file with
some other system (e.g. Hive or Impala)? If other systems work, then there
must be something wrong with Spark SQL.


Cheng

On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood wrote:


Hi Cheng,

Thanks for your response. Here is the stack trace from yarn logs:

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at java.util.ArrayList.elementData(ArrayList.java:418)
    at java.util.ArrayList.get(ArrayList.java:431)
    at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
    at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
    at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
    at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
    at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
    at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
    at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
    at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
    ... 26 more



Re: SparkSQL exception on cached parquet table

2014-11-15 Thread sadhan
Hi Cheng,

Thanks for your response. Here is the stack trace from yarn logs:





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-exception-on-cached-parquet-table-tp18978p19020.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: SparkSQL exception on cached parquet table

2014-11-15 Thread Cheng Lian

Hi Sadhan,

Could you please provide the stack trace of the ArrayIndexOutOfBoundsException
(if any)? The reason why the first query succeeds is that Spark SQL doesn't
bother reading all data from the table to give COUNT(*). In the second case,
however, the whole table is asked to be cached lazily via the cacheTable call,
thus it's scanned to build the in-memory columnar cache. Then things went wrong
while scanning this LZO-compressed Parquet file. But unfortunately the stack
trace at hand doesn't indicate the root cause.


Cheng
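
To make the sequencing described above concrete, here is an illustrative sketch
(the calls are the same ones quoted below; isCached and uncacheTable are
assumed to be available on SQLContext in this Spark version):

sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect()  // no full scan needed; succeeds

sqlContext.cacheTable("xyz_20141109")  // lazy: nothing is scanned yet
sqlContext.isCached("xyz_20141109")    // true, but the columnar cache is not built yet

sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect()
// ^ the first query after cacheTable scans the whole Parquet file to build the
//   in-memory columnar cache; this is where the decoding exception surfaces

sqlContext.uncacheTable("xyz_20141109")  // drops the cache entry if needed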

On 11/15/14 5:28 AM, Sadhan Sood wrote:

While testing SparkSQL on a bunch of parquet files (basically used to be a
partition for one of our hive tables), I encountered this error:

import org.apache.spark.sql.SchemaRDD
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.registerTempTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect() <-- works fine
sqlContext.cacheTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect() <-- fails with an exception

parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file
hdfs://::9000/event_logs/xyz/20141109/part-9359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-9.lzo.parquet

at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ArrayIndexOutOfBoundsException