Hi,

First of all, I am very thankful for all of the amazing work that goes into this project! It has opened up so many doors for me.

I am a long-time Spark user and was excited to start working with the push-based shuffle service for an academic paper we are writing, but I ran into some difficulties along the way and am hoping someone can help me get this new feature working. I was able to get push-based shuffle running on my YARN setup: I am using Dataproc, but I layered a Spark 3.2 installation on top of the Dataproc base installation via a custom image, then removed the old Spark 3.1.2 YARN shuffle jar and replaced it with the Spark 3.2 one, roughly as sketched below.
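To make the jar swap concrete, here is a minimal sketch of what my custom-image script does. The paths and version strings are from my own setup and are illustrative only; other Dataproc image versions lay things out differently:

    # Replace the Spark 3.1.2 YARN shuffle service jar with the Spark 3.2 one.
    # Paths and version strings are from my custom image; adjust to your layout.
    sudo rm /usr/lib/spark/yarn/spark-3.1.2-yarn-shuffle.jar
    sudo cp /custom_install/spark-3.2.0/yarn/spark-3.2.0-yarn-shuffle.jar /usr/lib/spark/yarn/
    # Restart the NodeManager so the aux service picks up the new jar.
    sudo systemctl restart hadoop-yarn-nodemanager

The main issue is that when I actually run shuffles with push-based shuffle enabled, I consistently encounter errors of the following sort: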
22/05/23 05:45:01 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 163.0 in stage 3.1 (TID 16729) (cluster-fast-w-0.c.altaeth-biolux.internal executor 1): FetchFailed(BlockManagerId(2, cluster-fast-w-1.c.altaeth-biolux.internal, 7337, None), shuffleId=0, mapIndex=171, mapId=11287, reduceId=808, message=
org.apache.spark.shuffle.FetchFailedException
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1167)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Failed to send RPC StreamChunkId[streamId=1514743314249,chunkIndex=59] to cluster-fast-w-1.c.altaeth-biolux.internal/10.128.0.39:7337: java.io.IOException: Connection reset by peer
    at org.apache.spark.network.client.TransportClient$1.handleFailure(TransportClient.java:146)
    at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
    at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184)
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
    at org.apache.spark.network.client.TransportClient.fetchChunk(TransportClient.java:151)
    at org.apache.spark.network.shuffle.OneForOneBlockFetcher$1.onSuccess(OneForOneBlockFetcher.java:297)
    at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:196)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
Caused by: java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:51)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:408)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:949)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:913)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
    at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)

A few minutes later the driver also logs:

22/05/23 05:48:24 WARN org.apache.spark.scheduler.DAGScheduler: Exception encountered when trying to finalize shuffle merge on cluster-fast-w-0.c.altaeth-biolux.internal for shuffle 1
java.lang.RuntimeException: java.lang.UnsupportedOperationException: Cannot handle shuffle block merge

This arises using the following conf:

PYSPARK_DRIVER_PYTHON=`which ipython` \
PYSPARK_PYTHON=/custom_install/packages/anaconda/envs/biolux/bin/python \
pyspark --master yarn \
    --deploy-mode client \
    --driver-memory 50g \
    --conf spark.executor.memory=114000m \
    --conf spark.task.cpus=1 \
    --conf spark.executor.cores=32 \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=10h \
    --num-executors 500 \
    --conf spark.task.maxFailures=30 \
    --conf spark.storage.replication.active=true \
    --conf spark.scheduler.listenerbus.eventqueue.capacity=4000000 \
    --conf spark.executor.memoryOverhead=2048m \
    --conf spark.stage.maxConsecutiveAttempts=1000 \
    --conf spark.default.parallelism=10811 \
    --conf spark.sql.shuffle.partitions=10811 \
    --conf spark.sql.sources.partitionOverwriteMode="dynamic" \
    --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
    --conf spark.hadoop.dfs.replication=1 \
    --conf spark.shuffle.io.numConnectionsPerPeer=5 \
    --conf spark.locality.wait=3s \
    --conf spark.shuffle.push.enabled=true \
    --conf spark.shuffle.push.maxRetainedMergerLocations=1500 \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.shuffle.push.server.mergedShuffleFileManagerImpl=org.apache.spark.network.shuffle.RemoteBlockPushResolver \
    --conf spark.yarn.shuffle.stopOnFailure=false \
    --conf spark.shuffle.push.mergersMinThresholdRatio=0.01 \
    --conf spark.shuffle.push.mergersMinStaticThreshold=1

These failures cause the stage to be retried multiple times. Do you know if there is something obvious that might be wrong with this setup?

Thank you so much for your time and consideration!

Best,
Han
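P.S. One detail I was unsure about while setting this up: whether spark.shuffle.push.server.mergedShuffleFileManagerImpl is actually honored when passed as a client-side --conf, or whether it has to be set on the NodeManagers themselves alongside the shuffle service registration. For reference, below is a sketch of the server-side yarn-site.xml configuration as I understand it from the Spark 3.2 docs; the property names come from the docs, but the snippet is illustrative rather than an exact dump of my cluster:

    <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle,spark_shuffle</value>
    </property>
    <property>
      <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
      <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>
    <!-- Server-side merged shuffle file manager; my understanding is that without
         this, the shuffle service cannot handle shuffle block merge requests. -->
    <property>
      <name>spark.shuffle.push.server.mergedShuffleFileManagerImpl</name>
      <value>org.apache.spark.network.shuffle.RemoteBlockPushResolver</value>
    </property>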