Yeah, you are right. I ran the experiments locally, not on YARN.

On Fri, Jul 27, 2018 at 11:54 PM, Vadim Semenov <va...@datadoghq.com> wrote:
> `spark.worker.cleanup.enabled=true` doesn't work for YARN.
>
> On Fri, Jul 27, 2018 at 8:52 AM dineshdharme <dineshdha...@gmail.com> wrote:
> >
> > I am trying to do a few (union + reduceByKey) operations on a hierarchical
> > dataset in an iterative fashion on RDDs. The first few loops run fine, but
> > on the subsequent loops the operations end up using the whole scratch
> > space provided to them.
> >
> > I have set the Spark scratch directory, i.e. SPARK_LOCAL_DIRS, to be one
> > having 100 GB of space.
> > The hierarchical dataset, whose size is small (< 400 kB), remains constant
> > throughout the iterations.
> > I have tried the worker cleanup flag, but it has no effect, i.e.
> > "spark.worker.cleanup.enabled=true"
> >
> > Error:
> >
> > Caused by: java.io.IOException: No space left on device
> >   at java.io.FileOutputStream.writeBytes(Native Method)
> >   at java.io.FileOutputStream.write(FileOutputStream.java:326)
> >   at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> >   at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> >   at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
> >   at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
> >   at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> >   at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> >   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >   at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
> >   at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
> >   at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> >   at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> >   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> >   at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
> >   at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
> >   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> >   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> >   at org.apache.spark.scheduler.Task.run(Task.scala:109)
> >   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >   at java.lang.Thread.run(Thread.java:748)
> >
> > What I am trying to do (high level):
> >
> > I have a dataset of 5 different CSVs (Parent, Child1, Child2, Child21,
> > Child22) which are related in a hierarchical fashion as shown below.
> >
> > Parent -> Child1 -> Child2 -> Child21
> > Parent -> Child1 -> Child2 -> Child22
> >
> > Each element in the tree has 14 columns (elementid, parentelement_id,
> > cat1, cat2, num1, num2, ..., num10).
> >
> > I am trying to aggregate the values of one column of Child21 into Child1
> > (i.e. 2 levels up). I am doing the same for another column value of
> > Child22 into Child1. Then I am merging these aggregated values at the
> > same Child1 level.
> >
> > This is present in the code at location:
> >
> > spark.rddexample.dummyrdd.tree.child1.events.Function1
> >
> > Code which replicates the issue:
> >
> > 1] https://github.com/dineshdharme/SparkRddShuffleIssue
> >
> > Steps to reproduce the issue:
> >
> > 1] Clone the above repository.
> >
> > 2] Put the CSVs from the "issue-data" folder of the repository at the
> > Hadoop location "hdfs:///tree/dummy/data/".
> >
> > 3] Set the Spark scratch directory (SPARK_LOCAL_DIRS) to a folder which
> > has large space (> 100 GB).
> >
> > 4] Run "sbt assembly".
> >
> > 5] Run the following command at the project location:
> >
> > /path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
> >   --class spark.rddexample.dummyrdd.FunctionExecutor \
> >   --master local[2] \
> >   --deploy-mode client \
> >   --executor-memory 2G \
> >   --driver-memory 2G \
> >   target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \
> >   20 \
> >   hdfs:///tree/dummy/data/ \
> >   hdfs:///tree/dummy/results/
>
> --
> Sent from my iPhone
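
For readers hitting the same wall: the files under SPARK_LOCAL_DIRS keep growing here because each union + reduceByKey iteration extends the RDD lineage, and Spark's context cleaner only removes a shuffle's files once the RDDs referencing it are garbage-collected on the driver. `spark.worker.cleanup.enabled` only cleans standalone-mode worker directories, so it helps neither a YARN nor a local[*] run. The usual fix for iterative RDD jobs is to checkpoint every few iterations so that old shuffles become collectable. A minimal sketch of that pattern, assuming toy data and a made-up checkpoint directory; this is not the code from the linked repository:

    import org.apache.spark.{SparkConf, SparkContext}

    object IterativeUnionSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("iterative-union-sketch"))
        sc.setCheckpointDir("hdfs:///tmp/checkpoints") // hypothetical directory

        var acc = sc.parallelize(Seq(("a", 1L), ("b", 2L)))
        for (i <- 1 to 20) {
          val delta = sc.parallelize(Seq(("a", 1L), ("c", i.toLong)))
          acc = acc.union(delta).reduceByKey(_ + _) // one shuffle per iteration
          if (i % 5 == 0) {
            acc.checkpoint() // truncate lineage so earlier shuffles can be cleaned
            acc.count()      // an action forces the checkpoint to materialize
          }
        }
        acc.collect().foreach(println)
        sc.stop()
      }
    }

Without the checkpoint calls, all 20 shuffles stay referenced through `acc`'s lineage until the job ends, which matches the symptom described above.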
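
As for the rollup itself, aggregating a Child21 column two levels up to Child1 on plain RDDs amounts to: key by the parent id, reduce, join to the next level's parent mapping, and reduce again. A sketch under the assumption of hypothetical (childId, parentId, value) triples, reusing the `sc` from the sketch above; the real rows carry 14 columns, and this is not the repository's Function1:

    // Hypothetical toy rows: (childId, parentId, value)
    val child21 = sc.parallelize(Seq(
      ("c21a", "c2a", 5L),
      ("c21b", "c2a", 7L)))
    val child2ToChild1 = sc.parallelize(Seq(
      ("c2a", "c1a"))) // Child2 id -> its Child1 parent id

    val aggregatedAtChild1 = child21
      .map { case (_, child2Id, value) => (child2Id, value) } // key by Child2 parent
      .reduceByKey(_ + _)                                     // aggregate one level up
      .join(child2ToChild1)                                   // hop to the Child1 ancestor
      .map { case (_, (value, child1Id)) => (child1Id, value) }
      .reduceByKey(_ + _)                                     // aggregate at Child1 level

    aggregatedAtChild1.collect().foreach(println) // prints (c1a,12) for this toy data

Doing the same for a Child22 column and merging at the Child1 level is then one more such rollup followed by a union + reduceByKey on the Child1 ids, which is exactly the per-iteration shuffle that makes the checkpointing above matter.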