[ https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14341957#comment-14341957 ]
SaintBacchus edited comment on SPARK-6056 at 3/1/15 6:01 AM:
-------------------------------------------------------------
[~adav] Thanks for the comment. I'm not sure I understand you correctly. Do you mean that if 'spark.shuffle.io.preferDirectBufs' is set to false, Netty will only allocate heap memory? Is that right? I tested it again, and I have updated the test program in the Description.
I set _spark.shuffle.io.preferDirectBufs_ to false, and when I run *test_driver(20,100)(test)*, the result is this:
_76602 root 20 0 1597m 339m 25m S 0 0.1 0:04.89 java_
_76602 root 20 0 {color:red}1777m{color} 1.0g 26m S 99 0.3 0:07.88 java_
_76602 root 20 0 1597m 880m 26m S 4 0.3 0:07.99 java_
The red number is virtual memory: it rose by about 180MB at that moment, while a total of 200MB of data (20 * 10MB) was transferred from the executor to the driver. I think this is a big problem. If I use 40 threads to fetch the result, it will need nearly 400MB of memory, exceed the YARN limit, and finally be killed by YARN. If there were a way to limit the peak memory use, it would be fine. In addition, since the number of remote fetch block threads on the user side is uncontrollable, it would be better for Spark to control it.
[~lianhuiwang] I used the most recent Spark from GitHub, and I also tested the 1.2.0 release version. In my test case I used the default memory settings: a 1GB executor and 384MB overhead. In real cases, memory is much larger.
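To make the arithmetic above explicit, here is a minimal sketch (the object name `OverheadEstimate` and its constants are illustrative, not part of Spark; 384 is the default `spark.yarn.executor.memoryOverhead` mentioned above). It models the claim that Netty mirrors each in-flight 10MB heap buffer with an equally sized direct buffer, so peak off-heap use scales with the number of concurrent fetch threads:

```scala
// Rough model of the peak direct-buffer demand described above.
// Each block is 10 MB (Array[Byte](10*1024*1024)), and each in-flight
// heap buffer is mirrored by an equally sized off-heap buffer.
object OverheadEstimate {
  val blockSizeMb = 10      // size of one transferred block, in MB
  val overheadLimitMb = 384 // default spark.yarn.executor.memoryOverhead

  // Peak off-heap demand when `threads` fetches are in flight at once.
  def peakDirectMb(threads: Int): Int = threads * blockSizeMb

  def main(args: Array[String]): Unit = {
    println(s"20 threads -> ${peakDirectMb(20)} MB")  // 200 MB, close to the ~180 MB observed
    println(s"40 threads -> ${peakDirectMb(40)} MB, exceeds overhead limit: " +
      s"${peakDirectMb(40) > overheadLimitMb}")       // 400 MB > 384 MB
  }
}
```

Under this model, 40 fetch threads already exceed the default overhead, which matches the container kill reported here.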
> Unlimit offHeap memory use cause RM killing the container
> ---------------------------------------------------------
>
>                 Key: SPARK-6056
>                 URL: https://issues.apache.org/jira/browse/SPARK-6056
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.2.1
>            Reporter: SaintBacchus
>
> No matter whether `preferDirectBufs` is set or the number of threads is limited, Spark cannot limit its use of off-heap memory.
> At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty allocates an off-heap memory buffer of the same size as the heap buffer.
> So however many buffers you want to transfer, the same amount of off-heap memory will be allocated.
> But once the allocated memory reaches the capacity of the overhead memory set in YARN, the executor will be killed.
> I wrote a simple piece of code to test it:
> {code:title=test.scala|borderStyle=solid}
> import org.apache.spark.storage._
> import org.apache.spark._
>
> val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
> bufferRdd.count
> val part = bufferRdd.partitions(0)
> val sparkEnv = SparkEnv.get
> val blockMgr = sparkEnv.blockManager
>
> def test = {
>   val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
>   val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
>   val len = resultIt.map(_.length).sum
>   println(s"[${Thread.currentThread.getId}] get block length = $len")
> }
>
> def test_driver(count: Int, parallel: Int)(f: => Unit) = {
>   val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
>   val taskSupport = new scala.collection.parallel.ForkJoinTaskSupport(tpool)
>   val parseq = (1 to count).par
>   parseq.tasksupport = taskSupport
>   parseq.foreach(x => f)
>   tpool.shutdown
>   tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
> }
> {code}
> Steps to reproduce:
> 1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
> 2. :load test.scala in spark-shell
> 3. use the following command to watch the executor on the slave node
> {code}
> pid=$(jps | grep CoarseGrainedExecutorBackend | awk '{print $1}'); top -b -p $pid | grep $pid
> {code}
> 4. test_driver(20,100)(test) in spark-shell
> 5. watch the output of the command on the slave node
> If multiple threads are used to get len, the physical memory will soon exceed the limit set by spark.yarn.executor.memoryOverhead

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
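Since Spark offers no knob for this here, one user-side workaround for the uncontrolled fetch parallelism discussed above is to gate the fetches with a semaphore so that the mirrored direct buffers stay under the YARN overhead limit. A minimal sketch (the names `BoundedFetch`, `boundedFetch`, and `maxInFlight` are hypothetical, not Spark APIs; 30 permits is an assumed cap, since 30 * 10MB stays below the 384MB overhead):

```scala
import java.util.concurrent.Semaphore

// Hypothetical user-side workaround: cap how many block fetches run at
// once, so the off-heap buffers Netty mirrors for them stay bounded.
object BoundedFetch {
  val maxInFlight = 30 // assumed cap: 30 * 10 MB = 300 MB < 384 MB overhead
  private val permits = new Semaphore(maxInFlight)

  // Runs f, but never with more than maxInFlight invocations in flight.
  def boundedFetch(f: => Unit): Unit = {
    permits.acquire()
    try f finally permits.release()
  }
}
```

With this, step 4 above could become `test_driver(40,100)(BoundedFetch.boundedFetch(test))`, keeping peak off-heap use near 300MB instead of ~400MB, at the cost of some fetch latency.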