Re: Re: [spark-core] Can executors recover/reuse shuffle files upon failure?

2023-05-22 Thread Mich Talebzadeh
Just to correct the last sentence: if we end up starting a new instance of
Spark, I don't think it will be able to read the shuffle data that another
instance wrote to storage. I stand to be corrected.


Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 22 May 2023 at 15:27, Mich Talebzadeh 
wrote:

> Hi Maksym.
>
> Let us understand the basics here first
> My thoughts: Spark replicates the partitions among multiple nodes. If one
> executor fails, it moves the processing over to the other executor.
> However, if the data is lost, it re-executes the processing that generated
> the data,
> and might have to go back to the source. In case of failure, there will
> be delay in getting the results. The amount of delay depends on how much
> reprocessing Spark needs to do.
> Spark, by itself, doesn't add executors when executors fail. It just moves
> the tasks to other executors. If you are installing plain vanilla Spark
> on your own cluster, you need to figure out how to bring back executors.
> Most of the popular platforms built on top of Spark (Glue, EMR, GKS) will
> replace failed nodes. However, I don't think that applies to your case.
>
> With regard to below point you raised
>
> "" One of the offerings from the service we use is EBS migration which
> basically means if a host is about to get evicted, a new host is created
> and the EBS volume is attached to it. When Spark assigns a new executor
> to the newly created instance, it basically can recover all the shuffle
> files that are already persisted in the migrated EBS volume Is this how
> it works? Do executors recover / re-register the shuffle files that they
> found?"""
>
> My understanding is that RDD lineage keeps track of records of what needs
> to be re-executed. It uses RDD lineage to figure out what needs to be
> re-executed in the same Spark instance. For example, if you have done a
> groupBy Key , you will have 2 stages. After the first stage, the data will
> be shuffled by hashing the groupBy key , so that data for the same value of
> key lands in same partition. Now, if one of those partitions is lost
> during execution of second stage, I am guessing Spark will have to go back
> and re-execute all the tasks in the first stage.
>
>
> So in your case, starting a new instance will not run into the same issue;
> it simply re-executes the job.
>
>
> HTH
>
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 22 May 2023 at 13:19, Maksym M 
> wrote:
>
>> Hey vaquar,
>>
>> The link does't explain the crucial detail we're interested in - does
>> executor
>> re-use the data that exists on a node from previous executor and if not,
>> how
>> can we configure it to do so?
>>
>> We are not running on kubernetes, so EKS/Kubernetes-specific advice isn't
>> very relevant.
>>
>> We are running spark standalone mode.
>>
>> Best regards,
>> maksym
>>
>> On 2023/05/17 12:28:35 vaquar khan wrote:
>> > Following link you will get all required details
>> >
>> >
>> https://aws.amazon.com/blogs/containers/best-practices-for-running-spark-on-amazon-eks/
>> >
>> > Let me know if you required further informations.
>> >
>> >
>> > Regards,
>> > Vaquar khan
>> >
>> >
>> >
>> >
>> > On Mon, May 15, 2023, 10:14 PM Mich Talebzadeh 
>> > wrote:
>> >
>> > > Couple of points
>> > >
>> > > Why use spot or pre-empt intantes when your application as you stated
>> > &

Re: Re: [spark-core] Can executors recover/reuse shuffle files upon failure?

2023-05-22 Thread Mich Talebzadeh
Hi Maksym.

Let us understand the basics here first.
My thoughts: Spark replicates the partitions among multiple nodes. If one
executor fails, it moves the processing over to another executor. However,
if the data is lost, it re-executes the processing that generated the data,
and might have to go back to the source. In case of failure, there will be a
delay in getting the results. The amount of delay depends on how much
reprocessing Spark needs to do.
Spark, by itself, doesn't add executors when executors fail. It just moves
the tasks to other executors. If you are installing plain vanilla Spark on
your own cluster, you need to figure out how to bring back executors.
Most of the popular platforms built on top of Spark (Glue, EMR, GKS) will
replace failed nodes. However, I don't think that applies to your case.

With regard to the point you raised below:

"One of the offerings from the service we use is EBS migration, which
basically means if a host is about to get evicted, a new host is created
and the EBS volume is attached to it. When Spark assigns a new executor to
the newly created instance, it basically can recover all the shuffle files
that are already persisted in the migrated EBS volume. Is this how it works?
Do executors recover / re-register the shuffle files that they found?"

My understanding is that RDD lineage keeps track of what needs to be
re-executed. Spark uses that lineage to figure out what needs to be
re-executed within the same Spark instance. For example, if you have done a
groupByKey, you will have 2 stages. After the first stage, the data will be
shuffled by hashing the groupBy key, so that data for the same key value
lands in the same partition. Now, if one of those partitions is lost during
execution of the second stage, I am guessing Spark will have to go back and
re-execute all the tasks in the first stage.
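
A minimal sketch of that two-stage shape, assuming a spark-shell session
where sc is available (the sample data below is made up):

// Stage 1: build the pairs and write shuffle output for the groupByKey.
// Stage 2: read the shuffled partitions and aggregate them.
val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
val pairs = words.map(w => (w, 1))      // narrow transformation, stays in stage 1
val grouped = pairs.groupByKey()        // shuffle boundary: hash-partitioned by key
val counts = grouped.mapValues(_.size)  // runs in stage 2, reading the shuffle files
counts.collect().foreach(println)
// If a stage-2 task loses its shuffle input, Spark uses the lineage above to
// re-run the stage-1 tasks that produced that partition.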


So in your case, starting a new instance will not run into the same issue;
it simply re-executes the job.


HTH


Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 22 May 2023 at 13:19, Maksym M 
wrote:

> Hey vaquar,
>
> The link does't explain the crucial detail we're interested in - does
> executor
> re-use the data that exists on a node from previous executor and if not,
> how
> can we configure it to do so?
>
> We are not running on kubernetes, so EKS/Kubernetes-specific advice isn't
> very relevant.
>
> We are running spark standalone mode.
>
> Best regards,
> maksym
>
> On 2023/05/17 12:28:35 vaquar khan wrote:
> > Following link you will get all required details
> >
> >
> https://aws.amazon.com/blogs/containers/best-practices-for-running-spark-on-amazon-eks/
> >
> > Let me know if you required further informations.
> >
> >
> > Regards,
> > Vaquar khan
> >
> >
> >
> >
> > On Mon, May 15, 2023, 10:14 PM Mich Talebzadeh 
> > wrote:
> >
> > > Couple of points
> > >
> > > Why use spot or pre-empt intantes when your application as you stated
> > > shuffles heavily.
> > > Have you looked at why you are having these shuffles? What is the
> cause of
> > > these large transformations ending up in shuffle
> > >
> > > Also on your point:
> > > "..then ideally we should expect that when an executor is killed/OOM'd
> > > and a new executor is spawned on the same host, the new executor
> registers
> > > the shuffle files to itself. Is that so?"
> > >
> > > What guarantee is that the new executor with inherited shuffle files
> will
> > > succeed?
> > >
> > > Also OOM is often associated with some form of skewed data
> > >
> > > HTH
> > > .
> > > Mich Talebzadeh,
> > > Lead Solutions Architect/Engineering Lead
> > > Palantir Technologies Limited
> > > London
> > > United Kingdom
> > >
> > >
> > >view my Linkedin profile
> > > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
> > >
> > >
> > >  https://en.everybodywiki.com/Mich_Talebzadeh
> > >
> > >
> > >
> > > *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any

RE: Re: [spark-core] Can executors recover/reuse shuffle files upon failure?

2023-05-22 Thread Maksym M
Hey vaquar,

The link doesn't explain the crucial detail we're interested in - does an
executor re-use the data that a previous executor left on a node, and if not,
how can we configure it to do so?

We are not running on Kubernetes, so EKS/Kubernetes-specific advice isn't
very relevant.

We are running Spark in standalone mode.

Best regards,
maksym

On 2023/05/17 12:28:35 vaquar khan wrote:
> Following link you will get all required details
> 
> https://aws.amazon.com/blogs/containers/best-practices-for-running-spark-on-amazon-eks/
> 
> Let me know if you required further informations.
> 
> 
> Regards,
> Vaquar khan
> 
> 
> 
> 
> On Mon, May 15, 2023, 10:14 PM Mich Talebzadeh 
> wrote:
> 
> > Couple of points
> >
> > Why use spot or pre-empt intantes when your application as you stated
> > shuffles heavily.
> > Have you looked at why you are having these shuffles? What is the cause of
> > these large transformations ending up in shuffle
> >
> > Also on your point:
> > "..then ideally we should expect that when an executor is killed/OOM'd
> > and a new executor is spawned on the same host, the new executor registers
> > the shuffle files to itself. Is that so?"
> >
> > What guarantee is that the new executor with inherited shuffle files will
> > succeed?
> >
> > Also OOM is often associated with some form of skewed data
> >
> > HTH
> > .
> > Mich Talebzadeh,
> > Lead Solutions Architect/Engineering Lead
> > Palantir Technologies Limited
> > London
> > United Kingdom
> >
> >
> >view my Linkedin profile
> > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
> >
> >
> >  https://en.everybodywiki.com/Mich_Talebzadeh
> >
> >
> >
> > *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> > loss, damage or destruction of data or any other property which may arise
> > from relying on this email's technical content is explicitly disclaimed.
> > The author will in no case be liable for any monetary damages arising from
> > such loss, damage or destruction.
> >
> >
> >
> >
> > On Mon, 15 May 2023 at 13:11, Faiz Halde 
> > wrote:
> >
> >> Hello,
> >>
> >> We've been in touch with a few spark specialists who suggested us a
> >> potential solution to improve the reliability of our jobs that are shuffle
> >> heavy
> >>
> >> Here is what our setup looks like
> >>
> >>- Spark version: 3.3.1
> >>- Java version: 1.8
> >>- We do not use external shuffle service
> >>- We use spot instances
> >>
> >> We run spark jobs on clusters that use Amazon EBS volumes. The
> >> spark.local.dir is mounted on this EBS volume. One of the offerings from
> >> the service we use is EBS migration which basically means if a host is
> >> about to get evicted, a new host is created and the EBS volume is attached
> >> to it
> >>
> >> When Spark assigns a new executor to the newly created instance, it
> >> basically can recover all the shuffle files that are already persisted in
> >> the migrated EBS volume
> >>
> >> Is this how it works? Do executors recover / re-register the shuffle
> >> files that they found?
> >>
> >> So far I have not come across any recovery mechanism. I can only see
> >>
> >> KubernetesLocalDiskShuffleDataIO
> >>
> >>  that has a pre-init step where it tries to register the available
> >> shuffle files to itself
> >>
> >> A natural follow-up on this,
> >>
> >> If what they claim is true, then ideally we should expect that when an
> >> executor is killed/OOM'd and a new executor is spawned on the same host,
> >> the new executor registers the shuffle files to itself. Is that so?
> >>
> >> Thanks
> >>
> >> --
> >> Confidentiality note: This e-mail may contain confidential information
> >> from Nu Holdings Ltd and/or its affiliates. If you have received it by
> >> mistake, please let us know by e-mail reply and delete it from your system;
> >> you may not copy this message or disclose its contents to anyone; for
> >> details about what personal information we collect and why, please refer to
> >> our privacy policy
> >> <https://api.mziq.com/mzfilemanager/v2/d/59a081d2-0d63-4bb5-b786-4c07ae26bc74/6f4939b9-5f74-a528-1835-596b481dca54>
> >> .
> >>
> >
> 
-- 

Confidentiality note: This e-mail may contain confidential information 
from Nu Holdings Ltd and/or its affiliates. If you have received it by 
mistake, please let us know by e-mail reply and delete it from your system; 
you may not copy this message or disclose its contents to anyone; for 
details about what personal information we collect and why, please refer to 
our privacy policy 
<https://api.mziq.com/mzfilemanager/v2/d/59a081d2-0d63-4bb5-b786-4c07ae26bc74/6f4939b9-5f74-a528-1835-596b481dca54>.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [spark-core] Can executors recover/reuse shuffle files upon failure?

2023-05-17 Thread vaquar khan
From the following link you will get all the required details:

https://aws.amazon.com/blogs/containers/best-practices-for-running-spark-on-amazon-eks/

Let me know if you require further information.


Regards,
Vaquar khan




On Mon, May 15, 2023, 10:14 PM Mich Talebzadeh 
wrote:

> Couple of points
>
> Why use spot or pre-empt intantes when your application as you stated
> shuffles heavily.
> Have you looked at why you are having these shuffles? What is the cause of
> these large transformations ending up in shuffle
>
> Also on your point:
> "..then ideally we should expect that when an executor is killed/OOM'd
> and a new executor is spawned on the same host, the new executor registers
> the shuffle files to itself. Is that so?"
>
> What guarantee is that the new executor with inherited shuffle files will
> succeed?
>
> Also OOM is often associated with some form of skewed data
>
> HTH
> .
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 15 May 2023 at 13:11, Faiz Halde 
> wrote:
>
>> Hello,
>>
>> We've been in touch with a few spark specialists who suggested us a
>> potential solution to improve the reliability of our jobs that are shuffle
>> heavy
>>
>> Here is what our setup looks like
>>
>>- Spark version: 3.3.1
>>- Java version: 1.8
>>- We do not use external shuffle service
>>- We use spot instances
>>
>> We run spark jobs on clusters that use Amazon EBS volumes. The
>> spark.local.dir is mounted on this EBS volume. One of the offerings from
>> the service we use is EBS migration which basically means if a host is
>> about to get evicted, a new host is created and the EBS volume is attached
>> to it
>>
>> When Spark assigns a new executor to the newly created instance, it
>> basically can recover all the shuffle files that are already persisted in
>> the migrated EBS volume
>>
>> Is this how it works? Do executors recover / re-register the shuffle
>> files that they found?
>>
>> So far I have not come across any recovery mechanism. I can only see
>>
>> KubernetesLocalDiskShuffleDataIO
>>
>>  that has a pre-init step where it tries to register the available
>> shuffle files to itself
>>
>> A natural follow-up on this,
>>
>> If what they claim is true, then ideally we should expect that when an
>> executor is killed/OOM'd and a new executor is spawned on the same host,
>> the new executor registers the shuffle files to itself. Is that so?
>>
>> Thanks
>>
>> --
>> Confidentiality note: This e-mail may contain confidential information
>> from Nu Holdings Ltd and/or its affiliates. If you have received it by
>> mistake, please let us know by e-mail reply and delete it from your system;
>> you may not copy this message or disclose its contents to anyone; for
>> details about what personal information we collect and why, please refer to
>> our privacy policy
>> <https://api.mziq.com/mzfilemanager/v2/d/59a081d2-0d63-4bb5-b786-4c07ae26bc74/6f4939b9-5f74-a528-1835-596b481dca54>
>> .
>>
>


Re: [spark-core] Can executors recover/reuse shuffle files upon failure?

2023-05-15 Thread Mich Talebzadeh
Couple of points

Why use spot or pre-emptible instances when your application, as you stated,
shuffles heavily?
Have you looked at why you are having these shuffles? What is the cause of
these large transformations ending up in a shuffle?

Also on your point:
"..then ideally we should expect that when an executor is killed/OOM'd and
a new executor is spawned on the same host, the new executor registers the
shuffle files to itself. Is that so?"

What guarantee is there that the new executor with the inherited shuffle
files will succeed?

Also OOM is often associated with some form of skewed data

HTH
.
Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 15 May 2023 at 13:11, Faiz Halde 
wrote:

> Hello,
>
> We've been in touch with a few spark specialists who suggested us a
> potential solution to improve the reliability of our jobs that are shuffle
> heavy
>
> Here is what our setup looks like
>
>- Spark version: 3.3.1
>- Java version: 1.8
>- We do not use external shuffle service
>- We use spot instances
>
> We run spark jobs on clusters that use Amazon EBS volumes. The
> spark.local.dir is mounted on this EBS volume. One of the offerings from
> the service we use is EBS migration which basically means if a host is
> about to get evicted, a new host is created and the EBS volume is attached
> to it
>
> When Spark assigns a new executor to the newly created instance, it
> basically can recover all the shuffle files that are already persisted in
> the migrated EBS volume
>
> Is this how it works? Do executors recover / re-register the shuffle files
> that they found?
>
> So far I have not come across any recovery mechanism. I can only see
>
> KubernetesLocalDiskShuffleDataIO
>
>  that has a pre-init step where it tries to register the available shuffle
> files to itself
>
> A natural follow-up on this,
>
> If what they claim is true, then ideally we should expect that when an
> executor is killed/OOM'd and a new executor is spawned on the same host,
> the new executor registers the shuffle files to itself. Is that so?
>
> Thanks
>
> --
> Confidentiality note: This e-mail may contain confidential information
> from Nu Holdings Ltd and/or its affiliates. If you have received it by
> mistake, please let us know by e-mail reply and delete it from your system;
> you may not copy this message or disclose its contents to anyone; for
> details about what personal information we collect and why, please refer to
> our privacy policy
> <https://api.mziq.com/mzfilemanager/v2/d/59a081d2-0d63-4bb5-b786-4c07ae26bc74/6f4939b9-5f74-a528-1835-596b481dca54>
> .
>


[spark-core] Can executors recover/reuse shuffle files upon failure?

2023-05-15 Thread Faiz Halde
Hello,

We've been in touch with a few Spark specialists who suggested to us a
potential solution to improve the reliability of our jobs that are
shuffle-heavy.

Here is what our setup looks like:

   - Spark version: 3.3.1
   - Java version: 1.8
   - We do not use external shuffle service
   - We use spot instances

We run Spark jobs on clusters that use Amazon EBS volumes. The
spark.local.dir is mounted on this EBS volume. One of the offerings from
the service we use is EBS migration, which basically means if a host is
about to get evicted, a new host is created and the EBS volume is attached
to it.

When Spark assigns a new executor to the newly created instance, it
basically can recover all the shuffle files that are already persisted in
the migrated EBS volume.

Is this how it works? Do executors recover / re-register the shuffle files
that they found?

So far I have not come across any recovery mechanism. I can only see
KubernetesLocalDiskShuffleDataIO, which has a pre-init step where it tries
to register the available shuffle files to itself.
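
For reference, a sketch of how such a shuffle-data plugin would be wired in.
This is an assumption, not something verified on 3.3.1: both the config key
and the class name should be checked against your Spark distribution (the
class ships with the Kubernetes resource manager in newer releases).

import org.apache.spark.sql.SparkSession

// Hypothetical wiring: point the sort shuffle manager's ShuffleDataIO plugin
// at the Kubernetes local-disk implementation. Verify availability first.
val spark = SparkSession.builder()
  .appName("shuffle-recovery-sketch")
  .config("spark.shuffle.sort.io.plugin.class",
          "org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO")
  .getOrCreate()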

A natural follow-up on this,

If what they claim is true, then ideally we should expect that when an
executor is killed/OOM'd and a new executor is spawned on the same host,
the new executor registers the shuffle files to itself. Is that so?

Thanks

-- 

Confidentiality note: This e-mail may contain confidential information 
from Nu Holdings Ltd and/or its affiliates. If you have received it by 
mistake, please let us know by e-mail reply and delete it from your system; 
you may not copy this message or disclose its contents to anyone; for 
details about what personal information we collect and why, please refer to 
our privacy policy 
<https://api.mziq.com/mzfilemanager/v2/d/59a081d2-0d63-4bb5-b786-4c07ae26bc74/6f4939b9-5f74-a528-1835-596b481dca54>.


Re: [EXTERNAL] spark re-use shuffle files not happening

2022-07-16 Thread Koert Kuipers
ok thanks. guess i am simply misremembering that i saw the shuffle files
getting re-used across jobs (actions). it was probably across stages for
the same job.

in structured streaming this is a pretty big deal. if you join a streaming
dataframe with a large static dataframe each microbatch becomes a job
(action), so the large static dataframe gets reshuffled for every
microbatch. observing this performance issue was actually why i did the
little basic experiment in this post.
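
For what it's worth, a minimal sketch of the stream-static join pattern being
described; all paths and the "key" column are made up:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("stream-static-join").getOrCreate()

// Large static side (path is illustrative).
val static = spark.read.parquet("/data/large_static")

// Streaming side; file sources need an explicit schema.
val stream = spark.readStream
  .schema(static.schema)                 // assumes the same layout, for brevity
  .parquet("/data/incoming")

// Stream-static equi-join: with broadcast ruled out by size, the static side
// is shuffled again for every micro-batch, since each micro-batch is its own job.
val joined = stream.join(static, Seq("key"))

val query = joined.writeStream
  .format("parquet")
  .option("path", "/data/out")
  .option("checkpointLocation", "/data/checkpoints")
  .start()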


On Sat, Jul 16, 2022 at 12:33 PM Shay Elbaz  wrote:

> Spark can reuse shuffle stages *in the same job *(action), not cross jobs.
> --
> *From:* Koert Kuipers 
> *Sent:* Saturday, July 16, 2022 6:43 PM
> *To:* user 
> *Subject:* [EXTERNAL] spark re-use shuffle files not happening
>
>
> *ATTENTION:* This email originated from outside of GM.
>
>
> i have seen many jobs where spark re-uses shuffle files (and skips a stage
> of a job), which is an awesome feature given how expensive shuffles are,
> and i generally now assume this will happen.
>
> however i feel like i am going a little crazy today. i did the simplest
> test in spark 3.3.0, basically i run 2 jobs within same spark shell, so
> using same spark session, and broadcast join is disabled so we get
> shuffles:
> 1) job1 joins dataframe1 with dataframe0 and writes results out.
> 2) job2 joins dataframe2 with dataframe0 and writes results out.
>
> i would expect job2 to skip the stage where dataframe0 is getting
> shuffled, but its not skipping it! what am i missing?
> is shuffle re-use only enabled within same job/action? that goes against
> what i remember...
>
> code:
> $ spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1
> scala> val data0 = spark.read.format("csv").option("header",
> true).load("data0.csv")
> scala> val data1 = spark.read.format("csv").option("header",
> true).load("data1.csv")
> scala> val data2 = spark.read.format("csv").option("header",
> true).load("data2.csv")
> scala> data1.join(data0, "key").write.format("parquet").save("out1")
> scala> data2.join(data0, "key").write.format("parquet").save("out2") //
> should skip stage that scans csv for data0 and writes shuffle files... but
> it doesn't
>
>
>
> CONFIDENTIALITY NOTICE: This electronic communication and any files
> transmitted with it are confidential, privileged and intended solely for
> the use of the individual or entity to whom they are addressed. If you are
> not the intended recipient, you are hereby notified that any disclosure,
> copying, distribution (electronic or otherwise) or forwarding of, or the
> taking of any action in reliance on the contents of this transmission is
> strictly prohibited. Please notify the sender immediately by e-mail if you
> have received this email by mistake and delete this email from your system.
>
> Is it necessary to print this email? If you care about the environment
> like we do, please refrain from printing emails. It helps to keep the
> environment forested and litter-free.
>

-- 
CONFIDENTIALITY NOTICE: This electronic communication and any files 
transmitted with it are confidential, privileged and intended solely for 
the use of the individual or entity to whom they are addressed. If you are 
not the intended recipient, you are hereby notified that any disclosure, 
copying, distribution (electronic or otherwise) or forwarding of, or the 
taking of any action in reliance on the contents of this transmission is 
strictly prohibited. Please notify the sender immediately by e-mail if you 
have received this email by mistake and delete this email from your system.


Is it necessary to print this email? If you care about the environment 
like we do, please refrain from printing emails. It helps to keep the 
environment forested and litter-free.


Re: [EXTERNAL] spark re-use shuffle files not happening

2022-07-16 Thread Shay Elbaz
Spark can reuse shuffle stages in the same job (action), not across jobs.
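
A minimal mitigation sketch under that constraint, reusing the names from the
spark-shell session quoted below: caching data0 means the second job no
longer re-reads and re-computes it from the CSV, although the join's exchange
may still be planned per job.

val data0 = spark.read.format("csv").option("header", true).load("data0.csv").persist()
val data1 = spark.read.format("csv").option("header", true).load("data1.csv")
val data2 = spark.read.format("csv").option("header", true).load("data2.csv")
data0.count()   // materialize the cache before the joins
data1.join(data0, "key").write.format("parquet").save("out1")
data2.join(data0, "key").write.format("parquet").save("out2")
data0.unpersist()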

From: Koert Kuipers 
Sent: Saturday, July 16, 2022 6:43 PM
To: user 
Subject: [EXTERNAL] spark re-use shuffle files not happening


ATTENTION: This email originated from outside of GM.


i have seen many jobs where spark re-uses shuffle files (and skips a stage of a 
job), which is an awesome feature given how expensive shuffles are, and i 
generally now assume this will happen.

however i feel like i am going a little crazy today. i did the simplest test in 
spark 3.3.0, basically i run 2 jobs within same spark shell, so using same 
spark session, and broadcast join is disabled so we get shuffles:
1) job1 joins dataframe1 with dataframe0 and writes results out.
2) job2 joins dataframe2 with dataframe0 and writes results out.

i would expect job2 to skip the stage where dataframe0 is getting shuffled, but 
its not skipping it! what am i missing?
is shuffle re-use only enabled within same job/action? that goes against what i 
remember...

code:
$ spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1
scala> val data0 = spark.read.format("csv").option("header", 
true).load("data0.csv")
scala> val data1 = spark.read.format("csv").option("header", 
true).load("data1.csv")
scala> val data2 = spark.read.format("csv").option("header", 
true).load("data2.csv")
scala> data1.join(data0, "key").write.format("parquet").save("out1")
scala> data2.join(data0, "key").write.format("parquet").save("out2") // should 
skip stage that scans csv for data0 and writes shuffle files... but it doesn't



CONFIDENTIALITY NOTICE: This electronic communication and any files transmitted 
with it are confidential, privileged and intended solely for the use of the 
individual or entity to whom they are addressed. If you are not the intended 
recipient, you are hereby notified that any disclosure, copying, distribution 
(electronic or otherwise) or forwarding of, or the taking of any action in 
reliance on the contents of this transmission is strictly prohibited. Please 
notify the sender immediately by e-mail if you have received this email by 
mistake and delete this email from your system.

Is it necessary to print this email? If you care about the environment like we 
do, please refrain from printing emails. It helps to keep the environment 
forested and litter-free.


spark re-use shuffle files not happening

2022-07-16 Thread Koert Kuipers
i have seen many jobs where spark re-uses shuffle files (and skips a stage
of a job), which is an awesome feature given how expensive shuffles are,
and i generally now assume this will happen.

however i feel like i am going a little crazy today. i did the simplest
test in spark 3.3.0: basically i run 2 jobs within the same spark shell, so
using the same spark session, and broadcast join is disabled so we get
shuffles:
1) job1 joins dataframe1 with dataframe0 and writes results out.
2) job2 joins dataframe2 with dataframe0 and writes results out.

i would expect job2 to skip the stage where dataframe0 is getting shuffled,
but it's not skipping it! what am i missing?
is shuffle re-use only enabled within the same job/action? that goes against
what i remember...

code:
$ spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1
scala> val data0 = spark.read.format("csv").option("header",
true).load("data0.csv")
scala> val data1 = spark.read.format("csv").option("header",
true).load("data1.csv")
scala> val data2 = spark.read.format("csv").option("header",
true).load("data2.csv")
scala> data1.join(data0, "key").write.format("parquet").save("out1")
scala> data2.join(data0, "key").write.format("parquet").save("out2") //
should skip stage that scans csv for data0 and writes shuffle files... but
it doesn't

-- 
CONFIDENTIALITY NOTICE: This electronic communication and any files 
transmitted with it are confidential, privileged and intended solely for 
the use of the individual or entity to whom they are addressed. If you are 
not the intended recipient, you are hereby notified that any disclosure, 
copying, distribution (electronic or otherwise) or forwarding of, or the 
taking of any action in reliance on the contents of this transmission is 
strictly prohibited. Please notify the sender immediately by e-mail if you 
have received this email by mistake and delete this email from your system.


Is it necessary to print this email? If you care about the environment 
like we do, please refrain from printing emails. It helps to keep the 
environment forested and litter-free.


Re: Spark with External Shuffle Service - using saved shuffle files in the event of executor failure

2021-05-12 Thread Attila Zsolt Piros
Hello,

I have answered it on the Stack Overflow.

Best Regards,
Attila


On Wed, May 12, 2021 at 4:57 PM Chris Thomas 
wrote:

> Hi,
>
> I am pretty confident I have observed Spark configured with the Shuffle
> Service continuing to fetch shuffle files on a node in the event of
> executor failure, rather than recompute the shuffle files as happens
> without the Shuffle Service. Can anyone confirm this?
>
> (I have a SO question
> <https://stackoverflow.com/questions/67466878/can-spark-with-external-shuffle-service-use-saved-shuffle-files-in-the-event-of>open
> on the same if you would rather answer directly there).
>
> Kind regards,
>
> Chris
>
>
>


Spark with External Shuffle Service - using saved shuffle files in the event of executor failure

2021-05-12 Thread Chris Thomas
Hi, 

I am pretty confident I have observed Spark configured with the Shuffle Service 
continuing to fetch shuffle files on a node in the event of executor failure, 
rather than recompute the shuffle files as happens without the Shuffle Service. 
Can anyone confirm this?

(I have a SO question open on the same
<https://stackoverflow.com/questions/67466878/can-spark-with-external-shuffle-service-use-saved-shuffle-files-in-the-event-of>
if you would rather answer directly there.)
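
For context, a minimal sketch of the configuration in question, assuming the
external shuffle service daemon is already running on each worker node (the
settings below only switch the client side on):

import org.apache.spark.sql.SparkSession

// Sketch: with the external shuffle service enabled, shuffle files are served
// by a per-node service, so they remain fetchable after the executor that
// wrote them has died (as long as the node itself is still up).
val spark = SparkSession.builder()
  .appName("external-shuffle-service-sketch")
  .config("spark.shuffle.service.enabled", "true")
  // Commonly paired with dynamic allocation, which requires the service.
  .config("spark.dynamicAllocation.enabled", "true")
  .getOrCreate()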

Kind regards,

Chris




Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-23 Thread Holden Karau
You can also look at the shuffle file cleanup tricks we do inside of the
ALS algorithm in Spark.
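
A rough sketch of the general idea behind those tricks, assuming a
spark-shell session where sc is available; this is a simplification for
illustration, not the actual ALS code:

sc.setCheckpointDir("/tmp/spark-checkpoints")   // directory is made up

// Checkpointing every few iterations truncates the lineage, so shuffle files
// from earlier iterations become unreferenced and eligible for cleanup by the
// ContextCleaner once a GC runs on the driver.
var current = sc.parallelize(0 until 1000000).map(i => (i % 1000, 1L))
for (iter <- 1 to 30) {
  current = current.reduceByKey(_ + _).mapValues(_ + 1)   // one shuffle per iteration
  if (iter % 10 == 0) {
    current.checkpoint()   // cut the lineage
    current.count()        // force the checkpoint to materialize
  }
}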

On Fri, Feb 23, 2018 at 6:20 PM, vijay.bvp  wrote:

> have you looked at
> http://apache-spark-user-list.1001560.n3.nabble.com/Limit-
> Spark-Shuffle-Disk-Usage-td23279.html
>
> and the post mentioned there
> https://forums.databricks.com/questions/277/how-do-i-avoid-
> the-no-space-left-on-device-error.html
>
> also try compressing the output
> https://spark.apache.org/docs/latest/configuration.html#
> compression-and-serialization
> spark.shuffle.compress
>
> thanks
> Vijay
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Twitter: https://twitter.com/holdenkarau


Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-23 Thread vijay.bvp
have you looked at 
http://apache-spark-user-list.1001560.n3.nabble.com/Limit-Spark-Shuffle-Disk-Usage-td23279.html

and the post mentioned there
https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html

also try compressing the output
https://spark.apache.org/docs/latest/configuration.html#compression-and-serialization
spark.shuffle.compress
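
A quick sketch of setting those (spark.shuffle.compress is already true by
default, so this mainly matters if it was switched off or you want to pick
the codec explicitly):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Compress shuffle map output and spills, and choose the compression codec.
val conf = new SparkConf()
  .set("spark.shuffle.compress", "true")
  .set("spark.shuffle.spill.compress", "true")
  .set("spark.io.compression.codec", "lz4")

val spark = SparkSession.builder().config(conf).appName("shuffle-compress").getOrCreate()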

thanks
Vijay



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread naresh Goud
Got it. I understood issue in different way.



On Thu, Feb 22, 2018 at 9:19 PM Keith Chapman <keithgchap...@gmail.com>
wrote:

> My issue is that there is not enough pressure on GC, hence GC is not
> kicking in fast enough to delete the shuffle files of previous iterations.
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>
> On Thu, Feb 22, 2018 at 6:58 PM, naresh Goud <nareshgoud.du...@gmail.com>
> wrote:
>
>> It would be very difficult to tell without knowing what is your
>> application code doing, what kind of transformation/actions performing.
>> From my previous experience tuning application code which avoids
>> unnecessary objects reduce pressure on GC.
>>
>>
>> On Thu, Feb 22, 2018 at 2:13 AM, Keith Chapman <keithgchap...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm benchmarking a spark application by running it for multiple
>>> iterations, its a benchmark thats heavy on shuffle and I run it on a local
>>> machine with a very large hear (~200GB). The system has a SSD. When running
>>> for 3 to 4 iterations I get into a situation that I run out of disk space
>>> on the /tmp directory. On further investigation I was able to figure out
>>> that the reason for this is that the shuffle files are still around,
>>> because I have a very large hear GC has not happen and hence the shuffle
>>> files are not deleted. I was able to confirm this by lowering the heap size
>>> and I see GC kicking in more often and the size of /tmp stays under
>>> control. Is there any way I could configure spark to handle this issue?
>>>
>>> One option that I have is to have GC run more often by
>>> setting spark.cleaner.periodicGC.interval to a much lower value. Is there a
>>> cleaner solution?
>>>
>>> Regards,
>>> Keith.
>>>
>>> http://keith-chapman.com
>>>
>>
>>
>


Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread Keith Chapman
My issue is that there is not enough pressure on GC, hence GC is not
kicking in fast enough to delete the shuffle files of previous iterations.

Regards,
Keith.

http://keith-chapman.com

On Thu, Feb 22, 2018 at 6:58 PM, naresh Goud <nareshgoud.du...@gmail.com>
wrote:

> It would be very difficult to tell without knowing what is your
> application code doing, what kind of transformation/actions performing.
> From my previous experience tuning application code which avoids
> unnecessary objects reduce pressure on GC.
>
>
> On Thu, Feb 22, 2018 at 2:13 AM, Keith Chapman <keithgchap...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I'm benchmarking a spark application by running it for multiple
>> iterations, its a benchmark thats heavy on shuffle and I run it on a local
>> machine with a very large hear (~200GB). The system has a SSD. When running
>> for 3 to 4 iterations I get into a situation that I run out of disk space
>> on the /tmp directory. On further investigation I was able to figure out
>> that the reason for this is that the shuffle files are still around,
>> because I have a very large hear GC has not happen and hence the shuffle
>> files are not deleted. I was able to confirm this by lowering the heap size
>> and I see GC kicking in more often and the size of /tmp stays under
>> control. Is there any way I could configure spark to handle this issue?
>>
>> One option that I have is to have GC run more often by
>> setting spark.cleaner.periodicGC.interval to a much lower value. Is
>> there a cleaner solution?
>>
>> Regards,
>> Keith.
>>
>> http://keith-chapman.com
>>
>
>


Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread naresh Goud
It would be very difficult to tell without knowing what your application
code is doing and what kind of transformations/actions it is performing.
From my previous experience, tuning application code to avoid unnecessary
objects reduces pressure on GC.


On Thu, Feb 22, 2018 at 2:13 AM, Keith Chapman <keithgchap...@gmail.com>
wrote:

> Hi,
>
> I'm benchmarking a spark application by running it for multiple
> iterations, its a benchmark thats heavy on shuffle and I run it on a local
> machine with a very large hear (~200GB). The system has a SSD. When running
> for 3 to 4 iterations I get into a situation that I run out of disk space
> on the /tmp directory. On further investigation I was able to figure out
> that the reason for this is that the shuffle files are still around,
> because I have a very large hear GC has not happen and hence the shuffle
> files are not deleted. I was able to confirm this by lowering the heap size
> and I see GC kicking in more often and the size of /tmp stays under
> control. Is there any way I could configure spark to handle this issue?
>
> One option that I have is to have GC run more often by
> setting spark.cleaner.periodicGC.interval to a much lower value. Is there
> a cleaner solution?
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>


Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread Keith Chapman
Hi,

I'm benchmarking a Spark application by running it for multiple iterations;
it's a benchmark that's heavy on shuffle and I run it on a local machine
with a very large heap (~200GB). The system has an SSD. When running for 3
to 4 iterations I get into a situation where I run out of disk space on the
/tmp directory. On further investigation I was able to figure out that the
reason for this is that the shuffle files are still around: because I have
a very large heap, GC has not happened and hence the shuffle files are not
deleted. I was able to confirm this by lowering the heap size, and I see GC
kicking in more often and the size of /tmp staying under control. Is there
any way I could configure Spark to handle this issue?

One option that I have is to have GC run more often by
setting spark.cleaner.periodicGC.interval to a much lower value. Is there a
cleaner solution?
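
A minimal sketch of that option; the 5-minute value below is illustrative,
not a recommendation (the default is 30min):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Trigger a driver-side GC more often so the ContextCleaner notices
// unreferenced shuffles sooner and deletes their files on disk.
val conf = new SparkConf()
  .setMaster("local[6]")
  .setAppName("shuffle-cleanup-benchmark")
  .set("spark.cleaner.periodicGC.interval", "5min")

val spark = SparkSession.builder().config(conf).getOrCreate()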

Regards,
Keith.

http://keith-chapman.com


Re: Spark shuffle files

2017-03-27 Thread Mark Hamstra
When the RDD using them goes out of scope.

On Mon, Mar 27, 2017 at 3:13 PM, Ashwin Sai Shankar <ashan...@netflix.com>
wrote:

> Thanks Mark! follow up question, do you know when shuffle files are
> usually un-referenced?
>
> On Mon, Mar 27, 2017 at 2:35 PM, Mark Hamstra <m...@clearstorydata.com>
> wrote:
>
>> Shuffle files are cleaned when they are no longer referenced. See
>> https://github.com/apache/spark/blob/master/core/src/mai
>> n/scala/org/apache/spark/ContextCleaner.scala
>>
>> On Mon, Mar 27, 2017 at 12:38 PM, Ashwin Sai Shankar <
>> ashan...@netflix.com.invalid> wrote:
>>
>>> Hi!
>>>
>>> In spark on yarn, when are shuffle files on local disk removed? (Is it
>>> when the app completes or
>>> once all the shuffle files are fetched or end of the stage?)
>>>
>>> Thanks,
>>> Ashwin
>>>
>>
>>
>


Re: Spark shuffle files

2017-03-27 Thread Ashwin Sai Shankar
Thanks Mark! Follow-up question: do you know when shuffle files usually
become un-referenced?

On Mon, Mar 27, 2017 at 2:35 PM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> Shuffle files are cleaned when they are no longer referenced. See
> https://github.com/apache/spark/blob/master/core/src/
> main/scala/org/apache/spark/ContextCleaner.scala
>
> On Mon, Mar 27, 2017 at 12:38 PM, Ashwin Sai Shankar <
> ashan...@netflix.com.invalid> wrote:
>
>> Hi!
>>
>> In spark on yarn, when are shuffle files on local disk removed? (Is it
>> when the app completes or
>> once all the shuffle files are fetched or end of the stage?)
>>
>> Thanks,
>> Ashwin
>>
>
>


Re: Spark shuffle files

2017-03-27 Thread Mark Hamstra
Shuffle files are cleaned when they are no longer referenced. See
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ContextCleaner.scala
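
A small sketch of what "no longer referenced" means in practice, assuming a
spark-shell session where sc is available (the cleanup itself is
asynchronous and driven by GC on the driver):

var counts = sc.parallelize(1 to 1000000)
  .map(i => (i % 100, 1L))
  .reduceByKey(_ + _)      // writes shuffle files under the executors' local dirs
counts.count()             // materialize the shuffle

counts = null              // drop the last reference to the shuffled RDD
System.gc()                // a driver GC lets the ContextCleaner remove the files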

On Mon, Mar 27, 2017 at 12:38 PM, Ashwin Sai Shankar <
ashan...@netflix.com.invalid> wrote:

> Hi!
>
> In spark on yarn, when are shuffle files on local disk removed? (Is it
> when the app completes or
> once all the shuffle files are fetched or end of the stage?)
>
> Thanks,
> Ashwin
>


Spark shuffle files

2017-03-27 Thread Ashwin Sai Shankar
Hi!

In Spark on YARN, when are shuffle files on local disk removed? (Is it when
the app completes, once all the shuffle files are fetched, or at the end of
the stage?)

Thanks,
Ashwin


Fwd: Connection failure followed by bad shuffle files during shuffle

2016-03-15 Thread Eric Martin
Hi,

I'm running into consistent failures during a shuffle read while trying to
do a group-by followed by a count aggregation (using the DataFrame API on
Spark 1.5.2).

The shuffle read (in stage 1) fails with

org.apache.spark.shuffle.FetchFailedException: Failed to send RPC
7719188499899260109 to host_a/ip_a:35946:
java.nio.channels.ClosedChannelException
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321)


Looking into the executor logs first shows

ERROR TransportChannelHandler: Connection to host_b/ip_b:38804 has been
quiet for 12 ms while there are outstanding requests. Assuming
connection is dead; please adjust spark.network.timeout if this is wrong.

on the node that threw the FetchFailedException (host_a) and

ERROR TransportRequestHandler: Error sending result
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=207789700738,
chunkIndex=894},
buffer=FileSegmentManagedBuffer{file=/local_disk/spark-ed6667d4-445b-4d65-bfda-e4540b7215aa/executor-d03e5e7e-57d4-40e2-9021-c20d0b84bf75/blockmgr-05d5f2b6-142e-415c-a08b-58d16a10b8bf/27/shuffle_1_13732_0.data,
offset=18960736, length=19477}} to /ip_a:32991; closing connection

on the node referenced in the exception (host_b). The error in the host_b
logs occurred a few seconds after the error in the host_a logs. I noticed
there was a lot of spilling going on during the shuffle read, so I
attempted to work around this problem by increasing the number of shuffle
partitions (to decrease spilling) as well as increasing
spark.network.timeout. Neither of these got rid of these connection
failures.

This causes some of stage 0 to recompute (which runs successfully). Stage 1
retry 1 then always fails with

java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at
org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)

Changing the spark.io.compression.codec to lz4 changes this error to

java.io.IOException: Stream is corrupted
at
net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:153)

which leads me to believe that the timeout during the shuffle read failure
leaves invalid files on disk.

Notably, these failures do not occur when I run on smaller subsets of data.
The failure is occurring while attempting to group ~100 billion rows into
20 billion groups (with key size of 24 bytes and count as the only
aggregation) on a 16 node cluster. I've replicated this failure on 2
completely separate clusters (both running with standalone cluster manager).

Does anyone have suggestions about how I could make this crash go away or
how I could try to make a smaller failing test case so the bug can be more
easily investigated?

Best,
Eric Martin


RE: FileNotFoundException in appcache shuffle files

2015-12-10 Thread kendal
I have similar issues... the exception occurs only with very large data.
I tried doubling the memory and the number of partitions as suggested by
some Google searches, but in vain.
Any idea?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/FileNotFoundException-in-appcache-shuffle-files-tp17605p25663.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: FileNotFoundException in appcache shuffle files

2015-12-10 Thread Jiří Syrový
Usually there is another error or log message before FileNotFoundException.
Try to check your logs for something like that.

2015-12-10 10:47 GMT+01:00 kendal <ken...@163.com>:

> I have similar issues... Exception only with very large data.
> And I tried to double the memory or partition as suggested by some google
> search, but in vain..
> any idea?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/FileNotFoundException-in-appcache-shuffle-files-tp17605p25663.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Checkpointing not removing shuffle files from local disk

2015-12-03 Thread Ewan Higgs
Hi all,
We are running a class with PySpark notebooks for data analysis. Some of
the notebooks are fairly long and have a lot of operations. Through the
course of a notebook, the shuffle storage expands considerably and
often exceeds quota (e.g. 1.5GB of input expands to 24GB in shuffle
files). Closing and reopening the notebook doesn't clean out the
shuffle directory.

FWIW, the shuffle memory really explodes when we use ALS.

There is a ticket to make sure this is well documented, but there are
also suggestions that the problem should have gone away with Spark 1.0:

https://issues.apache.org/jira/browse/SPARK-5836

Yours,
Ewan

On Tue, 2015-09-29 at 01:18 -0700, ramibatal wrote:
> Hi all,
> 
> I am applying MLlib LDA for topic modelling. I am setting up the the
> lda
> parameter as follow:
> 
> lda.setOptimizer(optimizer)
>   .setK(params.k)
>   .setMaxIterations(params.maxIterations)
>   .setDocConcentration(params.docConcentration)
>   .setTopicConcentration(params.topicConcentration)
>   .setCheckpointInterval(params.checkpointInterval)
>   if (params.checkpointDir.nonEmpty) {
>   sc.setCheckpointDir(params.checkpointDir.get)
>  }
> 
> 
> I am running the LDA algorithm on my local MacOS machine, on a corpus
> of
> 800,000 english text documents (total size 9GB), and my machine has 8
> cores
> with 16GB or RAM and 500GB or hard disk.
> 
> Here are my Spark configurations:
> 
> val conf = new
> SparkConf().setMaster("local[6]").setAppName("LDAExample")
> val sc = new SparkContext(conf)
> 
> 
> When calling the LDA with a large number of iterations (100) (i.e. by
> calling val ldaModel = lda.run(corpus)), the algorithm starts to create
> shuffle files on my disk, to the point that it fills it up until there is
> no space left.
> 
> I am using spark-submit to run my program as follow:
> 
> spark-submit --driver-memory 14G --class
> com.heystaks.spark.ml.topicmodelling.LDAExample
> ./target/scala-2.10/lda-assembly-1.0.jar path/to/copurs/file --k 100
> --maxIterations 100 --checkpointDir /Users/ramialbatal/checkpoints
> --checkpointInterval 1
> 
> 
> Where 'K' is the number of topics to extract. When the number of
> iterations and topics is small everything is fine, but when there is a
> large iteration number like 100, no matter what the value of
> --checkpointInterval is, the phenomenon is the same: the disk fills up
> after about 25 iterations.
> 
> Everything seems to run correctly and the checkpoint files are created on
> my disk, but the shuffle files are not removed at all.
> 
> I am using Spark and MLlib 1.5.0, and my machine is Mac Yosemite
> 10.10.5.
> 
> Any help is highly appreciated. Thanks
> 
> 
> 
> --
> View this message in context: http://apache-spark-user-list.1001560.n
> 3.nabble.com/Checkpointing-not-removing-shuffle-files-from-local-
> disk-tp24857.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Shuffle files lifecycle

2015-06-29 Thread Thomas Gerber
Hello,

It is my understanding that shuffles are written to disk and that they act
as checkpoints.

I wonder if this is true only within a job, or across jobs. Please note
that I use the words job and stage carefully here.

1. can a shuffle created during JobN be used to skip many stages from
JobN+1? Or is the lifecycle of the shuffle files bound to the job that
created them?

2. when are shuffle files actually deleted? Is it TTL based or is it
cleaned when the job is over?

3. we have a very long batch application, and as it goes on, the number of
total tasks for each job gets larger and larger. It is not really a
problem, because most of those tasks will be skipped since we cache RDDs.
We noticed, however, that there is a delay in the actual start of a job of
1 min for every 2M tasks in the job. Are there suggested workarounds to
avoid that delay? Maybe saving the RDD and re-loading it?

Thanks
Thomas


Re: Shuffle files lifecycle

2015-06-29 Thread Thomas Gerber
Ah, for #3, maybe this is what *rdd.checkpoint* does!
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
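
A minimal sketch of that, assuming a spark-shell session where sc is
available (the checkpoint directory is made up):

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // reliable storage location

val big = sc.parallelize(1 to 10000000)
  .map(i => (i % 1000, i.toLong))
  .reduceByKey(_ + _)

big.cache()        // recommended so the checkpoint doesn't recompute the RDD
big.checkpoint()   // must be called before the first action on this RDD
big.count()        // materializes the RDD and writes the checkpoint
// Jobs submitted afterwards read from the checkpoint instead of replaying
// the full lineage behind those skipped stages.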

Thomas


On Mon, Jun 29, 2015 at 7:12 PM, Thomas Gerber thomas.ger...@radius.com
wrote:

 Hello,

 It is my understanding that shuffle are written on disk and that they act
 as checkpoints.

 I wonder if this is true only within a job, or across jobs. Please note
 that I use the words job and stage carefully here.

 1. can a shuffle created during JobN be used to skip many stages from
 JobN+1? Or is the lifecycle of the shuffle files bound to the job that
 created them?

 2. when are shuffle files actually deleted? Is it TTL based or is it
 cleaned when the job is over?

 3. we have a very long batch application, and as it goes on, the number of
 total tasks for each job gets larger and larger. It is not really a
 problem, because most of those tasks will be skipped since we cache RDDs.
 We noticed however that there is a delay in the actual start of a job of 1
 min for every 2M tasks in your job. Are there suggested workarounds to
 avoid that delay? Maybe saving the RDD and re-loading it?

 Thanks
 Thomas




Re: Shuffle files lifecycle

2015-06-29 Thread Thomas Gerber
Thanks Silvio.


On Mon, Jun 29, 2015 at 7:41 PM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:

   Regarding 1 and 2, yes shuffle output is stored on the worker local
 disks and will be reused across jobs as long as they’re available. You can
 identify when they’re used by seeing skipped stages in the job UI. They are
 periodically cleaned up based on available space of the configured
 spark.local.dirs paths.

   From: Thomas Gerber
 Date: Monday, June 29, 2015 at 10:12 PM
 To: user
 Subject: Shuffle files lifecycle

   Hello,

  It is my understanding that shuffle are written on disk and that they
 act as checkpoints.

  I wonder if this is true only within a job, or across jobs. Please note
 that I use the words job and stage carefully here.

  1. can a shuffle created during JobN be used to skip many stages from
 JobN+1? Or is the lifecycle of the shuffle files bound to the job that
 created them?

  2. when are shuffle files actually deleted? Is it TTL based or is it
 cleaned when the job is over?

  3. we have a very long batch application, and as it goes on, the number
 of total tasks for each job gets larger and larger. It is not really a
 problem, because most of those tasks will be skipped since we cache RDDs.
 We noticed however that there is a delay in the actual start of a job of 1
 min for every 2M tasks in your job. Are there suggested workarounds to
 avoid that delay? Maybe saving the RDD and re-loading it?

  Thanks
 Thomas




Re: Shuffle files lifecycle

2015-06-29 Thread Silvio Fiorito
Regarding 1 and 2, yes shuffle output is stored on the worker local disks and 
will be reused across jobs as long as they’re available. You can identify when 
they’re used by seeing skipped stages in the job UI. They are periodically 
cleaned up based on available space of the configured spark.local.dirs paths.

From: Thomas Gerber
Date: Monday, June 29, 2015 at 10:12 PM
To: user
Subject: Shuffle files lifecycle

Hello,

It is my understanding that shuffle are written on disk and that they act as 
checkpoints.

I wonder if this is true only within a job, or across jobs. Please note that I 
use the words job and stage carefully here.

1. can a shuffle created during JobN be used to skip many stages from JobN+1? 
Or is the lifecycle of the shuffle files bound to the job that created them?

2. when are shuffle files actually deleted? Is it TTL based or is it cleaned 
when the job is over?

3. we have a very long batch application, and as it goes on, the number of 
total tasks for each job gets larger and larger. It is not really a problem, 
because most of those tasks will be skipped since we cache RDDs. We noticed 
however that there is a delay in the actual start of a job of 1 min for every 
2M tasks in your job. Are there suggested workarounds to avoid that delay? 
Maybe saving the RDD and re-loading it?

Thanks
Thomas



Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-24 Thread N B
Hi TD,

That little experiment helped a bit. This time we did not see any
exceptions for about 16 hours but eventually it did throw the same
exceptions as before. The cleaning of the shuffle files also stopped much
before these exceptions happened - about 7-1/2 hours after startup.

I am not quite sure how to proceed from here. Any suggestions on how to
avoid these errors?

Thanks
NB


On Fri, Apr 24, 2015 at 12:57 AM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 That may very well have been the case. There may be some delay on our
 output side. I have made a change just for testing that sends the output
 nowhere. I will see if that helps get rid of these errors. Then we can try
 to find out how we can optimize so that we do not lag.

 Questions: How can we ever be sure that a lag even if temporary never
 occur in the future? Also, should Spark not clean up any temp files that it
 still knows it might need in the (near?) future.

 Thanks
 Nikunj


 On Thu, Apr 23, 2015 at 12:29 PM, Tathagata Das t...@databricks.com
 wrote:

 What was the state of your streaming application? Was it falling behind
 with a large increasing scheduling delay?

 TD

 On Thu, Apr 23, 2015 at 11:31 AM, N B nb.nos...@gmail.com wrote:

 Thanks for the response, Conor. I tried with those settings and for a
 while it seemed like it was cleaning up shuffle files after itself.
 However, after exactly 5 hours later it started throwing exceptions and
 eventually stopped working again. A sample stack trace is below. What is
 curious about 5 hours is that I set the cleaner ttl to 5 hours after
 changing the max window size to 1 hour (down from 6 hours in order to
 test). It also stopped cleaning the shuffle files after this started
 happening.

 Any idea why this could be happening?

 2015-04-22 17:39:52,040 ERROR Executor task launch worker-989
 Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
 java.lang.Exception: Could not compute split, block
 input-0-1429706099000 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 Thanks
 NB


 On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell 
 conor.fenn...@altocloud.com wrote:

 Hi,


 We set the spark.cleaner.ttl to some reasonable time and also
 set spark.streaming.unpersist=true.


 Those together cleaned up the shuffle files for us.


 -Conor
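
(A minimal sketch of wiring those two settings into the application's SparkConf;
the TTL value is in seconds and purely illustrative.)

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("streaming-cleanup-sketch")     // hypothetical app name
    .set("spark.cleaner.ttl", "18000")          // illustrative value, in seconds
    .set("spark.streaming.unpersist", "true")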

 On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote:

 We already do have a cron job in place to clean just the shuffle
 files. However, what I would really like to know is whether there is a
 proper way of telling Spark to clean up these files once it's done with
 them?

 Thanks
 NB


 On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele 
 gangele...@gmail.com wrote:

 Write a cron job for this, like the one below

 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {}
 \+
 32 * * * *  find /tmp -type d -cmin +1440 -name spark-*-*-* -prune
 -exec rm -rf {} \+
 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d
 -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+


 On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

 Hi all,

 I had posed this query as part of a different thread but did not get
 a response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind
 and not being cleaned up by Spark. Since this is a Spark streaming
 application, it is expected to stay up indefinitely, so shuffle files 
 not
 being cleaned up is a big problem right now. Our max window size is 6
 hours, so we have set up a cron job to clean up shuffle files older 
 than 12
 hours otherwise it will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files
 is being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK

Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-23 Thread N B
Thanks for the response, Conor. I tried with those settings and for a while
it seemed like it was cleaning up shuffle files after itself. However,
after exactly 5 hours it started throwing exceptions and eventually
stopped working again. A sample stack trace is below. What is curious about
5 hours is that I set the cleaner ttl to 5 hours after changing the max
window size to 1 hour (down from 6 hours in order to test). It also stopped
cleaning the shuffle files after this started happening.

Any idea why this could be happening?

2015-04-22 17:39:52,040 ERROR Executor task launch worker-989
Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
java.lang.Exception: Could not compute split, block input-0-1429706099000
not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Thanks
NB


On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell conor.fenn...@altocloud.com
wrote:

 Hi,


 We set the spark.cleaner.ttl to some reasonable time and also
 set spark.streaming.unpersist=true.


 Those together cleaned up the shuffle files for us.


 -Conor

 On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote:

 We already do have a cron job in place to clean just the shuffle files.
 However, what I would really like to know is whether there is a proper
 way of telling Spark to clean up these files once it's done with them?

 Thanks
 NB


 On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com
  wrote:

 Write a cron job for this, like the one below

 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
 32 * * * *  find /tmp -type d -cmin +1440 -name spark-*-*-* -prune
 -exec rm -rf {} \+
 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
 +1440 -name spark-*-*-* -prune -exec rm -rf {} \+


 On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

 Hi all,

 I had posed this query as part of a different thread but did not get a
 response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind and
 not being cleaned up by Spark. Since this is a Spark streaming application,
 it is expected to stay up indefinitely, so shuffle files not being cleaned
 up is a big problem right now. Our max window size is 6 hours, so we have
 set up a cron job to clean up shuffle files older than 12 hours otherwise
 it will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files is
 being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK-5836


 Also, for some reason, the following JIRAs that were reported as
 functional issues were closed as Duplicates of the above Documentation bug.
 Does this mean that this issue won't be tackled at all?

 https://issues.apache.org/jira/browse/SPARK-3563
 https://issues.apache.org/jira/browse/SPARK-4796
 https://issues.apache.org/jira/browse/SPARK-6011

 Any further insight into whether this is being looked into and
 meanwhile how to handle shuffle files will be greatly appreciated.

 Thanks
 NB










Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-23 Thread Tathagata Das
What was the state of your streaming application? Was it falling behind
with a large increasing scheduling delay?

TD

On Thu, Apr 23, 2015 at 11:31 AM, N B nb.nos...@gmail.com wrote:

 Thanks for the response, Conor. I tried with those settings and for a
 while it seemed like it was cleaning up shuffle files after itself.
 However, after exactly 5 hours it started throwing exceptions and
 eventually stopped working again. A sample stack trace is below. What is
 curious about 5 hours is that I set the cleaner ttl to 5 hours after
 changing the max window size to 1 hour (down from 6 hours in order to
 test). It also stopped cleaning the shuffle files after this started
 happening.

 Any idea why this could be happening?

 2015-04-22 17:39:52,040 ERROR Executor task launch worker-989
 Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
 java.lang.Exception: Could not compute split, block input-0-1429706099000
 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 Thanks
 NB


 On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell 
 conor.fenn...@altocloud.com wrote:

 Hi,


 We set the spark.cleaner.ttl to some reasonable time and also
 set spark.streaming.unpersist=true.


 Those together cleaned up the shuffle files for us.


 -Conor

 On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote:

 We already do have a cron job in place to clean just the shuffle files.
 However, what I would really like to know is whether there is a proper
 way of telling Spark to clean up these files once it's done with them?

 Thanks
 NB


 On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele 
 gangele...@gmail.com wrote:

 Write a cron job for this, like the one below

 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
 32 * * * *  find /tmp -type d -cmin +1440 -name spark-*-*-* -prune
 -exec rm -rf {} \+
 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
 +1440 -name spark-*-*-* -prune -exec rm -rf {} \+


 On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

 Hi all,

 I had posed this query as part of a different thread but did not get a
 response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind and
 not being cleaned up by Spark. Since this is a Spark streaming 
 application,
 it is expected to stay up indefinitely, so shuffle files not being cleaned
 up is a big problem right now. Our max window size is 6 hours, so we have
 set up a cron job to clean up shuffle files older than 12 hours otherwise
 it will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files
 is being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK-5836


 Also, for some reason, the following JIRAs that were reported as
 functional issues were closed as Duplicates of the above Documentation 
 bug.
 Does this mean that this issue won't be tackled at all?

 https://issues.apache.org/jira/browse/SPARK-3563
 https://issues.apache.org/jira/browse/SPARK-4796
 https://issues.apache.org/jira/browse/SPARK-6011

 Any further insight into whether this is being looked into and
 meanwhile how to handle shuffle files will be greatly appreciated.

 Thanks
 NB











Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-21 Thread N B
We already do have a cron job in place to clean just the shuffle files.
However, what I would really like to know is whether there is a proper
way of telling Spark to clean up these files once it's done with them?

Thanks
NB


On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Write a cron job for this, like the one below

 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
 32 * * * *  find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec
 rm -rf {} \+
 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
 +1440 -name spark-*-*-* -prune -exec rm -rf {} \+


 On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

 Hi all,

 I had posed this query as part of a different thread but did not get a
 response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind and not
 being cleaned up by Spark. Since this is a Spark streaming application, it
 is expected to stay up indefinitely, so shuffle files not being cleaned up
 is a big problem right now. Our max window size is 6 hours, so we have set
 up a cron job to clean up shuffle files older than 12 hours otherwise it
 will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files is
 being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK-5836


 Also, for some reason, the following JIRAs that were reported as
 functional issues were closed as Duplicates of the above Documentation bug.
 Does this mean that this issue won't be tackled at all?

 https://issues.apache.org/jira/browse/SPARK-3563
 https://issues.apache.org/jira/browse/SPARK-4796
 https://issues.apache.org/jira/browse/SPARK-6011

 Any further insight into whether this is being looked into and meanwhile
 how to handle shuffle files will be greatly appreciated.

 Thanks
 NB








Shuffle files not cleaned up (Spark 1.2.1)

2015-04-20 Thread N B
Hi all,

I had posed this query as part of a different thread but did not get a
response there. So creating a new thread hoping to catch someone's
attention.

We are experiencing this issue of shuffle files being left behind and not
being cleaned up by Spark. Since this is a Spark streaming application, it
is expected to stay up indefinitely, so shuffle files not being cleaned up
is a big problem right now. Our max window size is 6 hours, so we have set
up a cron job to clean up shuffle files older than 12 hours otherwise it
will eat up all our disk space.

Please see the following. It seems the non-cleaning of shuffle files is
being documented in 1.3.1.

https://github.com/apache/spark/pull/5074/files
https://issues.apache.org/jira/browse/SPARK-5836


Also, for some reason, the following JIRAs that were reported as functional
issues were closed as Duplicates of the above Documentation bug. Does this
mean that this issue won't be tackled at all?

https://issues.apache.org/jira/browse/SPARK-3563
https://issues.apache.org/jira/browse/SPARK-4796
https://issues.apache.org/jira/browse/SPARK-6011

Any further insight into whether this is being looked into and meanwhile
how to handle shuffle files will be greatly appreciated.

Thanks
NB


Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-20 Thread Jeetendra Gangele
Write a cron job for this, like the one below

12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
32 * * * *  find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec
rm -rf {} \+
52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
+1440 -name spark-*-*-* -prune -exec rm -rf {} \+

On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

 Hi all,

 I had posed this query as part of a different thread but did not get a
 response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind and not
 being cleaned up by Spark. Since this is a Spark streaming application, it
 is expected to stay up indefinitely, so shuffle files not being cleaned up
 is a big problem right now. Our max window size is 6 hours, so we have set
 up a cron job to clean up shuffle files older than 12 hours otherwise it
 will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files is
 being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK-5836


 Also, for some reason, the following JIRAs that were reported as
 functional issues were closed as Duplicates of the above Documentation bug.
 Does this mean that this issue won't be tackled at all?

 https://issues.apache.org/jira/browse/SPARK-3563
 https://issues.apache.org/jira/browse/SPARK-4796
 https://issues.apache.org/jira/browse/SPARK-6011

 Any further insight into whether this is being looked into and meanwhile
 how to handle shuffle files will be greatly appreciated.

 Thanks
 NB




Re: Missing shuffle files

2015-02-28 Thread Corey Nolet
Just wanted to point out: raising the memory overhead (as I saw in the logs)
was the fix for this issue, and I have not seen dying executors since this
value was increased.

On Tue, Feb 24, 2015 at 3:52 AM, Anders Arpteg arp...@spotify.com wrote:

 If you're thinking of the YARN memory overhead, then yes, I have increased
 that as well. However, I'm glad to say that my job finally finished
 successfully. Besides the timeout and memory settings, performing repartitioning
 (with shuffling) at the right time seems to be the key to making this large
 job succeed. With all the transformations in the job, the partition
 distribution was becoming increasingly skewed. It's not easy to figure out when
 to repartition and how many partitions to use, and it takes forever to tweak
 these settings, since it works perfectly for small datasets and you'll have to
 experiment with large, time-consuming jobs. Imagine if there was an
 automatic partition reconfiguration function that automagically did that...


 On Tue, Feb 24, 2015 at 3:20 AM, Corey Nolet cjno...@gmail.com wrote:

 I *think* this may have been related to the default memory overhead
 setting being too low. I raised the value to 1G and tried my job again
 but i had to leave the office before it finished. It did get further but
 I'm not exactly sure if that's just because i raised the memory. I'll see
 tomorrow- but i have a suspicion this may have been the cause of the
 executors being killed by the application master.
 On Feb 23, 2015 5:25 PM, Corey Nolet cjno...@gmail.com wrote:

 I've got the opposite problem with regards to partitioning. I've got
 over 6000 partitions for some of these RDDs which immediately blows the
 heap somehow- I'm still not exactly sure how. If I coalesce them down to
 about 600-800 partitions, I get the problems where the executors are dying
 without any other error messages (other than telling me the executor was
 lost in the UI). If I don't coalesce, I pretty immediately get Java heap
 space exceptions that kill the job altogether.

 Putting in the timeouts didn't seem to help the case where I am
 coalescing. Also, I don't see any differences between 'disk only' and
 'memory and disk' storage levels- both of them are having the same
 problems. I notice large shuffle files (30-40gb) that only seem to spill a
 few hundred mb.

 On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com
 wrote:

 Sounds very similar to what I experienced, Corey. Something that seems
 to at least help with my problems is to have more partitions. I'm already
 fighting between ending up with too many partitions in the end and having
 too few in the beginning. By coalescing as late as possible and avoiding
 too few in the beginning, the problems seem to decrease. Also, after increasing
 spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
 significantly (~700 secs), the problems seem to almost disappear. Don't
 want to celebrate yet, there's still a long way left before the job completes,
 but it's looking better...

 On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm looking @ my yarn container logs for some of the executors which
 appear to be failing (with the missing shuffle files). I see exceptions
 that say client.TransportClientFactory: Found inactive connection to
 host/ip:port, closing it.

 Right after that I see shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
 connect to host/ip:port

 Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

 Finally, following the sigterm, I see FileNotFoundException:
 /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
 such file or directory)

 I'm looking @ the nodemanager and application master logs and I see no
 indications whatsoever that there were any memory issues during this 
 period
 of time. The Spark UI is telling me none of the executors are really using
 too much memory when this happens. It is a big job that's caching several
 100's of GB but each node manager on the cluster has 64gb of ram just for
 yarn containers (physical nodes have 128gb). On this cluster, we have 128
 nodes. I've also tried using DISK_ONLY storage level but to no avail.

 Any further ideas on how to track this down? Again, we're able to run
 this same job on about 1/5th of the data just fine. The only thing that's
 pointing me towards a memory issue is that it seems to be happening in the
 same stages each time and when I lower the memory that each executor has
 allocated it happens in earlier stages but I can't seem to find anything
 that says an executor (or container for that matter) has run low on 
 memory.



 On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com
 wrote:

 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just

Re: Missing shuffle files

2015-02-24 Thread Anders Arpteg
If you're thinking of the YARN memory overhead, then yes, I have increased
that as well. However, I'm glad to say that my job finally finished
successfully. Besides the timeout and memory settings, performing repartitioning
(with shuffling) at the right time seems to be the key to making this large
job succeed. With all the transformations in the job, the partition
distribution was becoming increasingly skewed. It's not easy to figure out when
to repartition and how many partitions to use, and it takes forever to tweak
these settings, since it works perfectly for small datasets and you'll have to
experiment with large, time-consuming jobs. Imagine if there was an
automatic partition reconfiguration function that automagically did that...
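
(A hedged sketch of "repartitioning at the right time", assuming an existing
SparkContext sc; the paths, the parsing step, and the partition count are all
illustrative. The point being made above is the explicit repartition placed
right after the transformations that skew the partition distribution.)

  val skewed = sc.textFile("hdfs:///data/events")     // illustrative input
    .map(line => (line.take(2), 1))                   // stand-in for real parsing; keys end up skewed
  val rebalanced = skewed.repartition(2000)           // full shuffle; 2000 is illustrative
  val counts = rebalanced.reduceByKey(_ + _)          // downstream stages see evenly sized partitions
  counts.saveAsTextFile("hdfs:///data/event-counts")  // illustrative output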

On Tue, Feb 24, 2015 at 3:20 AM, Corey Nolet cjno...@gmail.com wrote:

 I *think* this may have been related to the default memory overhead
 setting being too low. I raised the value to 1G and tried my job again
 but i had to leave the office before it finished. It did get further but
 I'm not exactly sure if that's just because i raised the memory. I'll see
 tomorrow- but i have a suspicion this may have been the cause of the
 executors being killed by the application master.
 On Feb 23, 2015 5:25 PM, Corey Nolet cjno...@gmail.com wrote:

 I've got the opposite problem with regards to partitioning. I've got over
 6000 partitions for some of these RDDs which immediately blows the heap
 somehow- I'm still not exactly sure how. If I coalesce them down to about
 600-800 partitions, I get the problems where the executors are dying
 without any other error messages (other than telling me the executor was
 lost in the UI). If I don't coalesce, I pretty immediately get Java heap
 space exceptions that kill the job altogether.

 Putting in the timeouts didn't seem to help the case where I am
 coalescing. Also, I don't see any differences between 'disk only' and
 'memory and disk' storage levels- both of them are having the same
 problems. I notice large shuffle files (30-40gb) that only seem to spill a
 few hundred mb.

 On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com
 wrote:

 Sounds very similar to what I experienced, Corey. Something that seems to
 at least help with my problems is to have more partitions. I'm already
 fighting between ending up with too many partitions in the end and having
 too few in the beginning. By coalescing as late as possible and avoiding
 too few in the beginning, the problems seem to decrease. Also, after increasing
 spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
 significantly (~700 secs), the problems seem to almost disappear. Don't
 want to celebrate yet, there's still a long way left before the job completes,
 but it's looking better...

 On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm looking @ my yarn container logs for some of the executors which
 appear to be failing (with the missing shuffle files). I see exceptions
 that say client.TransportClientFactory: Found inactive connection to
 host/ip:port, closing it.

 Right after that I see shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
 connect to host/ip:port

 Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

 Finally, following the sigterm, I see FileNotFoundException:
 /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
 such file or directory)

 I'm looking @ the nodemanager and application master logs and I see no
 indications whatsoever that there were any memory issues during this period
 of time. The Spark UI is telling me none of the executors are really using
 too much memory when this happens. It is a big job that's caching several
 100's of GB but each node manager on the cluster has 64gb of ram just for
 yarn containers (physical nodes have 128gb). On this cluster, we have 128
 nodes. I've also tried using DISK_ONLY storage level but to no avail.

 Any further ideas on how to track this down? Again, we're able to run
 this same job on about 1/5th of the data just fine. The only thing that's
 pointing me towards a memory issue is that it seems to be happening in the
 same stages each time and when I lower the memory that each executor has
 allocated it happens in earlier stages but I can't seem to find anything
 that says an executor (or container for that matter) has run low on memory.



 On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com
 wrote:

 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just the Spark app), it could take some time.

 Unsure if task 450 was acting as a reducer or not, but seems possible.
 Probably due to a crashed executor as you say. Seems like I need to do 
 some
 more advanced partition tuning to make this job work, as it's

Re: Missing shuffle files

2015-02-23 Thread Anders Arpteg
No, unfortunately we're not making use of dynamic allocation or the
external shuffle service. Hoping that we could reconfigure our cluster to
make use of it, but since it requires changes to the cluster itself (and
not just the Spark app), it could take some time.

Unsure if task 450 was acting as a reducer or not, but seems possible.
Probably due to a crashed executor as you say. Seems like I need to do some
more advanced partition tuning to make this job work, as it currently has a
rather high number of partitions.

Thanks for the help so far! It's certainly a frustrating task to debug when
everything's working perfectly on sample data locally and crashes hard when
running on the full dataset on the cluster...

On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com
wrote:

 Do you guys have dynamic allocation turned on for YARN?

 Anders, was Task 450 in your job acting like a Reducer and fetching the
 Map spill output data from a different node?

 If a Reducer task can't read the remote data it needs, that could cause
 the stage to fail. Sometimes this forces the previous stage to also be
 re-computed if it's a wide dependency.

 But like Petar said, if you turn the external shuffle service on, YARN
 NodeManager process on the slave machines will serve out the map spill
 data, instead of the Executor JVMs (by default unless you turn external
 shuffle on, the Executor JVM itself serves out the shuffle data which
 causes problems if an Executor dies).

 Corey, how often are Executors crashing in your app? How many Executors do
 you have total? And what is the memory size for each? You can change what
 fraction of the Executor heap will be used for your user code vs the
 shuffle vs RDD caching with the spark.storage.memoryFraction setting.

 On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com
 wrote:


 Could you try to turn on the external shuffle service?

 spark.shuffle.service.enabled = true


 On 21.2.2015. 17:50, Corey Nolet wrote:

 I'm experiencing the same issue. Upon closer inspection I'm noticing that
 executors are being lost as well. Thing is, I can't figure out how they are
 dying. I'm using MEMORY_AND_DISK_SER and i've got over 1.3TB of memory
 allocated for the application. I was thinking perhaps it was possible that
 a single executor was getting a single or a couple large partitions but
 shouldn't the disk persistence kick in at that point?

 On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com
 wrote:

 For large jobs, the following error message is shown that seems to
 indicate that shuffle files for some reason are missing. It's a rather
 large job with many partitions. If the data size is reduced, the problem
 disappears. I'm running a build from Spark master post 1.2 (build at
 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this
 problem?

  User class threw exception: Job aborted due to stage failure: Task 450
 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in
 stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
 java.io.FileNotFoundException:
 /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
 (No such file or directory)
  at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
  at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
  at
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
  at
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

  at java.lang.Thread.run(Thread.java:745)

  TIA,
 Anders







Re: Missing shuffle files

2015-02-23 Thread Corey Nolet
I'm looking @ my yarn container logs for some of the executors which appear
to be failing (with the missing shuffle files). I see exceptions that say
client.TransportClientFactory: Found inactive connection to host/ip:port,
closing it.

Right after that I see shuffle.RetryingBlockFetcher: Exception while
beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
connect to host/ip:port

Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

Finally, following the sigterm, I see FileNotFoundException:
/hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
such file or directory)

I'm looking @ the nodemanager and application master logs and I see no
indications whatsoever that there were any memory issues during this period
of time. The Spark UI is telling me none of the executors are really using
too much memory when this happens. It is a big job that's caching several
100's of GB but each node manager on the cluster has 64gb of ram just for
yarn containers (physical nodes have 128gb). On this cluster, we have 128
nodes. I've also tried using DISK_ONLY storage level but to no avail.

Any further ideas on how to track this down? Again, we're able to run this
same job on about 1/5th of the data just fine. The only thing that's
pointing me towards a memory issue is that it seems to be happening in the
same stages each time and when I lower the memory that each executor has
allocated it happens in earlier stages but I can't seem to find anything
that says an executor (or container for that matter) has run low on memory.



On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com wrote:

 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just the Spark app), it could take some time.

 Unsure if task 450 was acting as a reducer or not, but seems possible.
 Probably due to a crashed executor as you say. Seems like I need to do some
 more advanced partition tuning to make this job work, as it currently has a
 rather high number of partitions.

 Thanks for the help so far! It's certainly a frustrating task to debug
 when everything's working perfectly on sample data locally and crashes hard
 when running on the full dataset on the cluster...

 On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com
 wrote:

 Do you guys have dynamic allocation turned on for YARN?

 Anders, was Task 450 in your job acting like a Reducer and fetching the
 Map spill output data from a different node?

 If a Reducer task can't read the remote data it needs, that could cause
 the stage to fail. Sometimes this forces the previous stage to also be
 re-computed if it's a wide dependency.

 But like Petar said, if you turn the external shuffle service on, YARN
 NodeManager process on the slave machines will serve out the map spill
 data, instead of the Executor JVMs (by default unless you turn external
 shuffle on, the Executor JVM itself serves out the shuffle data which
 causes problems if an Executor dies).

 Corey, how often are Executors crashing in your app? How many Executors do
 you have total? And what is the memory size for each? You can change what
 fraction of the Executor heap will be used for your user code vs the
 shuffle vs RDD caching with the spark.storage.memoryFraction setting.

 On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com
 wrote:


 Could you try to turn on the external shuffle service?

 spark.shuffle.service.enabled = true


 On 21.2.2015. 17:50, Corey Nolet wrote:

 I'm experiencing the same issue. Upon closer inspection I'm noticing
 that executors are being lost as well. Thing is, I can't figure out how
 they are dying. I'm using MEMORY_AND_DISK_SER and i've got over 1.3TB of
 memory allocated for the application. I was thinking perhaps it was
 possible that a single executor was getting a single or a couple large
 partitions but shouldn't the disk persistence kick in at that point?

 On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com
 wrote:

 For large jobs, the following error message is shown that seems to
 indicate that shuffle files for some reason are missing. It's a rather
 large job with many partitions. If the data size is reduced, the problem
 disappears. I'm running a build from Spark master post 1.2 (build at
 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this
 problem?

  User class threw exception: Job aborted due to stage failure: Task 450
 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in
 stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
 java.io.FileNotFoundException:
 /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
 (No such file or directory)
  at java.io.FileOutputStream.open(Native Method

Re: Missing shuffle files

2015-02-23 Thread Anders Arpteg
Sounds very similar to what I experienced, Corey. Something that seems to at
least help with my problems is to have more partitions. I'm already fighting
between ending up with too many partitions in the end and having too few in
the beginning. By coalescing as late as possible and avoiding too few in
the beginning, the problems seem to decrease. Also, after increasing
spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
significantly (~700 secs), the problems seem to almost disappear. Don't
want to celebrate yet, there's still a long way left before the job completes,
but it's looking better...
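
(A minimal sketch of bumping those two timeouts; 700 mirrors the value mentioned
above and is not a recommendation, and both keys are the Spark 1.x names.)

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.akka.askTimeout", "700")                    // seconds
    .set("spark.core.connection.ack.wait.timeout", "700")   // seconds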

On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm looking @ my yarn container logs for some of the executors which
 appear to be failing (with the missing shuffle files). I see exceptions
 that say client.TransportClientFactory: Found inactive connection to
 host/ip:port, closing it.

 Right after that I see shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
 connect to host/ip:port

 Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

 Finally, following the sigterm, I see FileNotFoundException:
 /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
 such file or directory)

 I'm looking @ the nodemanager and application master logs and I see no
 indications whatsoever that there were any memory issues during this period
 of time. The Spark UI is telling me none of the executors are really using
 too much memory when this happens. It is a big job that's caching several
 100's of GB but each node manager on the cluster has 64gb of ram just for
 yarn containers (physical nodes have 128gb). On this cluster, we have 128
 nodes. I've also tried using DISK_ONLY storage level but to no avail.

 Any further ideas on how to track this down? Again, we're able to run this
 same job on about 1/5th of the data just fine. The only thing that's
 pointing me towards a memory issue is that it seems to be happening in the
 same stages each time and when I lower the memory that each executor has
 allocated it happens in earlier stages but I can't seem to find anything
 that says an executor (or container for that matter) has run low on memory.



 On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com wrote:

 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just the Spark app), it could take some time.

 Unsure if task 450 was acting as a reducer or not, but seems possible.
 Probably due to a crashed executor as you say. Seems like I need to do some
 more advanced partition tuning to make this job work, as it currently has a
 rather high number of partitions.

 Thanks for the help so far! It's certainly a frustrating task to debug
 when everything's working perfectly on sample data locally and crashes hard
 when running on the full dataset on the cluster...

 On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com
 wrote:

 Do you guys have dynamic allocation turned on for YARN?

 Anders, was Task 450 in your job acting like a Reducer and fetching the
 Map spill output data from a different node?

 If a Reducer task can't read the remote data it needs, that could cause
 the stage to fail. Sometimes this forces the previous stage to also be
 re-computed if it's a wide dependency.

 But like Petar said, if you turn the external shuffle service on, YARN
 NodeManager process on the slave machines will serve out the map spill
 data, instead of the Executor JVMs (by default unless you turn external
 shuffle on, the Executor JVM itself serves out the shuffle data which
 causes problems if an Executor dies).

 Corey, how often are Executors crashing in your app? How many Executors
 do you have total? And what is the memory size for each? You can change
 what fraction of the Executor heap will be used for your user code vs the
 shuffle vs RDD caching with the spark.storage.memoryFraction setting.

 On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com
 wrote:


 Could you try to turn on the external shuffle service?

 spark.shuffle.service.enabled = true


 On 21.2.2015. 17:50, Corey Nolet wrote:

 I'm experiencing the same issue. Upon closer inspection I'm noticing
 that executors are being lost as well. Thing is, I can't figure out how
 they are dying. I'm using MEMORY_AND_DISK_SER and i've got over 1.3TB of
 memory allocated for the application. I was thinking perhaps it was
 possible that a single executor was getting a single or a couple large
 partitions but shouldn't the disk persistence kick in at that point?

 On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com
 wrote:

 For large jobs, the following error message is shown that seems to
 indicate that shuffle files for some reason

Re: Missing shuffle files

2015-02-23 Thread Corey Nolet
I've got the opposite problem with regards to partitioning. I've got over
6000 partitions for some of these RDDs which immediately blows the heap
somehow- I'm still not exactly sure how. If I coalesce them down to about
600-800 partitions, I get the problems where the executors are dying
without any other error messages (other than telling me the executor was
lost in the UI). If I don't coalesce, I pretty immediately get Java heap
space exceptions that kill the job altogether.

Putting in the timeouts didn't seem to help the case where I am coalescing.
Also, I don't see any differences between 'disk only' and 'memory and disk'
storage levels- both of them are having the same problems. I notice large
shuffle files (30-40gb) that only seem to spill a few hundred mb.
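
(A small sketch of the two ways to cut the partition count being discussed here,
with rdd standing in for one of the large RDDs above: coalesce merges partitions
without a shuffle, so fewer tasks each read several parent partitions, while
repartition does a full shuffle and spreads the data evenly.)

  import org.apache.spark.storage.StorageLevel

  val narrowed = rdd.coalesce(800)        // no shuffle: 800 tasks, each reading several parent partitions
  val rebalanced = rdd.repartition(800)   // full shuffle: data is rehashed across 800 even partitions
  val persisted = rebalanced.persist(StorageLevel.MEMORY_AND_DISK_SER)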

On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com wrote:

 Sounds very similar to what I experienced, Corey. Something that seems to
 at least help with my problems is to have more partitions. I'm already
 fighting between ending up with too many partitions in the end and having
 too few in the beginning. By coalescing as late as possible and avoiding
 too few in the beginning, the problems seem to decrease. Also, after increasing
 spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
 significantly (~700 secs), the problems seem to almost disappear. Don't
 want to celebrate yet, there's still a long way left before the job completes,
 but it's looking better...

 On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm looking @ my yarn container logs for some of the executors which
 appear to be failing (with the missing shuffle files). I see exceptions
 that say client.TransportClientFactory: Found inactive connection to
 host/ip:port, closing it.

 Right after that I see shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
 connect to host/ip:port

 Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

 Finally, following the sigterm, I see FileNotFoundException:
 /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
 such file or directory)

 I'm looking @ the nodemanager and application master logs and I see no
 indications whatsoever that there were any memory issues during this period
 of time. The Spark UI is telling me none of the executors are really using
 too much memory when this happens. It is a big job that's caching several
 100's of GB but each node manager on the cluster has 64gb of ram just for
 yarn containers (physical nodes have 128gb). On this cluster, we have 128
 nodes. I've also tried using DISK_ONLY storage level but to no avail.

 Any further ideas on how to track this down? Again, we're able to run
 this same job on about 1/5th of the data just fine. The only thing that's
 pointing me towards a memory issue is that it seems to be happening in the
 same stages each time and when I lower the memory that each executor has
 allocated it happens in earlier stages but I can't seem to find anything
 that says an executor (or container for that matter) has run low on memory.



 On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com
 wrote:

 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just the Spark app), it could take some time.

 Unsure if task 450 was acting as a reducer or not, but seems possible.
 Probably due to a crashed executor as you say. Seems like I need to do some
 more advanced partition tuning to make this job work, as it currently has a
 rather high number of partitions.

 Thanks for the help so far! It's certainly a frustrating task to debug
 when everything's working perfectly on sample data locally and crashes hard
 when running on the full dataset on the cluster...

 On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com
  wrote:

 Do you guys have dynamic allocation turned on for YARN?

 Anders, was Task 450 in your job acting like a Reducer and fetching the
 Map spill output data from a different node?

 If a Reducer task can't read the remote data it needs, that could cause
 the stage to fail. Sometimes this forces the previous stage to also be
 re-computed if it's a wide dependency.

 But like Petar said, if you turn the external shuffle service on, YARN
 NodeManager process on the slave machines will serve out the map spill
 data, instead of the Executor JVMs (by default unless you turn external
 shuffle on, the Executor JVM itself serves out the shuffle data which
 causes problems if an Executor dies).

 Corey, how often are Executors crashing in your app? How many Executors
 do you have total? And what is the memory size for each? You can change
 what fraction of the Executor heap will be used for your user code vs the
 shuffle vs RDD caching

Re: Missing shuffle files

2015-02-23 Thread Corey Nolet
I *think* this may have been related to the default memory overhead setting
being too low. I raised the value to 1G and tried my job again, but I had
to leave the office before it finished. It did get further, but I'm not
exactly sure if that's just because I raised the memory. I'll see tomorrow -
but I have a suspicion this may have been the cause of the executors being
killed by the application master.
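
(A minimal sketch of raising that overhead; the key is the Spark 1.x YARN one,
the value is in MB and matches the 1G mentioned above, and it is normally passed
via spark-submit --conf or spark-defaults.conf rather than hard-coded.)

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.yarn.executor.memoryOverhead", "1024")   // MB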
On Feb 23, 2015 5:25 PM, Corey Nolet cjno...@gmail.com wrote:

 I've got the opposite problem with regards to partitioning. I've got over
 6000 partitions for some of these RDDs which immediately blows the heap
 somehow- I'm still not exactly sure how. If I coalesce them down to about
 600-800 partitions, I get the problems where the executors are dying
 without any other error messages (other than telling me the executor was
 lost in the UI). If I don't coalesce, I pretty immediately get Java heap
 space exceptions that kill the job altogether.

 Putting in the timeouts didn't seem to help the case where I am
 coalescing. Also, I don't see any differences between 'disk only' and
 'memory and disk' storage levels- both of them are having the same
 problems. I notice large shuffle files (30-40gb) that only seem to spill a
 few hundred mb.

 On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com wrote:

 Sounds very similar to what I experienced, Corey. Something that seems to
 at least help with my problems is to have more partitions. I'm already
 fighting between ending up with too many partitions in the end and having
 too few in the beginning. By coalescing as late as possible and avoiding
 too few in the beginning, the problems seem to decrease. Also, after increasing
 spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
 significantly (~700 secs), the problems seem to almost disappear. Don't
 want to celebrate yet, there's still a long way left before the job completes,
 but it's looking better...

 On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm looking @ my yarn container logs for some of the executors which
 appear to be failing (with the missing shuffle files). I see exceptions
 that say client.TransportClientFactory: Found inactive connection to
 host/ip:port, closing it.

 Right after that I see shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
 connect to host/ip:port

 Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

 Finally, following the sigterm, I see FileNotFoundException:
 /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
 such file or directory)

 I'm looking @ the nodemanager and application master logs and I see no
 indications whatsoever that there were any memory issues during this period
 of time. The Spark UI is telling me none of the executors are really using
 too much memory when this happens. It is a big job that's caching several
 100's of GB but each node manager on the cluster has 64gb of ram just for
 yarn containers (physical nodes have 128gb). On this cluster, we have 128
 nodes. I've also tried using DISK_ONLY storage level but to no avail.

 Any further ideas on how to track this down? Again, we're able to run
 this same job on about 1/5th of the data just fine. The only thing that's
 pointing me towards a memory issue is that it seems to be happening in the
 same stages each time and when I lower the memory that each executor has
 allocated it happens in earlier stages but I can't seem to find anything
 that says an executor (or container for that matter) has run low on memory.



 On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com
 wrote:

 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just the Spark app), it could take some time.

 Unsure if task 450 was acting as a reducer or not, but seems possible.
 Probably due to a crashed executor as you say. Seems like I need to do some
 more advanced partition tuning to make this job work, as it currently has a
 rather high number of partitions.

 Thanks for the help so far! It's certainly a frustrating task to debug
 when everything's working perfectly on sample data locally and crashes hard
 when running on the full dataset on the cluster...

 On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui 
 same...@databricks.com wrote:

 Do you guys have dynamic allocation turned on for YARN?

 Anders, was Task 450 in your job acting like a Reducer and fetching
 the Map spill output data from a different node?

 If a Reducer task can't read the remote data it needs, that could
 cause the stage to fail. Sometimes this forces the previous stage to also
 be re-computed if it's a wide dependency.

 But like Petar said, if you turn the external shuffle service on, YARN
 NodeManager process on the slave machines

Re: Missing shuffle files

2015-02-22 Thread Sameer Farooqui
Do you guys have dynamic allocation turned on for YARN?

Anders, was Task 450 in your job acting like a Reducer and fetching the Map
spill output data from a different node?

If a Reducer task can't read the remote data it needs, that could cause the
stage to fail. Sometimes this forces the previous stage to also be
re-computed if it's a wide dependency.

But like Petar said, if you turn the external shuffle service on, YARN
NodeManager process on the slave machines will serve out the map spill
data, instead of the Executor JVMs (by default unless you turn external
shuffle on, the Executor JVM itself serves out the shuffle data which
causes problems if an Executor dies).

Corey, how often are Executors crashing in your app? How many Executors do
you have total? And what is the memory size for each? You can change what
fraction of the Executor heap will be used for your user code vs the
shuffle vs RDD caching with the spark.storage.memoryFraction setting.
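
(A hedged sketch of the knobs mentioned above, as Spark 1.x configuration keys;
the fraction shown is just the then-current default, not advice, and the external
shuffle service additionally needs the YARN NodeManager aux-service set up.)

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.shuffle.service.enabled", "true")      // serve shuffle data from the NodeManager
    .set("spark.dynamicAllocation.enabled", "true")    // requires the external shuffle service
    .set("spark.storage.memoryFraction", "0.6")        // heap share for RDD caching (1.x default)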

On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com
wrote:


 Could you try to turn on the external shuffle service?

 spark.shuffle.service.enabled = true


 On 21.2.2015. 17:50, Corey Nolet wrote:

 I'm experiencing the same issue. Upon closer inspection I'm noticing that
 executors are being lost as well. Thing is, I can't figure out how they are
 dying. I'm using MEMORY_AND_DISK_SER and i've got over 1.3TB of memory
 allocated for the application. I was thinking perhaps it was possible that
 a single executor was getting a single or a couple large partitions but
 shouldn't the disk persistence kick in at that point?

 On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com
 wrote:

 For large jobs, the following error message is shown that seems to
 indicate that shuffle files for some reason are missing. It's a rather
 large job with many partitions. If the data size is reduced, the problem
 disappears. I'm running a build from Spark master post 1.2 (build at
 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this
 problem?

  User class threw exception: Job aborted due to stage failure: Task 450
 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in
 stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
 java.io.FileNotFoundException:
 /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
 (No such file or directory)
  at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
  at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
  at
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
  at
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

  at java.lang.Thread.run(Thread.java:745)

  TIA,
 Anders






Missing shuffle files

2015-02-21 Thread Anders Arpteg
For large jobs, the following error message is shown that seems to indicate
that shuffle files for some reason are missing. It's a rather large job
with many partitions. If the data size is reduced, the problem disappears.
I'm running a build from Spark master post 1.2 (build at 2015-01-16) and
running on Yarn 2.2. Any idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task 450 in
stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage
450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
java.io.FileNotFoundException:
/disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
(No such file or directory)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
 at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
 at
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
 at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
 at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)

TIA,
Anders


Re: Missing shuffle files

2015-02-21 Thread Corey Nolet
I'm experiencing the same issue. Upon closer inspection I'm noticing that
executors are being lost as well. Thing is, I can't figure out how they are
dying. I'm using MEMORY_AND_DISK_SER and i've got over 1.3TB of memory
allocated for the application. I was thinking perhaps it was possible that
a single executor was getting a single or a couple large partitions but
shouldn't the disk persistence kick in at that point?

On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote:

 For large jobs, the following error message is shown that seems to
 indicate that shuffle files for some reason are missing. It's a rather
 large job with many partitions. If the data size is reduced, the problem
 disappears. I'm running a build from Spark master post 1.2 (build at
 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this
 problem?

 User class threw exception: Job aborted due to stage failure: Task 450 in
 stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage
 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
 java.io.FileNotFoundException:
 /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
 (No such file or directory)
  at java.io.FileOutputStream.open(Native Method)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
  at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
  at
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
  at
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

  at java.lang.Thread.run(Thread.java:745)

 TIA,
 Anders




Re: Missing shuffle files

2015-02-21 Thread Petar Zecevic


Could you try to turn on the external shuffle service?

spark.shuffle.service.enabled=true
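
A minimal sketch of the same setting applied through SparkConf rather than a
properties file (the app name is hypothetical, and on YARN the external shuffle
service also has to be set up as a NodeManager auxiliary service, which this
snippet does not cover):

import org.apache.spark.{SparkConf, SparkContext}

// Serve shuffle files from the external shuffle service so reducers can still
// fetch map output after the executor that wrote it goes away.
val conf = new SparkConf()
  .setAppName("external-shuffle-service-example")   // hypothetical app name
  .set("spark.shuffle.service.enabled", "true")
val sc = new SparkContext(conf)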


On 21.2.2015. 17:50, Corey Nolet wrote:
I'm experiencing the same issue. Upon closer inspection I'm noticing 
that executors are being lost as well. Thing is, I can't figure out 
how they are dying. I'm using MEMORY_AND_DISK_SER and i've got over 
1.3TB of memory allocated for the application. I was thinking perhaps 
it was possible that a single executor was getting a single or a 
couple large partitions but shouldn't the disk persistence kick in at 
that point?


On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote:


For large jobs, the following error message is shown that seems to
indicate that shuffle files for some reason are missing. It's a
rather large job with many partitions. If the data size is
reduced, the problem disappears. I'm running a build from Spark
master post 1.2 (build at 2015-01-16) and running on Yarn 2.2. Any
idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task
450 in stage 450.1 failed 4 times, most recent failure: Lost task
450.3 in stage 450.1 (TID 167370,
lon4-hadoopslave-b77.lon4.spotify.net):
java.io.FileNotFoundException:

/disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
(No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
at
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
at
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)

at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)

at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

TIA,
Anders






Re: FileNotFoundException in appcache shuffle files

2015-01-10 Thread lucio raimondo
Hey, 

I am having a similar issue, did you manage to find a solution yet? Please
check my post below for reference:

http://apache-spark-user-list.1001560.n3.nabble.com/IOError-Errno-2-No-such-file-or-directory-tmp-spark-9e23f17e-2e23-4c26-9621-3cb4d8b832da-tmp3i3xno-td21076.html

Thank you,
Lucio



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/FileNotFoundException-in-appcache-shuffle-files-tp17605p21077.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: FileNotFoundException in appcache shuffle files

2015-01-10 Thread Aaron Davidson
As Jerry said, this is not related to shuffle file consolidation.

The unique thing about this problem is that it's failing to find a file
while trying to _write_ to it, in append mode. The simplest explanation for
this would be that the file is deleted in between some check for existence
and opening the file for append.

The deletion of such files as a race condition with writing them (on the
map side) would be most easily explained by a JVM shutdown event, for
instance caused by a fatal error such as OutOfMemoryError. So, as Ilya
said, please look for another exception possibly preceding this one.

On Sat, Jan 10, 2015 at 12:16 PM, lucio raimondo luxmea...@hotmail.com
wrote:

 Hey,

 I am having a similar issue, did you manage to find a solution yet?
 Please
 check my post below for reference:


 http://apache-spark-user-list.1001560.n3.nabble.com/IOError-Errno-2-No-such-file-or-directory-tmp-spark-9e23f17e-2e23-4c26-9621-3cb4d8b832da-tmp3i3xno-td21076.html

 Thank you,
 Lucio



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/FileNotFoundException-in-appcache-shuffle-files-tp17605p21077.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: FileNotFoundException in appcache shuffle files

2014-10-29 Thread Shaocun Tian
- hopefully fixed in 1.1 with this patch:

 https://github.com/apache/spark/commit/78f2af582286b81e6dc9fa9d455ed2b369d933bd

- 78f2af5 https://github.com/apache/spark/commit/78f2af5[3]
   implements pieces of #1609
   https://github.com/apache/spark/pull/1609[4], on which mridulm
   has a comment
   https://github.com/apache/spark/pull/1609#issuecomment-54393908[5]
   saying: it got split into four issues, two of which got committed, not
   sure of the other two. And the first one was regressed upon in
   1.1 already.
   - Until 1.0.3 or 1.1 are released, the simplest solution is to
disable spark.shuffle.consolidateFiles.
- I've not tried this yet as I'm waiting on a re-run with some other
   parameters tweaked first.
   - Also, I can't tell if it's expected that this was fixed, known
   that it subsequently regressed, etc., so hoping for some guidance there.

 So! Anyone else seen this? Is this related to the bug in shuffle file
 consolidation? Was it fixed? Did it regress? Are my confs or other steps
 unreasonable in some way? Any assistance would be appreciated, thanks.

 -Ryan


 [1] https://www.dropbox.com/s/m8c4o73o0bh7kf8/adam.108?dl=0
 [2]
 http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3CCANGvG8qtK57frWS+kaqTiUZ9jSLs5qJKXXjXTTQ9eh2-GsrmpA@...%3E
 http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3ccangvg8qtk57frws+kaqtiuz9jsls5qjkxxjxttq9eh2-gsr...@mail.gmail.com%3E
 [3] https://github.com/apache/spark/commit/78f2af5
 [4] https://github.com/apache/spark/pull/1609
 [5] https://github.com/apache/spark/pull/1609#issuecomment-54393908




 --
  If you reply to this email, your message will be added to the discussion
 below:

 http://apache-spark-user-list.1001560.n3.nabble.com/FileNotFoundException-in-appcache-shuffle-files-tp17605.html





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/FileNotFoundException-in-appcache-shuffle-files-tp17605p17610.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: FileNotFoundException in appcache shuffle files

2014-10-29 Thread Ganelin, Ilya
Hi Ryan - I've been fighting the exact same issue for well over a month now. I 
initially saw the issue in 1.0.2 but it persists in 1.1.

Jerry - I believe you are correct that this happens during a pause on 
long-running jobs on a large data set. Are there any parameters that you 
suggest tuning to mitigate these situations?

Also, you ask if there are any other exceptions - for me this error has tended 
to follow an earlier exception, which supports the theory that it is a symptom 
of an earlier problem.

My understanding is as follows - during a shuffle step an executor fails and 
doesn't report its output - next, during the reduce step, that output can't be 
found where expected and rather than rerunning the failed execution, Spark goes 
down.

We can add my email thread to your reference list :
https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201410.mbox/CAM-S9zS-+-MSXVcohWEhjiAEKaCccOKr_N5e0HPXcNgnxZd=h...@mail.gmail.com

-Original Message-
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Wednesday, October 29, 2014 01:46 AM Eastern Standard Time
To: Ryan Williams
Cc: user
Subject: RE: FileNotFoundException in appcache shuffle files

Hi Ryan,

This is an issue with sort-based shuffle, not consolidated hash-based shuffle. 
I guess it mostly occurs when the Spark cluster is in an abnormal situation, for 
example a long GC pause; you can check the system status and look for any other 
exceptions besides this one.

Thanks
Jerry

From: nobigdealst...@gmail.com [mailto:nobigdealst...@gmail.com] On Behalf Of 
Ryan Williams
Sent: Wednesday, October 29, 2014 1:31 PM
To: user
Subject: FileNotFoundException in appcache shuffle files

My job is failing with the following error:
14/10/29 02:59:14 WARN scheduler.TaskSetManager: Lost task 1543.0 in stage 3.0 
(TID 6266, 
demeter-csmau08-19.demeter.hpc.mssm.edu):
 java.io.FileNotFoundException: 
/data/05/dfs/dn/yarn/nm/usercache/willir31/appcache/application_1413512480649_0108/spark-local-20141028214722-43f1/26/shuffle_0_312_0.index
 (No such file or directory)
java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.<init>(FileOutputStream.java:221)

org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)

org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:733)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:732)
scala.collection.Iterator$class.foreach(Iterator.scala:727)

org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:790)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:732)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:728)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)

org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:56)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)

I get 4 of those on task 1543 before the job aborts. Interspersed in the 4 
task-1543 failures are a few instances of this failure on another task. Here is 
the entire App Master stdout 
dump https://www.dropbox.com/s/m8c4o73o0bh7kf8/adam.108?dl=0 [1] (~2MB; stack 
traces towards the bottom, of course). I am running {Spark 1.1, Hadoop 2.3.0}.

Here's a summary of the RDD manipulations I've done up to the point of failure:

 *   val A = [read a file in 1419 shards]

 *   the file is 177GB compressed but ends up being ~5TB uncompressed / 
hydrated into scala objects (I think; see below for more discussion on this 
point).
 *   some relevant Spark options:

 *   spark.default.parallelism=2000
 *   --master yarn-client
 *   --executor-memory 50g
 *   --driver-memory 10g
 *   --num-executors 100
 *   --executor-cores 4

 *   A.repartition(3000)

 *   3000 was chosen in an attempt to mitigate shuffle-disk-spillage that 
previous job attempts with 1000 or 1419 shards were mired in

RE: FileNotFoundException in appcache shuffle files

2014-10-28 Thread Shao, Saisai
Hi Ryan,

This is an issue with sort-based shuffle, not consolidated hash-based shuffle. 
I guess it mostly occurs when the Spark cluster is in an abnormal situation, for 
example a long GC pause; you can check the system status and look for any other 
exceptions besides this one.

Thanks
Jerry

From: nobigdealst...@gmail.com [mailto:nobigdealst...@gmail.com] On Behalf Of 
Ryan Williams
Sent: Wednesday, October 29, 2014 1:31 PM
To: user
Subject: FileNotFoundException in appcache shuffle files

My job is failing with the following error:
14/10/29 02:59:14 WARN scheduler.TaskSetManager: Lost task 1543.0 in stage 3.0 
(TID 6266, 
demeter-csmau08-19.demeter.hpc.mssm.edu):
 java.io.FileNotFoundException: 
/data/05/dfs/dn/yarn/nm/usercache/willir31/appcache/application_1413512480649_0108/spark-local-20141028214722-43f1/26/shuffle_0_312_0.index
 (No such file or directory)
java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.<init>(FileOutputStream.java:221)

org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)

org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:733)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:732)
scala.collection.Iterator$class.foreach(Iterator.scala:727)

org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:790)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:732)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:728)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)

org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:56)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)

I get 4 of those on task 1543 before the job aborts. Interspersed in the 4 
task-1543 failures are a few instances of this failure on another task. Here is 
the entire App Master stdout 
dump https://www.dropbox.com/s/m8c4o73o0bh7kf8/adam.108?dl=0 [1] (~2MB; stack 
traces towards the bottom, of course). I am running {Spark 1.1, Hadoop 2.3.0}.

Here's a summary of the RDD manipulations I've done up to the point of failure:

  *   val A = [read a file in 1419 shards]

 *   the file is 177GB compressed but ends up being ~5TB uncompressed / 
hydrated into scala objects (I think; see below for more discussion on this 
point).
 *   some relevant Spark options:

*   spark.default.parallelism=2000
*   --master yarn-client
*   --executor-memory 50g
*   --driver-memory 10g
*   --num-executors 100
*   --executor-cores 4

  *   A.repartition(3000)

 *   3000 was chosen in an attempt to mitigate shuffle-disk-spillage that 
previous job attempts with 1000 or 1419 shards were mired in

  *   A.persist()

  *   A.count()  // succeeds

 *   screenshot of web UI with stats: http://cl.ly/image/3e130w3J1B2v
 *   I don't know why each task reports 8 TB of Input; that metric 
seems like it is always ludicrously high and I don't pay attention to it 
typically.
 *   Each task shuffle-writes 3.5GB, for a total of 4.9TB

*   Does that mean that 4.9TB is the uncompressed size of the file that 
A was read from?
*   4.9TB is pretty close to the total amount of memory I've configured 
the job to use: (50GB/executor) * (100 executors) ~= 5TB.
*   Is that a coincidence, or are my executors shuffle-writing an 
amount equal to all of their memory for some reason?

  *   val B = A.groupBy(...).filter(_._2.size == 2).map(_._2).flatMap(x => x).persist()

 *   my expectation is that ~all elements pass the filter step, so B should 
~equal to A, just to give a sense of the expected memory blowup.

  *   B.count()

 *   this fails while executing .groupBy(...) above

I've found a few discussions of issues whose manifestations look *like* this, 
but nothing that is obviously the same issue
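
For readability, here is a compact Scala sketch of the pipeline described in the
bullets above; the input path, grouping key, and app name are hypothetical stand-ins
for the elided details, and only the shard/partition counts come from the message:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("appcache-shuffle-repro"))  // hypothetical app name

// ~1419 input shards (~177GB compressed), repartitioned to 3000 to spread the shuffle
val A = sc.textFile("hdfs:///path/to/input")   // hypothetical path
          .repartition(3000)
A.persist()
A.count()                                      // reported to succeed

// The groupBy-driven shuffle is where the FileNotFoundException above appears
val B = A.groupBy(line => line.take(8))        // hypothetical grouping key in place of groupBy(...)
         .filter(_._2.size == 2)
         .map(_._2)
         .flatMap(x => x)
         .persist()
B.count()                                      // fails during the groupBy stage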

Re: Shuffle files

2014-10-20 Thread Chen Song
My observation is opposite. When my job runs under default
spark.shuffle.manager, I don't see this exception. However, when it runs
with SORT based, I start seeing this error. How would that be possible?

I am running my job in YARN, and I noticed that the YARN process limits
(cat /proc/$PID/limits) are not consistent with system wide limits (shown
by limit -a), I don't know how that happened. Is there a way to let Spark
driver to propagate this setting (limit -n number) to spark executors
before startup?




On Tue, Oct 7, 2014 at 11:53 PM, Andrew Ash and...@andrewash.com wrote:

 You will need to restart your Mesos workers to pick up the new limits as
 well.

 On Tue, Oct 7, 2014 at 4:02 PM, Sunny Khatri sunny.k...@gmail.com wrote:

 @SK:
 Make sure ulimit has taken effect as Todd mentioned. You can verify via
 ulimit -a. Also make sure you have proper kernel parameters set in
 /etc/sysctl.conf (MacOSX)

 On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd todd.lison...@intel.com
 wrote:


 Are you sure the new ulimit has taken effect?

 How many cores are you using?  How many reducers?

 In general if a node in your cluster has C assigned cores and
 you run
 a job with X reducers then Spark will open C*X files in parallel
 and
 start writing. Shuffle consolidation will help decrease the total
 number of files created but the number of file handles open at
 any
 time doesn't change so it won't help the ulimit problem.

 Quoted from Patrick at:

 http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html

 Thanks,

 Todd

 -Original Message-
 From: SK [mailto:skrishna...@gmail.com]
 Sent: Tuesday, October 7, 2014 2:12 PM
 To: u...@spark.incubator.apache.org
 Subject: Re: Shuffle files

 - We set ulimit to 50. But I still get the same too many open files
 warning.

 - I tried setting consolidateFiles to True, but that did not help either.

 I am using a Mesos cluster.   Does Mesos have any limit on the number of
 open files?

 thanks






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






-- 
Chen Song


RE: Shuffle files

2014-10-20 Thread Shao, Saisai
Hi Song,

From what I know about sort-based shuffle:

The number of files opened in parallel for sort-based shuffle is normally much 
smaller than for hash-based shuffle.

In hash-based shuffle, the number of files opened in parallel is C * R (where C is 
the number of cores used and R is the reducer number); as you can see, the file 
count is tied to the reducer number, no matter how large the shuffle is.

In sort-based shuffle there is only one final map output file. To achieve this we 
do a by-partition sort, which generates some intermediate spill files, but the 
number of spilled files depends on the shuffle size and the memory available for 
shuffle, not on the reducer number.

So if you hit "too many open files" in sort-based shuffle, I guess you have a large 
number of spill files during shuffle write. One possible way to alleviate this is 
to increase the shuffle memory usage; raising the ulimit is another.
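
To make the comparison concrete, a small Scala sketch with hypothetical numbers (4 
cores per node, 2000 reducers; these figures are illustrative, not taken from this 
thread):

// Hash-based shuffle: files open in parallel per node scale with C * R.
val cores    = 4                        // C: cores running tasks on one node
val reducers = 2000                     // R: reduce-side partitions
val hashOpenFiles = cores * reducers    // 8000 file handles open at once
// Sort-based shuffle: one sorted map output file (plus an index) per map task;
// the number of intermediate spill files depends on shuffle data size versus
// shuffle memory, not on the reducer count.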

I guess on YARN you have to do the system configuration manually; Spark cannot set 
the ulimit automatically for you, and I don't think that is something Spark should 
take care of.

Thanks
Jerry

From: Chen Song [mailto:chen.song...@gmail.com]
Sent: Tuesday, October 21, 2014 9:10 AM
To: Andrew Ash
Cc: Sunny Khatri; Lisonbee, Todd; u...@spark.incubator.apache.org
Subject: Re: Shuffle files

My observation is opposite. When my job runs under default 
spark.shuffle.manager, I don't see this exception. However, when it runs with 
SORT based, I start seeing this error. How would that be possible?

I am running my job in YARN, and I noticed that the YARN process limits (cat 
/proc/$PID/limits) are not consistent with system wide limits (shown by limit 
-a), I don't know how that happened. Is there a way to let Spark driver to 
propagate this setting (limit -n number) to spark executors before startup?




On Tue, Oct 7, 2014 at 11:53 PM, Andrew Ash 
and...@andrewash.com wrote:
You will need to restart your Mesos workers to pick up the new limits as well.

On Tue, Oct 7, 2014 at 4:02 PM, Sunny Khatri 
sunny.k...@gmail.com wrote:
@SK:
Make sure ulimit has taken effect as Todd mentioned. You can verify via ulimit 
-a. Also make sure you have proper kernel parameters set in /etc/sysctl.conf 
(MacOSX)

On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd 
todd.lison...@intel.com wrote:

Are you sure the new ulimit has taken effect?

How many cores are you using?  How many reducers?

In general if a node in your cluster has C assigned cores and you run
a job with X reducers then Spark will open C*X files in parallel and
start writing. Shuffle consolidation will help decrease the total
number of files created but the number of file handles open at any
time doesn't change so it won't help the ulimit problem.

Quoted from Patrick at:
http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html

Thanks,

Todd

-Original Message-
From: SK [mailto:skrishna...@gmail.com]
Sent: Tuesday, October 7, 2014 2:12 PM
To: u...@spark.incubator.apache.org
Subject: Re: Shuffle files

- We set ulimit to 50. But I still get the same too many open files
warning.

- I tried setting consolidateFiles to True, but that did not help either.

I am using a Mesos cluster.   Does Mesos have any limit on the number of
open files?

thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





--
Chen Song



Re: Shuffle files

2014-10-07 Thread SK
- We set ulimit to 50. But I still get the same too many open files
warning. 

- I tried setting consolidateFiles to True, but that did not help either.

I am using a Mesos cluster.   Does Mesos have any limit on the number of
open files?

thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Storing shuffle files on a Tachyon

2014-10-07 Thread Soumya Simanta
Is it possible to store spark shuffle files on Tachyon ?


RE: Shuffle files

2014-10-07 Thread Lisonbee, Todd

Are you sure the new ulimit has taken effect?
 
How many cores are you using?  How many reducers?

In general if a node in your cluster has C assigned cores and you run 
a job with X reducers then Spark will open C*X files in parallel and 
start writing. Shuffle consolidation will help decrease the total 
number of files created but the number of file handles open at any 
time doesn't change so it won't help the ulimit problem.

Quoted from Patrick at:
http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html

Thanks,

Todd

-Original Message-
From: SK [mailto:skrishna...@gmail.com] 
Sent: Tuesday, October 7, 2014 2:12 PM
To: u...@spark.incubator.apache.org
Subject: Re: Shuffle files

- We set ulimit to 50. But I still get the same too many open files
warning. 

- I tried setting consolidateFiles to True, but that did not help either.

I am using a Mesos cluster.   Does Mesos have any limit on the number of
open files?

thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle files

2014-10-07 Thread Sunny Khatri
@SK:
Make sure ulimit has taken effect as Todd mentioned. You can verify via
ulimit -a. Also make sure you have proper kernel parameters set in
/etc/sysctl.conf (MacOSX)

On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd todd.lison...@intel.com
wrote:


 Are you sure the new ulimit has taken effect?

 How many cores are you using?  How many reducers?

 In general if a node in your cluster has C assigned cores and you
 run
 a job with X reducers then Spark will open C*X files in parallel
 and
 start writing. Shuffle consolidation will help decrease the total
 number of files created but the number of file handles open at any
 time doesn't change so it won't help the ulimit problem.

 Quoted from Patrick at:

 http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html

 Thanks,

 Todd

 -Original Message-
 From: SK [mailto:skrishna...@gmail.com]
 Sent: Tuesday, October 7, 2014 2:12 PM
 To: u...@spark.incubator.apache.org
 Subject: Re: Shuffle files

 - We set ulimit to 50. But I still get the same too many open files
 warning.

 - I tried setting consolidateFiles to True, but that did not help either.

 I am using a Mesos cluster.   Does Mesos have any limit on the number of
 open files?

 thanks






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Shuffle files

2014-10-07 Thread Andrew Ash
You will need to restart your Mesos workers to pick up the new limits as
well.

On Tue, Oct 7, 2014 at 4:02 PM, Sunny Khatri sunny.k...@gmail.com wrote:

 @SK:
 Make sure ulimit has taken effect as Todd mentioned. You can verify via
 ulimit -a. Also make sure you have proper kernel parameters set in
 /etc/sysctl.conf (MacOSX)

 On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd todd.lison...@intel.com
 wrote:


 Are you sure the new ulimit has taken effect?

 How many cores are you using?  How many reducers?

 In general if a node in your cluster has C assigned cores and
 you run
 a job with X reducers then Spark will open C*X files in parallel
 and
 start writing. Shuffle consolidation will help decrease the total
 number of files created but the number of file handles open at any
 time doesn't change so it won't help the ulimit problem.

 Quoted from Patrick at:

 http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html

 Thanks,

 Todd

 -Original Message-
 From: SK [mailto:skrishna...@gmail.com]
 Sent: Tuesday, October 7, 2014 2:12 PM
 To: u...@spark.incubator.apache.org
 Subject: Re: Shuffle files

 - We set ulimit to 50. But I still get the same too many open files
 warning.

 - I tried setting consolidateFiles to True, but that did not help either.

 I am using a Mesos cluster.   Does Mesos have any limit on the number of
 open files?

 thanks






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Shuffle files

2014-09-25 Thread SK
Hi,

I am using Spark 1.1.0 on a cluster. My job takes as input 30 files in a
directory (I am using  sc.textfile(dir/*) ) to read in the files.  I am
getting the following warning:

WARN TaskSetManager: Lost task 99.0 in stage 1.0 (TID 99,
mesos12-dev.sccps.net): java.io.FileNotFoundException:
/tmp/spark-local-20140925215712-0319/12/shuffle_0_99_93138 (Too many open
files)

basically I think a lot of shuffle files are being created. 

1) The tasks eventually fail and the job just hangs (after taking very long,
more than an hour).  If I read these 30 files in a for loop, the same job
completes in a few minutes. However, I need to specify the files names,
which is not convenient. I am assuming that sc.textfile(dir/*) creates a
large RDD for all the 30 files. Is there a way to make the operation on this
large RDD efficient so as to avoid creating too many shuffle files?


2) Also, I am finding that all the shuffle files for my other completed jobs
are not being automatically deleted even after days. I thought that
sc.stop() clears the intermediate files.  Is there some way to
programmatically delete these temp shuffle files upon job completion?


thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle files

2014-09-25 Thread Andrew Ash
Hi SK,

For the problem with lots of shuffle files and the too many open files
exception there are a couple options:

1. The linux kernel has a limit on the number of open files at once.  This
is set with ulimit -n, and can be set permanently in /etc/sysctl.conf or
/etc/sysctl.d/.  Try increasing this to a large value, at the bare minimum
the square of your partition count.
2. Try using shuffle consolidation -- spark.shuffle.consolidateFiles=true This
option writes fewer files to disk so shouldn't hit limits nearly as much
3. Try using the sort-based shuffle by setting spark.shuffle.manager=SORT.
You should likely hold off on this until
https://issues.apache.org/jira/browse/SPARK-3032 is fixed, hopefully in
1.1.1
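
A minimal SparkConf sketch of options 2 and 3 above (option 1 is an OS-level
setting and is not shown; the property names are those of the Spark 1.x line, so
treat this as illustrative rather than a recommendation for a specific version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.consolidateFiles", "true")   // option 2: write fewer shuffle files
  .set("spark.shuffle.manager", "SORT")            // option 3: sort-based shuffle (note the SPARK-3032 caveat)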

Hope that helps!
Andrew

On Thu, Sep 25, 2014 at 4:20 PM, SK skrishna...@gmail.com wrote:

 Hi,

 I am using Spark 1.1.0 on a cluster. My job takes as input 30 files in a
 directory (I am using  sc.textfile(dir/*) ) to read in the files.  I am
 getting the following warning:

 WARN TaskSetManager: Lost task 99.0 in stage 1.0 (TID 99,
 mesos12-dev.sccps.net): java.io.FileNotFoundException:
 /tmp/spark-local-20140925215712-0319/12/shuffle_0_99_93138 (Too many open
 files)

 basically I think a lot of shuffle files are being created.

 1) The tasks eventually fail and the job just hangs (after taking very
 long,
 more than an hour).  If I read these 30 files in a for loop, the same job
 completes in a few minutes. However, I need to specify the files names,
 which is not convenient. I am assuming that sc.textfile(dir/*) creates a
 large RDD for all the 30 files. Is there a way to make the operation on
 this
 large RDD efficient so as to avoid creating too many shuffle files?


 2) Also, I am finding that all the shuffle files for my other completed
 jobs
 are not being automatically deleted even after days. I thought that
 sc.stop() clears the intermediate files.  Is there some way to
 programmatically delete these temp shuffle files upon job completion?


 thanks





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spilled shuffle files not being cleared

2014-06-13 Thread Michael Chang
Thanks Saisai, I think I will just try lowering my spark.cleaner.ttl value
- I've set it to an hour.


On Thu, Jun 12, 2014 at 7:32 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi Michael,



 I think you can set spark.cleaner.ttl=xxx to enable the time-based metadata
 cleaner, which will clean up old, unused shuffle data once it times out.



 For Spark 1.0 another way is to clean shuffle data using weak reference
 (reference tracking based, configuration is
 spark.cleaner.referenceTracking), and it is enabled by default.



 Thanks

 Saisai



 *From:* Michael Chang [mailto:m...@tellapart.com]
 *Sent:* Friday, June 13, 2014 10:15 AM
 *To:* user@spark.apache.org
 *Subject:* Re: Spilled shuffle files not being cleared



 Bump



 On Mon, Jun 9, 2014 at 3:22 PM, Michael Chang m...@tellapart.com wrote:

 Hi all,



 I'm seeing exceptions that look like the below in Spark 0.9.1.  It looks
 like I'm running out of inodes on my machines (I have around 300k each in a
 12 machine cluster).  I took a quick look and I'm seeing some shuffle spill
 files that are still around even 12 minutes after they are created.  Can
 someone help me understand when these shuffle spill files should be cleaned
 up (Is it as soon as they are used?)



 Thanks,

 Michael





 java.io.FileNotFoundException:
 /mnt/var/hadoop/1/yarn/local/usercache/ubuntu/appcache/application_1399886706975_13107/spark-local-20140609210947-19e1/1c/shuffle_41637_3_0
 (No space left on device)

 at java.io.FileOutputStream.open(Native Method)

 at java.io.FileOutputStream.<init>(FileOutputStream.java:221)

 at
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:118)

 at
 org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)

 at
 org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)

 at
 org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)

 at scala.collection.Iterator$class.foreach(Iterator.scala:727)

 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)

 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)

 at org.apache.spark.scheduler.Task.run(Task.scala:53)

 at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)

 at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)

 at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)

 at java.security.AccessController.doPrivileged(Native Method)

 at javax.security.auth.Subject.doAs(Subject.java:415)

 at
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)

 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)

 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:744)

 14/06/09 22:07:36 WARN TaskSetManager: Lost TID 667432 (task 86909.0:7)

 14/06/09 22:07:36 WARN TaskSetManager: Loss was due to
 java.io.FileNotFoundException





Re: Spilled shuffle files not being cleared

2014-06-12 Thread Michael Chang
Bump


On Mon, Jun 9, 2014 at 3:22 PM, Michael Chang m...@tellapart.com wrote:

 Hi all,

 I'm seeing exceptions that look like the below in Spark 0.9.1.  It looks
 like I'm running out of inodes on my machines (I have around 300k each in a
 12 machine cluster).  I took a quick look and I'm seeing some shuffle spill
 files that are around even around 12 minutes after they are created.  Can
 someone help me understand when these shuffle spill files should be cleaned
 up (Is it as soon as they are used?)

 Thanks,
 Michael


 java.io.FileNotFoundException:
 /mnt/var/hadoop/1/yarn/local/usercache/ubuntu/appcache/application_1399886706975_13107/spark-local-20140609210947-19e1/1c/shuffle_41637_3_0
 (No space left on device)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:118)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
 at
 org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
 at
 org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
 at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
 at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 14/06/09 22:07:36 WARN TaskSetManager: Lost TID 667432 (task 86909.0:7)
 14/06/09 22:07:36 WARN TaskSetManager: Loss was due to
 java.io.FileNotFoundException



RE: Spilled shuffle files not being cleared

2014-06-12 Thread Shao, Saisai
Hi Michael,

I think you can set spark.cleaner.ttl=xxx to enable the time-based metadata cleaner, 
which will clean up old, unused shuffle data once it times out.

For Spark 1.0 another way is to clean shuffle data using weak reference 
(reference tracking based, configuration is spark.cleaner.referenceTracking), 
and it is enabled by default.
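
A minimal SparkConf sketch of the two knobs mentioned above (the 3600-second TTL is 
just an example value, not a recommendation):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.cleaner.ttl", "3600")                // time-based cleanup of old metadata/shuffle data, in seconds
  .set("spark.cleaner.referenceTracking", "true")  // reference-tracking cleanup (enabled by default in 1.0)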

Thanks
Saisai

From: Michael Chang [mailto:m...@tellapart.com]
Sent: Friday, June 13, 2014 10:15 AM
To: user@spark.apache.org
Subject: Re: Spilled shuffle files not being cleared

Bump

On Mon, Jun 9, 2014 at 3:22 PM, Michael Chang 
m...@tellapart.com wrote:
Hi all,

I'm seeing exceptions that look like the below in Spark 0.9.1.  It looks like 
I'm running out of inodes on my machines (I have around 300k each in a 12 
machine cluster).  I took a quick look and I'm seeing some shuffle spill files 
that are still around even 12 minutes after they are created.  Can someone 
help me understand when these shuffle spill files should be cleaned up (Is it 
as soon as they are used?)

Thanks,
Michael


java.io.FileNotFoundException: 
/mnt/var/hadoop/1/yarn/local/usercache/ubuntu/appcache/application_1399886706975_13107/spark-local-20140609210947-19e1/1c/shuffle_41637_3_0
 (No space left on device)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at 
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:118)
at 
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
at 
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
at 
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/06/09 22:07:36 WARN TaskSetManager: Lost TID 667432 (task 86909.0:7)
14/06/09 22:07:36 WARN TaskSetManager: Loss was due to 
java.io.FileNotFoundException



Spilled shuffle files not being cleared

2014-06-09 Thread Michael Chang
Hi all,

I'm seeing exceptions that look like the below in Spark 0.9.1.  It looks
like I'm running out of inodes on my machines (I have around 300k each in a
12 machine cluster).  I took a quick look and I'm seeing some shuffle spill
files that are still around even 12 minutes after they are created.  Can
someone help me understand when these shuffle spill files should be cleaned
up (Is it as soon as they are used?)

Thanks,
Michael


java.io.FileNotFoundException:
/mnt/var/hadoop/1/yarn/local/usercache/ubuntu/appcache/application_1399886706975_13107/spark-local-20140609210947-19e1/1c/shuffle_41637_3_0
(No space left on device)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:118)
at
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
at
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
at
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/06/09 22:07:36 WARN TaskSetManager: Lost TID 667432 (task 86909.0:7)
14/06/09 22:07:36 WARN TaskSetManager: Loss was due to
java.io.FileNotFoundException


Re: Shuffle Files

2014-03-04 Thread Aniket Mokashi
From BlockManager code + ShuffleMapTask code, it writes under
spark.local.dir or java.io.tmpdir.

val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
  conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
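
A short Scala sketch of how that lookup resolves in practice, with a hypothetical
directory (illustrative code, not the Spark source):

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.local.dir", "/mnt/spark-tmp")   // hypothetical path
// Falls back to the JVM temp dir when spark.local.dir is not set, mirroring the snippet above.
val localDir = conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))
// Shuffle and spill files end up under per-application subdirectories of this location.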




On Mon, Mar 3, 2014 at 10:45 PM, Usman Ghani us...@platfora.com wrote:

 Where on the filesystem does spark write the shuffle files?




-- 
...:::Aniket:::... Quetzalco@tl


Shuffle Files

2014-03-03 Thread Usman Ghani
Where on the filesystem does spark write the shuffle files?