Re: Shuffle files lifecycle
Thanks Silvio.

On Mon, Jun 29, 2015 at 7:41 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
> Regarding 1 and 2, yes shuffle output is stored on the worker local disks and will be reused across jobs as long as they're available. [...]
Re: Shuffle files lifecycle
Regarding 1 and 2, yes, shuffle output is stored on the worker local disks and will be reused across jobs as long as it is available. You can tell when it is reused by seeing skipped stages in the job UI. The files are periodically cleaned up based on the available space of the configured spark.local.dir paths.

From: Thomas Gerber
Date: Monday, June 29, 2015 at 10:12 PM
To: user
Subject: Shuffle files lifecycle

Hello,

It is my understanding that shuffles are written to disk and that they act as checkpoints. I wonder if this is true only within a job, or across jobs. Please note that I use the words job and stage carefully here.

1. Can a shuffle created during JobN be used to skip many stages of JobN+1? Or is the lifecycle of the shuffle files bound to the job that created them?

2. When are shuffle files actually deleted? Is it TTL-based, or are they cleaned when the job is over?

3. We have a very long batch application, and as it goes on, the number of total tasks for each job gets larger and larger. This is not really a problem, because most of those tasks will be skipped since we cache RDDs. We noticed, however, that there is a delay in the actual start of a job of about 1 minute for every 2M tasks in the job. Are there suggested workarounds to avoid that delay? Maybe saving the RDD and re-loading it?

Thanks
Thomas
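A minimal sketch (hypothetical paths and app name, Spark 1.x Scala API) of the reuse Silvio describes: the second action launches a new job, but the stages up to and including the shuffle appear as "skipped" in the UI because the map output written by the first job is still on local disk.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object ShuffleReuseSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shuffle-reuse-sketch"))

    val counts = sc.textFile("hdfs:///data/input")   // hypothetical input path
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)                            // the shuffle happens here

    counts.count()   // Job 1: runs the map stage and writes shuffle files
    counts.take(10)  // Job 2: reuses the same shuffle output, so the map stage is skipped

    sc.stop()
  }
}

This only holds while the shuffle files are still present; once they are cleaned up, the stages are recomputed.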
Re: Shuffle files lifecycle
Ah, for #3, maybe this is what rdd.checkpoint does!
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD

Thomas

On Mon, Jun 29, 2015 at 7:12 PM, Thomas Gerber wrote:
> Hello,
>
> It is my understanding that shuffles are written to disk and that they act as checkpoints. [...]
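A minimal sketch (hypothetical paths) of how rdd.checkpoint could address #3: checkpointing writes the RDD to reliable storage and truncates its lineage, so later jobs no longer carry the full history of already-satisfied stages in their plans.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

val sc = new SparkContext(new SparkConf().setAppName("checkpoint-sketch"))
sc.setCheckpointDir("hdfs:///tmp/checkpoints")  // must be a reliable, shared path

val derived = sc.textFile("hdfs:///data/input") // hypothetical input path
  .map(line => (line.hashCode % 100, 1L))
  .reduceByKey(_ + _)

derived.cache()       // avoids computing the RDD twice (once for the job, once for the checkpoint write)
derived.checkpoint()  // must be called before the first action on this RDD
derived.count()       // materializes the RDD and writes the checkpoint files

// From here on, jobs over `derived` read the checkpoint instead of replaying the full lineage.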
Re: Shuffle files not cleaned up (Spark 1.2.1)
Hi TD,

That little experiment helped a bit. This time we did not see any exceptions for about 16 hours, but eventually it threw the same exceptions as before. The cleaning of the shuffle files also stopped well before these exceptions happened, about 7.5 hours after startup.

I am not quite sure how to proceed from here. Any suggestions on how to avoid these errors?

Thanks
NB

On Fri, Apr 24, 2015 at 12:57 AM, N B wrote:
> Hi TD,
>
> That may very well have been the case. There may be some delay on our output side. [...]
Re: Shuffle files not cleaned up (Spark 1.2.1)
Hi TD,

That may very well have been the case. There may be some delay on our output side. I have made a change, just for testing, that sends the output nowhere. I will see if that helps get rid of these errors. Then we can try to find out how we can optimize so that we do not lag.

Questions: How can we ever be sure that a lag, even a temporary one, never occurs in the future? Also, shouldn't Spark avoid cleaning up any temp files that it knows it might still need in the (near?) future?

Thanks
Nikunj

On Thu, Apr 23, 2015 at 12:29 PM, Tathagata Das wrote:
> What was the state of your streaming application? Was it falling behind, with a large and increasing scheduling delay? [...]
Re: Shuffle files not cleaned up (Spark 1.2.1)
What was the state of your streaming application? Was it falling behind, with a large and increasing scheduling delay?

TD

On Thu, Apr 23, 2015 at 11:31 AM, N B wrote:
> Thanks for the response, Conor. I tried with those settings, and for a while it seemed like it was cleaning up shuffle files after itself. [...]
Re: Shuffle files not cleaned up (Spark 1.2.1)
Thanks for the response, Conor. I tried with those settings, and for a while it seemed like it was cleaning up shuffle files after itself. However, after exactly 5 hours it started throwing exceptions and eventually stopped working again. A sample stack trace is below. What is curious about 5 hours is that I set the cleaner TTL to 5 hours after changing the max window size to 1 hour (down from 6 hours, in order to test). It also stopped cleaning the shuffle files after this started happening.

Any idea why this could be happening?

2015-04-22 17:39:52,040 ERROR Executor task launch worker-989 Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
java.lang.Exception: Could not compute split, block input-0-1429706099000 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Thanks
NB

On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell <conor.fenn...@altocloud.com> wrote:
> Hi,
>
> We set the spark.cleaner.ttl to some reasonable time and also set spark.streaming.unpersist=true. [...]
Re: Shuffle files not cleaned up (Spark 1.2.1)
Hi,

We set the spark.cleaner.ttl to some reasonable time and also set spark.streaming.unpersist=true.

Those together cleaned up the shuffle files for us.

-Conor

On Tue, Apr 21, 2015 at 8:18 AM, N B wrote:
> We already do have a cron job in place to clean just the shuffle files. [...]
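A minimal sketch (Spark 1.x keys, assumed values) of the two settings Conor describes: spark.cleaner.ttl is in seconds and must comfortably exceed the largest window, while spark.streaming.unpersist lets Spark drop generated RDDs once they are no longer needed.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-cleanup-sketch")    // hypothetical name
  .set("spark.cleaner.ttl", "7200")          // 2 hours, assuming a max window of 1 hour
  .set("spark.streaming.unpersist", "true")  // unpersist generated RDDs automatically

val ssc = new StreamingContext(conf, Seconds(10))  // assumed 10-second batch interval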
Re: Shuffle files not cleaned up (Spark 1.2.1)
We already do have a cron job in place to clean just the shuffle files. However, what I would really like to know is whether there is a "proper" way of telling Spark to clean up these files once it's done with them?

Thanks
NB

On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele <gangele...@gmail.com> wrote:
> Write a cron job for this, like the below: [...]
Re: Shuffle files not cleaned up (Spark 1.2.1)
Write a cron job for this, like the below:

12 * * * * find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
32 * * * * find /tmp -type d -cmin +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+
52 * * * * find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+

On 20 April 2015 at 23:12, N B wrote:
> Hi all,
>
> I had posed this query as part of a different thread but did not get a response there, so I am creating a new thread hoping to catch someone's attention.
>
> We are experiencing this issue of shuffle files being left behind and not being cleaned up by Spark. Since this is a Spark streaming application, it is expected to stay up indefinitely, so shuffle files not being cleaned up is a big problem right now. Our max window size is 6 hours, so we have set up a cron job to clean up shuffle files older than 12 hours, otherwise they will eat up all our disk space.
>
> Please see the following. It seems the non-cleaning of shuffle files is being documented in 1.3.1.
>
> https://github.com/apache/spark/pull/5074/files
> https://issues.apache.org/jira/browse/SPARK-5836
>
> Also, for some reason, the following JIRAs that were reported as functional issues were closed as duplicates of the above documentation bug. Does this mean that this issue won't be tackled at all?
>
> https://issues.apache.org/jira/browse/SPARK-3563
> https://issues.apache.org/jira/browse/SPARK-4796
> https://issues.apache.org/jira/browse/SPARK-6011
>
> Any further insight into whether this is being looked into, and meanwhile how to handle shuffle files, will be greatly appreciated.
>
> Thanks
> NB
RE: Shuffle files
Hi Song,

From what I know of sort-based shuffle, the number of files opened in parallel is normally much smaller than for hash-based shuffle. In hash-based shuffle, the number of files opened in parallel is C * R (where C is the number of cores used and R is the number of reducers); as you can see, the file count is tied to the reducer count, no matter how large the shuffle is. In sort-based shuffle, each map task produces only one final output file. To achieve this it sorts by partition, which generates some intermediate spill files; the number of spilled files is related to the shuffle size and the memory available for shuffling, and has no relation to the reducer count.

So if you hit "too many open files" in sort-based shuffle, my guess is that you are generating many spill files during shuffle write. One possible way to alleviate this is to increase the memory used for shuffles; changing the ulimit is another. I believe on YARN you have to do the system configuration manually; Spark cannot set the ulimit automatically for you, and I don't think it's an issue Spark should take care of.

Thanks
Jerry

From: Chen Song [mailto:chen.song...@gmail.com]
Sent: Tuesday, October 21, 2014 9:10 AM
To: Andrew Ash
Cc: Sunny Khatri; Lisonbee, Todd; u...@spark.incubator.apache.org
Subject: Re: Shuffle files

My observation is the opposite. When my job runs under the default spark.shuffle.manager, I don't see this exception. However, when it runs with SORT based, I start seeing this error. How would that be possible? [...]
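A minimal sketch (Spark 1.x configuration keys, assumed values) of Jerry's two Spark-side suggestions: select sort-based shuffle and give shuffles more memory so fewer spill files are created. Raising the OS ulimit still has to happen outside Spark.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-tuning-sketch")         // hypothetical name
  .set("spark.shuffle.manager", "sort")        // one final map output file per task
  .set("spark.shuffle.memoryFraction", "0.4")  // default is 0.2; more shuffle memory means fewer spills

val sc = new SparkContext(conf)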
Re: Shuffle files
My observation is the opposite. When my job runs under the default spark.shuffle.manager, I don't see this exception. However, when it runs with SORT based, I start seeing this error. How would that be possible?

I am running my job in YARN, and I noticed that the YARN process limits (cat /proc/$PID/limits) are not consistent with the system-wide limits (shown by ulimit -a); I don't know how that happened. Is there a way to let the Spark driver propagate this setting (ulimit -n) to the Spark executors before startup?

On Tue, Oct 7, 2014 at 11:53 PM, Andrew Ash <and...@andrewash.com> wrote:
> You will need to restart your Mesos workers to pick up the new limits as well. [...]

--
Chen Song
Re: Shuffle files
You will need to restart your Mesos workers to pick up the new limits as well.

On Tue, Oct 7, 2014 at 4:02 PM, Sunny Khatri wrote:
> @SK:
> Make sure ulimit has taken effect as Todd mentioned. You can verify via ulimit -a. [...]
Re: Shuffle files
@SK:
Make sure ulimit has taken effect as Todd mentioned. You can verify via ulimit -a. Also make sure you have the proper kernel parameters set in /etc/sysctl.conf (MacOSX).

On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd wrote:
> Are you sure the new ulimit has taken effect?
>
> How many cores are you using? How many reducers? [...]
RE: Shuffle files
Are you sure the new ulimit has taken effect?

How many cores are you using? How many reducers?

"In general if a node in your cluster has C assigned cores and you run a job with X reducers then Spark will open C*X files in parallel and start writing. Shuffle consolidation will help decrease the total number of files created but the number of file handles open at any time doesn't change so it won't help the ulimit problem."

Quoted from Patrick at:
http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html

Thanks,
Todd

-----Original Message-----
From: SK [mailto:skrishna...@gmail.com]
Sent: Tuesday, October 7, 2014 2:12 PM
To: u...@spark.incubator.apache.org
Subject: Re: Shuffle files

- We set ulimit to 50. But I still get the same "too many open files" warning. [...]
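As a worked example of Patrick's rule (with assumed numbers): a node with C = 16 assigned cores running a job with X = 200 reducers would hold 16 * 200 = 3,200 shuffle file handles open at once under hash-based shuffle, well above the common default ulimit of 1,024.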
Re: Shuffle files
- We set ulimit to 50. But I still get the same "too many open files" warning.

- I tried setting consolidateFiles to True, but that did not help either.

I am using a Mesos cluster. Does Mesos have any limit on the number of open files?

thanks
Re: Shuffle files
Hi SK,

For the problem with lots of shuffle files and the "too many open files" exception, there are a couple of options:

1. The Linux kernel has a limit on the number of files open at once. This is set with ulimit -n, and can be set permanently in /etc/sysctl.conf or /etc/sysctl.d/. Try increasing this to a large value, at the bare minimum the square of your partition count.

2. Try using shuffle consolidation: spark.shuffle.consolidateFiles=true. This option writes fewer files to disk, so it shouldn't hit limits nearly as much.

3. Try using the sort-based shuffle by setting spark.shuffle.manager=SORT. You should likely hold off on this until https://issues.apache.org/jira/browse/SPARK-3032 is fixed, hopefully in 1.1.1.

Hope that helps!
Andrew

On Thu, Sep 25, 2014 at 4:20 PM, SK wrote:
> Hi,
>
> I am using Spark 1.1.0 on a cluster. My job takes as input 30 files in a directory (I am using sc.textFile("dir/*")) to read in the files. I am getting the following warning:
>
> WARN TaskSetManager: Lost task 99.0 in stage 1.0 (TID 99, mesos12-dev.sccps.net): java.io.FileNotFoundException: /tmp/spark-local-20140925215712-0319/12/shuffle_0_99_93138 (Too many open files)
>
> Basically, I think a lot of shuffle files are being created.
>
> 1) The tasks eventually fail and the job just hangs (after taking very long, more than an hour). If I read these 30 files in a for loop, the same job completes in a few minutes. However, I need to specify the file names, which is not convenient. I am assuming that sc.textFile("dir/*") creates a large RDD for all the 30 files. Is there a way to make the operation on this large RDD efficient so as to avoid creating too many shuffle files?
>
> 2) Also, I am finding that all the shuffle files for my other completed jobs are not being automatically deleted even after days. I thought that sc.stop() clears the intermediate files. Is there some way to programmatically delete these temp shuffle files upon job completion?
>
> Thanks
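A minimal sketch (Spark 1.x-era key, hypothetical app name) of wiring option 2 into the job configuration; option 3 would swap in .set("spark.shuffle.manager", "SORT") instead.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("consolidation-sketch")             // hypothetical name
  .set("spark.shuffle.consolidateFiles", "true")  // pool and reuse shuffle output files

val sc = new SparkContext(conf)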
Re: Shuffle Files
From the BlockManager and ShuffleMapTask code, it writes under spark.local.dir or java.io.tmpdir:

val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
  conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))

On Mon, Mar 3, 2014 at 10:45 PM, Usman Ghani wrote:
> Where on the filesystem does spark write the shuffle files?

--
"...:::Aniket:::... Quetzalco@tl"
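A minimal sketch (hypothetical paths) of steering shuffle output to specific disks via spark.local.dir, the setting the DiskBlockManager above consults before falling back to java.io.tmpdir.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("local-dir-sketch")                               // hypothetical name
  .set("spark.local.dir", "/mnt/disk1/spark,/mnt/disk2/spark")  // comma-separated list of scratch dirs

val sc = new SparkContext(conf)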