Hi Mich,
I used the serverless Spark session, not the local mode in the notebook, so
the machine type does not matter in this case. Below is the chart for the
serverless Spark session execution. I also tried increasing the executor
memory and cores, but the issue did not get resolved. I will try turning
off autoscaling and see what happens.
[image: Serverless Session Executors-4core.png]
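
My understanding is that the serverless session's autoscaling is driven by
Spark dynamic allocation, so I plan to disable it at session creation,
roughly like this (the property names are standard Spark settings; the app
name and the fixed executor count are just examples, not recommendations):

    from pyspark.sql import SparkSession

    # Pin the executor count instead of letting the session scale down,
    # so executors holding shuffle blocks are not decommissioned mid-job.
    spark = (
        SparkSession.builder
        .appName("no-autoscale-test")  # hypothetical app name
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", "10")  # example fixed size
        .getOrCreate()
    )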


On Fri, Mar 10, 2023 at 11:55 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> For your Dataproc setup, what type of machines are you using, for example
> n2-standard-4 with 4 vCPUs and 16GB, or something else? How many nodes,
> and is autoscaling turned on?
>
> Most likely an executor memory limit?
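>
> If it is an executor memory limit, you can raise it when the session is
> created. A minimal sketch in PySpark (these are standard Spark properties;
> the app name and values are only examples, not recommendations):
>
>     from pyspark.sql import SparkSession
>
>     # Ask for larger executors up front; adjust to your workload and quota.
>     spark = (
>         SparkSession.builder
>         .appName("shuffle-debug")  # hypothetical app name
>         .config("spark.executor.memory", "25g")
>         .config("spark.executor.cores", "8")
>         .getOrCreate()
>     )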
>
>
> HTH
>
>
>    view my LinkedIn profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 10 Mar 2023 at 15:35, Gary Liu <gary...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a job in a GCP Dataproc serverless Spark session (Spark 3.3.2). The
>> job involves multiple joins as well as a complex UDF. I always get the
>> FetchFailedException below, but the job completes and the results look
>> right. Neither of the two input datasets is very big (one is 6.5M rows x 11
>> columns, ~150MB in ORC format; the other is 17.7M rows x 11 columns, ~400MB
>> in ORC format). It ran very smoothly on an on-premises Spark environment
>> though.
>>
>> According to Google's documentation (
>> https://cloud.google.com/dataproc/docs/support/spark-job-tuning#shuffle_fetch_failures),
>> there are 3 solutions:
>> 1. Use EFM mode.
>> 2. Increase executor memory.
>> 3. Decrease the number of job partitions.
>>
>> 1. I started the session from a Vertex notebook, so I don't know how to
>> use EFM mode.
>> 2. I increased executor memory from the default 12GB to 25GB, and the
>> number of cores from 4 to 8, but it did not solve the problem.
>> 3. How do I do this? Repartition the input datasets to have fewer
>> partitions? I used df.rdd.getNumPartitions() to check the input data
>> partitions; they have 9 and 17 partitions respectively. Should I decrease
>> them further? I also read a post on StackOverflow (
>> https://stackoverflow.com/questions/34941410/fetchfailedexception-or-metadatafetchfailedexception-when-processing-big-data-se)
>> saying that increasing partitions may help. Which one makes more sense? I
>> repartitioned the input data to 20 and 30 partitions, but still no luck.
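>>
>> For reference, the repartitioning I tried looks roughly like this
>> (df1/df2 stand in for my two input DataFrames; the shuffle-partitions
>> line is a related knob, shown only as an example):
>>
>>     # Inputs had 9 and 17 partitions; I tried bumping them to 20 and 30
>>     # before the joins.
>>     print(df1.rdd.getNumPartitions(), df2.rdd.getNumPartitions())  # 9, 17
>>     df1 = df1.repartition(20)
>>     df2 = df2.repartition(30)
>>
>>     # The number of post-shuffle partitions used by the joins themselves:
>>     spark.conf.set("spark.sql.shuffle.partitions", "400")  # default is 200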
>>
>> Any suggestions?
>>
>> 23/03/10 14:32:19 WARN TaskSetManager: Lost task 58.1 in stage 27.0 (TID 
>> 3783) (10.1.0.116 executor 33): FetchFailed(BlockManagerId(72, 10.1.15.199, 
>> 36791, None), shuffleId=24, mapIndex=77, mapId=3457, reduceId=58, message=
>> org.apache.spark.shuffle.FetchFailedException
>>      at 
>> org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
>>      at 
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1180)
>>      at 
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:918)
>>      at 
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
>>      at 
>> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>>      at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
>>      at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
>>      at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
>>      at 
>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>>      at 
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>>      at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
>>      at 
>> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
>>      at 
>> org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
>>      at 
>> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
>>      at 
>> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>>      at 
>> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>>      at 
>> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>>      at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>      at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>>      at org.apache.spark.scheduler.Task.run(Task.scala:136)
>>      at 
>> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
>>      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
>>      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
>>      at 
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>>      at 
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>>      at java.base/java.lang.Thread.run(Thread.java:833)
>> Caused by: org.apache.spark.ExecutorDeadException: The relative remote 
>> executor(Id: 72), which maintains the block data to fetch is dead.
>>      at 
>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:136)
>>      at 
>> org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
>>      at 
>> org.apache.spark.network.shuffle.RetryingBlockTransferor.start(RetryingBlockTransferor.java:152)
>>      at 
>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:146)
>>      at 
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:363)
>>      at 
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1150)
>>      at 
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1142)
>>      at 
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:702)
>>      at 
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:192)
>>      at 
>> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
>>      at 
>> org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:240)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>>      ... 40 more
>>
>> )
>>
>>
>> --
>> Gary Liu
>>
>

-- 
Gary Liu
