Hello Dean & Others,
Thanks for the response.

I tried with 100,200, 400, 600 and 1200 repartitions with 100,200,400 and
800 executors. Each time all the tasks of join complete in less than a
minute except one and that one tasks runs forever. I have a huge cluster at
my disposal.

The data for each of 1199 tasks is around 40MB/30k records and for 1 never
ending task is 1.5G/98million records. I see that there is data skew among
tasks. I had observed this a week earlier and i have no clue on how to fix
it and when someone suggested that repartition might make things more
parallel, but the problem is still persistent.

Please suggest on how to get the task to complete.
All i want to do is join two datasets. (dataset1 is in sequence file and
dataset2 is in avro format).



Ex:
Tasks IndexIDAttemptStatusLocality LevelExecutor ID / HostLaunch Time
DurationGC TimeShuffle Read Size / RecordsShuffle Spill (Memory)Shuffle
Spill (Disk)Errors  0 3771 0 RUNNING PROCESS_LOCAL 114 / host1 2015/05/04
01:27:44 7.3 min  19 s  1591.2 MB / 98931767  0.0 B 0.0 B   1 3772 0 SUCCESS
PROCESS_LOCAL 226 / host2 2015/05/04 01:27:44 28 s  2 s  39.2 MB / 29754
0.0 B 0.0 B   2 3773 0 SUCCESS PROCESS_LOCAL 283 / host3 2015/05/04 01:27:44 26
s  2 s  39.0 MB / 29646  0.0 B 0.0 B   5 3776 0 SUCCESS PROCESS_LOCAL 320
/ host4 2015/05/04 01:27:44 31 s  3 s  38.8 MB / 29512  0.0 B 0.0 B   4 3775
0 SUCCESS PROCESS_LOCAL 203 / host5 2015/05/04 01:27:44 41 s  3 s  38.4 MB
/ 29169  0.0 B 0.0 B   3 3774 0 SUCCESS PROCESS_LOCAL 84 / host6 2015/05/04
01:27:44 24 s  2 s  38.5 MB / 29258  0.0 B 0.0 B   8 3779 0 SUCCESS
PROCESS_LOCAL 309 / host7 2015/05/04 01:27:44 31 s  4 s  39.5 MB / 30008
0.0 B 0.0 B

There are 1200 tasks in total.


On Sun, May 3, 2015 at 9:53 PM, Dean Wampler <deanwamp...@gmail.com> wrote:

