Extremely slow shuffle writes and large job time fluctuations

2016-07-19 Thread Jon Chase
I'm running into an issue with a pyspark job where I'm sometimes seeing
extremely variable job times (20min to 2hr) and very long shuffle times
(e.g. ~2 minutes for 18KB/86 records).

Cluster setup is Amazon EMR 4.4.0, Spark 1.6.0, an m4.2xl driver and a
single m4.10xlarge (40 vCPU, 160GB) executor with the following params:

--conf spark.driver.memory=10G --conf spark.default.parallelism=160 --conf
spark.driver.maxResultSize=4g --num-executors 2 --executor-cores 20
--executor-memory 67G

What's odd is that sometimes the job will run in 20 minutes and sometimes
it will take 2 hours - both jobs with the same data.  I'm using RDDs (not
DataFrames).  There's plenty of RAM, I've looked at the GC logs (using CMS)
and they look fine.  The job reads some data from files, does some
maps/filters/joins/etc; nothing too special.

The only thing I've noticed that looks odd is that the slow instances of
the job have unusually long Shuffle Write times for some tasks.  For
example, a .join operation has ~30 tasks out of 320 that take 2.5 minutes,
GC time of 0.1 seconds, Shuffle Read Size / Records of 12KB/30, and, most
interestingly, Write Time of 2.5 minutes for Shuffle Write Size / Records
of 18KB/86 records.  When looking at the event time line for the stage it's
almost all yellow (Shuffle Write).

We've been running this job on a different EMR cluster topology (12
m3.2xlarge's) and have not seen the slowdown described above.  We've only
observed it on the m4.10xl machine.

It might be worth mentioning again that this is pyspark and no DataFrames
(just RDDs).  When I run 'top' I sometimes see lots (e.g. 60 or 70) python
processes on the executor (I assume one per partition being processed?).

It seems like this has something to do with the single m4.10xl setup, as
we haven't seen this behavior on the 12 m3.2xl cluster.

What I really don't understand is why the job seems to run fine (20
minutes) for a while, and then (for the same data) takes so much longer (2
hours), and with such long shuffle write times.


Re: About memory leak in spark 1.4.1

2015-09-28 Thread Jon Chase
I'm seeing a similar (same?) problem on Spark 1.4.1 running on Yarn (Amazon
EMR, Java 8).  I'm running a Spark Streaming app 24/7 and system memory
eventually gets exhausted after about 3 days and the JVM process dies with:

#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 12288 bytes for committing
reserved memory.
# An error report file with more information is saved as:
#
/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1442933070871_0002/container_1442933070871_0002_01_02/hs_err_pid19082.log
[thread 139846843156224 also had an error]


To reiterate what Sea said, the heap is fine; this is NOT a heap memory
issue - I've monitored it with scripts and also observed it via VisualVM -
this is an off-heap issue.  I ran pmap on the pid of CoarseGrainedExecutor,
spaced about 5 hours apart, and saw several 64MB chunks of off-heap memory
allocated in that time:


7fccd400  65500K rw---[ anon ]
7fccd7ff7000 36K -[ anon ]
7fccd800  65528K rw---[ anon ]
7fccdbffe000  8K -[ anon ]
7fccdc00  65504K rw---[ anon ]
7fccdfff8000 32K -[ anon ]
7fcce000  65536K rw---[ anon ]
7fcce400  65508K rw---[ anon ]
7fcce7ff9000 28K -[ anon ]
7fcce800  65524K rw---[ anon ]
7fccebffd000 12K -[ anon ]
7fccec00  65532K rw---[ anon ]
7fcce000  4K -[ anon ]
7fccf000  65496K rw---[ anon ]
7fccf3ff6000 40K -[ anon ]
7fccf400  65496K rw---[ anon ]
7fccf7ff6000 40K -[ anon ]
7fccf800  65532K rw---[ anon ]
7fccfbfff000  4K -[ anon ]
7fccfc00  65520K rw---[ anon ]
7fccc000 16K -[ anon ]
7fcd  65508K rw---[ anon ]
7fcd03ff9000 28K -[ anon ]

In that window, total memory usage by the JVM (as reported by top) had
grown ~786MB over roughly 5 hours - basically the sum of those 13 64MB chunks.

I dumped the memory from /proc/pid/, and was able to see a bunch of lines
from the data files that my Spark job is processing, but I couldn't
figure out what was actually creating these 64MB chunks.  I thought it
might be Netty, so I set spark.shuffle.io.preferDirectBufs to false, but
that hasn't changed anything.

The only thing I see in the config page regarding "64mb" is
spark.kryoserializer.buffer.max, which defaults to 64mb.  I'll try setting
that to something different, but as far as I know, kryo is not doing
anything off heap.

Still wondering if this could be Netty, or maybe something Akka is doing, if
it's using off-heap memory?
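
For what it's worth, here's a minimal sketch (Java; the values are just guesses
for an experiment, not recommendations) of flipping the two settings mentioned
above - the Kryo buffer cap and Netty's direct-buffer preference - to see
whether either changes the allocation pattern:

import org.apache.spark.SparkConf;

// Sketch only: override the two settings discussed above and re-run the
// streaming app to see whether the 64MB native allocations change.
SparkConf conf = new SparkConf()
    .setAppName("offheap-leak-test")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.max", "32m")       // default is 64m
    .set("spark.shuffle.io.preferDirectBufs", "false");  // already tried above, no change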

There were no ERROR messages in the executor (or driver) logs during
this time.

Any help would be greatly appreciated.  This issue continues to cause our
streaming apps to die every few days, which is...less than ideal! :)

On Wed, Aug 5, 2015 at 9:10 AM, Sea <261810...@qq.com> wrote:

> No one helped me... I helped myself: I split the cluster into two clusters,
> 1.4.1 and 1.3.0
>
>
> -- Original Message --
> *From:* "Ted Yu";;
> *Sent:* Tuesday, August 4, 2015 at 10:28 PM
> *To:* "Igor Berman";
> *Cc:* "Sea"<261810...@qq.com>; "Barak Gitsis"; "
> user@spark.apache.org"; "rxin";
> "joshrosen"; "davies";
> *Subject:* Re: About memory leak in spark 1.4.1
>
> w.r.t. spark.deploy.spreadOut , here is the scaladoc:
>
>   // As a temporary workaround before better ways of configuring memory,
> we allow users to set
>   // a flag that will perform round-robin scheduling across the nodes
> (spreading out each app
>   // among all the nodes) instead of trying to consolidate each app onto a
> small # of nodes.
>   private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut",
> true)
>
> Cheers
>
> On Tue, Aug 4, 2015 at 4:13 AM, Igor Berman  wrote:
>
>> sorry, can't disclose info about my prod cluster
>>
>> nothing jumps into my mind regarding your config
>> we don't use lz4 compression, and I don't know what spark.deploy.spreadOut is (there
>> is no documentation regarding this)
>>
>> If you are sure that you don't have a memory leak in your business logic, I
>> would try to reset each property to its default (or just remove it from your
>> config) and rerun your job to see if one of them is
>> somehow connected
>>
>> my config(nothing special really)
>> spark.shuffle.consolidateFiles true
>> spark.speculation false
>> spark.executor.extraJavaOptions -XX:+UseStringCache
>> -XX:+UseCompressedStrings -XX:+PrintGC -XX:+PrintGCDetails
>> -XX:+PrintGCTimeStamps -Xloggc:gc.log -verbose:gc
>> spark.executor.logs.rolling.maxRetainedFiles 1000
>> spark.executor.logs.rolling.strategy time
>> spark.worker.cleanup.enabled true
>> 

Re: Possible to combine all RDDs from a DStream batch into one?

2015-07-15 Thread Jon Chase
I should note that the amount of data in each batch is very small, so I'm
not concerned with performance implications of grouping into a single RDD.

On Wed, Jul 15, 2015 at 9:58 PM, Jon Chase jon.ch...@gmail.com wrote:

 I'm currently doing something like this in my Spark Streaming program
 (Java):

 dStream.foreachRDD((rdd, batchTime) -> {
     log.info("processing RDD from batch {}", batchTime);

     // my rdd processing code

 });

 Instead of having my rdd processing code called once for each RDD in the
 batch, is it possible to essentially group all of the RDDs from the batch
 into a single RDD and single partition and therefore operate on all of the
 elements in the batch at once?

 My goal here is to do an operation exactly once for every batch.  As I
 understand it, foreachRDD is going to do the operation once for each RDD in
 the batch, which is not what I want.

 I've looked at DStream.repartition(int), but the docs make it sound like
 it only changes the number of partitions in the batch's existing RDDs, not
 the number of RDDs.
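
For what it's worth, a minimal sketch of one way to do this, assuming a
JavaDStream<String> and the Spark 1.4-era Java API (foreachRDD is invoked once
per batch interval with that batch's single RDD, so coalescing that RDD to one
partition lets a single task see the whole batch):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;

// Sketch: dStream is assumed to be a JavaDStream<String>.
dStream.foreachRDD((JavaRDD<String> rdd, Time batchTime) -> {
    rdd.coalesce(1).foreachPartition(iter -> {
        // one partition == one task iterates every element of this batch
        while (iter.hasNext()) {
            String element = iter.next();
            // ... once-per-batch processing here ...
        }
    });
    return null;   // required by the Function2<R, Time, Void> signature in Spark 1.4.x
});

In other words, there is only ever one RDD per DStream per batch, so the
"grouping" is already done; the coalesce just collapses it to one partition.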



Possible to combine all RDDs from a DStream batch into one?

2015-07-15 Thread Jon Chase
I'm currently doing something like this in my Spark Streaming program
(Java):

dStream.foreachRDD((rdd, batchTime) -> {
    log.info("processing RDD from batch {}", batchTime);

    // my rdd processing code

});

Instead of having my rdd processing code called once for each RDD in the
batch, is it possible to essentially group all of the RDDs from the batch
into a single RDD and single partition and therefore operate on all of the
elements in the batch at once?

My goal here is to do an operation exactly once for every batch.  As I
understand it, foreachRDD is going to do the operation once for each RDD in
the batch, which is not what I want.

I've looked at DStream.repartition(int), but the docs make it sound like it
only changes the number of partitions in the batch's existing RDDs, not the
number of RDDs.


Re: RDD collect hangs on large input data

2015-04-07 Thread Jon Chase
Zsolt - what version of Java are you running?

On Mon, Mar 30, 2015 at 7:12 AM, Zsolt Tóth toth.zsolt@gmail.com
wrote:

 Thanks for your answer!
 I don't call .collect() because I want to trigger the execution; I call it
 because I need the RDD on the driver. This is not a huge RDD, and it's not
 larger than the one returned with 50GB of input data.

 The end of the stack trace:

 The two IP's are the two worker nodes, I think they can't connect to the
 driver after they finished their part of the collect().

 15/03/30 10:38:25 INFO executor.Executor: Finished task 872.0 in stage 1.0 
 (TID 1745). 1414 bytes result sent to driver
 15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(200) called with 
 curMem=405753, maxMem=4883742720
 15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_867 stored as values 
 in memory (estimated size 200.0 B, free 4.5 GB)
 15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(80) called with 
 curMem=405953, maxMem=4883742720
 15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_868 stored as values 
 in memory (estimated size 80.0 B, free 4.5 GB)
 15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block 
 rdd_4_867
 15/03/30 10:38:25 INFO executor.Executor: Finished task 867.0 in stage 1.0 
 (TID 1740). 1440 bytes result sent to driver
 15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block 
 rdd_4_868
 15/03/30 10:38:25 INFO executor.Executor: Finished task 868.0 in stage 1.0 
 (TID 1741). 1422 bytes result sent to driver
 15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in 
 connection from /10.102.129.251:42026
 java.io.IOException: Connection timed out
   at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
   at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
   at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
   at sun.nio.ch.IOUtil.read(IOUtil.java:192)
   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
   at 
 io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
   at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
   at 
 io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
   at 
 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
   at 
 io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
   at java.lang.Thread.run(Thread.java:745)
 15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in 
 connection from /10.102.129.251:41703
 java.io.IOException: Connection timed out
   at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
   at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
   at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
   at sun.nio.ch.IOUtil.read(IOUtil.java:192)
   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
   at 
 io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
   at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
   at 
 io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
   at 
 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
   at 
 io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
   at java.lang.Thread.run(Thread.java:745)
 15/03/30 10:53:46 WARN server.TransportChannelHandler: Exception in 
 connection from /10.99.144.92:49021
 java.io.IOException: Connection timed out
   at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
   at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
   at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
   at sun.nio.ch.IOUtil.read(IOUtil.java:192)
   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
   at 
 io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
   at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
   at 
 io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
   at 
 

Re: Spark SQL lateral view explode doesn't work, and unable to save array types to Parquet

2015-03-27 Thread Jon Chase
https://issues.apache.org/jira/browse/SPARK-6570

I also left in the call to saveAsParquetFile(), as it produced a similar
exception (though there was no use of explode there).

On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian lian.cs@gmail.com wrote:

  This should be a bug in Explode.eval(), which always assumes the
 underlying SQL array is represented by a Scala Seq. Would you mind opening
 a JIRA ticket for this? Thanks!

 Cheng

 On 3/27/15 7:00 PM, Jon Chase wrote:

 Spark 1.3.0

  Two issues:

  a) I'm unable to get a lateral view explode query to work on an array
 type
 b) I'm unable to save an array type to a Parquet file

  I keep running into this:

java.lang.ClassCastException: [I cannot be cast to
 scala.collection.Seq

  Here's a stack trace from the explode issue:

  root
  |-- col1: string (nullable = false)
  |-- col2s: array (nullable = true)
  ||-- element: integer (containsNull = true)

  ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage
 1.0 (TID 15)
 java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
  at
 org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
 ~[spark-catalyst_2.10-1.3.0.jar:1.3.0]
  at
 org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
 ~[spark-sql_2.10-1.3.0.jar:1.3.0]
  at
 org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
 ~[spark-sql_2.10-1.3.0.jar:1.3.0]
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 ~[scala-library-2.10.4.jar:na]
  at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 ~[scala-library-2.10.4.jar:na]
  at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 ~[scala-library-2.10.4.jar:na]
  at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 ~[scala-library-2.10.4.jar:na]
  at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 ~[scala-library-2.10.4.jar:na]
  at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 ~[scala-library-2.10.4.jar:na]
  at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 [na:1.8.0_31]
  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 [na:1.8.0_31]
  at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
 WARN  o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID
 15, localhost): java.lang.ClassCastException: [I cannot be cast to
 scala.collection.Seq
  at
 org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
  at
 org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
  at
 org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48

Spark SQL lateral view explode doesn't work, and unable to save array types to Parquet

2015-03-27 Thread Jon Chase
Spark 1.3.0

Two issues:

a) I'm unable to get a lateral view explode query to work on an array type
b) I'm unable to save an array type to a Parquet file

I keep running into this:

  java.lang.ClassCastException: [I cannot be cast to
scala.collection.Seq

Here's a stack trace from the explode issue:

root
 |-- col1: string (nullable = false)
 |-- col2s: array (nullable = true)
 ||-- element: integer (containsNull = true)

ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage 1.0
(TID 15)
java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
at
org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
~[spark-catalyst_2.10-1.3.0.jar:1.3.0]
at
org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
~[spark-sql_2.10-1.3.0.jar:1.3.0]
at
org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
~[spark-sql_2.10-1.3.0.jar:1.3.0]
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
~[scala-library-2.10.4.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
~[scala-library-2.10.4.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
~[scala-library-2.10.4.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
~[scala-library-2.10.4.jar:na]
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
~[scala-library-2.10.4.jar:na]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
~[scala-library-2.10.4.jar:na]
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
~[scala-library-2.10.4.jar:na]
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
~[scala-library-2.10.4.jar:na]
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
~[scala-library-2.10.4.jar:na]
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
~[scala-library-2.10.4.jar:na]
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
~[scala-library-2.10.4.jar:na]
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
~[scala-library-2.10.4.jar:na]
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
~[scala-library-2.10.4.jar:na]
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
~[scala-library-2.10.4.jar:na]
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.scheduler.Task.run(Task.scala:64)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[na:1.8.0_31]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[na:1.8.0_31]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
WARN  o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID
15, localhost): java.lang.ClassCastException: [I cannot be cast to
scala.collection.Seq
at
org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
at
org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
at
org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at 

Re: Spark SQL lateral view explode doesn't work, and unable to save array types to Parquet

2015-03-27 Thread Jon Chase
Done.  I also updated the name on the ticket to include both issues.
 Spark SQL arrays: explode() fails and cannot save array type to Parquet

https://issues.apache.org/jira/browse/SPARK-6570

On Fri, Mar 27, 2015 at 8:14 AM, Cheng Lian lian.cs@gmail.com wrote:

  Forgot to mention: would you mind also providing the full stack
 trace of the exception thrown in the saveAsParquetFile call? Thanks!

 Cheng

 On 3/27/15 7:35 PM, Jon Chase wrote:

 https://issues.apache.org/jira/browse/SPARK-6570

  I also left in the call to saveAsParquetFile(), as it produced a similar
 exception (though there was no use of explode there).

 On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian lian.cs@gmail.com wrote:

  This should be a bug in Explode.eval(), which always assumes the
 underlying SQL array is represented by a Scala Seq. Would you mind opening
 a JIRA ticket for this? Thanks!

 Cheng

 On 3/27/15 7:00 PM, Jon Chase wrote:

 Spark 1.3.0

  Two issues:

  a) I'm unable to get a lateral view explode query to work on an array
 type
 b) I'm unable to save an array type to a Parquet file

  I keep running into this:

java.lang.ClassCastException: [I cannot be cast to
 scala.collection.Seq

  Here's a stack trace from the explode issue:

  root
  |-- col1: string (nullable = false)
  |-- col2s: array (nullable = true)
  ||-- element: integer (containsNull = true)

  ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage
 1.0 (TID 15)
 java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
  at
 org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
 ~[spark-catalyst_2.10-1.3.0.jar:1.3.0]
  at
 org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
 ~[spark-sql_2.10-1.3.0.jar:1.3.0]
  at
 org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
 ~[spark-sql_2.10-1.3.0.jar:1.3.0]
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 ~[scala-library-2.10.4.jar:na]
  at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 ~[scala-library-2.10.4.jar:na]
  at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 ~[scala-library-2.10.4.jar:na]
  at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 ~[scala-library-2.10.4.jar:na]
  at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 ~[scala-library-2.10.4.jar:na]
  at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 ~[scala-library-2.10.4.jar:na]
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 ~[scala-library-2.10.4.jar:na]
  at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 ~[spark-core_2.10-1.3.0.jar:1.3.0]
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 [na:1.8.0_31]
  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 [na:1.8.0_31]
  at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
 WARN  o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID
 15, localhost): java.lang.ClassCastException: [I cannot be cast to
 scala.collection.Seq
  at
 org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
  at
 org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
  at
 org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
  at scala.collection.Iterator

Column not found in schema when querying partitioned table

2015-03-26 Thread Jon Chase
Spark 1.3.0, Parquet

I'm having trouble referencing partition columns in my queries.

In the following example, 'probeTypeId' is a partition column.  For
example, the directory structure looks like this:

/mydata
/probeTypeId=1
...files...
/probeTypeId=2
...files...

I see the column when I load a DataFrame using the /mydata directory and
call df.printSchema():

...
 |-- probeTypeId: integer (nullable = true)
...

Parquet is also aware of the column:
 optional int32 probeTypeId;

And this works fine:

sqlContext.sql("select probeTypeId from df limit 1");

...as does df.show() - it shows the correct values for the partition column.


However, when I try to use a partition column in a where clause, I get an
exception stating that the column was not found in the schema:

sqlContext.sql("select probeTypeId from df where probeTypeId = 1 limit 1");



...
...
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
0.0 (TID 0, localhost): java.lang.IllegalArgumentException: Column
[probeTypeId] was not found in schema!
at parquet.Preconditions.checkArgument(Preconditions.java:47)
at
parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:172)
at
parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:160)
at
parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:142)
at
parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:76)
at
parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:41)
at parquet.filter2.predicate.Operators$Eq.accept(Operators.java:162)
at
parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:46)
at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:41)
at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:22)
at
parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:108)
at
parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:28)
at
parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:158)
...
...



What am I doing wrong?







Here's the full stack trace:

using local[*] for master
06:05:55,675 |-INFO in
ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute
not set
06:05:55,683 |-INFO in ch.qos.logback.core.joran.action.AppenderAction -
About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
06:05:55,694 |-INFO in ch.qos.logback.core.joran.action.AppenderAction -
Naming appender as [STDOUT]
06:05:55,721 |-INFO in
ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default
type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder]
property
06:05:55,768 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction
- Setting level of ROOT logger to INFO
06:05:55,768 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction -
Attaching appender named [STDOUT] to Logger[ROOT]
06:05:55,769 |-INFO in
ch.qos.logback.classic.joran.action.ConfigurationAction - End of
configuration.
06:05:55,770 |-INFO in
ch.qos.logback.classic.joran.JoranConfigurator@6aaceffd - Registering
current configuration as safe fallback point

INFO  org.apache.spark.SparkContext Running Spark version 1.3.0
WARN  o.a.hadoop.util.NativeCodeLoader Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
INFO  org.apache.spark.SecurityManager Changing view acls to: jon
INFO  org.apache.spark.SecurityManager Changing modify acls to: jon
INFO  org.apache.spark.SecurityManager SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(jon); users
with modify permissions: Set(jon)
INFO  akka.event.slf4j.Slf4jLogger Slf4jLogger started
INFO  Remoting Starting remoting
INFO  Remoting Remoting started; listening on addresses :[akka.tcp://
sparkDriver@192.168.1.134:62493]
INFO  org.apache.spark.util.Utils Successfully started service
'sparkDriver' on port 62493.
INFO  org.apache.spark.SparkEnv Registering MapOutputTracker
INFO  org.apache.spark.SparkEnv Registering BlockManagerMaster
INFO  o.a.spark.storage.DiskBlockManager Created local directory at
/var/folders/x7/9hdp8kw9569864088tsl4jmmgn/T/spark-150e23b2-ff19-4a51-8cfc-25fb8e1b3f2b/blockmgr-6eea286c-7473-4bda-8886-7250156b68f4
INFO  org.apache.spark.storage.MemoryStore MemoryStore started with
capacity 1966.1 MB
INFO  org.apache.spark.HttpFileServer HTTP File server directory is
/var/folders/x7/9hdp8kw9569864088tsl4jmmgn/T/spark-cf4687bd-1563-4ddf-b697-21c96fd95561/httpd-6343b9c9-bb66-43ac-ac43-6da80c7a1f95
INFO  org.apache.spark.HttpServer Starting HTTP Server
INFO  o.spark-project.jetty.server.Server 

Re: Column not found in schema when querying partitioned table

2015-03-26 Thread Jon Chase
I've filed this as https://issues.apache.org/jira/browse/SPARK-6554

On Thu, Mar 26, 2015 at 6:29 AM, Jon Chase jon.ch...@gmail.com wrote:

 Spark 1.3.0, Parquet

 I'm having trouble referencing partition columns in my queries.

 In the following example, 'probeTypeId' is a partition column.  For
 example, the directory structure looks like this:

 /mydata
 /probeTypeId=1
 ...files...
 /probeTypeId=2
 ...files...

 I see the column when I load a DataFrame using the /mydata directory
 and call df.printSchema():

 ...
  |-- probeTypeId: integer (nullable = true)
 ...

 Parquet is also aware of the column:
  optional int32 probeTypeId;

 And this works fine:

 sqlContext.sql("select probeTypeId from df limit 1");

 ...as does df.show() - it shows the correct values for the partition
 column.


 However, when I try to use a partition column in a where clause, I get an
 exception stating that the column was not found in the schema:

 sqlContext.sql("select probeTypeId from df where probeTypeId = 1 limit 1");



 ...
 ...
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
 0.0 (TID 0, localhost): java.lang.IllegalArgumentException: Column
 [probeTypeId] was not found in schema!
 at parquet.Preconditions.checkArgument(Preconditions.java:47)
 at
 parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:172)
 at
 parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:160)
 at
 parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:142)
 at
 parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:76)
 at
 parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:41)
 at parquet.filter2.predicate.Operators$Eq.accept(Operators.java:162)
 at
 parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:46)
 at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:41)
 at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:22)
 at
 parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:108)
 at
 parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:28)
 at
 parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:158)
 ...
 ...



 What am I doing wrong?







 Here's the full stack trace:

 using local[*] for master
 06:05:55,675 |-INFO in
 ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute
 not set
 06:05:55,683 |-INFO in ch.qos.logback.core.joran.action.AppenderAction -
 About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
 06:05:55,694 |-INFO in ch.qos.logback.core.joran.action.AppenderAction -
 Naming appender as [STDOUT]
 06:05:55,721 |-INFO in
 ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default
 type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder]
 property
 06:05:55,768 |-INFO in
 ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of
 ROOT logger to INFO
 06:05:55,768 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction
 - Attaching appender named [STDOUT] to Logger[ROOT]
 06:05:55,769 |-INFO in
 ch.qos.logback.classic.joran.action.ConfigurationAction - End of
 configuration.
 06:05:55,770 |-INFO in
 ch.qos.logback.classic.joran.JoranConfigurator@6aaceffd - Registering
 current configuration as safe fallback point

 INFO  org.apache.spark.SparkContext Running Spark version 1.3.0
 WARN  o.a.hadoop.util.NativeCodeLoader Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 INFO  org.apache.spark.SecurityManager Changing view acls to: jon
 INFO  org.apache.spark.SecurityManager Changing modify acls to: jon
 INFO  org.apache.spark.SecurityManager SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(jon); users
 with modify permissions: Set(jon)
 INFO  akka.event.slf4j.Slf4jLogger Slf4jLogger started
 INFO  Remoting Starting remoting
 INFO  Remoting Remoting started; listening on addresses :[akka.tcp://
 sparkDriver@192.168.1.134:62493]
 INFO  org.apache.spark.util.Utils Successfully started service
 'sparkDriver' on port 62493.
 INFO  org.apache.spark.SparkEnv Registering MapOutputTracker
 INFO  org.apache.spark.SparkEnv Registering BlockManagerMaster
 INFO  o.a.spark.storage.DiskBlockManager Created local directory at
 /var/folders/x7/9hdp8kw9569864088tsl4jmmgn/T/spark-150e23b2-ff19-4a51-8cfc-25fb8e1b3f2b/blockmgr-6eea286c-7473-4bda-8886-7250156b68f4
 INFO  org.apache.spark.storage.MemoryStore MemoryStore started with
 capacity 1966.1 MB
 INFO  org.apache.spark.HttpFileServer HTTP File server directory is
 /var

Spark SQL queries hang forever

2015-03-26 Thread Jon Chase
Spark 1.3.0 on YARN (Amazon EMR), cluster of 10 m3.2xlarge (8cpu, 30GB),
executor memory 20GB, driver memory 10GB

I'm using Spark SQL, mainly via spark-shell, to query 15GB of data spread
out over roughly 2,000 Parquet files and my queries frequently hang. Simple
queries like select count(*) from ... on the entire data set work ok.
Slightly more demanding ones with group by's and some aggregate functions
(percentile_approx, avg, etc.) work ok as well, as long as I have some
criteria in my where clause to keep the number of rows down.
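
For concreteness, a sketch (Java API, with a HiveContext assumed for
percentile_approx; table and column names are made up) of the query shapes
being described:

import org.apache.spark.sql.DataFrame;

// sqlContext is assumed to be a HiveContext over the Parquet data.
DataFrame ok = sqlContext.sql(
    "select count(*) from events");
DataFrame alsoOk = sqlContext.sql(
    "select user_id, avg(latency), percentile_approx(latency, 0.95) " +
    "from events where day = '2015-03-20' group by user_id");
// Dropping the where clause (so far more rows are aggregated) is roughly the
// point at which the job gets stuck near the end of the last stage.
DataFrame hangs = sqlContext.sql(
    "select user_id, avg(latency), percentile_approx(latency, 0.95) " +
    "from events group by user_id");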

Once I hit some limit on query complexity and rows processed, my queries
start to hang.  I've left them for up to an hour without seeing any
progress.  No OOM's either - the job is just stuck.

I've tried setting spark.sql.shuffle.partitions to 400 and even 800, but
with the same results: usually near the end of the tasks (like 780 of 800
complete), progress just stops:

15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 788.0 in
stage 1.0 (TID 1618) in 800 ms on
ip-10-209-22-211.eu-west-1.compute.internal (748/800)
15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 793.0 in
stage 1.0 (TID 1623) in 622 ms on
ip-10-105-12-41.eu-west-1.compute.internal (749/800)
15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 797.0 in
stage 1.0 (TID 1627) in 616 ms on ip-10-90-2-201.eu-west-1.compute.internal
(750/800)
15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 799.0 in
stage 1.0 (TID 1629) in 611 ms on ip-10-90-2-201.eu-west-1.compute.internal
(751/800)
15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 795.0 in
stage 1.0 (TID 1625) in 669 ms on
ip-10-105-12-41.eu-west-1.compute.internal (752/800)

^^^ this is where it stays forever

Looking at the Spark UI, several of the executors still list active tasks.
I do see that the Shuffle Read for executors that don't have any tasks
remaining is around 100MB, whereas it's more like 10MB for the executors
that still have tasks.

The first stage, mapPartitions, always completes fine.  It's the second
stage (takeOrdered), that hangs.

I've had this issue in 1.2.0 and 1.2.1 as well as 1.3.0.  I've also
encountered it when using JSON files (instead of Parquet).

Thoughts?  I'm blocked on using Spark SQL b/c most of the queries I do are
having this issue.


Re: Registering custom UDAFs with HiveContext in SparkSQL, how?

2015-03-24 Thread Jon Chase
Shahab -

This should do the trick until Hao's changes are out:


sqlContext.sql("create temporary function foobar as 'com.myco.FoobarUDAF'");

sqlContext.sql("select foobar(some_column) from some_table");


This works without requiring you to 'deploy' a JAR with the UDAF in it - just
make sure the UDAF class is on your project's classpath.
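
For reference, a hypothetical com.myco.FoobarUDAF that such a registration
would pick up, written against the classic Hive UDAF/UDAFEvaluator API (which
create temporary function accepts). The class is purely illustrative - a
trivial long-sum aggregate:

package com.myco;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

// Illustrative only: a trivial Hive-style UDAF that sums longs.
public class FoobarUDAF extends UDAF {
    public static class SumEvaluator implements UDAFEvaluator {
        private long sum;
        public void init() { sum = 0L; }
        // called once per input row in a group
        public boolean iterate(Long value) {
            if (value != null) { sum += value; }
            return true;
        }
        // partial result shipped between map side and reduce side
        public Long terminatePartial() { return sum; }
        // merge a partial result into this evaluator
        public boolean merge(Long other) {
            if (other != null) { sum += other; }
            return true;
        }
        // final result for the group
        public Long terminate() { return sum; }
    }
}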




On Tue, Mar 10, 2015 at 8:21 PM, Cheng, Hao hao.ch...@intel.com wrote:

  Oh, sorry, my bad, currently Spark SQL doesn't provide the user
 interface for UDAF, but it can work seamlessly with Hive UDAF (via
 HiveContext).



 I am also working on the UDAF interface refactoring, after that we can
 provide the custom interface for extension.



 https://github.com/apache/spark/pull/3247





 *From:* shahab [mailto:shahab.mok...@gmail.com]
 *Sent:* Wednesday, March 11, 2015 1:44 AM
 *To:* Cheng, Hao
 *Cc:* user@spark.apache.org
 *Subject:* Re: Registering custom UDAFs with HiveContext in SparkSQL, how?



 Thanks Hao,

 But my question concerns UDAF (user defined aggregation function), not
 UDTF (user defined table function).

 I appreciate if you could point me to some starting point on UDAF
 development in Spark.



 Thanks

 Shahab

 On Tuesday, March 10, 2015, Cheng, Hao hao.ch...@intel.com wrote:

  Currently, Spark SQL doesn't provide an interface for developing custom
 UDTFs, but it can work seamlessly with Hive UDTFs.



  I am working on the UDTF refactoring for Spark SQL; hopefully it will provide
 a Hive-independent UDTF interface soon after that.



 *From:* shahab [mailto:shahab.mok...@gmail.com]
 *Sent:* Tuesday, March 10, 2015 5:44 PM
 *To:* user@spark.apache.org
 *Subject:* Registering custom UDAFs with HiveContext in SparkSQL, how?



 Hi,



  I need to develop a couple of UDAFs and use them in Spark SQL. While UDFs
 can be registered as functions in HiveContext, I could not find any
 documentation on how UDAFs can be registered in the HiveContext. So far,
 what I have found is to make a JAR file out of the developed UDAF class and
 then deploy the JAR file to Spark SQL.



 But is there any way to avoid deploying the jar file and register it
 programmatically?





 best,

 /Shahab




Re: ReceiverInputDStream#saveAsTextFiles with a S3 URL results in double forward slash key names in S3

2014-12-23 Thread Jon Chase
I've had a lot of difficulties with the s3:// prefix; s3n:// seems
to work much better.  Can't find the link ATM, but I seem to recall that
s3:// (Hadoop's original block format for S3) is no longer recommended for
use.  Amazon's EMR goes so far as to remap s3:// to s3n:// behind the
scenes.
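
A one-line sketch of the suggested switch (the RDD variable, bucket, and prefix
are placeholders):

// Write through the s3n:// (native) scheme instead of the s3:// (block) scheme.
someRdd.saveAsTextFile("s3n://my-bucket/output/1234");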

On Tue, Dec 23, 2014 at 9:29 AM, Enno Shioji eshi...@gmail.com wrote:

 I filed a new issue HADOOP-11444. According to HADOOP-10372, s3 is likely
 to be deprecated anyway in favor of s3n.
 Also the comment section notes that Amazon has implemented an
 EmrFileSystem for S3 which is built using AWS SDK rather than JetS3t.




 On Tue, Dec 23, 2014 at 2:06 PM, Enno Shioji eshi...@gmail.com wrote:

 Hey Jay :)

 I tried s3n which uses the Jets3tNativeFileSystemStore, and the double
 slash went away.
 As far as I can see, it does look like a bug in hadoop-common; I'll file
 a ticket for it.

 Hope you are doing well, by the way!

 PS:
  Jets3tNativeFileSystemStore's implementation of pathToKey is:
 ==
  private static String pathToKey(Path path) {
    if (path.toUri().getScheme() != null &&
        path.toUri().getPath().isEmpty()) {
      // allow uris without trailing slash after bucket to refer to root,
      // like s3n://mybucket
      return "";
    }
    if (!path.isAbsolute()) {
      throw new IllegalArgumentException("Path must be absolute: " + path);
    }
    String ret = path.toUri().getPath().substring(1); // remove initial slash
    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
      ret = ret.substring(0, ret.length() - 1);
    }
    return ret;
  }
 ==

 whereas Jets3tFileSystemStore uses:
 ==
  private String pathToKey(Path path) {
    if (!path.isAbsolute()) {
      throw new IllegalArgumentException("Path must be absolute: " + path);
    }
    return path.toUri().getPath();
  }
 ==






 On Tue, Dec 23, 2014 at 1:07 PM, Jay Vyas jayunit100.apa...@gmail.com
 wrote:

 Hi enno.  Might be worthwhile to cross post this on dev@hadoop...
 Obviously a simple spark way to test this would be to change the uri to
 write to hdfs:// or maybe you could do file:// , and confirm that the extra
 slash goes away.

 - if it's indeed a jets3t issue we should add a new unit test for this
 if the hcfs tests are passing for jets3tfilesystem, yet this error still
 exists.

 - To learn how to run HCFS tests against any FileSystem , see the wiki
 page : https://wiki.apache.org/hadoop/HCFS/Progress (see the July 14th
 entry on that page).

 - Is there another S3FileSystem implementation for AbstractFileSystem, or
 is jets3t the only one?  That would be an easy way to test this, and also a
 good workaround.

 I'm wondering, also why jets3tfilesystem is the AbstractFileSystem used
 by so many - is that the standard impl for storing using AbstractFileSystem
 interface?

 On Dec 23, 2014, at 6:06 AM, Enno Shioji eshi...@gmail.com wrote:

 Is anybody experiencing this? It looks like a bug in JetS3t to me, but
 thought I'd sanity check before filing an issue.


 
 I'm writing to S3 using ReceiverInputDStream#saveAsTextFiles with a S3
 URL (s3://fake-test/1234).

 The code does write to S3, but with double forward slashes (e.g.
 s3://fake-test//1234/-141933428/).

 I did a debug and it seem like the culprit is
 Jets3tFileSystemStore#pathToKey(path), which returns /fake-test/1234/...
 for the input s3://fake-test/1234/ when it should hack off the first
 forward slash. However, I couldn't find any bug report for JetS3t for this.

 Am I missing something, or is this likely a JetS3t bug?
 







Re: JavaRDD (Data Aggregation) based on key

2014-12-23 Thread Jon Chase
Have a look at RDD.groupBy(...) and reduceByKey(...)
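
A minimal sketch of the reduceByKey route, assuming a CSV with columns a,b,c
and summing column c grouped by column a (the file path, column positions, and
numeric type are all assumptions for illustration):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class CsvSumByKey {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "csv-agg");
        JavaRDD<String> lines = sc.textFile("data.csv");          // columns: a,b,c
        JavaPairRDD<String, Double> sumOfCByA = lines
            .mapToPair(line -> {
                String[] f = line.split(",");
                return new Tuple2<>(f[0], Double.parseDouble(f[2])); // key = a, value = c
            })
            .reduceByKey((x, y) -> x + y);                         // sum of c per distinct a
        sumOfCByA.collect().forEach(t -> System.out.println(t._1() + " -> " + t._2()));
        sc.stop();
    }
}

The same pattern works for averages by reducing a (sum, count) pair per key and
dividing at the end.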

On Tue, Dec 23, 2014 at 4:47 AM, sachin Singh sachin.sha...@gmail.com
wrote:

 Hi,
 I have a CSV file with fields a, b, c.
 I want to do aggregation (sum, average, ...) on any field (a, b, or c) as per
 user input,
 using the Apache Spark Java API. Please help - urgent!

 Thanks in advance,

 Regards
 Sachin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/JavaRDD-Data-Aggregation-based-on-key-tp20828.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: S3 files , Spark job hungsup

2014-12-23 Thread Jon Chase
http://www.jets3t.org/toolkit/configuration.html

Put the following properties in a file named jets3t.properties and make
sure it is available while your Spark job is running (just place it in
~/ and pass a reference to it when calling spark-submit with --files
~/jets3t.properties):



httpclient.connection-timeout-ms
How many milliseconds to wait before a connection times out. 0 means infinity.
Default: 60000

httpclient.socket-timeout-ms
How many milliseconds to wait before a socket connection times out. 0 means infinity.
Default: 60000




You will also probably want to increase this value substantially:

httpclient.max-connections
The maximum number of simultaneous connections to allow globally
Default: 20
Note: If you have a fast Internet connection, you can improve the
performance of your S3 client by increasing this setting and the
corresponding S3 Service properties s3service.max-thread-count and
s3service.admin-max-thread-count. However, be careful because if you
increase this value too much for your connection you may exceed your
available bandwidth and cause communications errors.



On Mon, Dec 22, 2014 at 1:20 PM, durga katakam durgak...@gmail.com wrote:

 Yes. I am reading thousands of files every hour. Is there any way I can
 tell Spark to time out?
 Thanks for your help.

 -D

 On Mon, Dec 22, 2014 at 4:57 AM, Shuai Zheng szheng.c...@gmail.com
 wrote:

 Is it possible there are too many connections open to read from S3 from one
 node? I had this issue before because I opened a few hundred files on S3 to
 read from one node. It just blocked itself without error until a timeout later.

 On Monday, December 22, 2014, durga durgak...@gmail.com wrote:

 Hi All,

 I am facing a strange issue sporadically: occasionally my Spark job
 hangs on reading S3 files. It is not throwing an exception or making any
 progress; it just hangs there.

 Is this a known issue? Please let me know how I could solve this issue.

 Thanks,
 -D



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/S3-files-Spark-job-hungsup-tp20806.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Downloads from S3 exceedingly slow when running on spark-ec2

2014-12-23 Thread Jon Chase
Turns out I was using the s3:// prefix (in a standalone Spark cluster).  It
was writing a LOT of block_* files to my S3 bucket, which was the cause for
the slowness.  I was coming from Amazon EMR, where Amazon's underlying FS
implementation has re-mapped s3:// to s3n://, which doesn't use the block_*
files.
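
For reference, a sketch of what the job looks like after the change, using the
s3n:// scheme on both ends (the master URL, bucket, prefixes, and the mapping
step are placeholders):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Standalone-cluster sketch: read the gzip'd inputs and write results through
// s3n:// so the native S3 object layout is used and no block_* objects appear.
JavaSparkContext sc = new JavaSparkContext("spark://master:7077", "s3n-pipeline");
JavaRDD<String> input = sc.textFile("s3n://my-bucket/incoming/*.gz");
JavaRDD<String> mapped = input.map(line -> line.trim());   // stand-in for the real mapping
mapped.saveAsTextFile("s3n://my-bucket/processed/run-0001");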

On Sat, Dec 20, 2014 at 8:17 PM, Paul Brown p...@mult.ifario.us wrote:


 I would suggest checking out disk IO on the nodes in your cluster and then
 reading up on the limiting behaviors that accompany different kinds of EC2
 storage.  Depending on how things are configured for your nodes, you may
 have a local storage configuration that provides bursty IOPS where you
 get apparently good performance at first and then limiting kicks in and
 slows down the rate at which you can write data to local storage.


 --
 p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

 On Thu, Dec 18, 2014 at 5:56 AM, Jon Chase jon.ch...@gmail.com wrote:

 I'm running a very simple Spark application that downloads files from S3,
 does a bit of mapping, then uploads new files.  Each file is roughly 2MB
 and is gzip'd.  I was running the same code on Amazon's EMR w/Spark and not
 having any download speed issues (Amazon's EMR provides a custom
 implementation of the s3n:// file system, FWIW).

 When I say exceedingly slow, I mean that it takes about 2 minutes to
 download and process a 2MB file (this was taking ~2 seconds on the same
 instance types in Amazon's EMR).  When I download the same file from the
 EC2 machine with wget or curl, it downloads in ~ 1 second.  I've also done
 other bandwidth checks for downloads from other external hosts - no speed
 problems there.

 Tried this w/Spark 1.1.0 and 1.1.1.

 When I do a thread dump on a worker, I typically see this a lot:



 Executor task launch worker-7 daemon prio=10 tid=0x7fd174039000
 nid=0x59e9 runnable [0x7fd1f7dfb000]
java.lang.Thread.State: RUNNABLE
 at java.net.SocketInputStream.socketRead0(Native Method)
 at java.net.SocketInputStream.read(SocketInputStream.java:152)
 at java.net.SocketInputStream.read(SocketInputStream.java:122)
 at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
 at sun.security.ssl.InputRecord.read(InputRecord.java:480)
 at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
 - locked 0x0007e44dd140 (a java.lang.Object)
 at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
 at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
 - locked 0x0007e44e1350 (a sun.security.ssl.AppInputStream)
 at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
 at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
 - locked 0x0007e44ea800 (a java.io.BufferedInputStream)
 at
 org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
 at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
 at
 org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116)
 at
 org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.readLine(MultiThreadedHttpConnectionManager.java:1413)
 at
 org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973)
 at
 org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735)
 at
 org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098)
 at
 org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398)
 at
 org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
 at
 org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
 at
 org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
 at
 org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
 at
 org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
 at
 org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
 at
 org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
 at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
 at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
 at
 org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
 at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
 at org.apache.hadoop.fs.s3native.$Proxy6.retrieveMetadata(Unknown Source)
 at
 org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus

Re: Fetch Failure

2014-12-19 Thread Jon Chase
I'm getting the same error (ExecutorLostFailure) - input RDD is 100k
small files (~2MB each).  I do a simple map, then keyBy(), and then
rdd.saveAsHadoopDataset(...).  Depending on the memory settings given to
spark-submit, the time before the first ExecutorLostFailure varies (more
memory == longer until failure) - but this usually happens after about 100
files being processed.

I'm running Spark 1.1.0 on AWS EMR w/Yarn. It appears that Yarn is
killing the executor b/c it thinks it's exceeding memory.  However, I can't
repro any OOM issues when running locally, no matter the size of the data
set.
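
A minimal sketch of the usual knob for this situation - raising the off-heap
headroom YARN reserves per executor - with an illustrative value only, not a
recommendation:

import org.apache.spark.SparkConf;

// YARN enforces (executor heap + memoryOverhead) as the container's physical
// memory cap, so any off-heap usage has to fit in the overhead portion.
SparkConf conf = new SparkConf()
    .setAppName("yarn-overhead-test")
    .set("spark.executor.memory", "6g")
    .set("spark.yarn.executor.memoryOverhead", "2048");  // MB of non-heap headroom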

It seems like Yarn thinks the container's memory footprint keeps growing,
according to the Yarn logs:

2014-12-18 22:06:43,505 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
(Container Monitor): Memory usage of ProcessTree 24273 for container-id
container_1418928607193_0011_01_02: 6.1 GB of 6.5 GB physical memory
used; 13.8 GB of 32.5 GB virtual memory used
2014-12-18 22:06:46,516 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
(Container Monitor): Memory usage of ProcessTree 24273 for container-id
container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory
used; 13.9 GB of 32.5 GB virtual memory used
2014-12-18 22:06:49,524 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
(Container Monitor): Memory usage of ProcessTree 24273 for container-id
container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory
used; 14.0 GB of 32.5 GB virtual memory used
2014-12-18 22:06:52,531 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
(Container Monitor): Memory usage of ProcessTree 24273 for container-id
container_1418928607193_0011_01_02: 6.4 GB of 6.5 GB physical memory
used; 14.1 GB of 32.5 GB virtual memory used
2014-12-18 22:06:55,538 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
(Container Monitor): Memory usage of ProcessTree 24273 for container-id
container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory
used; 14.2 GB of 32.5 GB virtual memory used
2014-12-18 22:06:58,549 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
(Container Monitor): Memory usage of ProcessTree 24273 for container-id
container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory
used; 14.3 GB of 32.5 GB virtual memory used
2014-12-18 22:06:58,549 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
(Container Monitor): Process tree for container:
container_1418928607193_0011_01_02 has processes older than 1 iteration
running over the configured limit. Limit=6979321856, current usage =
6995812352
2014-12-18 22:06:58,549 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
(Container Monitor): Container
[pid=24273,containerID=container_1418928607193_0011_01_02] is running
beyond physical memory limits. Current usage: 6.5 GB of 6.5 GB physical
memory used; 14.3 GB of 32.5 GB virtual memory used. Killing container.
Dump of the process-tree for container_1418928607193_0011_01_02 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 24273 4304 24273 24273 (bash) 0 0 115630080 302 /bin/bash -c
/usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p'
-Xms6144m -Xmx6144m  -verbose:gc -XX:+HeapDumpOnOutOfMemoryError
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
-Djava.io.tmpdir=/mnt1/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1418928607193_0011/container_1418928607193_0011_01_02/tmp
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://sparkdri...@ip-xx-xxx-xxx-xxx.eu-west-1.compute.internal:54357/user/CoarseGrainedScheduler
1 ip-xx-xxx-xxx-xxx.eu-west-1.compute.internal 4 1
/mnt/var/log/hadoop/userlogs/application_1418928607193_0011/container_1418928607193_0011_01_02/stdout
2
/mnt/var/log/hadoop/userlogs/application_1418928607193_0011/container_1418928607193_0011_01_02/stderr
|- 24277 24273 24273 24273 (java) 13808 1730 15204556800 1707660
/usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms6144m
-Xmx6144m -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails
-XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
-Djava.io.tmpdir=/mnt1/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1418928607193_0011/container_1418928607193_0011_01_02/tmp
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://sparkdri...@ip-xx-xxx-xxx-xxx.eu-west-1.compute.internal:54357/user/CoarseGrainedScheduler
1 
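
A note on the numbers in that dump: with Spark on YARN the container limit is
the executor heap plus spark.yarn.executor.memoryOverhead (384 MB by default
in this Spark version), so -Xmx6144m leaves only a few hundred MB of headroom
for the JVM's own off-heap usage before the 6.5 GB check trips. A minimal
sketch of a submit command that asks for more headroom (the class and jar
names are placeholders and the values are illustrative, not the exact command
used in this thread):

# Sketch only: ask YARN for extra non-heap headroom per executor so native
# allocations don't push the container past its physical memory limit.
spark-submit \
  --master yarn-cluster \
  --num-executors 2 \
  --executor-memory 6g \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --class com.example.MyJob \
  my-job.jar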

Re: Fetch Failure

2014-12-19 Thread Jon Chase
I'm actually already running 1.1.1.

I also just tried --conf spark.yarn.executor.memoryOverhead=4096, but no
luck.  Still getting ExecutorLostFailure (executor lost).



On Fri, Dec 19, 2014 at 10:43 AM, Rafal Kwasny rafal.kwa...@gmail.com
wrote:

 Hi,
 Just upgrade to 1.1.1 - it was fixed some time ago

 /Raf


 sandy.r...@cloudera.com wrote:

 Hi Jon,

 The fix for this is to increase spark.yarn.executor.memoryOverhead to
 something greater than it's default of 384.

 This will increase the gap between the executors heap size and what it
 requests from yarn. It's required because jvms take up some memory beyond
 their heap size.

 -Sandy

 On Dec 19, 2014, at 9:04 AM, Jon Chase jon.ch...@gmail.com wrote:

 I'm getting the same error (ExecutorLostFailure) - input RDD is 100k
 small files (~2MB each).  I do a simple map, then keyBy(), and then
 rdd.saveAsHadoopDataset(...).  Depending on the memory settings given to
 spark-submit, the time before the first ExecutorLostFailure varies (more
 memory == longer until failure) - but this usually happens after about 100
 files being processed.

 I'm running Spark 1.1.0 on AWS EMR w/Yarn.  It appears that Yarn is
 killing the executor b/c it thinks it's exceeding memory.  However, I can't
 repro any OOM issues when running locally, no matter the size of the data
 set.

 It seems like Yarn thinks the heap size is increasing according to the
 Yarn logs:

 2014-12-18 22:06:43,505 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.1 GB of 6.5 GB physical memory
 used; 13.8 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:46,516 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory
 used; 13.9 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:49,524 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory
 used; 14.0 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:52,531 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.4 GB of 6.5 GB physical memory
 used; 14.1 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:55,538 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory
 used; 14.2 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:58,549 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory
 used; 14.3 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:58,549 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Process tree for container:
 container_1418928607193_0011_01_02 has processes older than 1 iteration
 running over the configured limit. Limit=6979321856, current usage =
 6995812352
 2014-12-18 22:06:58,549 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Container
 [pid=24273,containerID=container_1418928607193_0011_01_02] is running
 beyond physical memory limits. Current usage: 6.5 GB of 6.5 GB physical
 memory used; 14.3 GB of 32.5 GB virtual memory used. Killing container.
 Dump of the process-tree for container_1418928607193_0011_01_02 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 24273 4304 24273 24273 (bash) 0 0 115630080 302 /bin/bash -c
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p'
 -Xms6144m -Xmx6144m  -verbose:gc -XX:+HeapDumpOnOutOfMemoryError
 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
 -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
 -Djava.io.tmpdir=/mnt1/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1418928607193_0011/container_1418928607193_0011_01_02/tmp
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkdri...@ip-xx-xxx-xxx-xxx.eu-west-1.compute.internal:54357/user/CoarseGrainedScheduler
 1 ip-xx-xxx-xxx-xxx.eu-west-1.compute.internal 4 1
 /mnt/var/log/hadoop/userlogs/application_1418928607193_0011

Re: Fetch Failure

2014-12-19 Thread Jon Chase
Hmmm, I see this a lot (multiple times per second) in the stdout logs of my
application:

2014-12-19T16:12:35.748+0000: [GC (Allocation Failure) [ParNew:
286663K->12530K(306688K), 0.0074579 secs] 1470813K->1198034K(2063104K),
0.0075189 secs] [Times: user=0.03 sys=0.00, real=0.01 secs]


And finally I see

2014-12-19 16:12:36,116 ERROR [SIGTERM handler]
executor.CoarseGrainedExecutorBackend (SignalLogger.scala:handle(57)) -
RECEIVED SIGNAL 15: SIGTERM

which I assume is coming from Yarn, after which the log contains this and
then ends:

Heap
 par new generation   total 306688K, used 23468K [0x8000,
0x94cc, 0x94cc)
  eden space 272640K,   4% used [0x8000, 0x80abff10,
0x90a4)
  from space 34048K,  36% used [0x92b8, 0x937ab488,
0x94cc)
  to   space 34048K,   0% used [0x90a4, 0x90a4,
0x92b8)
 concurrent mark-sweep generation total 1756416K, used 1186756K
[0x94cc, 0x0001, 0x0001)
 Metaspace   used 52016K, capacity 52683K, committed 52848K, reserved
1095680K
  class spaceused 7149K, capacity 7311K, committed 7392K, reserved
1048576K
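
The GC line and the heap summary above both show the heap comfortably under
the 6 GB -Xmx, which fits the container being killed for total process size
rather than for a Java heap problem. A quick way to watch that gap on a live
executor (sketch only; <pid> stands for the CoarseGrainedExecutorBackend
process id):

# Heap occupancy as the JVM itself reports it, sampled every 5 seconds
jstat -gc <pid> 5s

# Resident set size of the whole process in KB; this is much closer to what
# YARN's ContainersMonitor is actually enforcing
ps -o rss= -p <pid>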







On Fri, Dec 19, 2014 at 11:16 AM, Jon Chase jon.ch...@gmail.com wrote:

 I'm actually already running 1.1.1.

 I also just tried --conf spark.yarn.executor.memoryOverhead=4096, but no
 luck.  Still getting ExecutorLostFailure (executor lost).



 On Fri, Dec 19, 2014 at 10:43 AM, Rafal Kwasny rafal.kwa...@gmail.com
 wrote:

 Hi,
 Just upgrade to 1.1.1 - it was fixed some time ago

 /Raf


 sandy.r...@cloudera.com wrote:

 Hi Jon,

 The fix for this is to increase spark.yarn.executor.memoryOverhead to
 something greater than it's default of 384.

 This will increase the gap between the executors heap size and what it
 requests from yarn. It's required because jvms take up some memory beyond
 their heap size.

 -Sandy

 On Dec 19, 2014, at 9:04 AM, Jon Chase jon.ch...@gmail.com wrote:

 I'm getting the same error (ExecutorLostFailure) - input RDD is 100k
 small files (~2MB each).  I do a simple map, then keyBy(), and then
 rdd.saveAsHadoopDataset(...).  Depending on the memory settings given to
 spark-submit, the time before the first ExecutorLostFailure varies (more
 memory == longer until failure) - but this usually happens after about 100
 files being processed.

 I'm running Spark 1.1.0 on AWS EMR w/Yarn.  It appears that Yarn is
 killing the executor b/c it thinks it's exceeding memory.  However, I can't
 repro any OOM issues when running locally, no matter the size of the data
 set.

 It seems like Yarn thinks the heap size is increasing according to the
 Yarn logs:

 2014-12-18 22:06:43,505 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.1 GB of 6.5 GB physical memory
 used; 13.8 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:46,516 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory
 used; 13.9 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:49,524 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory
 used; 14.0 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:52,531 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.4 GB of 6.5 GB physical memory
 used; 14.1 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:55,538 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory
 used; 14.2 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:58,549 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory
 used; 14.3 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:58,549 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Process tree for container:
 container_1418928607193_0011_01_02 has processes older than 1 iteration
 running over the configured limit. Limit=6979321856, current usage =
 6995812352
 2014-12-18 22:06:58,549 WARN

Re: Fetch Failure

2014-12-19 Thread Jon Chase
Yes, same problem.

On Fri, Dec 19, 2014 at 11:29 AM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 Do you hit the same errors?  Is it now saying your containers are exceeding
 ~10 GB?

 On Fri, Dec 19, 2014 at 11:16 AM, Jon Chase jon.ch...@gmail.com wrote:

 I'm actually already running 1.1.1.

 I also just tried --conf spark.yarn.executor.memoryOverhead=4096, but no
 luck.  Still getting ExecutorLostFailure (executor lost).



 On Fri, Dec 19, 2014 at 10:43 AM, Rafal Kwasny rafal.kwa...@gmail.com
 wrote:

 Hi,
 Just upgrade to 1.1.1 - it was fixed some time ago

 /Raf


 sandy.r...@cloudera.com wrote:

 Hi Jon,

 The fix for this is to increase spark.yarn.executor.memoryOverhead to
 something greater than it's default of 384.

 This will increase the gap between the executors heap size and what it
 requests from yarn. It's required because jvms take up some memory beyond
 their heap size.

 -Sandy

 On Dec 19, 2014, at 9:04 AM, Jon Chase jon.ch...@gmail.com wrote:

 I'm getting the same error (ExecutorLostFailure) - input RDD is 100k
 small files (~2MB each).  I do a simple map, then keyBy(), and then
 rdd.saveAsHadoopDataset(...).  Depending on the memory settings given to
 spark-submit, the time before the first ExecutorLostFailure varies (more
 memory == longer until failure) - but this usually happens after about 100
 files being processed.

 I'm running Spark 1.1.0 on AWS EMR w/Yarn.  It appears that Yarn is
 killing the executor b/c it thinks it's exceeding memory.  However, I can't
 repro any OOM issues when running locally, no matter the size of the data
 set.

 It seems like Yarn thinks the heap size is increasing according to the
 Yarn logs:

 2014-12-18 22:06:43,505 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.1 GB of 6.5 GB physical memory
 used; 13.8 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:46,516 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory
 used; 13.9 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:49,524 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory
 used; 14.0 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:52,531 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.4 GB of 6.5 GB physical memory
 used; 14.1 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:55,538 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory
 used; 14.2 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:58,549 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 24273 for container-id
 container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory
 used; 14.3 GB of 32.5 GB virtual memory used
 2014-12-18 22:06:58,549 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Process tree for container:
 container_1418928607193_0011_01_02 has processes older than 1 iteration
 running over the configured limit. Limit=6979321856, current usage =
 6995812352
 2014-12-18 22:06:58,549 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Container
 [pid=24273,containerID=container_1418928607193_0011_01_02] is running
 beyond physical memory limits. Current usage: 6.5 GB of 6.5 GB physical
 memory used; 14.3 GB of 32.5 GB virtual memory used. Killing container.
 Dump of the process-tree for container_1418928607193_0011_01_02 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 24273 4304 24273 24273 (bash) 0 0 115630080 302 /bin/bash -c
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p'
 -Xms6144m -Xmx6144m  -verbose:gc -XX:+HeapDumpOnOutOfMemoryError
 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
 -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
 -Djava.io.tmpdir=/mnt1/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1418928607193_0011/container_1418928607193_0011_01_02/tmp

Yarn not running as many executors as I'd like

2014-12-19 Thread Jon Chase
Running on Amazon EMR w/Yarn and Spark 1.1.1, I have trouble getting Yarn
to use the number of executors that I specify in spark-submit:

--num-executors 2

In a cluster with two core nodes, this will typically result in only one
executor running at a time.  I can play with the memory settings and
num-cores-per-executor, and sometimes I can get 2 executors running at
once, but I'm not sure what the secret formula is to make this happen
consistently.
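
For what it's worth, the arithmetic that usually decides this: each executor
needs a YARN container of --executor-memory plus
spark.yarn.executor.memoryOverhead, a node only hosts as many containers as
fit under its yarn.nodemanager.resource.memory-mb, and on YARN the
application master takes a container too. A hedged sketch with deliberately
small executors (values and the class/jar names are placeholders):

# Sketch only: keep the requested container (executor memory + overhead)
# small enough that it fits a core node's YARN allocation alongside the
# application master, otherwise YARN grants fewer executors than requested.
spark-submit \
  --master yarn-cluster \
  --num-executors 2 \
  --executor-cores 2 \
  --executor-memory 2g \
  --conf spark.yarn.executor.memoryOverhead=512 \
  --class com.example.MyJob \
  my-job.jar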


Downloads from S3 exceedingly slow when running on spark-ec2

2014-12-18 Thread Jon Chase
I'm running a very simple Spark application that downloads files from S3,
does a bit of mapping, then uploads new files.  Each file is roughly 2MB
and is gzip'd.  I was running the same code on Amazon's EMR w/Spark and not
having any download speed issues (Amazon's EMR provides a custom
implementation of the s3n:// file system, FWIW).

When I say exceedingly slow, I mean that it takes about 2 minutes to
download and process a 2MB file (this was taking ~2 seconds on the same
instance types in Amazon's EMR).  When I download the same file from the
EC2 machine with wget or curl, it downloads in ~ 1 second.  I've also done
other bandwidth checks for downloads from other external hosts - no speed
problems there.

Tried this w/Spark 1.1.0 and 1.1.1.

When I do a thread dump on a worker, I typically see this a lot:



"Executor task launch worker-7" daemon prio=10 tid=0x7fd174039000
nid=0x59e9 runnable [0x7fd1f7dfb000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.read(InputRecord.java:480)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
- locked 0x0007e44dd140 (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
- locked 0x0007e44e1350 (a sun.security.ssl.AppInputStream)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
- locked 0x0007e44ea800 (a java.io.BufferedInputStream)
at org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
at
org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116)
at
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.readLine(MultiThreadedHttpConnectionManager.java:1413)
at
org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973)
at
org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735)
at
org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098)
at
org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398)
at
org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
at
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
at
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
at
org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
at
org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
at
org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
at
org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
at
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at org.apache.hadoop.fs.s3native.$Proxy6.retrieveMetadata(Unknown Source)
at
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:330)
at
org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdir(NativeS3FileSystem.java:432)
at
org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdirs(NativeS3FileSystem.java:425)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1126)
at
org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:256)
at
org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:244)
at
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:126)
at
org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.getBaseRecordWriter(MultipleTextOutputFormat.java:44)
at
org.apache.hadoop.mapred.lib.MultipleOutputFormat$1.write(MultipleOutputFormat.java:99)
at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:94)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:986)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
at 
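
The frames above are mostly jets3t metadata calls (performRestHead /
retrieveMetadata) going through the s3n:// filesystem rather than the data
transfer itself. One way to narrow it down (sketch only; the bucket, key, and
output paths are placeholders, and the curl line assumes the object is
reachable over plain HTTPS) is to time the same object through the Hadoop S3
layer and through a direct request from the same worker:

# Through the Hadoop s3n:// layer, i.e. the same code path the job uses
time hadoop fs -get s3n://my-bucket/path/to/part-00000.gz /tmp/part-00000.gz

# Direct HTTPS fetch, bypassing jets3t entirely
time curl -s -o /tmp/part-00000-direct.gz https://my-bucket.s3.amazonaws.com/path/to/part-00000.gz

If the first is slow and the second is fast, the cost is in the s3n metadata
round trips rather than in raw bandwidth.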

Spark cluster with Java 8 using ./spark-ec2

2014-11-25 Thread Jon Chase
I'm trying to use the spark-ec2 command to launch a Spark cluster that runs
Java 8, but so far I haven't been able to get the Spark processes to use
the right JVM at start up.

Here's the command I use for launching the cluster.  Note I'm using the
user-data feature to install Java 8:

./spark-ec2 -k spark -i ~/.ssh/spark.pem \
  -t m3.large -s 1  \
  --user-data=java8.sh launch spark


After the cluster is running, I can SSH in and see that the default Java
version is indeed 8:

 ssh root@...

$ echo $JAVA_HOME
/usr/java/default

$ java -version
java version "1.8.0"
Java(TM) SE Runtime Environment (build 1.8.0-b132)
Java HotSpot(TM) 64-Bit Server VM (build 25.0-b70, mixed mode)


It seems that the Spark processes are still using Java 7.  I've tried
running sbin/stop-all.sh and start-all.sh from master, but that doesn't
seem to help.

What magic incantation am I missing?
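
One thing worth checking here (a sketch, not a verified fix): the daemons
that spark-ec2 starts read conf/spark-env.sh, so changing the login shell's
JAVA_HOME via user-data does not necessarily change the JVM those processes
launch with. Assuming the stock spark-ec2 layout under /root, pointing
spark-env.sh at the new JDK, pushing it to the slaves, and restarting would
look roughly like:

# Run on the master; paths assume the standard spark-ec2 layout
echo 'export JAVA_HOME=/usr/java/default' >> /root/spark/conf/spark-env.sh
/root/spark-ec2/copy-dir /root/spark/conf    # sync the conf dir to the slaves
/root/spark/sbin/stop-all.sh
/root/spark/sbin/start-all.sh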






java8.sh user data script:

#!/bin/bash

# Check java version
JAVA_VER=$(java -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

if [ "$JAVA_VER" -lt 18 ]
then
# Download jdk 8
echo "Downloading and installing jdk 8"
wget --no-cookies --no-check-certificate \
  --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" \
  "http://download.oracle.com/otn-pub/java/jdk/8-b132/jdk-8-linux-x64.rpm"

# Silent install
yum -y install jdk-8-linux-x64.rpm

# Figure out how many versions of Java we currently have
NR_OF_OPTIONS=$(echo 0 | alternatives --config java 2>/dev/null | grep 'There ' | awk '{print $3}' | tail -1)

echo "Found $NR_OF_OPTIONS existing versions of java. Adding new version."

# Make the new java version available via /etc/alternatives
alternatives --install /usr/bin/java java /usr/java/default/bin/java 1

# Make java 8 the default
echo $(($NR_OF_OPTIONS + 1)) | alternatives --config java

# Set some variables
export JAVA_HOME=/usr/java/default/bin/java
export JRE_HOME=/usr/java/default/jre
export PATH=$PATH:/usr/java/default/bin
fi

# Check java version again
JAVA_VER=$(java -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

echo "export JAVA_HOME=/usr/java/default" >> /root/.bash_profile


. ~/.bash_profile

echo "Java version is $JAVA_VER"
echo "JAVA_HOME: $JAVA_HOME"
echo "JRE_HOME: $JRE_HOME"
echo "PATH: $PATH"










Here's the stacktrace from stdout from the spark-submit command:



14/11/25 14:01:11 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 1.0
(TID 7) on executor ip-xx-xx-xxx-xx.eu-west-1.compute.internal:
java.lang.UnsupportedClassVersionError (foo/spark/Main : Unsupported
major.minor version 52.0) [duplicate 3]
14/11/25 14:01:11 ERROR scheduler.TaskSetManager: Task 0 in stage 1.0
failed 4 times; aborting job
14/11/25 14:01:11 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1
14/11/25 14:01:11 INFO scheduler.TaskSchedulerImpl: Stage 1 was cancelled
14/11/25 14:01:11 INFO scheduler.DAGScheduler: Failed to run
saveAsHadoopFile at Main.java:146
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 1.0 (TID 7,
ip-xx-xx-xxx-xx.eu-west-1.compute.internal):
java.lang.UnsupportedClassVersionError: foo/spark/Main : Unsupported
major.minor version 52.0
java.lang.ClassLoader.defineClass1(Native Method)
java.lang.ClassLoader.defineClass(ClassLoader.java:800)

java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
java.net.URLClassLoader.access$100(URLClassLoader.java:71)
java.net.URLClassLoader$1.run(URLClassLoader.java:361)
java.net.URLClassLoader$1.run(URLClassLoader.java:355)
java.security.AccessController.doPrivileged(Native Method)
java.net.URLClassLoader.findClass(URLClassLoader.java:354)
java.lang.ClassLoader.loadClass(ClassLoader.java:425)
java.lang.ClassLoader.loadClass(ClassLoader.java:358)
java.lang.Class.forName0(Native Method)
java.lang.Class.forName(Class.java:274)

org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)

java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
java.io.ObjectInputStream.readClass(ObjectInputStream.java:1483)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1333)

java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
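
Class file major version 52 is Java 8, so this error means the application
jar was built for Java 8 while the executor JVMs are still running Java 7. A
hedged workaround until the cluster JVMs are actually switched over is to
build the job for Java 7 bytecode; the jar name below is a placeholder and
Maven/sbt have equivalent settings:

# Confirm what the class was compiled for (51 = Java 7, 52 = Java 8)
javap -verbose -classpath my-job.jar foo.spark.Main | grep 'major version'

# Rebuild targeting Java 7 bytecode so the Java 7 executors can load it
mkdir -p classes
javac -source 1.7 -target 1.7 -d classes $(find src -name '*.java')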