Repository: spark
Updated Branches:
  refs/heads/branch-2.2 8e85ce625 -> 0ef98fd43


[SPARK-21321][SPARK CORE] Spark very verbose on shutdown

## What changes were proposed in this pull request?

The current code is very verbose on shutdown.

The change I propose is to lower the log level when the driver is shutting
down and the RPC connections are closed (RpcEnvStoppedException).
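
In essence, each affected call site now matches on the exception and demotes the
expected shutdown-time RpcEnvStoppedException to debug, while anything else stays
at warning. A minimal sketch of the pattern (the trait and helper names are
illustrative; Logging and RpcEnvStoppedException are Spark-internal types, so
this compiles only inside the Spark source tree):

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEnvStoppedException

trait QuietShutdownLogging extends Logging {
  // Hypothetical helper showing the pattern applied at each call site below.
  def logDropped(message: Any, e: Throwable): Unit = e match {
    // Expected while the RPC env shuts down: not actionable, so log at debug.
    case _: RpcEnvStoppedException =>
      logDebug(s"Message $message dropped. ${e.getMessage}")
    // Anything else may indicate a real problem and stays at warning.
    case _ =>
      logWarning(s"Message $message dropped. ${e.getMessage}")
  }
}
```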

## How was this patch tested?

Tested with a word count job (deploy-mode = cluster, master = yarn,
num-executors = 4) on 300GB of data.

Author: John Lee <jl...@yahoo-inc.com>

Closes #18547 from yoonlee95/SPARK-21321.

(cherry picked from commit 0e07a29cf4a5587f939585e6885ed0f7e68c31b5)
Signed-off-by: Tom Graves <tgra...@yahoo-inc.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ef98fd4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ef98fd4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ef98fd4

Branch: refs/heads/branch-2.2
Commit: 0ef98fd435ff77196780c2cad6e1bda377b2642f
Parents: 8e85ce6
Author: John Lee <jl...@yahoo-inc.com>
Authored: Mon Jul 17 13:13:35 2017 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Mon Jul 17 13:14:25 2017 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/rpc/netty/Dispatcher.scala    |  7 +++++--
 .../main/scala/org/apache/spark/rpc/netty/Inbox.scala    |  7 ++++++-
 .../scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala   |  7 +++++--
 .../main/scala/org/apache/spark/rpc/netty/Outbox.scala   |  2 +-
 .../org/apache/spark/scheduler/LiveListenerBus.scala     |  2 +-
 .../spark/scheduler/cluster/YarnSchedulerBackend.scala   | 11 +++++++++--
 6 files changed, 27 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0ef98fd4/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index a02cf30..e94babb 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -109,8 +109,11 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     val iter = endpoints.keySet().iterator()
     while (iter.hasNext) {
       val name = iter.next
-      postMessage(name, message, (e) => logWarning(s"Message $message dropped. ${e.getMessage}"))
-    }
+        postMessage(name, message, (e) => { e match {
+          case e: RpcEnvStoppedException => logDebug (s"Message $message dropped. ${e.getMessage}")
+          case e: Throwable => logWarning(s"Message $message dropped. ${e.getMessage}")
+        }}
+      )}
   }
 
   /** Posts a message sent by a remote endpoint. */

http://git-wip-us.apache.org/repos/asf/spark/blob/0ef98fd4/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
index ae4a600..d32eba6 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
@@ -205,7 +205,12 @@ private[netty] class Inbox(
     try action catch {
       case NonFatal(e) =>
         try endpoint.onError(e) catch {
-          case NonFatal(ee) => logError(s"Ignoring error", ee)
+          case NonFatal(ee) =>
+            if (stopped) {
+              logDebug("Ignoring error", ee)
+            } else {
+              logError("Ignoring error", ee)
+            }
         }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0ef98fd4/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index b316e54..6489849 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -185,7 +185,7 @@ private[netty] class NettyRpcEnv(
       try {
         dispatcher.postOneWayMessage(message)
       } catch {
-        case e: RpcEnvStoppedException => logWarning(e.getMessage)
+        case e: RpcEnvStoppedException => logDebug(e.getMessage)
       }
     } else {
       // Message to a remote RPC endpoint.
@@ -203,7 +203,10 @@ private[netty] class NettyRpcEnv(
 
     def onFailure(e: Throwable): Unit = {
       if (!promise.tryFailure(e)) {
-        logWarning(s"Ignored failure: $e")
+        e match {
+          case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")
+          case _ => logWarning(s"Ignored failure: $e")
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0ef98fd4/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
index a7b7f58..b7e068a 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
@@ -45,7 +45,7 @@ private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends Outbo
 
   override def onFailure(e: Throwable): Unit = {
     e match {
-      case e1: RpcEnvStoppedException => logWarning(e1.getMessage)
+      case e1: RpcEnvStoppedException => logDebug(e1.getMessage)
       case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0ef98fd4/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 5533f7b..73e9141 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -121,7 +121,7 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
   def post(event: SparkListenerEvent): Unit = {
     if (stopped.get) {
       // Drop further events to make `listenerThread` exit ASAP
-      logError(s"$name has already stopped! Dropping event $event")
+      logDebug(s"$name has already stopped! Dropping event $event")
       return
     }
     val eventAdded = eventQueue.offer(event)

http://git-wip-us.apache.org/repos/asf/spark/blob/0ef98fd4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index cbc6e60..8452f43 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler.cluster
 
+import java.util.concurrent.atomic.{AtomicBoolean}
+
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
@@ -40,6 +42,8 @@ private[spark] abstract class YarnSchedulerBackend(
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
 
+  private val stopped = new AtomicBoolean(false)
+
   override val minRegisteredRatio =
     if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
       0.8
@@ -93,6 +97,7 @@ private[spark] abstract class YarnSchedulerBackend(
       requestTotalExecutors(0, 0, Map.empty)
       super.stop()
     } finally {
+      stopped.set(true)
       services.stop()
     }
   }
@@ -206,8 +211,10 @@ private[spark] abstract class YarnSchedulerBackend(
      */
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
       addressToExecutorId.get(rpcAddress).foreach { executorId =>
-        if (disableExecutor(executorId)) {
-          yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+        if (!stopped.get) {
+          if (disableExecutor(executorId)) {
+            yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+          }
         }
       }
     }
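
Beyond log levels, the YarnSchedulerBackend hunk above also adds a stopped flag
so that executor-disconnect handling becomes a no-op once shutdown has begun;
during a normal stop every executor disconnects, and disabling each one just
produces noise. A standalone sketch of that guard, using an illustrative class
name rather than the real Spark one:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative stand-in for the guard added to YarnSchedulerBackend above.
class ShutdownAwareBackend {
  private val stopped = new AtomicBoolean(false)

  def stop(): Unit =
    try {
      // ... stop executors, RPC endpoints, auxiliary services ...
    } finally {
      // Set in a finally block, mirroring the diff, so the flag is raised
      // even if part of the teardown throws.
      stopped.set(true)
    }

  def onDisconnected(executorId: String): Unit =
    // Ignore disconnects once shutdown has begun; they are expected.
    if (!stopped.get) {
      println(s"Unexpected disconnect of executor $executorId")
    }
}
```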

