Hi Ye,

This is super super helpful! It wasn't obvious to me from the documentation
that this property needed to be set in the yarn-site.xml file, as all other
configurations in the main spark configuration page are set through spark
conf. It was particularly confusing because this property, like most other
spark related properties also begins with spark (i.e. spark.shuffle as
opposed to yarn.spark.shuffle). Perhaps it would be useful to add some
extra documentation explaining this there? In either case, thank you! I
will test out the push-based service more thoroughly. From my initial
tests, I can see the median task time finishes faster during the reduce
stage when using pull-based shuffle, but unlike with the standard shuffle,
there was an oddly large tail end with the max length tasks taking up to
20x longer. Is there a suggestion for what may be causing this and if there
is a way to reduce this? It is somewhat odd as the pull-based shuffle also
has the capability of reading data from the original blocks, so for some
reason the choice of reading from merged pull-blocks appears to be slower
perhaps. I am planning on running more systematics tests to understand
these behaviors further. Thank you!

Best,
Han

On Tue, May 24, 2022 at 8:07 PM Ye Zhou <zhouye...@gmail.com> wrote:

> Hi, Han.
> Thanks for trying out the push based shuffle.
> Please make sure you configure both the Spark client side configuration
> and server side configurations.
> The client side configuration looks good, and from the error message,
> looks like you are missing the server side configurations.
> Please refer to this blog post about how to try out push based shuffle.
> https://engineering.linkedin.com/blog/2021/push-based-shuffle-in-apache-spark.
> And also this documentation about how to configure properly for External
> shuffle service in YARN environment.
> https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service
> In our environment, we added this parameter to enable server side push
> based shuffle in yarn-site.xml for NodeManager configurations:
>   <property>
>     <name>spark.shuffle.push.server.mergedShuffleFileManagerImpl</name>
>     <value>org.apache.spark.network.shuffle.RemoteBlockPushResolver</value>
>   </property>
>
> On Tue, May 24, 2022 at 3:30 PM Mridul Muralidharan <mri...@gmail.com>
> wrote:
>
>> +CC zhouye...@gmail.com
>>
>>
>> On Mon, May 23, 2022 at 7:11 AM Han Altae-Tran <alta...@mit.edu> wrote:
>>
>>> Hi,
>>>
>>> First of all, I am very thankful for all of the amazing work that goes
>>> into this project! It has opened up so many doors for me! I am a long
>>> time Spark user, and was very excited to start working with the push-based
>>> shuffle service for an academic paper we are working on, but I encountered
>>> some difficulties along the way and am wondering if someone could help me
>>> resolve this new feature. I was able to get the push-based shuffle running
>>> on my yarn setup (I am using Dataproc but I added an additional spark 3.2
>>> installation on top of the dataproc base installations using a custom
>>> image, and then removed the old 3.1.2 spark shuffle yarn jar and replaced
>>> it with the new one for spark 3.2), however the main issue is that when I
>>> actually try to use spark shuffles using the push-based shuffle, I
>>> consistently encounter errors of the following sort:
>>>
>>> 22/05/23 05:45:01 WARN org.apache.spark.scheduler.TaskSetManager: Lost
>>> task 163.0 in stage 3.1 (TID 16729)
>>> (cluster-fast-w-0.c.altaeth-biolux.internal executor 1):
>>> FetchFailed(BlockManagerId(2, cluster-fast-w-1.c.altaeth-biolux.internal,
>>> 7337, None), shuffleId=0, mapIndex=171, mapId=11287, reduceId=808, message=
>>> org.apache.spark.shuffle.FetchFailedException
>>>         at
>>> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1167)
>>>         at
>>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
>>>         at
>>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
>>>         at
>>> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>>>         at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
>>>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>>>         at
>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>>>         at
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>>>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>>>         at
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown
>>> Source)
>>>         at
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithoutKey_0$(Unknown
>>> Source)
>>>         at
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
>>> Source)
>>>         at
>>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>>         at
>>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
>>>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>>>         at
>>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
>>>         at
>>> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>>>         at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>>         at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:131)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>>>         at
>>> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>         at java.lang.Thread.run(Thread.java:750)
>>> Caused by: java.io.IOException: Failed to send RPC
>>> StreamChunkId[streamId=1514743314249,chunkIndex=59] to
>>> cluster-fast-w-1.c.altaeth-biolux.internal/10.128.0.39:7337:
>>> java.io.IOException: Connection reset by peer
>>>         at
>>> org.apache.spark.network.client.TransportClient$1.handleFailure(TransportClient.java:146)
>>>         at
>>> org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
>>>         at
>>> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
>>>         at
>>> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
>>>         at
>>> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
>>>         at
>>> io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184)
>>>         at
>>> io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
>>>         at
>>> io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
>>>         at
>>> org.apache.spark.network.client.TransportClient.fetchChunk(TransportClient.java:151)
>>>         at
>>> org.apache.spark.network.shuffle.OneForOneBlockFetcher$1.onSuccess(OneForOneBlockFetcher.java:297)
>>>         at
>>> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:196)
>>>         at
>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
>>>         at
>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
>>>         at
>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>>         at
>>> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>>         at
>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>>         at
>>> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>>         at
>>> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>>         at
>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>>>         at
>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>>>         at
>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
>>>         at
>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
>>>         at
>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
>>>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>>>         at
>>> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>>>         at
>>> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>>         at
>>> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>>> Caused by: java.io.IOException: Connection reset by peer
>>>         at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>>         at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>>>         at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>>         at sun.nio.ch.IOUtil.write(IOUtil.java:51)
>>>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
>>>         at
>>> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:408)
>>>         at
>>> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:949)
>>>         at
>>> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
>>>         at
>>> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:913)
>>>         at
>>> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
>>>         at
>>> io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>>>         at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
>>> 22/05/23 05:48:24 WARN org.apache.spark.scheduler.DAGScheduler:
>>> Exception encountered when trying to finalize shuffle merge on
>>> cluster-fast-w-0.c.altaeth-biolux.internal for shuffle 1
>>> java.lang.RuntimeException: java.lang.UnsupportedOperationException:
>>> Cannot handle shuffle block merge
>>>
>>> This arises using the following conf:
>>> PYSPARK_DRIVER_PYTHON=`which ipython` \
>>> PYSPARK_PYTHON=/custom_install/packages/anaconda/envs/biolux/bin/python \
>>> pyspark --master yarn \
>>> --deploy-mode client \
>>> --driver-memory 50g \
>>> --conf spark.executor.memory=114000m \
>>> --conf spark.task.cpus=1
>>> --conf spark.executor.cores=32 \
>>> --conf spark.dynamicAllocation.enabled=true \
>>> --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=10h \
>>> --num-executors 500 \
>>> --conf spark.task.maxFailures=30 \
>>> --conf spark.storage.replication.active=true \
>>> --conf spark.scheduler.listenerbus.eventqueue.capacity=4000000 \
>>> --conf spark.executor.memoryOverhead=2048m
>>> --conf spark.stage.maxConsecutiveAttempts=1000 -\
>>> -conf spark.default.parallelism=10811 \
>>> --conf spark.sql.shuffle.partitions=10811 \
>>> --conf spark.sql.sources.partitionOverwriteMode="dynamic" \
>>> --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
>>> --conf spark.hadoop.dfs.replication=1 \
>>> --conf spark.shuffle.io.numConnectionsPerPeer=5 \
>>> --conf spark.locality.wait=3s \
>>> --conf spark.shuffle.push.enabled=true \
>>> --conf spark.shuffle.push.maxRetainedMergerLocations=1500 \
>>> --conf spark.shuffle.service.enabled=true \
>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>> --conf
>>> spark.shuffle.push.server.mergedShuffleFileManagerImpl=org.apache.spark.network.shuffle.RemoteBlockPushResolver
>>> \
>>> --conf spark.yarn.shuffle.stopOnFailure=false \
>>> --conf spark.shuffle.push.mergersMinThresholdRatio=0.01 \
>>> --conf spark.shuffle.push.mergersMinStaticThreshold=1
>>>
>>> which cause the stage to be retried multiple times. Do you know if there
>>> is something obvious that might be wrong with this setup? Thank you so much
>>> for your time and consideration!
>>>
>>> Best,
>>> Han
>>>
>>
>
> --
>
> *Zhou, Ye  **周晔*
>

Reply via email to