Re: Shuffle files lifecycle

2015-06-29 Thread Thomas Gerber
Thanks Silvio.


On Mon, Jun 29, 2015 at 7:41 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

>   Regarding 1 and 2, yes shuffle output is stored on the worker local
> disks and will be reused across jobs as long as they’re available. You can
> identify when they’re used by seeing skipped stages in the job UI. They are
> periodically cleaned up based on available space of the configured
> spark.local.dir paths.
>
>   From: Thomas Gerber
> Date: Monday, June 29, 2015 at 10:12 PM
> To: user
> Subject: Shuffle files lifecycle
>
>   Hello,
>
>  It is my understanding that shuffles are written to disk and that they
> act as checkpoints.
>
>  I wonder if this is true only within a job, or across jobs. Please note
> that I use the words job and stage carefully here.
>
>  1. Can a shuffle created during JobN be used to skip many stages from
> JobN+1? Or is the lifecycle of the shuffle files bound to the job that
> created them?
>
>  2. When are shuffle files actually deleted? Is it TTL-based or is it
> cleaned when the job is over?
>
>  3. We have a very long batch application, and as it goes on, the total
> number of tasks for each job gets larger and larger. It is not really a
> problem, because most of those tasks will be skipped since we cache RDDs.
> We noticed, however, that the actual start of a job is delayed by about 1
> minute for every 2M tasks in the job. Are there suggested workarounds to
> avoid that delay? Maybe saving the RDD and re-loading it?
>
>  Thanks
> Thomas
>
>


Re: Shuffle files lifecycle

2015-06-29 Thread Silvio Fiorito
Regarding 1 and 2, yes shuffle output is stored on the worker local disks and 
will be reused across jobs as long as they’re available. You can identify when 
they’re used by seeing skipped stages in the job UI. They are periodically 
cleaned up based on available space of the configured spark.local.dir paths.
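
For illustration, a minimal spark-shell sketch of that reuse (the input path and
keying below are hypothetical). The second action's map stage shows up as
"skipped" because the shuffle output written by the first action is reused:

val counts = sc.textFile("hdfs:///data/events")    // assumed input path
  .map(line => (line.split(",")(0), 1L))           // key by the first CSV column
  .reduceByKey(_ + _)                              // this step shuffles

counts.count()   // Job 1: runs the shuffle map stage and writes shuffle files to local disk
counts.take(5)   // Job 2: the map stage is listed as skipped; its shuffle output is reused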

From: Thomas Gerber
Date: Monday, June 29, 2015 at 10:12 PM
To: user
Subject: Shuffle files lifecycle

Hello,

It is my understanding that shuffles are written to disk and that they act as
checkpoints.

I wonder if this is true only within a job, or across jobs. Please note that I 
use the words job and stage carefully here.

1. Can a shuffle created during JobN be used to skip many stages from JobN+1?
Or is the lifecycle of the shuffle files bound to the job that created them?

2. When are shuffle files actually deleted? Is it TTL-based or is it cleaned
when the job is over?

3. We have a very long batch application, and as it goes on, the total number
of tasks for each job gets larger and larger. It is not really a problem,
because most of those tasks will be skipped since we cache RDDs. We noticed,
however, that the actual start of a job is delayed by about 1 minute for every
2M tasks in the job. Are there suggested workarounds to avoid that delay?
Maybe saving the RDD and re-loading it?

Thanks
Thomas



Re: Shuffle files lifecycle

2015-06-29 Thread Thomas Gerber
Ah, for #3, maybe this is what *rdd.checkpoint* does!
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
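
For reference, a minimal spark-shell sketch of using it (the paths and the
transformation are made up). Checkpointing materializes the RDD to reliable
storage and truncates its lineage, so later jobs do not replay the whole chain:

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // hypothetical directory

val enriched = sc.textFile("hdfs:///data/events")      // hypothetical input
  .map(line => (line, line.length))                    // stand-in for a long chain of transformations
  .cache()                                             // recommended, so checkpointing does not recompute

enriched.checkpoint()    // must be called before the first action on this RDD
enriched.count()         // materializes the RDD and writes the checkpoint files
// later jobs on `enriched` read the checkpoint instead of replaying the lineage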

Thomas


On Mon, Jun 29, 2015 at 7:12 PM, Thomas Gerber 
wrote:

> Hello,
>
> It is my understanding that shuffles are written to disk and that they act
> as checkpoints.
>
> I wonder if this is true only within a job, or across jobs. Please note
> that I use the words job and stage carefully here.
>
> 1. Can a shuffle created during JobN be used to skip many stages from
> JobN+1? Or is the lifecycle of the shuffle files bound to the job that
> created them?
>
> 2. When are shuffle files actually deleted? Is it TTL-based or is it
> cleaned when the job is over?
>
> 3. We have a very long batch application, and as it goes on, the total number
> of tasks for each job gets larger and larger. It is not really a
> problem, because most of those tasks will be skipped since we cache RDDs.
> We noticed, however, that the actual start of a job is delayed by about 1
> minute for every 2M tasks in the job. Are there suggested workarounds to
> avoid that delay? Maybe saving the RDD and re-loading it?
>
> Thanks
> Thomas
>
>


Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-24 Thread N B
Hi TD,

That little experiment helped a bit. This time we did not see any
exceptions for about 16 hours, but eventually it threw the same
exceptions as before. The cleaning of the shuffle files also stopped well
before these exceptions appeared, about 7.5 hours after startup.

I am not quite sure how to proceed from here. Any suggestions on how to
avoid these errors?

Thanks
NB


On Fri, Apr 24, 2015 at 12:57 AM, N B  wrote:

> Hi TD,
>
> That may very well have been the case. There may be some delay on our
> output side. I have made a change just for testing that sends the output
> nowhere. I will see if that helps get rid of these errors. Then we can try
> to find out how we can optimize so that we do not lag.
>
> Questions: How can we ever be sure that a lag, even a temporary one, will
> never occur in the future? Also, shouldn't Spark avoid cleaning up any temp
> files that it knows it might still need in the (near?) future?
>
> Thanks
> Nikunj
>
>
> On Thu, Apr 23, 2015 at 12:29 PM, Tathagata Das 
> wrote:
>
>> What was the state of your streaming application? Was it falling behind
>> with a large increasing scheduling delay?
>>
>> TD
>>
>> On Thu, Apr 23, 2015 at 11:31 AM, N B  wrote:
>>
>>> Thanks for the response, Conor. I tried with those settings and for a
>>> while it seemed like it was cleaning up shuffle files after itself.
>>> However, after exactly 5 hours it started throwing exceptions and
>>> eventually stopped working again. A sample stack trace is below. What is
>>> curious about 5 hours is that I set the cleaner ttl to 5 hours after
>>> changing the max window size to 1 hour (down from 6 hours in order to
>>> test). It also stopped cleaning the shuffle files after this started
>>> happening.
>>>
>>> Any idea why this could be happening?
>>>
>>> 2015-04-22 17:39:52,040 ERROR Executor task launch worker-989
>>> Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
>>> java.lang.Exception: Could not compute split, block
>>> input-0-1429706099000 not found
>>> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> Thanks
>>> NB
>>>
>>>
>>> On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell <
>>> conor.fenn...@altocloud.com> wrote:
>>>
 Hi,


 We set the spark.cleaner.ttl to some reasonable time and also
 set spark.streaming.unpersist=true.


 Those together cleaned up the shuffle files for us.


 -Conor

 On Tue, Apr 21, 2015 at 8:18 AM, N B  wrote:

> We already do have a cron job in place to clean just the shuffle
> files. However, what I would really like to know is whether there is a
> "proper" way of telling spark to clean up these files once its done with
> them?
>
> Thanks
> NB
>
>
> On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele <
> gangele...@gmail.com> wrote:
>
>> Write a cron job for this like below
>>
>> 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {}
>> \+
>> 32 * * * *  find /tmp -type d -cmin +1440 -name "spark-*-*-*" -prune
>> -exec rm -rf {} \+
>> 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d
>> -cmin +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+
>>
>>
>> On 20 April 2015 at 23:12, N B  wrote:
>>
>>> Hi all,
>>>
>>> I had posed this query as part of a different thread but did not get
>>> a response there. So creating a new thread hoping to catch someone's
>>> attention.
>>>
>>> We are experiencing this issue of shuffle files being left behind
>>> and not being cleaned up by Spark. Since this is a Spark streaming
>>> application, it is expected to stay up indefinitely, so shuffle files 
>>> not
>>> being cleaned up is a big problem right now. 

Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-24 Thread N B
Hi TD,

That may very well have been the case. There may be some delay on our
output side. I have made a change just for testing that sends the output
nowhere. I will see if that helps get rid of these errors. Then we can try
to find out how we can optimize so that we do not lag.

Questions: How can we ever be sure that a lag, even a temporary one, will never
occur in the future? Also, shouldn't Spark avoid cleaning up any temp files
that it knows it might still need in the (near?) future?

Thanks
Nikunj


On Thu, Apr 23, 2015 at 12:29 PM, Tathagata Das  wrote:

> What was the state of your streaming application? Was it falling behind
> with a large increasing scheduling delay?
>
> TD
>
> On Thu, Apr 23, 2015 at 11:31 AM, N B  wrote:
>
>> Thanks for the response, Conor. I tried with those settings and for a
>> while it seemed like it was cleaning up shuffle files after itself.
>> However, after exactly 5 hours it started throwing exceptions and
>> eventually stopped working again. A sample stack trace is below. What is
>> curious about 5 hours is that I set the cleaner ttl to 5 hours after
>> changing the max window size to 1 hour (down from 6 hours in order to
>> test). It also stopped cleaning the shuffle files after this started
>> happening.
>>
>> Any idea why this could be happening?
>>
>> 2015-04-22 17:39:52,040 ERROR Executor task launch worker-989
>> Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
>> java.lang.Exception: Could not compute split, block input-0-1429706099000
>> not found
>> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Thanks
>> NB
>>
>>
>> On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell <
>> conor.fenn...@altocloud.com> wrote:
>>
>>> Hi,
>>>
>>>
>>> We set the spark.cleaner.ttl to some reasonable time and also
>>> set spark.streaming.unpersist=true.
>>>
>>>
>>> Those together cleaned up the shuffle files for us.
>>>
>>>
>>> -Conor
>>>
>>> On Tue, Apr 21, 2015 at 8:18 AM, N B  wrote:
>>>
 We already do have a cron job in place to clean just the shuffle files.
 However, what I would really like to know is whether there is a "proper"
 way of telling Spark to clean up these files once it's done with them?

 Thanks
 NB


 On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele <
 gangele...@gmail.com> wrote:

> Write a cron job for this like below
>
> 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
> 32 * * * *  find /tmp -type d -cmin +1440 -name "spark-*-*-*" -prune
> -exec rm -rf {} \+
> 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d
> -cmin +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+
>
>
> On 20 April 2015 at 23:12, N B  wrote:
>
>> Hi all,
>>
>> I had posed this query as part of a different thread but did not get
>> a response there. So creating a new thread hoping to catch someone's
>> attention.
>>
>> We are experiencing this issue of shuffle files being left behind and
>> not being cleaned up by Spark. Since this is a Spark streaming 
>> application,
>> it is expected to stay up indefinitely, so shuffle files not being 
>> cleaned
>> up is a big problem right now. Our max window size is 6 hours, so we have
>> set up a cron job to clean up shuffle files older than 12 hours otherwise
>> it will eat up all our disk space.
>>
>> Please see the following. It seems the non-cleaning of shuffle files
>> is being documented in 1.3.1.
>>
>> https://github.com/apache/spark/pull/5074/files
>> https://issues.apache.org/jira/browse/SPARK-5836
>>
>>
>> Also, for some reason, the following JIRAs that were reported as
>> functional issues were closed as Duplicates of the above Documentation 
>> bug.

Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-23 Thread Tathagata Das
What was the state of your streaming application? Was it falling behind
with a large increasing scheduling delay?

TD

On Thu, Apr 23, 2015 at 11:31 AM, N B  wrote:

> Thanks for the response, Conor. I tried with those settings and for a
> while it seemed like it was cleaning up shuffle files after itself.
> However, after exactly 5 hours it started throwing exceptions and
> eventually stopped working again. A sample stack trace is below. What is
> curious about 5 hours is that I set the cleaner ttl to 5 hours after
> changing the max window size to 1 hour (down from 6 hours in order to
> test). It also stopped cleaning the shuffle files after this started
> happening.
>
> Any idea why this could be happening?
>
> 2015-04-22 17:39:52,040 ERROR Executor task launch worker-989
> Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
> java.lang.Exception: Could not compute split, block input-0-1429706099000
> not found
> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Thanks
> NB
>
>
> On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell <
> conor.fenn...@altocloud.com> wrote:
>
>> Hi,
>>
>>
>> We set the spark.cleaner.ttl to some reasonable time and also
>> set spark.streaming.unpersist=true.
>>
>>
>> Those together cleaned up the shuffle files for us.
>>
>>
>> -Conor
>>
>> On Tue, Apr 21, 2015 at 8:18 AM, N B  wrote:
>>
>>> We already do have a cron job in place to clean just the shuffle files.
>>> However, what I would really like to know is whether there is a "proper"
>>> way of telling Spark to clean up these files once it's done with them?
>>>
>>> Thanks
>>> NB
>>>
>>>
>>> On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele <
>>> gangele...@gmail.com> wrote:
>>>
 Write a cron job for this like below

 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
 32 * * * *  find /tmp -type d -cmin +1440 -name "spark-*-*-*" -prune
 -exec rm -rf {} \+
 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
 +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+


 On 20 April 2015 at 23:12, N B  wrote:

> Hi all,
>
> I had posed this query as part of a different thread but did not get a
> response there. So creating a new thread hoping to catch someone's
> attention.
>
> We are experiencing this issue of shuffle files being left behind and
> not being cleaned up by Spark. Since this is a Spark streaming 
> application,
> it is expected to stay up indefinitely, so shuffle files not being cleaned
> up is a big problem right now. Our max window size is 6 hours, so we have
> set up a cron job to clean up shuffle files older than 12 hours otherwise
> it will eat up all our disk space.
>
> Please see the following. It seems the non-cleaning of shuffle files
> is being documented in 1.3.1.
>
> https://github.com/apache/spark/pull/5074/files
> https://issues.apache.org/jira/browse/SPARK-5836
>
>
> Also, for some reason, the following JIRAs that were reported as
> functional issues were closed as Duplicates of the above Documentation 
> bug.
> Does this mean that this issue won't be tackled at all?
>
> https://issues.apache.org/jira/browse/SPARK-3563
> https://issues.apache.org/jira/browse/SPARK-4796
> https://issues.apache.org/jira/browse/SPARK-6011
>
> Any further insight into whether this is being looked into and
> meanwhile how to handle shuffle files will be greatly appreciated.
>
> Thanks
> NB
>
>




>>>
>>
>


Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-23 Thread N B
Thanks for the response, Conor. I tried with those settings and for a while
it seemed like it was cleaning up shuffle files after itself. However,
after exactly 5 hours it started throwing exceptions and eventually
stopped working again. A sample stack trace is below. What is curious about
5 hours is that I set the cleaner ttl to 5 hours after changing the max
window size to 1 hour (down from 6 hours in order to test). It also stopped
cleaning the shuffle files after this started happening.

Any idea why this could be happening?

2015-04-22 17:39:52,040 ERROR Executor task launch worker-989
Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
java.lang.Exception: Could not compute split, block input-0-1429706099000
not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Thanks
NB


On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell 
wrote:

> Hi,
>
>
> We set the spark.cleaner.ttl to some reasonable time and also
> set spark.streaming.unpersist=true.
>
>
> Those together cleaned up the shuffle files for us.
>
>
> -Conor
>
> On Tue, Apr 21, 2015 at 8:18 AM, N B  wrote:
>
>> We already do have a cron job in place to clean just the shuffle files.
>> However, what I would really like to know is whether there is a "proper"
>> way of telling Spark to clean up these files once it's done with them?
>>
>> Thanks
>> NB
>>
>>
>> On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele <gangele...@gmail.com> wrote:
>>
>>> Write a crone job for this like below
>>>
>>> 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
>>> 32 * * * *  find /tmp -type d -cmin +1440 -name "spark-*-*-*" -prune
>>> -exec rm -rf {} \+
>>> 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
>>> +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+
>>>
>>>
>>> On 20 April 2015 at 23:12, N B  wrote:
>>>
 Hi all,

 I had posed this query as part of a different thread but did not get a
 response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind and
 not being cleaned up by Spark. Since this is a Spark streaming application,
 it is expected to stay up indefinitely, so shuffle files not being cleaned
 up is a big problem right now. Our max window size is 6 hours, so we have
 set up a cron job to clean up shuffle files older than 12 hours otherwise
 it will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files is
 being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK-5836


 Also, for some reason, the following JIRAs that were reported as
 functional issues were closed as Duplicates of the above Documentation bug.
 Does this mean that this issue won't be tackled at all?

 https://issues.apache.org/jira/browse/SPARK-3563
 https://issues.apache.org/jira/browse/SPARK-4796
 https://issues.apache.org/jira/browse/SPARK-6011

 Any further insight into whether this is being looked into and
 meanwhile how to handle shuffle files will be greatly appreciated.

 Thanks
 NB


>>>
>>>
>>>
>>>
>>
>


Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-21 Thread Conor Fennell
Hi,


We set the spark.cleaner.ttl to some reasonable time and also
set spark.streaming.unpersist=true.


Those together cleaned up the shuffle files for us.
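
A minimal sketch of those two settings (values illustrative only; the TTL should
comfortably exceed your longest window):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-app")                 // hypothetical app name
  .setMaster("local[2]")                       // for a local test; normally set by spark-submit
  .set("spark.cleaner.ttl", "7200")            // Spark 1.x metadata/shuffle cleanup TTL, in seconds
  .set("spark.streaming.unpersist", "true")    // let Spark Streaming unpersist old RDDs

val ssc = new StreamingContext(conf, Seconds(10))   // e.g. 10-second batches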


-Conor

On Tue, Apr 21, 2015 at 8:18 AM, N B  wrote:

> We already do have a cron job in place to clean just the shuffle files.
> However, what I would really like to know is whether there is a "proper"
> way of telling Spark to clean up these files once it's done with them?
>
> Thanks
> NB
>
>
> On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele 
> wrote:
>
>> Write a cron job for this like below
>>
>> 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
>> 32 * * * *  find /tmp -type d -cmin +1440 -name "spark-*-*-*" -prune
>> -exec rm -rf {} \+
>> 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
>> +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+
>>
>>
>> On 20 April 2015 at 23:12, N B  wrote:
>>
>>> Hi all,
>>>
>>> I had posed this query as part of a different thread but did not get a
>>> response there. So creating a new thread hoping to catch someone's
>>> attention.
>>>
>>> We are experiencing this issue of shuffle files being left behind and
>>> not being cleaned up by Spark. Since this is a Spark streaming application,
>>> it is expected to stay up indefinitely, so shuffle files not being cleaned
>>> up is a big problem right now. Our max window size is 6 hours, so we have
>>> set up a cron job to clean up shuffle files older than 12 hours otherwise
>>> it will eat up all our disk space.
>>>
>>> Please see the following. It seems the non-cleaning of shuffle files is
>>> being documented in 1.3.1.
>>>
>>> https://github.com/apache/spark/pull/5074/files
>>> https://issues.apache.org/jira/browse/SPARK-5836
>>>
>>>
>>> Also, for some reason, the following JIRAs that were reported as
>>> functional issues were closed as Duplicates of the above Documentation bug.
>>> Does this mean that this issue won't be tackled at all?
>>>
>>> https://issues.apache.org/jira/browse/SPARK-3563
>>> https://issues.apache.org/jira/browse/SPARK-4796
>>> https://issues.apache.org/jira/browse/SPARK-6011
>>>
>>> Any further insight into whether this is being looked into and meanwhile
>>> how to handle shuffle files will be greatly appreciated.
>>>
>>> Thanks
>>> NB
>>>
>>>
>>
>>
>>
>>
>


Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-21 Thread N B
We already do have a cron job in place to clean just the shuffle files.
However, what I would really like to know is whether there is a "proper"
way of telling Spark to clean up these files once it's done with them?

Thanks
NB


On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele 
wrote:

> Write a cron job for this like below
>
> 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
> 32 * * * *  find /tmp -type d -cmin +1440 -name "spark-*-*-*" -prune -exec
> rm -rf {} \+
> 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
> +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+
>
>
> On 20 April 2015 at 23:12, N B  wrote:
>
>> Hi all,
>>
>> I had posed this query as part of a different thread but did not get a
>> response there. So creating a new thread hoping to catch someone's
>> attention.
>>
>> We are experiencing this issue of shuffle files being left behind and not
>> being cleaned up by Spark. Since this is a Spark streaming application, it
>> is expected to stay up indefinitely, so shuffle files not being cleaned up
>> is a big problem right now. Our max window size is 6 hours, so we have set
>> up a cron job to clean up shuffle files older than 12 hours otherwise it
>> will eat up all our disk space.
>>
>> Please see the following. It seems the non-cleaning of shuffle files is
>> being documented in 1.3.1.
>>
>> https://github.com/apache/spark/pull/5074/files
>> https://issues.apache.org/jira/browse/SPARK-5836
>>
>>
>> Also, for some reason, the following JIRAs that were reported as
>> functional issues were closed as Duplicates of the above Documentation bug.
>> Does this mean that this issue won't be tackled at all?
>>
>> https://issues.apache.org/jira/browse/SPARK-3563
>> https://issues.apache.org/jira/browse/SPARK-4796
>> https://issues.apache.org/jira/browse/SPARK-6011
>>
>> Any further insight into whether this is being looked into and meanwhile
>> how to handle shuffle files will be greatly appreciated.
>>
>> Thanks
>> NB
>>
>>
>
>
>
>


Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-20 Thread Jeetendra Gangele
Write a cron job for this like below

12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
32 * * * *  find /tmp -type d -cmin +1440 -name "spark-*-*-*" -prune -exec
rm -rf {} \+
52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
+1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+
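
(For reference: -cmin +1440 matches entries whose status changed more than
1,440 minutes ago, i.e. older than 24 hours, and the leading 12/32/52 fields
run each command once per hour at those minutes, so these jobs remove day-old
Spark work and temp directories.)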

On 20 April 2015 at 23:12, N B  wrote:

> Hi all,
>
> I had posed this query as part of a different thread but did not get a
> response there. So creating a new thread hoping to catch someone's
> attention.
>
> We are experiencing this issue of shuffle files being left behind and not
> being cleaned up by Spark. Since this is a Spark streaming application, it
> is expected to stay up indefinitely, so shuffle files not being cleaned up
> is a big problem right now. Our max window size is 6 hours, so we have set
> up a cron job to clean up shuffle files older than 12 hours otherwise it
> will eat up all our disk space.
>
> Please see the following. It seems the non-cleaning of shuffle files is
> being documented in 1.3.1.
>
> https://github.com/apache/spark/pull/5074/files
> https://issues.apache.org/jira/browse/SPARK-5836
>
>
> Also, for some reason, the following JIRAs that were reported as
> functional issues were closed as Duplicates of the above Documentation bug.
> Does this mean that this issue won't be tackled at all?
>
> https://issues.apache.org/jira/browse/SPARK-3563
> https://issues.apache.org/jira/browse/SPARK-4796
> https://issues.apache.org/jira/browse/SPARK-6011
>
> Any further insight into whether this is being looked into and meanwhile
> how to handle shuffle files will be greatly appreciated.
>
> Thanks
> NB
>
>


RE: Shuffle files

2014-10-20 Thread Shao, Saisai
Hi Song,

From what I know about sort-based shuffle:

Normally, the number of files opened in parallel for sort-based shuffle is much
smaller than for hash-based shuffle.

In hash-based shuffle, the number of files opened in parallel is C * R (where C
is the number of cores used and R is the number of reducers); as you can see,
the file count is tied to the reducer count, no matter how large the shuffle is.

In sort-based shuffle, there is only one final map output file per task. To
achieve this we do by-partition sorting, which generates some intermediate
spill files; the number of spill files depends on the shuffle size and the
memory available for shuffle, and has no relation to the reducer count.

So if you hit "too many open files" with sort-based shuffle, my guess is that
you are generating a lot of spill files during the shuffle write. One possible
way to alleviate this is to increase the memory available for shuffle; raising
the ulimit is another option.
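
As a rough sketch of those two knobs (Spark 1.x property names, values
illustrative only):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.manager", "sort")          // sort-based shuffle, as discussed above
  .set("spark.shuffle.memoryFraction", "0.4")    // heap fraction for shuffle aggregation/spills (default 0.2)
// The open-file limit itself is an OS setting (ulimit -n) on each worker; Spark does not change it.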

I believe on YARN you have to do that system configuration manually; Spark
cannot set the ulimit for you automatically, and I don't think that is
something Spark should take care of.

Thanks
Jerry

From: Chen Song [mailto:chen.song...@gmail.com]
Sent: Tuesday, October 21, 2014 9:10 AM
To: Andrew Ash
Cc: Sunny Khatri; Lisonbee, Todd; u...@spark.incubator.apache.org
Subject: Re: Shuffle files

My observation is the opposite. When my job runs with the default
spark.shuffle.manager, I don't see this exception. However, when it runs with
the SORT-based manager, I start seeing this error. How is that possible?

I am running my job on YARN, and I noticed that the YARN process limits (cat
/proc/$PID/limits) are not consistent with the system-wide limits (shown by
ulimit -a); I don't know how that happened. Is there a way for the Spark driver
to propagate this setting (ulimit -n ) to the Spark executors before startup?




On Tue, Oct 7, 2014 at 11:53 PM, Andrew Ash <and...@andrewash.com> wrote:
You will need to restart your Mesos workers to pick up the new limits as well.

On Tue, Oct 7, 2014 at 4:02 PM, Sunny Khatri <sunny.k...@gmail.com> wrote:
@SK:
Make sure ulimit has taken effect as Todd mentioned. You can verify via ulimit 
-a. Also make sure you have proper kernel parameters set in /etc/sysctl.conf 
(MacOSX)

On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd <todd.lison...@intel.com> wrote:

Are you sure the new ulimit has taken effect?

How many cores are you using?  How many reducers?

"In general if a node in your cluster has C assigned cores and you run
a job with X reducers then Spark will open C*X files in parallel and
start writing. Shuffle consolidation will help decrease the total
number of files created but the number of file handles open at any
time doesn't change so it won't help the ulimit problem."

Quoted from Patrick at:
http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html

Thanks,

Todd

-Original Message-
From: SK [mailto:skrishna...@gmail.com]
Sent: Tuesday, October 7, 2014 2:12 PM
To: u...@spark.incubator.apache.org
Subject: Re: Shuffle files

- We set ulimit to 50. But I still get the same "too many open files"
warning.

- I tried setting consolidateFiles to True, but that did not help either.

I am using a Mesos cluster.   Does Mesos have any limit on the number of
open files?

thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





--
Chen Song



Re: Shuffle files

2014-10-20 Thread Chen Song
My observation is the opposite. When my job runs with the default
spark.shuffle.manager, I don't see this exception. However, when it runs
with the SORT-based manager, I start seeing this error. How is that possible?

I am running my job on YARN, and I noticed that the YARN process limits
(cat /proc/$PID/limits) are not consistent with the system-wide limits (shown
by ulimit -a); I don't know how that happened. Is there a way for the Spark
driver to propagate this setting (ulimit -n ) to the Spark executors
before startup?




On Tue, Oct 7, 2014 at 11:53 PM, Andrew Ash  wrote:

> You will need to restart your Mesos workers to pick up the new limits as
> well.
>
> On Tue, Oct 7, 2014 at 4:02 PM, Sunny Khatri  wrote:
>
>> @SK:
>> Make sure ulimit has taken effect as Todd mentioned. You can verify via
>> ulimit -a. Also make sure you have proper kernel parameters set in
>> /etc/sysctl.conf (MacOSX)
>>
>> On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd 
>> wrote:
>>
>>>
>>> Are you sure the new ulimit has taken effect?
>>>
>>> How many cores are you using?  How many reducers?
>>>
>>> "In general if a node in your cluster has C assigned cores and
>>> you run
>>> a job with X reducers then Spark will open C*X files in parallel
>>> and
>>> start writing. Shuffle consolidation will help decrease the total
>>> number of files created but the number of file handles open at
>>> any
>>> time doesn't change so it won't help the ulimit problem."
>>>
>>> Quoted from Patrick at:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html
>>>
>>> Thanks,
>>>
>>> Todd
>>>
>>> -Original Message-
>>> From: SK [mailto:skrishna...@gmail.com]
>>> Sent: Tuesday, October 7, 2014 2:12 PM
>>> To: u...@spark.incubator.apache.org
>>> Subject: Re: Shuffle files
>>>
>>> - We set ulimit to 50. But I still get the same "too many open files"
>>> warning.
>>>
>>> - I tried setting consolidateFiles to True, but that did not help either.
>>>
>>> I am using a Mesos cluster.   Does Mesos have any limit on the number of
>>> open files?
>>>
>>> thanks
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


-- 
Chen Song


Re: Shuffle files

2014-10-07 Thread Andrew Ash
You will need to restart your Mesos workers to pick up the new limits as
well.

On Tue, Oct 7, 2014 at 4:02 PM, Sunny Khatri  wrote:

> @SK:
> Make sure ulimit has taken effect as Todd mentioned. You can verify via
> ulimit -a. Also make sure you have proper kernel parameters set in
> /etc/sysctl.conf (MacOSX)
>
> On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd 
> wrote:
>
>>
>> Are you sure the new ulimit has taken effect?
>>
>> How many cores are you using?  How many reducers?
>>
>> "In general if a node in your cluster has C assigned cores and
>> you run
>> a job with X reducers then Spark will open C*X files in parallel
>> and
>> start writing. Shuffle consolidation will help decrease the total
>> number of files created but the number of file handles open at any
>> time doesn't change so it won't help the ulimit problem."
>>
>> Quoted from Patrick at:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html
>>
>> Thanks,
>>
>> Todd
>>
>> -Original Message-
>> From: SK [mailto:skrishna...@gmail.com]
>> Sent: Tuesday, October 7, 2014 2:12 PM
>> To: u...@spark.incubator.apache.org
>> Subject: Re: Shuffle files
>>
>> - We set ulimit to 50. But I still get the same "too many open files"
>> warning.
>>
>> - I tried setting consolidateFiles to True, but that did not help either.
>>
>> I am using a Mesos cluster.   Does Mesos have any limit on the number of
>> open files?
>>
>> thanks
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Shuffle files

2014-10-07 Thread Sunny Khatri
@SK:
Make sure ulimit has taken effect as Todd mentioned. You can verify via
ulimit -a. Also make sure you have proper kernel parameters set in
/etc/sysctl.conf (MacOSX)

On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd 
wrote:

>
> Are you sure the new ulimit has taken effect?
>
> How many cores are you using?  How many reducers?
>
> "In general if a node in your cluster has C assigned cores and you
> run
> a job with X reducers then Spark will open C*X files in parallel
> and
> start writing. Shuffle consolidation will help decrease the total
> number of files created but the number of file handles open at any
> time doesn't change so it won't help the ulimit problem."
>
> Quoted from Patrick at:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html
>
> Thanks,
>
> Todd
>
> -Original Message-
> From: SK [mailto:skrishna...@gmail.com]
> Sent: Tuesday, October 7, 2014 2:12 PM
> To: u...@spark.incubator.apache.org
> Subject: Re: Shuffle files
>
> - We set ulimit to 50. But I still get the same "too many open files"
> warning.
>
> - I tried setting consolidateFiles to True, but that did not help either.
>
> I am using a Mesos cluster.   Does Mesos have any limit on the number of
> open files?
>
> thanks
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: Shuffle files

2014-10-07 Thread Lisonbee, Todd

Are you sure the new ulimit has taken effect?
 
How many cores are you using?  How many reducers?

"In general if a node in your cluster has C assigned cores and you run 
a job with X reducers then Spark will open C*X files in parallel and 
start writing. Shuffle consolidation will help decrease the total 
number of files created but the number of file handles open at any 
time doesn't change so it won't help the ulimit problem."

Quoted from Patrick at:
http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html
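
To put hypothetical numbers on that: with C = 16 cores on a node and X = 2,000
reducers, hash-based shuffle opens 16 * 2,000 = 32,000 files in parallel on
that node, so the per-process open-file limit (ulimit -n) needs to be
comfortably above that.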

Thanks,

Todd

-Original Message-
From: SK [mailto:skrishna...@gmail.com] 
Sent: Tuesday, October 7, 2014 2:12 PM
To: u...@spark.incubator.apache.org
Subject: Re: Shuffle files

- We set ulimit to 50. But I still get the same "too many open files"
warning. 

- I tried setting consolidateFiles to True, but that did not help either.

I am using a Mesos cluster.   Does Mesos have any limit on the number of
open files?

thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle files

2014-10-07 Thread SK
- We set ulimit to 50. But I still get the same "too many open files"
warning. 

- I tried setting consolidateFiles to True, but that did not help either.

I am using a Mesos cluster.   Does Mesos have any limit on the number of
open files?

thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle files

2014-09-25 Thread Andrew Ash
Hi SK,

For the problem with lots of shuffle files and the "too many open files"
exception there are a couple options:

1. The Linux kernel has a limit on the number of open files at once.  This
is set with ulimit -n, and can be set permanently in /etc/sysctl.conf or
/etc/sysctl.d/.  Try increasing this to a large value, at a bare minimum
the square of your partition count.
2. Try using shuffle consolidation -- spark.shuffle.consolidateFiles=true. This
option writes fewer files to disk, so it shouldn't hit limits nearly as much.
3. Try using the sort-based shuffle by setting spark.shuffle.manager=SORT.
You should likely hold off on this until
https://issues.apache.org/jira/browse/SPARK-3032 is fixed, hopefully in
1.1.1
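
As a minimal sketch of options 2 and 3 (illustrative only; option 1 is an
OS-level change on each worker):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.consolidateFiles", "true")   // option 2: consolidate shuffle files
  .set("spark.shuffle.manager", "SORT")            // option 3: sort-based shuffle (note the SPARK-3032 caveat)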

Hope that helps!
Andrew

On Thu, Sep 25, 2014 at 4:20 PM, SK  wrote:

> Hi,
>
> I am using Spark 1.1.0 on a cluster. My job takes as input 30 files in a
> directory (I am using sc.textFile("dir/*")) to read in the files.  I am
> getting the following warning:
>
> WARN TaskSetManager: Lost task 99.0 in stage 1.0 (TID 99,
> mesos12-dev.sccps.net): java.io.FileNotFoundException:
> /tmp/spark-local-20140925215712-0319/12/shuffle_0_99_93138 (Too many open
> files)
>
> basically I think a lot of shuffle files are being created.
>
> 1) The tasks eventually fail and the job just hangs (after taking very
> long,
> more than an hour).  If I read these 30 files in a for loop, the same job
> completes in a few minutes. However, I need to specify the files names,
> which is not convenient. I am assuming that sc.textFile("dir/*") creates a
> large RDD for all the 30 files. Is there a way to make the operation on
> this
> large RDD efficient so as to avoid creating too many shuffle files?
>
>
> 2) Also, I am finding that all the shuffle files for my other completed
> jobs
> are not being automatically deleted even after days. I thought that
> sc.stop() clears the intermediate files.  Is there some way to
> programmatically delete these temp shuffle files upon job completion?
>
>
> thanks
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Shuffle Files

2014-03-04 Thread Aniket Mokashi
From the BlockManager and ShuffleMapTask code, shuffle files are written under
spark.local.dir or java.io.tmpdir:

val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
  conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
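
So, as a minimal sketch (paths hypothetical), you can point shuffle and spill
files at specific disks by setting that property yourself:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("example")
  .setMaster("local[4]")                                        // for a local test; normally set by spark-submit
  .set("spark.local.dir", "/mnt/disk1/spark,/mnt/disk2/spark")  // comma-separated list; default is java.io.tmpdir
val sc = new SparkContext(conf)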




On Mon, Mar 3, 2014 at 10:45 PM, Usman Ghani  wrote:

> Where on the filesystem does spark write the shuffle files?
>



-- 
"...:::Aniket:::... Quetzalco@tl"