Repository: spark
Updated Branches:
  refs/heads/branch-1.6 70d4edda8 -> ff3497542


[SPARK-4134][CORE] Lower severity of some executor loss logs.

Don't log ERROR messages when executors are explicitly killed or when
the exit reason is not yet known.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #9780 from vanzin/SPARK-11789.

(cherry picked from commit 880128f37e1bc0b9d98d1786670be62a06c648f2)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff349754
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff349754
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff349754

Branch: refs/heads/branch-1.6
Commit: ff3497542f1b1f8034d1f185157dc99e637f06a1
Parents: 70d4edd
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Thu Nov 19 16:49:18 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Nov 19 16:49:25 2015 -0800

----------------------------------------------------------------------
 .../spark/scheduler/ExecutorLossReason.scala    |  2 +
 .../spark/scheduler/TaskSchedulerImpl.scala     | 44 +++++++++++++-------
 .../apache/spark/scheduler/TaskSetManager.scala |  1 +
 .../cluster/CoarseGrainedSchedulerBackend.scala | 18 +++++---
 .../spark/deploy/yarn/YarnAllocator.scala       |  4 +-
 5 files changed, 45 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ff349754/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 47a5cbf..7e1197d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -40,6 +40,8 @@ private[spark] object ExecutorExited {
   }
 }
 
+private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed by driver.")
+
 /**
  * A loss reason that means we don't yet know why the executor exited.
  *

http://git-wip-us.apache.org/repos/asf/spark/blob/ff349754/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index bf0419d..bdf19f9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -470,25 +470,25 @@ private[spark] class TaskSchedulerImpl(
     synchronized {
       if (executorIdToTaskCount.contains(executorId)) {
         val hostPort = executorIdToHost(executorId)
-        logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
+        logExecutorLoss(executorId, hostPort, reason)
         removeExecutor(executorId, reason)
         failedExecutor = Some(executorId)
       } else {
-         executorIdToHost.get(executorId) match {
-           case Some(_) =>
-             // If the host mapping still exists, it means we don't know the loss reason for the
-             // executor. So call removeExecutor() to update tasks running on that executor when
-             // the real loss reason is finally known.
-             logError(s"Actual reason for lost executor $executorId: ${reason.message}")
-             removeExecutor(executorId, reason)
-
-           case None =>
-             // We may get multiple executorLost() calls with different loss reasons. For example,
-             // one may be triggered by a dropped connection from the slave while another may be a
-             // report of executor termination from Mesos. We produce log messages for both so we
-             // eventually report the termination reason.
-             logError("Lost an executor " + executorId + " (already removed): " + reason)
-         }
+        executorIdToHost.get(executorId) match {
+          case Some(hostPort) =>
+            // If the host mapping still exists, it means we don't know the loss reason for the
+            // executor. So call removeExecutor() to update tasks running on that executor when
+            // the real loss reason is finally known.
+            logExecutorLoss(executorId, hostPort, reason)
+            removeExecutor(executorId, reason)
+
+          case None =>
+            // We may get multiple executorLost() calls with different loss reasons. For example,
+            // one may be triggered by a dropped connection from the slave while another may be a
+            // report of executor termination from Mesos. We produce log messages for both so we
+            // eventually report the termination reason.
+            logError(s"Lost an executor $executorId (already removed): $reason")
+        }
       }
     }
     // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
@@ -498,6 +498,18 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
+  private def logExecutorLoss(
+      executorId: String,
+      hostPort: String,
+      reason: ExecutorLossReason): Unit = reason match {
+    case LossReasonPending =>
+      logDebug(s"Executor $executorId on $hostPort lost, but reason not yet known.")
+    case ExecutorKilled =>
+      logInfo(s"Executor $executorId on $hostPort killed by driver.")
+    case _ =>
+      logError(s"Lost executor $executorId on $hostPort: $reason")
+  }
+
   /**
    * Remove an executor from all our data structures and mark it as lost. If the executor's loss
    * reason is not yet known, do not yet remove its association with its host nor update the status

http://git-wip-us.apache.org/repos/asf/spark/blob/ff349754/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 114468c..a02f301 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -800,6 +800,7 @@ private[spark] class TaskSetManager(
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
       val exitCausedByApp: Boolean = reason match {
         case exited: ExecutorExited => exited.exitCausedByApp
+        case ExecutorKilled => false
         case _ => true
       }
       handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp,

http://git-wip-us.apache.org/repos/asf/spark/blob/ff349754/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6f0c910..505c161 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -64,8 +64,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   private val listenerBus = scheduler.sc.listenerBus
 
-  // Executors we have requested the cluster manager to kill that have not died yet
-  private val executorsPendingToRemove = new HashSet[String]
+  // Executors we have requested the cluster manager to kill that have not died yet; maps
+  // the executor ID to whether it was explicitly killed by the driver (and thus shouldn't
+  // be considered an app-related failure).
+  private val executorsPendingToRemove = new HashMap[String, Boolean]
 
   // A map to store hostname with its possible task number running on it
   protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
@@ -250,15 +252,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         case Some(executorInfo) =>
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
-          CoarseGrainedSchedulerBackend.this.synchronized {
+          val killed = CoarseGrainedSchedulerBackend.this.synchronized {
             addressToExecutorId -= executorInfo.executorAddress
             executorDataMap -= executorId
-            executorsPendingToRemove -= executorId
             executorsPendingLossReason -= executorId
+            executorsPendingToRemove.remove(executorId).getOrElse(false)
           }
           totalCoreCount.addAndGet(-executorInfo.totalCores)
           totalRegisteredExecutors.addAndGet(-1)
-          scheduler.executorLost(executorId, reason)
+          scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
           listenerBus.post(
             SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
         case None => logInfo(s"Asked to remove non-existent executor $executorId")
@@ -459,6 +461,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Request that the cluster manager kill the specified executors.
    *
+   * When asking the executor to be replaced, the executor loss is considered a failure, and
+   * killed tasks that are running on the executor will count towards the failure limits. If no
+   * replacement is being requested, then the tasks will not count towards the limit.
+   *
    * @param executorIds identifiers of executors to kill
    * @param replace whether to replace the killed executors with new ones
    * @param force whether to force kill busy executors
@@ -479,7 +485,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     val executorsToKill = knownExecutors
       .filter { id => !executorsPendingToRemove.contains(id) }
       .filter { id => force || !scheduler.isExecutorBusy(id) }
-    executorsPendingToRemove ++= executorsToKill
+    executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
 
     // If we do not wish to replace the executors we kill, sync the target number of executors
     // with the cluster manager to avoid allocating new ones. When computing the new target,

http://git-wip-us.apache.org/repos/asf/spark/blob/ff349754/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 7e39c3e..73cd903 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -481,7 +481,7 @@ private[yarn] class YarnAllocator(
             (true, memLimitExceededLogMessage(
               completedContainer.getDiagnostics,
               PMEM_EXCEEDED_PATTERN))
-          case unknown =>
+          case _ =>
             numExecutorsFailed += 1
             (true, "Container marked as failed: " + containerId + onHostStr +
               ". Exit status: " + completedContainer.getExitStatus +
@@ -493,7 +493,7 @@ private[yarn] class YarnAllocator(
         } else {
           logInfo(containerExitReason)
         }
-        ExecutorExited(0, exitCausedByApp, containerExitReason)
+        ExecutorExited(exitStatus, exitCausedByApp, containerExitReason)
       } else {
         // If we have already released this container, then it must mean
         // that the driver has explicitly requested it to be killed


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to