[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-10 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73764141
  
@sryza looks like I narrowly missed your last commit. I have copied the 
changes there to this HOTFIX commit: b640c841fca92bb0bca77267db2965ff8f79586f


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73763274
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27220/
Test FAILed.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73763270
  
  [Test build #27220 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27220/consoleFull) for PR 4168 at commit [`3cca880`](https://github.com/apache/spark/commit/3cca88085a7b9fd72746c8e6e34d7820deb8d2eb).
 * This patch **fails Python style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73762745
  
  [Test build #27220 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27220/consoleFull) for PR 4168 at commit [`3cca880`](https://github.com/apache/spark/commit/3cca88085a7b9fd72746c8e6e34d7820deb8d2eb).
 * This patch merges cleanly.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/4168





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-10 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24440514
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1105,6 +1105,25 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /**
    * :: DeveloperApi ::
+   * Express a preference to the cluster manager for a given total number of executors. This can
+   * result in canceling pending requests or filing additional requests.
+   * This is currently only supported in Yarn mode. Return whether the request is received.
+   */
+  @DeveloperApi
--- End diff --

Not a developer API if it's `private[spark]`.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-10 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24440168
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -224,59 +240,90 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Check to see whether our existing allocation and the requests we've made previously exceed our
+   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
+   * are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * @return the delta in the target number of executors.
+   */
+  private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
+    val currentTarget = targetNumExecutors
+    val maxNeeded = maxNumExecutorsNeeded
+
+    if (maxNeeded < currentTarget) {
+      // The target number exceeds the number we actually need, so stop adding new
+      // executors and inform the cluster manager to cancel the extra pending requests.
+      val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
+      client.requestTotalExecutors(newTotalExecutors)
+      numExecutorsToAdd = 1
+      updateNumExecutorsPending(newTotalExecutors)
+    } else if (addTime != NOT_SET && now >= addTime) {
+      val delta = addExecutors(maxNeeded)
+      logDebug(s"Starting timer to add more executors (to " +
+        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+      addTime += sustainedSchedulerBacklogTimeout * 1000
+      delta
+    } else {
+      0
+    }
+  }
+
+  /**
    * Request a number of executors from the cluster manager.
    * If the cap on the number of executors is reached, give up and reset the
    * number of executors to add next round instead of continuing to double it.
-   * Return the number actually requested.
+   *
+   * @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending
+   *                              tasks could fill
+   * @return the number of additional executors actually requested.
    */
-  private def addExecutors(): Int = synchronized {
-    // Do not request more executors if we have already reached the upper bound
-    val numExistingExecutors = executorIds.size + numExecutorsPending
-    if (numExistingExecutors >= maxNumExecutors) {
+  private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
+    // Do not request more executors if it would put our target over the upper bound
+    val currentTarget = targetNumExecutors
+    if (currentTarget >= maxNumExecutors) {
       logDebug(s"Not adding executors because there are already ${executorIds.size} " +
         s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
       numExecutorsToAdd = 1
       return 0
     }
 
-    // The number of executors needed to satisfy all pending tasks is the number of tasks pending
-    // divided by the number of tasks each executor can fit, rounded up.
-    val maxNumExecutorsPending =
-      (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
-    if (numExecutorsPending >= maxNumExecutorsPending) {
-      logDebug(s"Not adding executors because there are already $numExecutorsPending " +
-        s"pending and pending tasks could only fill $maxNumExecutorsPending")
-      numExecutorsToAdd = 1
-      return 0
-    }
-
-    // It's never useful to request more executors than could satisfy all the pending tasks, so
-    // cap request at that amount.
-    // Also cap request with respect to the configured upper bound.
-    val maxNumExecutorsToAdd = math.min(
-      maxNumExecutorsPending - numExecutorsPending,
-      maxNumExecutors - numExistingExecutors)
-    assert(maxNumExecutorsToAdd > 0)
-
-    val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
-
-    val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-    val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
+    val actualMaxNumExecutors = math.min(maxNumExecutors - executorsPendingToRemove.size,
--- End diff --

ok, I will fix this myself when I merge
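The control flow being reviewed in this hunk is compact enough to model. The following is a simplified, illustrative Python sketch of one round of the `addOrCancelExecutorRequests`/`addExecutors` policy (a model of the Scala logic quoted above, not the actual implementation; the function and parameter names here are hypothetical):

```python
def next_target(target, max_needed, min_exec, max_exec, to_add, backlog_expired):
    """One round of the allocation policy: returns (new_target, new_to_add).

    target:      current target number of executors (targetNumExecutors)
    max_needed:  executors that all running + pending tasks could fill
    to_add:      numExecutorsToAdd, which doubles on sustained backlog
    """
    if max_needed < target:
        # Over-provisioned: cancel extra pending requests (down to the
        # configured minimum) and reset the doubling counter.
        return max(max_needed, min_exec), 1
    if backlog_expired:
        if target >= max_exec:
            # Already at the cap: give up and reset instead of doubling.
            return target, 1
        # Grow by to_add, never beyond what pending tasks could fill
        # or the configured maximum; double the increment next round.
        return min(target + to_add, max_needed, max_exec), to_add * 2
    return target, to_add

# Sustained backlog: target grows 4 -> 5 -> 7 -> 11 -> 15 (hits the cap),
# with the increment doubling each round (1, 2, 4, 8).
target, to_add = 4, 1
for _ in range(4):
    target, to_add = next_target(target, 30, 2, 15, to_add, True)
print(target)  # → 15
```

The cancel branch is the new behavior this PR adds: instead of only ever growing the request, the manager can hand the cluster manager a smaller total and let it cancel pending requests.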



[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-10 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73761510
  
Hey @sryza, thanks a lot for fixing this. I will merge this into master and 1.3 after addressing the last batch of comments I pointed out.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-10 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24438965
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
(same ExecutorAllocationManager hunk as quoted earlier in this thread)
--- End diff --

Oy, yup, you're definitely right.  Not sure what I was thinking last night.



[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-10 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24435726
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
(same ExecutorAllocationManager hunk as quoted earlier in this thread)
--- End diff --

> If your new total was up to 10, YARN could grant you that 10, and then 
would have 18, including the 8 pending to be removed.

YARN could grant you the 10 *total*, not an additional 10. Right? A few 
lines later we use this value to cap what we provide to 
`sc.requestTotalExecutors`, which now takes in 2 instead of 10. I still don't 
see how it's possible for us to get 18 executors.
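To make the numbers in this exchange concrete, here is a small illustrative model (hypothetical Python names, not the actual Scala code) of the cap under discussion: with `maxNumExecutors = 10` and 8 executors pending removal, the total handed to `requestTotalExecutors` is capped at 2, which is why the 18-executor scenario looks unreachable if YARN honors totals rather than increments:

```python
def capped_total(desired_total, max_num_executors, num_pending_to_remove):
    """Model of the actualMaxNumExecutors computation: subtract executors
    pending removal from the configured maximum, then cap the requested
    total at that value."""
    actual_max = max_num_executors - num_pending_to_remove
    return min(desired_total, actual_max)

# Desired total of 10, configured max of 10, 8 executors pending removal:
# requestTotalExecutors would receive 2, not 10.
print(capped_total(10, 10, 8))  # → 2
```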



[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-10 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24406500
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
(same ExecutorAllocationManager hunk as quoted earlier in this thread)
--- End diff --

Yes, I agree with @andrewor14. We should just use maxNumExecutors here.



[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73656534
  
(I only looked at the public APIs, but those look fine to me now: there are none!)





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24391913
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
(same ExecutorAllocationManager hunk as quoted earlier in this thread)
--- End diff --

If your new total was up to 10, YARN could grant you that 10, and then 
would have 18, including the 8 pending to be removed.

The previous behavior was that we would never go above maxNumExecutors when some of the live executors were pending removal. I'm not averse to changing that if you think we should, but subtracting the pending executors here preserves the existing behavior.



[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73645547
  
  [Test build #27168 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27168/consoleFull) for PR 4168 at commit [`f80b7ec`](https://github.com/apache/spark/commit/f80b7ec16bf3f9ceda58bb2220cc8a681575cd66).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class GetField(child: Expression, field: StructField, ordinal: Int) extends UnaryExpression `





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73645552
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27168/
Test PASSed.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73645180
  
Hey @sryza, thanks for iterating quickly on the reviews. I left one question, but other than that this looks pretty close.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24389414
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -224,59 +240,90 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Check to see whether our existing allocation and the requests we've 
made previously exceed our
+   * current needs. If so, let the cluster manager know so that it can 
cancel pending requests that
+   * are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new 
executors and refresh the add
+   * time.
+   *
+   * @return the delta in the target number of executors.
+   */
+  private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
+val currentTarget = targetNumExecutors
+val maxNeeded = maxNumExecutorsNeeded
+
+if (maxNeeded < currentTarget) {
+  // The target number exceeds the number we actually need, so stop 
adding new
+  // executors and inform the cluster manager to cancel the extra 
pending requests.
+  val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
+  client.requestTotalExecutors(newTotalExecutors)
+  numExecutorsToAdd = 1
+  updateNumExecutorsPending(newTotalExecutors)
+} else if (addTime != NOT_SET && now >= addTime) {
+  val delta = addExecutors(maxNeeded)
+  logDebug(s"Starting timer to add more executors (to " +
+s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+  addTime += sustainedSchedulerBacklogTimeout * 1000
+  delta
+} else {
+  0
+}
+  }
+
+  /**
* Request a number of executors from the cluster manager.
* If the cap on the number of executors is reached, give up and reset 
the
* number of executors to add next round instead of continuing to double 
it.
-   * Return the number actually requested.
+   *
+   * @param maxNumExecutorsNeeded the maximum number of executors all 
currently running or pending
+   *  tasks could fill
+   * @return the number of additional executors actually requested.
*/
-  private def addExecutors(): Int = synchronized {
-// Do not request more executors if we have already reached the upper 
bound
-val numExistingExecutors = executorIds.size + numExecutorsPending
-if (numExistingExecutors >= maxNumExecutors) {
+  private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
+// Do not request more executors if it would put our target over the 
upper bound
+val currentTarget = targetNumExecutors
+if (currentTarget >= maxNumExecutors) {
   logDebug(s"Not adding executors because there are already 
${executorIds.size} " +
 s"registered and $numExecutorsPending pending executor(s) (limit 
$maxNumExecutors)")
   numExecutorsToAdd = 1
   return 0
 }
 
-// The number of executors needed to satisfy all pending tasks is the 
number of tasks pending
-// divided by the number of tasks each executor can fit, rounded up.
-val maxNumExecutorsPending =
-  (listener.totalPendingTasks() + tasksPerExecutor - 1) / 
tasksPerExecutor
-if (numExecutorsPending >= maxNumExecutorsPending) {
-  logDebug(s"Not adding executors because there are already 
$numExecutorsPending " +
-s"pending and pending tasks could only fill 
$maxNumExecutorsPending")
-  numExecutorsToAdd = 1
-  return 0
-}
-
-// It's never useful to request more executors than could satisfy all 
the pending tasks, so
-// cap request at that amount.
-// Also cap request with respect to the configured upper bound.
-val maxNumExecutorsToAdd = math.min(
-  maxNumExecutorsPending - numExecutorsPending,
-  maxNumExecutors - numExistingExecutors)
-assert(maxNumExecutorsToAdd > 0)
-
-val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, 
maxNumExecutorsToAdd)
-
-val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-val addRequestAcknowledged = testing || 
client.requestExecutors(actualNumExecutorsToAdd)
+val actualMaxNumExecutors = math.min(maxNumExecutors - 
executorsPendingToRemove.size,
--- End diff --

I don't see why we subtract `executorsPendingToRemove` here. If the max is 
10, and I currently have 9, and 8 of these are pending to be removed, then this 
difference is 10 - 8 = 2, which means my new total can only be up to 2. This 
doesn't make sense though, because my total could really be up to 10. If I'm 
understanding this correctly we should just do `maxNumExecutors` here.
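The numeric example above can be sketched as a pair of helpers (the names are illustrative, not from the patch):

```scala
// Cap as written in the patch: subtract executors pending removal from the bound.
def capPerPatch(maxNeeded: Int, maxNumExecutors: Int, pendingToRemove: Int): Int =
  math.min(maxNeeded, maxNumExecutors - pendingToRemove)

// Cap as the reviewer suggests: use the configured maximum directly.
def capSuggested(maxNeeded: Int, maxNumExecutors: Int): Int =
  math.min(maxNeeded, maxNumExecutors)

// With max = 10 and 8 executors pending removal, the patch's cap is
// 10 - 8 = 2, while the suggested cap allows the full 10.
```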



[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24388610
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1105,6 +1105,25 @@ class SparkContext(config: SparkConf) extends 
Logging with ExecutorAllocationCli
 
   /**
* :: DeveloperApi ::
+   * Express a preference to the cluster manager for a given total number 
of executors. This can
+   * result in canceling pending requests or filing additional requests.
+   * This is currently only supported in Yarn mode. Return whether the 
request is received.
--- End diff --

YARN





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24388465
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -23,10 +23,16 @@ package org.apache.spark
 private[spark] trait ExecutorAllocationClient {
 
   /**
+   * Express a preference to the cluster manager for a given total number 
of executors. This can
+   * result in canceling pending requests or filing additional requests.
+   */
+  private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+
+  /**
* Request an additional number of executors from the cluster manager.
-   * Return whether the request is acknowledged by the cluster manager.
+   * This is currently only supported in YARN mode. Return whether the 
request is received.
--- End diff --

If we want to add in the javadocs that it's only supported in YARN mode, we 
should do it for all the methods here or just move this to the class javadocs. 
I prefer the latter.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73641698
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27170/
Test FAILed.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73641693
  
  [Test build #27170 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27170/consoleFull)
 for   PR 4168 at commit 
[`37ce77d`](https://github.com/apache/spark/commit/37ce77dd603e9755ddb1cffcdb4eac622cf9e042).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73641562
  
  [Test build #27170 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27170/consoleFull)
 for   PR 4168 at commit 
[`37ce77d`](https://github.com/apache/spark/commit/37ce77dd603e9755ddb1cffcdb4eac622cf9e042).
 * This patch merges cleanly.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73640262
  
  [Test build #27168 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27168/consoleFull)
 for   PR 4168 at commit 
[`f80b7ec`](https://github.com/apache/spark/commit/f80b7ec16bf3f9ceda58bb2220cc8a681575cd66).
 * This patch merges cleanly.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24383247
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -311,18 +303,27 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val actorSyste
 
   /**
* Request an additional number of executors from the cluster manager.
-   * Return whether the request is acknowledged.
+   * @return whether the request is acknowledged.
*/
   final override def requestExecutors(numAdditionalExecutors: Int): 
Boolean = synchronized {
 logInfo(s"Requesting $numAdditionalExecutors additional executor(s) 
from the cluster manager")
 logDebug(s"Number of pending executors is now $numPendingExecutors")
 numPendingExecutors += numAdditionalExecutors
 // Account for executors pending to be added or removed
-val newTotal = numExistingExecutors + numPendingExecutors - 
executorsPendingToRemove.size
+val newTotal = numExistingExecutors + numPendingExecutors
 doRequestTotalExecutors(newTotal)
   }
 
   /**
+   * Request an additional number of executors from the cluster manager.
+   * @return whether the request is acknowledged.
+   */
+  final override def requestTotalExecutors(numExecutors: Int): Boolean = 
synchronized {
+numPendingExecutors = numExecutors - numExistingExecutors
--- End diff --

Disallowing `numPendingExecutors` from being negative could lead to some 
strange behavior.

If I call `requestTotalExecutors(5)` and then `requestExecutors(2)`, I 
would expect to eventually end up with 7 executors.  But if I started with 6 
executors, then I would end up with 8.

I don't have a strong opinion either way.  If we allow it to be negative, 
maybe it would make sense to change the name.
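The scenario above can be made concrete with a standalone sketch (the field names mirror the backend's, but this is illustrative, not Spark code):

```scala
// Illustrative model of the pending-count bookkeeping under discussion;
// not the actual CoarseGrainedSchedulerBackend code.
var numExistingExecutors = 6 // start with 6 running executors
var numPendingExecutors = 0

def requestTotalExecutors(total: Int): Int = {
  // Letting the delta go negative records that we want fewer than we have.
  numPendingExecutors = total - numExistingExecutors
  numExistingExecutors + numPendingExecutors // new target
}

def requestExecutors(numAdditional: Int): Int = {
  numPendingExecutors += numAdditional
  numExistingExecutors + numPendingExecutors
}

// Starting from 6 executors, requestTotalExecutors(5) then requestExecutors(2)
// yields a target of 7; clamping the pending count at zero after the first
// call would instead yield 8.
```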





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24381198
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -311,18 +303,27 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val actorSyste
 
   /**
* Request an additional number of executors from the cluster manager.
-   * Return whether the request is acknowledged.
+   * @return whether the request is acknowledged.
*/
   final override def requestExecutors(numAdditionalExecutors: Int): 
Boolean = synchronized {
 logInfo(s"Requesting $numAdditionalExecutors additional executor(s) 
from the cluster manager")
 logDebug(s"Number of pending executors is now $numPendingExecutors")
 numPendingExecutors += numAdditionalExecutors
 // Account for executors pending to be added or removed
-val newTotal = numExistingExecutors + numPendingExecutors - 
executorsPendingToRemove.size
+val newTotal = numExistingExecutors + numPendingExecutors
 doRequestTotalExecutors(newTotal)
   }
 
   /**
+   * Request an additional number of executors from the cluster manager.
--- End diff --

the docs are wrong here





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73628826
  
@andrewor14 when I mentioned rederiving `numPendingExecutors` I was 
actually talking about the way we calculate the version of it that lives in 
`ExecutorAllocationManager`.  Sorry for being unclear here.  Taking out the 
code that decrements it in `CoarseGrainedSchedulerBackend` was just an 
accidental omission on my part when I added all that code back in.  I'll put 
this code back.

I don't have a strong opinion on where or how `numExecutorsPending` should be 
updated in the request-executors methods in `CoarseGrainedSchedulerBackend`.  
Can't tell if you do either, but happy to change it if you do.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73625504
  
@sryza I may be missing something but I don't understand why 
`numPendingExecutors` should be rederived from the user calling 
`requestTotalExecutors` every time. What exactly was the issue with 
decrementing it as we did before? Also I believe the current code is incorrect 
because the user (or us internally) may call either `requestExecutors` or 
`requestTotalExecutors`, and now we only reset it for the latter. If we were to 
do this we should do it in `doRequestTotalExecutors` instead.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24382301
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -23,12 +23,19 @@ package org.apache.spark
 private[spark] trait ExecutorAllocationClient {
 
   /**
-   * Request an additional number of executors from the cluster manager.
-   * Return whether the request is acknowledged by the cluster manager.
+   * Express a preference to the cluster manager for a given total number 
of executors. This can
+   * result in canceling pending requests or filing additional requests.
*/
-  def requestExecutors(numAdditionalExecutors: Int): Boolean
+  def requestTotalExecutors(numExecutors: Int): Boolean
 
   /**
+   * Request an additional number of executors from the cluster manager.
+   * This is currently only supported in Yarn mode. Return whether the 
request is received.
--- End diff --

Holdover from previous doc, but I agree, worth changing.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24381293
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -311,18 +303,27 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val actorSyste
 
   /**
* Request an additional number of executors from the cluster manager.
-   * Return whether the request is acknowledged.
+   * @return whether the request is acknowledged.
*/
   final override def requestExecutors(numAdditionalExecutors: Int): 
Boolean = synchronized {
 logInfo(s"Requesting $numAdditionalExecutors additional executor(s) 
from the cluster manager")
 logDebug(s"Number of pending executors is now $numPendingExecutors")
 numPendingExecutors += numAdditionalExecutors
 // Account for executors pending to be added or removed
-val newTotal = numExistingExecutors + numPendingExecutors - 
executorsPendingToRemove.size
+val newTotal = numExistingExecutors + numPendingExecutors
 doRequestTotalExecutors(newTotal)
   }
 
   /**
+   * Request an additional number of executors from the cluster manager.
+   * @return whether the request is acknowledged.
+   */
+  final override def requestTotalExecutors(numExecutors: Int): Boolean = 
synchronized {
+numPendingExecutors = numExecutors - numExistingExecutors
--- End diff --

this could make `numPendingExecutors` negative right? We should probably do 
a math.min here





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24381075
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -23,12 +23,19 @@ package org.apache.spark
 private[spark] trait ExecutorAllocationClient {
 
   /**
-   * Request an additional number of executors from the cluster manager.
-   * Return whether the request is acknowledged by the cluster manager.
+   * Express a preference to the cluster manager for a given total number 
of executors. This can
+   * result in canceling pending requests or filing additional requests.
*/
-  def requestExecutors(numAdditionalExecutors: Int): Boolean
+  def requestTotalExecutors(numExecutors: Int): Boolean
 
   /**
+   * Request an additional number of executors from the cluster manager.
+   * This is currently only supported in Yarn mode. Return whether the 
request is received.
--- End diff --

YARN?





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24381071
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -23,12 +23,19 @@ package org.apache.spark
 private[spark] trait ExecutorAllocationClient {
 
   /**
-   * Request an additional number of executors from the cluster manager.
-   * Return whether the request is acknowledged by the cluster manager.
+   * Express a preference to the cluster manager for a given total number 
of executors. This can
+   * result in canceling pending requests or filing additional requests.
*/
-  def requestExecutors(numAdditionalExecutors: Int): Boolean
+  def requestTotalExecutors(numExecutors: Int): Boolean
 
   /**
+   * Request an additional number of executors from the cluster manager.
+   * This is currently only supported in Yarn mode. Return whether the 
request is received.
+   */
+  def requestExecutors(numExecutors: Int): Boolean
+
+
+/**
--- End diff --

weird indentation





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73623282
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27138/
Test PASSed.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73623276
  
  [Test build #27138 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27138/consoleFull)
 for   PR 4168 at commit 
[`a19e6d2`](https://github.com/apache/spark/commit/a19e6d23346ecfb673e1db81dddb2d0e4242179e).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73614961
  
  [Test build #27138 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27138/consoleFull)
 for   PR 4168 at commit 
[`a19e6d2`](https://github.com/apache/spark/commit/a19e6d23346ecfb673e1db81dddb2d0e4242179e).
 * This patch merges cleanly.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73613671
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27129/
Test PASSed.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73613658
  
  [Test build #27129 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27129/consoleFull)
 for   PR 4168 at commit 
[`985ffe7`](https://github.com/apache/spark/commit/985ffe7a08ca3fa303789d7f76ef32757df14988).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24375632
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1105,16 +1105,35 @@ class SparkContext(config: SparkConf) extends 
Logging with ExecutorAllocationCli
 
   /**
* :: DeveloperApi ::
+   * Express a preference to the cluster manager for a given total number 
of executors. This will
+   * result in either canceling pending requests or filing additional 
requests.
+   * This is currently only supported in Yarn mode. Return whether the 
request is received.
+   */
+  @DeveloperApi
+  override def requestTotalExecutors(numExecutors: Int): Boolean = {
+assert(master.contains("yarn") || dynamicAllocationTesting,
+  "Requesting executors is currently only supported in YARN mode")
+schedulerBackend match {
+  case b: CoarseGrainedSchedulerBackend =>
+b.requestTotalExecutors(numExecutors)
+  case _ =>
+logWarning("Requesting executors is only supported in 
coarse-grained mode")
+false
+}
+  }
+
+  /**
+   * :: DeveloperApi ::
* Request an additional number of executors from the cluster manager.
* This is currently only supported in Yarn mode. Return whether the 
request is received.
*/
   @DeveloperApi
-  override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
+  override def requestExecutors(numExecutors: Int): Boolean = {
--- End diff --

Oops this wasn't even intentional, I'll change it back.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24375535
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1105,16 +1105,35 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /**
    * :: DeveloperApi ::
+   * Express a preference to the cluster manager for a given total number of executors. This will
+   * result in either canceling pending requests or filing additional requests.
+   * This is currently only supported in Yarn mode. Return whether the request is received.
+   */
+  @DeveloperApi
+  override def requestTotalExecutors(numExecutors: Int): Boolean = {
+    assert(master.contains("yarn") || dynamicAllocationTesting,
+      "Requesting executors is currently only supported in YARN mode")
+    schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.requestTotalExecutors(numExecutors)
+      case _ =>
+        logWarning("Requesting executors is only supported in coarse-grained mode")
+        false
+    }
+  }
+
+  /**
+   * :: DeveloperApi ::
   * Request an additional number of executors from the cluster manager.
   * This is currently only supported in Yarn mode. Return whether the request is received.
   */
  @DeveloperApi
-  override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
+  override def requestExecutors(numExecutors: Int): Boolean = {
--- End diff --

I'd argue that `requestExecutors(numExecutors)` could potentially be mistaken for "I want 6 total executors and therefore I call `requestExecutors(6)`", but if we have `numAdditionalExecutors` there is no possibility for ambiguity.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24373580
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1105,16 +1105,35 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /**
    * :: DeveloperApi ::
+   * Express a preference to the cluster manager for a given total number of executors. This will
+   * result in either canceling pending requests or filing additional requests.
+   * This is currently only supported in Yarn mode. Return whether the request is received.
+   */
+  @DeveloperApi
+  override def requestTotalExecutors(numExecutors: Int): Boolean = {
+    assert(master.contains("yarn") || dynamicAllocationTesting,
+      "Requesting executors is currently only supported in YARN mode")
+    schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.requestTotalExecutors(numExecutors)
+      case _ =>
+        logWarning("Requesting executors is only supported in coarse-grained mode")
+        false
+    }
+  }
+
+  /**
+   * :: DeveloperApi ::
   * Request an additional number of executors from the cluster manager.
   * This is currently only supported in Yarn mode. Return whether the request is received.
   */
  @DeveloperApi
-  override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
+  override def requestExecutors(numExecutors: Int): Boolean = {
--- End diff --

Changing this name is also an API break since in Scala you can refer to variables by name. I do agree `numAdditionalExecutors` seems better since it's obvious that they are additional.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24373541
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1105,16 +1105,35 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /**
    * :: DeveloperApi ::
+   * Express a preference to the cluster manager for a given total number of executors. This will
+   * result in either canceling pending requests or filing additional requests.
--- End diff --

Is this true? What if I set it to the number I currently have?
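The semantics being questioned above can be made concrete with a small sketch. This is an illustrative model only (the names `pendingDelta`, `desiredTotal`, etc. are hypothetical, not the actual Spark API): a request for the current total should change nothing, while a higher or lower total files or cancels requests.

```scala
// Hypothetical model of "express a preference for a total number of executors".
// Not the actual Spark implementation; names are made up for illustration.
object TotalRequestSemantics {
  // Positive => additional requests to file; negative => pending requests to cancel.
  def pendingDelta(desiredTotal: Int, running: Int, pending: Int): Int =
    desiredTotal - (running + pending)

  def main(args: Array[String]): Unit = {
    assert(pendingDelta(10, 10, 0) == 0)  // same total => neither cancel nor file
    assert(pendingDelta(12, 10, 0) == 2)  // file 2 additional requests
    assert(pendingDelta(8, 10, 0) == -2)  // cancel up to 2 pending requests
  }
}
```

Under this reading, setting the total to the number you currently have is a no-op, which is the corner case pwendell is asking about.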





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73602915
  
@andrewor14 updated patch adds back `requestExecutors` and incorporates 
your other comments.

I also made some small changes to the way we update `numExecutorsPending` - 
it now gets rederived from the desired total executors and the number of 
running executors each time we change the desired total.  The result should be 
the same, but we make it clear that everything flows from the new total and 
avoid the possibility of bugs that accumulate junk in `numExecutorsPending`.
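The rederivation described above can be sketched as follows. This is a minimal illustration with hypothetical names, not the actual Spark code: instead of accumulating deltas in `numExecutorsPending`, the pending count is recomputed from the desired total and the running count each time the total changes.

```scala
// Hypothetical sketch: derive the pending count from the desired total,
// rather than incrementally adjusting it (which can accumulate junk).
object PendingDerivation {
  def pendingFromTotal(desiredTotal: Int, runningExecutors: Int): Int =
    math.max(desiredTotal - runningExecutors, 0)

  def main(args: Array[String]): Unit = {
    assert(pendingFromTotal(10, 7) == 3) // 10 desired, 7 running => 3 outstanding
    assert(pendingFromTotal(5, 7) == 0)  // desired below running => nothing pending
  }
}
```

Recomputing from the total makes the pending count a pure function of the current state, which is the bug-avoidance property sryza describes.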






[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73601805
  
  [Test build #27129 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27129/consoleFull) for PR 4168 at commit [`985ffe7`](https://github.com/apache/spark/commit/985ffe7a08ca3fa303789d7f76ef32757df14988).
 * This patch merges cleanly.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24368946
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Check to see whether our existing allocation and the requests we've made previously exceed our
+   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
+   * are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * @return the delta in the target number of executors.
+   */
+  private def adjustRequestedExecutors(now: Long): Int = synchronized {
+    val currentTarget = targetNumExecutors
+    val maxNeeded = maxNumExecutorsNeeded
+
+    if (maxNeeded < currentTarget) {
+      // The target number exceeds the number we actually need, so stop adding new
+      // executors and inform the cluster manager to cancel the extra pending requests.
+      client.requestTotalExecutors(maxNeeded)
+      val delta = math.min(maxNeeded - currentTarget, 0)
+      numExecutorsPending += delta
+      numExecutorsToAdd = 1
+      delta
+    } else if (addTime != NOT_SET && now >= addTime) {
+      val delta = addExecutors(maxNeeded)
+      logDebug(s"Starting timer to add more executors (to " +
+        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+      addTime += sustainedSchedulerBacklogTimeout * 1000
+      delta
+    } else {
+      0
+    }
+  }
--- End diff --

ok, I suppose that is clearer... let's just use that for now and we can 
revisit this again later if we want





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24367502
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Check to see whether our existing allocation and the requests we've made previously exceed our
+   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
+   * are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * @return the delta in the target number of executors.
+   */
+  private def adjustRequestedExecutors(now: Long): Int = synchronized {
+    val currentTarget = targetNumExecutors
+    val maxNeeded = maxNumExecutorsNeeded
+
+    if (maxNeeded < currentTarget) {
+      // The target number exceeds the number we actually need, so stop adding new
+      // executors and inform the cluster manager to cancel the extra pending requests.
+      client.requestTotalExecutors(maxNeeded)
+      val delta = math.min(maxNeeded - currentTarget, 0)
+      numExecutorsPending += delta
+      numExecutorsToAdd = 1
+      delta
+    } else if (addTime != NOT_SET && now >= addTime) {
+      val delta = addExecutors(maxNeeded)
+      logDebug(s"Starting timer to add more executors (to " +
+        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+      addTime += sustainedSchedulerBacklogTimeout * 1000
+      delta
+    } else {
+      0
+    }
+  }
--- End diff --

I see what you mean.  `maybeAddExecutors` would be a little confusing 
because we might cancel requests.  What do you think of 
`addOrCancelExecutorRequests`?






[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24366945
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -201,18 +201,35 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * If the add time has expired, request new executors and refresh the add time.
-   * If the remove time for an existing executor has expired, kill the executor.
+   * The number of executors we would have if the cluster manager were to fulfill all our requests.
+   */
+  private def targetNumExecutors(): Int =
+    numExecutorsPending + executorIds.size
--- End diff --

I see, that's fine





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24366884
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Check to see whether our existing allocation and the requests we've made previously exceed our
+   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
+   * are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * @return the delta in the target number of executors.
+   */
+  private def adjustRequestedExecutors(now: Long): Int = synchronized {
+    val currentTarget = targetNumExecutors
+    val maxNeeded = maxNumExecutorsNeeded
+
+    if (maxNeeded < currentTarget) {
+      // The target number exceeds the number we actually need, so stop adding new
+      // executors and inform the cluster manager to cancel the extra pending requests.
+      client.requestTotalExecutors(maxNeeded)
+      val delta = math.min(maxNeeded - currentTarget, 0)
+      numExecutorsPending += delta
+      numExecutorsToAdd = 1
+      delta
+    } else if (addTime != NOT_SET && now >= addTime) {
+      val delta = addExecutors(maxNeeded)
+      logDebug(s"Starting timer to add more executors (to " +
+        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+      addTime += sustainedSchedulerBacklogTimeout * 1000
+      delta
+    } else {
+      0
+    }
+  }
--- End diff --

I see, otherwise it's hard to test it. Maybe we should just rename this 
then to `maybeAddExecutors` or something. The current name sounds more like 
it's updating a few local variables rather than actually sending the request. 
Then in the javadoc we can explain why it's "maybe"





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24363707
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -201,18 +201,35 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * If the add time has expired, request new executors and refresh the add time.
-   * If the remove time for an existing executor has expired, kill the executor.
+   * The number of executors we would have if the cluster manager were to fulfill all our requests.
+   */
+  private def targetNumExecutors(): Int =
+    numExecutorsPending + executorIds.size
--- End diff --

This fixes a change in behavior where it became possible for us to request 
executors beyond the max when executors were pending removal.  I noticed this 
because the "interleaving add and remove" test was failing.  






[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24358135
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Check to see whether our existing allocation and the requests we've made previously exceed our
+   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
+   * are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * @return the delta in the target number of executors.
+   */
+  private def adjustRequestedExecutors(now: Long): Int = synchronized {
+    val currentTarget = targetNumExecutors
+    val maxNeeded = maxNumExecutorsNeeded
+
+    if (maxNeeded < currentTarget) {
+      // The target number exceeds the number we actually need, so stop adding new
+      // executors and inform the cluster manager to cancel the extra pending requests.
+      client.requestTotalExecutors(maxNeeded)
+      val delta = math.min(maxNeeded - currentTarget, 0)
+      numExecutorsPending += delta
+      numExecutorsToAdd = 1
+      delta
+    } else if (addTime != NOT_SET && now >= addTime) {
+      val delta = addExecutors(maxNeeded)
+      logDebug(s"Starting timer to add more executors (to " +
+        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+      addTime += sustainedSchedulerBacklogTimeout * 1000
+      delta
+    } else {
+      0
+    }
+  }
--- End diff --

I broke this out and added the return value for testing.  My understanding 
was that `addExecutors` was broken out for a similar reason?





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24358128
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -201,18 +201,35 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * If the add time has expired, request new executors and refresh the add time.
-   * If the remove time for an existing executor has expired, kill the executor.
+   * The number of executors we would have if the cluster manager were to fulfill all our requests.
+   */
+  private def targetNumExecutors(): Int =
+    numExecutorsPending + executorIds.size
--- End diff --

an earlier version of this subtracted the number of executors pending to be 
removed. Is there a reason why you took that out?





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24357953
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Check to see whether our existing allocation and the requests we've made previously exceed our
+   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
+   * are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * @return the delta in the target number of executors.
+   */
+  private def adjustRequestedExecutors(now: Long): Int = synchronized {
+    val currentTarget = targetNumExecutors
+    val maxNeeded = maxNumExecutorsNeeded
+
+    if (maxNeeded < currentTarget) {
+      // The target number exceeds the number we actually need, so stop adding new
+      // executors and inform the cluster manager to cancel the extra pending requests.
+      client.requestTotalExecutors(maxNeeded)
+      val delta = math.min(maxNeeded - currentTarget, 0)
+      numExecutorsPending += delta
+      numExecutorsToAdd = 1
+      delta
+    } else if (addTime != NOT_SET && now >= addTime) {
+      val delta = addExecutors(maxNeeded)
+      logDebug(s"Starting timer to add more executors (to " +
+        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+      addTime += sustainedSchedulerBacklogTimeout * 1000
+      delta
+    } else {
+      0
+    }
+  }
+
+  /**
    * Request a number of executors from the cluster manager.
    * If the cap on the number of executors is reached, give up and reset the
    * number of executors to add next round instead of continuing to double it.
    * Return the number actually requested.
+   *
+   * @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending
+   *                              tasks could fill
    */
-  private def addExecutors(): Int = synchronized {
-    // Do not request more executors if we have already reached the upper bound
-    val numExistingExecutors = executorIds.size + numExecutorsPending
-    if (numExistingExecutors >= maxNumExecutors) {
+  private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
+    // Do not request more executors if it would put our target over the upper bound
+    val currentTarget = targetNumExecutors
+    if (currentTarget >= maxNumExecutors) {
       logDebug(s"Not adding executors because there are already ${executorIds.size} " +
         s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
       numExecutorsToAdd = 1
       return 0
     }
 
-    // The number of executors needed to satisfy all pending tasks is the number of tasks pending
-    // divided by the number of tasks each executor can fit, rounded up.
-    val maxNumExecutorsPending =
-      (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
-    if (numExecutorsPending >= maxNumExecutorsPending) {
-      logDebug(s"Not adding executors because there are already $numExecutorsPending " +
-        s"pending and pending tasks could only fill $maxNumExecutorsPending")
-      numExecutorsToAdd = 1
-      return 0
-    }
-
-    // It's never useful to request more executors than could satisfy all the pending tasks, so
-    // cap request at that amount.
-    // Also cap request with respect to the configured upper bound.
-    val maxNumExecutorsToAdd = math.min(
-      maxNumExecutorsPending - numExecutorsPending,
-      maxNumExecutors - numExistingExecutors)
-    assert(maxNumExecutorsToAdd > 0)
-
-    val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
-
-    val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-    val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
+    val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
+    val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
+    // Number of additional executors requested at this round
--- End diff --

in this round
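The capped exponential ramp-up quoted in the diff above can be sketched as follows. This is a simplified model, not the actual Spark code: each round requests `numExecutorsToAdd` more (doubling every round), clamped by both the configured maximum and the number of executors the pending tasks could actually use.

```scala
// Simplified model of the doubling ramp-up in addExecutors: returns the
// number of executors requested in each round until the cap is reached.
object RampUp {
  def roundsOfRequests(maxNumExecutors: Int, maxNeeded: Int): Seq[Int] = {
    var target = 0 // executors running or pending
    var toAdd = 1  // numExecutorsToAdd, doubled each successful round
    val cap = math.min(maxNumExecutors, maxNeeded)
    val rounds = scala.collection.mutable.ArrayBuffer[Int]()
    while (target < cap) {
      val newTotal = math.min(target + toAdd, cap) // clamp to the cap
      rounds += (newTotal - target)
      target = newTotal
      toAdd *= 2
    }
    rounds.toSeq
  }

  def main(args: Array[String]): Unit = {
    // 1, 2, 4, 8, then clamped to 5 to land exactly on the cap of 20
    assert(roundsOfRequests(20, 100) == Seq(1, 2, 4, 8, 5))
  }
}
```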



[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24357759
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -201,18 +201,35 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * If the add time has expired, request new executors and refresh the add time.
-   * If the remove time for an existing executor has expired, kill the executor.
+   * The number of executors we would have if the cluster manager were to fulfill all our requests.
+   */
+  private def targetNumExecutors(): Int =
+    numExecutorsPending + executorIds.size
+
+  /**
+   * The maximum number of executors we would need under the current load to satisfy all running
+   * and pending tasks.
+   */
+  private def maxNumExecutorsNeeded(): Int = {
+    // The maximum number of executors we need under the current load is the total number of
+    // running or pending tasks, divided by the full task capacity of each executor, rounded up.
--- End diff --

this comment is a little redundant given the javadocs. I would just add 
`rounded up` to the end of the javadoc itself.
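The "rounded up" computation under discussion is the usual integer-arithmetic ceiling division. A minimal sketch of it (standalone, not the actual Spark method):

```scala
// Ceiling division without floating point:
// (totalTasks + tasksPerExecutor - 1) / tasksPerExecutor
object CeilDiv {
  def maxExecutorsNeeded(totalTasks: Int, tasksPerExecutor: Int): Int =
    (totalTasks + tasksPerExecutor - 1) / tasksPerExecutor

  def main(args: Array[String]): Unit = {
    assert(maxExecutorsNeeded(10, 4) == 3) // 10 tasks, 4 per executor => 3 executors
    assert(maxExecutorsNeeded(8, 4) == 2)  // exact multiple => no rounding
    assert(maxExecutorsNeeded(1, 4) == 1)  // any remainder needs a whole executor
  }
}
```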





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24357547
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Check to see whether our existing allocation and the requests we've made previously exceed our
+   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
+   * are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * @return the delta in the target number of executors.
+   */
+  private def adjustRequestedExecutors(now: Long): Int = synchronized {
+    val currentTarget = targetNumExecutors
+    val maxNeeded = maxNumExecutorsNeeded
+
+    if (maxNeeded < currentTarget) {
+      // The target number exceeds the number we actually need, so stop adding new
+      // executors and inform the cluster manager to cancel the extra pending requests.
+      client.requestTotalExecutors(maxNeeded)
+      val delta = math.min(maxNeeded - currentTarget, 0)
--- End diff --

why are you doing `math.min` here? Since `maxNeeded < currentTarget`, 
`maxNeeded - currentTarget` is always going to be negative.
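The reviewer's observation can be checked with a tiny standalone snippet (illustrative values only): inside the `maxNeeded < currentTarget` branch the difference is already negative, so the `math.min(_, 0)` clamp never changes anything.

```scala
// Demonstrates that the min-with-zero clamp is a no-op in the branch where
// maxNeeded < currentTarget, as pointed out in the review comment above.
object DeltaCheck {
  def main(args: Array[String]): Unit = {
    val currentTarget = 12
    val maxNeeded = 8 // satisfies maxNeeded < currentTarget, as in that branch
    val delta = math.min(maxNeeded - currentTarget, 0)
    assert(delta == maxNeeded - currentTarget) // min(-4, 0) == -4: clamp did nothing
    assert(delta == -4)
  }
}
```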





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24357469
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Check to see whether our existing allocation and the requests we've made previously exceed our
+   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
+   * are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * @return the delta in the target number of executors.
+   */
+  private def adjustRequestedExecutors(now: Long): Int = synchronized {
+    val currentTarget = targetNumExecutors
+    val maxNeeded = maxNumExecutorsNeeded
+
+    if (maxNeeded < currentTarget) {
+      // The target number exceeds the number we actually need, so stop adding new
+      // executors and inform the cluster manager to cancel the extra pending requests.
+      client.requestTotalExecutors(maxNeeded)
+      val delta = math.min(maxNeeded - currentTarget, 0)
+      numExecutorsPending += delta
+      numExecutorsToAdd = 1
+      delta
+    } else if (addTime != NOT_SET && now >= addTime) {
+      val delta = addExecutors(maxNeeded)
+      logDebug(s"Starting timer to add more executors (to " +
+        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+      addTime += sustainedSchedulerBacklogTimeout * 1000
+      delta
+    } else {
+      0
+    }
+  }
--- End diff --

I actually preferred keeping this in `schedule` as you have done before. We 
don't ever use the return value here and this doesn't actually hide that much 
detail from `schedule` as long as we have sufficiently informative comments 
there.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24356890
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -26,7 +26,7 @@ private[spark] trait ExecutorAllocationClient {
    * Request an additional number of executors from the cluster manager.
--- End diff --

this is still outdated 





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73571161
  
  [Test build #27111 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27111/consoleFull) for PR 4168 at commit [`b78806e`](https://github.com/apache/spark/commit/b78806ee59eaa0fcca99b21df7a96ef1ccb307b3).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73571179
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27111/
Test PASSed.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73563743
  
Hey @sryza we definitely need to keep the API because applications building 
on top of Spark may want to implement their own heuristics (e.g. Spark 
streaming may want to scale up and down based on the amount of receiver 
traffic, but not so much on whether there are tasks waiting to be scheduled).

By the way, I had a chat with @pwendell about this offline. The other option 
is to just expose both, if that makes implementing this patch easier. Even 
though it's only a developer API, there are probably already people experimenting 
with it, and we should make an effort not to break their code.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73558148
  
  [Test build #27111 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27111/consoleFull) for PR 4168 at commit [`b78806e`](https://github.com/apache/spark/commit/b78806ee59eaa0fcca99b21df7a96ef1ccb307b3).
 * This patch merges cleanly.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73557433
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27110/
Test FAILed.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73557431
  
  [Test build #27110 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27110/consoleFull) for PR 4168 at commit [`aa01555`](https://github.com/apache/spark/commit/aa01555e5eaff3e83ecfa4b5bca89b7d56a3e96e).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73557303
  
  [Test build #27110 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27110/consoleFull) for PR 4168 at commit [`aa01555`](https://github.com/apache/spark/commit/aa01555e5eaff3e83ecfa4b5bca89b7d56a3e96e).
 * This patch merges cleanly.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-09 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24313087
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -413,6 +418,7 @@ private[spark] class ExecutorAllocationManager(
 private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
 private val stageIdToTaskIndices = new mutable.HashMap[Int, 
mutable.HashSet[Int]]
 private val executorIdToTaskIds = new mutable.HashMap[String, 
mutable.HashSet[Long]]
+private var numRunningTasks: Int = _
--- End diff --

I think that's OK, because onTaskStart and onTaskEnd already take speculation 
into account (they fire for speculative task attempts as well).





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24311057
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -438,6 +444,7 @@ private[spark] class ExecutorAllocationManager(
 }
 
 override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+  numRunningTasks += 1
--- End diff --

I take that back on further inspection.  It should be synchronized.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24310281
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -438,6 +444,7 @@ private[spark] class ExecutorAllocationManager(
 }
 
 override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+  numRunningTasks += 1
--- End diff --

Good point.  It doesn't need synchronization because only one thread is 
writing to it, but we should make it volatile.
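For readers following the volatile-vs-synchronized exchange above, a minimal standalone sketch of the visibility concern (hypothetical class and method names, not the real Spark listener API): a counter written only from the listener thread still needs `@volatile` so a separate polling thread sees fresh values. If a second thread could also write, `+=` on a volatile field is not atomic and `synchronized` (or an `AtomicInteger`) would be needed instead, which is the point made in the later comment.

```scala
// Sketch of the discussed pattern; names are illustrative only.
class TaskCounter {
  // Only the listener-bus thread writes this field, but the allocation
  // thread reads it; without @volatile the reader may see a stale value.
  @volatile private var numRunningTasks: Int = 0

  def onTaskStart(): Unit = { numRunningTasks += 1 } // listener thread only
  def onTaskEnd(): Unit = { numRunningTasks -= 1 }   // listener thread only

  def running: Int = numRunningTasks                 // safe to read anywhere
}

object TaskCounterDemo {
  def main(args: Array[String]): Unit = {
    val c = new TaskCounter
    (1 to 3).foreach(_ => c.onTaskStart())
    c.onTaskEnd()
    println(c.running) // 2
  }
}
```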





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73459200
  
Thanks for the review @andrewor14.  Updated patch coming soon.

Regarding the public API, what I'm contending is that we don't really have 
any idea whether or how people would use `requestExecutors`, so it's not 
worthwhile to add extra bookkeeping and complexity to support it.  I can't 
personally envision a realistic situation where I would want to use either 
`requestExecutors` or `requestTotalExecutors`.  Maybe it would be better to 
just remove the API until someone asks for it?  Unless you've come across uses 
for it?





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73457967
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27077/
Test FAILed.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73457963
  
  [Test build #27077 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27077/consoleFull) for PR 4168 at commit [`16db9f4`](https://github.com/apache/spark/commit/16db9f4aa94ec28500eff08c6d23a5f4da6a152e).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73456491
  
@sryza Is the API change in `ExecutorAllocationManager` necessary for this 
change? The new API essentially pushes to the user the responsibility of 
tracking how many executors the application currently has. If I as a Spark 
application want to incrementally add executors, then I must additionally 
keep track of the number of executors I currently have, as we used to do in 
`CoarseGrainedSchedulerBackend`. I actually don't see a great use case for 
something like `sc.setTotalExecutors` because it expects the user to know 
how many executors they should need, and that estimation is often difficult.

The rest of it looks fairly straightforward. My comments mostly have to do 
with variable naming.
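
To make the bookkeeping concern concrete, here is a minimal sketch (hypothetical trait and names, simplified from the real `ExecutorAllocationClient`): with only a total-based API, a caller that thinks in increments must carry its own running target, which is exactly the state being pushed to the user.

```scala
// Hypothetical, simplified shape of the total-based API under discussion.
trait TotalClient {
  def requestTotalExecutors(total: Int): Boolean // absolute target
}

// A caller that wants delta-style requests has to track its own running
// target in order to translate increments into absolute totals.
class IncrementalAdapter(client: TotalClient) {
  private var target = 0
  def addExecutors(numAdditional: Int): Boolean = synchronized {
    target += numAdditional
    client.requestTotalExecutors(target)
  }
}

object AdapterDemo {
  def main(args: Array[String]): Unit = {
    var lastRequested = -1
    val adapter = new IncrementalAdapter(new TotalClient {
      def requestTotalExecutors(total: Int): Boolean = {
        lastRequested = total; true
      }
    })
    adapter.addExecutors(3)
    adapter.addExecutors(2)
    println(lastRequested) // 5
  }
}
```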





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24308654
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -226,50 +249,32 @@ private[spark] class ExecutorAllocationManager(
* If the cap on the number of executors is reached, give up and reset 
the
* number of executors to add next round instead of continuing to double 
it.
* Return the number actually requested.
+   *
+   * @param maxNumNeededExecutors the maximum number of executors all 
currently running or pending
+   *  tasks could fill
*/
-  private def addExecutors(): Int = synchronized {
+  private def addExecutors(maxNumNeededExecutors: Int): Int = synchronized 
{
 // Do not request more executors if we have already reached the upper 
bound
-val numExistingExecutors = executorIds.size + numExecutorsPending
-if (numExistingExecutors >= maxNumExecutors) {
+val currentTarget = currentTargetNumExecutors
+if (currentTarget >= maxNumExecutors) {
   logDebug(s"Not adding executors because there are already 
${executorIds.size} " +
 s"registered and $numExecutorsPending pending executor(s) (limit 
$maxNumExecutors)")
   numExecutorsToAdd = 1
   return 0
 }
 
-// The number of executors needed to satisfy all pending tasks is the 
number of tasks pending
-// divided by the number of tasks each executor can fit, rounded up.
-val maxNumExecutorsPending =
-  (listener.totalPendingTasks() + tasksPerExecutor - 1) / 
tasksPerExecutor
-if (numExecutorsPending >= maxNumExecutorsPending) {
-  logDebug(s"Not adding executors because there are already 
$numExecutorsPending " +
-s"pending and pending tasks could only fill 
$maxNumExecutorsPending")
-  numExecutorsToAdd = 1
-  return 0
-}
-
-// It's never useful to request more executors than could satisfy all 
the pending tasks, so
-// cap request at that amount.
-// Also cap request with respect to the configured upper bound.
-val maxNumExecutorsToAdd = math.min(
-  maxNumExecutorsPending - numExecutorsPending,
-  maxNumExecutors - numExistingExecutors)
-assert(maxNumExecutorsToAdd > 0)
-
-val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, 
maxNumExecutorsToAdd)
-
-val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-val addRequestAcknowledged = testing || 
client.requestExecutors(actualNumExecutorsToAdd)
+val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd,
+  math.min(maxNumExecutors, maxNumNeededExecutors))
+val numRequested = newTotalExecutors - currentTarget
--- End diff --

I think it's confusing to have both `numRequested` and `numExecutorsToAdd`. 
I actually prefer the old name `actualNumExecutorsToAdd`, and there we can add 
a short comment to explain what counts as "actual".





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24308697
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1029,12 +1029,12 @@ class SparkContext(config: SparkConf) extends 
Logging with ExecutorAllocationCli
* This is currently only supported in Yarn mode. Return whether the 
request is received.
*/
   @DeveloperApi
-  override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
+  override def requestTotalExecutors(numExecutors: Int): Boolean = {
--- End diff --

javadoc is outdated





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24308687
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -413,6 +418,7 @@ private[spark] class ExecutorAllocationManager(
 private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
 private val stageIdToTaskIndices = new mutable.HashMap[Int, 
mutable.HashSet[Int]]
 private val executorIdToTaskIds = new mutable.HashMap[String, 
mutable.HashSet[Long]]
+private var numRunningTasks: Int = _
--- End diff --

what are the semantics here? Does this count speculation? In steady state 
is this guaranteed to go back to 0?





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73456090
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27073/
Test FAILed.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73456084
  
  [Test build #27073 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27073/consoleFull) for PR 4168 at commit [`9ba0e01`](https://github.com/apache/spark/commit/9ba0e0161e4554839c6dbf3a097b69af3de263b8).
 * This patch **fails Spark unit tests**.
 * This patch **does not merge cleanly**.
 * This patch adds no public classes.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24308610
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -226,50 +249,32 @@ private[spark] class ExecutorAllocationManager(
* If the cap on the number of executors is reached, give up and reset 
the
* number of executors to add next round instead of continuing to double 
it.
* Return the number actually requested.
+   *
+   * @param maxNumNeededExecutors the maximum number of executors all 
currently running or pending
--- End diff --

`maxNumExecutorsNeeded`





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24308577
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -226,50 +249,32 @@ private[spark] class ExecutorAllocationManager(
* If the cap on the number of executors is reached, give up and reset 
the
* number of executors to add next round instead of continuing to double 
it.
* Return the number actually requested.
+   *
+   * @param maxNumNeededExecutors the maximum number of executors all 
currently running or pending
+   *  tasks could fill
*/
-  private def addExecutors(): Int = synchronized {
+  private def addExecutors(maxNumNeededExecutors: Int): Int = synchronized 
{
 // Do not request more executors if we have already reached the upper 
bound
-val numExistingExecutors = executorIds.size + numExecutorsPending
-if (numExistingExecutors >= maxNumExecutors) {
+val currentTarget = currentTargetNumExecutors
+if (currentTarget >= maxNumExecutors) {
   logDebug(s"Not adding executors because there are already 
${executorIds.size} " +
 s"registered and $numExecutorsPending pending executor(s) (limit 
$maxNumExecutors)")
   numExecutorsToAdd = 1
   return 0
 }
 
-// The number of executors needed to satisfy all pending tasks is the 
number of tasks pending
-// divided by the number of tasks each executor can fit, rounded up.
-val maxNumExecutorsPending =
-  (listener.totalPendingTasks() + tasksPerExecutor - 1) / 
tasksPerExecutor
-if (numExecutorsPending >= maxNumExecutorsPending) {
-  logDebug(s"Not adding executors because there are already 
$numExecutorsPending " +
-s"pending and pending tasks could only fill 
$maxNumExecutorsPending")
-  numExecutorsToAdd = 1
-  return 0
-}
-
-// It's never useful to request more executors than could satisfy all 
the pending tasks, so
-// cap request at that amount.
-// Also cap request with respect to the configured upper bound.
-val maxNumExecutorsToAdd = math.min(
-  maxNumExecutorsPending - numExecutorsPending,
-  maxNumExecutors - numExistingExecutors)
-assert(maxNumExecutorsToAdd > 0)
-
-val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, 
maxNumExecutorsToAdd)
-
-val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-val addRequestAcknowledged = testing || 
client.requestExecutors(actualNumExecutorsToAdd)
+val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd,
+  math.min(maxNumExecutors, maxNumNeededExecutors))
--- End diff --

this double `math.min` is really hard to read. I would at least put the 
inner math.min into a separate variable and add a comment to explain what's 
going on
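
A standalone sketch of the refactor being suggested, reusing the names from the quoted diff (this is an illustration of the suggestion, not the code that landed):

```scala
// Sketch of the readability fix: name the inner math.min so the cap on the
// request is explicit rather than buried in a nested expression.
object ExecutorCapSketch {
  def newTotalExecutors(currentTarget: Int, numExecutorsToAdd: Int,
                        maxNumExecutors: Int, maxNumNeededExecutors: Int): Int = {
    // The effective ceiling is the smaller of the configured upper bound and
    // the number of executors current load could actually fill.
    val executorLimit = math.min(maxNumExecutors, maxNumNeededExecutors)
    math.min(currentTarget + numExecutorsToAdd, executorLimit)
  }

  def main(args: Array[String]): Unit = {
    println(newTotalExecutors(4, 4, 10, 6)) // 6: capped by current need
    println(newTotalExecutors(2, 2, 10, 6)) // 4: under both caps
  }
}
```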





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24308537
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -198,15 +198,38 @@ private[spark] class ExecutorAllocationManager(
 t.start()
   }
 
+  def currentTargetNumExecutors(): Int =
+numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
   /**
-   * If the add time has expired, request new executors and refresh the 
add time.
-   * If the remove time for an existing executor has expired, kill the 
executor.
+   * This is called at a fixed interval to regulate the number of pending executor requests
+   * and number of executors running.
+   *
+   * First, check to see whether our existing allocation and the requests 
we've made previously
+   * exceed our current needs. If so, let the cluster manager know so that 
it can cancel pending
+   * requests that are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new 
executors and refresh the add
+   * time.
+   *
+   * Last, if the remove time for an existing executor has expired, kill 
the executor.
+   *
* This is factored out into its own method for testing.
*/
   private def schedule(): Unit = synchronized {
+// The maximum number of executors we need under the current load is 
the total number of
+// running or pending tasks, divided by the full task capacity of each 
executor, rounded up.
+val numRunningOrPendingTasks = listener.totalPendingTasks + 
listener.totalRunningTasks
+val maxNumNeededExecutors = (numRunningOrPendingTasks + 
tasksPerExecutor - 1) / tasksPerExecutor
--- End diff --

`maxNumExecutorsNeeded`?
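
For readers less familiar with the idiom, the `(n + tasksPerExecutor - 1) / tasksPerExecutor` expression quoted in the diff is integer ceiling division, i.e. the number of executors needed to fit all tasks, rounded up. A minimal standalone sketch:

```scala
// Ceiling division as used in the quoted diff: how many executors are needed
// so that all running or pending tasks fit, rounding any partial executor up.
object CeilDivSketch {
  def maxNumExecutorsNeeded(numRunningOrPendingTasks: Int,
                            tasksPerExecutor: Int): Int =
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor

  def main(args: Array[String]): Unit = {
    println(maxNumExecutorsNeeded(10, 4)) // 3: two full executors plus one partial
    println(maxNumExecutorsNeeded(8, 4))  // 2: exact fit, no extra executor
  }
}
```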





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24308517
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -198,15 +198,38 @@ private[spark] class ExecutorAllocationManager(
 t.start()
   }
 
+  def currentTargetNumExecutors(): Int =
--- End diff --

I would just call this `targetNumExecutors: Int` to be more concise, and 
add a javadoc to explain what it is
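
The quantity being renamed here is, per the quoted diff, the number of executors the manager currently wants from the cluster manager. A standalone sketch of that arithmetic (illustrative names only):

```scala
// The target from the diff: executors requested but not yet granted, plus
// executors already registered, minus those we have asked to be removed.
object TargetSketch {
  def targetNumExecutors(numExecutorsPending: Int, numRegistered: Int,
                         numPendingToRemove: Int): Int =
    numExecutorsPending + numRegistered - numPendingToRemove

  def main(args: Array[String]): Unit =
    println(targetNumExecutors(2, 5, 1)) // 6
}
```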





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24308501
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -198,15 +198,38 @@ private[spark] class ExecutorAllocationManager(
 t.start()
   }
 
+  def currentTargetNumExecutors(): Int =
+numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
   /**
-   * If the add time has expired, request new executors and refresh the 
add time.
-   * If the remove time for an existing executor has expired, kill the 
executor.
+   * This is called at a fixed interval to regulate the number of pending executor requests
+   * and number of executors running.
+   *
+   * First, check to see whether our existing allocation and the requests 
we've made previously
+   * exceed our current needs. If so, let the cluster manager know so that 
it can cancel pending
+   * requests that are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new 
executors and refresh the add
+   * time.
+   *
+   * Last, if the remove time for an existing executor has expired, kill 
the executor.
+   *
* This is factored out into its own method for testing.
*/
   private def schedule(): Unit = synchronized {
+// The maximum number of executors we need under the current load is 
the total number of
+// running or pending tasks, divided by the full task capacity of each 
executor, rounded up.
+val numRunningOrPendingTasks = listener.totalPendingTasks + 
listener.totalRunningTasks
+val maxNumNeededExecutors = (numRunningOrPendingTasks + 
tasksPerExecutor - 1) / tasksPerExecutor
+
 val now = clock.getTimeMillis
-if (addTime != NOT_SET && now >= addTime) {
-  addExecutors()
+val currentTarget = currentTargetNumExecutors
+if (maxNumNeededExecutors < currentTarget) {
+  client.requestTotalExecutors(maxNumNeededExecutors)
+  numExecutorsPending -= math.min(currentTarget - 
maxNumNeededExecutors, 0)
--- End diff --

this looks incorrect. If `currentTarget > maxNumNeededExecutors`, the 
difference is positive, so the min will always return 0 and 
`numExecutorsPending` is never decremented.
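
The issue flagged here is easy to see with concrete numbers (a sketch of the bug and one plausible intent, not necessarily the fix the PR ended up with):

```scala
// Demonstrates the flagged bug: when currentTarget exceeds
// maxNumNeededExecutors, the difference is positive, so math.min(diff, 0)
// is always 0 and the pending count never actually shrinks.
object MinClampSketch {
  def main(args: Array[String]): Unit = {
    val currentTarget = 10
    val maxNumNeededExecutors = 4
    val numExecutorsPending = 3

    // As written in the diff: always 0 on this branch.
    val buggyDecrement = math.min(currentTarget - maxNumNeededExecutors, 0)

    // One plausible intent (hypothetical): decrement by the excess, clamped
    // to what is actually pending so the count cannot go negative.
    val fixedDecrement =
      math.min(currentTarget - maxNumNeededExecutors, numExecutorsPending)

    println(buggyDecrement) // 0
    println(fixedDecrement) // 3
  }
}
```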





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24308485
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -198,15 +198,38 @@ private[spark] class ExecutorAllocationManager(
 t.start()
   }
 
+  def currentTargetNumExecutors(): Int =
+numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
   /**
-   * If the add time has expired, request new executors and refresh the 
add time.
-   * If the remove time for an existing executor has expired, kill the 
executor.
+   * This is called at a fixed interval to relegate the number of pending 
executor requests
+   * and number of executors running.
+   *
+   * First, check to see whether our existing allocation and the requests 
we've made previously
+   * exceed our current needs. If so, let the cluster manager know so that 
it can cancel pending
+   * requests that are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new 
executors and refresh the add
+   * time.
+   *
+   * Last, if the remove time for an existing executor has expired, kill 
the executor.
+   *
* This is factored out into its own method for testing.
*/
   private def schedule(): Unit = synchronized {
+// The maximum number of executors we need under the current load is 
the total number of
+// running or pending tasks, divided by the full task capacity of each 
executor, rounded up.
+val numRunningOrPendingTasks = listener.totalPendingTasks + 
listener.totalRunningTasks
+val maxNumNeededExecutors = (numRunningOrPendingTasks + 
tasksPerExecutor - 1) / tasksPerExecutor
+
 val now = clock.getTimeMillis
-if (addTime != NOT_SET && now >= addTime) {
-  addExecutors()
+val currentTarget = currentTargetNumExecutors
+if (maxNumNeededExecutors < currentTarget) {
+  client.requestTotalExecutors(maxNumNeededExecutors)
+  numExecutorsPending -= math.min(currentTarget - maxNumNeededExecutors, 0)
+  numExecutorsToAdd = 1
--- End diff --

can you add a comment on this block to explain what's going on, maybe 
something like
```
// The target number exceeds the number we actually need, so stop adding new
// executors and inform the cluster manager to cancel the extra pending 
requests
```
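The control flow that the suggested comment documents can be sketched as follows (Python for illustration; the callback parameters are hypothetical stand-ins for the cluster-manager client and the add path, not the actual Scala API):

```python
def schedule_tick(current_target, max_needed, add_time_expired,
                  request_total, add_executors):
    """One scheduling round in the order the doc comment describes:
    cancel excess pending requests first, otherwise consider adding."""
    if max_needed < current_target:
        # The target exceeds what the current load can use, so declare the
        # smaller total and let the cluster manager cancel extra requests.
        request_total(max_needed)
        return "cancelled"
    if add_time_expired:
        add_executors(max_needed)
        return "added"
    return "noop"
```

The key design point is that cancellation takes priority: a round never both cancels and adds.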





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24308449
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -226,50 +249,32 @@ private[spark] class ExecutorAllocationManager(
* If the cap on the number of executors is reached, give up and reset 
the
* number of executors to add next round instead of continuing to double 
it.
* Return the number actually requested.
+   *
+   * @param maxNumNeededExecutors the maximum number of executors all 
currently running or pending
+   *  tasks could fill
*/
-  private def addExecutors(): Int = synchronized {
+  private def addExecutors(maxNumNeededExecutors: Int): Int = synchronized 
{
 // Do not request more executors if we have already reached the upper 
bound
-val numExistingExecutors = executorIds.size + numExecutorsPending
-if (numExistingExecutors >= maxNumExecutors) {
+val currentTarget = currentTargetNumExecutors
+if (currentTarget >= maxNumExecutors) {
   logDebug(s"Not adding executors because there are already 
${executorIds.size} " +
 s"registered and $numExecutorsPending pending executor(s) (limit 
$maxNumExecutors)")
--- End diff --

You may have to update this message because it doesn't take executors pending removal into account, so the math doesn't quite add up.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24308401
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -198,15 +198,38 @@ private[spark] class ExecutorAllocationManager(
 t.start()
   }
 
+  def currentTargetNumExecutors(): Int =
+numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
   /**
-   * If the add time has expired, request new executors and refresh the 
add time.
-   * If the remove time for an existing executor has expired, kill the 
executor.
+   * This is called at a fixed interval to relegate the number of pending 
executor requests
--- End diff --

Oops definitely





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r24308360
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -198,15 +198,38 @@ private[spark] class ExecutorAllocationManager(
 t.start()
   }
 
+  def currentTargetNumExecutors(): Int =
+numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
   /**
-   * If the add time has expired, request new executors and refresh the 
add time.
-   * If the remove time for an existing executor has expired, kill the 
executor.
+   * This is called at a fixed interval to relegate the number of pending 
executor requests
--- End diff --

I don't think relegate here is the right word. Did you mean regulate?





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73455047
  
  [Test build #27077 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27077/consoleFull)
 for   PR 4168 at commit 
[`16db9f4`](https://github.com/apache/spark/commit/16db9f4aa94ec28500eff08c6d23a5f4da6a152e).
 * This patch merges cleanly.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73451518
  
  [Test build #27073 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27073/consoleFull)
 for   PR 4168 at commit 
[`9ba0e01`](https://github.com/apache/spark/commit/9ba0e0161e4554839c6dbf3a097b69af3de263b8).
 * This patch **does not merge cleanly**.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-08 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73451192
  
retest this please





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73155456
  
@andrewor14 it's no longer a WIP, and I am aiming for it for 1.3.  I just 
updated the title - sorry for the confusion.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-02-05 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-73154727
  
@sryza is this still WIP? Are we aiming for this to go into 1.3?





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-01-28 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r23683544
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -226,50 +249,32 @@ private[spark] class ExecutorAllocationManager(
* If the cap on the number of executors is reached, give up and reset 
the
* number of executors to add next round instead of continuing to double 
it.
* Return the number actually requested.
+   *
+   * @param maxNumNeededExecutors the maximum number of executors all 
currently running or pending
+   *  tasks could fill
*/
-  private def addExecutors(): Int = synchronized {
--- End diff --

I think we need to keep an addExecutors method in the form used by ExecutorAllocationManagerSuite, so it can still be called to add executors at any time.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-01-28 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r23682980
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -226,50 +249,32 @@ private[spark] class ExecutorAllocationManager(
* If the cap on the number of executors is reached, give up and reset 
the
* number of executors to add next round instead of continuing to double 
it.
* Return the number actually requested.
+   *
+   * @param maxNumNeededExecutors the maximum number of executors all 
currently running or pending
+   *  tasks could fill
*/
-  private def addExecutors(): Int = synchronized {
+  private def addExecutors(maxNumNeededExecutors: Int): Int = synchronized {
--- End diff --

I think addExecutors does not need to return an Int; the return value is never used, so it is unnecessary.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-01-28 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r23682774
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -470,6 +477,7 @@ private[spark] class ExecutorAllocationManager(
 }
 
 override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+  numRunningTasks -= 1
--- End diff --

This should also be synchronized.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-01-28 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r23682749
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -438,6 +444,7 @@ private[spark] class ExecutorAllocationManager(
 }
 
 override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+  numRunningTasks += 1
--- End diff --

I think access to numRunningTasks should be synchronized, because the schedule thread and the listener thread are two different threads.
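The race the reviewer describes — listener-thread updates interleaving with schedule-thread reads — can be sketched with a lock-protected counter (Python for illustration; the class and method names are hypothetical analogues of the listener callbacks, not Spark's API):

```python
import threading

class TaskCounter:
    """numRunningTasks shared between the listener thread (writer) and the
    schedule thread (reader); the lock makes each update atomic."""
    def __init__(self):
        self._lock = threading.Lock()
        self._num_running_tasks = 0

    def on_task_start(self):
        with self._lock:
            self._num_running_tasks += 1

    def on_task_end(self):
        with self._lock:
            self._num_running_tasks -= 1

    def num_running_tasks(self):
        with self._lock:
            return self._num_running_tasks

counter = TaskCounter()
workers = [threading.Thread(target=counter.on_task_start) for _ in range(100)]
for w in workers:
    w.start()
for w in workers:
    w.join()
```

Without the lock, a read-modify-write from two threads can lose an update, which is exactly why the review asks for synchronization here.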





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-01-27 Thread vanzin
Github user vanzin commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-71757464
  
@sryza looks like the test failures are legit?





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-71697091
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26168/
Test FAILed.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-01-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-71697071
  
  [Test build #26168 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26168/consoleFull)
 for   PR 4168 at commit 
[`9ba0e01`](https://github.com/apache/spark/commit/9ba0e0161e4554839c6dbf3a097b69af3de263b8).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-01-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4168#issuecomment-71687375
  
  [Test build #26168 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26168/consoleFull)
 for   PR 4168 at commit 
[`9ba0e01`](https://github.com/apache/spark/commit/9ba0e0161e4554839c6dbf3a097b69af3de263b8).
 * This patch merges cleanly.





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-01-26 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r23566205
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -199,14 +199,31 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * If the add time has expired, request new executors and refresh the 
add time.
-   * If the remove time for an existing executor has expired, kill the 
executor.
+   * This is called at a fixed interval to relegate the number of pending 
executor requests
+   * and number of executors running.
+   *
+   * First, check to see whether our existing allocation and the requests 
we've made previously
+   * exceed our current needs. If so, let the cluster manager know so that 
it can cancel pending
+   * requests that are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new 
executors and refresh the add
+   * time.
+   *
+   * Last, if the remove time for an existing executor has expired, kill 
the executor.
* This is factored out into its own method for testing.
*/
   private def schedule(): Unit = synchronized {
+// The maximum number of executors we need under the current load is 
the total number of
+// running or pending tasks, divided by the full task capacity of each 
executor, rounded up.
+val numRunningOrPendingTasks = listener.totalPendingTasks + 
listener.totalRunningTasks
+val maxNumNeededExecutors = (numRunningOrPendingTasks + 
tasksPerExecutor - 1) / tasksPerExecutor
+
 val now = clock.getTimeMillis
-if (addTime != NOT_SET && now >= addTime) {
-  addExecutors()
+if (maxNumNeededExecutors < numExecutorsPending + executorIds.size) {
--- End diff --

Good point, I think you are right.
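The `maxNumNeededExecutors` expression discussed in this diff is integer ceiling division: the smallest number of executors whose combined task slots cover every running or pending task. A minimal sketch (Python for illustration; function and parameter names are hypothetical):

```python
def max_needed_executors(pending_tasks, running_tasks, tasks_per_executor):
    """Ceiling division, matching (n + tasksPerExecutor - 1) / tasksPerExecutor
    in the Scala diff: round the executor count up, never down."""
    total = pending_tasks + running_tasks
    return (total + tasks_per_executor - 1) // tasks_per_executor
```

For example, 9 tasks at 4 slots per executor needs 3 executors, not 2 — plain integer division would strand the ninth task.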





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-01-24 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r23502091
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -226,8 +243,11 @@ private[spark] class ExecutorAllocationManager(
* If the cap on the number of executors is reached, give up and reset 
the
* number of executors to add next round instead of continuing to double 
it.
* Return the number actually requested.
+   *
+   * @param maxNumNeededExecutors the maximum number of executors all 
currently running or pending
+   *  tasks could fill
*/
-  private def addExecutors(): Int = synchronized {
+  private def addExecutors(maxNumNeededExecutors: Int): Int = synchronized {
 // Do not request more executors if we have already reached the upper 
bound
 val numExistingExecutors = executorIds.size + numExecutorsPending
--- End diff --

I think numExistingExecutors here should exclude executorsPendingToRemove.size, shouldn't it?





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-01-24 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r23502082
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -237,39 +257,18 @@ private[spark] class ExecutorAllocationManager(
   return 0
 }
 
-// The number of executors needed to satisfy all pending tasks is the 
number of tasks pending
-// divided by the number of tasks each executor can fit, rounded up.
-val maxNumExecutorsPending =
-  (listener.totalPendingTasks() + tasksPerExecutor - 1) / 
tasksPerExecutor
-if (numExecutorsPending >= maxNumExecutorsPending) {
-  logDebug(s"Not adding executors because there are already 
$numExecutorsPending " +
-s"pending and pending tasks could only fill 
$maxNumExecutorsPending")
-  numExecutorsToAdd = 1
-  return 0
-}
-
-// It's never useful to request more executors than could satisfy all 
the pending tasks, so
-// cap request at that amount.
-// Also cap request with respect to the configured upper bound.
-val maxNumExecutorsToAdd = math.min(
-  maxNumExecutorsPending - numExecutorsPending,
-  maxNumExecutors - numExistingExecutors)
-assert(maxNumExecutorsToAdd > 0)
-
-val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, 
maxNumExecutorsToAdd)
-
-val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-val addRequestAcknowledged = testing || 
client.requestExecutors(actualNumExecutorsToAdd)
+val newTotalExecutors = math.min(numExistingExecutors + 
numExecutorsToAdd,
--- End diff --

I think numExistingExecutors here should exclude executorsPendingToRemove.size, shouldn't it?





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-01-24 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r23502033
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -237,39 +257,18 @@ private[spark] class ExecutorAllocationManager(
   return 0
 }
 
-// The number of executors needed to satisfy all pending tasks is the 
number of tasks pending
-// divided by the number of tasks each executor can fit, rounded up.
-val maxNumExecutorsPending =
-  (listener.totalPendingTasks() + tasksPerExecutor - 1) / 
tasksPerExecutor
-if (numExecutorsPending >= maxNumExecutorsPending) {
-  logDebug(s"Not adding executors because there are already 
$numExecutorsPending " +
-s"pending and pending tasks could only fill 
$maxNumExecutorsPending")
-  numExecutorsToAdd = 1
-  return 0
-}
-
-// It's never useful to request more executors than could satisfy all 
the pending tasks, so
-// cap request at that amount.
-// Also cap request with respect to the configured upper bound.
-val maxNumExecutorsToAdd = math.min(
-  maxNumExecutorsPending - numExecutorsPending,
-  maxNumExecutors - numExistingExecutors)
-assert(maxNumExecutorsToAdd > 0)
-
-val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, 
maxNumExecutorsToAdd)
-
-val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-val addRequestAcknowledged = testing || 
client.requestExecutors(actualNumExecutorsToAdd)
+val newTotalExecutors = math.min(numExistingExecutors + 
numExecutorsToAdd,
+  math.min(maxNumExecutors, maxNumNeededExecutors))
+val numRequested = newTotalExecutors - numExistingExecutors
+val addRequestAcknowledged = testing || 
client.requestTotalExecutors(newTotalExecutors)
--- End diff --

I think we should exclude executorsPendingToRemove.size from newTotalExecutors, shouldn't we?
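The reviewer's suggestion — count only executors we intend to keep when declaring a total to the cluster manager — can be sketched like this (Python for illustration; the function and its parameters are hypothetical, and the adjustment shown is the reviewer's proposal, not the code as written in the diff):

```python
def new_total_to_request(num_registered, num_pending, num_pending_to_remove,
                         num_to_add, max_executors, max_needed):
    """Total executors to declare to the cluster manager, excluding
    executors pending removal from the existing count and capping at
    both the configured upper bound and the current need."""
    existing = num_registered + num_pending - num_pending_to_remove
    return min(existing + num_to_add, max_executors, max_needed)
```

For instance, with 3 registered, 2 pending, 1 pending removal, and 2 to add, the declared total is 4 + 2 = 6 rather than 7, so the manager does not backfill an executor the application is about to give up.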





[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...

2015-01-24 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/4168#discussion_r23502021
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -199,14 +199,31 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * If the add time has expired, request new executors and refresh the 
add time.
-   * If the remove time for an existing executor has expired, kill the 
executor.
+   * This is called at a fixed interval to relegate the number of pending 
executor requests
+   * and number of executors running.
+   *
+   * First, check to see whether our existing allocation and the requests 
we've made previously
+   * exceed our current needs. If so, let the cluster manager know so that 
it can cancel pending
+   * requests that are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new 
executors and refresh the add
+   * time.
+   *
+   * Last, if the remove time for an existing executor has expired, kill 
the executor.
* This is factored out into its own method for testing.
*/
   private def schedule(): Unit = synchronized {
+// The maximum number of executors we need under the current load is 
the total number of
+// running or pending tasks, divided by the full task capacity of each 
executor, rounded up.
+val numRunningOrPendingTasks = listener.totalPendingTasks + 
listener.totalRunningTasks
+val maxNumNeededExecutors = (numRunningOrPendingTasks + 
tasksPerExecutor - 1) / tasksPerExecutor
+
 val now = clock.getTimeMillis
-if (addTime != NOT_SET && now >= addTime) {
-  addExecutors()
+if (maxNumNeededExecutors < numExecutorsPending + executorIds.size) {
--- End diff --

Do we need to exclude executorsPendingToRemove.size? The YarnAllocator may have already killed the executors pending removal while the ExecutorAllocationManager has not yet received the onExecutorRemoved message, so at that point executorIds still contains the removed executors.




