Re: org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-14 Thread Gary Liu
Hi Mich,

The y-axis is the number of executors. The code ran on Dataproc Serverless
Spark 3.3.2.

I tried disabling autoscaling (dynamic allocation) by setting the following:

spark.dynamicAllocation.enabled=false
spark.executor.instances=60
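
For reference, a minimal PySpark sketch of passing the same settings at
session creation (assuming a plain SparkSession builder; the exact way a
Dataproc Serverless session is bootstrapped from a Vertex notebook may
differ, and the app name is just a placeholder):

    from pyspark.sql import SparkSession

    # Sketch only: fixed executor count instead of autoscaling, plus the
    # larger executors mentioned later in this thread.
    spark = (
        SparkSession.builder
        .appName("join-job")                                 # hypothetical app name
        .config("spark.dynamicAllocation.enabled", "false")  # disable autoscaling
        .config("spark.executor.instances", "60")            # fixed number of executors
        .config("spark.executor.memory", "25g")              # optional: bigger executors
        .config("spark.executor.cores", "8")
        .getOrCreate()
    )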

And I still got the FetchFailedException error. I wonder why it can run
without problems in a Vertex notebook in local mode, which has fewer
resources. Of course, it took much longer (8 hours in local mode vs. 30
minutes in serverless).

I will try to break the job into smaller parts and see exactly which step
causes the problem.

Thanks!


Re: org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-13 Thread Mich Talebzadeh
Hi Gary

Thanks for the update. So this is serverless Dataproc on Spark 3.3.2. Maybe an
autoscaling policy could be an option. What is the y-axis? Is that the capacity?

Can you break down the join into multiple parts and save the intermediate
result set?
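
Something along these lines (a rough PySpark sketch with hypothetical
DataFrames df_a, df_b, df_c, a placeholder join key and a placeholder GCS
path); materialising each intermediate join makes it easier to see which
step fails and to retry it in isolation:

    # Step 1: first join, written out before carrying on
    step1 = df_a.join(df_b, "id")
    step1.write.mode("overwrite").orc("gs://some-bucket/tmp/step1")

    # Step 2: read the saved intermediate back and continue
    step1_back = spark.read.orc("gs://some-bucket/tmp/step1")
    result = step1_back.join(df_c, "id")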


HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.





Re: org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-13 Thread Gary Liu
Hi Mich,
I used the serverless Spark session, not local mode in the notebook, so
the machine type does not matter in this case. Below is the chart for the
serverless Spark session execution. I also tried increasing executor
memory and cores, but the issue did not get resolved. I will try disabling
autoscaling and see what happens.
[image: Serverless Session Executors-4core.png]



Re: org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-10 Thread Mich Talebzadeh
For your Dataproc cluster, what type of machines are you using, for example
n2-standard-4 with 4 vCPUs and 16 GB, or something else? How many nodes, and
is autoscaling turned on?

Most likely an executor memory limit?


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.





org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-10 Thread Gary Liu
Hi,

I have a job in a GCP Dataproc Serverless Spark session (Spark 3.3.2). It is
a job involving multiple joins, as well as a complex UDF. I always get the
FetchFailedException below, but the job completes and the results look
right. Neither of the two inputs is very big (one is 6.5M rows x 11 columns,
~150 MB in ORC format; the other is 17.7M rows x 11 columns, ~400 MB in ORC
format). It ran very smoothly in an on-premise Spark environment, though.

According to Google's documentation (
https://cloud.google.com/dataproc/docs/support/spark-job-tuning#shuffle_fetch_failures),
there are 3 solutions:
1. Use EFM mode.
2. Increase executor memory.
3. Decrease the number of job partitions.

1. I started the session from a Vertex notebook, so I don't know how to use
EFM mode.
2. I increased executor memory from the default 12 GB to 25 GB, and the
number of cores from 4 to 8, but it did not solve the problem.
3. I wonder how to do this. Repartition the input datasets to have fewer
partitions? I used df.rdd.getNumPartitions() to check the input data
partitions; they have 9 and 17 partitions respectively. Should I decrease
them further? I also read a post on StackOverflow (
https://stackoverflow.com/questions/34941410/fetchfailedexception-or-metadatafetchfailedexception-when-processing-big-data-se),
saying that increasing partitions may help. Which one makes more sense? I
repartitioned the input data to 20 and 30 partitions, but still no luck.
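
For reference, a small PySpark sketch of the two knobs discussed above (df1
and df2 standing in for the two input DataFrames):

    # Current partitioning of the inputs (9 and 17 in this case)
    print(df1.rdd.getNumPartitions(), df2.rdd.getNumPartitions())

    # Changing the number of input partitions
    df1 = df1.repartition(20)
    df2 = df2.repartition(30)

    # The shuffle side of the joins is controlled separately
    # (the default is 200) and can be tuned up or down.
    spark.conf.set("spark.sql.shuffle.partitions", "200")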

Any suggestions?

23/03/10 14:32:19 WARN TaskSetManager: Lost task 58.1 in stage 27.0
(TID 3783) (10.1.0.116 executor 33): FetchFailed(BlockManagerId(72,
10.1.15.199, 36791, None), shuffleId=24, mapIndex=77, mapId=3457,
reduceId=58, message=
org.apache.spark.shuffle.FetchFailedException
at 
org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1180)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:918)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
at 
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
at 
org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52

Re: org.apache.spark.shuffle.FetchFailedException: Too large frame:

2018-05-03 Thread Ryan Blue
Yes, you can usually use a broadcast join to avoid skew problems.
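
A minimal PySpark sketch of that, with placeholder DataFrame and column
names; broadcasting the smaller side removes the shuffle for that join, so
skewed keys never pile up on a single reducer:

    from pyspark.sql.functions import broadcast

    # small_df must comfortably fit in executor memory for this to help
    result = large_df.join(broadcast(small_df), "join_key")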



-- 
Ryan Blue
Software Engineer
Netflix


Re: org.apache.spark.shuffle.FetchFailedException: Too large frame:

2018-05-02 Thread Pralabh Kumar
I am performing a join operation. If I convert the reduce-side join to a
map-side join (no shuffle will happen), I assume that in that case this error
shouldn't occur. Let me know if this understanding is correct.



Re: org.apache.spark.shuffle.FetchFailedException: Too large frame:

2018-05-01 Thread Ryan Blue
This is usually caused by skew. Sometimes you can work around it by
increasing the number of partitions like you tried, but when that doesn’t
work you need to change the partitioning that you’re using.

If you’re aggregating, try adding an intermediate aggregation. For example,
if your query is select sum(x), a from t group by a, then try select
sum(partial), a from (select sum(x) as partial, a, b from t group by a, b)
group by a.
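
As a sketch of the same rewrite via PySpark, assuming a registered table t
with columns x, a, b as in the example above:

    # Original, single-step aggregation
    direct = spark.sql("select sum(x), a from t group by a")

    # Rewritten with an intermediate aggregation on (a, b)
    two_step = spark.sql("""
        select sum(partial), a
        from (select sum(x) as partial, a, b from t group by a, b) p
        group by a
    """)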

rb
​



-- 
Ryan Blue
Software Engineer
Netflix


org.apache.spark.shuffle.FetchFailedException: Too large frame:

2018-05-01 Thread Pralabh Kumar
Hi

I am getting the above error in Spark SQL. I have increased the number of
partitions (to 5000) but am still getting the same error.

My data is most probably skewed.



org.apache.spark.shuffle.FetchFailedException: Too large frame: 4247124829
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:419)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:349)


Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232

2017-07-29 Thread 周康
I think you should check the RPC target; maybe the NodeManager has a memory
issue, such as GC pressure or something else. Check that out first.
And I wonder why you assign --executor-cores 8?



Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232

2017-07-28 Thread jeff saremi
Asking this on a tangent:

Is there any way for the shuffle data to be replicated to more than one server?

thanks





Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232

2017-07-28 Thread jeff saremi
Thanks, Juan, for taking the time.

Here's more info:
- This is running on YARN in cluster mode (yarn-cluster)

- See config params below

- This is a corporate environment. In general, nodes should not be added to or
removed from the cluster that often. Even if that is the case, I would expect it
to be one or two servers. In my case I get hundreds of these errors before the
job fails.


  --master yarn-cluster ^
  --driver-memory 96G ^
  --executor-memory 48G ^
  --num-executors 150 ^
  --executor-cores 8 ^
  --driver-cores 8 ^
  --conf spark.yarn.executor.memoryOverhead=36000 ^
  --conf spark.shuffle.service.enabled=true ^
  --conf spark.yarn.submit.waitAppCompletion=false ^
  --conf spark.yarn.submit.file.replication=64 ^
  --conf spark.yarn.maxAppAttempts=1 ^
  --conf spark.speculation=true ^
  --conf spark.speculation.quantile=0.9 ^
  --conf spark.yarn.executor.nodeLabelExpression="prod" ^
  --conf spark.yarn.am.nodeLabelExpression="prod" ^
  --conf spark.stage.maxConsecutiveAttempts=1000 ^
  --conf spark.yarn.scheduler.heartbeat.interval-ms=15000 ^
  --conf spark.yarn.launchContainer.count.simultaneously=50 ^
  --conf spark.driver.maxResultSize=16G ^
  --conf spark.network.timeout=1000s ^







Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232

2017-07-28 Thread Juan Rodríguez Hortalá
Hi Jeff,

Can you provide more information about how you are running your job? In
particular:
  - Which cluster manager are you using? Is it YARN, Mesos, or Spark
Standalone?
  - Which configuration options are you using to submit the job? In
particular, are you using dynamic allocation or an external shuffle service?
You should be able to see this in the Environment tab of the Spark UI,
looking for spark.dynamicAllocation.enabled and spark.shuffle.service.enabled.
  - In which environment are you running the jobs? Is this an on-premises
cluster or some cloud provider? Are you adding or removing nodes from the
cluster during the job execution?

FetchFailedException errors happen during execution when an executor is not
able to read the shuffle blocks for a previous stage that are served by
another executor. That might happen if the executor that has to serve the
files dies and internal shuffle is used, although there can be other
reasons, like network errors. If you are using dynamic allocation then you
should also enable the external shuffle service so shuffle blocks can be
served by the node manager after the executor that created the blocks is
terminated; see
https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
for more details.
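
A minimal sketch of the two settings mentioned above (the external shuffle
service itself must also be running on each node manager for this to work;
these lines only tell Spark to use it):

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .config("spark.dynamicAllocation.enabled", "true")
        .config("spark.shuffle.service.enabled", "true")
        .getOrCreate()
    )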





Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232

2017-07-28 Thread jeff saremi
We have a not-too-complex and not-too-large Spark job that keeps dying with
this error.

I have researched it and I have not seen any convincing explanation of why.

I am not using a shuffle service. Which server is the one that is refusing the 
connection?
If I go to the server that is being reported in the error message, I see a lot 
of these errors towards the end:


java.io.FileNotFoundException: 
D:\data\yarnnm\local\usercache\hadoop\appcache\application_1500970459432_1024\blockmgr-7f3a1abc-2b8b-4e51-9072-8c12495ec563\0e\shuffle_0_4107_0.index

(may or may not be related to the problem at all)


And if you examine further on this machine, there are FetchFailedExceptions
resulting from other machines, and so on and so forth.

This is Spark 1.6 on Yarn-master

Could anyone provide some insight or solution to this?

thanks



Re: PySpark Issue: "org.apache.spark.shuffle.FetchFailedException: Failed to connect to..."

2016-05-04 Thread HLee
I had the same problem. One forum post elsewhere suggested that too much
network communication might be using up available ports. I reduced the
number of partitions via repartition(int) and it solved the problem.






Re: PySpark Issue: "org.apache.spark.shuffle.FetchFailedException: Failed to connect to..."

2016-03-20 Thread craigiggy
Also, this is the command I use to submit the Spark application:

**

where *recommendation_engine-0.1-py2.7.egg* is a Python egg of my own
library I've written for this application, and *'file'* and
*'/home/spark/enigma_analytics/tests/msg-epims0730_small.json'* are input
arguments for the application.






Re: PySpark Issue: "org.apache.spark.shuffle.FetchFailedException: Failed to connect to..."

2016-03-19 Thread craigiggy
Slight update I suppose?
For some reason, sometimes it will connect and continue and the job will be
completed. But most of the time I still run into this error and the job is
killed and the application doesn't finish.

Still have no idea why this is happening. I could really use some help here.






PySpark Issue: "org.apache.spark.shuffle.FetchFailedException: Failed to connect to..."

2016-03-15 Thread craigiggy
I am having trouble with my standalone Spark cluster and I can't seem to find
a solution anywhere. I hope that maybe someone can figure out what is going
wrong so this issue might be resolved and I can continue with my work.

I am currently attempting to use Python and the pyspark library to do
distributed computing. I have two virtual machines set up for this cluster,
one machine is being used as both the master and one of the slaves
(*spark-mastr-1* with ip address: *xx.xx.xx.248*) and the other machine is
being used as just a slave (*spark-wrkr-1* with ip address: *xx.xx.xx.247*).
Both of them have 8GB of memory, 2 virtual sockets with 2 cores per socket
(4 CPU cores per machine for a total of 8 cores in the cluster). Both of
them have passwordless SSHing set up to each other (the master has
passwordless SSHing set up for itself as well since it is also being used as
one of the slaves).

At first I thought that *247* was just unable to connect to *248*, but I ran
a simple test with the Spark shell in order to check that the slaves are
able to talk with the master and they seem to be able to talk to each other
just fine. However, when I attempt to run my pyspark application, I still
run into trouble with *247* connecting with *248*. Then I thought it was a
memory issue, so I allocated 6GB of memory to each machine to use in Spark,
but this did not resolve the issue. Finally, I tried to give the pyspark
application more time before it times out as well as more retry attempts,
but I still get the same error. The error code that stands out to me is:

*org.apache.spark.shuffle.FetchFailedException: Failed to connect to
spark-mastr-1:xx*


The following is the error that I receive on my most recent attempted run of
the application:

Traceback (most recent call last):
  File "/home/spark/enigma_analytics/rec_engine/submission.py", line 413, in

main(sc,sw,sw_set)
  File "/home/spark/enigma_analytics/rec_engine/submission.py", line 391, in
main
   
run_engine(submission_type,inputSub,mdb_collection,mdb_collectionType,sw_set,sc,weighted=False)
  File "/home/spark/enigma_analytics/rec_engine/submission.py", line 332, in
run_engine
similarities_recRDD,recommendations =
recommend(subRDD,mdb_collection,query_format,sw_set,sc)
  File "/home/spark/enigma_analytics/rec_engine/submission.py", line 204, in
recommend
idfsCorpusWeightsBroadcast = core.idfsRDD(corpus,sc)
  File "/home/spark/enigma_analytics/rec_engine/core.py", line 38, in
idfsRDD
idfsInputRDD = ta.inverseDocumentFrequency(corpusRDD)
  File "/home/spark/enigma_analytics/rec_engine/textAnalyzer.py", line 106,
in inverseDocumentFrequency
N = corpus.map(lambda doc: doc[0]).distinct().count()
  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/rdd.py",
line 1004, in count
  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/rdd.py",
line 995, in sum
  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/rdd.py",
line 869, in fold
  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/rdd.py",
line 771, in collect
  File
"/usr/local/spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
line 813, in __call__
  File
"/usr/local/spark/current/python/lib/py4j-0.9-src.zip/py4j/protocol.py",
line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure:
ResultStage 5 (count at
/home/spark/enigma_analytics/rec_engine/textAnalyzer.py:106) has failed the
maximum allowable number of times: 4. Most recent failure reason:
*/org.apache.spark.shuffle.FetchFailedException: Failed to connect to
spark-mastr-1:44642/*
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at
org.apache.spark.api.python.PythonRunner$WriterThread$

org.apache.spark.shuffle.FetchFailedException: Failed to connect to ..... on worker failure

2015-10-28 Thread kundan kumar
Hi,

I am running a Spark Streaming Job. I was testing the fault tolerance by
killing one of the workers using the kill -9 command.

What I understand is that when I kill a worker, the job should not die and
the execution should resume.

But I am getting the following error and my job is halted.

org.apache.spark.shuffle.FetchFailedException: Failed to connect to .



Now, when I restart the same worker (2 workers were running on the
machine and I killed just one of them), the execution resumes and the
job is completed.

Please help me understand why my job is not fault tolerant on a worker
failure. Am I missing something? Basically, I need my job to resume even if
a worker is lost.



Regards,
Kundan


Re: org.apache.spark.shuffle.FetchFailedException

2015-08-25 Thread kundan kumar
I have set spark.sql.shuffle.partitions=1000, but it is still failing.
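
For reference, a sketch of how that setting can be applied from PySpark (on
a recent Spark with a SparkSession; in the spark-sql shell used in this
thread, the equivalent is SET spark.sql.shuffle.partitions=1000;), before
running the windowed query from the original post:

    spark.conf.set("spark.sql.shuffle.partitions", "1000")

    ranked = spark.sql("""
        select adid, position, userid, price
        from (
          select adid, position, userid, price,
                 dense_rank() over (partition by adlocationid order by price desc) as rank
          from trainInfo) as tmp
        where rank = 2
    """)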







Re: org.apache.spark.shuffle.FetchFailedException

2015-08-25 Thread Raghavendra Pandey
Did you try increasing sql partitions?

On Tue, Aug 25, 2015 at 11:06 AM, kundan kumar iitr.kun...@gmail.com
wrote:

 I am running this query on a data size of 4 billion rows and
 getting an org.apache.spark.shuffle.FetchFailedException error.

 select adid,position,userid,price
 from (
 select adid,position,userid,price,
 dense_rank() OVER (PARTITION BY adlocationid ORDER BY price DESC) as rank
 FROM trainInfo) as tmp
 WHERE rank = 2


 I have attached the error logs from spark-sql terminal.

 Please suggest what the reason for these kinds of errors is and how I can
 resolve them.


 Regards,
 Kundan





Re: org.apache.spark.shuffle.FetchFailedException :: Migration from Spark 1.2 to 1.3

2015-05-19 Thread Akhil Das
There was some similar discussion on JIRA
(https://issues.apache.org/jira/browse/SPARK-3633); maybe that will give you
some insights.

Thanks
Best Regards





org.apache.spark.shuffle.FetchFailedException :: Migration from Spark 1.2 to 1.3

2015-05-18 Thread zia_kayani
Hi, I'm getting this exception after migrating my code from Spark 1.2 to Spark
1.3:

15/05/18 18:22:39 WARN TaskSetManager: Lost task 0.0 in stage 1.6 (TID 84,
cloud8-server): FetchFailed(BlockManagerId(1, cloud4-server, 7337),
shuffleId=0, mapId=9, reduceId=1, message=
org.apache.spark.shuffle.FetchFailedException: java.lang.RuntimeException:
Failed to open file:
/tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index
at
org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:202)
at
org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getBlockData(ExternalShuffleBlockManager.java:112)
at
org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74)
at
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
at
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException:
/tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index
(Permission denied)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.init(FileInputStream.java:146)
at
org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:191)
... 23 more


