For your Dataproc cluster, what machine type are you using, for example
n2-standard-4 with 4 vCPUs and 16 GB of memory, or something else? How many
nodes do you have, and is autoscaling turned on?
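
If it helps to answer the above, here is a minimal sketch for dumping the
relevant settings from the notebook. It assumes `spark` is the active
SparkSession in your Vertex notebook; the helper name `executor_settings` is
mine, for illustration only. The keys are standard Spark properties, and only
values set explicitly at launch will show up.

```python
# Standard Spark property names. A value shows as "<not set>" when it was
# not defined explicitly in the SparkConf the session was started with.
KEYS = [
    "spark.executor.memory",
    "spark.executor.memoryOverhead",
    "spark.executor.cores",
    "spark.executor.instances",
    "spark.dynamicAllocation.enabled",
]


def executor_settings(spark, keys=KEYS):
    """Return {property: value} read from the session's SparkConf."""
    conf = spark.sparkContext.getConf()
    return {key: conf.get(key, "<not set>") for key in keys}


# In the notebook (assuming `spark` is the active SparkSession):
# for key, value in executor_settings(spark).items():
#     print(f"{key} = {value}")
```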

My guess is that the executors are hitting a memory limit.
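
To make the memory question concrete: on YARN each executor is requested as a
container of spark.executor.memory plus spark.executor.memoryOverhead, and the
overhead defaults to 10% of executor memory with a 384 MiB floor. Below is a
rough sketch of that arithmetic. The 0.8 fraction of node RAM handed to YARN
is my assumption for illustration, not a confirmed Dataproc figure, so check
yarn.nodemanager.resource.memory-mb on your cluster. The function names are
hypothetical.

```python
MIN_OVERHEAD_MIB = 384   # Spark's floor for executor memory overhead
OVERHEAD_FACTOR = 0.10   # Spark's default overhead fraction for JVM executors


def container_mib(executor_memory_mib, overhead_mib=None):
    """Memory YARN must grant for one executor: heap plus overhead."""
    if overhead_mib is None:
        overhead_mib = max(MIN_OVERHEAD_MIB,
                           int(executor_memory_mib * OVERHEAD_FACTOR))
    return executor_memory_mib + overhead_mib


def executors_per_node(node_memory_mib, executor_memory_mib, yarn_fraction=0.8):
    """How many executor containers fit on one worker node.

    yarn_fraction is an ASSUMED share of node RAM available to YARN.
    """
    usable_mib = int(node_memory_mib * yarn_fraction)
    return usable_mib // container_mib(executor_memory_mib)


if __name__ == "__main__":
    node_mib = 16 * 1024  # e.g. n2-standard-4 with 16 GB
    for executor_gib in (12, 25):
        executor_mib = executor_gib * 1024
        print(executor_gib, "GiB executor:",
              container_mib(executor_mib), "MiB container,",
              executors_per_node(node_mib, executor_mib), "per node")
```

If the container request is larger than what a node can offer, or the
executor grows past its container limit at runtime, the executor can be lost,
and the shuffle blocks it held go with it. With a complex UDF it may be worth
raising spark.executor.memoryOverhead rather than only the heap.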


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 10 Mar 2023 at 15:35, Gary Liu <gary...@gmail.com> wrote:

> Hi ,
>
> I have a job running in a GCP Dataproc server Spark session (Spark 3.3.2).
> It involves multiple joins as well as a complex UDF. I always get the
> FetchFailedException below, but the job completes and the results look
> right. Neither of the 2 input datasets is very big (one is 6.5M rows * 11
> columns, ~150M in ORC format; the other is 17.7M rows * 11 columns, ~400M in
> ORC format). It ran very smoothly in an on-premise Spark environment though.
>
> According to Google's document (
> https://cloud.google.com/dataproc/docs/support/spark-job-tuning#shuffle_fetch_failures),
> there are 3 solutions:
> 1. Use EFM mode.
> 2. Increase executor memory.
> 3. Decrease the number of job partitions.
>
> 1. I started the session from a Vertex notebook, so I don't know how to
> use EFM mode.
> 2. I increased executor memory from the default 12GB to 25GB, and the
> number of cores from 4 to 8, but it did not solve the problem.
> 3. I am not sure how to do this. Should I repartition the input datasets to
> have fewer partitions? I used df.rdd.getNumPartitions() to check the input
> data partitions; they have 9 and 17 partitions respectively. Should I
> decrease them further? I also read a post on StackOverflow (
> https://stackoverflow.com/questions/34941410/fetchfailedexception-or-metadatafetchfailedexception-when-processing-big-data-se),
> saying that increasing partitions may help. Which one makes more sense? I
> repartitioned the input data to 20 and 30 partitions, but still no luck.
>
> Any suggestions?
>
> 23/03/10 14:32:19 WARN TaskSetManager: Lost task 58.1 in stage 27.0 (TID 3783) (10.1.0.116 executor 33): FetchFailed(BlockManagerId(72, 10.1.15.199, 36791, None), shuffleId=24, mapIndex=77, mapId=3457, reduceId=58, message=
> org.apache.spark.shuffle.FetchFailedException
>       at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
>       at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1180)
>       at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:918)
>       at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
>       at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>       at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
>       at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
>       at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
>       at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>       at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
>       at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
>       at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
>       at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
>       at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>       at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>       at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>       at org.apache.spark.scheduler.Task.run(Task.scala:136)
>       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
>       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>       at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: org.apache.spark.ExecutorDeadException: The relative remote executor(Id: 72), which maintains the block data to fetch is dead.
>       at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:136)
>       at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
>       at org.apache.spark.network.shuffle.RetryingBlockTransferor.start(RetryingBlockTransferor.java:152)
>       at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:146)
>       at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:363)
>       at org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1150)
>       at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1142)
>       at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:702)
>       at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:192)
>       at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
>       at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:240)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>       ... 40 more
>
> )
>
>
> --
> Gary Liu
>
