Hi, Han.
Thanks for trying out the push based shuffle.
Please make sure you configure both the Spark client side configuration and
server side configurations.
The client side configuration looks good, and from the error message, looks
like you are missing the server side configurations.
Please refer to this blog post about how to try out push based shuffle.
https://engineering.linkedin.com/blog/2021/push-based-shuffle-in-apache-spark.
And also this documentation about how to configure properly for External
shuffle service in YARN environment.
https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service
In our environment, we added this parameter to enable server side push
based shuffle in yarn-site.xml for NodeManager configurations:
  <property>
    <name>spark.shuffle.push.server.mergedShuffleFileManagerImpl</name>
    <value>org.apache.spark.network.shuffle.RemoteBlockPushResolver</value>
  </property>

On Tue, May 24, 2022 at 3:30 PM Mridul Muralidharan <mri...@gmail.com>
wrote:

> +CC zhouye...@gmail.com
>
>
> On Mon, May 23, 2022 at 7:11 AM Han Altae-Tran <alta...@mit.edu> wrote:
>
>> Hi,
>>
>> First of all, I am very thankful for all of the amazing work that goes
>> into this project! It has opened up so many doors for me! I am a long
>> time Spark user, and was very excited to start working with the push-based
>> shuffle service for an academic paper we are working on, but I encountered
>> some difficulties along the way and am wondering if someone could help me
>> resolve this new feature. I was able to get the push-based shuffle running
>> on my yarn setup (I am using Dataproc but I added an additional spark 3.2
>> installation on top of the dataproc base installations using a custom
>> image, and then removed the old 3.1.2 spark shuffle yarn jar and replaced
>> it with the new one for spark 3.2), however the main issue is that when I
>> actually try to use spark shuffles using the push-based shuffle, I
>> consistently encounter errors of the following sort:
>>
>> 22/05/23 05:45:01 WARN org.apache.spark.scheduler.TaskSetManager: Lost
>> task 163.0 in stage 3.1 (TID 16729)
>> (cluster-fast-w-0.c.altaeth-biolux.internal executor 1):
>> FetchFailed(BlockManagerId(2, cluster-fast-w-1.c.altaeth-biolux.internal,
>> 7337, None), shuffleId=0, mapIndex=171, mapId=11287, reduceId=808, message=
>> org.apache.spark.shuffle.FetchFailedException
>>         at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1167)
>>         at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
>>         at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
>>         at
>> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>>         at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
>>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>>         at
>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>>         at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>>         at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown
>> Source)
>>         at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithoutKey_0$(Unknown
>> Source)
>>         at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
>> Source)
>>         at
>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>         at
>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
>>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>>         at
>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
>>         at
>> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>>         at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>         at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:131)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>>         at
>> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:750)
>> Caused by: java.io.IOException: Failed to send RPC
>> StreamChunkId[streamId=1514743314249,chunkIndex=59] to
>> cluster-fast-w-1.c.altaeth-biolux.internal/10.128.0.39:7337:
>> java.io.IOException: Connection reset by peer
>>         at
>> org.apache.spark.network.client.TransportClient$1.handleFailure(TransportClient.java:146)
>>         at
>> org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
>>         at
>> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
>>         at
>> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
>>         at
>> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
>>         at
>> io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184)
>>         at
>> io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
>>         at
>> io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
>>         at
>> org.apache.spark.network.client.TransportClient.fetchChunk(TransportClient.java:151)
>>         at
>> org.apache.spark.network.shuffle.OneForOneBlockFetcher$1.onSuccess(OneForOneBlockFetcher.java:297)
>>         at
>> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:196)
>>         at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
>>         at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
>>         at
>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>         at
>> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>         at
>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>         at
>> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>         at
>> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>         at
>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>>         at
>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
>>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>>         at
>> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>>         at
>> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>         at
>> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>> Caused by: java.io.IOException: Connection reset by peer
>>         at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>         at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>>         at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>         at sun.nio.ch.IOUtil.write(IOUtil.java:51)
>>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
>>         at
>> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:408)
>>         at
>> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:949)
>>         at
>> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
>>         at
>> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:913)
>>         at
>> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
>>         at
>> io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
>> 22/05/23 05:48:24 WARN org.apache.spark.scheduler.DAGScheduler: Exception
>> encountered when trying to finalize shuffle merge on
>> cluster-fast-w-0.c.altaeth-biolux.internal for shuffle 1
>> java.lang.RuntimeException: java.lang.UnsupportedOperationException:
>> Cannot handle shuffle block merge
>>
>> This arises using the following conf:
>> PYSPARK_DRIVER_PYTHON=`which ipython` \
>> PYSPARK_PYTHON=/custom_install/packages/anaconda/envs/biolux/bin/python \
>> pyspark --master yarn \
>> --deploy-mode client \
>> --driver-memory 50g \
>> --conf spark.executor.memory=114000m \
>> --conf spark.task.cpus=1
>> --conf spark.executor.cores=32 \
>> --conf spark.dynamicAllocation.enabled=true \
>> --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=10h \
>> --num-executors 500 \
>> --conf spark.task.maxFailures=30 \
>> --conf spark.storage.replication.active=true \
>> --conf spark.scheduler.listenerbus.eventqueue.capacity=4000000 \
>> --conf spark.executor.memoryOverhead=2048m
>> --conf spark.stage.maxConsecutiveAttempts=1000 -\
>> -conf spark.default.parallelism=10811 \
>> --conf spark.sql.shuffle.partitions=10811 \
>> --conf spark.sql.sources.partitionOverwriteMode="dynamic" \
>> --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
>> --conf spark.hadoop.dfs.replication=1 \
>> --conf spark.shuffle.io.numConnectionsPerPeer=5 \
>> --conf spark.locality.wait=3s \
>> --conf spark.shuffle.push.enabled=true \
>> --conf spark.shuffle.push.maxRetainedMergerLocations=1500 \
>> --conf spark.shuffle.service.enabled=true \
>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>> --conf
>> spark.shuffle.push.server.mergedShuffleFileManagerImpl=org.apache.spark.network.shuffle.RemoteBlockPushResolver
>> \
>> --conf spark.yarn.shuffle.stopOnFailure=false \
>> --conf spark.shuffle.push.mergersMinThresholdRatio=0.01 \
>> --conf spark.shuffle.push.mergersMinStaticThreshold=1
>>
>> which cause the stage to be retried multiple times. Do you know if there
>> is something obvious that might be wrong with this setup? Thank you so much
>> for your time and consideration!
>>
>> Best,
>> Han
>>
>

-- 

*Zhou, Ye  **周晔*

Reply via email to