Repository: spark
Updated Branches:
  refs/heads/master 4286cba7d -> 51066b437


[SPARK-14228][CORE][YARN] Lost executor of RPC disassociated, and occurs 
exception: Could not find CoarseGrainedScheduler or it has been stopped

## What changes were proposed in this pull request?
I see two instances where this exception occurs:

**Instance 1:**

```
17/11/10 15:49:32 ERROR util.Utils: Uncaught exception in thread driver-revive-thread
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
        at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140)
        at org.apache.spark.rpc.netty.NettyRpcEnv.send(NettyRpcEnv.scala:187)
        at org.apache.spark.rpc.netty.NettyRpcEndpointRef.send(NettyRpcEnv.scala:521)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(CoarseGrainedSchedulerBackend.scala:125)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(CoarseGrainedSchedulerBackend.scala:125)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anon$1$$anonfun$run$1.apply$mcV$sp(CoarseGrainedSchedulerBackend.scala:125)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1344)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anon$1.run(CoarseGrainedSchedulerBackend.scala:124)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
```

In CoarseGrainedSchedulerBackend.scala, the driver-revive-thread is started from DriverEndpoint.onStart() and keeps sending ReviveOffers messages periodically until it is shut down in DriverEndpoint.onStop(). There is no coordination between the driver-revive-thread shutdown and the RpcEndpoint unregistration: the RpcEndpoint is unregistered first, and only afterwards is the driver-revive-thread shut down in DriverEndpoint.onStop(). In between, the driver-revive-thread may still try to send a ReviveOffers message, which leads to the above exception.

To fix this, this PR moves the shutdown of the driver-revive-thread into CoarseGrainedSchedulerBackend.stop(), which executes before the DriverEndpoint is unregistered.
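
Below is a minimal standalone sketch of the ordering the fix enforces. It is illustrative plain Scala on top of java.util.concurrent, not the actual Spark classes; ReviveShutdownOrder, postReviveOffers, and endpointRegistered are hypothetical names standing in for the backend, the send path, and the endpoint registration state.

```
import java.util.concurrent.{Executors, TimeUnit}

// Illustrative only: models the driver-revive-thread race described above.
object ReviveShutdownOrder {
  @volatile private var endpointRegistered = true

  // Stand-in for driverEndpoint.send(ReviveOffers); like Dispatcher.postMessage,
  // it fails once the endpoint has been unregistered.
  private def postReviveOffers(): Unit = {
    if (!endpointRegistered) {
      throw new IllegalStateException("Could not find CoarseGrainedScheduler.")
    }
    println("ReviveOffers delivered")
  }

  def main(args: Array[String]): Unit = {
    val reviveThread = Executors.newSingleThreadScheduledExecutor()
    reviveThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = postReviveOffers()
    }, 0L, 100L, TimeUnit.MILLISECONDS)

    // The ordering the PR enforces in CoarseGrainedSchedulerBackend.stop():
    reviveThread.shutdownNow()                          // 1. stop producing ReviveOffers
    reviveThread.awaitTermination(1L, TimeUnit.SECONDS)
    endpointRegistered = false                          // 2. only then tear down the endpoint
  }
}
```

Before the PR the two steps ran in the opposite order (the endpoint was unregistered first, the revive thread stopped later in onStop()), so a revive tick in between produced the exception above.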

**Instance 2:**

```
17/11/10 16:31:38 ERROR cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Error requesting driver to remove executor 1 for reason Executor for container container_1508535467865_0226_01_000002 exited because of a YARN event (e.g., pre-emption) and not because of an error in the running job.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
        at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:135)
        at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:229)
        at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:516)
        at org.apache.spark.rpc.RpcEndpointRef.ask(RpcEndpointRef.scala:63)
        at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint$$anonfun$receive$1.applyOrElse(YarnSchedulerBackend.scala:269)
        at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
        at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
```

Here the YarnSchedulerEndpoint tries to send remove-executor messages to the driver after the YARN scheduler backend service has stopped, which leads to the above exception. To avoid it, we could either:
1) Add a condition (checking whether the service has stopped) before sending the remove-executor message, or
2) Add a warning log message in the onFailure case when the service is already stopped.

In this PR, as the diff below shows, the RemoveExecutor message is forwarded to the driver as a one-way send, and the YarnSchedulerEndpoint only sends it while the backend has not been stopped (option 1), so neither the failed ask nor the exception stack trace above occurs.
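
A minimal sketch of that guard, assuming an AtomicBoolean stopped flag like the one the YARN backend uses in the diff below; the class and parameter names here are hypothetical stand-ins, not the actual Spark types.

```
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the guarded forwarding in YarnSchedulerEndpoint:
// RemoveExecutor is forwarded as a one-way send, and only while the backend
// has not been stopped.
final case class RemoveExecutor(executorId: String, reason: String)

final class GuardedForwarder(sendToDriver: RemoveExecutor => Unit) {
  private val stopped = new AtomicBoolean(false)

  def receive(msg: RemoveExecutor): Unit = {
    if (!stopped.get) {
      println(s"Requesting driver to remove executor ${msg.executorId} for reason ${msg.reason}")
      sendToDriver(msg) // fire-and-forget; no reply is expected, so no failed ask to log
    }
  }

  def stop(): Unit = stopped.set(true)
}
```

A message that arrives after stop() is simply dropped, which matches the intent: once the backend is shutting down there is no scheduler left to act on it.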

## How was this patch tested?
I verified this manually; with the PR changes, I no longer see these exceptions.

Author: Devaraj K <deva...@apache.org>

Closes #19741 from devaraj-kavali/SPARK-14228.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51066b43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51066b43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51066b43

Branch: refs/heads/master
Commit: 51066b437b750933da03bb401c8356f4b48aefac
Parents: 4286cba
Author: Devaraj K <deva...@apache.org>
Authored: Wed Dec 6 10:39:15 2017 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Dec 6 10:39:15 2017 -0800

----------------------------------------------------------------------
 .../executor/CoarseGrainedExecutorBackend.scala |  6 +---
 .../cluster/CoarseGrainedSchedulerBackend.scala | 30 ++++++++------------
 ...KubernetesClusterSchedulerBackendSuite.scala |  6 ++--
 .../cluster/YarnSchedulerBackend.scala          | 20 ++++---------
 4 files changed, 22 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/51066b43/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4c1f92a..9b62e4b 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -165,11 +165,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     }
 
     if (notifyDriver && driver.nonEmpty) {
-      driver.get.ask[Boolean](
-        RemoveExecutor(executorId, new ExecutorLossReason(reason))
-      ).failed.foreach(e =>
-        logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
-      )(ThreadUtils.sameThread)
+      driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
     }
 
     System.exit(code)

http://git-wip-us.apache.org/repos/asf/spark/blob/51066b43/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7bfb4d5..e6982f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -95,6 +95,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
+  private val reviveThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
+
  class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -103,9 +106,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     protected val addressToExecutorId = new HashMap[RpcAddress, String]
 
-    private val reviveThread =
-      ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
-
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
@@ -154,6 +154,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         executorDataMap.values.foreach { ed =>
           ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
         }
+
+      case RemoveExecutor(executorId, reason) =>
+        // We will remove the executor's state and cannot restore it. However, the connection
+        // between the driver and the executor may be still alive so that the executor won't exit
+        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
+        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
+        removeExecutor(executorId, reason)
     }
 
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -215,14 +222,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         }
         context.reply(true)
 
-      case RemoveExecutor(executorId, reason) =>
-        // We will remove the executor's state and cannot restore it. However, the connection
-        // between the driver and the executor may be still alive so that the executor won't exit
-        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
-        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
-        removeExecutor(executorId, reason)
-        context.reply(true)
-
       case RemoveWorker(workerId, host, message) =>
         removeWorker(workerId, host, message)
         context.reply(true)
@@ -373,10 +372,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
       shouldDisable
     }
-
-    override def onStop() {
-      reviveThread.shutdownNow()
-    }
   }
 
   var driverEndpoint: RpcEndpointRef = null
@@ -417,6 +412,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   override def stop() {
+    reviveThread.shutdownNow()
     stopExecutors()
     try {
       if (driverEndpoint != null) {
@@ -465,9 +461,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * at once.
    */
  protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
-    // Only log the failure since we don't care about the result.
-    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).failed.foreach(t =>
-      logError(t.getMessage, t))(ThreadUtils.sameThread)
+    driverEndpoint.send(RemoveExecutor(executorId, reason))
   }
 
  protected def removeWorker(workerId: String, host: String, message: String): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/51066b43/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 3febb2f..13c0903 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -282,7 +282,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     // No more deletion attempts of the executors.
     // This is graceful termination and should not be detected as a failure.
     verify(podOperations, times(1)).delete(resolvedPod)
-    verify(driverEndpointRef, times(1)).ask[Boolean](
+    verify(driverEndpointRef, times(1)).send(
       RemoveExecutor("1", ExecutorExited(
         0,
         exitCausedByApp = false,
@@ -318,7 +318,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     requestExecutorRunnable.getValue.run()
     allocatorRunnable.getAllValues.asScala.last.run()
     verify(podOperations, never()).delete(firstResolvedPod)
-    verify(driverEndpointRef).ask[Boolean](
+    verify(driverEndpointRef).send(
       RemoveExecutor("1", ExecutorExited(
         1,
         exitCausedByApp = true,
@@ -356,7 +356,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     val recreatedResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
     allocatorRunnable.getValue.run()
     verify(podOperations).delete(firstResolvedPod)
-    verify(driverEndpointRef).ask[Boolean](
+    verify(driverEndpointRef).send(
       RemoveExecutor("1", SlaveLost("Executor lost for unknown reasons.")))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51066b43/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 415a29f..bb615c3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
 import java.util.concurrent.atomic.{AtomicBoolean}
 
 import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
@@ -245,14 +246,7 @@ private[spark] abstract class YarnSchedulerBackend(
          Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
       }
 
-      removeExecutorMessage
-        .flatMap { message =>
-          driverEndpoint.ask[Boolean](message)
-        }(ThreadUtils.sameThread)
-        .onFailure {
-          case NonFatal(e) => logError(
-            s"Error requesting driver to remove executor $executorId after disconnection.", e)
-        }(ThreadUtils.sameThread)
+      removeExecutorMessage.foreach { message => driverEndpoint.send(message) }
     }
 
     override def receive: PartialFunction[Any, Unit] = {
@@ -265,12 +259,10 @@ private[spark] abstract class YarnSchedulerBackend(
         addWebUIFilter(filterName, filterParams, proxyBase)
 
       case r @ RemoveExecutor(executorId, reason) =>
-        logWarning(reason.toString)
-        driverEndpoint.ask[Boolean](r).onFailure {
-          case e =>
-            logError("Error requesting driver to remove executor" +
-              s" $executorId for reason $reason", e)
-        }(ThreadUtils.sameThread)
+        if (!stopped.get) {
+          logWarning(s"Requesting driver to remove executor $executorId for reason $reason")
+          driverEndpoint.send(r)
+        }
     }
 
 

