Hi Mich, I used the serverless Spark session, not local mode in the notebook, so the machine type does not matter in this case. Below is the chart for the serverless Spark session execution. I also tried increasing executor memory and cores, but the issue did not get resolved. I will try turning off autoscaling and see what happens.

[image: Serverless Session Executors-4core.png]
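For reference, what I plan to try corresponds roughly to disabling Spark's dynamic allocation and pinning a fixed executor count. A rough sketch below; the property names are standard Spark configuration keys, but the values are just what I'd test, not recommendations:

```python
# Illustrative Spark properties for turning off autoscaling (dynamic
# allocation) and pinning executor resources. These would typically be
# applied at session creation time, e.g. via
# SparkSession.builder.config(key, value) for each pair.
spark_props = {
    "spark.executor.memory": "25g",              # raised from the 12g default
    "spark.executor.cores": "8",
    "spark.dynamicAllocation.enabled": "false",  # disable autoscaling
    "spark.executor.instances": "10",            # illustrative fixed count
}

for key, value in spark_props.items():
    print(f"{key}={value}")
```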
On Fri, Mar 10, 2023 at 11:55 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> For your Dataproc cluster, what type of machines are you using, for example
> n2-standard-4 with 4 vCPUs and 16GB, or something else? How many nodes, and
> is autoscaling turned on?
>
> Most likely an executor memory limit?
>
> HTH
>
> View my LinkedIn profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Fri, 10 Mar 2023 at 15:35, Gary Liu <gary...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a job in a GCP Dataproc serverless Spark session (Spark 3.3.2). It
>> involves multiple joins, as well as a complex UDF. I always get the
>> FetchFailedException below, but the job completes and the results look
>> right. Neither of the 2 input datasets is very big (one is 6.5M rows * 11
>> columns, ~150M in ORC format; the other is 17.7M rows * 11 columns, ~400M
>> in ORC format). It ran very smoothly on an on-premise Spark environment
>> though.
>>
>> According to Google's document
>> (https://cloud.google.com/dataproc/docs/support/spark-job-tuning#shuffle_fetch_failures),
>> there are 3 solutions:
>> 1. Use EFM mode.
>> 2. Increase executor memory.
>> 3. Decrease the number of job partitions.
>>
>> 1. I started the session from a Vertex notebook, so I don't know how to
>> use EFM mode.
>> 2. I increased executor memory from the default 12GB to 25GB, and the
>> number of cores from 4 to 8, but it did not solve the problem.
>> 3. I wonder how to do this? Repartition the input dataset to have fewer
>> partitions?
>> I used df.rdd.getNumPartitions() to check the input data partitions;
>> they have 9 and 17 partitions respectively. Should I decrease them
>> further? I also read a post on StackOverflow
>> (https://stackoverflow.com/questions/34941410/fetchfailedexception-or-metadatafetchfailedexception-when-processing-big-data-se)
>> saying that increasing partitions may help. Which one makes more sense? I
>> repartitioned the input data to 20 and 30 partitions, but still no luck.
>>
>> Any suggestions?
>>
>> 23/03/10 14:32:19 WARN TaskSetManager: Lost task 58.1 in stage 27.0 (TID 3783) (10.1.0.116 executor 33): FetchFailed(BlockManagerId(72, 10.1.15.199, 36791, None), shuffleId=24, mapIndex=77, mapId=3457, reduceId=58, message=
>> org.apache.spark.shuffle.FetchFailedException
>> at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1180)
>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:918)
>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
>> at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>> at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
>> at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>> at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
>> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
>> at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
>> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
>> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>> at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>> at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>> at org.apache.spark.scheduler.Task.run(Task.scala:136)
>> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>> at java.base/java.lang.Thread.run(Thread.java:833)
>> Caused by: org.apache.spark.ExecutorDeadException: The relative remote executor(Id: 72), which maintains the block data to fetch is dead.
>> at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:136)
>> at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
>> at org.apache.spark.network.shuffle.RetryingBlockTransferor.start(RetryingBlockTransferor.java:152)
>> at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:146)
>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:363)
>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1150)
>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1142)
>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:702)
>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:192)
>> at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
>> at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:240)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>> ... 40 more
>>
>> )
>>
>> --
>> Gary Liu

--
Gary Liu
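P.S. on point 3 (partition counts): a common rule of thumb is to aim for roughly 100-200 MB of data per partition. A small, hypothetical helper to estimate a target partition count from input size; the 128 MB figure below is just that rule-of-thumb value, nothing Dataproc-specific:

```python
def target_partitions(total_bytes: int,
                      bytes_per_partition: int = 128 * 1024 * 1024) -> int:
    """Rule-of-thumb partition count: aim for ~128 MB of data per partition."""
    # Ceiling division without importing math, floored at 1 partition.
    return max(1, -(-total_bytes // bytes_per_partition))

# The two ORC inputs in this thread are ~150 MB and ~400 MB:
print(target_partitions(150 * 1024 * 1024))  # 2
print(target_partitions(400 * 1024 * 1024))  # 4
```

By that yardstick, the existing 9 and 17 input partitions are already fairly fine-grained for data this size, which may suggest the root cause here is the dead executor reported in the ExecutorDeadException rather than partition sizing.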