Repository: spark
Updated Branches:
  refs/heads/branch-1.6 c83177d30 -> 38fe092ff


[SPARK-11718][YARN][CORE] Fix issue where explicitly killed executors die silently

Currently, if dynamic allocation is enabled, explicitly killing an executor 
gets no response, so the executor metadata on the driver side becomes 
incorrect, which makes dynamic allocation on YARN fail to work.

The problem is that `disableExecutor` returns false for executors that are 
pending to be killed when `onDisconnected` is detected, so no further 
handling is done.
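
For context, here is a minimal sketch of the affected driver-side flow. The 
names (`onDisconnected`, `addressToExecutorId`, `disableExecutor`, 
`executorsPendingToRemove`) match the Spark 1.6 sources, but the body is an 
illustrative condensation, not the actual implementation:

    // Simplified sketch of the driver endpoint's disconnect handling.
    override def onDisconnected(remoteAddress: RpcAddress): Unit = {
      addressToExecutorId.get(remoteAddress).foreach { executorId =>
        if (disableExecutor(executorId)) {
          // Loss reason is pending: ask the AM for the real reason before
          // removing the executor.
        }
        // Before this fix, disableExecutor() returned false for executors in
        // executorsPendingToRemove, so the disconnect was dropped here and
        // the driver's executor metadata was never cleaned up.
      }
    }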

One solution is to bypass the loss-reason query for these explicitly killed 
executors and let `super.onDisconnected` remove them directly. This is the 
simple approach.
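
A hypothetical sketch of that bypass, under the same naming assumptions as 
above (again, not what this patch implements):

    // Hypothetical solution 1: remove explicitly killed executors right away
    // instead of querying their loss reason.
    override def onDisconnected(remoteAddress: RpcAddress): Unit = {
      addressToExecutorId.get(remoteAddress).foreach { executorId =>
        if (executorsPendingToRemove.contains(executorId)) {
          super.onDisconnected(remoteAddress)  // superclass removes the executor
        } else if (disableExecutor(executorId)) {
          // Ask the AM for the real loss reason, as before.
        }
      }
    }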

The other solution is to still query the loss reason for these explicitly 
killed executors. Since an executor may be killed and reported back in the 
same AM-RM communication, the current approach of adding a pending 
loss-reason request does not work (the container-complete event has already 
been processed), so the loss reason should be stored for a later query.

Here this PR chooses solution 2.
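
Condensed, the chosen fix caches the exit reason in `YarnAllocator` when the 
container-complete event is processed before the driver's loss-reason query, 
and answers the later query from that cache. The lines below are a 
distillation of the diff that follows, not additional changes:

    // New map: loss reasons for executors that were already released.
    private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]

    // In processCompletedContainers(): no query is pending yet, store the reason.
    releasedExecutorLossReasons.put(eid, exitReason)

    // In enqueueGetLossReasonRequest(): the executor is already gone, answer
    // the query from the cache instead of registering a pending request.
    releasedExecutorLossReasons.remove(eid).foreach(context.reply)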

Please help to review. vanzin, I think this part was changed by you 
previously; would you please help to review? Thanks a lot.

Author: jerryshao <ss...@hortonworks.com>

Closes #9684 from jerryshao/SPARK-11718.

(cherry picked from commit 24477d2705bcf2a851acc241deb8376c5450dc73)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38fe092f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38fe092f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38fe092f

Branch: refs/heads/branch-1.6
Commit: 38fe092ff3a1e86088a22adc934a4b4e269b0a47
Parents: c83177d
Author: jerryshao <ss...@hortonworks.com>
Authored: Mon Nov 16 11:43:18 2015 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon Nov 16 11:43:39 2015 -0800

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  1 +
 .../cluster/CoarseGrainedSchedulerBackend.scala |  6 ++--
 .../spark/deploy/yarn/YarnAllocator.scala       | 30 ++++++++++++++++----
 3 files changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/38fe092f/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 43d7d80..5f13669 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -473,6 +473,7 @@ private[spark] class TaskSchedulerImpl(
              // If the host mapping still exists, it means we don't know the loss reason for the
              // executor. So call removeExecutor() to update tasks running on that executor when
              // the real loss reason is finally known.
+             logError(s"Actual reason for lost executor $executorId: ${reason.message}")
              removeExecutor(executorId, reason)
 
            case None =>

http://git-wip-us.apache.org/repos/asf/spark/blob/38fe092f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f71d98f..3373caf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -269,7 +269,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     * Stop making resource offers for the given executor. The executor is marked as lost with
      * the loss reason still pending.
      *
-     * @return Whether executor was alive.
+     * @return Whether executor should be disabled
      */
     protected def disableExecutor(executorId: String): Boolean = {
       val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
@@ -277,7 +277,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           executorsPendingLossReason += executorId
           true
         } else {
-          false
+          // Return true for explicitly killed executors; we still need to fetch their
+          // pending loss reasons. For all other executors return false.
+          executorsPendingToRemove.contains(executorId)
         }
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/38fe092f/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4d9e777..7e39c3e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.util.RackResolver
 
 import org.apache.log4j.{Level, Logger}
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
@@ -96,6 +96,10 @@ private[yarn] class YarnAllocator(
   // was lost.
  private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
 
+  // Maintain loss reasons for already released executors. A reason is added when the executor
+  // loss is learned from the AM-RM call, and removed once that loss reason has been queried.
+  private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]
+
  // Keep track of which container is running which executor to remove the executors later
   // Visible for testing.
   private[yarn] val executorIdToContainer = new HashMap[String, Container]
@@ -202,8 +206,7 @@ private[yarn] class YarnAllocator(
    */
   def killExecutor(executorId: String): Unit = synchronized {
     if (executorIdToContainer.contains(executorId)) {
-      val container = executorIdToContainer.remove(executorId).get
-      containerIdToExecutorId.remove(container.getId)
+      val container = executorIdToContainer.get(executorId).get
       internalReleaseContainer(container)
       numExecutorsRunning -= 1
     } else {
@@ -514,9 +517,18 @@ private[yarn] class YarnAllocator(
 
       containerIdToExecutorId.remove(containerId).foreach { eid =>
         executorIdToContainer.remove(eid)
-        pendingLossReasonRequests.remove(eid).foreach { pendingRequests =>
-          // Notify application of executor loss reasons so it can decide whether it should abort
-          pendingRequests.foreach(_.reply(exitReason))
+        pendingLossReasonRequests.remove(eid) match {
+          case Some(pendingRequests) =>
+            // Notify application of executor loss reasons so it can decide whether it should abort
+            pendingRequests.foreach(_.reply(exitReason))
+
+          case None =>
+            // No pending loss-reason request exists for this executor, because the completed
+            // container was processed before the query arrived; store the reason for a later
+            // query. This usually happens when a container is explicitly killed and the result
+            // is returned in the same AM-RM communication, so the query RPC arrives after this
+            // container-complete event has been processed.
+            releasedExecutorLossReasons.put(eid, exitReason)
         }
         if (!alreadyReleased) {
          // The executor could have gone away (like no route to host, node failure, etc)
@@ -538,8 +550,14 @@ private[yarn] class YarnAllocator(
     if (executorIdToContainer.contains(eid)) {
       pendingLossReasonRequests
         .getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
+    } else if (releasedExecutorLossReasons.contains(eid)) {
+      // The executor was already explicitly released before its loss reason was queried, so
+      // directly send the pre-stored loss reason.
+      context.reply(releasedExecutorLossReasons.remove(eid).get)
     } else {
       logWarning(s"Tried to get the loss reason for non-existent executor 
$eid")
+      context.sendFailure(
+        new SparkException(s"Failed to find loss reason for non-existent executor $eid"))
     }
   }
 

