I don't know the full context of what you're doing, but serialization
errors usually mean you're attempting to serialize something that can't be
serialized, like the SparkContext. Kryo won't help there.
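
As a rough illustration of that failure mode (not your code; DriverOnlyHandle,
BadTask and GoodTask are made-up names, and the handle merely stands in for
something like the SparkContext), here is a plain-JVM sketch using ordinary
Java serialization. As far as I know, Spark ships task closures with Java
serialization regardless of spark.serializer, which is why switching to Kryo
does not change the outcome:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a driver-only object such as SparkContext.
// It deliberately does NOT extend Serializable.
class DriverOnlyHandle { val name: String = "driver" }

// A task that drags the handle along as a field: serializing it fails.
class BadTask(val handle: DriverOnlyHandle) extends Serializable {
  def run(x: Int): Int = x + handle.name.length
}

// A task that copies out only the plain value it needs: serializing it works.
class GoodTask(val nameLength: Int) extends Serializable {
  def run(x: Int): Int = x + nameLength
}

object SerializationCheck {
  // Returns true if Java serialization accepts the whole object graph.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

The usual fix has the same shape as GoodTask: pull the few values you need
into local vals before the closure, so the closure captures those and not the
enclosing object.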

The arguments to spark-submit you posted previously look good:

--num-executors 96 --driver-memory 12g --driver-java-options
"-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4

I suspect you aren't getting the parallelism you need. For partitioning, if
your data is in HDFS and your block size is 128MB, then you'll get ~195
partitions anyway. If it takes 7 hours to do a join over 25GB of data, you
have some other serious bottleneck. You should examine the web console and
the logs to determine where all the time is going. Questions you might
pursue:

   - How long does each task take to complete?
   - How many of those 195 partitions/tasks are processed at the same time?
   That is, how many "slots" are available?  Maybe you need more nodes if the
   number of slots is too low. Based on your command arguments (96 executors
   x 4 cores = 384 slots), you should be able to process all of them at once,
   unless the cluster is busy.
   - Is the cluster swamped with other work?
   - How much data does each task process? Is the data roughly the same
   from one task to the next? If not, then you might have serious key skew.
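
The questions above can be turned into rough arithmetic. The sketch below is
only an estimate under stated assumptions: one partition per HDFS block, one
task per executor core (Spark's default of one CPU per task), and YARN
actually granting every executor requested. ParallelismEstimate and its
method names are made up for illustration; the per-task sizes you feed to
skewRatio would come from the web console.

```scala
object ParallelismEstimate {
  // Ceiling division for positive values.
  private def ceilDiv(a: Long, b: Long): Long = (a + b - 1) / b

  // Partitions you would get if every HDFS block became one input split.
  def partitions(inputBytes: Long, blockBytes: Long): Long =
    ceilDiv(inputBytes, blockBytes)

  // Concurrent task slots, assuming one task per executor core.
  def slots(numExecutors: Int, coresPerExecutor: Int): Int =
    numExecutors * coresPerExecutor

  // Rounds ("waves") of tasks needed to work through all partitions.
  def waves(tasks: Long, slots: Int): Long = ceilDiv(tasks, slots.toLong)

  // Largest per-task input divided by the mean; close to 1.0 means evenly spread.
  def skewRatio(bytesPerTask: Seq[Long]): Double = {
    require(bytesPerTask.nonEmpty, "need at least one task")
    bytesPerTask.max.toDouble / (bytesPerTask.sum.toDouble / bytesPerTask.size)
  }
}
```

With 25 GiB of input and 128 MiB blocks, partitions comes out at 200, in the
same ballpark as the ~195 mentioned above. 96 executors with 4 cores each
give 384 slots, so all tasks fit in one wave; with 32 executors (128 slots)
it would take two. A skew ratio well above 1 suggests a few hot keys are
doing most of the work.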

You may also need to research the details of how joins are implemented and
some of the common tricks for organizing data to minimize having to shuffle
all N by M records.
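
One such trick is co-partitioning: if both sides are hash-partitioned on the
join key with the same partition count, matching keys always land in the same
partition index, so each partition can be joined locally without comparing it
against the rest of the data. The sketch below simulates that idea with plain
Scala collections. It is not Spark's implementation, and CoPartitionedJoin
and its methods are hypothetical names used only for illustration.

```scala
object CoPartitionedJoin {
  // Same rule a hash partitioner uses: non-negative key hash modulo partition count.
  def partitionOf(key: Long, numPartitions: Int): Int = {
    val m = key.hashCode % numPartitions
    if (m < 0) m + numPartitions else m
  }

  // Split a keyed data set into buckets by partition index.
  def partition[V](data: Seq[(Long, V)], n: Int): Map[Int, Seq[(Long, V)]] =
    data.groupBy { case (k, _) => partitionOf(k, n) }

  // Equi-join: partition i of the left side only ever meets partition i of the right.
  def join[A, B](left: Seq[(Long, A)], right: Seq[(Long, B)], n: Int): Seq[(Long, (A, B))] = {
    val l = partition(left, n)
    val r = partition(right, n)
    (0 until n).flatMap { i =>
      // Index the right-hand bucket by key so each lookup is a hash probe.
      val rightByKey = r.getOrElse(i, Seq.empty).groupBy(_._1)
      for {
        (k, a) <- l.getOrElse(i, Seq.empty)
        (_, b) <- rightByKey.getOrElse(k, Seq.empty)
      } yield (k, (a, b))
    }
  }
}
```

In Spark terms, the analogous move would be to partition both pair RDDs with
the same partitioner before joining them, so the join can reuse that layout.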



Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> Hello Dean,
> If I don't use the Kryo serializer I get a serialization error, hence I am
> using it.
> If I don't use partitionBy/repartition, then the simple join never
> completes, even after 7 hours, and in fact as the next step I need to run it
> against 250G, as that is my full dataset size. Someone here suggested that I
> use repartition.
>
> Assuming repartition is mandatory, how do I decide what the right number
> is? When I use 400 I do not get the NullPointerException that I talked
> about, which is strange. I never saw that exception against a small random
> dataset, but I see it with 25G; and again, with 400 partitions, I do not see it.
>
>
> On Sun, May 3, 2015 at 9:15 PM, Dean Wampler <deanwamp...@gmail.com>
> wrote:
>
>> IMHO, you are trying way too hard to optimize work on what is really a
>> small data set. 25G, even 250G, is not that much data, especially if you've
>> spent a month trying to get something to work that should be simple. All
>> these errors are from optimization attempts.
>>
>> Kryo is great, but if it's not working reliably for some reason, then
>> don't use it. Rather than force 200 partitions, let Spark try to figure out
>> a good-enough number. (If you really need to force a partition count, use
>> the repartition method instead, unless you're overriding the partitioner.)
>>
>> So. I recommend that you eliminate all the optimizations: Kryo,
>> partitionBy, etc. Just use the simplest code you can. Make it work first.
>> Then, if it really isn't fast enough, look for actual evidence of
>> bottlenecks and optimize those.
>>
>>
>> On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>> wrote:
>>
>>> Hello Dean & Others,
>>> Thanks for your suggestions.
>>> I have two data sets and all I want to do is a simple equi-join. I have a
>>> 10G limit, and as my dataset_1 exceeded that, it was throwing an OOM error.
>>> Hence I switched back to using the .join() API instead of a map-side
>>> broadcast join.
>>> I am repartitioning the data with 100 and 200 partitions, and I see a
>>> NullPointerException now.
>>>
>>> When I run against 25G of each input and with .partitionBy(new
>>> org.apache.spark.HashPartitioner(200)), I see a NullPointerException.
>>>
>>>
>>> This trace does not include a line from my code, and hence I do not know
>>> what is causing the error.
>>> I have registered the Kryo serializer.
>>>
>>> val conf = new SparkConf()
>>>       .setAppName(detail)
>>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>       .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>>>       .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>>>       .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>>>       .set("spark.yarn.maxAppAttempts", "0")
>>>       .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
>>>     val sc = new SparkContext(conf)
>>>
>>> I see the exception when this task runs
>>>
>>> val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>>>
>>> It's a simple mapping of input records to (itemId, record).
>>>
>>> I found this
>>>
>>> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
>>> and
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html
>>>
>>> It looks like Kryo (v2.21) changed something to stop using default
>>> constructors.
>>>
>>> ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
>>>     .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
>>>
>>>
>>> Please suggest
>>>
>>>
>>> Trace:
>>> 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in
>>> stage 2.0 (TID 774)
>>> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
>>> Serialization trace:
>>> values (org.apache.avro.generic.GenericData$Record)
>>> datum (org.apache.avro.mapred.AvroKey)
>>>     at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>>     at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>     at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>     at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
>>>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>> Regards,
>>>
>>>
>>> Any suggestions?
>>> I have not been able to get this to work for over a month now; it's
>>> getting frustrating.
>>>
>>> On Sun, May 3, 2015 at 8:03 PM, Dean Wampler <deanwamp...@gmail.com>
>>> wrote:
>>>
>>>> How big is the data you're returning to the driver with collectAsMap?
>>>> You are probably running out of memory trying to copy too much data back to
>>>> it.
>>>>
>>>> If you're trying to force a map-side join, Spark can do that for you in
>>>> some cases within the regular DataFrame/RDD context. See
>>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
>>>> and this talk by Michael Armbrust for example,
>>>> http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.
>>>>
>>>>
>>>> dean
>>>>
>>>> On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>> wrote:
>>>>
>>>>> Full Exception
>>>>> 15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap
>>>>> at VISummaryDataProvider.scala:37) failed in 884.087 s
>>>>> 15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed:
>>>>> collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s
>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
>>>>> exception: Job aborted due to stage failure: Exception while getting task
>>>>> result: org.apache.spark.SparkException: Error sending message [message =
>>>>> GetLocations(taskresult_112)]
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>> Exception while getting task result: org.apache.spark.SparkException: 
>>>>> Error
>>>>> sending message [message = GetLocations(taskresult_112)]
>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>> at
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>> at
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>> at
>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>> at
>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>> at
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>> at
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>> at scala.Option.foreach(Option.scala:236)
>>>>> at
>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>> at
>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>>>>> at
>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>>>>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>> 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status:
>>>>> FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due
>>>>> to stage failure: Exception while getting task result:
>>>>> org.apache.spark.SparkException: Error sending message [message =
>>>>> GetLocations(taskresult_112)])
>>>>>
>>>>>
>>>>> *Code at line 37*
>>>>>
>>>>> val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong,
>>>>> lstg) }.collectAsMap
>>>>>
>>>>> The listing data set size is 26G (10 files) and my driver memory is 12G
>>>>> (I can't go beyond it). The reason I do collectAsMap is to broadcast it
>>>>> and do a map-side join instead of a regular join.
>>>>>
>>>>>
>>>>> Please suggest ?
>>>>>
>>>>>
>>>>> On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> My Spark job is failing and I see
>>>>>>
>>>>>> ==============================
>>>>>>
>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
>>>>>> exception: Job aborted due to stage failure: Exception while getting task
>>>>>> result: org.apache.spark.SparkException: Error sending message [message =
>>>>>> GetLocations(taskresult_112)]
>>>>>>
>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>> Exception while getting task result: org.apache.spark.SparkException: 
>>>>>> Error
>>>>>> sending message [message = GetLocations(taskresult_112)]
>>>>>>
>>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>>>
>>>>>> at
>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>
>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>
>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>>
>>>>>>
>>>>>> java.util.concurrent.TimeoutException: Futures timed out after [30
>>>>>> seconds]
>>>>>>
>>>>>>
>>>>>> I see multiple of these
>>>>>>
>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out
>>>>>> after [30 seconds]
>>>>>>
>>>>>> And finally i see this
>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>>>> at
>>>>>> org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
>>>>>> at
>>>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
>>>>>> at
>>>>>> org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
>>>>>> at
>>>>>> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
>>>>>> at
>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
>>>>>> at
>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>> at
>>>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>> at
>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>> at
>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>> at
>>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>> at
>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>> at
>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>> at
>>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>> at
>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>> at
>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>> at
>>>>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>>>>> at
>>>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>>>>> at
>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>>> at
>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>>> at
>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>>> at io.netty.util.concurrent.SingleThreadEven
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Solutions
>>>>>>
>>>>>> 1)
>>>>>>
>>>>>>       .set("spark.akka.askTimeout", "6000")
>>>>>>
>>>>>>       .set("spark.akka.timeout", "6000")
>>>>>>
>>>>>>       .set("spark.worker.timeout", "6000")
>>>>>>
>>>>>> 2)  --num-executors 96 --driver-memory 12g --driver-java-options
>>>>>> "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4
>>>>>>
>>>>>> 12G is the limit imposed by the YARN cluster; I can't go beyond this.
>>>>>>
>>>>>>
>>>>>> ANY suggestions ?
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Deepak
>>>>>>
>>>>>> On Thu, Apr 30, 2015 at 6:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Did not work. Same problem.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das <
>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>
>>>>>>>> You could try increasing your heap space explicitly, e.g. export
>>>>>>>> _JAVA_OPTIONS="-Xmx10g". It's not the correct approach, but try it.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I have a SparkApp that completes in 45 mins for 5 files
>>>>>>>>> (5*750MB size), and it takes 16 executors to do so.
>>>>>>>>>
>>>>>>>>> I wanted to run it against 10 files of each input type (10*3 files
>>>>>>>>> as there are three inputs that are transformed). [Input1 = 10*750 MB,
>>>>>>>>> Input2 = 10*2.5GB, Input3 = 10*1.5G]. Hence I used 32 executors.
>>>>>>>>>
>>>>>>>>> I see multiple
>>>>>>>>> 15/04/28 09:23:31 WARN executor.Executor: Issue communicating with
>>>>>>>>> driver in heartbeater
>>>>>>>>> org.apache.spark.SparkException: Error sending message [message =
>>>>>>>>> Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22,
>>>>>>>>> phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed
>>>>>>>>> out after [30 seconds]
>>>>>>>>> at
>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>>> at
>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>>> at
>>>>>>>>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>>> at
>>>>>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>>>>>>>> ... 1 more
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> When I searched deeper, I found an OOM error.
>>>>>>>>> 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: Removing
>>>>>>>>> block manager BlockManagerId(17,
>>>>>>>>> phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
>>>>>>>>> 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: Removing
>>>>>>>>> BlockManager BlockManagerId(9,
>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304) with no recent
>>>>>>>>> heart beats: 121200ms exceeds 120000ms
>>>>>>>>> 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: Removing
>>>>>>>>> block manager BlockManagerId(9,
>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
>>>>>>>>> 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in thread
>>>>>>>>> task-result-getter-3
>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> Exception in thread "task-result-getter-3"
>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>
>>>>>>>>> LogType: stdout
>>>>>>>>> LogLength: 96
>>>>>>>>> Log Contents:
>>>>>>>>>
>>>>>>>>> hdfs://hostName:8020/sys/edw/dw_lstg_item/snapshot/2015/04/28/00/part-r-0000*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Spark Command:
>>>>>>>>>
>>>>>>>>> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
>>>>>>>>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
>>>>>>>>> --jars
>>>>>>>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar
>>>>>>>>> --num-executors 32 --driver-memory 12g --driver-java-options
>>>>>>>>> "-XX:MaxPermSize=8G" --executor-memory 12g --executor-cores 4 --queue
>>>>>>>>> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
>>>>>>>>> /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar
>>>>>>>>> startDate=2015-04-6 endDate=2015-04-7
>>>>>>>>> input=/user/dvasthimal/epdatasets_small/exptsession 
>>>>>>>>> subcommand=viewItem
>>>>>>>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
>>>>>>>>> maxbuffersize=1068 maxResultSize=200G askTimeout=1200
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> There is a 12G limit on the memory that I can use, as this Spark is
>>>>>>>>> running over YARN.
>>>>>>>>>
>>>>>>>>> Spark Version: 1.3.1
>>>>>>>>> Should I increase the number of executors from 32?
>>>>>>>>> --
>>>>>>>>> Deepak
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Deepak
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Deepak
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Deepak
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>
>
> --
> Deepak
>
>
