Re: org.apache.spark.shuffle.FetchFailedException in dataproc
Hi Mich,

The y-axis is the number of executors. The code ran on Dataproc Serverless Spark 3.3.2. I tried disabling autoscaling by setting the following:

spark.dynamicAllocation.enabled=false
spark.executor.instances=60

and still got the FetchFailedException error. I wonder why it can run without problems in a Vertex notebook in local mode, which has fewer resources. Of course it took much longer (8 hours in local mode vs. 30 minutes in serverless). I will try to break the job into smaller parts and see which step exactly causes the problem.

Thanks!
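For reference, fully pinning the executor count as Gary describes comes down to standard Spark properties like the following (a sketch only; the memory and core values are the ones tried later in the thread, and Dataproc Serverless may impose its own limits on top of these):

```properties
spark.dynamicAllocation.enabled=false
spark.executor.instances=60
spark.executor.memory=25g
spark.executor.cores=8
```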
Re: org.apache.spark.shuffle.FetchFailedException in dataproc
Hi Gary,

Thanks for the update. So this is serverless Dataproc on 3.3.1. Maybe an autoscaling policy could be an option. What is the y-axis? Is that the capacity?

Can you break the join down into multiple parts and save the intermediate result set?

HTH

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
Re: org.apache.spark.shuffle.FetchFailedException in dataproc
Hi Mich,

I used the serverless Spark session, not the local mode in the notebook, so the machine type does not matter in this case. Below is the chart for the serverless Spark session execution. I also tried increasing executor memory and cores, but the issue did not get resolved. I will try shutting down autoscaling and see what happens.

[image: Serverless Session Executors-4core.png]
Re: org.apache.spark.shuffle.FetchFailedException in dataproc
For your Dataproc, what type of machines are you using, for example n2-standard-4 with 4 vCPUs and 16GB, or something else? How many nodes, and is autoscaling turned on?

Most likely an executor memory limit?

HTH
org.apache.spark.shuffle.FetchFailedException in dataproc
Hi,

I have a job in a GCP Dataproc serverless Spark session (Spark 3.3.2). It involves multiple joins as well as a complex UDF. I always get the FetchFailedException below, but the job completes and the results look right. Neither of the 2 inputs is very big (one is 6.5M rows x 11 columns, ~150M in ORC format; the other is 17.7M rows x 11 columns, ~400M in ORC format). It ran very smoothly on an on-premise Spark environment though.

According to Google's documentation (https://cloud.google.com/dataproc/docs/support/spark-job-tuning#shuffle_fetch_failures), there are 3 solutions:
1. Use EFM mode.
2. Increase executor memory.
3. Decrease the number of job partitions.

1. I started the session from a Vertex notebook, so I don't know how to use EFM mode.
2. I increased executor memory from the default 12GB to 25GB, and the number of cores from 4 to 8, but it did not solve the problem.
3. I wonder how to do this: repartition the input dataset to have fewer partitions? I used df.rdd.getNumPartitions() to check the input data partitions; they have 9 and 17 partitions respectively. Should I decrease them further? I also read a post on StackOverflow (https://stackoverflow.com/questions/34941410/fetchfailedexception-or-metadatafetchfailedexception-when-processing-big-data-se) saying that increasing partitions may help. Which one makes more sense? I repartitioned the input data to 20 and 30 partitions, but still no luck.

Any suggestions?
23/03/10 14:32:19 WARN TaskSetManager: Lost task 58.1 in stage 27.0 (TID 3783) (10.1.0.116 executor 33): FetchFailed(BlockManagerId(72, 10.1.15.199, 36791, None), shuffleId=24, mapIndex=77, mapId=3457, reduceId=58, message=
org.apache.spark.shuffle.FetchFailedException
    at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1180)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:918)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
    at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
    at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
    at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    ...
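On the question of how many partitions to use: a common rule of thumb (an assumption here, not something stated in the Google doc) is to size partitions so each holds roughly 100-200MB of uncompressed data. A quick back-of-the-envelope sketch in plain Python, using the thread's input sizes and a hypothetical 3x in-memory expansion factor for compressed ORC:

```python
import math

def suggest_partitions(total_bytes: int, target_partition_bytes: int = 128 * 1024 * 1024) -> int:
    """Rule-of-thumb partition count: ~128MB per partition, at least 1."""
    return max(1, math.ceil(total_bytes / target_partition_bytes))

# The two ORC inputs from the thread are ~150M and ~400M on disk;
# ORC is compressed, so assume (hypothetically) ~3x expansion in memory.
expanded = (150 + 400) * 3 * 1024 * 1024
print(suggest_partitions(expanded))  # -> 13
```

This only bounds the inputs, though; the shuffle produced by the joins and the UDF output can be much larger than the inputs, which is why both "too few" and "too many" partitions showed up as advice in the thread.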
Re: org.apache.spark.shuffle.FetchFailedException: Too large frame:
Yes, you can usually use a broadcast join to avoid skew problems.

--
Ryan Blue
Software Engineer
Netflix
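The idea behind the broadcast (map-side) join suggested above: the small table is shipped whole to every task, so the large side is never shuffled by join key and a skewed key can no longer pile up on one reducer. A minimal pure-Python sketch of the lookup pattern (illustrative only; in Spark this would be a broadcast hint on the small DataFrame before the join):

```python
# Broadcast-join sketch: build a hash map from the small side once,
# then stream the large side and probe the map -- no shuffle by key.
small_side = {1: "a", 2: "b"}                       # broadcast (small) table: id -> label
large_side = [(1, 10), (1, 20), (2, 30), (3, 40)]   # (id, value) rows, possibly skewed

joined = [
    (key, value, small_side[key])
    for key, value in large_side
    if key in small_side                            # inner join: drop unmatched keys
]
print(joined)  # [(1, 10, 'a'), (1, 20, 'a'), (2, 30, 'b')]
```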
Re: org.apache.spark.shuffle.FetchFailedException: Too large frame:
I am performing a join operation. If I convert the reduce-side join to a map-side join (no shuffle will happen), I assume this error shouldn't come up in that case. Let me know if this understanding is correct.
Re: org.apache.spark.shuffle.FetchFailedException: Too large frame:
This is usually caused by skew. Sometimes you can work around it by increasing the number of partitions like you tried, but when that doesn't work you need to change the partitioning that you're using.

If you're aggregating, try adding an intermediate aggregation. For example, if your query is select sum(x), a from t group by a, then try select sum(partial), a from (select sum(x) as partial, a, b from t group by a, b) group by a.

rb
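The two-stage rewrite works because sum is decomposable: summing per-(a, b) partials and then summing the partials per a gives the same result as summing directly per a, while the first stage spreads a hot key a across many (a, b) groups. A quick check of that equivalence with the stdlib sqlite3 module, using the table and column names from Ryan's example:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER, a TEXT, b TEXT)")
# Skewed data: key 'hot' dominates, but is spread over several b values.
rows = [(1, "hot", "b1"), (2, "hot", "b1"), (3, "hot", "b2"),
        (4, "hot", "b3"), (5, "cold", "b1")]
conn.executemany("INSERT INTO t VALUES (?, ?, ?)", rows)

direct = conn.execute(
    "SELECT a, SUM(x) FROM t GROUP BY a ORDER BY a").fetchall()
two_stage = conn.execute(
    "SELECT a, SUM(partial) FROM "
    "(SELECT SUM(x) AS partial, a, b FROM t GROUP BY a, b) "
    "GROUP BY a ORDER BY a").fetchall()

print(direct)                 # [('cold', 5), ('hot', 10)]
assert direct == two_stage    # same answer, but the skew is split up
```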
org.apache.spark.shuffle.FetchFailedException: Too large frame:
Hi,

I am getting the above error in Spark SQL. I have increased the number of partitions (to 5000) but am still getting the same error.

My data most probably is skewed.

org.apache.spark.shuffle.FetchFailedException: Too large frame: 4247124829
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:419)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:349)
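For context on the number in the error: Spark transfers each shuffle block as a single network frame whose length is bounded by a signed 32-bit integer (about 2 GiB), so a "Too large frame" means one reducer's shuffle block blew past that ceiling, which is consistent with skew. Quick arithmetic on the value from the message:

```python
# The frame size from the error message vs. the signed-int ceiling that
# bounds a single shuffle network frame in Spark.
frame_bytes = 4_247_124_829          # from "Too large frame: 4247124829"
int_max = 2**31 - 1                  # 2,147,483,647 (Int.MaxValue)

print(frame_bytes > int_max)         # True: the frame cannot fit
print(round(frame_bytes / 2**30, 2)) # size of the offending block in GiB
```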
Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232
I think you should check the RPC target; maybe the NodeManager has a memory issue, such as GC pressure. Check that out first. And I wonder why you assigned --executor-cores 8?
Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232
Asking this on a tangent: is there any way for the shuffle data to be replicated to more than one server? Thanks.
Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232
Thanks Juan for taking the time. Here's more info:

- This is running on YARN in cluster mode.
- See config params below.
- This is a corporate environment. In general, nodes should not be added or removed from the cluster that often. Even if that were the case, I would expect it to affect one or two servers; in my case I get hundreds of these errors before the job fails.

--master yarn-cluster ^
--driver-memory 96G ^
--executor-memory 48G ^
--num-executors 150 ^
--executor-cores 8 ^
--driver-cores 8 ^
--conf spark.yarn.executor.memoryOverhead=36000 ^
--conf spark.shuffle.service.enabled=true ^
--conf spark.yarn.submit.waitAppCompletion=false ^
--conf spark.yarn.submit.file.replication=64 ^
--conf spark.yarn.maxAppAttempts=1 ^
--conf spark.speculation=true ^
--conf spark.speculation.quantile=0.9 ^
--conf spark.yarn.executor.nodeLabelExpression="prod" ^
--conf spark.yarn.am.nodeLabelExpression="prod" ^
--conf spark.stage.maxConsecutiveAttempts=1000 ^
--conf spark.yarn.scheduler.heartbeat.interval-ms=15000 ^
--conf spark.yarn.launchContainer.count.simultaneously=50 ^
--conf spark.driver.maxResultSize=16G ^
--conf spark.network.timeout=1000s ^
Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232
Hi Jeff,

Can you provide more information about how you are running your job? In particular:

- Which cluster manager are you using? Is it YARN, Mesos, or Spark Standalone?
- Which configuration options are you using to submit the job? In particular, are you using dynamic allocation or the external shuffle service? You should be able to see this in the Environment tab of the Spark UI, looking for spark.dynamicAllocation.enabled and spark.shuffle.service.enabled.
- In which environment are you running the jobs? Is this an on-premise cluster or some cloud provider? Are you adding or removing nodes from the cluster during job execution?

FetchFailedException errors happen during execution when an executor is not able to read the shuffle blocks for a previous stage that are served by another executor. That might happen if the executor that has to serve the files dies and internal shuffle is used, although there can be other reasons, such as network errors. If you are using dynamic allocation then you should also enable the external shuffle service, so shuffle blocks can be served by the node manager after the executor that created the blocks is terminated; see https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation for more details.
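The dynamic-allocation plus external-shuffle-service pairing described above can be sketched as session configuration. This is a hedged example, not the poster's actual setup: the app name is a placeholder, and the two property names are the standard Spark ones mentioned in the reply (on Spark 1.6, the version in this thread, the same properties can equally be passed as --conf flags to spark-submit).

```python
from pyspark.sql import SparkSession

# Pair dynamic allocation with the external shuffle service so shuffle
# blocks outlive the executors that wrote them (the node manager serves
# them after the executor is decommissioned).
spark = (
    SparkSession.builder
    .appName("example-app")  # placeholder name
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")
    .getOrCreate()
)
```

The external shuffle service itself must also be started on each node (e.g. in the YARN node manager), which is a cluster-side setting, not an application-side one.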
Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232
We have a not too complex and not too large spark job that keeps dying with this error. I have researched it and I have not seen any convincing explanation on why.

I am not using a shuffle service. Which server is the one that is refusing the connection?

If I go to the server that is being reported in the error message, I see a lot of these errors towards the end:

java.io.FileNotFoundException: D:\data\yarnnm\local\usercache\hadoop\appcache\application_1500970459432_1024\blockmgr-7f3a1abc-2b8b-4e51-9072-8c12495ec563\0e\shuffle_0_4107_0.index

(may or may not be related to the problem at all) and if you examine further on this machine there are FetchFailedExceptions resulting from other machines, and so on and so forth.

This is Spark 1.6 on Yarn-master.

Could anyone provide some insight or solution to this? thanks
Re: PySpark Issue: "org.apache.spark.shuffle.FetchFailedException: Failed to connect to..."
I had the same problem. One forum post elsewhere suggested that too much network communication might be using up available ports. I reduced the partition size via repartition(int) and it solved the problem.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Issue-org-apache-spark-shuffle-FetchFailedException-Failed-to-connect-to-tp26511p26879.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
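One way to pick a repartition target, if partition count (and hence the number of concurrent fetch connections) is the knob being tuned, is to size partitions toward a byte budget. A small hypothetical helper — the function name and the ~128 MB rule-of-thumb budget are mine, not from this thread:

```python
def target_partitions(total_bytes, bytes_per_partition=128 * 1024 * 1024):
    """Return a partition count that keeps each partition near the byte budget."""
    # Ceiling division, with a floor of one partition.
    return max(1, -(-total_bytes // bytes_per_partition))

# e.g. a 4 GiB dataset at the default 128 MiB budget gives 32 partitions,
# which you would then apply with rdd.repartition(n) or df.repartition(n).
```

Fewer partitions means fewer shuffle fetch connections but more memory pressure per task, so the budget is a trade-off, not a fixed rule.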
Re: PySpark Issue: "org.apache.spark.shuffle.FetchFailedException: Failed to connect to..."
Also, this is the command I use to submit the Spark application:

**

where *recommendation_engine-0.1-py2.7.egg* is a Python egg of my own library written for this application, and *'file'* and *'/home/spark/enigma_analytics/tests/msg-epims0730_small.json'* are input arguments for the application.
Re: PySpark Issue: "org.apache.spark.shuffle.FetchFailedException: Failed to connect to..."
A slight update, I suppose: for some reason, sometimes it will connect and continue and the job will complete. But most of the time I still run into this error, the job is killed, and the application doesn't finish. I still have no idea why this is happening. I could really use some help here.
PySpark Issue: "org.apache.spark.shuffle.FetchFailedException: Failed to connect to..."
I am having trouble with my standalone Spark cluster and I can't seem to find a solution anywhere. I hope that maybe someone can figure out what is going wrong, so this issue might be resolved and I can continue with my work.

I am currently attempting to use Python and the pyspark library to do distributed computing. I have two virtual machines set up for this cluster: one machine is being used as both the master and one of the slaves (*spark-mastr-1* with IP address *xx.xx.xx.248*), and the other machine is being used as just a slave (*spark-wrkr-1* with IP address *xx.xx.xx.247*). Both of them have 8GB of memory and 2 virtual sockets with 2 cores per socket (4 CPU cores per machine, for a total of 8 cores in the cluster). Both of them have passwordless SSH set up to each other (the master has passwordless SSH set up for itself as well, since it is also being used as one of the slaves).

At first I thought that *247* was just unable to connect to *248*, but I ran a simple test with the Spark shell to check that the slaves are able to talk with the master, and they seem to be able to talk to each other just fine. However, when I attempt to run my pyspark application, I still run into trouble with *247* connecting to *248*. Then I thought it was a memory issue, so I allocated 6GB of memory to each machine for Spark, but this did not resolve the issue. Finally, I tried to give the pyspark application more time before it times out, as well as more retry attempts, but I still get the same error.
The error code that stands out to me is:

*org.apache.spark.shuffle.FetchFailedException: Failed to connect to spark-mastr-1:xx*

The following is the error that I receive on my most recent attempted run of the application:

Traceback (most recent call last):
  File "/home/spark/enigma_analytics/rec_engine/submission.py", line 413, in main(sc,sw,sw_set)
  File "/home/spark/enigma_analytics/rec_engine/submission.py", line 391, in main run_engine(submission_type,inputSub,mdb_collection,mdb_collectionType,sw_set,sc,weighted=False)
  File "/home/spark/enigma_analytics/rec_engine/submission.py", line 332, in run_engine similarities_recRDD,recommendations = recommend(subRDD,mdb_collection,query_format,sw_set,sc)
  File "/home/spark/enigma_analytics/rec_engine/submission.py", line 204, in recommend idfsCorpusWeightsBroadcast = core.idfsRDD(corpus,sc)
  File "/home/spark/enigma_analytics/rec_engine/core.py", line 38, in idfsRDD idfsInputRDD = ta.inverseDocumentFrequency(corpusRDD)
  File "/home/spark/enigma_analytics/rec_engine/textAnalyzer.py", line 106, in inverseDocumentFrequency N = corpus.map(lambda doc: doc[0]).distinct().count()
  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count
  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum
  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold
  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/rdd.py", line 771, in collect
  File "/usr/local/spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/local/spark/current/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 5 (count at /home/spark/enigma_analytics/rec_engine/textAnalyzer.py:106) has failed the maximum allowable number of times: 4. Most recent failure reason:

*/org.apache.spark.shuffle.FetchFailedException: Failed to connect to spark-mastr-1:44642/*
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
  at org.apache.spark.api.python.PythonRunner$WriterThread$
org.apache.spark.shuffle.FetchFailedException: Failed to connect to ..... on worker failure
Hi,

I am running a Spark Streaming job. I was testing its fault tolerance by killing one of the workers using the kill -9 command. My understanding is that when I kill a worker, the process should not die and should resume execution. But I am getting the following error and my process is halted:

org.apache.spark.shuffle.FetchFailedException: Failed to connect to .

Now, when I restart the same worker (2 workers were running on the machine and I killed just one of them), the execution resumes and the process completes. Please help me understand why my process is not fault tolerant on a worker failure. Am I missing something? Basically I need my process to resume even if a worker is lost.

Regards,
Kundan
Re: org.apache.spark.shuffle.FetchFailedException
I have set spark.sql.shuffle.partitions=1000, but it is still failing.

On Tue, Aug 25, 2015 at 11:36 AM, Raghavendra Pandey <raghavendra.pan...@gmail.com> wrote:

> Did you try increasing sql partitions?

On Tue, Aug 25, 2015 at 11:06 AM, kundan kumar <iitr.kun...@gmail.com> wrote:

> I am running this query on a data size of 4 billion rows and getting an org.apache.spark.shuffle.FetchFailedException error.
>
> select adid, position, userid, price
> from (
>   select adid, position, userid, price,
>     dense_rank() OVER (PARTITION BY adlocationid ORDER BY price DESC) as rank
>   FROM trainInfo) as tmp
> WHERE rank = 2
>
> I have attached the error logs from the spark-sql terminal. Please suggest what the reason for these kinds of errors is and how I can resolve them.
>
> Regards,
> Kundan
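For readers unfamiliar with the window function in the query above: dense_rank assigns the same rank to ties and does not skip ranks afterwards. A pure-Python sketch of that semantics (the helper name and sample column names follow the query, but this is illustrative code, not the poster's — the real work happens inside Spark SQL's shuffle, which is where the exception occurs):

```python
from collections import defaultdict

def dense_rank_top(rows, partition_key, order_key, max_rank):
    """Keep rows whose dense_rank within each partition, ordered by
    order_key descending, is at most max_rank (ties share a rank)."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_key]].append(row)
    kept = []
    for group in groups.values():
        group.sort(key=lambda r: r[order_key], reverse=True)
        rank, previous = 0, object()  # sentinel never equals a real value
        for row in group:
            if row[order_key] != previous:
                rank += 1
                previous = row[order_key]
            if rank <= max_rank:
                kept.append(row)
    return kept
```

At 4 billion rows, every adlocationid group must be shuffled to a single task for the sort, which is why the partition count and skew in adlocationid matter here.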
Re: org.apache.spark.shuffle.FetchFailedException
Did you try increasing sql partitions?
Re: org.apache.spark.shuffle.FetchFailedException :: Migration from Spark 1.2 to 1.3
There was some similar discussion on JIRA, https://issues.apache.org/jira/browse/SPARK-3633 — maybe that will give you some insights.

Thanks
Best Regards
org.apache.spark.shuffle.FetchFailedException :: Migration from Spark 1.2 to 1.3
Hi,

I'm getting this exception after shifting my code from Spark 1.2 to Spark 1.3:

15/05/18 18:22:39 WARN TaskSetManager: Lost task 0.0 in stage 1.6 (TID 84, cloud8-server): FetchFailed(BlockManagerId(1, cloud4-server, 7337), shuffleId=0, mapId=9, reduceId=1, message=
org.apache.spark.shuffle.FetchFailedException: java.lang.RuntimeException: Failed to open file: /tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index
  at org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:202)
  at org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getBlockData(ExternalShuffleBlockManager.java:112)
  at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74)
  at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
  at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
  at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
  at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
  at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
  at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
  at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: /tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index (Permission denied)
  at java.io.FileInputStream.open(Native Method)
  at java.io.FileInputStream.init(FileInputStream.java:146)
  at org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:191)
  ... 23 more