Repository: spark
Updated Branches:
  refs/heads/branch-2.0 214676d29 -> b8818d892


[SPARK-15725][YARN] Ensure ApplicationMaster sleeps for the min interval.

## What changes were proposed in this pull request?

Update `ApplicationMaster` to sleep for at least the minimum allocation
interval before calling `allocateResources`. This prevents the `YarnAllocator`
from being overloaded, which currently happens because the thread that calls
`allocateResources` is woken up whenever an executor is killed and its
connections die. In YARN, an overloaded allocator can make the app unstable.
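The rule above can be modeled as a small decision step taken after each wait. This is a minimal sketch, not Spark code: `PostWaitDecision`, `decide`, and `Outcome` are hypothetical names, and the interval values are only illustrative. It assumes, as the patch does, that a wait returning before its planned interval means the thread was woken early, and that the thread then tops up only to the minimum interval rather than to the full planned interval.

```scala
// Hypothetical model of the post-wait decision in the patch; not a Spark API.
object PostWaitDecision {
  /** Result of one allocator-loop wait.
    * @param wokenEarly   true if the wait returned before the planned interval
    * @param extraSleepMs additional sleep needed to reach the minimum interval
    */
  final case class Outcome(wokenEarly: Boolean, extraSleepMs: Long)

  /** @param plannedMs     interval passed to the wait (heartbeat or allocation interval)
    * @param sleptMs       time actually spent waiting
    * @param minIntervalMs minimum time between allocation calls
    */
  def decide(plannedMs: Long, sleptMs: Long, minIntervalMs: Long): Outcome =
    if (sleptMs < plannedMs) {
      // Woken early: top up only to the minimum interval, not the full plan.
      Outcome(wokenEarly = true, extraSleepMs = math.max(0L, minIntervalMs - sleptMs))
    } else {
      // The full planned interval elapsed; no extra sleep is needed.
      Outcome(wokenEarly = false, extraSleepMs = 0L)
    }

  def main(args: Array[String]): Unit = {
    println(decide(plannedMs = 3000L, sleptMs = 15L, minIntervalMs = 200L))
    println(decide(plannedMs = 3000L, sleptMs = 500L, minIntervalMs = 200L))
    println(decide(plannedMs = 200L, sleptMs = 200L, minIntervalMs = 200L))
  }
}
```

With a 3000 ms planned wait and a 200 ms minimum, a wake-up after 15 ms yields `Outcome(true,185)`, while a wake-up after 500 ms yields `Outcome(true,0)` because the minimum has already passed; a wait that runs its full course yields `Outcome(false,0)`.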

## How was this patch tested?

Tested that this allows an app to recover instead of hanging. It is still
possible for the `YarnAllocator` to be overwhelmed by requests, but this
change addresses the most common cause.

Author: Ryan Blue <b...@apache.org>

Closes #13482 from rdblue/SPARK-15725-am-sleep-work-around.

(cherry picked from commit a410814c87b120cb5cfbf095b1bd94b1de862844)
Signed-off-by: Tom Graves <tgra...@yahoo-inc.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8818d89
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8818d89
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8818d89

Branch: refs/heads/branch-2.0
Commit: b8818d892a4f62ee4e8a1c16146b5ee4c7de9eab
Parents: 214676d
Author: Ryan Blue <b...@apache.org>
Authored: Thu Jun 23 13:54:37 2016 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Thu Jun 23 13:54:53 2016 -0500

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 25 +++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b8818d89/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 847d1de..b6f45dd 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -457,8 +457,10 @@ private[spark] class ApplicationMaster(
           }
           try {
             val numPendingAllocate = allocator.getPendingAllocate.size
+            var sleepStart = 0L
+            var sleepInterval = 200L // ms
             allocatorLock.synchronized {
-              val sleepInterval =
+              sleepInterval =
                 if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
                   val currentAllocationInterval =
                     math.min(heartbeatInterval, nextAllocationInterval)
@@ -468,10 +470,27 @@ private[spark] class ApplicationMaster(
                   nextAllocationInterval = initialAllocationInterval
                   heartbeatInterval
                 }
-              logDebug(s"Number of pending allocations is $numPendingAllocate. " +
-                       s"Sleeping for $sleepInterval.")
+              sleepStart = System.currentTimeMillis()
               allocatorLock.wait(sleepInterval)
             }
+            val sleepDuration = System.currentTimeMillis() - sleepStart
+            if (sleepDuration < sleepInterval) {
+              // log when sleep is interrupted
+              logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+                  s"Slept for $sleepDuration/$sleepInterval ms.")
+              // if sleep was less than the minimum interval, sleep for the rest of it
+              val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
+              if (toSleep > 0) {
+                logDebug(s"Going back to sleep for $toSleep ms")
+                // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
+                // by the methods that signal allocatorLock because this is just finishing the min
+                // sleep interval, which should happen even if this is signalled again.
+                Thread.sleep(toSleep)
+              }
+            } else {
+              logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+                  s"Slept for $sleepDuration/$sleepInterval.")
+            }
           } catch {
             case e: InterruptedException =>
           }

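The comment in the diff explains why the top-up uses `Thread.sleep` rather than `allocatorLock.wait`: a sleeping thread is not woken by `notifyAll`, so further signals cannot shorten the minimum interval. The sketch below demonstrates that wait-then-top-up pattern in isolation, assuming a plain `Object` as the lock; `WaitThenTopUp`, `sleepAtLeast`, and `signal` are hypothetical names, not Spark APIs. Unlike the patch, it measures time with `System.nanoTime` and re-checks in a loop, so coarse timer resolution cannot make it return before the minimum.

```scala
// Hypothetical sketch of the wait-then-top-up pattern; not Spark code.
object WaitThenTopUp {
  private val lock = new Object

  /** Waits on `lock` for up to `maxWaitMs`, but returns no sooner than
    * `minIntervalMs` after it started, even if signalled early.
    * Returns the total elapsed time in milliseconds. */
  def sleepAtLeast(minIntervalMs: Long, maxWaitMs: Long): Long = {
    // Object.wait(0) would block until notified, so require a positive bound.
    require(maxWaitMs > 0, "maxWaitMs must be positive")
    val start = System.nanoTime()
    def elapsedMs(): Long = (System.nanoTime() - start) / 1000000L

    // Phase 1: a wait that signals can end early, like allocatorLock.wait(...).
    lock.synchronized {
      lock.wait(maxWaitMs)
    }

    // Phase 2: finish the minimum interval with Thread.sleep, which notifyAll
    // cannot cut short. Loop in case timer granularity ends a sleep early.
    var toSleep = minIntervalMs - elapsedMs()
    while (toSleep > 0) {
      Thread.sleep(toSleep)
      toSleep = minIntervalMs - elapsedMs()
    }
    elapsedMs()
  }

  /** Wakes any thread blocked in phase 1, e.g. on an executor-loss event. */
  def signal(): Unit = lock.synchronized { lock.notifyAll() }

  def main(args: Array[String]): Unit = {
    val waiter = new Thread(new Runnable {
      def run(): Unit = {
        val elapsed = sleepAtLeast(minIntervalMs = 200L, maxWaitMs = 3000L)
        println(s"elapsed >= 200 ms: ${elapsed >= 200L}")
      }
    })
    waiter.start()
    Thread.sleep(20L) // give the waiter time to enter wait()
    signal()          // simulate an early wake-up from a killed executor
    waiter.join()
  }
}
```

If `signal()` happens to run before the waiter reaches `wait`, the waiter simply waits the full `maxWaitMs`; either way the returned elapsed time is at least `minIntervalMs`.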
