Repository: spark
Updated Branches:
  refs/heads/branch-1.3 ff0a7f400 -> 1723f0591


[SPARK-6325] [core,yarn] Do not change target executor count when killing executors.

The dynamic allocation code has two ways to reduce the number of executors:
the first is to lower the total number of executors it wants, by asking for
an absolute number that is lower than the previous one; the second is to
explicitly kill idle executors.
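
To make the distinction concrete, here is a minimal sketch of the two paths
from the scheduler backend's point of view. The method names match
CoarseGrainedSchedulerBackend in this branch; the call sites and values are
illustrative only:

    // Path 1: shrink by lowering the target. The cluster manager simply
    // stops allocating once the running count reaches the new total; no
    // running executor is touched.
    doRequestTotalExecutors(2)

    // Path 2: shrink by killing specific (e.g. idle) executors. Only the
    // named executors should go away; the target should stay put.
    doKillExecutors(Seq("1", "3"))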

YarnAllocator was mixing those up and lowering the target number of executors
when a kill was issued. Instead, trust that the frontend knows what it's
doing, and kill executors without touching any other accounting. That means
that if the frontend kills an executor without lowering the target, it will
get a new executor shortly.

The one situation where both actions (lowering the target and killing
executors) need to happen together is when user code explicitly calls
`SparkContext.killExecutors`. In that case, two calls are issued to the
backend to achieve the goal.
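
Illustrative arithmetic for the new target, using the same names as the
patch below (the concrete numbers are made up):

    // Assume 6 executors registered, 2 more pending, 1 kill already in
    // flight, and a user call asking to kill 2 of the registered ones.
    val numExistingExecutors = 6
    val numPendingExecutors = 2
    val numPendingToRemove = 1   // executorsPendingToRemove.size in the patch
    val numToKill = 2            // filteredExecutorIds.size in the patch

    // 6 + 2 - 1 - 2 = 5: the target is lowered first, so the cluster
    // manager will not replace the executors killed right after.
    val newTotal = numExistingExecutors + numPendingExecutors -
      numPendingToRemove - numToKill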

I also did some minor cleanup in related code:
- avoid sending a request for executors when target is unchanged, to avoid log
  spam in the AM
- avoid printing misleading log messages in the AM when there are no requests
  to cancel
- fix a slow memory leak and a misleading error message on the driver,
  caused by failing to completely unregister the executor (sketched below).
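
The leak in the last item comes from the driver's per-executor maps drifting
out of sync on removal. A simplified sketch of the pattern, with map names
as in CoarseGrainedSchedulerBackend and the surrounding state assumed:

    // Before the fix, removal cleaned only two of the three maps, so
    // addressToExecutorId kept one stale entry per removed executor
    // (the slow leak) and lookups on the stale address misfired later.
    executorDataMap -= executorId
    executorsPendingToRemove -= executorId

    // The fix also drops the reverse mapping for the executor's address:
    addressToExecutorId -= executorInfo.executorAddress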

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #5018 from vanzin/SPARK-6325 and squashes the following commits:

2e782a3 [Marcelo Vanzin] Avoid redundant logging on the AM side.
a3567cd [Marcelo Vanzin] Add parentheses.
a363926 [Marcelo Vanzin] Update logic.
a158101 [Marcelo Vanzin] [SPARK-6325] [core,yarn] Disallow reducing executor count past running count.

(cherry picked from commit 981fbafa2a878e86abeefe1d77cca01fd848f9f6)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1723f059
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1723f059
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1723f059

Branch: refs/heads/branch-1.3
Commit: 1723f0591705d779b533dd1899706fef02e421ab
Parents: ff0a7f4
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Wed Mar 18 09:18:28 2015 -0400
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Mar 18 09:18:35 2015 -0400

----------------------------------------------------------------------
 .../cluster/CoarseGrainedSchedulerBackend.scala |  7 +++++++
 .../spark/deploy/yarn/ApplicationMaster.scala   |  1 -
 .../spark/deploy/yarn/YarnAllocator.scala       | 13 +++++++-----
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  | 22 ++++++++++++++++++++
 4 files changed, 37 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1723f059/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6f77fa3..87ebf31 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -211,6 +211,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
           CoarseGrainedSchedulerBackend.this.synchronized {
+            addressToExecutorId -= executorInfo.executorAddress
             executorDataMap -= executorId
             executorsPendingToRemove -= executorId
           }
@@ -371,6 +372,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
         logWarning(s"Executor to kill $id does not exist!")
       }
     }
+    // Killing executors means effectively that we want less executors than before, so also update
+    // the target number of executors to avoid having the backend allocate new ones.
+    val newTotal = (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
+      - filteredExecutorIds.size)
+    doRequestTotalExecutors(newTotal)
+
     executorsPendingToRemove ++= filteredExecutorIds
     doKillExecutors(filteredExecutorIds)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1723f059/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 796422b..418f904 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -534,7 +534,6 @@ private[spark] class ApplicationMaster(
         driver ! x
 
       case RequestExecutors(requestedTotal) =>
-        logInfo(s"Driver requested a total number of $requestedTotal 
executor(s).")
         Option(allocator) match {
           case Some(a) => a.requestTotalExecutors(requestedTotal)
          case None => logWarning("Container allocator is not ready to request executors yet.")

http://git-wip-us.apache.org/repos/asf/spark/blob/1723f059/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 55bfbcd..c98763e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -86,7 +86,8 @@ private[yarn] class YarnAllocator(
   @volatile private var targetNumExecutors = args.numExecutors
 
   // Keep track of which container is running which executor to remove the executors later
-  private val executorIdToContainer = new HashMap[String, Container]
+  // Visible for testing.
+  private[yarn] val executorIdToContainer = new HashMap[String, Container]
 
   // Executor memory in MB.
   protected val executorMemory = args.executorMemory
@@ -137,7 +138,10 @@ private[yarn] class YarnAllocator(
    * be killed.
    */
   def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
-    targetNumExecutors = requestedTotal
+    if (requestedTotal != targetNumExecutors) {
+      logInfo(s"Driver requested a total number of $requestedTotal 
executor(s).")
+      targetNumExecutors = requestedTotal
+    }
   }
 
   /**
@@ -148,8 +152,6 @@ private[yarn] class YarnAllocator(
       val container = executorIdToContainer.remove(executorId).get
       internalReleaseContainer(container)
       numExecutorsRunning -= 1
-      targetNumExecutors -= 1
-      assert(targetNumExecutors >= 0, "Allocator killed more executors than are allocated!")
     } else {
       logWarning(s"Attempted to kill unknown executor $executorId!")
     }
@@ -351,7 +353,8 @@ private[yarn] class YarnAllocator(
     }
   }
 
-  private def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
+  // Visible for testing.
+  private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1723f059/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 3c224f1..c09b01b 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -206,6 +206,28 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
     handler.getNumExecutorsRunning should be (2)
   }
 
+  test("kill executors") {
+    val handler = createAllocator(4)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (4)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2))
+
+    handler.requestTotalExecutors(1)
+    handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id) }
+
+    val statuses = Seq(container1, container2).map { c =>
+      ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
+    }
+    handler.updateResourceRequests()
+    handler.processCompletedContainers(statuses.toSeq)
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (1)
+  }
+
   test("memory exceeded diagnostic regexes") {
     val diagnostics =
       "Container 
[pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +

