[ https://issues.apache.org/jira/browse/SPARK-31664 ]
Apache Spark reassigned SPARK-31664:
------------------------------------

    Assignee: Apache Spark

Race in YARN scheduler shutdown leads to uncaught SparkException "Could not find CoarseGrainedScheduler"
---------------------------------------------------------------------------------------------------------

                Key: SPARK-31664
                URL: https://issues.apache.org/jira/browse/SPARK-31664
            Project: Spark
         Issue Type: Bug
         Components: Scheduler, YARN
   Affects Versions: 3.0.0, 3.0.1, 3.1.0
           Reporter: Baohe Zhang
           Assignee: Apache Spark
           Priority: Minor

I used this command to run SparkPi on a YARN cluster with dynamic allocation enabled: "$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ./spark-examples.jar 1000", and received the error log below every time:

{code:java}
20/05/06 16:31:44 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
	at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:169)
	at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:150)
	at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:684)
	at org.apache.spark.network.server.AbstractAuthRpcHandler.receive(AbstractAuthRpcHandler.java:66)
	at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:253)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
20/05/06 16:31:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/05/06 16:31:45 INFO MemoryStore: MemoryStore cleared
20/05/06 16:31:45 INFO BlockManager: BlockManager stopped
{code}

After some investigation, I found that this issue might have been introduced by [https://github.com/apache/spark/pull/25964]. There is a race between the driver backend and the executor backend that can occur when the driver shuts down.

PR #25964 added a new message type, LaunchedExecutor, and changed the executor-launch handshake between the executor and the driver to the following (a runnable sketch of the handshake appears after the list):
# The executor backend sends "RegisterExecutor" to the driver backend.
# The driver backend replies "true".
# The executor backend instantiates the executor once it receives "true" from the driver backend.
# After the executor is instantiated, the executor backend sends "LaunchedExecutor" to the driver backend.
# The driver backend makes offers for the executor when it receives "LaunchedExecutor".

The issue arises between steps 3 and 4. If the driver backend is stopped during step 3 (and hence the driver endpoint is removed from the dispatcher), then in step 4, when the executor backend tries to send "LaunchedExecutor" to the driver backend, the RPC dispatcher throws a SparkException "Could not find CoarseGrainedScheduler". These exception logs are verbose and somewhat misleading.
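To make the race window concrete, here is a minimal, self-contained sketch of the handshake and the step-4 failure. Everything in it (HandshakeRaceSketch, the toy Dispatcher, the message case classes) is a simplified stand-in, not the real Spark implementation; only the "Could not find ..." behavior mirrors Dispatcher.postOneWayMessage.

{code:scala}
// Simplified model of the handshake race; NOT real Spark code.
import scala.collection.concurrent.TrieMap

object HandshakeRaceSketch {
  class SparkException(msg: String) extends RuntimeException(msg)

  // Toy dispatcher: routes one-way messages to registered endpoints and,
  // like Spark's Dispatcher, fails when the endpoint has been removed.
  class Dispatcher {
    private val endpoints = TrieMap.empty[String, Any => Unit]
    def register(name: String, handler: Any => Unit): Unit = { endpoints.put(name, handler) }
    def unregister(name: String): Unit = { endpoints.remove(name) }
    def postOneWayMessage(name: String, msg: Any): Unit =
      endpoints.get(name) match {
        case Some(handler) => handler(msg)
        case None          => throw new SparkException(s"Could not find $name.")
      }
  }

  case class RegisterExecutor(id: String)
  case class LaunchedExecutor(id: String)

  def main(args: Array[String]): Unit = {
    val dispatcher = new Dispatcher
    dispatcher.register("CoarseGrainedScheduler", msg => println(s"driver received: $msg"))

    // Steps 1-2: the executor backend registers; the driver's "true" reply is elided.
    dispatcher.postOneWayMessage("CoarseGrainedScheduler", RegisterExecutor("exec-1"))

    // Step 3: while the executor is being instantiated, the driver shuts down
    // and its endpoint disappears from the dispatcher.
    dispatcher.unregister("CoarseGrainedScheduler")

    // Step 4: the executor backend sends LaunchedExecutor; the dispatcher can
    // no longer find the endpoint and throws "Could not find CoarseGrainedScheduler."
    dispatcher.postOneWayMessage("CoarseGrainedScheduler", LaunchedExecutor("exec-1"))
  }
}
{code}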
This race can be fixed, or at least greatly alleviated, through the following changes (a sketch of the guarded endpoint follows the lists).

When stop() in CoarseGrainedSchedulerBackend is called:
# a stopping boolean variable is set to true;
# driverEndpoint is not stopped at this point (the dispatcher will stop it at the end).

While stopping is true, the driver backend:
# replies with sendFailure to the executor backend when it receives "RegisterExecutor";
# replies "StopExecutor" to the executor backend (or sends "RemoveExecutor" to itself) when it receives "LaunchedExecutor".
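Under the same simplified model as the sketch above, this is roughly what the guard would look like. DriverEndpoint, markStopping, and the Left/Right return values are hypothetical stand-ins (Left plays the role of a sendFailure reply); only the stopping flag and the reply behavior follow the proposal.

{code:scala}
// Sketch of the proposed shutdown guard; NOT real Spark code.
import java.util.concurrent.atomic.AtomicBoolean

object ShutdownGuardSketch {
  case class RegisterExecutor(id: String)
  case class LaunchedExecutor(id: String)
  case object StopExecutor

  class DriverEndpoint {
    // The proposed "stopping" flag, set from CoarseGrainedSchedulerBackend.stop().
    private val stopping = new AtomicBoolean(false)

    // stop() only flips the flag; the endpoint stays registered so late
    // messages still reach it (the dispatcher stops it at the very end).
    def markStopping(): Unit = stopping.set(true)

    def receive(msg: Any): Any = msg match {
      case RegisterExecutor(id) if stopping.get() =>
        // Stands in for a sendFailure reply: the executor backend fails fast
        // instead of instantiating an executor that will never be used.
        Left(new IllegalStateException(s"Driver is stopping; rejecting $id"))
      case LaunchedExecutor(_) if stopping.get() =>
        // An executor that slipped through the window is told to shut down.
        StopExecutor
      case RegisterExecutor(_)  => Right(true)            // normal step 2
      case LaunchedExecutor(id) => s"make offers for $id" // normal step 5
    }
  }

  def main(args: Array[String]): Unit = {
    val driver = new DriverEndpoint
    println(driver.receive(RegisterExecutor("exec-1")))  // Right(true)
    driver.markStopping()
    println(driver.receive(LaunchedExecutor("exec-1")))  // StopExecutor
    println(driver.receive(RegisterExecutor("exec-2")))  // Left(IllegalStateException)
  }
}
{code}

Because the endpoint is left registered until the dispatcher itself shuts down, the "Could not find CoarseGrainedScheduler" path in step 4 is no longer reachable: a late "LaunchedExecutor" gets an explicit "StopExecutor" reply instead of an uncaught exception.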