Repository: spark Updated Branches: refs/heads/branch-2.2 8e85ce625 -> 0ef98fd43
[SPARK-21321][SPARK CORE] Spark very verbose on shutdown ## What changes were proposed in this pull request? The current code is very verbose on shutdown. The changes I propose is to change the log level when the driver is shutting down and the RPC connections are closed (RpcEnvStoppedException). ## How was this patch tested? Tested with word count(deploy-mode = cluster, master = yarn, num-executors = 4) with 300GB of data. Author: John Lee <jl...@yahoo-inc.com> Closes #18547 from yoonlee95/SPARK-21321. (cherry picked from commit 0e07a29cf4a5587f939585e6885ed0f7e68c31b5) Signed-off-by: Tom Graves <tgra...@yahoo-inc.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ef98fd4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ef98fd4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ef98fd4 Branch: refs/heads/branch-2.2 Commit: 0ef98fd435ff77196780c2cad6e1bda377b2642f Parents: 8e85ce6 Author: John Lee <jl...@yahoo-inc.com> Authored: Mon Jul 17 13:13:35 2017 -0500 Committer: Tom Graves <tgra...@yahoo-inc.com> Committed: Mon Jul 17 13:14:25 2017 -0500 ---------------------------------------------------------------------- .../scala/org/apache/spark/rpc/netty/Dispatcher.scala | 7 +++++-- .../main/scala/org/apache/spark/rpc/netty/Inbox.scala | 7 ++++++- .../scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 7 +++++-- .../main/scala/org/apache/spark/rpc/netty/Outbox.scala | 2 +- .../org/apache/spark/scheduler/LiveListenerBus.scala | 2 +- .../spark/scheduler/cluster/YarnSchedulerBackend.scala | 11 +++++++++-- 6 files changed, 27 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0ef98fd4/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index a02cf30..e94babb 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -109,8 +109,11 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { val iter = endpoints.keySet().iterator() while (iter.hasNext) { val name = iter.next - postMessage(name, message, (e) => logWarning(s"Message $message dropped. ${e.getMessage}")) - } + postMessage(name, message, (e) => { e match { + case e: RpcEnvStoppedException => logDebug (s"Message $message dropped. ${e.getMessage}") + case e: Throwable => logWarning(s"Message $message dropped. ${e.getMessage}") + }} + )} } /** Posts a message sent by a remote endpoint. */ http://git-wip-us.apache.org/repos/asf/spark/blob/0ef98fd4/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala index ae4a600..d32eba6 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala @@ -205,7 +205,12 @@ private[netty] class Inbox( try action catch { case NonFatal(e) => try endpoint.onError(e) catch { - case NonFatal(ee) => logError(s"Ignoring error", ee) + case NonFatal(ee) => + if (stopped) { + logDebug("Ignoring error", ee) + } else { + logError("Ignoring error", ee) + } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/0ef98fd4/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index b316e54..6489849 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -185,7 +185,7 @@ private[netty] class NettyRpcEnv( try { dispatcher.postOneWayMessage(message) } catch { - case e: RpcEnvStoppedException => logWarning(e.getMessage) + case e: RpcEnvStoppedException => logDebug(e.getMessage) } } else { // Message to a remote RPC endpoint. @@ -203,7 +203,10 @@ private[netty] class NettyRpcEnv( def onFailure(e: Throwable): Unit = { if (!promise.tryFailure(e)) { - logWarning(s"Ignored failure: $e") + e match { + case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e") + case _ => logWarning(s"Ignored failure: $e") + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/0ef98fd4/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala index a7b7f58..b7e068a 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala @@ -45,7 +45,7 @@ private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends Outbo override def onFailure(e: Throwable): Unit = { e match { - case e1: RpcEnvStoppedException => logWarning(e1.getMessage) + case e1: RpcEnvStoppedException => logDebug(e1.getMessage) case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1) } } http://git-wip-us.apache.org/repos/asf/spark/blob/0ef98fd4/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 5533f7b..73e9141 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -121,7 +121,7 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa def post(event: SparkListenerEvent): Unit = { if (stopped.get) { // Drop further events to make `listenerThread` exit ASAP - logError(s"$name has already stopped! Dropping event $event") + logDebug(s"$name has already stopped! Dropping event $event") return } val eventAdded = eventQueue.offer(event) http://git-wip-us.apache.org/repos/asf/spark/blob/0ef98fd4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index cbc6e60..8452f43 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler.cluster +import java.util.concurrent.atomic.{AtomicBoolean} + import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} import scala.util.control.NonFatal @@ -40,6 +42,8 @@ private[spark] abstract class YarnSchedulerBackend( sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { + private val stopped = new AtomicBoolean(false) + override val minRegisteredRatio = if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { 0.8 @@ -93,6 +97,7 @@ private[spark] abstract class YarnSchedulerBackend( requestTotalExecutors(0, 0, Map.empty) super.stop() } finally { + stopped.set(true) services.stop() } } @@ -206,8 +211,10 @@ private[spark] abstract class YarnSchedulerBackend( */ override def onDisconnected(rpcAddress: RpcAddress): Unit = { addressToExecutorId.get(rpcAddress).foreach { executorId => - if (disableExecutor(executorId)) { - yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress) + if (!stopped.get) { + if (disableExecutor(executorId)) { + yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress) + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org