[ https://issues.apache.org/jira/browse/SPARK-13288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15159450#comment-15159450 ]
JESSE CHEN commented on SPARK-13288:
------------------------------------

I enabled "spark.memory.useLegacyMode", and the out-of-memory errors still occurred, so it looks like the automatic (unified) memory management has nothing to do with this streaming memory leak. Details below.

Spark properties used, including those specified through --conf and those from the properties file /TestAutomation/spark-1.6.0/conf/spark-defaults.conf:

    spark.yarn.queue -> default
    spark.history.kerberos.principal -> none
    spark.driver.memory -> 12g
    spark.yarn.max.executor.failures -> 3
    spark.yarn.historyServer.address -> bigaperf132.svl.ibm.com:18080
    spark.eventLog.enabled -> true
    spark.history.ui.port -> 18080
    spark.yarn.applicationMaster.waitTries -> 10
    spark.yarn.scheduler.heartbeat.interval-ms -> 5000
    spark.yarn.executor.memoryOverhead -> 384
    spark.yarn.submit.file.replication -> 3
    spark.driver.extraJavaOptions -> -Diop.version={{iop_full_version}}
    spark.yarn.containerLauncherMaxThreads -> 25
    spark.memory.useLegacyMode -> true
    spark.yarn.driver.memoryOverhead -> 384
    spark.history.kerberos.keytab -> none
    spark.yarn.am.extraJavaOptions -> -Diop.version={{iop_full_version}}
    spark.eventLog.dir -> hdfs://bigaperf132.svl.ibm.com:8020/iop/apps/4.1.0.0/spark/logs/history-server
    spark.yarn.preserve.staging.files -> false

22 minutes into the test, containers were killed by YARN for exceeding memory limits:

    16/02/22 17:15:21 ERROR cluster.YarnScheduler: Lost executor 1 on bigaperf132.svl.ibm.com: Container killed by YARN for exceeding memory limits. 4.5 GB of 4.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
    16/02/22 17:15:21 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1940.0 (TID 5619, bigaperf132.svl.ibm.com): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 4.5 GB of 4.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
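The 4.4 GB limit in that error is the YARN container size for the executor: the executor heap plus the 384 MB spark.yarn.executor.memoryOverhead listed above (which suggests an executor heap of roughly 4 GB, a value not shown in the property dump). For reference, a minimal sketch of how the two relevant settings could be applied programmatically; in this test they were actually set through spark-defaults.conf and --conf, and the 1024 MB overhead below is only an illustrative increase over the 384 MB used here, not a recommended value:

    import org.apache.spark.SparkConf

    // Hypothetical driver-side equivalent of the configuration used in this test;
    // the actual run put these values in spark-defaults.conf / on the --conf line.
    val conf = new SparkConf()
      .setAppName("StreamingLegacyMemoryTest")           // hypothetical app name
      .set("spark.memory.useLegacyMode", "true")         // fall back to pre-1.6 static memory management
      .set("spark.yarn.executor.memoryOverhead", "1024") // MB; the YARN error suggests raising this above 384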
    16/02/22 17:15:24 WARN scheduler.TaskSetManager: Lost task 9.0 in stage 1948.0 (TID 5633, bigaperf134.svl.ibm.com): FetchFailed(null, shuffleId=330, mapId=-1, reduceId=9, message=
    org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 330
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
        at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)

From executor logs:

    16/02/22 18:17:07 INFO spark.MapOutputTrackerWorker: Got the output locations
    16/02/22 18:17:07 INFO storage.ShuffleBlockFetcherIterator: Getting 5 non-empty blocks out of 5 blocks
    16/02/22 18:17:07 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
    java.io.IOException: Failed to connect to bigaperf133.svl.ibm.com/9.30.104.155:53729
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
        at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:99)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:316)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:263)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:112)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:43)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

> [1.6.0] Memory leak in Spark streaming
> --------------------------------------
>
>                 Key: SPARK-13288
>                 URL: https://issues.apache.org/jira/browse/SPARK-13288
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.0
>         Environment: Bare metal cluster, RHEL 6.6
>            Reporter: JESSE CHEN
>              Labels: streaming
>
> Streaming in 1.6 seems to have a memory leak.
> Running the same streaming app in Spark 1.5.1 and 1.6, all things equal, 1.6 showed gradually increasing processing time.
> The app is simple: 1 Kafka receiver of a tweet stream and 20 executors processing the tweets in 5-second batches.
> Spark 1.5.1 handled this smoothly and did not show increasing processing time in the 40-minute test, but 1.6 showed increasing time about 8 minutes into the test. Please see the chart here:
> https://ibm.box.com/s/7q4ulik70iwtvyfhoj1dcl4nc469b116
> I captured heap dumps in the two versions and did a comparison. I noticed that byte arrays (class [B) are using about 50X more space in 1.6.0.
> Here are some top classes in the heap histogram and references.
>
> Heap Histogram - All Classes (excluding platform)
>
> 1.6.0 Streaming:
>   Class                             Instance Count     Total Size
>   class [B                                    8453  3,227,649,599
>   class [C                                   44682      4,255,502
>   class java.lang.reflect.Method              9059      1,177,670
>
> 1.5.1 Streaming:
>   Class                             Instance Count     Total Size
>   class [B                                    5095     62,938,466
>   class [C                                  130482     12,844,182
>   class java.lang.String                    130171      1,562,052
>
> References by Type:
>   1.6.0: class [B [0x640039e38]
>   1.5.1: class [B [0x6c020bb08]
>
> Referrers by Type (1.6.0):
>   Class                                Count
>   java.nio.HeapByteBuffer               3239
>   sun.security.util.DerInputBuffer      1233
>   sun.security.util.ObjectIdentifier     620
>   [Ljava.lang.Object;                    408
>
> Referrers by Type (1.5.1):
>   Class                                Count
>   sun.security.util.DerInputBuffer      1233
>   sun.security.util.ObjectIdentifier     620
>   [[B                                    397
>   java.lang.reflect.Method               326
> ----
> The total size of class [B is 3 GB in 1.6.0 and only 60 MB in 1.5.1.
> The java.nio.HeapByteBuffer referrer did not show up near the top in 1.5.1.
> I have also placed jstack output for 1.5.1 and 1.6.0 online; you can get them here:
> https://ibm.box.com/sparkstreaming-jstack160
> https://ibm.box.com/sparkstreaming-jstack151
> Jesse
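For context, a minimal sketch of the kind of receiver-based streaming job described in the quoted issue (one Kafka receiver, 5-second batches). This is not the reporter's actual application: the ZooKeeper quorum, consumer group, topic name, and per-batch work below are placeholders standing in for the real tweet processing.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object TweetStreamSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("TweetStreamSketch")
        // 5-second batches, matching the setup described in the report
        val ssc = new StreamingContext(conf, Seconds(5))

        // Single receiver-based Kafka input stream; ZooKeeper quorum, group id,
        // and topic name are placeholders
        val tweets = KafkaUtils.createStream(
          ssc,
          "zkhost:2181",
          "tweet-consumer",
          Map("tweets" -> 1))

        // Stand-in per-batch work: count the tweets in each 5-second batch
        tweets.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }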