Yeah, you are right. I ran the experiments locally, not on YARN.

On Fri, Jul 27, 2018 at 11:54 PM, Vadim Semenov <va...@datadoghq.com> wrote:

> `spark.worker.cleanup.enabled=true` doesn't work for YARN.
> On Fri, Jul 27, 2018 at 8:52 AM dineshdharme <dineshdha...@gmail.com>
> wrote:
> >
> > I am trying to do a few (union + reduceByKey) operations on a hierarchical
> > dataset in an iterative fashion using RDDs. The first few loops run fine,
> > but on the subsequent loops the operations end up using the whole scratch
> > space available to them.
> >
> > I have set the Spark scratch directory, i.e. SPARK_LOCAL_DIRS, to one
> > with 100 GB of space.
> > The hierarchical dataset, whose size is < 400 kB, remains constant
> > throughout the iterations.
> > I have tried the worker cleanup flag, i.e.
> > "spark.worker.cleanup.enabled=true", but it has no effect.
> >
> >
> >
> > Error :
> > Caused by: java.io.IOException: No space left on device
> > at java.io.FileOutputStream.writeBytes(Native Method)
> > at java.io.FileOutputStream.write(FileOutputStream.java:326)
> > at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> > at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> > at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
> > at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
> > at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> > at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> > at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
> > at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
> > at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> > at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> > at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
> > at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
> > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> > at org.apache.spark.scheduler.Task.run(Task.scala:109)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > at java.lang.Thread.run(Thread.java:748)
> >
> >
> > What I am trying to do (High Level):
> >
> > I have a dataset of 5 different CSV files (Parent, Child1, Child2, Child21,
> > Child22) which are related in a hierarchical fashion as shown below.
> >
> > Parent -> Child1 -> Child2 -> Child21
> >
> > Parent -> Child1 -> Child2 -> Child22
> >
> > Each element in the tree has 14 columns (elementid, parentelement_id, cat1,
> > cat2, num1, num2, ..., num10).
> >
> > I am trying to aggregate the values of one column of Child21 into Child1
> > (i.e. 2 levels up). I am doing the same for another column of Child22,
> > also into Child1. Then I am merging these aggregated values at the same
> > Child1 level.
> >
> > This is implemented in the code at:
> >
> > spark.rddexample.dummyrdd.tree.child1.events.Function1
> >
> >
> > Code which replicates the issue:
> >
> > 1] https://github.com/dineshdharme/SparkRddShuffleIssue
> >
> >
> >
> > Steps to reproduce the issue :
> >
> > 1] Clone the above repository.
> >
> > 2] Put the CSVs from the "issue-data" folder of the above repository at the
> > Hadoop location "hdfs:///tree/dummy/data/"
> >
> > 3] Set the Spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has
> > a large amount of space (> 100 GB).
> >
> > 4] Run "sbt assembly"
> >
> > 5] Run the following command at the project location
> >
> > /path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
> > --class spark.rddexample.dummyrdd.FunctionExecutor \
> > --master local[2] \
> > --deploy-mode client \
> > --executor-memory 2G \
> > --driver-memory 2G \
> > target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \
> > 20 \
> > hdfs:///tree/dummy/data/ \
> > hdfs:///tree/dummy/results/
> >
> >
> >
>
>
>
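For reference, the two-level rollup described in the quoted message (one column of Child21 and one of Child22 aggregated up to Child1, then merged) might look roughly like the sketch below. It is not the code from the repository: the object name `RollupSketch`, the helper `rollUpToChild1`, the sample ids, and the values are all made up for illustration, and it assumes spark-core 2.3.x for Scala 2.11 is on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical sketch; not taken from the SparkRddShuffleIssue repository.
object RollupSketch {

  // Sum one numeric column of a leaf level by its Child2 parent, then carry
  // the sums one more level up to Child1 through the Child2 -> Child1 link.
  def rollUpToChild1(
      leaf: RDD[(String, Double)],           // (parent Child2 id, value)
      child2ToChild1: RDD[(String, String)]  // (Child2 id, parent Child1 id)
  ): RDD[(String, Double)] =
    leaf
      .reduceByKey(_ + _)
      .join(child2ToChild1)
      .map { case (_, (sum, child1Id)) => (child1Id, sum) }
      .reduceByKey(_ + _)

  // Returns Child1 id -> (sum rolled up from Child21, sum rolled up from Child22).
  def run(sc: SparkContext): Map[String, (Double, Double)] = {
    // Made-up sample data: two Child2 nodes under a single Child1 node.
    val child2ToChild1 = sc.parallelize(Seq(("c2a", "c1a"), ("c2b", "c1a")))
    val child21 = sc.parallelize(Seq(("c2a", 1.0), ("c2a", 2.0), ("c2b", 3.0)))
    val child22 = sc.parallelize(Seq(("c2a", 10.0), ("c2b", 20.0)))

    // Tag each rollup with its own slot so the merge keeps the two columns apart.
    val from21 = rollUpToChild1(child21, child2ToChild1).mapValues(v => (v, 0.0))
    val from22 = rollUpToChild1(child22, child2ToChild1).mapValues(v => (0.0, v))

    // Merge at the Child1 level with union + reduceByKey.
    from21.union(from22)
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("rollup-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try run(sc).foreach(println) finally sc.stop()
  }
}
```

With the sample data above, `main` prints `(c1a,(6.0,30.0))`. Each rollup and the final merge go through `reduceByKey`, so every pass over the tree writes shuffle files under SPARK_LOCAL_DIRS.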
