Re: Re: [spark-core] Can executors recover/reuse shuffle files upon failure?
Just to correct the last sentence: if we end up starting a new instance of Spark, I don't think it will be able to read the shuffle data on storage from another instance. I stand corrected.

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London, United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Mon, 22 May 2023 at 15:27, Mich Talebzadeh wrote:
> [...]
Re: Re: [spark-core] Can executors recover/reuse shuffle files upon failure?
Hi Maksym,

Let us understand the basics here first. My thoughts: Spark replicates the partitions among multiple nodes. If one executor fails, it moves the processing over to another executor. However, if the data is lost, it re-executes the processing that generated the data, and might have to go back to the source. In case of failure, there will be a delay in getting the results; the amount of delay depends on how much reprocessing Spark needs to do.

Spark, by itself, doesn't add executors when executors fail. It just moves the tasks to other executors. If you are installing plain vanilla Spark on your own cluster, you need to figure out how to bring back executors. Most of the popular platforms built on top of Spark (Glue, EMR, GKS) will replace failed nodes. However, I don't think that applies to your case.

With regard to the point you raised below:

"One of the offerings from the service we use is EBS migration which basically means if a host is about to get evicted, a new host is created and the EBS volume is attached to it. When Spark assigns a new executor to the newly created instance, it basically can recover all the shuffle files that are already persisted in the migrated EBS volume. Is this how it works? Do executors recover / re-register the shuffle files that they found?"

My understanding is that RDD lineage keeps track of what needs to be re-executed. Spark uses RDD lineage to figure out what needs to be re-executed in the same Spark instance. For example, if you have done a groupByKey, you will have 2 stages. After the first stage, the data will be shuffled by hashing the groupBy key, so that data for the same value of the key lands in the same partition. Now, if one of those partitions is lost during execution of the second stage, I am guessing Spark will have to go back and re-execute all the tasks in the first stage.

So in your case, starting a new instance will not have the same issue; Spark re-executes the job.
HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London, United Kingdom

On Mon, 22 May 2023 at 13:19, Maksym M wrote:
> [...]
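[Editor's note: the two-stage point above can be seen in a spark-shell session. This is a sketch with made-up data, not from the thread; the stage behavior described in the comments is the documented lineage-based recovery:]

```
$ spark-shell
scala> val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
scala> rdd.groupByKey().collect()
// The Spark UI shows two stages: stage 0 writes shuffle files
// partitioned by hash(key); stage 1 reads them. If a stage-1 task
// loses its input partition, Spark recomputes the missing stage-0
// shuffle outputs from lineage unless they are still on disk.
```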
RE: Re: [spark-core] Can executors recover/reuse shuffle files upon failure?
Hey vaquar,

The link doesn't explain the crucial detail we're interested in: does the executor re-use the data that exists on a node from a previous executor, and if not, how can we configure it to do so?

We are not running on Kubernetes, so EKS/Kubernetes-specific advice isn't very relevant. We are running Spark in standalone mode.

Best regards,
maksym

On 2023/05/17 12:28:35 vaquar khan wrote:
> [...]

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
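[Editor's note: for a standalone cluster, the closest built-in mechanism to what is asked above is the external shuffle service, which keeps serving an executor's shuffle files after that executor dies (though not across a host replacement). A sketch of the relevant settings, per the Spark configuration docs; values shown are illustrative:]

```
# spark-defaults.conf sketch
# In standalone mode, setting this on the workers runs the shuffle
# service inside the Worker process itself.
spark.shuffle.service.enabled   true
# Typical companion setting so lost executors can be replaced:
spark.dynamicAllocation.enabled true
```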
Re: [spark-core] Can executors recover/reuse shuffle files upon failure?
The following link gives you all the required details:

https://aws.amazon.com/blogs/containers/best-practices-for-running-spark-on-amazon-eks/

Let me know if you require further information.

Regards,
Vaquar khan

On Mon, May 15, 2023, 10:14 PM Mich Talebzadeh wrote:
> [...]
Re: [spark-core] Can executors recover/reuse shuffle files upon failure?
Couple of points:

Why use spot or pre-emptible instances when your application, as you stated, shuffles heavily? Have you looked at why you are having these shuffles? What is the cause of these large transformations ending up in a shuffle?

Also, on your point:
"..then ideally we should expect that when an executor is killed/OOM'd and a new executor is spawned on the same host, the new executor registers the shuffle files to itself. Is that so?"

What guarantee is there that the new executor with inherited shuffle files will succeed?

Also, OOM is often associated with some form of skewed data.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London, United Kingdom

On Mon, 15 May 2023 at 13:11, Faiz Halde wrote:
> [...]
[spark-core] Can executors recover/reuse shuffle files upon failure?
Hello,

We've been in touch with a few Spark specialists who suggested a potential solution to us to improve the reliability of our jobs that are shuffle heavy.

Here is what our setup looks like:

- Spark version: 3.3.1
- Java version: 1.8
- We do not use an external shuffle service
- We use spot instances

We run Spark jobs on clusters that use Amazon EBS volumes. The spark.local.dir is mounted on this EBS volume. One of the offerings from the service we use is EBS migration, which basically means if a host is about to get evicted, a new host is created and the EBS volume is attached to it.

When Spark assigns a new executor to the newly created instance, it basically can recover all the shuffle files that are already persisted in the migrated EBS volume.

Is this how it works? Do executors recover / re-register the shuffle files that they found?

So far I have not come across any recovery mechanism. I can only see

KubernetesLocalDiskShuffleDataIO

which has a pre-init step where it tries to register the available shuffle files to itself.

A natural follow-up on this: if what they claim is true, then ideally we should expect that when an executor is killed/OOM'd and a new executor is spawned on the same host, the new executor registers the shuffle files to itself. Is that so?

Thanks

--
Confidentiality note: This e-mail may contain confidential information from Nu Holdings Ltd and/or its affiliates. If you have received it by mistake, please let us know by e-mail reply and delete it from your system; you may not copy this message or disclose its contents to anyone; for details about what personal information we collect and why, please refer to our privacy policy <https://api.mziq.com/mzfilemanager/v2/d/59a081d2-0d63-4bb5-b786-4c07ae26bc74/6f4939b9-5f74-a528-1835-596b481dca54>.
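[Editor's note: the KubernetesLocalDiskShuffleDataIO path mentioned above is wired in via the shuffle IO plugin config. This is a sketch based on the Spark-on-Kubernetes docs; it only applies on Kubernetes in recent Spark releases, not to standalone mode, and the PVC-reuse settings are assumptions about a typical setup:]

```
# spark-defaults.conf sketch (Kubernetes only)
spark.shuffle.sort.io.plugin.class  org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
# Keep the PVC (and the shuffle files on it) around so a replacement
# executor mounting the same volume can re-register them:
spark.kubernetes.driver.ownPersistentVolumeClaim    true
spark.kubernetes.driver.reusePersistentVolumeClaim  true
```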
Re: [EXTERNAL] spark re-use shuffle files not happening
ok thanks. guess i am simply misremembering that i saw the shuffle files getting re-used across jobs (actions). it was probably across stages for the same job.

in structured streaming this is a pretty big deal. if you join a streaming dataframe with a large static dataframe, each microbatch becomes a job (action), so the large static dataframe gets reshuffled for every microbatch. observing this performance issue was actually why i did the little basic experiment in this post.

On Sat, Jul 16, 2022 at 12:33 PM Shay Elbaz wrote:
> [...]
Re: [EXTERNAL] spark re-use shuffle files not happening
Spark can reuse shuffle stages *in the same job* (action), not cross jobs.

From: Koert Kuipers
Sent: Saturday, July 16, 2022 6:43 PM
To: user
Subject: [EXTERNAL] spark re-use shuffle files not happening

> [...]
spark re-use shuffle files not happening
i have seen many jobs where spark re-uses shuffle files (and skips a stage of a job), which is an awesome feature given how expensive shuffles are, and i generally now assume this will happen.

however i feel like i am going a little crazy today. i did the simplest test in spark 3.3.0: basically i run 2 jobs within the same spark shell, so using the same spark session, and broadcast join is disabled so we get shuffles:
1) job1 joins dataframe1 with dataframe0 and writes results out.
2) job2 joins dataframe2 with dataframe0 and writes results out.

i would expect job2 to skip the stage where dataframe0 is getting shuffled, but it's not skipping it! what am i missing? is shuffle re-use only enabled within the same job/action? that goes against what i remember...

code:
$ spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1
scala> val data0 = spark.read.format("csv").option("header", true).load("data0.csv")
scala> val data1 = spark.read.format("csv").option("header", true).load("data1.csv")
scala> val data2 = spark.read.format("csv").option("header", true).load("data2.csv")
scala> data1.join(data0, "key").write.format("parquet").save("out1")
scala> data2.join(data0, "key").write.format("parquet").save("out2") // should skip stage that scans csv for data0 and writes shuffle files... but it doesn't

--
CONFIDENTIALITY NOTICE: This electronic communication and any files transmitted with it are confidential, privileged and intended solely for the use of the individual or entity to whom they are addressed. If you are not the intended recipient, you are hereby notified that any disclosure, copying, distribution (electronic or otherwise) or forwarding of, or the taking of any action in reliance on the contents of this transmission is strictly prohibited. Please notify the sender immediately by e-mail if you have received this email by mistake and delete this email from your system.

Is it necessary to print this email? If you care about the environment like we do, please refrain from printing emails. It helps to keep the environment forested and litter-free.
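[Editor's note: one way to get the reuse expected above is to make it explicit by caching the shared dataframe, so the second job reads cached blocks instead of rescanning the csv. A sketch extending the session above (paths as before; persist semantics per the Spark docs):]

```
scala> val data0c = data0.persist()  // cache data0's partitions once materialized
scala> data1.join(data0c, "key").write.format("parquet").save("out1")  // populates the cache
scala> data2.join(data0c, "key").write.format("parquet").save("out2")  // reads cached blocks
// note: persist() caches the pre-shuffle scan, not the shuffle output
// itself; the exchange may still run per job, but the expensive csv
// scan is no longer repeated. the same trick helps the streaming
// static-join case, where each microbatch is its own job.
```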
Re: Spark with External Shuffle Service - using saved shuffle files in the event of executor failure
Hello,

I have answered it on Stack Overflow.

Best Regards,
Attila

On Wed, May 12, 2021 at 4:57 PM Chris Thomas wrote:
> [...]
Spark with External Shuffle Service - using saved shuffle files in the event of executor failure
Hi,

I am pretty confident I have observed Spark configured with the Shuffle Service continuing to fetch shuffle files on a node in the event of executor failure, rather than recompute the shuffle files as happens without the Shuffle Service. Can anyone confirm this?

(I have an SO question open on the same <https://stackoverflow.com/questions/67466878/can-spark-with-external-shuffle-service-use-saved-shuffle-files-in-the-event-of> if you would rather answer directly there.)

Kind regards,

Chris
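[Editor's note: the observation above matches the documented behavior: with the external shuffle service, shuffle files are served by a per-node service rather than by the executor that wrote them, so they remain fetchable after an executor dies. A sketch of enabling it on YARN, per the Spark-on-YARN docs; adapt to your cluster manager:]

```
<!-- yarn-site.xml sketch: register the Spark shuffle service
     as a NodeManager auxiliary service -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
```

Then submit with `--conf spark.shuffle.service.enabled=true` so executors register their shuffle output with the service.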
Re: Spark not releasing shuffle files in time (with very large heap)
You can also look at the shuffle file cleanup tricks we do inside of the ALS algorithm in Spark. On Fri, Feb 23, 2018 at 6:20 PM, vijay.bvp wrote: > have you looked at > http://apache-spark-user-list.1001560.n3.nabble.com/Limit-Spark-Shuffle-Disk-Usage-td23279.html > > and the post mentioned there > https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html > > also try compressing the output > https://spark.apache.org/docs/latest/configuration.html#compression-and-serialization > spark.shuffle.compress > > thanks > Vijay -- Twitter: https://twitter.com/holdenkarau
Re: Spark not releasing shuffle files in time (with very large heap)
have you looked at http://apache-spark-user-list.1001560.n3.nabble.com/Limit-Spark-Shuffle-Disk-Usage-td23279.html and the post mentioned there https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html also try compressing the output https://spark.apache.org/docs/latest/configuration.html#compression-and-serialization spark.shuffle.compress thanks Vijay
Re: Spark not releasing shuffle files in time (with very large heap)
Got it. I understood the issue in a different way.
Re: Spark not releasing shuffle files in time (with very large heap)
My issue is that there is not enough pressure on GC, hence GC is not kicking in fast enough to delete the shuffle files of previous iterations. Regards, Keith. http://keith-chapman.com
Re: Spark not releasing shuffle files in time (with very large heap)
It would be very difficult to tell without knowing what your application code is doing and what kinds of transformations/actions it performs. From my previous experience, tuning application code to avoid unnecessary objects reduces pressure on GC.
Spark not releasing shuffle files in time (with very large heap)
Hi, I'm benchmarking a Spark application by running it for multiple iterations; it's a benchmark that's heavy on shuffle and I run it on a local machine with a very large heap (~200GB). The system has an SSD. When running for 3 to 4 iterations I get into a situation where I run out of disk space on the /tmp directory. On further investigation I was able to figure out that the reason for this is that the shuffle files are still around: because I have a very large heap, GC has not happened and hence the shuffle files are not deleted. I was able to confirm this by lowering the heap size; I see GC kicking in more often and the size of /tmp stays under control. Is there any way I could configure Spark to handle this issue? One option that I have is to have GC run more often by setting spark.cleaner.periodicGC.interval to a much lower value. Is there a cleaner solution? Regards, Keith. http://keith-chapman.com
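The workaround mentioned above can be expressed at submit time; a hedged sketch (the interval value is an assumption to tune, and the class and jar names are hypothetical placeholders):

```shell
# Sketch: have the ContextCleaner trigger a periodic driver GC so that
# unreferenced shuffle files are deleted even though the huge heap rarely GCs.
# Class and jar names below are hypothetical placeholders.
spark-submit \
  --conf spark.cleaner.periodicGC.interval=10min \
  --class com.example.ShuffleHeavyBenchmark \
  benchmark.jar
```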
Re: Spark shuffle files
When the RDD using them goes out of scope.
Re: Spark shuffle files
Thanks Mark! Follow-up question: do you know when shuffle files usually become un-referenced?
Re: Spark shuffle files
Shuffle files are cleaned when they are no longer referenced. See https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ContextCleaner.scala
Spark shuffle files
Hi! In spark on yarn, when are shuffle files on local disk removed? (Is it when the app completes or once all the shuffle files are fetched or end of the stage?) Thanks, Ashwin
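Putting the answers above together: a shuffle becomes un-referenced once no live RDD on the driver holds its ShuffleDependency, at which point a driver GC lets the ContextCleaner remove the files. A hedged spark-shell sketch (variable names are illustrative):

```scala
// Sketch: drop the last reference to a shuffled RDD, then force a driver GC;
// the ContextCleaner's weak-reference queue fires and the executors' shuffle
// files for that dependency become eligible for deletion.
scala> var grouped = sc.parallelize(1 to 1000).map(i => (i % 10, i)).groupByKey()
scala> grouped.count()    // writes shuffle files on the executors
scala> grouped = null     // shuffle is now un-referenced...
scala> System.gc()        // ...and a GC lets the cleaner delete its files
```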
Fwd: Connection failure followed by bad shuffle files during shuffle
Hi, I'm running into consistent failures during a shuffle read while trying to do a group-by followed by a count aggregation (using the DataFrame API on Spark 1.5.2). The shuffle read (in stage 1) fails with org.apache.spark.shuffle.FetchFailedException: Failed to send RPC 7719188499899260109 to host_a/ip_a:35946: java.nio.channels.ClosedChannelException at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321) Looking into executor logs first shows ERROR TransportChannelHandler: Connection to host_b/ip_b:38804 has been quiet for 12 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong. on the node that threw the FetchFailedException (host_a) and ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=207789700738, chunkIndex=894}, buffer=FileSegmentManagedBuffer{file=/local_disk/spark-ed6667d4-445b-4d65-bfda-e4540b7215aa/executor-d03e5e7e-57d4-40e2-9021-c20d0b84bf75/blockmgr-05d5f2b6-142e-415c-a08b-58d16a10b8bf/27/shuffle_1_13732_0.data, offset=18960736, length=19477}} to /ip_a:32991; closing connection on the node referenced in the exception (host_b). The error in the host_b logs occurred a few seconds after the error in the host_a logs. I noticed there was a lot of spilling going on during the shuffle read, so I attempted to work around this problem by increasing the number of shuffle partitions (to decrease spilling) as well as increasing spark.network.timeout. Neither of these got rid of these connection failures. This causes some of stage 0 to recompute (which runs successfully). 
Stage 1 retry 1 then always fails with java.io.IOException: FAILED_TO_UNCOMPRESS(5) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) Changing the spark.io.compression.codec to lz4 changes this error to java.io.IOException: Stream is corrupted at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:153) which leads me to believe that the timeout during the shuffle read failure leaves invalid files on disk. Notably, these failures do not occur when I run on smaller subsets of data. The failure is occurring while attempting to group ~100 billion rows into 20 billion groups (with key size of 24 bytes and count as the only aggregation) on a 16 node cluster. I've replicated this failure on 2 completely separate clusters (both running with standalone cluster manager). Does anyone have suggestions about how I could make this crash go away or how I could try to make a smaller failing test case so the bug can be more easily investigated? Best, Eric Martin
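Alongside spark.network.timeout, the shuffle fetch path has its own retry settings that are worth raising before concluding the on-disk files are corrupt; a hedged sketch (property names are from the Spark configuration reference, the values are illustrative starting points, and your-app.jar is a placeholder):

```shell
# Sketch: make shuffle fetches more tolerant of slow or overloaded serving
# nodes, and spread the 20B-group aggregation over more, smaller partitions.
spark-submit \
  --conf spark.network.timeout=600s \
  --conf spark.shuffle.io.maxRetries=10 \
  --conf spark.shuffle.io.retryWait=30s \
  --conf spark.sql.shuffle.partitions=4000 \
  your-app.jar
```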
RE: FileNotFoundException in appcache shuffle files
I have similar issues... the exception occurs only with very large data. And I tried to double the memory and the partitions as suggested by some Google searches, but in vain. Any idea? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/FileNotFoundException-in-appcache-shuffle-files-tp17605p25663.html
Re: FileNotFoundException in appcache shuffle files
Usually there is another error or log message before FileNotFoundException. Try to check your logs for something like that.
Re: Checkpointing not removing shuffle files from local disk
Hi all, We are running a class with PySpark notebooks for data analysis. Some of the notebooks are fairly long and have a lot of operations. Through the course of the notebook, the shuffle storage expands considerably and often exceeds quota (e.g. 1.5GB input expands to 24GB in shuffle files). Closing and reopening the notebook doesn't clean out the shuffle directory. FWIW, the shuffle memory really explodes when we use ALS. There is a ticket to make sure this is well documented, but there are also suggestions that the problem should have gone away with Spark 1.0: https://issues.apache.org/jira/browse/SPARK-5836 Yours, Ewan On Tue, 2015-09-29 at 01:18 -0700, ramibatal wrote: > Hi all, > > I am applying MLlib LDA for topic modelling. I am setting up the LDA > parameters as follows: > > lda.setOptimizer(optimizer) > .setK(params.k) > .setMaxIterations(params.maxIterations) > .setDocConcentration(params.docConcentration) > .setTopicConcentration(params.topicConcentration) > .setCheckpointInterval(params.checkpointInterval) > if (params.checkpointDir.nonEmpty) { > sc.setCheckpointDir(params.checkpointDir.get) > } > > I am running the LDA algorithm on my local MacOS machine, on a corpus of > 800,000 English text documents (total size 9GB), and my machine has 8 cores > with 16GB of RAM and a 500GB hard disk. > > Here are my Spark configurations: > > val conf = new SparkConf().setMaster("local[6]").setAppName("LDAExample") > val sc = new SparkContext(conf) > > When calling the LDA with a large number of iterations (100) (i.e. by calling > val ldaModel = lda.run(corpus)), the algorithm starts to create shuffle files > on my disk to the point that it fills it up till there is no space left. 
> > I am using spark-submit to run my program as follows: > > spark-submit --driver-memory 14G --class com.heystaks.spark.ml.topicmodelling.LDAExample > ./target/scala-2.10/lda-assembly-1.0.jar path/to/copurs/file --k 100 > --maxIterations 100 --checkpointDir /Users/ramialbatal/checkpoints > --checkpointInterval 1 > > Where 'K' is the number of topics to extract: when the numbers of iterations and topics are small everything is fine, but with a large iteration number like 100, no matter what the value of --checkpointInterval is, the phenomenon is the same: the disk will fill up after about 25 iterations. > > Everything seems to run correctly and the checkpoint files are created on my disk, but the shuffle files are not removed at all. > > I am using Spark and MLlib 1.5.0, and my machine is a Mac running Yosemite 10.10.5. > > Any help is highly appreciated. Thanks > > -- > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Checkpointing-not-removing-shuffle-files-from-local-disk-tp24857.html
Shuffle files lifecycle
Hello, It is my understanding that shuffle files are written on disk and that they act as checkpoints. I wonder if this is true only within a job, or across jobs. Please note that I use the words job and stage carefully here. 1. can a shuffle created during JobN be used to skip many stages from JobN+1? Or is the lifecycle of the shuffle files bound to the job that created them? 2. when are shuffle files actually deleted? Is it TTL based or is it cleaned when the job is over? 3. we have a very long batch application, and as it goes on, the number of total tasks for each job gets larger and larger. It is not really a problem, because most of those tasks will be skipped since we cache RDDs. We noticed however that there is a delay in the actual start of a job of 1 min for every 2M tasks in the job. Are there suggested workarounds to avoid that delay? Maybe saving the RDD and re-loading it? Thanks Thomas
Re: Shuffle files lifecycle
Ah, for #3, maybe this is what *rdd.checkpoint* does! https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD Thomas
Re: Shuffle files lifecycle
Thanks Silvio.
Re: Shuffle files lifecycle
Regarding 1 and 2, yes shuffle output is stored on the worker local disks and will be reused across jobs as long as they’re available. You can identify when they’re used by seeing skipped stages in the job UI. They are periodically cleaned up based on available space of the configured spark.local.dirs paths.
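For question 3, truncating lineage explicitly is the usual workaround; a hedged spark-shell sketch (the checkpoint directory path and the RDD/function names are assumptions):

```scala
// Sketch: rdd.checkpoint() writes the RDD to reliable storage and truncates
// its lineage, so later jobs stop carrying (and re-skipping) earlier stages.
// `input`, `parse`, and `merge` are placeholders; the path is an assumption.
scala> sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")
scala> val derived = input.map(parse).reduceByKey(merge)
scala> derived.checkpoint()
scala> derived.count()    // the first action materializes the checkpoint
```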
Re: Shuffle files not cleaned up (Spark 1.2.1)
Hi TD, That little experiment helped a bit. This time we did not see any exceptions for about 16 hours but eventually it did throw the same exceptions as before. The cleaning of the shuffle files also stopped much before these exceptions happened - about 7-1/2 hours after startup. I am not quite sure how to proceed from here. Any suggestions on how to avoid these errors? Thanks NB On Fri, Apr 24, 2015 at 12:57 AM, N B nb.nos...@gmail.com wrote: Hi TD, That may very well have been the case. There may be some delay on our output side. I have made a change just for testing that sends the output nowhere. I will see if that helps get rid of these errors. Then we can try to find out how we can optimize so that we do not lag. Questions: How can we ever be sure that a lag even if temporary never occur in the future? Also, should Spark not clean up any temp files that it still knows it might need in the (near?) future. Thanks Nikunj On Thu, Apr 23, 2015 at 12:29 PM, Tathagata Das t...@databricks.com wrote: What was the state of your streaming application? Was it falling behind with a large increasing scheduling delay? TD On Thu, Apr 23, 2015 at 11:31 AM, N B nb.nos...@gmail.com wrote: Thanks for the response, Conor. I tried with those settings and for a while it seemed like it was cleaning up shuffle files after itself. However, after exactly 5 hours later it started throwing exceptions and eventually stopped working again. A sample stack trace is below. What is curious about 5 hours is that I set the cleaner ttl to 5 hours after changing the max window size to 1 hour (down from 6 hours in order to test). It also stopped cleaning the shuffle files after this started happening. Any idea why this could be happening? 
2015-04-22 17:39:52,040 ERROR Executor task launch worker-989 Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147) java.lang.Exception: Could not compute split, block input-0-1429706099000 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Thanks NB On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell conor.fenn...@altocloud.com wrote: Hi, We set the spark.cleaner.ttl to some reasonable time and also set spark.streaming.unpersist=true. Those together cleaned up the shuffle files for us. -Conor On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote: We already do have a cron job in place to clean just the shuffle files. However, what I would really like to know is whether there is a proper way of telling spark to clean up these files once its done with them? 
Thanks NB On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com wrote: Write a cron job for this like below 12 * * * * find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+ 32 * * * * find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+ 52 * * * * find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+ On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote: Hi all, I had posed this query as part of a different thread but did not get a response there. So creating a new thread hoping to catch someone's attention. We are experiencing this issue of shuffle files being left behind and not being cleaned up by Spark. Since this is a Spark streaming application, it is expected to stay up indefinitely, so shuffle files not being cleaned up is a big problem right now. Our max window size is 6 hours, so we have set up a cron job to clean up shuffle files older than 12 hours otherwise it will eat up all our disk space. Please see the following. It seems the non-cleaning of shuffle files is being documented in 1.3.1. https://github.com/apache/spark/pull/5074/files https://issues.apache.org/jira/browse/SPARK
Re: Shuffle files not cleaned up (Spark 1.2.1)
Thanks for the response, Conor. I tried with those settings and for a while it seemed like it was cleaning up shuffle files after itself. However, after exactly 5 hours later it started throwing exceptions and eventually stopped working again. A sample stack trace is below. What is curious about 5 hours is that I set the cleaner ttl to 5 hours after changing the max window size to 1 hour (down from 6 hours in order to test). It also stopped cleaning the shuffle files after this started happening. Any idea why this could be happening? 2015-04-22 17:39:52,040 ERROR Executor task launch worker-989 Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147) java.lang.Exception: Could not compute split, block input-0-1429706099000 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Thanks NB On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell conor.fenn...@altocloud.com wrote: Hi, We set the spark.cleaner.ttl to some reasonable time and also set spark.streaming.unpersist=true. 
Those together cleaned up the shuffle files for us. -Conor On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote: We already do have a cron job in place to clean just the shuffle files. However, what I would really like to know is whether there is a proper way of telling spark to clean up these files once its done with them? Thanks NB On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com wrote: Write a crone job for this like below 12 * * * * find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+ 32 * * * * find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+ 52 * * * * find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+ On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote: Hi all, I had posed this query as part of a different thread but did not get a response there. So creating a new thread hoping to catch someone's attention. We are experiencing this issue of shuffle files being left behind and not being cleaned up by Spark. Since this is a Spark streaming application, it is expected to stay up indefinitely, so shuffle files not being cleaned up is a big problem right now. Our max window size is 6 hours, so we have set up a cron job to clean up shuffle files older than 12 hours otherwise it will eat up all our disk space. Please see the following. It seems the non-cleaning of shuffle files is being documented in 1.3.1. https://github.com/apache/spark/pull/5074/files https://issues.apache.org/jira/browse/SPARK-5836 Also, for some reason, the following JIRAs that were reported as functional issues were closed as Duplicates of the above Documentation bug. Does this mean that this issue won't be tackled at all? 
https://issues.apache.org/jira/browse/SPARK-3563 https://issues.apache.org/jira/browse/SPARK-4796 https://issues.apache.org/jira/browse/SPARK-6011 Any further insight into whether this is being looked into and meanwhile how to handle shuffle files will be greatly appreciated. Thanks NB
Re: Shuffle files not cleaned up (Spark 1.2.1)
What was the state of your streaming application? Was it falling behind with a large, increasing scheduling delay?

TD

On Thu, Apr 23, 2015 at 11:31 AM, N B nb.nos...@gmail.com wrote:

Thanks for the response, Conor. I tried with those settings and for a while it seemed like it was cleaning up shuffle files after itself. However, exactly 5 hours later it started throwing exceptions and eventually stopped working again. A sample stack trace is below. What is curious about 5 hours is that I set the cleaner TTL to 5 hours after changing the max window size to 1 hour (down from 6 hours, in order to test). It also stopped cleaning the shuffle files after this started happening. Any idea why this could be happening?

2015-04-22 17:39:52,040 ERROR Executor task launch worker-989 Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
java.lang.Exception: Could not compute split, block input-0-1429706099000 not found
    at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Thanks
NB

On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell conor.fenn...@altocloud.com wrote:

Hi, we set spark.cleaner.ttl to some reasonable time and also set spark.streaming.unpersist=true. Those together cleaned up the shuffle files for us.

-Conor

On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote:

We already do have a cron job in place to clean just the shuffle files. However, what I would really like to know is whether there is a proper way of telling Spark to clean up these files once it's done with them?

Thanks
NB

On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com wrote:

Write a cron job for this, like below:

12 * * * * find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
32 * * * * find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+
52 * * * * find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+

On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

Hi all,

I had posed this query as part of a different thread but did not get a response there, so I am creating a new thread hoping to catch someone's attention.

We are experiencing this issue of shuffle files being left behind and not being cleaned up by Spark. Since this is a Spark Streaming application, it is expected to stay up indefinitely, so shuffle files not being cleaned up is a big problem right now. Our max window size is 6 hours, so we have set up a cron job to clean up shuffle files older than 12 hours; otherwise they would eat up all our disk space.

Please see the following. It seems the non-cleaning of shuffle files is being documented in 1.3.1:
https://github.com/apache/spark/pull/5074/files
https://issues.apache.org/jira/browse/SPARK-5836

Also, for some reason, the following JIRAs that were reported as functional issues were closed as duplicates of the above documentation bug. Does this mean that this issue won't be tackled at all?
https://issues.apache.org/jira/browse/SPARK-3563
https://issues.apache.org/jira/browse/SPARK-4796
https://issues.apache.org/jira/browse/SPARK-6011

Any further insight into whether this is being looked into, and meanwhile how to handle shuffle files, will be greatly appreciated.

Thanks
NB
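Conor's two settings, expressed as a spark-defaults.conf fragment. The values here are illustrative, not from the thread: on Spark 1.x, spark.cleaner.ttl is in seconds and must comfortably exceed the largest window the job uses, or blocks that are still needed get deleted — which is consistent with NB's job breaking exactly at the 5-hour TTL above.

```properties
# Periodically forget and clean up metadata and shuffle data older than this
# many seconds. Must be larger than the longest window duration, or blocks
# still in use will be removed (illustrative: 25200 s = 7 h).
spark.cleaner.ttl            25200

# Have Spark Streaming aggressively unpersist RDDs it generates once they are
# no longer needed.
spark.streaming.unpersist    true
```

With the thread's 6-hour max window, a TTL of 7 hours leaves headroom; the 5-hour TTL against a 6-hour retention requirement would not.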
Re: Shuffle files not cleaned up (Spark 1.2.1)
We already do have a cron job in place to clean just the shuffle files. However, what I would really like to know is whether there is a proper way of telling Spark to clean up these files once it's done with them?

Thanks
NB

On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com wrote:

Write a cron job for this, like below:

12 * * * * find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
32 * * * * find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+
52 * * * * find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+

On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

Hi all,

I had posed this query as part of a different thread but did not get a response there, so I am creating a new thread hoping to catch someone's attention.

We are experiencing this issue of shuffle files being left behind and not being cleaned up by Spark. Since this is a Spark Streaming application, it is expected to stay up indefinitely, so shuffle files not being cleaned up is a big problem right now. Our max window size is 6 hours, so we have set up a cron job to clean up shuffle files older than 12 hours; otherwise they would eat up all our disk space.

Please see the following. It seems the non-cleaning of shuffle files is being documented in 1.3.1:
https://github.com/apache/spark/pull/5074/files
https://issues.apache.org/jira/browse/SPARK-5836

Also, for some reason, the following JIRAs that were reported as functional issues were closed as duplicates of the above documentation bug. Does this mean that this issue won't be tackled at all?
https://issues.apache.org/jira/browse/SPARK-3563
https://issues.apache.org/jira/browse/SPARK-4796
https://issues.apache.org/jira/browse/SPARK-6011

Any further insight into whether this is being looked into, and meanwhile how to handle shuffle files, will be greatly appreciated.

Thanks
NB
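On the "proper way of telling Spark" question: for the standalone deploy mode there are worker-side cleanup settings in Spark's configuration docs. A sketch (note the documented caveat that these only clean work directories of *stopped* applications, so they do not by themselves solve the long-running streaming case in this thread):

```properties
# Standalone mode only: periodically clean old application work dirs on workers.
spark.worker.cleanup.enabled     true
# How often the worker checks, in seconds (30 minutes here, an illustrative value).
spark.worker.cleanup.interval    1800
# Age, in seconds, after which a stopped application's work dir may be removed.
spark.worker.cleanup.appDataTtl  604800
```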
Shuffle files not cleaned up (Spark 1.2.1)
Hi all,

I had posed this query as part of a different thread but did not get a response there, so I am creating a new thread hoping to catch someone's attention.

We are experiencing this issue of shuffle files being left behind and not being cleaned up by Spark. Since this is a Spark Streaming application, it is expected to stay up indefinitely, so shuffle files not being cleaned up is a big problem right now. Our max window size is 6 hours, so we have set up a cron job to clean up shuffle files older than 12 hours; otherwise they would eat up all our disk space.

Please see the following. It seems the non-cleaning of shuffle files is being documented in 1.3.1:
https://github.com/apache/spark/pull/5074/files
https://issues.apache.org/jira/browse/SPARK-5836

Also, for some reason, the following JIRAs that were reported as functional issues were closed as duplicates of the above documentation bug. Does this mean that this issue won't be tackled at all?
https://issues.apache.org/jira/browse/SPARK-3563
https://issues.apache.org/jira/browse/SPARK-4796
https://issues.apache.org/jira/browse/SPARK-6011

Any further insight into whether this is being looked into, and meanwhile how to handle shuffle files, will be greatly appreciated.

Thanks
NB
Re: Shuffle files not cleaned up (Spark 1.2.1)
Write a cron job for this, like below:

12 * * * * find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
32 * * * * find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+
52 * * * * find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+

On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

Hi all,

I had posed this query as part of a different thread but did not get a response there, so I am creating a new thread hoping to catch someone's attention.

We are experiencing this issue of shuffle files being left behind and not being cleaned up by Spark. Since this is a Spark Streaming application, it is expected to stay up indefinitely, so shuffle files not being cleaned up is a big problem right now. Our max window size is 6 hours, so we have set up a cron job to clean up shuffle files older than 12 hours; otherwise they would eat up all our disk space.

Please see the following. It seems the non-cleaning of shuffle files is being documented in 1.3.1:
https://github.com/apache/spark/pull/5074/files
https://issues.apache.org/jira/browse/SPARK-5836

Also, for some reason, the following JIRAs that were reported as functional issues were closed as duplicates of the above documentation bug. Does this mean that this issue won't be tackled at all?
https://issues.apache.org/jira/browse/SPARK-3563
https://issues.apache.org/jira/browse/SPARK-4796
https://issues.apache.org/jira/browse/SPARK-6011

Any further insight into whether this is being looked into, and meanwhile how to handle shuffle files, will be greatly appreciated.

Thanks
NB
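The crontab lines above shell out to find; the same age-based sweep can be sketched in Python. This is a hypothetical helper, not part of Spark; the 1440-minute threshold and the spark-* name pattern mirror the third crontab line above.

```python
import fnmatch
import os
import shutil
import time

def sweep_spark_dirs(root, max_age_minutes=1440, pattern="spark-*"):
    """Remove immediate subdirectories of `root` whose name matches `pattern`
    and whose modification time is older than `max_age_minutes` -- roughly
    `find root -mindepth 1 -maxdepth 1 -type d -cmin +N -name 'spark-*'
     -prune -exec rm -rf {} +`.
    Returns the names of the directories that were removed."""
    cutoff = time.time() - max_age_minutes * 60
    removed = []
    for name in os.listdir(root):
        path = os.path.join(root, name)
        if (os.path.isdir(path)
                and fnmatch.fnmatch(name, pattern)
                and os.path.getmtime(path) < cutoff):
            shutil.rmtree(path)
            removed.append(name)
    return removed
```

Run it from cron (e.g. hourly) against each directory listed in SPARK_LOCAL_DIRS. As with the find version, it only looks at the top-level directory mtime, so a directory that is still being written into deeper down can in principle be swept while in use.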
Re: Missing shuffle files
Just wanted to point out: raising the memory overhead (as I saw in the logs) was the fix for this issue, and I have not seen dying executors since this value was increased.

On Tue, Feb 24, 2015 at 3:52 AM, Anders Arpteg arp...@spotify.com wrote:

If you're thinking of the YARN memory overhead, then yes, I have increased that as well. However, I'm glad to say that my job finally finished successfully. Besides the timeout and memory settings, performing repartitioning (with shuffling) at the right time seems to be the key to making this large job succeed. With all the transformations in the job, the partition distribution was becoming increasingly skewed. It is not easy to figure out when and to what number of partitions to set, and it takes forever to tweak these settings, since everything works perfectly for small datasets and you have to experiment with large, time-consuming jobs. Imagine if there was an automatic partition reconfiguration function that automagically did that...

On Tue, Feb 24, 2015 at 3:20 AM, Corey Nolet cjno...@gmail.com wrote:

I *think* this may have been related to the default memory overhead setting being too low. I raised the value to 1G and tried my job again, but I had to leave the office before it finished. It did get further, but I'm not exactly sure if that's just because I raised the memory. I'll see tomorrow, but I have a suspicion this may have been the cause of the executors being killed by the application master.

On Feb 23, 2015 5:25 PM, Corey Nolet cjno...@gmail.com wrote:

I've got the opposite problem with regards to partitioning. I've got over 6000 partitions for some of these RDDs, which immediately blows the heap somehow; I'm still not exactly sure how. If I coalesce them down to about 600-800 partitions, I get the problems where the executors are dying without any other error messages (other than telling me the executor was lost in the UI). If I don't coalesce, I pretty immediately get Java heap space exceptions that kill the job altogether.

Putting in the timeouts didn't seem to help the case where I am coalescing. Also, I don't see any differences between 'disk only' and 'memory and disk' storage levels; both of them are having the same problems. I notice large shuffle files (30-40 GB) that only seem to spill a few hundred MB.

On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com wrote:

Sounds very similar to what I experienced, Corey. Something that seems to at least help with my problems is to have more partitions. I am already fighting between ending up with too many partitions in the end and having too few in the beginning. By coalescing as late as possible and avoiding too few in the beginning, the problems seem to decrease. Also, by increasing spark.akka.askTimeout and spark.core.connection.ack.wait.timeout significantly (~700 secs), the problems seem to almost disappear. Don't want to celebrate yet, still a long way left before the job completes, but it's looking better...

On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

I'm looking at my YARN container logs for some of the executors which appear to be failing (with the missing shuffle files). I see exceptions that say client.TransportClientFactory: Found inactive connection to host/ip:port, closing it. Right after that I see shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to connect to host/ip:port. Right after that exception I see RECEIVED SIGNAL 15: SIGTERM. Finally, following the SIGTERM, I see FileNotFoundException: /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No such file or directory).

I'm looking at the nodemanager and application master logs and I see no indications whatsoever that there were any memory issues during this period of time. The Spark UI is telling me none of the executors are really using too much memory when this happens. It is a big job that's caching several hundred GB, but each node manager on the cluster has 64 GB of RAM just for YARN containers (physical nodes have 128 GB). On this cluster, we have 128 nodes. I've also tried using the DISK_ONLY storage level, but to no avail.

Any further ideas on how to track this down? Again, we're able to run this same job on about 1/5th of the data just fine. The only thing that's pointing me towards a memory issue is that it seems to be happening in the same stages each time, and when I lower the memory that each executor has allocated it happens in earlier stages, but I can't seem to find anything that says an executor (or container for that matter) has run low on memory.

On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com wrote:

No, unfortunately we're not making use of dynamic allocation or the external shuffle service. Hoping that we could reconfigure our cluster to make use of it, but since it requires changes to the cluster itself (and not just
Re: Missing shuffle files
If you're thinking of the YARN memory overhead, then yes, I have increased that as well. However, I'm glad to say that my job finally finished successfully. Besides the timeout and memory settings, performing repartitioning (with shuffling) at the right time seems to be the key to making this large job succeed. With all the transformations in the job, the partition distribution was becoming increasingly skewed. It is not easy to figure out when and to what number of partitions to set, and it takes forever to tweak these settings, since everything works perfectly for small datasets and you have to experiment with large, time-consuming jobs. Imagine if there was an automatic partition reconfiguration function that automagically did that...

On Tue, Feb 24, 2015 at 3:20 AM, Corey Nolet cjno...@gmail.com wrote:

I *think* this may have been related to the default memory overhead setting being too low. I raised the value to 1G and tried my job again, but I had to leave the office before it finished. It did get further, but I'm not exactly sure if that's just because I raised the memory. I'll see tomorrow, but I have a suspicion this may have been the cause of the executors being killed by the application master.

On Feb 23, 2015 5:25 PM, Corey Nolet cjno...@gmail.com wrote:

I've got the opposite problem with regards to partitioning. I've got over 6000 partitions for some of these RDDs, which immediately blows the heap somehow; I'm still not exactly sure how. If I coalesce them down to about 600-800 partitions, I get the problems where the executors are dying without any other error messages (other than telling me the executor was lost in the UI). If I don't coalesce, I pretty immediately get Java heap space exceptions that kill the job altogether. Putting in the timeouts didn't seem to help the case where I am coalescing. Also, I don't see any differences between 'disk only' and 'memory and disk' storage levels; both of them are having the same problems. I notice large shuffle files (30-40 GB) that only seem to spill a few hundred MB.

On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com wrote:

Sounds very similar to what I experienced, Corey. Something that seems to at least help with my problems is to have more partitions. I am already fighting between ending up with too many partitions in the end and having too few in the beginning. By coalescing as late as possible and avoiding too few in the beginning, the problems seem to decrease. Also, by increasing spark.akka.askTimeout and spark.core.connection.ack.wait.timeout significantly (~700 secs), the problems seem to almost disappear. Don't want to celebrate yet, still a long way left before the job completes, but it's looking better...

On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

I'm looking at my YARN container logs for some of the executors which appear to be failing (with the missing shuffle files). I see exceptions that say client.TransportClientFactory: Found inactive connection to host/ip:port, closing it. Right after that I see shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to connect to host/ip:port. Right after that exception I see RECEIVED SIGNAL 15: SIGTERM. Finally, following the SIGTERM, I see FileNotFoundException: /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No such file or directory).

I'm looking at the nodemanager and application master logs and I see no indications whatsoever that there were any memory issues during this period of time. The Spark UI is telling me none of the executors are really using too much memory when this happens. It is a big job that's caching several hundred GB, but each node manager on the cluster has 64 GB of RAM just for YARN containers (physical nodes have 128 GB). On this cluster, we have 128 nodes. I've also tried using the DISK_ONLY storage level, but to no avail. Any further ideas on how to track this down?

Again, we're able to run this same job on about 1/5th of the data just fine. The only thing that's pointing me towards a memory issue is that it seems to be happening in the same stages each time, and when I lower the memory that each executor has allocated it happens in earlier stages, but I can't seem to find anything that says an executor (or container for that matter) has run low on memory.

On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com wrote:

No, unfortunately we're not making use of dynamic allocation or the external shuffle service. Hoping that we could reconfigure our cluster to make use of it, but since it requires changes to the cluster itself (and not just the Spark app), it could take some time. Unsure if task 450 was acting as a reducer or not, but it seems possible. Probably due to a crashed executor, as you say. Seems like I need to do some more advanced partition tuning to make this job work, as it's
Re: Missing shuffle files
No, unfortunately we're not making use of dynamic allocation or the external shuffle service. Hoping that we could reconfigure our cluster to make use of it, but since it requires changes to the cluster itself (and not just the Spark app), it could take some time. Unsure if task 450 was acting as a reducer or not, but it seems possible. Probably due to a crashed executor, as you say. Seems like I need to do some more advanced partition tuning to make this job work, as it currently has a rather high number of partitions.

Thanks for the help so far! It's certainly a frustrating task to debug when everything works perfectly on sample data locally and crashes hard when running on the full dataset on the cluster...

On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com wrote:

Do you guys have dynamic allocation turned on for YARN? Anders, was task 450 in your job acting like a reducer and fetching the map spill output data from a different node? If a reducer task can't read the remote data it needs, that could cause the stage to fail. Sometimes this forces the previous stage to also be re-computed if it's a wide dependency. But like Petar said, if you turn the external shuffle service on, the YARN NodeManager process on the slave machines will serve out the map spill data instead of the Executor JVMs (by default, unless you turn external shuffle on, the Executor JVM itself serves out the shuffle data, which causes problems if an Executor dies).

Corey, how often are Executors crashing in your app? How many Executors do you have in total? And what is the memory size for each? You can change what fraction of the Executor heap will be used for your user code vs the shuffle vs RDD caching with the spark.storage.memoryFraction setting.

On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com wrote:

Could you try to turn on the external shuffle service?

spark.shuffle.service.enabled = true

On 21.2.2015. 17:50, Corey Nolet wrote:

I'm experiencing the same issue. Upon closer inspection I'm noticing that executors are being lost as well. The thing is, I can't figure out how they are dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3 TB of memory allocated for the application. I was thinking perhaps it was possible that a single executor was getting a single or a couple of large partitions, but shouldn't the disk persistence kick in at that point?

On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote:

For large jobs, the following error message is shown, which seems to indicate that shuffle files are for some reason missing. It's a rather large job with many partitions. If the data size is reduced, the problem disappears. I'm running a build from Spark master post 1.2 (built 2015-01-16) and running on YARN 2.2. Any idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
java.io.FileNotFoundException: /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450 (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
    at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

TIA,
Anders
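Petar's and Sameer's suggestion, as a spark-defaults.conf fragment (note the config key is spelled spark.shuffle.service.enabled; the dynamic allocation line is optional and relates to Sameer's question, not something the thread confirms these clusters used):

```properties
# Have the external shuffle service (e.g. the YARN NodeManager on each slave)
# serve map output, so shuffle files remain fetchable if an executor dies.
spark.shuffle.service.enabled    true

# Optional: dynamic executor allocation requires the external shuffle service.
spark.dynamicAllocation.enabled  true
```

On YARN this is not a Spark-app-only change, as Anders notes above: the Spark YARN shuffle-service jar has to be on the NodeManager classpath and spark_shuffle registered under yarn.nodemanager.aux-services in yarn-site.xml, which is why it "requires changes to the cluster itself".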
Re: Missing shuffle files
I'm looking at my YARN container logs for some of the executors which appear to be failing (with the missing shuffle files). I see exceptions that say client.TransportClientFactory: Found inactive connection to host/ip:port, closing it. Right after that I see shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to connect to host/ip:port. Right after that exception I see RECEIVED SIGNAL 15: SIGTERM. Finally, following the SIGTERM, I see FileNotFoundException: /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No such file or directory).

I'm looking at the nodemanager and application master logs and I see no indications whatsoever that there were any memory issues during this period of time. The Spark UI is telling me none of the executors are really using too much memory when this happens. It is a big job that's caching several hundred GB, but each node manager on the cluster has 64 GB of RAM just for YARN containers (physical nodes have 128 GB). On this cluster, we have 128 nodes. I've also tried using the DISK_ONLY storage level, but to no avail.

Any further ideas on how to track this down? Again, we're able to run this same job on about 1/5th of the data just fine. The only thing that's pointing me towards a memory issue is that it seems to be happening in the same stages each time, and when I lower the memory that each executor has allocated it happens in earlier stages, but I can't seem to find anything that says an executor (or container for that matter) has run low on memory.

On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com wrote:

No, unfortunately we're not making use of dynamic allocation or the external shuffle service. Hoping that we could reconfigure our cluster to make use of it, but since it requires changes to the cluster itself (and not just the Spark app), it could take some time. Unsure if task 450 was acting as a reducer or not, but it seems possible. Probably due to a crashed executor, as you say. Seems like I need to do some more advanced partition tuning to make this job work, as it currently has a rather high number of partitions.

Thanks for the help so far! It's certainly a frustrating task to debug when everything works perfectly on sample data locally and crashes hard when running on the full dataset on the cluster...

On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com wrote:

Do you guys have dynamic allocation turned on for YARN? Anders, was task 450 in your job acting like a reducer and fetching the map spill output data from a different node? If a reducer task can't read the remote data it needs, that could cause the stage to fail. Sometimes this forces the previous stage to also be re-computed if it's a wide dependency. But like Petar said, if you turn the external shuffle service on, the YARN NodeManager process on the slave machines will serve out the map spill data instead of the Executor JVMs (by default, unless you turn external shuffle on, the Executor JVM itself serves out the shuffle data, which causes problems if an Executor dies).

Corey, how often are Executors crashing in your app? How many Executors do you have in total? And what is the memory size for each? You can change what fraction of the Executor heap will be used for your user code vs the shuffle vs RDD caching with the spark.storage.memoryFraction setting.

On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com wrote:

Could you try to turn on the external shuffle service?

spark.shuffle.service.enabled = true

On 21.2.2015. 17:50, Corey Nolet wrote:

I'm experiencing the same issue. Upon closer inspection I'm noticing that executors are being lost as well. The thing is, I can't figure out how they are dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3 TB of memory allocated for the application.

I was thinking perhaps it was possible that a single executor was getting a single or a couple of large partitions, but shouldn't the disk persistence kick in at that point?

On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote:

For large jobs, the following error message is shown, which seems to indicate that shuffle files are for some reason missing. It's a rather large job with many partitions. If the data size is reduced, the problem disappears. I'm running a build from Spark master post 1.2 (built 2015-01-16) and running on YARN 2.2. Any idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
java.io.FileNotFoundException: /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450 (No such file or directory)
    at java.io.FileOutputStream.open(Native Method
Re: Missing shuffle files
Sounds very similar to what I experienced, Corey. Something that seems to at least help with my problems is to have more partitions. I am already fighting between ending up with too many partitions in the end and having too few in the beginning. By coalescing as late as possible and avoiding too few in the beginning, the problems seem to decrease. Also, by increasing spark.akka.askTimeout and spark.core.connection.ack.wait.timeout significantly (~700 secs), the problems seem to almost disappear. Don't want to celebrate yet, still a long way left before the job completes, but it's looking better...

On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

I'm looking at my YARN container logs for some of the executors which appear to be failing (with the missing shuffle files). I see exceptions that say client.TransportClientFactory: Found inactive connection to host/ip:port, closing it. Right after that I see shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to connect to host/ip:port. Right after that exception I see RECEIVED SIGNAL 15: SIGTERM. Finally, following the SIGTERM, I see FileNotFoundException: /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No such file or directory).

I'm looking at the nodemanager and application master logs and I see no indications whatsoever that there were any memory issues during this period of time. The Spark UI is telling me none of the executors are really using too much memory when this happens. It is a big job that's caching several hundred GB, but each node manager on the cluster has 64 GB of RAM just for YARN containers (physical nodes have 128 GB). On this cluster, we have 128 nodes. I've also tried using the DISK_ONLY storage level, but to no avail. Any further ideas on how to track this down?

Again, we're able to run this same job on about 1/5th of the data just fine. The only thing that's pointing me towards a memory issue is that it seems to be happening in the same stages each time, and when I lower the memory that each executor has allocated it happens in earlier stages, but I can't seem to find anything that says an executor (or container for that matter) has run low on memory.

On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com wrote:

No, unfortunately we're not making use of dynamic allocation or the external shuffle service. Hoping that we could reconfigure our cluster to make use of it, but since it requires changes to the cluster itself (and not just the Spark app), it could take some time. Unsure if task 450 was acting as a reducer or not, but it seems possible. Probably due to a crashed executor, as you say. Seems like I need to do some more advanced partition tuning to make this job work, as it currently has a rather high number of partitions.

Thanks for the help so far! It's certainly a frustrating task to debug when everything works perfectly on sample data locally and crashes hard when running on the full dataset on the cluster...

On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com wrote:

Do you guys have dynamic allocation turned on for YARN? Anders, was task 450 in your job acting like a reducer and fetching the map spill output data from a different node? If a reducer task can't read the remote data it needs, that could cause the stage to fail. Sometimes this forces the previous stage to also be re-computed if it's a wide dependency. But like Petar said, if you turn the external shuffle service on, the YARN NodeManager process on the slave machines will serve out the map spill data instead of the Executor JVMs (by default, unless you turn external shuffle on, the Executor JVM itself serves out the shuffle data, which causes problems if an Executor dies).

Corey, how often are Executors crashing in your app? How many Executors do you have in total? And what is the memory size for each? You can change what fraction of the Executor heap will be used for your user code vs the shuffle vs RDD caching with the spark.storage.memoryFraction setting.

On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com wrote:

Could you try to turn on the external shuffle service?

spark.shuffle.service.enabled = true

On 21.2.2015. 17:50, Corey Nolet wrote:

I'm experiencing the same issue. Upon closer inspection I'm noticing that executors are being lost as well. The thing is, I can't figure out how they are dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3 TB of memory allocated for the application. I was thinking perhaps it was possible that a single executor was getting a single or a couple of large partitions, but shouldn't the disk persistence kick in at that point?

On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote:

For large jobs, the following error message is shown, which seems to indicate that shuffle files for some reason
Re: Missing shuffle files
I've got the opposite problem with regards to partitioning. I've got over 6000 partitions for some of these RDDs which immediately blows the heap somehow- I'm still not exactly sure how. If I coalesce them down to about 600-800 partitions, I get the problems where the executors are dying without any other error messages (other than telling me the executor was lost in the UI). If I don't coalesce, I pretty immediately get Java heap space exceptions that kill the job altogether. Putting in the timeouts didn't seem to help the case where I am coalescing. Also, I don't see any dfferences between 'disk only' and 'memory and disk' storage levels- both of them are having the same problems. I notice large shuffle files (30-40gb) that only seem to spill a few hundred mb. On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com wrote: Sounds very similar to what I experienced Corey. Something that seems to at least help with my problems is to have more partitions. Am already fighting between ending up with too many partitions in the end and having too few in the beginning. By coalescing at late as possible and avoiding too few in the beginning, the problems seems to decrease. Also, increasing spark.akka.askTimeout and spark.core.connection.ack.wait.timeout significantly (~700 secs), the problems seems to almost disappear. Don't wont to celebrate yet, still long way left before the job complete but it's looking better... On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote: I'm looking @ my yarn container logs for some of the executors which appear to be failing (with the missing shuffle files). I see exceptions that say client.TransportClientFactor: Found inactive connection to host/ip:port, closing it. Right after that I see shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks. 
java.io.IOException: Failed to connect to host/ip:port

Right after that exception I see RECEIVED SIGNAL 15: SIGTERM. Finally, following the sigterm, I see FileNotFoundException: /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No such file or directory)

I'm looking @ the nodemanager and application master logs and I see no indications whatsoever that there were any memory issues during this period of time. The Spark UI is telling me none of the executors are really using too much memory when this happens. It is a big job that's caching several 100's of GB, but each node manager on the cluster has 64gb of ram just for yarn containers (physical nodes have 128gb). On this cluster, we have 128 nodes. I've also tried using DISK_ONLY storage level but to no avail.

Any further ideas on how to track this down? Again, we're able to run this same job on about 1/5th of the data just fine. The only thing that's pointing me towards a memory issue is that it seems to be happening in the same stages each time, and when I lower the memory that each executor has allocated, it happens in earlier stages - but I can't seem to find anything that says an executor (or container for that matter) has run low on memory.

On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com wrote:

No, unfortunately we're not making use of dynamic allocation or the external shuffle service. Hoping that we could reconfigure our cluster to make use of it, but since it requires changes to the cluster itself (and not just the Spark app), it could take some time. Unsure if task 450 was acting as a reducer or not, but it seems possible. Probably due to a crashed executor as you say. Seems like I need to do some more advanced partition tuning to make this job work, as it currently has a rather high number of partitions. Thanks for the help so far!
It's certainly a frustrating task to debug when everything's working perfectly on sample data locally and crashes hard when running on the full dataset on the cluster...

On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com wrote:

Do you guys have dynamic allocation turned on for YARN? Anders, was Task 450 in your job acting like a Reducer and fetching the Map spill output data from a different node? If a Reducer task can't read the remote data it needs, that could cause the stage to fail. Sometimes this forces the previous stage to also be re-computed if it's a wide dependency. But like Petar said, if you turn the external shuffle service on, the YARN NodeManager process on the slave machines will serve out the map spill data instead of the Executor JVMs (by default, unless you turn external shuffle on, the Executor JVM itself serves out the shuffle data, which causes problems if an Executor dies).

Corey, how often are Executors crashing in your app? How many Executors do you have total? And what is the memory size for each? You can change what fraction of the Executor heap will be used for your user code vs the shuffle vs RDD caching with the spark.storage.memoryFraction setting.
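For readers wanting to try Anders' timeout tweak from the exchange above, a hedged sketch of the submit-time flags (property names as used in Spark 1.x; the ~700 s value is his, the class and jar names are placeholders):

```shell
# Raise the ask/ack timeouts so long GC pauses are less likely to be
# mistaken for dead executors (spark.akka.askTimeout is in seconds).
spark-submit \
  --conf spark.akka.askTimeout=700 \
  --conf spark.core.connection.ack.wait.timeout=700 \
  --class com.example.MyJob \
  my-job.jar
```

This only papers over pauses; if executors are genuinely dying, the lost shuffle output still has to be recomputed.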
Re: Missing shuffle files
I *think* this may have been related to the default memory overhead setting being too low. I raised the value to 1G and tried my job again, but I had to leave the office before it finished. It did get further, but I'm not exactly sure if that's just because I raised the memory. I'll see tomorrow - but I have a suspicion this may have been the cause of the executors being killed by the application master.

On Feb 23, 2015 5:25 PM, Corey Nolet cjno...@gmail.com wrote:

I've got the opposite problem with regards to partitioning. I've got over 6000 partitions for some of these RDDs, which immediately blows the heap somehow - I'm still not exactly sure how. If I coalesce them down to about 600-800 partitions, I get the problems where the executors are dying without any other error messages (other than telling me the executor was lost in the UI). If I don't coalesce, I pretty immediately get Java heap space exceptions that kill the job altogether. Putting in the timeouts didn't seem to help the case where I am coalescing. Also, I don't see any differences between 'disk only' and 'memory and disk' storage levels - both of them are having the same problems. I notice large shuffle files (30-40gb) that only seem to spill a few hundred mb.

On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com wrote:

Sounds very similar to what I experienced, Corey. Something that seems to at least help with my problems is to have more partitions. I am already fighting between ending up with too many partitions in the end and having too few in the beginning. By coalescing as late as possible and avoiding too few in the beginning, the problems seem to decrease. Also, by increasing spark.akka.askTimeout and spark.core.connection.ack.wait.timeout significantly (~700 secs), the problems seem to almost disappear. Don't want to celebrate yet, there's still a long way left before the job completes, but it's looking better...
On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

I'm looking @ my yarn container logs for some of the executors which appear to be failing (with the missing shuffle files). I see exceptions that say client.TransportClientFactory: Found inactive connection to host/ip:port, closing it. Right after that I see shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to connect to host/ip:port

Right after that exception I see RECEIVED SIGNAL 15: SIGTERM. Finally, following the sigterm, I see FileNotFoundException: /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No such file or directory)

I'm looking @ the nodemanager and application master logs and I see no indications whatsoever that there were any memory issues during this period of time. The Spark UI is telling me none of the executors are really using too much memory when this happens. It is a big job that's caching several 100's of GB, but each node manager on the cluster has 64gb of ram just for yarn containers (physical nodes have 128gb). On this cluster, we have 128 nodes. I've also tried using DISK_ONLY storage level but to no avail.

Any further ideas on how to track this down? Again, we're able to run this same job on about 1/5th of the data just fine. The only thing that's pointing me towards a memory issue is that it seems to be happening in the same stages each time, and when I lower the memory that each executor has allocated, it happens in earlier stages - but I can't seem to find anything that says an executor (or container for that matter) has run low on memory.

On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com wrote:

No, unfortunately we're not making use of dynamic allocation or the external shuffle service. Hoping that we could reconfigure our cluster to make use of it, but since it requires changes to the cluster itself (and not just the Spark app), it could take some time.
Unsure if task 450 was acting as a reducer or not, but it seems possible. Probably due to a crashed executor as you say. Seems like I need to do some more advanced partition tuning to make this job work, as it currently has a rather high number of partitions. Thanks for the help so far! It's certainly a frustrating task to debug when everything's working perfectly on sample data locally and crashes hard when running on the full dataset on the cluster...

On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com wrote:

Do you guys have dynamic allocation turned on for YARN? Anders, was Task 450 in your job acting like a Reducer and fetching the Map spill output data from a different node? If a Reducer task can't read the remote data it needs, that could cause the stage to fail. Sometimes this forces the previous stage to also be re-computed if it's a wide dependency. But like Petar said, if you turn the external shuffle service on, the YARN NodeManager process on the slave machines
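The overhead Corey raised above is the per-executor headroom YARN allows beyond the JVM heap; when a container exceeds heap plus overhead, YARN kills it, which can look exactly like executors silently dying. A sketch of how his 1G experiment might be expressed (property name as of Spark 1.x on YARN; class and jar are placeholders):

```shell
# Give each container extra off-heap headroom (netty buffers, JVM
# internals, etc.) beyond --executor-memory. Value is in MB; 1024
# mirrors Corey's 1G experiment.
spark-submit \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --class com.example.MyJob \
  my-job.jar
```

If this is the cause, the NodeManager logs would normally show a "running beyond physical memory limits" kill message for the container.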
Re: Missing shuffle files
Do you guys have dynamic allocation turned on for YARN?

Anders, was Task 450 in your job acting like a Reducer and fetching the Map spill output data from a different node? If a Reducer task can't read the remote data it needs, that could cause the stage to fail. Sometimes this forces the previous stage to also be re-computed if it's a wide dependency. But like Petar said, if you turn the external shuffle service on, the YARN NodeManager process on the slave machines will serve out the map spill data instead of the Executor JVMs (by default, unless you turn external shuffle on, the Executor JVM itself serves out the shuffle data, which causes problems if an Executor dies).

Corey, how often are Executors crashing in your app? How many Executors do you have total? And what is the memory size for each? You can change what fraction of the Executor heap will be used for your user code vs the shuffle vs RDD caching with the spark.storage.memoryFraction setting.

On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com wrote:

Could you try to turn on the external shuffle service? spark.shuffle.service.enabled=true

On 21.2.2015. 17:50, Corey Nolet wrote:

I'm experiencing the same issue. Upon closer inspection I'm noticing that executors are being lost as well. Thing is, I can't figure out how they are dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3TB of memory allocated for the application. I was thinking perhaps it was possible that a single executor was getting a single or a couple large partitions, but shouldn't the disk persistence kick in at that point?

On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote:

For large jobs, the following error message is shown that seems to indicate that shuffle files for some reason are missing. It's a rather large job with many partitions. If the data size is reduced, the problem disappears. I'm running a build from Spark master post 1.2 (built at 2015-01-16) and running on Yarn 2.2.
Any idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net): java.io.FileNotFoundException: /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450 (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
    at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

TIA, Anders
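Petar's and Sameer's suggestion - serving shuffle data from the NodeManager instead of the executor JVM - takes two steps on YARN. A sketch of the standard setup (the YARN aux-service wiring is the usual recipe, included here as an assumption about this cluster):

```shell
# 1) Spark side: have executors register their shuffle output with the
#    external service (note the property is spark.shuffle.service.enabled).
spark-submit --conf spark.shuffle.service.enabled=true \
  --class com.example.MyJob my-job.jar   # class/jar are placeholders

# 2) YARN side: run the shuffle service inside every NodeManager.
#    In yarn-site.xml on each slave, then restart the NodeManagers:
#      yarn.nodemanager.aux-services = mapreduce_shuffle,spark_shuffle
#      yarn.nodemanager.aux-services.spark_shuffle.class =
#          org.apache.spark.network.yarn.YarnShuffleService
#    and put the spark-<version>-yarn-shuffle jar on the NodeManager classpath.
```

With this in place, map output survives an executor crash, so reducers can still fetch it instead of forcing the whole map stage to rerun.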
Missing shuffle files
For large jobs, the following error message is shown that seems to indicate that shuffle files for some reason are missing. It's a rather large job with many partitions. If the data size is reduced, the problem disappears. I'm running a build from Spark master post 1.2 (built at 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net): java.io.FileNotFoundException: /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450 (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
    at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

TIA, Anders
Re: Missing shuffle files
I'm experiencing the same issue. Upon closer inspection I'm noticing that executors are being lost as well. Thing is, I can't figure out how they are dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3TB of memory allocated for the application. I was thinking perhaps it was possible that a single executor was getting a single or a couple large partitions, but shouldn't the disk persistence kick in at that point?

On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote:

For large jobs, the following error message is shown that seems to indicate that shuffle files for some reason are missing. It's a rather large job with many partitions. If the data size is reduced, the problem disappears. I'm running a build from Spark master post 1.2 (built at 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net): java.io.FileNotFoundException: /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450 (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
    at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

TIA, Anders
Re: Missing shuffle files
Could you try to turn on the external shuffle service? spark.shuffle.service.enabled=true

On 21.2.2015. 17:50, Corey Nolet wrote:

I'm experiencing the same issue. Upon closer inspection I'm noticing that executors are being lost as well. Thing is, I can't figure out how they are dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3TB of memory allocated for the application. I was thinking perhaps it was possible that a single executor was getting a single or a couple large partitions, but shouldn't the disk persistence kick in at that point?

On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote:

For large jobs, the following error message is shown that seems to indicate that shuffle files for some reason are missing. It's a rather large job with many partitions. If the data size is reduced, the problem disappears. I'm running a build from Spark master post 1.2 (built at 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net): java.io.FileNotFoundException: /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450 (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
    at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

TIA, Anders
Re: FileNotFoundException in appcache shuffle files
Hey, I am having a similar issue, did you manage to find a solution yet? Please check my post below for reference:

http://apache-spark-user-list.1001560.n3.nabble.com/IOError-Errno-2-No-such-file-or-directory-tmp-spark-9e23f17e-2e23-4c26-9621-3cb4d8b832da-tmp3i3xno-td21076.html

Thank you,
Lucio
Re: FileNotFoundException in appcache shuffle files
As Jerry said, this is not related to shuffle file consolidation. The unique thing about this problem is that it's failing to find a file while trying to _write_ to it, in append mode. The simplest explanation for this would be that the file is deleted in between some check for existence and opening the file for append. The deletion of such files as a race condition with writing them (on the map side) would be most easily explained by a JVM shutdown event, for instance caused by a fatal error such as OutOfMemoryError. So, as Ilya said, please look for another exception possibly preceding this one.

On Sat, Jan 10, 2015 at 12:16 PM, lucio raimondo luxmea...@hotmail.com wrote:

Hey, I am having a similar issue, did you manage to find a solution yet? Please check my post below for reference:

http://apache-spark-user-list.1001560.n3.nabble.com/IOError-Errno-2-No-such-file-or-directory-tmp-spark-9e23f17e-2e23-4c26-9621-3cb4d8b832da-tmp3i3xno-td21076.html

Thank you,
Lucio
Re: FileNotFoundException in appcache shuffle files
- hopefully fixed in 1.1 with this patch: https://github.com/apache/spark/commit/78f2af582286b81e6dc9fa9d455ed2b369d933bd
- 78f2af5 [3] implements pieces of #1609 [4], on which mridulm has a comment [5] saying: "it got split into four issues, two of which got committed, not sure of the other two". And the first one was regressed upon in 1.1 already.
- Until 1.0.3 or 1.1 are released, the simplest solution is to disable spark.shuffle.consolidateFiles.
- I've not tried this yet as I'm waiting on a re-run with some other parameters tweaked first.
- Also, I can't tell if it's expected that this was fixed, known that it subsequently regressed, etc., so hoping for some guidance there.

So! Anyone else seen this? Is this related to the bug in shuffle file consolidation? Was it fixed? Did it regress? Are my confs or other steps unreasonable in some way? Any assistance would be appreciated, thanks.
-Ryan

[1] https://www.dropbox.com/s/m8c4o73o0bh7kf8/adam.108?dl=0
[2] http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3CCANGvG8qtK57frWS+kaqTiUZ9jSLs5qJKXXjXTTQ9eh2-GsrmpA@...%3E
[3] https://github.com/apache/spark/commit/78f2af5
[4] https://github.com/apache/spark/pull/1609
[5] https://github.com/apache/spark/pull/1609#issuecomment-54393908
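Ryan's interim workaround from the list above, expressed as a submit-time flag (a sketch; class and jar names are placeholders, and consolidation is off by default in these releases, so this mainly makes the choice explicit):

```shell
# Avoid the shuffle-file-consolidation bug until the 1.0.3/1.1 fix lands
# by keeping consolidation disabled.
spark-submit \
  --conf spark.shuffle.consolidateFiles=false \
  --class com.example.MyJob \
  my-job.jar
```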
RE: FileNotFoundException in appcache shuffle files
Hi Ryan - I've been fighting the exact same issue for well over a month now. I initially saw the issue in 1.0.2, but it persists in 1.1.

Jerry - I believe you are correct that this happens during a pause on long-running jobs on a large data set. Are there any parameters that you suggest tuning to mitigate these situations? Also, you ask if there are any other exceptions - for me this error has tended to follow an earlier exception, which supports the theory that it is a symptom of an earlier problem. My understanding is as follows: during a shuffle step an executor fails and doesn't report its output; next, during the reduce step, that output can't be found where expected and, rather than rerunning the failed execution, Spark goes down.

We can add my email thread to your reference list: https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201410.mbox/CAM-S9zS-+-MSXVcohWEhjiAEKaCccOKr_N5e0HPXcNgnxZd=h...@mail.gmail.com

-----Original Message-----
From: Shao, Saisai [saisai.s...@intel.com]
Sent: Wednesday, October 29, 2014 01:46 AM Eastern Standard Time
To: Ryan Williams
Cc: user
Subject: RE: FileNotFoundException in appcache shuffle files

Hi Ryan,

This is an issue with sort-based shuffle, not consolidated hash-based shuffle. I guess this issue mostly occurs when the Spark cluster is in an abnormal situation, perhaps a long GC pause or something similar; you can check the system status, or whether there are any other exceptions besides this one.
Thanks,
Jerry

From: nobigdealst...@gmail.com [mailto:nobigdealst...@gmail.com] On Behalf Of Ryan Williams
Sent: Wednesday, October 29, 2014 1:31 PM
To: user
Subject: FileNotFoundException in appcache shuffle files

My job is failing with the following error:

14/10/29 02:59:14 WARN scheduler.TaskSetManager: Lost task 1543.0 in stage 3.0 (TID 6266, demeter-csmau08-19.demeter.hpc.mssm.edu): java.io.FileNotFoundException: /data/05/dfs/dn/yarn/nm/usercache/willir31/appcache/application_1413512480649_0108/spark-local-20141028214722-43f1/26/shuffle_0_312_0.index (No such file or directory)
    java.io.FileOutputStream.open(Native Method)
    java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
    org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
    org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:733)
    org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:732)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)
    org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:790)
    org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:732)
    org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:728)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)
    scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)
    org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    org.apache.spark.scheduler.Task.run(Task.scala:56)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:744)

I get 4 of those on task 1543 before the job aborts. Interspersed in the 4 task-1543 failures are a few instances of this failure on another task. Here is the entire App Master stdout dump: https://www.dropbox.com/s/m8c4o73o0bh7kf8/adam.108?dl=0 (~2MB; stack traces towards the bottom, of course). I am running {Spark 1.1, Hadoop 2.3.0}.

Here's a summary of the RDD manipulations I've done up to the point of failure:

* val A = [read a file in 1419 shards]
  * the file is 177GB compressed but ends up being ~5TB uncompressed / hydrated into scala objects (I think; see below for more discussion on this point).
  * some relevant Spark options:
    * spark.default.parallelism=2000
    * --master yarn-client
    * --executor-memory 50g
    * --driver-memory 10g
    * --num-executors 100
    * --executor-cores 4
* A.repartition(3000)
  * 3000 was chosen in an attempt to mitigate shuffle-disk-spillage that previous job attempts with 1000 or 1419 shards were mired in
RE: FileNotFoundException in appcache shuffle files
Hi Ryan, This is an issue with sort-based shuffle, not consolidated hash-based shuffle. I guess this issue mostly occurs when the Spark cluster is in an abnormal situation, maybe long GC pauses or something else; you can check the system status or whether there are any other exceptions besides this one. Thanks Jerry

From: nobigdealst...@gmail.com [mailto:nobigdealst...@gmail.com] On Behalf Of Ryan Williams
Sent: Wednesday, October 29, 2014 1:31 PM
To: user
Subject: FileNotFoundException in appcache shuffle files

My job is failing with the following error:

14/10/29 02:59:14 WARN scheduler.TaskSetManager: Lost task 1543.0 in stage 3.0 (TID 6266, demeter-csmau08-19.demeter.hpc.mssm.edu): java.io.FileNotFoundException: /data/05/dfs/dn/yarn/nm/usercache/willir31/appcache/application_1413512480649_0108/spark-local-20141028214722-43f1/26/shuffle_0_312_0.index (No such file or directory)
    java.io.FileOutputStream.open(Native Method)
    java.io.FileOutputStream.init(FileOutputStream.java:221)
    org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
    org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
    org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:733)
    org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:732)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)
    org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:790)
    org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:732)
    org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:728)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)
    scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)
    org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    org.apache.spark.scheduler.Task.run(Task.scala:56)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:744)

I get 4 of those on task 1543 before the job aborts. Interspersed in the 4 task-1543 failures are a few instances of this failure on another task. Here is the entire App Master stdout dump <https://www.dropbox.com/s/m8c4o73o0bh7kf8/adam.108?dl=0> (~2MB; stack traces towards the bottom, of course). I am running {Spark 1.1, Hadoop 2.3.0}.

Here's a summary of the RDD manipulations I've done up to the point of failure:

* val A = [read a file in 1419 shards]
  * the file is 177GB compressed but ends up being ~5TB uncompressed / hydrated into scala objects (I think; see below for more discussion on this point).
  * some relevant Spark options:
    * spark.default.parallelism=2000
    * --master yarn-client
    * --executor-memory 50g
    * --driver-memory 10g
    * --num-executors 100
    * --executor-cores 4
* A.repartition(3000)
  * 3000 was chosen in an attempt to mitigate the shuffle-disk-spillage that previous job attempts with 1000 or 1419 shards were mired in
* A.persist()
* A.count() // succeeds
  * screenshot of web UI with stats: http://cl.ly/image/3e130w3J1B2v
  * I don't know why each task reports 8 TB of Input; that metric always seems ludicrously high and I don't typically pay attention to it.
  * Each task shuffle-writes 3.5GB, for a total of 4.9TB
    * Does that mean that 4.9TB is the uncompressed size of the file that A was read from?
    * 4.9TB is pretty close to the total amount of memory I've configured the job to use: (50GB/executor) * (100 executors) ~= 5TB.
    * Is that a coincidence, or are my executors shuffle-writing an amount equal to all of their memory for some reason?
* val B = A.groupBy(...).filter(_._2.size == 2).map(_._2).flatMap(x => x).persist()
  * my expectation is that ~all elements pass the filter step, so B should be ~equal to A, just to give a sense of the expected memory blowup.
* B.count()
  * this fails while executing the .groupBy(...) above

I've found a few discussions of issues whose manifestations look *like* this, but nothing that is obviously the same issue.
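For readers following along, the job Ryan describes can be sketched roughly as the following Scala code. This is a hypothetical reconstruction for illustration only: the input path, record type, and grouping key are placeholders, not Ryan's actual code, and it needs a running Spark 1.1-era cluster.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hedged sketch of the pipeline described above; paths and the key
// extractor are invented placeholders.
val conf = new SparkConf()
  .setAppName("shuffle-repro-sketch")              // hypothetical name
  .set("spark.default.parallelism", "2000")
val sc = new SparkContext(conf)

val A = sc.textFile("hdfs:///path/to/input")       // 1419 shards originally
  .repartition(3000)                               // chosen to reduce spill
A.persist()
A.count()                                          // reportedly succeeds

// The groupBy below is the shuffle whose .index file goes missing.
val B = A.groupBy(line => line.split("\t")(0))     // placeholder key
  .filter(_._2.size == 2)
  .map(_._2)
  .flatMap(x => x)
B.persist()
B.count()                                          // fails in the groupBy stage
```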
Re: Shuffle files
My observation is the opposite. When my job runs under the default spark.shuffle.manager, I don't see this exception; however, when it runs with the SORT-based manager, I start seeing this error. How would that be possible? I am running my job in YARN, and I noticed that the YARN process limits (cat /proc/$PID/limits) are not consistent with the system-wide limits (shown by ulimit -a); I don't know how that happened. Is there a way to let the Spark driver propagate this setting (ulimit -n number) to the Spark executors before startup?

On Tue, Oct 7, 2014 at 11:53 PM, Andrew Ash and...@andrewash.com wrote: You will need to restart your Mesos workers to pick up the new limits as well.

--
Chen Song
RE: Shuffle files
Hi Song,

From what I know of sort-based shuffle: normally the number of files opened in parallel for sort-based shuffle is much smaller than for hash-based shuffle. In hash-based shuffle, the number of files opened in parallel is C * R (where C is the number of cores used and R is the number of reducers); as you can see, the file count is related to the reducer number, no matter how large the shuffle size is. In sort-based shuffle, the final map output file is only 1; to achieve this we need to do by-partition sorting, which generates some intermediate spill files, but the number of spilled files is related to the shuffle size and the memory available for shuffle, not to the reducer number.

So if you hit "too many open files" in sort-based shuffle, I guess you have many spilled files during shuffle write. One possible way to alleviate this is to increase the shuffle memory usage; changing the ulimit is also a possible way. I guess on YARN you have to do the system configuration manually; Spark cannot set the ulimit automatically for you, and I don't think it's an issue Spark should take care of.

Thanks
Jerry

From: Chen Song [mailto:chen.song...@gmail.com]
Sent: Tuesday, October 21, 2014 9:10 AM
To: Andrew Ash
Cc: Sunny Khatri; Lisonbee, Todd; u...@spark.incubator.apache.org
Subject: Re: Shuffle files

My observation is the opposite. When my job runs under the default spark.shuffle.manager, I don't see this exception; however, when it runs with the SORT-based manager, I start seeing this error. How would that be possible? I am running my job in YARN, and I noticed that the YARN process limits (cat /proc/$PID/limits) are not consistent with the system-wide limits (shown by ulimit -a). Is there a way to let the Spark driver propagate this setting (ulimit -n) to the Spark executors before startup?

--
Chen Song
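Jerry's C * R arithmetic can be illustrated with a quick calculation; the core and reducer counts below are hypothetical examples, not numbers from this thread.

```scala
// Illustration of the hash-based shuffle file-handle arithmetic above.
// The values of C and R are invented for the example.
val cores = 16                  // C: cores assigned on one node
val reducers = 2000             // R: reducers in the job
val hashShuffleOpenFiles = cores * reducers  // = 32000 concurrently open files
println(s"hash-based shuffle: up to $hashShuffleOpenFiles files open at once")
// By contrast, sort-based shuffle produces one final map output file per task;
// its intermediate spill count scales with data size and shuffle memory,
// not with the reducer count R.
```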
Re: Shuffle files
- We set ulimit to 50. But I still get the same too many open files warning.
- I tried setting consolidateFiles to True, but that did not help either.

I am using a Mesos cluster. Does Mesos have any limit on the number of open files?

thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Storing shuffle files on a Tachyon
Is it possible to store Spark shuffle files on Tachyon?
RE: Shuffle files
Are you sure the new ulimit has taken effect?

How many cores are you using? How many reducers?

"In general if a node in your cluster has C assigned cores and you run a job with X reducers then Spark will open C*X files in parallel and start writing. Shuffle consolidation will help decrease the total number of files created but the number of file handles open at any time doesn't change so it won't help the ulimit problem."

Quoted from Patrick at: http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html

Thanks,
Todd
Re: Shuffle files
@SK: Make sure ulimit has taken effect as Todd mentioned. You can verify via ulimit -a. Also make sure you have proper kernel parameters set in /etc/sysctl.conf (MacOSX)

On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd todd.lison...@intel.com wrote: Are you sure the new ulimit has taken effect? How many cores are you using? How many reducers?
Re: Shuffle files
You will need to restart your Mesos workers to pick up the new limits as well.

On Tue, Oct 7, 2014 at 4:02 PM, Sunny Khatri sunny.k...@gmail.com wrote: @SK: Make sure ulimit has taken effect as Todd mentioned. You can verify via ulimit -a.
Shuffle files
Hi,

I am using Spark 1.1.0 on a cluster. My job takes as input 30 files in a directory (I am using sc.textfile(dir/*)) to read in the files. I am getting the following warning:

WARN TaskSetManager: Lost task 99.0 in stage 1.0 (TID 99, mesos12-dev.sccps.net): java.io.FileNotFoundException: /tmp/spark-local-20140925215712-0319/12/shuffle_0_99_93138 (Too many open files)

basically I think a lot of shuffle files are being created.

1) The tasks eventually fail and the job just hangs (after taking very long, more than an hour). If I read these 30 files in a for loop, the same job completes in a few minutes. However, I need to specify the file names, which is not convenient. I am assuming that sc.textfile(dir/*) creates a large RDD for all the 30 files. Is there a way to make the operation on this large RDD efficient so as to avoid creating too many shuffle files?

2) Also, I am finding that all the shuffle files for my other completed jobs are not being automatically deleted even after days. I thought that sc.stop() clears the intermediate files. Is there some way to programmatically delete these temp shuffle files upon job completion?

thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
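The two read patterns SK contrasts can be sketched as follows. The paths are placeholders; note the RDD API method is sc.textFile (capital F), which accepts glob patterns directly.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = new SparkContext(new SparkConf().setAppName("wildcard-read-sketch"))

// Pattern 1: one RDD over all 30 files via a glob, as in the question.
val all: RDD[String] = sc.textFile("hdfs:///data/dir/*")

// Pattern 2: per-file RDDs unioned together, approximating the for-loop
// variant without a shuffle. The file list here is a placeholder; in
// practice it could come from a filesystem listing.
val files = Seq("hdfs:///data/dir/part-000", "hdfs:///data/dir/part-001")
val unioned: RDD[String] = files.map(sc.textFile(_)).reduce(_ union _)
```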
Re: Shuffle files
Hi SK,

For the problem with lots of shuffle files and the too many open files exception there are a couple options:

1. The linux kernel has a limit on the number of open files at once. This is set with ulimit -n, and can be set permanently in /etc/sysctl.conf or /etc/sysctl.d/. Try increasing this to a large value, at the bare minimum the square of your partition count.
2. Try using shuffle consolidation -- spark.shuffle.consolidateFiles=true. This option writes fewer files to disk so shouldn't hit limits nearly as much.
3. Try using the sort-based shuffle by setting spark.shuffle.manager=SORT. You should likely hold off on this until https://issues.apache.org/jira/browse/SPARK-3032 is fixed, hopefully in 1.1.1.

Hope that helps!
Andrew

On Thu, Sep 25, 2014 at 4:20 PM, SK skrishna...@gmail.com wrote: Hi, I am using Spark 1.1.0 on a cluster. My job takes as input 30 files in a directory (I am using sc.textfile(dir/*)) to read in the files, and I am getting a "Too many open files" FileNotFoundException.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
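Andrew's options 2 and 3 can be expressed as Spark configuration; a hedged sketch using the property names given in his message (Spark 1.1-era; option 1, raising ulimit -n, is an OS-level change and cannot be set here):

```scala
import org.apache.spark.SparkConf

// Sketch only: property names are taken from the thread above; verify
// against your Spark version's configuration docs before use.
val conf = new SparkConf()
  .setAppName("shuffle-options-sketch")            // hypothetical name
  .set("spark.shuffle.consolidateFiles", "true")   // option 2: fewer files on disk
  .set("spark.shuffle.manager", "sort")            // option 3: sort-based shuffle
                                                   // (thread writes it as SORT;
                                                   // see the SPARK-3032 caveat)
```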
Re: Spilled shuffle files not being cleared
Thanks Saisai, I think I will just try lowering my spark.cleaner.ttl value - I've set it to an hour.

On Thu, Jun 12, 2014 at 7:32 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Michael, I think you can set up spark.cleaner.ttl=xxx to enable time-based metadata cleaner, which will clean old un-used shuffle data when it is timeout. For Spark 1.0 another way is to clean shuffle data using weak reference (reference tracking based, configuration is spark.cleaner.referenceTracking), and it is enabled by default. Thanks Saisai
Re: Spilled shuffle files not being cleared
Bump

On Mon, Jun 9, 2014 at 3:22 PM, Michael Chang m...@tellapart.com wrote: Hi all, I'm seeing exceptions that look like the below in Spark 0.9.1. It looks like I'm running out of inodes on my machines (I have around 300k each in a 12 machine cluster). I took a quick look and I'm seeing some shuffle spill files that are around even around 12 minutes after they are created. Can someone help me understand when these shuffle spill files should be cleaned up (Is it as soon as they are used?) Thanks, Michael
RE: Spilled shuffle files not being cleared
Hi Michael,

I think you can set spark.cleaner.ttl=xxx to enable the time-based metadata cleaner, which will clean old unused shuffle data when it times out. For Spark 1.0 another way is to clean shuffle data using weak references (reference tracking based; the configuration is spark.cleaner.referenceTracking), and it is enabled by default.

Thanks
Saisai

From: Michael Chang [mailto:m...@tellapart.com]
Sent: Friday, June 13, 2014 10:15 AM
To: user@spark.apache.org
Subject: Re: Spilled shuffle files not being cleared

Bump

On Mon, Jun 9, 2014 at 3:22 PM, Michael Chang m...@tellapart.com wrote: Hi all, I'm seeing exceptions that look like the below in Spark 0.9.1. It looks like I'm running out of inodes on my machines (I have around 300k each in a 12 machine cluster). I took a quick look and I'm seeing some shuffle spill files that are around even around 12 minutes after they are created. Can someone help me understand when these shuffle spill files should be cleaned up (Is it as soon as they are used?) Thanks, Michael
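The two cleanup mechanisms Saisai describes can be sketched as SparkConf settings; the TTL value below is an arbitrary example (Michael mentions using an hour), not a recommendation.

```scala
import org.apache.spark.SparkConf

// Sketch of the cleanup settings from the reply above. Property names are
// from the thread (Spark 0.9/1.0 era); the TTL is in seconds and the value
// here is an example only.
val conf = new SparkConf()
  .set("spark.cleaner.ttl", "3600")               // time-based metadata cleanup
  .set("spark.cleaner.referenceTracking", "true") // weak-reference cleanup,
                                                  // on by default in Spark 1.0
```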
Spilled shuffle files not being cleared
Hi all,

I'm seeing exceptions that look like the below in Spark 0.9.1. It looks like I'm running out of inodes on my machines (I have around 300k each in a 12 machine cluster). I took a quick look and I'm seeing some shuffle spill files that are around even around 12 minutes after they are created. Can someone help me understand when these shuffle spill files should be cleaned up (Is it as soon as they are used?)

Thanks,
Michael

java.io.FileNotFoundException: /mnt/var/hadoop/1/yarn/local/usercache/ubuntu/appcache/application_1399886706975_13107/spark-local-20140609210947-19e1/1c/shuffle_41637_3_0 (No space left on device)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.init(FileOutputStream.java:221)
    at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:118)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

14/06/09 22:07:36 WARN TaskSetManager: Lost TID 667432 (task 86909.0:7)
14/06/09 22:07:36 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
Re: Shuffle Files
From the BlockManager code + ShuffleMapTask code, it writes under spark.local.dir or java.io.tmpdir:

val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
  conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))

On Mon, Mar 3, 2014 at 10:45 PM, Usman Ghani us...@platfora.com wrote: Where on the filesystem does spark write the shuffle files?

--
...:::Aniket:::... Quetzalco@tl
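Since the lookup above falls back to java.io.tmpdir only when spark.local.dir is unset, the shuffle location can be redirected via that property. A hedged sketch, with placeholder paths:

```scala
import org.apache.spark.SparkConf

// Sketch: pointing shuffle/spill output at larger volumes. Spark accepts a
// comma-separated list of directories here; the paths are placeholders.
val conf = new SparkConf()
  .set("spark.local.dir", "/mnt/disk1/spark,/mnt/disk2/spark")
```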
Shuffle Files
Where on the filesystem does spark write the shuffle files?