Hi, Han.
The External Shuffle Service (ESS) for YARN runs as an auxiliary service
inside the NodeManager, so it has to be configured in yarn-site.xml on the
NodeManagers rather than through Spark conf. We will try to improve the
documentation for enabling push-based shuffle. Thanks for the feedback.
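For reference, enabling the ESS auxiliary service on the NodeManagers looks
roughly like this in yarn-site.xml (a sketch following the running-on-yarn
docs linked below; it assumes the spark-<version>-yarn-shuffle jar is on
the NodeManager classpath):

  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>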

For the straggler issue, is the shuffle data heavily skewed? Are the 20x
longer-running tasks generating shuffle data or reading it? If possible,
please share the application event log file with us in some way; we can
then take a look through a local History Server.
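If you want to check for key skew yourself, here is a quick sketch for the
pyspark shell (`df` and "key" are placeholders for the DataFrame and the
join/aggregation key feeding the slow stage):

  from pyspark.sql import functions as F

  # A handful of keys dominating the counts usually explains a long
  # tail of slow reduce tasks.
  key_counts = df.groupBy("key").count().orderBy(F.desc("count"))
  key_counts.show(20)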

Thanks.
Ye.

On Wed, May 25, 2022 at 7:15 PM Han Altae-Tran <alta...@mit.edu> wrote:

> Hi Ye,
>
> This is super helpful! It wasn't obvious to me from the documentation
> that this property needed to be set in the yarn-site.xml file, as all
> other configurations on the main Spark configuration page are set through
> Spark conf. It was particularly confusing because this property, like
> most other Spark-related properties, also begins with spark (i.e.
> spark.shuffle as opposed to yarn.spark.shuffle). Perhaps it would be
> useful to add some extra documentation explaining this there? In either
> case, thank you! I will test out the push-based service more thoroughly.
> From my initial tests, I can see that the median task finishes faster
> during the reduce stage when using push-based shuffle, but unlike with
> the standard shuffle, there was an oddly large tail, with the longest
> tasks taking up to 20x longer. Do you have a suggestion for what may be
> causing this, and whether there is a way to reduce it? It is somewhat
> odd, as push-based shuffle still has the ability to read data from the
> original blocks, so perhaps the choice of reading from the merged blocks
> is what is slower. I am planning on running more systematic tests to
> understand these behaviors further. Thank you!
>
> Best,
> Han
>
> On Tue, May 24, 2022 at 8:07 PM Ye Zhou <zhouye...@gmail.com> wrote:
>
>> Hi, Han.
>> Thanks for trying out push-based shuffle.
>> Please make sure you set both the Spark client-side and server-side
>> configurations.
>> Your client-side configuration looks good; from the error message, it
>> looks like you are missing the server-side configuration.
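>> On the client side, the minimal pair of settings is typically (just a
>> sketch; your conf below already includes both):
>>   spark.shuffle.service.enabled=true
>>   spark.shuffle.push.enabled=true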
>> Please refer to this blog post about how to try out push-based shuffle:
>> https://engineering.linkedin.com/blog/2021/push-based-shuffle-in-apache-spark,
>> and also this documentation on how to properly configure the External
>> Shuffle Service in a YARN environment:
>> https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service
>> In our environment, we added this property to the NodeManager
>> configurations in yarn-site.xml to enable server-side push-based shuffle:
>>   <property>
>>     <name>spark.shuffle.push.server.mergedShuffleFileManagerImpl</name>
>>     <value>org.apache.spark.network.shuffle.RemoteBlockPushResolver</value>
>>   </property>
>>
>> On Tue, May 24, 2022 at 3:30 PM Mridul Muralidharan <mri...@gmail.com>
>> wrote:
>>
>>> +CC zhouye...@gmail.com
>>>
>>>
>>> On Mon, May 23, 2022 at 7:11 AM Han Altae-Tran <alta...@mit.edu> wrote:
>>>
>>>> Hi,
>>>>
>>>> First of all, I am very thankful for all of the amazing work that goes
>>>> into this project! It has opened up so many doors for me! I am a
>>>> long-time Spark user and was very excited to start working with the
>>>> push-based shuffle service for an academic paper we are working on,
>>>> but I encountered some difficulties along the way and am wondering if
>>>> someone could help me get this new feature working. I was able to set
>>>> up push-based shuffle on my YARN cluster (I am using Dataproc, but I
>>>> added an additional Spark 3.2 installation on top of the Dataproc base
>>>> installation using a custom image, then removed the old Spark 3.1.2
>>>> shuffle YARN jar and replaced it with the new one for Spark 3.2).
>>>> However, the main issue is that when I actually run shuffles with
>>>> push-based shuffle enabled, I consistently encounter errors of the
>>>> following sort:
>>>>
>>>> 22/05/23 05:45:01 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 163.0 in stage 3.1 (TID 16729) (cluster-fast-w-0.c.altaeth-biolux.internal executor 1): FetchFailed(BlockManagerId(2, cluster-fast-w-1.c.altaeth-biolux.internal, 7337, None), shuffleId=0, mapIndex=171, mapId=11287, reduceId=808, message=
>>>> org.apache.spark.shuffle.FetchFailedException
>>>>         at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1167)
>>>>         at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
>>>>         at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
>>>>         at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>>>>         at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
>>>>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>>>>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>>>>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>>>>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
>>>>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithoutKey_0$(Unknown Source)
>>>>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
>>>>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>>>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
>>>>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>>>>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
>>>>         at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:131)
>>>>         at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>>>>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>         at java.lang.Thread.run(Thread.java:750)
>>>> Caused by: java.io.IOException: Failed to send RPC StreamChunkId[streamId=1514743314249,chunkIndex=59] to cluster-fast-w-1.c.altaeth-biolux.internal/10.128.0.39:7337: java.io.IOException: Connection reset by peer
>>>>         at org.apache.spark.network.client.TransportClient$1.handleFailure(TransportClient.java:146)
>>>>         at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
>>>>         at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
>>>>         at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
>>>>         at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
>>>>         at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184)
>>>>         at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
>>>>         at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
>>>>         at org.apache.spark.network.client.TransportClient.fetchChunk(TransportClient.java:151)
>>>>         at org.apache.spark.network.shuffle.OneForOneBlockFetcher$1.onSuccess(OneForOneBlockFetcher.java:297)
>>>>         at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:196)
>>>>         at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
>>>>         at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
>>>>         at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>>>         at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>>>         at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>>>         at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>>>         at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>>>         at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>>>>         at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>>>>         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
>>>>         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
>>>>         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
>>>>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>>>>         at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>>>>         at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>>>         at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>>>> Caused by: java.io.IOException: Connection reset by peer
>>>>         at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>>>         at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>>>>         at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>>>         at sun.nio.ch.IOUtil.write(IOUtil.java:51)
>>>>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
>>>>         at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:408)
>>>>         at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:949)
>>>>         at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
>>>>         at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:913)
>>>>         at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
>>>>         at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>>>>         at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
>>>> 22/05/23 05:48:24 WARN org.apache.spark.scheduler.DAGScheduler: Exception encountered when trying to finalize shuffle merge on cluster-fast-w-0.c.altaeth-biolux.internal for shuffle 1
>>>> java.lang.RuntimeException: java.lang.UnsupportedOperationException: Cannot handle shuffle block merge
>>>>
>>>> This arises using the following conf:
>>>> PYSPARK_DRIVER_PYTHON=`which ipython` \
>>>> PYSPARK_PYTHON=/custom_install/packages/anaconda/envs/biolux/bin/python \
>>>> pyspark --master yarn \
>>>> --deploy-mode client \
>>>> --driver-memory 50g \
>>>> --conf spark.executor.memory=114000m \
>>>> --conf spark.task.cpus=1 \
>>>> --conf spark.executor.cores=32 \
>>>> --conf spark.dynamicAllocation.enabled=true \
>>>> --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=10h \
>>>> --num-executors 500 \
>>>> --conf spark.task.maxFailures=30 \
>>>> --conf spark.storage.replication.active=true \
>>>> --conf spark.scheduler.listenerbus.eventqueue.capacity=4000000 \
>>>> --conf spark.executor.memoryOverhead=2048m \
>>>> --conf spark.stage.maxConsecutiveAttempts=1000 \
>>>> --conf spark.default.parallelism=10811 \
>>>> --conf spark.sql.shuffle.partitions=10811 \
>>>> --conf spark.sql.sources.partitionOverwriteMode="dynamic" \
>>>> --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
>>>> --conf spark.hadoop.dfs.replication=1 \
>>>> --conf spark.shuffle.io.numConnectionsPerPeer=5 \
>>>> --conf spark.locality.wait=3s \
>>>> --conf spark.shuffle.push.enabled=true \
>>>> --conf spark.shuffle.push.maxRetainedMergerLocations=1500 \
>>>> --conf spark.shuffle.service.enabled=true \
>>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>> --conf spark.shuffle.push.server.mergedShuffleFileManagerImpl=org.apache.spark.network.shuffle.RemoteBlockPushResolver \
>>>> --conf spark.yarn.shuffle.stopOnFailure=false \
>>>> --conf spark.shuffle.push.mergersMinThresholdRatio=0.01 \
>>>> --conf spark.shuffle.push.mergersMinStaticThreshold=1
>>>>
>>>> which causes the stage to be retried multiple times. Do you know if
>>>> there is something obvious that might be wrong with this setup? Thank you
>>>> so much for your time and consideration!
>>>>
>>>> Best,
>>>> Han
>>>>
>>>
>>
>> --
>>
>> Zhou, Ye  周晔
>>
>

-- 

Zhou, Ye  周晔
