Hi,

First of all, I am very thankful for all of the amazing work that goes into
this project! It has opened up so many doors for me! I am a long-time Spark
user and was very excited to start working with the push-based shuffle
service for an academic paper we are working on, but I encountered some
difficulties along the way and am wondering if someone could help me get
this new feature working.

I was able to get push-based shuffle running on my YARN setup. I am using
Dataproc, but I added a Spark 3.2 installation on top of the Dataproc base
installation using a custom image, then removed the old Spark 3.1.2 YARN
shuffle jar and replaced it with the one for Spark 3.2. The main issue is
that whenever a job actually shuffles data with push-based shuffle enabled,
I consistently encounter errors of the following sort:

22/05/23 05:45:01 WARN org.apache.spark.scheduler.TaskSetManager: Lost task
163.0 in stage 3.1 (TID 16729) (cluster-fast-w-0.c.altaeth-biolux.internal
executor 1): FetchFailed(BlockManagerId(2,
cluster-fast-w-1.c.altaeth-biolux.internal, 7337, None), shuffleId=0,
mapIndex=171, mapId=11287, reduceId=808, message=
org.apache.spark.shuffle.FetchFailedException
        at
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1167)
        at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
        at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
        at
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
        at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown
Source)
        at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithoutKey_0$(Unknown
Source)
        at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
Source)
        at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
        at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Failed to send RPC
StreamChunkId[streamId=1514743314249,chunkIndex=59] to
cluster-fast-w-1.c.altaeth-biolux.internal/10.128.0.39:7337:
java.io.IOException: Connection reset by peer
        at
org.apache.spark.network.client.TransportClient$1.handleFailure(TransportClient.java:146)
        at
org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
        at
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
        at
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
        at
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
        at
io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184)
        at
io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
        at
io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
        at
org.apache.spark.network.client.TransportClient.fetchChunk(TransportClient.java:151)
        at
org.apache.spark.network.shuffle.OneForOneBlockFetcher$1.onSuccess(OneForOneBlockFetcher.java:297)
        at
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:196)
        at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
        at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
        at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
Caused by: java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
        at sun.nio.ch.IOUtil.write(IOUtil.java:51)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
        at
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:408)
        at
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:949)
        at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
        at
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:913)
        at
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
        at
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
        at
io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
22/05/23 05:48:24 WARN org.apache.spark.scheduler.DAGScheduler: Exception
encountered when trying to finalize shuffle merge on
cluster-fast-w-0.c.altaeth-biolux.internal for shuffle 1
java.lang.RuntimeException: java.lang.UnsupportedOperationException: Cannot
handle shuffle block merge

This arises using the following conf:
PYSPARK_DRIVER_PYTHON=`which ipython` \
PYSPARK_PYTHON=/custom_install/packages/anaconda/envs/biolux/bin/python \
pyspark --master yarn \
--deploy-mode client \
--driver-memory 50g \
--conf spark.executor.memory=114000m \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=32 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=10h \
--num-executors 500 \
--conf spark.task.maxFailures=30 \
--conf spark.storage.replication.active=true \
--conf spark.scheduler.listenerbus.eventqueue.capacity=4000000 \
--conf spark.executor.memoryOverhead=2048m \
--conf spark.stage.maxConsecutiveAttempts=1000 \
--conf spark.default.parallelism=10811 \
--conf spark.sql.shuffle.partitions=10811 \
--conf spark.sql.sources.partitionOverwriteMode="dynamic" \
--conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
--conf spark.hadoop.dfs.replication=1 \
--conf spark.shuffle.io.numConnectionsPerPeer=5 \
--conf spark.locality.wait=3s \
--conf spark.shuffle.push.enabled=true \
--conf spark.shuffle.push.maxRetainedMergerLocations=1500 \
--conf spark.shuffle.service.enabled=true \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.shuffle.push.server.mergedShuffleFileManagerImpl=org.apache.spark.network.shuffle.RemoteBlockPushResolver \
--conf spark.yarn.shuffle.stopOnFailure=false \
--conf spark.shuffle.push.mergersMinThresholdRatio=0.01 \
--conf spark.shuffle.push.mergersMinStaticThreshold=1

These errors cause the stage to be retried multiple times. Do you know if
there is something obvious that might be wrong with this setup? Thank you
so much for your time and consideration!
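
In case it helps to rule out shell line-continuation mistakes in a long
command like the one above, the same flags could be assembled
programmatically. This is only a minimal sketch: `build_pyspark_argv` is a
hypothetical helper of my own (not part of Spark), and it shows just a
subset of the settings.

```python
from typing import Dict, List


def build_pyspark_argv(conf: Dict[str, str], master: str = "yarn",
                       deploy_mode: str = "client") -> List[str]:
    """Hypothetical helper: turn a conf dict into a pyspark argument list."""
    argv = ["pyspark", "--master", master, "--deploy-mode", deploy_mode]
    for key, value in sorted(conf.items()):
        # Each setting becomes its own "--conf key=value" pair, so no
        # backslash line continuations are involved.
        argv += ["--conf", f"{key}={value}"]
    return argv


# Subset of the settings from the command above, for illustration only.
push_shuffle_conf = {
    "spark.shuffle.service.enabled": "true",
    "spark.shuffle.push.enabled": "true",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
}

argv = build_pyspark_argv(push_shuffle_conf)
# To launch the shell: subprocess.run(argv, check=True)
```

Passing the list to `subprocess.run` avoids the shell entirely, so a dropped
trailing backslash cannot silently cut the command short.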

Best,
Han
