Re: org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-14 Thread Gary Liu
Hi Mich,

The y-axis is the number of executors. The code ran on Dataproc Serverless
Spark, version 3.3.2.

I tried disabling autoscaling by setting the following:

spark.dynamicAllocation.enabled=false
spark.executor.instances=60
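
For reference, this is roughly how such properties can be set when the
session is created (a minimal sketch assuming a plain SparkSession builder;
the exact mechanism for a Dataproc Serverless session in a Vertex notebook
may differ):

from pyspark.sql import SparkSession

# Sketch only: spark.executor.instances pins a fixed executor count only
# when dynamic allocation is turned off.
spark = (
    SparkSession.builder
    .appName("join-job")  # hypothetical app name
    .config("spark.dynamicAllocation.enabled", "false")
    .config("spark.executor.instances", "60")
    .getOrCreate()
)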

And I still got the FetchFailedException error. I wonder why it can run
without problems in a Vertex notebook in local mode, which has fewer
resources. Of course it took much longer (8 hours in local mode vs. 30
minutes in serverless).

I will try breaking the job into smaller parts and see exactly which step
causes the problem.
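
Something like the following sketch is what I have in mind, materialising
each intermediate result so the failing stage can be isolated (all paths,
join keys and DataFrame names below are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical input paths for the two ORC inputs.
df_a = spark.read.orc("gs://my-bucket/input_a")
df_b = spark.read.orc("gs://my-bucket/input_b")

# Run the first join on its own and persist the result.
step1 = df_a.join(df_b, "id")  # "id" is a placeholder join key
step1.write.mode("overwrite").orc("gs://my-bucket/tmp/step1")

# Read the intermediate back before the next join / UDF stage, so each
# stage runs as a separate job and the failing one is easy to spot.
step1 = spark.read.orc("gs://my-bucket/tmp/step1")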

Thanks!

Re: org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-13 Thread Mich Talebzadeh
Hi Gary

Thanks for the update. So this is serverless Dataproc on 3.3.2. Maybe an
autoscaling policy could be an option. What is the y-axis? Is that the
capacity?

Can you break down the join into multiple parts and save the intermediate
result set?


HTH

Re: org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-13 Thread Gary Liu
Hi Mich,
I used the serverless Spark session, not the local mode in the notebook, so
machine type does not matter in this case. Below is the chart for the
serverless Spark session execution. I also tried increasing executor memory
and cores, but the issue did not get resolved. I will try disabling
autoscaling and see what happens.
[image: Serverless Session Executors-4core.png]
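
For reference, the executor settings I tried correspond to something like
this (a sketch using standard Spark property names; whether Dataproc
Serverless accepts these exact values is an assumption, since it may
constrain executor sizes to specific tiers):

from pyspark.sql import SparkSession

# Sketch only: the values are the ones from my earlier message
# (12 GB -> 25 GB memory, 4 -> 8 cores).
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "25g")
    .config("spark.executor.cores", "8")
    .getOrCreate()
)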



Re: org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-10 Thread Mich Talebzadeh
For your Dataproc setup, what type of machines are you using, for example
n2-standard-4 with 4 vCPUs and 16 GB, or something else? How many nodes, and
is autoscaling turned on?

Most likely an executor memory limit?


HTH



On Fri, 10 Mar 2023 at 15:35, Gary Liu  wrote:

> Hi,
>
> I have a job in a GCP Dataproc Serverless Spark session (Spark 3.3.2); it
> involves multiple joins, as well as a complex UDF. I always get the
> FetchFailedException below, but the job completes and the results look
> right. Neither of the two inputs is very big (one is 6.5M rows x 11
> columns, ~150 MB in ORC format; the other is 17.7M rows x 11 columns,
> ~400 MB in ORC format). It ran very smoothly on an on-premises Spark
> environment, though.
>
> According to Google's documentation (
> https://cloud.google.com/dataproc/docs/support/spark-job-tuning#shuffle_fetch_failures),
> it offers 3 solutions:
> 1. Use EFM mode.
> 2. Increase executor memory.
> 3. Decrease the number of job partitions.
>
> 1. I started the session from a Vertex notebook, so I don't know how to
> use EFM mode.
> 2. I increased executor memory from the default 12 GB to 25 GB, and the
> number of cores from 4 to 8, but it did not solve the problem.
> 3. I am not sure how to do this. Repartition the input datasets to have
> fewer partitions? I used df.rdd.getNumPartitions() to check the input data
> partitions; they have 9 and 17 partitions respectively. Should I decrease
> them further? I also read a post on StackOverflow (
> https://stackoverflow.com/questions/34941410/fetchfailedexception-or-metadatafetchfailedexception-when-processing-big-data-se),
> saying that increasing partitions may help. Which one makes more sense? I
> repartitioned the input data to 20 and 30 partitions, but still no luck.
>
> Any suggestions?
>
> 23/03/10 14:32:19 WARN TaskSetManager: Lost task 58.1 in stage 27.0 (TID 
> 3783) (10.1.0.116 executor 33): FetchFailed(BlockManagerId(72, 10.1.15.199, 
> 36791, None), shuffleId=24, mapIndex=77, mapId=3457, reduceId=58, message=
> org.apache.spark.shuffle.FetchFailedException
>   at 
> org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1180)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:918)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>   at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
>   at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
>   at 
> org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
>   at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
>   at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>   at 
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>   at