How big is the data you're returning to the driver with collectAsMap? You
are probably running out of memory trying to copy too much data back to it.

If you're trying to force a map-side join, Spark can do that for you in
some cases within the regular DataFrame/RDD context. See
http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
and this talk by Michael Armbrust for example,
http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.


dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> Full Exception
> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at
> VISummaryDataProvider.scala:37) failed in 884.087 s*
> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: collectAsMap
> at VISummaryDataProvider.scala:37, took 1093.418249 s*
> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
> exception: Job aborted due to stage failure: Exception while getting task
> result: org.apache.spark.SparkException: Error sending message [message =
> GetLocations(taskresult_112)]
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Exception while getting task result: org.apache.spark.SparkException: Error
> sending message [message = GetLocations(taskresult_112)]
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception: Job aborted due to stage
> failure: Exception while getting task result:
> org.apache.spark.SparkException: Error sending message [message =
> GetLocations(taskresult_112)])
>
>
> *Code at line 37*
>
> val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
> .collectAsMap
>
> Listing data set size is 26G (10 files) and my driver memory is 12G (I
> cant go beyond it). The reason i do collectAsMap is to brodcast it and do a
> map-side join instead of regular join.
>
>
> Please suggest ?
>
>
> On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> wrote:
>
>> My Spark Job is failing  and i see
>>
>> ==============================
>>
>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
>> exception: Job aborted due to stage failure: Exception while getting task
>> result: org.apache.spark.SparkException: Error sending message [message =
>> GetLocations(taskresult_112)]
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure:
>> Exception while getting task result: org.apache.spark.SparkException: Error
>> sending message [message = GetLocations(taskresult_112)]
>>
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>
>> at scala.Option.foreach(Option.scala:236)
>>
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>
>>
>> java.util.concurrent.TimeoutException: Futures timed out after [30
>> seconds]
>>
>>
>> I see multiple of these
>>
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [30 seconds]
>>
>> And finally i see this
>> java.lang.OutOfMemoryError: Java heap space
>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>> at
>> org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
>> at
>> org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
>> at
>> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>> at
>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at
>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at
>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at
>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>> at
>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>> at io.netty.util.concurrent.SingleThreadEven
>>
>>
>>
>>
>> Solutions
>>
>> 1)
>>
>>       .set("spark.akka.askTimeout", "6000")
>>
>>       .set("spark.akka.timeout", "6000")
>>
>>       .set("spark.worker.timeout", "6000")
>>
>> 2)  --num-executors 96 --driver-memory 12g --driver-java-options
>> "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4
>>
>> 12G is the limit imposed by YARN cluster, I cant go beyond this.
>>
>>
>> ANY suggestions ?
>>
>> Regards,
>>
>> Deepak
>>
>> On Thu, Apr 30, 2015 at 6:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>> wrote:
>>
>>> Did not work. Same problem.
>>>
>>>
>>>
>>> On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> You could try increasing your heap space explicitly. like export
>>>> _JAVA_OPTIONS="-Xmx10g", its not the correct approach but try.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have a SparkApp that runs completes in 45 mins for 5 files (5*750MB
>>>>> size) and it takes 16 executors to do so.
>>>>>
>>>>> I wanted to run it against 10 files of each input type (10*3 files as
>>>>> there are three inputs that are transformed). [Input1 = 10*750 MB,
>>>>> Input2=10*2.5GB, Input3 = 10*1.5G], Hence i used 32 executors.
>>>>>
>>>>> I see multiple
>>>>> 5/04/28 09:23:31 WARN executor.Executor: Issue communicating with
>>>>> driver in heartbeater
>>>>> org.apache.spark.SparkException: Error sending message [message =
>>>>> Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22,
>>>>> phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
>>>>> at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>>>>> at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out
>>>>> after [30 seconds]
>>>>> at
>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>> at
>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>> at
>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>> at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>>>> ... 1 more
>>>>>
>>>>>
>>>>> When i searched deeper, i found OOM error.
>>>>> 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: Removing block
>>>>> manager BlockManagerId(17, phxdpehdc9dn2643.stratus.phx.ebay.com,
>>>>> 36819)
>>>>> 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: Removing
>>>>> BlockManager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com,
>>>>> 48304) with no recent heart beats: 121200ms exceeds 120000ms
>>>>> 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: Removing block
>>>>> manager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com,
>>>>> 48304)
>>>>> 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in thread
>>>>> task-result-getter-3
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>> at
>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>> at
>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>> at
>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>> at
>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>> at
>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>> at
>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>> at
>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>> at
>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>> at
>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>> at
>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>> at
>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError:
>>>>> Java heap space
>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>> at
>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>> at
>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>> at
>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>> at
>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>> at
>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>> at
>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>> at
>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>> at
>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>> at
>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>> at
>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>> at
>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> LogType: stdout
>>>>> LogLength: 96
>>>>> Log Contents:
>>>>>
>>>>> hdfs://hostName:8020/sys/edw/dw_lstg_item/snapshot/2015/04/28/00/part-r-0000*
>>>>>
>>>>>
>>>>> Spark Command:
>>>>>
>>>>> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
>>>>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
>>>>> --jars
>>>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar
>>>>> --num-executors 32 --driver-memory 12g --driver-java-options
>>>>> "-XX:MaxPermSize=8G" --executor-memory 12g --executor-cores 4 --queue
>>>>> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
>>>>> /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar
>>>>> startDate=2015-04-6 endDate=2015-04-7
>>>>> input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
>>>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
>>>>> maxbuffersize=1068 maxResultSize=200G askTimeout=1200
>>>>>
>>>>>
>>>>>
>>>>> There is 12G limit on memory that i can use as this Spark is running
>>>>> over YARN.
>>>>>
>>>>> Spark Version: 1.3.1
>>>>> Should i increase the number of executors form 32?
>>>>> --
>>>>> Deepak
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
>
>
> --
> Deepak
>
>

Reply via email to