[ https://issues.apache.org/jira/browse/SPARK-13288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15159450#comment-15159450 ]

JESSE CHEN commented on SPARK-13288:
------------------------------------

I enabled "spark.memory.useLegacyMode", and the out-of-memory errors still occurred, so it looks like the automatic memory management introduced in 1.6 has nothing to do with this streaming memory leak. See details below:

Spark properties used, including those specified through --conf and those from the properties file /TestAutomation/spark-1.6.0/conf/spark-defaults.conf (a programmatic equivalent is sketched after the list):
  spark.yarn.queue -> default
  spark.history.kerberos.principal -> none
  spark.driver.memory -> 12g
  spark.yarn.max.executor.failures -> 3
  spark.yarn.historyServer.address -> bigaperf132.svl.ibm.com:18080
  spark.eventLog.enabled -> true
  spark.history.ui.port -> 18080
  spark.yarn.applicationMaster.waitTries -> 10
  spark.yarn.scheduler.heartbeat.interval-ms -> 5000
  spark.yarn.executor.memoryOverhead -> 384
  spark.yarn.submit.file.replication -> 3
  spark.driver.extraJavaOptions -> -Diop.version={{iop_full_version}}
  spark.yarn.containerLauncherMaxThreads -> 25
  spark.memory.useLegacyMode -> true
  spark.yarn.driver.memoryOverhead -> 384
  spark.history.kerberos.keytab -> none
  spark.yarn.am.extraJavaOptions -> -Diop.version={{iop_full_version}}
  spark.eventLog.dir -> hdfs://bigaperf132.svl.ibm.com:8020/iop/apps/4.1.0.0/spark/logs/history-server
  spark.yarn.preserve.staging.files -> false
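
For reference, the two memory-related settings under test can also be set programmatically on a SparkConf. A minimal sketch in Scala, using the values from the dump above (the app name is a placeholder, not the actual test app):

    import org.apache.spark.SparkConf

    // Same memory settings as in the spark-defaults.conf dump above.
    // "StreamingMemoryTest" is a placeholder app name.
    val conf = new SparkConf()
      .setAppName("StreamingMemoryTest")
      .set("spark.memory.useLegacyMode", "true")        // fall back to the pre-1.6 static memory manager
      .set("spark.yarn.executor.memoryOverhead", "384") // off-heap headroom per executor, in MB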

22 minutes into the test, I got "Container killed by YARN for exceeding memory limits" errors; the lost executor was then followed by shuffle fetch failures:

16/02/22 17:15:21 ERROR cluster.YarnScheduler: Lost executor 1 on bigaperf132.svl.ibm.com: Container killed by YARN for exceeding memory limits. 4.5 GB of 4.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
16/02/22 17:15:21 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1940.0 (TID 5619, bigaperf132.svl.ibm.com): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 4.5 GB of 4.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
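
The 4.4 GB limit in the message is the executor heap plus spark.yarn.executor.memoryOverhead, which is what YARN enforces per container. A quick sanity check in Scala (the 4 GB executor heap is an assumption; spark.executor.memory does not appear in the property dump above):

    // YARN enforces: executor heap + spark.yarn.executor.memoryOverhead.
    val executorMemoryMb = 4096   // assumed; spark.executor.memory is not in the dump
    val overheadMb       = 384    // spark.yarn.executor.memoryOverhead from the dump
    val limitMb          = executorMemoryMb + overheadMb
    println(f"YARN container limit: ${limitMb / 1024.0}%.1f GB") // 4.4 GB, matching the log
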
16/02/22 17:15:24 WARN scheduler.TaskSetManager: Lost task 9.0 in stage 1948.0 (TID 5633, bigaperf134.svl.ibm.com): FetchFailed(null, shuffleId=330, mapId=-1, reduceId=9, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 330
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
        at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)


From the executor logs:

16/02/22 18:17:07 INFO spark.MapOutputTrackerWorker: Got the output locations
16/02/22 18:17:07 INFO storage.ShuffleBlockFetcherIterator: Getting 5 non-empty blocks out of 5 blocks
16/02/22 18:17:07 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to bigaperf133.svl.ibm.com/9.30.104.155:53729
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
        at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:99)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:316)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:263)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:112)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:43)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)





> [1.6.0] Memory leak in Spark streaming
> --------------------------------------
>
>                 Key: SPARK-13288
>                 URL: https://issues.apache.org/jira/browse/SPARK-13288
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.0
>         Environment: Bare metal cluster
> RHEL 6.6
>            Reporter: JESSE CHEN
>              Labels: streaming
>
> Streaming in 1.6 seems to have a memory leak.
> Running the same streaming app in Spark 1.5.1 and 1.6.0, all things being equal, 1.6.0 showed gradually increasing processing time.
> The app is simple: 1 Kafka receiver of a tweet stream and 20 executors processing the tweets in 5-second batches (see the sketch below).
> Spark 1.5.1 handled this smoothly and did not show increasing processing time over the 40-minute test; 1.6.0 showed increasing processing time starting about 8 minutes into the test. Please see the chart here:
> https://ibm.box.com/s/7q4ulik70iwtvyfhoj1dcl4nc469b116
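> A minimal sketch of a streaming app with this shape, using the Spark 1.6 receiver-based Kafka API (the app name, ZooKeeper address, consumer group, topic, and per-batch processing are all placeholders; this is not the actual test app):
>
>     import org.apache.spark.SparkConf
>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>     import org.apache.spark.streaming.kafka.KafkaUtils
>
>     // One Kafka receiver, 5-second batches, as described above.
>     val conf = new SparkConf().setAppName("TweetStreamTest")  // placeholder name
>     val ssc  = new StreamingContext(conf, Seconds(5))
>     // "zkhost:2181", "tweet-group", and "tweets" are hypothetical endpoints.
>     val tweets = KafkaUtils.createStream(ssc, "zkhost:2181", "tweet-group", Map("tweets" -> 1))
>     tweets.map(_._2).count().print()  // stand-in for the real per-batch processing
>     ssc.start()
>     ssc.awaitTermination()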
> I captured heap dumps in both versions and compared them. I noticed that the byte arrays (class [B) use roughly 50X more space in 1.6.0.
> Here are some of the top classes in the heap histograms, with references and referrers:
>
> Heap Histogram -- All Classes (excluding platform)
>
>   1.6.0 Streaming
>   Class                             Instance Count    Total Size
>   class [B                          8453              3,227,649,599
>   class [C                          44682             4,255,502
>   class java.lang.reflect.Method    9059              1,177,670
>
>   1.5.1 Streaming
>   Class                             Instance Count    Total Size
>   class [B                          5095              62,938,466
>   class [C                          130482            12,844,182
>   class java.lang.String            130171            1,562,052
>
> References by Type
>
>   1.6.0: class [B [0x640039e38]
>   1.5.1: class [B [0x6c020bb08]
>
> Referrers by Type
>
>   1.6.0 Streaming
>   Class                                 Count
>   java.nio.HeapByteBuffer               3239
>   sun.security.util.DerInputBuffer      1233
>   sun.security.util.ObjectIdentifier    620
>   [Ljava.lang.Object;                   408
>
>   1.5.1 Streaming
>   Class                                 Count
>   sun.security.util.DerInputBuffer      1233
>   sun.security.util.ObjectIdentifier    620
>   [[B                                   397
>   java.lang.reflect.Method              326
> ----
> The total size of class [B is about 3 GB in 1.6.0 and only about 60 MB in 1.5.1.
> The java.nio.HeapByteBuffer referrer class did not show up near the top in 1.5.1.
> I have also placed jstack output for 1.5.1 and 1.6.0 online; you can get them here:
> https://ibm.box.com/sparkstreaming-jstack160
> https://ibm.box.com/sparkstreaming-jstack151
> Jesse 


