Hi Ilya,
thanks for your insight, this was the right clue. I already had default
parallelism set, but it was quite low (hundreds), and the number of
partitions of the input RDD was low as well, so the chunks were really too
big. Increasing the parallelism and repartitioning the input seems to be
helping...
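For the record, the change was roughly along these lines (a sketch only; the
input path, partition count and ALS parameters are illustrative, not my exact
code):

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    val sc = new SparkContext()  // parallelism set via spark.default.parallelism
    // hypothetical input; the point is the repartition() before training,
    // which spreads the ratings over many more, smaller partitions
    val ratings = sc.textFile("hdfs:///data/ratings")
      .map(_.split(','))
      .map(f => Rating(f(0).toInt, f(1).toInt, f(2).toDouble))
      .repartition(10000)
    val model = ALS.trainImplicit(ratings, 100, 15)  // rank=100, iterations=15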
Thanks!
Antony.

On Thursday, 19 February 2015, 16:45, Ilya Ganelin <ilgan...@gmail.com> wrote:

Hi Antony - you are seeing a problem that I ran into as well. The underlying
issue is your default parallelism setting. What's happening is that within
ALS certain RDD operations end up changing the number of partitions of your
data. For example, if you start with an RDD of 300 partitions, then unless
default parallelism is set, by the time the algorithm executes you'll
eventually end up with an RDD of something like 20 partitions. Consequently,
your giant data set is now stored across a much smaller number of partitions,
so each partition is huge. Then, when a shuffle requires serialization, you
run out of heap space trying to serialize it. The solution should be as
simple as setting spark.default.parallelism appropriately.
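For example, something along these lines (just a sketch; the value 2000 is
arbitrary and would need tuning to your data and cluster size):

    import org.apache.spark.{SparkConf, SparkContext}

    // sketch only: pick a parallelism value large enough that individual
    // partitions stay small; 2000 here is an arbitrary illustration
    val conf = new SparkConf().set("spark.default.parallelism", "2000")
    val sc = new SparkContext(conf)

    // you can verify what ALS actually produced by checking e.g.
    // someRdd.partitions.length before and after training

The same property can also be passed at submit time with
--conf spark.default.parallelism=2000.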

This is referenced in a JIRA I can't find at the moment. 
On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi <antonym...@yahoo.com.invalid> 
wrote:

Now, with spark.shuffle.io.preferDirectBufs reverted (back to true), I am
again getting "GC overhead limit exceeded":
=== spark stdout ===
15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

=== yarn log (same) ===
15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
=== yarn nodemanager ===
2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_000012: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used
2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_000008: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used
2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_000008 is : 143
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0013_01_000008 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_000008
Antony.

 

On Thursday, 19 February 2015, 11:54, Antony Mayi <antonym...@yahoo.com.INVALID> wrote:

It is from within the ALS.trainImplicit() call. Btw, the exception varies
between this "GC overhead limit exceeded" and "Java heap space" (which I
guess is just a different outcome of the same problem).

I just tried another run and here are the logs (filtered). Note that I ran
this one with spark.shuffle.io.preferDirectBufs=false, so it might be a
slightly different issue from my previous case (going to revert now):
=== spark stdout ===
15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 ERROR cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 6
15/02/19 10:16:54 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 18.0 (TID 5328, 192.168.1.90): FetchFailed(BlockManagerId(6, 192.168.1.92, 54289), shuffleId=6, mapId=227, reduceId=6, message=org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.168.1.92:54289
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
=== yarn log ===
15/02/19 10:15:05 WARN executor.Executor: Told to re-register on heartbeat
15/02/19 10:16:02 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
15/02/19 10:16:02 WARN server.TransportChannelHandler: Exception in connection from /192.168.1.92:45633
io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Java heap space
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:280)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
=== yarn nodemanager log ===
2015-02-19 10:16:45,146 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20284 for container-id container_1424204221358_0012_01_000016: 28.5 GB of 32 GB physical memory used; 29.1 GB of 67.2 GB virtual memory used
2015-02-19 10:16:45,163 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20273 for container-id container_1424204221358_0012_01_000020: 28.5 GB of 32 GB physical memory used; 29.2 GB of 67.2 GB virtual memory used
2015-02-19 10:16:46,621 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0012_01_000008 is : 143
2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0012_01_000008 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0012_01_000008

Thanks for any help,
Antony.

PS: could that be Java 8 related?

On Thursday, 19 February 2015, 11:25, Sean Owen <so...@cloudera.com> wrote:

Oh OK, you are saying you are requesting 25 executors and getting them;
got it. You can consider making fewer, bigger executors to pool memory
rather than split it up, but at some point that becomes
counter-productive. 32GB is a fine executor size.

So you have ~8GB available per task (roughly 32GB per executor shared by
4 concurrent tasks), which seems like plenty. Something else is at work
here. Is this error from your code's stages or from ALS?

On Thu, Feb 19, 2015 at 10:07 AM, Antony Mayi <antonym...@yahoo.com> wrote:
> Based on the Spark UI I am running 25 executors for sure. Why would you
> expect four? I submit the task with --num-executors 25 and I get 6-7
> executors running per host (using more, smaller executors gives me better
> cluster utilization when running parallel Spark sessions; that is not the
> case for this reported issue, for now I am using the cluster exclusively).
>
> thx,
> Antony.
>
>
> On Thursday, 19 February 2015, 11:02, Sean Owen <so...@cloudera.com> wrote:
>
>
>
> This should result in 4 executors, not 25. They should be able to
> execute 4*4 = 16 tasks simultaneously. You have them grab 4*32 = 128GB
> of RAM, not 1TB.
>
> It still feels like this shouldn't be running out of memory, not by a
> long shot. But I am just pointing out potential differences between
> what you are expecting and what you are configuring.
>
> On Thu, Feb 19, 2015 at 9:56 AM, Antony Mayi
> <antonym...@yahoo.com.invalid> wrote:
>> Hi,
>>
>> I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running
>> Spark 1.2.0 in yarn-client mode with the following layout:
>>
>> spark.executor.cores=4
>> spark.executor.memory=28G
>> spark.yarn.executor.memoryOverhead=4096
>>
>> I am submitting a bigger ALS trainImplicit task (rank=100, iters=15) on a
>> dataset with ~3 billion ratings, using 25 executors. At some point some
>> executor crashes with:
>>
>> 15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts
>> java.util.concurrent.TimeoutException: Futures timed out after [30
>> seconds]
>>        at
>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>        at
>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>        at
>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>        at
>>
>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>        at scala.concurrent.Await$.result(package.scala:107)
>>        at
>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
>>        at
>> org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
>> 15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in
>> stage
>> 51.0 (TID 7259)
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>        at java.lang.reflect.Array.newInstance(Array.java:75)
>>        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)
>>        at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
>>        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
>>        at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
>>
>> So the "GC overhead limit exceeded" is pretty clear and would suggest
>> running out of memory. Since I have 1TB of RAM available, this must
>> rather be due to some suboptimal configuration.
>>
>> Can anyone please point me to some directions how to tackle this?
>>
>> Thanks,
>> Antony.
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org