> I don't know the full context of what you're doing, but serialization
> errors usually mean you're attempting to serialize something that can't be
> serialized, like the SparkContext. Kryo won't help there.
>
> The arguments to spark-submit you posted previously look good:
>
> 2)  --num-executors 96 --driver-memory 12g --driver-java-options
> "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4
>
> I suspect you aren't getting the parallelism you need. For partitioning,
> if your data is in HDFS and your block size is 128MB, then you'll get ~195
> partitions anyway. If it takes 7 hours to do a join over 25GB of data, you
> have some other serious bottleneck. You should examine the web console and
> the logs to determine where all the time is going. Questions you might
> pursue:
>
>    - How long does each task take to complete?
>    - How many of those 195 partitions/tasks are processed at the same
>    time? That is, how many "slots" are available?  Maybe you need more nodes
>    if the number of slots is too low. Based on your command arguments, you
>    should be able to process 1/2 of them at a time, unless the cluster is 
> busy.
>    - Is the cluster swamped with other work?
>    - How much data does each task process? Is the data roughly the same
>    from one task to the next? If not, then you might have serious key skew?
>
> You may also need to research the details of how joins are implemented and
> some of the common tricks for organizing data to minimize having to shuffle
> all N by M records.
>
>
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> wrote:
>
>> Hello Deam,
>> If I don;t use Kryo serializer i got Serialization error and hence am
>> using it.
>> If I don';t use partitionBy/reparition then the simply join never
>> completed even after 7 hours and infact as next step i need to run it
>> against 250G as that is my full dataset size. Someone here suggested to me
>> to use repartition.
>>
>> Assuming reparition is mandatory , how do i decide whats the right number
>> ? When i am using 400 i do not get NullPointerException that i talked
>> about, which is strange. I never saw that exception against small random
>> dataset but see it with 25G and again with 400 partitions , i do not see it.
>>
>>
>> On Sun, May 3, 2015 at 9:15 PM, Dean Wampler <deanwamp...@gmail.com>
>> wrote:
>>
>>> IMHO, you are trying waaay to hard to optimize work on what is really a
>>> small data set. 25G, even 250G, is not that much data, especially if you've
>>> spent a month trying to get something to work that should be simple. All
>>> these errors are from optimization attempts.
>>>
>>> Kryo is great, but if it's not working reliably for some reason, then
>>> don't use it. Rather than force 200 partitions, let Spark try to figure out
>>> a good-enough number. (If you really need to force a partition count, use
>>> the repartition method instead, unless you're overriding the partitioner.)
>>>
>>> So. I recommend that you eliminate all the optimizations: Kryo,
>>> partitionBy, etc. Just use the simplest code you can. Make it work first.
>>> Then, if it really isn't fast enough, look for actual evidence of
>>> bottlenecks and optimize those.
>>>
>>>
>>>
>>> Dean Wampler, Ph.D.
>>> Author: Programming Scala, 2nd Edition
>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>> Typesafe <http://typesafe.com>
>>> @deanwampler <http://twitter.com/deanwampler>
>>> http://polyglotprogramming.com
>>>
>>> On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>> wrote:
>>>
>>>> Hello Dean & Others,
>>>> Thanks for your suggestions.
>>>> I have two data sets and all i want to do is a simple equi join. I have
>>>> 10G limit and as my dataset_1 exceeded that it was throwing OOM error.
>>>> Hence i switched back to use .join() API instead of map-side broadcast
>>>> join.
>>>> I am repartitioning the data with 100,200 and i see a
>>>> NullPointerException now.
>>>>
>>>> When i run against 25G of each input and with .partitionBy(new
>>>> org.apache.spark.HashPartitioner(200)) , I see NullPointerExveption
>>>>
>>>>
>>>> this trace does not include a line from my code and hence i do not what
>>>> is causing error ?
>>>> I do have registered kryo serializer.
>>>>
>>>> val conf = new SparkConf()
>>>>       .setAppName(detail)
>>>> *      .set("spark.serializer",
>>>> "org.apache.spark.serializer.KryoSerializer")*
>>>>       .set("spark.kryoserializer.buffer.mb",
>>>> arguments.get("buffersize").get)
>>>>       .set("spark.kryoserializer.buffer.max.mb",
>>>> arguments.get("maxbuffersize").get)
>>>>       .set("spark.driver.maxResultSize",
>>>> arguments.get("maxResultSize").get)
>>>>       .set("spark.yarn.maxAppAttempts", "0")
>>>> * 
>>>> .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLeve*
>>>> lMetricSum]))
>>>>     val sc = new SparkContext(conf)
>>>>
>>>> I see the exception when this task runs
>>>>
>>>> val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi)
>>>> }
>>>>
>>>> Its a simple mapping of input records to (itemId, record)
>>>>
>>>> I found this
>>>>
>>>> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
>>>> and
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html
>>>>
>>>> Looks like Kryo (2.21v)  changed something to stop using default
>>>> constructors.
>>>>
>>>> (Kryo.DefaultInstantiatorStrategy) 
>>>> kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new 
>>>> StdInstantiatorStrategy());
>>>>
>>>>
>>>> Please suggest
>>>>
>>>>
>>>> Trace:
>>>> 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in
>>>> stage 2.0 (TID 774)
>>>> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
>>>> Serialization trace:
>>>> values (org.apache.avro.generic.GenericData$Record)
>>>> datum (org.apache.avro.mapred.AvroKey)
>>>>     at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>>>     at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>     at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>     at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>     at
>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
>>>>     at
>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>> Regards,
>>>>
>>>>
>>>> Any suggestions.
>>>> I am not able to get this thing to work over a month now, its kind of
>>>> getting frustrating.
>>>>
>>>> On Sun, May 3, 2015 at 8:03 PM, Dean Wampler <deanwamp...@gmail.com>
>>>> wrote:
>>>>
>>>>> How big is the data you're returning to the driver with collectAsMap?
>>>>> You are probably running out of memory trying to copy too much data back 
>>>>> to
>>>>> it.
>>>>>
>>>>> If you're trying to force a map-side join, Spark can do that for you
>>>>> in some cases within the regular DataFrame/RDD context. See
>>>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
>>>>> and this talk by Michael Armbrust for example,
>>>>> http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.
>>>>>
>>>>>
>>>>> dean
>>>>>
>>>>> Dean Wampler, Ph.D.
>>>>> Author: Programming Scala, 2nd Edition
>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>> Typesafe <http://typesafe.com>
>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>> http://polyglotprogramming.com
>>>>>
>>>>> On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Full Exception
>>>>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap
>>>>>> at VISummaryDataProvider.scala:37) failed in 884.087 s*
>>>>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed:
>>>>>> collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s*
>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
>>>>>> exception: Job aborted due to stage failure: Exception while getting task
>>>>>> result: org.apache.spark.SparkException: Error sending message [message =
>>>>>> GetLocations(taskresult_112)]
>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>> Exception while getting task result: org.apache.spark.SparkException: 
>>>>>> Error
>>>>>> sending message [message = GetLocations(taskresult_112)]
>>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>>> at
>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>>>>>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>> 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status:
>>>>>> FAILED, exitCode: 15, (reason: User class threw exception: Job aborted 
>>>>>> due
>>>>>> to stage failure: Exception while getting task result:
>>>>>> org.apache.spark.SparkException: Error sending message [message =
>>>>>> GetLocations(taskresult_112)])
>>>>>>
>>>>>>
>>>>>> *Code at line 37*
>>>>>>
>>>>>> val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong,
>>>>>> lstg) }.collectAsMap
>>>>>>
>>>>>> Listing data set size is 26G (10 files) and my driver memory is 12G
>>>>>> (I cant go beyond it). The reason i do collectAsMap is to brodcast it and
>>>>>> do a map-side join instead of regular join.
>>>>>>
>>>>>>
>>>>>> Please suggest ?
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> My Spark Job is failing  and i see
>>>>>>>
>>>>>>> ==============================
>>>>>>>
>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
>>>>>>> exception: Job aborted due to stage failure: Exception while getting 
>>>>>>> task
>>>>>>> result: org.apache.spark.SparkException: Error sending message [message 
>>>>>>> =
>>>>>>> GetLocations(taskresult_112)]
>>>>>>>
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>> Exception while getting task result: org.apache.spark.SparkException: 
>>>>>>> Error
>>>>>>> sending message [message = GetLocations(taskresult_112)]
>>>>>>>
>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>
>>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>
>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>>>
>>>>>>>
>>>>>>> java.util.concurrent.TimeoutException: Futures timed out after [30
>>>>>>> seconds]
>>>>>>>
>>>>>>>
>>>>>>> I see multiple of these
>>>>>>>
>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out
>>>>>>> after [30 seconds]
>>>>>>>
>>>>>>> And finally i see this
>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>>>>> at
>>>>>>> org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
>>>>>>> at
>>>>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
>>>>>>> at
>>>>>>> org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
>>>>>>> at
>>>>>>> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
>>>>>>> at
>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
>>>>>>> at
>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>> at
>>>>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>> at
>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>> at
>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>> at
>>>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>> at
>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>> at
>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>> at
>>>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>> at
>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>> at
>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>> at
>>>>>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>>>>>> at
>>>>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>>>>>> at
>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>>>> at
>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>>>> at
>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>>>> at io.netty.util.concurrent.SingleThreadEven
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Solutions
>>>>>>>
>>>>>>> 1)
>>>>>>>
>>>>>>>       .set("spark.akka.askTimeout", "6000")
>>>>>>>
>>>>>>>       .set("spark.akka.timeout", "6000")
>>>>>>>
>>>>>>>       .set("spark.worker.timeout", "6000")
>>>>>>>
>>>>>>> 2)  --num-executors 96 --driver-memory 12g --driver-java-options
>>>>>>> "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4
>>>>>>>
>>>>>>> 12G is the limit imposed by YARN cluster, I cant go beyond this.
>>>>>>>
>>>>>>>
>>>>>>> ANY suggestions ?
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Deepak
>>>>>>>
>>>>>>> On Thu, Apr 30, 2015 at 6:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Did not work. Same problem.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das <
>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> You could try increasing your heap space explicitly. like export
>>>>>>>>> _JAVA_OPTIONS="-Xmx10g", its not the correct approach but try.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I have a SparkApp that runs completes in 45 mins for 5 files
>>>>>>>>>> (5*750MB size) and it takes 16 executors to do so.
>>>>>>>>>>
>>>>>>>>>> I wanted to run it against 10 files of each input type (10*3
>>>>>>>>>> files as there are three inputs that are transformed). [Input1 = 
>>>>>>>>>> 10*750 MB,
>>>>>>>>>> Input2=10*2.5GB, Input3 = 10*1.5G], Hence i used 32 executors.
>>>>>>>>>>
>>>>>>>>>> I see multiple
>>>>>>>>>> 5/04/28 09:23:31 WARN executor.Executor: Issue communicating with
>>>>>>>>>> driver in heartbeater
>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message =
>>>>>>>>>> Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22,
>>>>>>>>>> phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed
>>>>>>>>>> out after [30 seconds]
>>>>>>>>>> at
>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>>>> at
>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>>>> at
>>>>>>>>>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>>>> at
>>>>>>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>>>>>>>>> ... 1 more
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> When i searched deeper, i found OOM error.
>>>>>>>>>> 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: Removing
>>>>>>>>>> block manager BlockManagerId(17,
>>>>>>>>>> phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
>>>>>>>>>> 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: Removing
>>>>>>>>>> BlockManager BlockManagerId(9,
>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304) with no recent
>>>>>>>>>> heart beats: 121200ms exceeds 120000ms
>>>>>>>>>> 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: Removing
>>>>>>>>>> block manager BlockManagerId(9,
>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
>>>>>>>>>> 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in thread
>>>>>>>>>> task-result-getter-3
>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>>> at
>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>>> at
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>>> at
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>>> at
>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>>> at
>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>>> at
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>> Exception in thread "task-result-getter-3"
>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>>> at
>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>>> at
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>>> at
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>>> at
>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>>> at
>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>>> at
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>
>>>>>>>>>> LogType: stdout
>>>>>>>>>> LogLength: 96
>>>>>>>>>> Log Contents:
>>>>>>>>>>
>>>>>>>>>> hdfs://hostName:8020/sys/edw/dw_lstg_item/snapshot/2015/04/28/00/part-r-0000*
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Spark Command:
>>>>>>>>>>
>>>>>>>>>> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
>>>>>>>>>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
>>>>>>>>>> --jars
>>>>>>>>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar
>>>>>>>>>> --num-executors 32 --driver-memory 12g --driver-java-options
>>>>>>>>>> "-XX:MaxPermSize=8G" --executor-memory 12g --executor-cores 4 --queue
>>>>>>>>>> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
>>>>>>>>>> /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar
>>>>>>>>>> startDate=2015-04-6 endDate=2015-04-7
>>>>>>>>>> input=/user/dvasthimal/epdatasets_small/exptsession 
>>>>>>>>>> subcommand=viewItem
>>>>>>>>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
>>>>>>>>>> maxbuffersize=1068 maxResultSize=200G askTimeout=1200
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> There is 12G limit on memory that i can use as this Spark is
>>>>>>>>>> running over YARN.
>>>>>>>>>>
>>>>>>>>>> Spark Version: 1.3.1
>>>>>>>>>> Should i increase the number of executors form 32?
>>>>>>>>>> --
>>>>>>>>>> Deepak
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Deepak
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Deepak
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Deepak
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Deepak
>>>>
>>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
>


-- 
Deepak

Reply via email to