[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73764141 @sryza looks like I narrowly missed your last commit. I have copied the changes there to this HOTFIX commit: b640c841fca92bb0bca77267db2965ff8f79586f
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73763274 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27220/ Test FAILed.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73763270

[Test build #27220 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27220/consoleFull) for PR 4168 at commit [`3cca880`](https://github.com/apache/spark/commit/3cca88085a7b9fd72746c8e6e34d7820deb8d2eb).

* This patch **fails Python style tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73762745 [Test build #27220 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27220/consoleFull) for PR 4168 at commit [`3cca880`](https://github.com/apache/spark/commit/3cca88085a7b9fd72746c8e6e34d7820deb8d2eb). * This patch merges cleanly.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/4168
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24440514

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1105,6 +1105,25 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /**
    * :: DeveloperApi ::
+   * Express a preference to the cluster manager for a given total number of executors. This can
+   * result in canceling pending requests or filing additional requests.
+   * This is currently only supported in Yarn mode. Return whether the request is received.
+   */
+  @DeveloperApi
--- End diff --

not developer api if it's `private[spark]`
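To illustrate the review point above: `@DeveloperApi` marks API that is public but unstable, so it carries no meaning on a member that is already `private[spark]`. Below is a minimal sketch of the two consistent alternatives; the class and method names are invented for illustration, and the only assumption is a Spark dependency providing the annotation.

```scala
package org.apache.spark.example

import org.apache.spark.annotation.DeveloperApi

class AllocationFacadeSketch {
  // Public but unstable: the annotation is meaningful here.
  @DeveloperApi
  def requestTotalForUsers(numExecutors: Int): Boolean = true

  // Visible only to Spark internals, so users never see it and
  // annotating it @DeveloperApi would be misleading.
  private[spark] def requestTotalInternal(numExecutors: Int): Boolean = true
}
```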
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24440168

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -224,59 +240,90 @@ private[spark] class ExecutorAllocationManager(
   }

   /**
+   * Check to see whether our existing allocation and the requests we've made previously exceed our
+   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
+   * are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * @return the delta in the target number of executors.
+   */
+  private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
+    val currentTarget = targetNumExecutors
+    val maxNeeded = maxNumExecutorsNeeded
+
+    if (maxNeeded < currentTarget) {
+      // The target number exceeds the number we actually need, so stop adding new
+      // executors and inform the cluster manager to cancel the extra pending requests.
+      val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
+      client.requestTotalExecutors(newTotalExecutors)
+      numExecutorsToAdd = 1
+      updateNumExecutorsPending(newTotalExecutors)
+    } else if (addTime != NOT_SET && now >= addTime) {
+      val delta = addExecutors(maxNeeded)
+      logDebug(s"Starting timer to add more executors (to " +
+        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+      addTime += sustainedSchedulerBacklogTimeout * 1000
+      delta
+    } else {
+      0
+    }
+  }
+
+  /**
    * Request a number of executors from the cluster manager.
    * If the cap on the number of executors is reached, give up and reset the
    * number of executors to add next round instead of continuing to double it.
-   * Return the number actually requested.
+   *
+   * @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending
+   *                              tasks could fill
+   * @return the number of additional executors actually requested.
    */
-  private def addExecutors(): Int = synchronized {
-    // Do not request more executors if we have already reached the upper bound
-    val numExistingExecutors = executorIds.size + numExecutorsPending
-    if (numExistingExecutors >= maxNumExecutors) {
+  private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
+    // Do not request more executors if it would put our target over the upper bound
+    val currentTarget = targetNumExecutors
+    if (currentTarget >= maxNumExecutors) {
       logDebug(s"Not adding executors because there are already ${executorIds.size} " +
         s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
       numExecutorsToAdd = 1
       return 0
     }

-    // The number of executors needed to satisfy all pending tasks is the number of tasks pending
-    // divided by the number of tasks each executor can fit, rounded up.
-    val maxNumExecutorsPending =
-      (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
-    if (numExecutorsPending >= maxNumExecutorsPending) {
-      logDebug(s"Not adding executors because there are already $numExecutorsPending " +
-        s"pending and pending tasks could only fill $maxNumExecutorsPending")
-      numExecutorsToAdd = 1
-      return 0
-    }
-
-    // It's never useful to request more executors than could satisfy all the pending tasks, so
-    // cap request at that amount.
-    // Also cap request with respect to the configured upper bound.
-    val maxNumExecutorsToAdd = math.min(
-      maxNumExecutorsPending - numExecutorsPending,
-      maxNumExecutors - numExistingExecutors)
-    assert(maxNumExecutorsToAdd > 0)
-
-    val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
-
-    val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-    val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
+    val actualMaxNumExecutors = math.min(maxNumExecutors - executorsPendingToRemove.size,
--- End diff --

ok, I will fix this myself when I merge
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73761510 Hey @sryza thanks a lot for fixing this. I will merge this into master and 1.3, addressing the last batch of comments I pointed out as part of the merge.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24438965

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
(same addExecutors hunk as quoted in full above, at the line)
+    val actualMaxNumExecutors = math.min(maxNumExecutors - executorsPendingToRemove.size,
--- End diff --

Oy, yup, you're definitely right. Not sure what I was thinking last night.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24435726

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
(same addExecutors hunk as quoted in full above, at the line)
+    val actualMaxNumExecutors = math.min(maxNumExecutors - executorsPendingToRemove.size,
--- End diff --

> If your new total was up to 10, YARN could grant you that 10, and then would have 18, including the 8 pending to be removed.

YARN could grant you the 10 *total*, not an additional 10. Right? A few lines later we use this value to cap what we provide to `sc.requestTotalExecutors`, which now takes in 2 instead of 10. I still don't see how it's possible for us to get 18 executors.
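For readers following the arithmetic in this exchange, here is a small standalone sketch of the two caps being debated, using the numbers from the example in this thread (a max of 10, with 8 executors pending removal). The variable names mirror the patch, but the snippet is illustrative only, not the actual allocation code.

```scala
object CapArithmeticSketch {
  def main(args: Array[String]): Unit = {
    val maxNumExecutors = 10           // configured upper bound
    val executorsPendingToRemove = 8   // executors we have already asked to release
    val maxNumExecutorsNeeded = 10     // executors that running + pending tasks could use

    // Cap as written in the diff: subtract the pending removals first.
    val capWithSubtraction =
      math.min(maxNumExecutors - executorsPendingToRemove, maxNumExecutorsNeeded) // = 2

    // Cap as suggested in the review: requestTotalExecutors expresses a *total*,
    // so the target can legitimately go all the way up to the configured max.
    val capAtMax = math.min(maxNumExecutors, maxNumExecutorsNeeded)               // = 10

    println(s"cap with subtraction = $capWithSubtraction, cap at max = $capAtMax")
  }
}
```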
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24406500

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
(same addExecutors hunk as quoted in full above, at the line)
+    val actualMaxNumExecutors = math.min(maxNumExecutors - executorsPendingToRemove.size,
--- End diff --

Yes, I agree with @andrewor14. We should just use maxNumExecutors here.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73656534 (I only looked at the public APIs, but those look fine to me now - there are none!)
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24391913

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
(same addExecutors hunk as quoted in full above, at the line)
+    val actualMaxNumExecutors = math.min(maxNumExecutors - executorsPendingToRemove.size,
--- End diff --

If your new total was up to 10, YARN could grant you that 10, and then we would have 18, including the 8 pending to be removed. The previous behavior was that we would never go above maxNumExecutors, even when some of the live executors were pending removal. I'm not averse to changing that if you think we should, but subtracting the pending executors here preserves the existing behavior.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73645547

[Test build #27168 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27168/consoleFull) for PR 4168 at commit [`f80b7ec`](https://github.com/apache/spark/commit/f80b7ec16bf3f9ceda58bb2220cc8a681575cd66).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class GetField(child: Expression, field: StructField, ordinal: Int) extends UnaryExpression`
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73645552 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27168/ Test PASSed.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73645180 Hey @sryza thanks for iterating quickly on the reviews. I left one question, but other than that this looks pretty close.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24389414

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
(same addExecutors hunk as quoted in full above, at the line)
+    val actualMaxNumExecutors = math.min(maxNumExecutors - executorsPendingToRemove.size,
--- End diff --

I don't see why we subtract `executorsPendingToRemove` here. If the max is 10, and I currently have 9, and 8 of these are pending to be removed, then this difference is 10 - 8 = 2, which means my new total can only be up to 2. This doesn't make sense though, because my total could really be up to 10. If I'm understanding this correctly we should just do `maxNumExecutors` here.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24388610

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1105,6 +1105,25 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /**
    * :: DeveloperApi ::
+   * Express a preference to the cluster manager for a given total number of executors. This can
+   * result in canceling pending requests or filing additional requests.
+   * This is currently only supported in Yarn mode. Return whether the request is received.
--- End diff --

YARN
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24388465

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -23,10 +23,16 @@ package org.apache.spark
 private[spark] trait ExecutorAllocationClient {

   /**
+   * Express a preference to the cluster manager for a given total number of executors. This can
+   * result in canceling pending requests or filing additional requests.
+   */
+  private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+
+  /**
    * Request an additional number of executors from the cluster manager.
-   * Return whether the request is acknowledged by the cluster manager.
+   * This is currently only supported in YARN mode. Return whether the request is received.
--- End diff --

If we want to add in the javadocs that it's only supported in YARN mode, we should do it for all the methods here or just move this to the class javadocs. I prefer the latter.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73641698 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27170/ Test FAILed.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73641693

[Test build #27170 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27170/consoleFull) for PR 4168 at commit [`37ce77d`](https://github.com/apache/spark/commit/37ce77dd603e9755ddb1cffcdb4eac622cf9e042).

* This patch **fails to build**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73641562 [Test build #27170 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27170/consoleFull) for PR 4168 at commit [`37ce77d`](https://github.com/apache/spark/commit/37ce77dd603e9755ddb1cffcdb4eac622cf9e042). * This patch merges cleanly.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73640262 [Test build #27168 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27168/consoleFull) for PR 4168 at commit [`f80b7ec`](https://github.com/apache/spark/commit/f80b7ec16bf3f9ceda58bb2220cc8a681575cd66). * This patch merges cleanly.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24383247

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -311,18 +303,27 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
   /**
    * Request an additional number of executors from the cluster manager.
-   * Return whether the request is acknowledged.
+   * @return whether the request is acknowledged.
    */
   final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
     logDebug(s"Number of pending executors is now $numPendingExecutors")
     numPendingExecutors += numAdditionalExecutors
     // Account for executors pending to be added or removed
-    val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
+    val newTotal = numExistingExecutors + numPendingExecutors
     doRequestTotalExecutors(newTotal)
   }

   /**
+   * Request an additional number of executors from the cluster manager.
+   * @return whether the request is acknowledged.
+   */
+  final override def requestTotalExecutors(numExecutors: Int): Boolean = synchronized {
+    numPendingExecutors = numExecutors - numExistingExecutors
--- End diff --

Disallowing `numPendingExecutors` from being negative could lead to some strange behavior. If I call `requestTotalExecutors(5)` and then `requestExecutors(2)`, I would expect to eventually end up with 7 executors. But if I started with 6 executors, then I would end up with 8. I don't have a strong opinion either way. If we allow it to be negative, maybe it would make sense to change the name.
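A minimal standalone sketch of the two behaviors described above, assuming the same starting point of 6 registered executors. The names follow the patch, but the logic is only an illustration of the bookkeeping, not the real scheduler backend.

```scala
object PendingBookkeepingSketch {
  def main(args: Array[String]): Unit = {
    val numExistingExecutors = 6

    // requestTotalExecutors(5) with 6 executors already registered:
    val unclampedPending = 5 - numExistingExecutors            // -1 if negatives are allowed
    val clampedPending = math.max(5 - numExistingExecutors, 0) //  0 if clamped at zero

    // requestExecutors(2) afterwards adds to whatever is pending:
    val unclampedTotal = numExistingExecutors + unclampedPending + 2 // 7
    val clampedTotal = numExistingExecutors + clampedPending + 2     // 8

    println(s"allowing negative pending -> eventual total $unclampedTotal")
    println(s"clamping pending at zero  -> eventual total $clampedTotal")
  }
}
```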
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24381198

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
(same requestExecutors/requestTotalExecutors hunk as quoted above, at the new method's doc comment)
+   * Request an additional number of executors from the cluster manager.
--- End diff --

the docs are wrong here
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73628826 @andrewor14 when I mentioned rederiving `numPendingExecutors` I was actually talking about the way we calculate the version of it that lives in `ExecutorAllocationManager`. Sorry for being unclear here. Taking out the code that decrements it in `CoarseGrainedSchedulerBackend` was just an accidental omission on my part when I added all that code back in; I'll put this code back. I don't have a strong opinion on where or how `numExecutorsPending` should be updated in the request-executors methods in `CoarseGrainedSchedulerBackend`. I can't tell whether you do either, but I'm happy to change it if you do.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73625504 @sryza I may be missing something, but I don't understand why `numPendingExecutors` should be rederived from the user calling `requestTotalExecutors` every time. What exactly was the issue with decrementing it as we did before? Also, I believe the current code is incorrect because the user (or us internally) may call either `requestExecutors` or `requestTotalExecutors`, and now we only reset it for the latter. If we were to do this we should do it in `doRequestTotalExecutors` instead.
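A rough sketch of the structure under discussion, heavily simplified (the real `CoarseGrainedSchedulerBackend` keeps more state and actually talks to the cluster manager). It shows why `doRequestTotalExecutors` is the one place both entry points share: any bookkeeping done only in `requestTotalExecutors` is skipped on the `requestExecutors` path.

```scala
// Illustrative skeleton only; the bodies just show where the pending count is touched.
abstract class BackendSketch {
  protected var numExistingExecutors = 0
  protected var numPendingExecutors = 0

  // Incremental path: bumps the pending count, then funnels into the common hook.
  def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
    numPendingExecutors += numAdditionalExecutors
    doRequestTotalExecutors(numExistingExecutors + numPendingExecutors)
  }

  // Total-based path: rederives the pending count, then funnels into the same hook.
  def requestTotalExecutors(numExecutors: Int): Boolean = synchronized {
    numPendingExecutors = numExecutors - numExistingExecutors
    doRequestTotalExecutors(numExecutors)
  }

  // Cluster-manager-specific hook shared by both paths: the natural single place
  // for bookkeeping that must hold no matter which entry point was used.
  protected def doRequestTotalExecutors(requestedTotal: Int): Boolean
}
```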
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24382301

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -23,12 +23,19 @@ package org.apache.spark
 private[spark] trait ExecutorAllocationClient {

   /**
-   * Request an additional number of executors from the cluster manager.
-   * Return whether the request is acknowledged by the cluster manager.
+   * Express a preference to the cluster manager for a given total number of executors. This can
+   * result in canceling pending requests or filing additional requests.
    */
-  def requestExecutors(numAdditionalExecutors: Int): Boolean
+  def requestTotalExecutors(numExecutors: Int): Boolean

   /**
+   * Request an additional number of executors from the cluster manager.
+   * This is currently only supported in Yarn mode. Return whether the request is received.
--- End diff --

Holdover from previous doc, but I agree, worth changing.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24381293

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
(same requestExecutors/requestTotalExecutors hunk as quoted above, at the line)
+    numPendingExecutors = numExecutors - numExistingExecutors
--- End diff --

this could make `numPendingExecutors` negative right? We should probably do a math.min here
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24381075

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
(same hunk as quoted above, at the line)
+   * This is currently only supported in Yarn mode. Return whether the request is received.
--- End diff --

YARN?
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24381071

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
(same hunk as quoted above, continuing a few lines further to)
+   */
+  def requestExecutors(numExecutors: Int): Boolean
+
+
+/**
--- End diff --

weird indentation
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73623282 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27138/ Test PASSed.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73623276

[Test build #27138 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27138/consoleFull) for PR 4168 at commit [`a19e6d2`](https://github.com/apache/spark/commit/a19e6d23346ecfb673e1db81dddb2d0e4242179e).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73614961 [Test build #27138 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27138/consoleFull) for PR 4168 at commit [`a19e6d2`](https://github.com/apache/spark/commit/a19e6d23346ecfb673e1db81dddb2d0e4242179e). * This patch merges cleanly.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73613671 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27129/
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73613658 [Test build #27129 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27129/consoleFull) for PR 4168 at commit [`985ffe7`](https://github.com/apache/spark/commit/985ffe7a08ca3fa303789d7f76ef32757df14988). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24375632 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1105,16 +1105,35 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * :: DeveloperApi :: + * Express a preference to the cluster manager for a given total number of executors. This will + * result in either canceling pending requests or filing additional requests. + * This is currently only supported in Yarn mode. Return whether the request is received. + */ + @DeveloperApi + override def requestTotalExecutors(numExecutors: Int): Boolean = { +assert(master.contains("yarn") || dynamicAllocationTesting, + "Requesting executors is currently only supported in YARN mode") +schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => +b.requestTotalExecutors(numExecutors) + case _ => +logWarning("Requesting executors is only supported in coarse-grained mode") +false +} + } + + /** + * :: DeveloperApi :: * Request an additional number of executors from the cluster manager. * This is currently only supported in Yarn mode. Return whether the request is received. */ @DeveloperApi - override def requestExecutors(numAdditionalExecutors: Int): Boolean = { + override def requestExecutors(numExecutors: Int): Boolean = { --- End diff -- Oops this wasn't even intentional, I'll change it back. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24375535 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1105,16 +1105,35 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * :: DeveloperApi :: + * Express a preference to the cluster manager for a given total number of executors. This will + * result in either canceling pending requests or filing additional requests. + * This is currently only supported in Yarn mode. Return whether the request is received. + */ + @DeveloperApi + override def requestTotalExecutors(numExecutors: Int): Boolean = { +assert(master.contains("yarn") || dynamicAllocationTesting, + "Requesting executors is currently only supported in YARN mode") +schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => +b.requestTotalExecutors(numExecutors) + case _ => +logWarning("Requesting executors is only supported in coarse-grained mode") +false +} + } + + /** + * :: DeveloperApi :: * Request an additional number of executors from the cluster manager. * This is currently only supported in Yarn mode. Return whether the request is received. */ @DeveloperApi - override def requestExecutors(numAdditionalExecutors: Int): Boolean = { + override def requestExecutors(numExecutors: Int): Boolean = { --- End diff -- I'd argue that `requestExecutors(numExecutors)` could potentially be mistaken for "I want 6 total executors and therefore I call requestExecutor(6)", but if we have `numAdditionalExecutors` there is no possibility for ambiguity --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24373580 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1105,16 +1105,35 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * :: DeveloperApi :: + * Express a preference to the cluster manager for a given total number of executors. This will + * result in either canceling pending requests or filing additional requests. + * This is currently only supported in Yarn mode. Return whether the request is received. + */ + @DeveloperApi + override def requestTotalExecutors(numExecutors: Int): Boolean = { +assert(master.contains("yarn") || dynamicAllocationTesting, + "Requesting executors is currently only supported in YARN mode") +schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => +b.requestTotalExecutors(numExecutors) + case _ => +logWarning("Requesting executors is only supported in coarse-grained mode") +false +} + } + + /** + * :: DeveloperApi :: * Request an additional number of executors from the cluster manager. * This is currently only supported in Yarn mode. Return whether the request is received. */ @DeveloperApi - override def requestExecutors(numAdditionalExecutors: Int): Boolean = { + override def requestExecutors(numExecutors: Int): Boolean = { --- End diff -- Changing this name is also an API break since in Scala you can refer to variables by name. I do agree `numExecutors` seems better since it's obvious that they are additional. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24373541 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1105,16 +1105,35 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * :: DeveloperApi :: + * Express a preference to the cluster manager for a given total number of executors. This will + * result in either canceling pending requests or filing additional requests. --- End diff -- Is this true? What if I set it to the number I currently have? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73602915 @andrewor14 updated patch adds back `requestExecutors` and incorporates your other comments. I also made some small changes to the way we update `numExecutorsPending` - it now gets rederived from the desired total executors and the number of running executors each time we change the desired total. The result should be the same, but we make it clear that everything flows from the new total and avoid the possibility of bugs that accumulate junk in `numExecutorsPending`.
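A hedged sketch of the rederivation described above. The helper name `updateTarget` and its exact body are illustrative, not the patch's actual code, although `numExecutorsPending` and `executorIds` are fields shown in the quoted diffs.

```scala
// Illustrative sketch: whenever the desired total changes, recompute the pending
// count from that total and the executors currently registered, rather than
// accumulating +/- deltas that could drift over time.
private def updateTarget(desiredTotal: Int): Unit = synchronized {
  val numRunning = executorIds.size
  numExecutorsPending = math.max(desiredTotal - numRunning, 0)
}
```

The appeal of deriving the value each time is that a bookkeeping bug shows up immediately as a wrong target rather than as junk slowly accumulating in `numExecutorsPending`.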
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73601805 [Test build #27129 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27129/consoleFull) for PR 4168 at commit [`985ffe7`](https://github.com/apache/spark/commit/985ffe7a08ca3fa303789d7f76ef32757df14988). * This patch merges cleanly.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24368946 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager( } /** + * Check to see whether our existing allocation and the requests we've made previously exceed our + * current needs. If so, let the cluster manager know so that it can cancel pending requests that + * are unneeded. + * + * If not, and the add time has expired, see if we can request new executors and refresh the add + * time. + * + * @return the delta in the target number of executors. + */ + private def adjustRequestedExecutors(now: Long): Int = synchronized { +val currentTarget = targetNumExecutors +val maxNeeded = maxNumExecutorsNeeded + +if (maxNeeded < currentTarget) { + // The target number exceeds the number we actually need, so stop adding new + // executors and inform the cluster manager to cancel the extra pending requests. + client.requestTotalExecutors(maxNeeded) + val delta = math.min(maxNeeded - currentTarget, 0) + numExecutorsPending += delta + numExecutorsToAdd = 1 + delta +} else if (addTime != NOT_SET && now >= addTime) { + val delta = addExecutors(maxNeeded) + logDebug(s"Starting timer to add more executors (to " + +s"expire in $sustainedSchedulerBacklogTimeout seconds)") + addTime += sustainedSchedulerBacklogTimeout * 1000 + delta +} else { + 0 +} + } --- End diff -- ok, I suppose that is clearer... let's just use that for now and we can revisit this again later if we want --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24367502 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager( } /** + * Check to see whether our existing allocation and the requests we've made previously exceed our + * current needs. If so, let the cluster manager know so that it can cancel pending requests that + * are unneeded. + * + * If not, and the add time has expired, see if we can request new executors and refresh the add + * time. + * + * @return the delta in the target number of executors. + */ + private def adjustRequestedExecutors(now: Long): Int = synchronized { +val currentTarget = targetNumExecutors +val maxNeeded = maxNumExecutorsNeeded + +if (maxNeeded < currentTarget) { + // The target number exceeds the number we actually need, so stop adding new + // executors and inform the cluster manager to cancel the extra pending requests. + client.requestTotalExecutors(maxNeeded) + val delta = math.min(maxNeeded - currentTarget, 0) + numExecutorsPending += delta + numExecutorsToAdd = 1 + delta +} else if (addTime != NOT_SET && now >= addTime) { + val delta = addExecutors(maxNeeded) + logDebug(s"Starting timer to add more executors (to " + +s"expire in $sustainedSchedulerBacklogTimeout seconds)") + addTime += sustainedSchedulerBacklogTimeout * 1000 + delta +} else { + 0 +} + } --- End diff -- I see what you mean. `maybeAddExecutors` would be a little confusing because we might cancel requests. What do you think of `addOrCancelExecutorRequests`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24366945 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -201,18 +201,35 @@ private[spark] class ExecutorAllocationManager( } /** - * If the add time has expired, request new executors and refresh the add time. - * If the remove time for an existing executor has expired, kill the executor. + * The number of executors we would have if the cluster manager were to fulfill all our requests. + */ + private def targetNumExecutors(): Int = +numExecutorsPending + executorIds.size --- End diff -- I see, that's fine
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24366884 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager( } /** + * Check to see whether our existing allocation and the requests we've made previously exceed our + * current needs. If so, let the cluster manager know so that it can cancel pending requests that + * are unneeded. + * + * If not, and the add time has expired, see if we can request new executors and refresh the add + * time. + * + * @return the delta in the target number of executors. + */ + private def adjustRequestedExecutors(now: Long): Int = synchronized { +val currentTarget = targetNumExecutors +val maxNeeded = maxNumExecutorsNeeded + +if (maxNeeded < currentTarget) { + // The target number exceeds the number we actually need, so stop adding new + // executors and inform the cluster manager to cancel the extra pending requests. + client.requestTotalExecutors(maxNeeded) + val delta = math.min(maxNeeded - currentTarget, 0) + numExecutorsPending += delta + numExecutorsToAdd = 1 + delta +} else if (addTime != NOT_SET && now >= addTime) { + val delta = addExecutors(maxNeeded) + logDebug(s"Starting timer to add more executors (to " + +s"expire in $sustainedSchedulerBacklogTimeout seconds)") + addTime += sustainedSchedulerBacklogTimeout * 1000 + delta +} else { + 0 +} + } --- End diff -- I see, otherwise it's hard to test it. Maybe we should just rename this then to `maybeAddExecutors` or something. The current name sounds more like it's updating a few local variables rather than actually sending the request. Then in the javadoc we can explain why it's "maybe" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24363707 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -201,18 +201,35 @@ private[spark] class ExecutorAllocationManager( } /** - * If the add time has expired, request new executors and refresh the add time. - * If the remove time for an existing executor has expired, kill the executor. + * The number of executors we would have if the cluster manager were to fulfill all our requests. + */ + private def targetNumExecutors(): Int = +numExecutorsPending + executorIds.size --- End diff -- This fixes a change in behavior where it became possible for us to request executors beyond the max when executors were pending removal. I noticed this because the "interleaving add and remove" test was failing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
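For contrast, the two candidate definitions under discussion, both taken from diffs quoted elsewhere in this thread; the comments are editorial glosses on the behavior described in the comment above, not text from the patch.

```scala
// Definition in the updated patch: executors pending removal still count
// toward the target, so the target cannot drift above the configured max.
private def targetNumExecutors(): Int =
  numExecutorsPending + executorIds.size

// Earlier definition that excluded executors pending removal, which made it
// possible to request executors beyond the max while removals were in flight.
private def currentTargetNumExecutors(): Int =
  numExecutorsPending + executorIds.size - executorsPendingToRemove.size
```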
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24358135 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager( } /** + * Check to see whether our existing allocation and the requests we've made previously exceed our + * current needs. If so, let the cluster manager know so that it can cancel pending requests that + * are unneeded. + * + * If not, and the add time has expired, see if we can request new executors and refresh the add + * time. + * + * @return the delta in the target number of executors. + */ + private def adjustRequestedExecutors(now: Long): Int = synchronized { +val currentTarget = targetNumExecutors +val maxNeeded = maxNumExecutorsNeeded + +if (maxNeeded < currentTarget) { + // The target number exceeds the number we actually need, so stop adding new + // executors and inform the cluster manager to cancel the extra pending requests. + client.requestTotalExecutors(maxNeeded) + val delta = math.min(maxNeeded - currentTarget, 0) + numExecutorsPending += delta + numExecutorsToAdd = 1 + delta +} else if (addTime != NOT_SET && now >= addTime) { + val delta = addExecutors(maxNeeded) + logDebug(s"Starting timer to add more executors (to " + +s"expire in $sustainedSchedulerBacklogTimeout seconds)") + addTime += sustainedSchedulerBacklogTimeout * 1000 + delta +} else { + 0 +} + } --- End diff -- I broke this out and added the return value for testing. My understanding was that `addExecutors` was broken out for a similar reason? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24358128 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -201,18 +201,35 @@ private[spark] class ExecutorAllocationManager( } /** - * If the add time has expired, request new executors and refresh the add time. - * If the remove time for an existing executor has expired, kill the executor. + * The number of executors we would have if the cluster manager were to fulfill all our requests. + */ + private def targetNumExecutors(): Int = +numExecutorsPending + executorIds.size --- End diff -- an earlier version of this subtracted the number of executors pending to be removed. Is there a reason why you took that out?
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24357953 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager( } /** + * Check to see whether our existing allocation and the requests we've made previously exceed our + * current needs. If so, let the cluster manager know so that it can cancel pending requests that + * are unneeded. + * + * If not, and the add time has expired, see if we can request new executors and refresh the add + * time. + * + * @return the delta in the target number of executors. + */ + private def adjustRequestedExecutors(now: Long): Int = synchronized { +val currentTarget = targetNumExecutors +val maxNeeded = maxNumExecutorsNeeded + +if (maxNeeded < currentTarget) { + // The target number exceeds the number we actually need, so stop adding new + // executors and inform the cluster manager to cancel the extra pending requests. + client.requestTotalExecutors(maxNeeded) + val delta = math.min(maxNeeded - currentTarget, 0) + numExecutorsPending += delta + numExecutorsToAdd = 1 + delta +} else if (addTime != NOT_SET && now >= addTime) { + val delta = addExecutors(maxNeeded) + logDebug(s"Starting timer to add more executors (to " + +s"expire in $sustainedSchedulerBacklogTimeout seconds)") + addTime += sustainedSchedulerBacklogTimeout * 1000 + delta +} else { + 0 +} + } + + /** * Request a number of executors from the cluster manager. * If the cap on the number of executors is reached, give up and reset the * number of executors to add next round instead of continuing to double it. * Return the number actually requested. + * + * @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending + * tasks could fill */ - private def addExecutors(): Int = synchronized { -// Do not request more executors if we have already reached the upper bound -val numExistingExecutors = executorIds.size + numExecutorsPending -if (numExistingExecutors >= maxNumExecutors) { + private def addExecutors(maxNumExecutorsNeeded: Int): Int = { +// Do not request more executors if it would put our target over the upper bound +val currentTarget = targetNumExecutors +if (currentTarget >= maxNumExecutors) { logDebug(s"Not adding executors because there are already ${executorIds.size} " + s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)") numExecutorsToAdd = 1 return 0 } -// The number of executors needed to satisfy all pending tasks is the number of tasks pending -// divided by the number of tasks each executor can fit, rounded up. -val maxNumExecutorsPending = - (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor -if (numExecutorsPending >= maxNumExecutorsPending) { - logDebug(s"Not adding executors because there are already $numExecutorsPending " + -s"pending and pending tasks could only fill $maxNumExecutorsPending") - numExecutorsToAdd = 1 - return 0 -} - -// It's never useful to request more executors than could satisfy all the pending tasks, so -// cap request at that amount. -// Also cap request with respect to the configured upper bound. 
-val maxNumExecutorsToAdd = math.min( - maxNumExecutorsPending - numExecutorsPending, - maxNumExecutors - numExistingExecutors) -assert(maxNumExecutorsToAdd > 0) - -val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd) - -val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd -val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd) +val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded) +val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors) +// Number of additional executors requested at this round --- End diff -- in this round
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24357759 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -201,18 +201,35 @@ private[spark] class ExecutorAllocationManager( } /** - * If the add time has expired, request new executors and refresh the add time. - * If the remove time for an existing executor has expired, kill the executor. + * The number of executors we would have if the cluster manager were to fulfill all our requests. + */ + private def targetNumExecutors(): Int = +numExecutorsPending + executorIds.size + + /** + * The maximum number of executors we would need under the current load to satisfy all running + * and pending tasks. + */ + private def maxNumExecutorsNeeded(): Int = { +// The maximum number of executors we need under the current load is the total number of +// running or pending tasks, divided by the full task capacity of each executor, rounded up. --- End diff -- this comment is a little redundant given the javadocs. I would just add `rounded up` to the end of the javadoc itself. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24357547 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager( } /** + * Check to see whether our existing allocation and the requests we've made previously exceed our + * current needs. If so, let the cluster manager know so that it can cancel pending requests that + * are unneeded. + * + * If not, and the add time has expired, see if we can request new executors and refresh the add + * time. + * + * @return the delta in the target number of executors. + */ + private def adjustRequestedExecutors(now: Long): Int = synchronized { +val currentTarget = targetNumExecutors +val maxNeeded = maxNumExecutorsNeeded + +if (maxNeeded < currentTarget) { + // The target number exceeds the number we actually need, so stop adding new + // executors and inform the cluster manager to cancel the extra pending requests. + client.requestTotalExecutors(maxNeeded) + val delta = math.min(maxNeeded - currentTarget, 0) --- End diff -- why are you doing `math.min` here? Since `maxNeeded < currentTarget`, `maxNeeded - currentTarget` is always going to be negative. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
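To make the observation above concrete (a sketch with illustrative values, not the patch's code):

```scala
// Inside the `maxNeeded < currentTarget` branch the difference is already negative,
// e.g. maxNeeded = 3 and currentTarget = 5 give maxNeeded - currentTarget = -2,
// and math.min(-2, 0) == -2, so the guard never changes the value.
if (maxNeeded < currentTarget) {
  val delta = maxNeeded - currentTarget  // always <= -1 here; no math.min required
  numExecutorsPending += delta
  numExecutorsToAdd = 1
}
```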
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24357469 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -224,54 +241,75 @@ private[spark] class ExecutorAllocationManager( } /** + * Check to see whether our existing allocation and the requests we've made previously exceed our + * current needs. If so, let the cluster manager know so that it can cancel pending requests that + * are unneeded. + * + * If not, and the add time has expired, see if we can request new executors and refresh the add + * time. + * + * @return the delta in the target number of executors. + */ + private def adjustRequestedExecutors(now: Long): Int = synchronized { +val currentTarget = targetNumExecutors +val maxNeeded = maxNumExecutorsNeeded + +if (maxNeeded < currentTarget) { + // The target number exceeds the number we actually need, so stop adding new + // executors and inform the cluster manager to cancel the extra pending requests. + client.requestTotalExecutors(maxNeeded) + val delta = math.min(maxNeeded - currentTarget, 0) + numExecutorsPending += delta + numExecutorsToAdd = 1 + delta +} else if (addTime != NOT_SET && now >= addTime) { + val delta = addExecutors(maxNeeded) + logDebug(s"Starting timer to add more executors (to " + +s"expire in $sustainedSchedulerBacklogTimeout seconds)") + addTime += sustainedSchedulerBacklogTimeout * 1000 + delta +} else { + 0 +} + } --- End diff -- I actually preferred keeping this in `schedule` as you have done before. We don't ever use the return value here and this doesn't actually hide that much detail from `schedule` as long as we have sufficiently informative comments there. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24356890 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala --- @@ -26,7 +26,7 @@ private[spark] trait ExecutorAllocationClient { * Request an additional number of executors from the cluster manager. --- End diff -- this is still outdated
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73571161 [Test build #27111 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27111/consoleFull) for PR 4168 at commit [`b78806e`](https://github.com/apache/spark/commit/b78806ee59eaa0fcca99b21df7a96ef1ccb307b3). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73571179 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27111/
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73563743 Hey @sryza we definitely need to keep the API because applications building on top of Spark may want to implement their own heuristics (e.g. Spark streaming may want to scale up and down based on the amount of receiver traffic, but not so much on whether there are tasks waiting to be scheduled). By the way I had a chat with @pwendell about this offline. The other option is to just expose both if that makes implementing this patch easier. Even though it's only developer API there are probably already people experimenting with it, and we should make an effort to not break their code.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73558148 [Test build #27111 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27111/consoleFull) for PR 4168 at commit [`b78806e`](https://github.com/apache/spark/commit/b78806ee59eaa0fcca99b21df7a96ef1ccb307b3). * This patch merges cleanly.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73557433 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27110/
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73557431 [Test build #27110 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27110/consoleFull) for PR 4168 at commit [`aa01555`](https://github.com/apache/spark/commit/aa01555e5eaff3e83ecfa4b5bca89b7d56a3e96e). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73557303 [Test build #27110 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27110/consoleFull) for PR 4168 at commit [`aa01555`](https://github.com/apache/spark/commit/aa01555e5eaff3e83ecfa4b5bca89b7d56a3e96e). * This patch merges cleanly.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24313087 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -413,6 +418,7 @@ private[spark] class ExecutorAllocationManager( private val stageIdToNumTasks = new mutable.HashMap[Int, Int] private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]] private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]] +private var numRunningTasks: Int = _ --- End diff -- I think that's OK, because onTaskStart and onTaskEnd already account for speculative tasks.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24311057 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -438,6 +444,7 @@ private[spark] class ExecutorAllocationManager( } override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + numRunningTasks += 1 --- End diff -- I take that back on further inspection. It should be synchronized.
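A minimal sketch of what the synchronized counter updates might look like inside the allocation manager's listener. The lock object (`allocationManager`, standing for the enclosing `ExecutorAllocationManager`) and the `onTaskEnd` body are assumptions for illustration; only the `onTaskStart` increment appears in the quoted diff.

```scala
// Sketch (inside the listener class): guard the listener-thread writes and the
// schedule()-thread reads of numRunningTasks with the same lock.
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
  allocationManager.synchronized {
    numRunningTasks += 1
  }
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
  allocationManager.synchronized {
    numRunningTasks -= 1
  }
}
```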
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24310281 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -438,6 +444,7 @@ private[spark] class ExecutorAllocationManager( } override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + numRunningTasks += 1 --- End diff -- Good point. It doesn't need synchronization because only one thread is writing to it, but we should make it volatile.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73459200 Thanks for the review @andrewor14. Updated patch coming soon. Regarding the public API, what I'm contending is that we don't really have any idea whether or how people would use `requestExecutors`, so it's not worthwhile to add extra bookkeeping and complexity to support it. I can't personally envision a realistic situation where I would want to use either `requestExecutors` or `requestTotalExecutors`. Maybe it would be better to just remove the API until someone asks for it? Unless you've come across uses for it?
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73457967 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27077/
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73457963 [Test build #27077 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27077/consoleFull) for PR 4168 at commit [`16db9f4`](https://github.com/apache/spark/commit/16db9f4aa94ec28500eff08c6d23a5f4da6a152e). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73456491 @sryza Is the API change in `ExecutorAllocationManager` necessary for this change? The new API essentially pushes the responsibility of maintaining the number of executors that the application currently has to the user. If I as a Spark application want to incrementally add executors, then I must additionally keep track of the number of executors we currently have as we used to do in `CoarseGrainedSchedulerBackend`. I actually don't see a great use case for something like `sc.setTotalExecutors` because it kinda expects the user to know how many executors they think they should need, and this estimation is often difficult. The rest of it looks fairly straightforward. My comments mostly have to do with variable naming.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24308654 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -226,50 +249,32 @@ private[spark] class ExecutorAllocationManager( * If the cap on the number of executors is reached, give up and reset the * number of executors to add next round instead of continuing to double it. * Return the number actually requested. + * + * @param maxNumNeededExecutors the maximum number of executors all currently running or pending + * tasks could fill */ - private def addExecutors(): Int = synchronized { + private def addExecutors(maxNumNeededExecutors: Int): Int = synchronized { // Do not request more executors if we have already reached the upper bound -val numExistingExecutors = executorIds.size + numExecutorsPending -if (numExistingExecutors >= maxNumExecutors) { +val currentTarget = currentTargetNumExecutors +if (currentTarget >= maxNumExecutors) { logDebug(s"Not adding executors because there are already ${executorIds.size} " + s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)") numExecutorsToAdd = 1 return 0 } -// The number of executors needed to satisfy all pending tasks is the number of tasks pending -// divided by the number of tasks each executor can fit, rounded up. -val maxNumExecutorsPending = - (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor -if (numExecutorsPending >= maxNumExecutorsPending) { - logDebug(s"Not adding executors because there are already $numExecutorsPending " + -s"pending and pending tasks could only fill $maxNumExecutorsPending") - numExecutorsToAdd = 1 - return 0 -} - -// It's never useful to request more executors than could satisfy all the pending tasks, so -// cap request at that amount. -// Also cap request with respect to the configured upper bound. -val maxNumExecutorsToAdd = math.min( - maxNumExecutorsPending - numExecutorsPending, - maxNumExecutors - numExistingExecutors) -assert(maxNumExecutorsToAdd > 0) - -val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd) - -val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd -val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd) +val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, + math.min(maxNumExecutors, maxNumNeededExecutors)) +val numRequested = newTotalExecutors - currentTarget --- End diff -- I think it's confusing to have both `numRequested` and `numExecutorsToAdd`. I prefer the old name `actualNumExecutorsToAdd` actually, and there we can add a short comment to explain what constitutes as "actual" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24308697 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1029,12 +1029,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * This is currently only supported in Yarn mode. Return whether the request is received. */ @DeveloperApi - override def requestExecutors(numAdditionalExecutors: Int): Boolean = { + override def requestTotalExecutors(numExecutors: Int): Boolean = { --- End diff -- javadoc is outdated
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24308687 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -413,6 +418,7 @@ private[spark] class ExecutorAllocationManager( private val stageIdToNumTasks = new mutable.HashMap[Int, Int] private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]] private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]] +private var numRunningTasks: Int = _ --- End diff -- what are the semantics here? Does this count speculation? In steady state is this guaranteed to go back to 0?
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73456090 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27073/
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73456084 [Test build #27073 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27073/consoleFull) for PR 4168 at commit [`9ba0e01`](https://github.com/apache/spark/commit/9ba0e0161e4554839c6dbf3a097b69af3de263b8). * This patch **fails Spark unit tests**. * This patch **does not merge cleanly**. * This patch adds no public classes.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24308610 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -226,50 +249,32 @@ private[spark] class ExecutorAllocationManager( * If the cap on the number of executors is reached, give up and reset the * number of executors to add next round instead of continuing to double it. * Return the number actually requested. + * + * @param maxNumNeededExecutors the maximum number of executors all currently running or pending --- End diff -- `maxNumExecutorsNeeded` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24308577

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -226,50 +249,32 @@ private[spark] class ExecutorAllocationManager(
   * If the cap on the number of executors is reached, give up and reset the
   * number of executors to add next round instead of continuing to double it.
   * Return the number actually requested.
+  *
+  * @param maxNumNeededExecutors the maximum number of executors all currently running or pending
+  *                              tasks could fill
   */
-  private def addExecutors(): Int = synchronized {
+  private def addExecutors(maxNumNeededExecutors: Int): Int = synchronized {
     // Do not request more executors if we have already reached the upper bound
-    val numExistingExecutors = executorIds.size + numExecutorsPending
-    if (numExistingExecutors >= maxNumExecutors) {
+    val currentTarget = currentTargetNumExecutors
+    if (currentTarget >= maxNumExecutors) {
       logDebug(s"Not adding executors because there are already ${executorIds.size} " +
         s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
       numExecutorsToAdd = 1
       return 0
     }
-    // The number of executors needed to satisfy all pending tasks is the number of tasks pending
-    // divided by the number of tasks each executor can fit, rounded up.
-    val maxNumExecutorsPending =
-      (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
-    if (numExecutorsPending >= maxNumExecutorsPending) {
-      logDebug(s"Not adding executors because there are already $numExecutorsPending " +
-        s"pending and pending tasks could only fill $maxNumExecutorsPending")
-      numExecutorsToAdd = 1
-      return 0
-    }
-
-    // It's never useful to request more executors than could satisfy all the pending tasks, so
-    // cap request at that amount.
-    // Also cap request with respect to the configured upper bound.
-    val maxNumExecutorsToAdd = math.min(
-      maxNumExecutorsPending - numExecutorsPending,
-      maxNumExecutors - numExistingExecutors)
-    assert(maxNumExecutorsToAdd > 0)
-
-    val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
-
-    val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-    val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
+    val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd,
+      math.min(maxNumExecutors, maxNumNeededExecutors))
--- End diff --

this double `math.min` is really hard to read. I would at least put the inner `math.min` into a separate variable and add a comment to explain what's going on
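[Editor's note] A minimal sketch of the refactor being requested, naming the inner bound so the intent reads top to bottom. The identifiers mirror the diff, but the standalone helper below is an illustration, not the merged code.

```
object BoundedTargetSketch {
  // Cap the next target at both the configured maximum and the number of
  // executors the current running-or-pending tasks could actually use.
  def boundedTarget(currentTarget: Int,
                    numExecutorsToAdd: Int,
                    maxNumExecutors: Int,
                    maxNumNeededExecutors: Int): Int = {
    val executorLimit = math.min(maxNumExecutors, maxNumNeededExecutors)
    math.min(currentTarget + numExecutorsToAdd, executorLimit)
  }

  def main(args: Array[String]): Unit = {
    // target 4, next increment 4, cap 10, but the load only needs 6 => request 6 total
    assert(boundedTarget(4, 4, 10, 6) == 6)
  }
}
```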
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24308537

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -198,15 +198,38 @@ private[spark] class ExecutorAllocationManager(
     t.start()
   }

+  def currentTargetNumExecutors(): Int =
+    numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
   /**
-   * If the add time has expired, request new executors and refresh the add time.
-   * If the remove time for an existing executor has expired, kill the executor.
+   * This is called at a fixed interval to relegate the number of pending executor requests
+   * and number of executors running.
+   *
+   * First, check to see whether our existing allocation and the requests we've made previously
+   * exceed our current needs. If so, let the cluster manager know so that it can cancel pending
+   * requests that are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * Last, if the remove time for an existing executor has expired, kill the executor.
+   *
    * This is factored out into its own method for testing.
    */
   private def schedule(): Unit = synchronized {
+    // The maximum number of executors we need under the current load is the total number of
+    // running or pending tasks, divided by the full task capacity of each executor, rounded up.
+    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
+    val maxNumNeededExecutors = (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
--- End diff --

`maxNumExecutorsNeeded`?
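[Editor's note] The expression under discussion is a rounded-up integer division. A small, self-contained example of the same arithmetic, with illustrative names rather than the committed code:

```
object ExecutorsNeededSketch {
  // Ceiling division: executors needed to run all pending and running tasks
  // when each executor offers tasksPerExecutor task slots.
  def maxNumExecutorsNeeded(numRunningOrPendingTasks: Int, tasksPerExecutor: Int): Int =
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor

  def main(args: Array[String]): Unit = {
    assert(maxNumExecutorsNeeded(10, 4) == 3) // 10 tasks over 4 slots each, rounded up
    assert(maxNumExecutorsNeeded(8, 4) == 2)
    assert(maxNumExecutorsNeeded(0, 4) == 0)
  }
}
```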
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24308517

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -198,15 +198,38 @@ private[spark] class ExecutorAllocationManager(
     t.start()
   }

+  def currentTargetNumExecutors(): Int =
--- End diff --

I would just call this `targetNumExecutors: Int` to be more concise, and add a javadoc to explain what it is
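[Editor's note] One way the suggested rename and doc comment could look, sketched with the three inputs stubbed as parameters so the snippet stands alone; this is an illustration, not the change that was ultimately committed.

```
object TargetNumExecutorsSketch {
  /**
   * The number of executors we expect to have once all outstanding add
   * requests are granted and all pending removals complete.
   */
  def targetNumExecutors(numExecutorsPending: Int,
                         numRegisteredExecutors: Int,
                         numExecutorsPendingToRemove: Int): Int =
    numExecutorsPending + numRegisteredExecutors - numExecutorsPendingToRemove

  def main(args: Array[String]): Unit = {
    // 2 requested but not yet up, 5 registered, 1 marked for removal => target of 6
    assert(targetNumExecutors(2, 5, 1) == 6)
  }
}
```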
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24308501

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -198,15 +198,38 @@ private[spark] class ExecutorAllocationManager(
     t.start()
   }

+  def currentTargetNumExecutors(): Int =
+    numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
   /**
-   * If the add time has expired, request new executors and refresh the add time.
-   * If the remove time for an existing executor has expired, kill the executor.
+   * This is called at a fixed interval to relegate the number of pending executor requests
+   * and number of executors running.
+   *
+   * First, check to see whether our existing allocation and the requests we've made previously
+   * exceed our current needs. If so, let the cluster manager know so that it can cancel pending
+   * requests that are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * Last, if the remove time for an existing executor has expired, kill the executor.
+   *
    * This is factored out into its own method for testing.
    */
   private def schedule(): Unit = synchronized {
+    // The maximum number of executors we need under the current load is the total number of
+    // running or pending tasks, divided by the full task capacity of each executor, rounded up.
+    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
+    val maxNumNeededExecutors = (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
+
     val now = clock.getTimeMillis
-    if (addTime != NOT_SET && now >= addTime) {
-      addExecutors()
+    val currentTarget = currentTargetNumExecutors
+    if (maxNumNeededExecutors < currentTarget) {
+      client.requestTotalExecutors(maxNumNeededExecutors)
+      numExecutorsPending -= math.min(currentTarget - maxNumNeededExecutors, 0)
--- End diff --

this looks incorrect. If `currentTarget > maxNumNeededExecutors` the min will always return 0.
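[Editor's note] The fix implied by the comment is to clamp from below rather than above. The sketch shows one plausible correction, stated as an assumption rather than the committed change: compute the surplus with `math.max` and never let the pending count go negative.

```
object CancelPendingSketch {
  // How many pending executor requests remain after cancelling the surplus
  // between the current target and what the load actually needs.
  def newNumExecutorsPending(numExecutorsPending: Int,
                             currentTarget: Int,
                             maxNumNeededExecutors: Int): Int = {
    val surplus = math.max(currentTarget - maxNumNeededExecutors, 0)
    math.max(numExecutorsPending - surplus, 0)
  }

  def main(args: Array[String]): Unit = {
    // target 10, load needs 6, 5 requests still pending => cancel 4 of them, 1 remains
    assert(newNumExecutorsPending(5, 10, 6) == 1)
  }
}
```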
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24308485

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -198,15 +198,38 @@ private[spark] class ExecutorAllocationManager(
     t.start()
   }

+  def currentTargetNumExecutors(): Int =
+    numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
   /**
-   * If the add time has expired, request new executors and refresh the add time.
-   * If the remove time for an existing executor has expired, kill the executor.
+   * This is called at a fixed interval to relegate the number of pending executor requests
+   * and number of executors running.
+   *
+   * First, check to see whether our existing allocation and the requests we've made previously
+   * exceed our current needs. If so, let the cluster manager know so that it can cancel pending
+   * requests that are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * Last, if the remove time for an existing executor has expired, kill the executor.
+   *
    * This is factored out into its own method for testing.
    */
   private def schedule(): Unit = synchronized {
+    // The maximum number of executors we need under the current load is the total number of
+    // running or pending tasks, divided by the full task capacity of each executor, rounded up.
+    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
+    val maxNumNeededExecutors = (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
+
     val now = clock.getTimeMillis
-    if (addTime != NOT_SET && now >= addTime) {
-      addExecutors()
+    val currentTarget = currentTargetNumExecutors
+    if (maxNumNeededExecutors < currentTarget) {
+      client.requestTotalExecutors(maxNumNeededExecutors)
+      numExecutorsPending -= math.min(currentTarget - maxNumNeededExecutors, 0)
+      numExecutorsToAdd = 1
--- End diff --

can you add a comment on this block to explain what's going on, maybe something like

```
// The target number exceeds the number we actually need, so stop adding new
// executors and inform the cluster manager to cancel the extra pending requests
```
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24308449

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -226,50 +249,32 @@ private[spark] class ExecutorAllocationManager(
   * If the cap on the number of executors is reached, give up and reset the
   * number of executors to add next round instead of continuing to double it.
   * Return the number actually requested.
+  *
+  * @param maxNumNeededExecutors the maximum number of executors all currently running or pending
+  *                              tasks could fill
   */
-  private def addExecutors(): Int = synchronized {
+  private def addExecutors(maxNumNeededExecutors: Int): Int = synchronized {
     // Do not request more executors if we have already reached the upper bound
-    val numExistingExecutors = executorIds.size + numExecutorsPending
-    if (numExistingExecutors >= maxNumExecutors) {
+    val currentTarget = currentTargetNumExecutors
+    if (currentTarget >= maxNumExecutors) {
       logDebug(s"Not adding executors because there are already ${executorIds.size} " +
         s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
--- End diff --

you may have to update this log message because it doesn't take into account executors pending to be removed, so the math doesn't quite add up right
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24308401

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -198,15 +198,38 @@ private[spark] class ExecutorAllocationManager(
     t.start()
   }

+  def currentTargetNumExecutors(): Int =
+    numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
   /**
-   * If the add time has expired, request new executors and refresh the add time.
-   * If the remove time for an existing executor has expired, kill the executor.
+   * This is called at a fixed interval to relegate the number of pending executor requests
--- End diff --

Oops, definitely
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r24308360

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -198,15 +198,38 @@ private[spark] class ExecutorAllocationManager(
     t.start()
   }

+  def currentTargetNumExecutors(): Int =
+    numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
   /**
-   * If the add time has expired, request new executors and refresh the add time.
-   * If the remove time for an existing executor has expired, kill the executor.
+   * This is called at a fixed interval to relegate the number of pending executor requests
--- End diff --

I don't think relegate here is the right word. Did you mean regulate?
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73455047

[Test build #27077 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27077/consoleFull) for PR 4168 at commit [`16db9f4`](https://github.com/apache/spark/commit/16db9f4aa94ec28500eff08c6d23a5f4da6a152e).

* This patch merges cleanly.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73451518

[Test build #27073 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27073/consoleFull) for PR 4168 at commit [`9ba0e01`](https://github.com/apache/spark/commit/9ba0e0161e4554839c6dbf3a097b69af3de263b8).

* This patch **does not merge cleanly**.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73451192

retest this please
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73155456

@andrewor14 it's no longer a WIP, and I am aiming for it for 1.3. I just updated the title - sorry for the confusion.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-73154727

@sryza is this still WIP? Are we aiming for this to go into 1.3?
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r23683544

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -226,50 +249,32 @@ private[spark] class ExecutorAllocationManager(
   * If the cap on the number of executors is reached, give up and reset the
   * number of executors to add next round instead of continuing to double it.
   * Return the number actually requested.
+  *
+  * @param maxNumNeededExecutors the maximum number of executors all currently running or pending
+  *                              tasks could fill
   */
-  private def addExecutors(): Int = synchronized {
--- End diff --

I think we need to redefine an addExecutors function of the kind used in ExecutorAllocationManagerSuite, one that can be called to add executors at any time.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r23682980

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -226,50 +249,32 @@ private[spark] class ExecutorAllocationManager(
   * If the cap on the number of executors is reached, give up and reset the
   * number of executors to add next round instead of continuing to double it.
   * Return the number actually requested.
+  *
+  * @param maxNumNeededExecutors the maximum number of executors all currently running or pending
+  *                              tasks could fill
   */
-  private def addExecutors(): Int = synchronized {
+  private def addExecutors(maxNumNeededExecutors: Int): Int = synchronized {
--- End diff --

I think addExecutors does not need to return an Int; the returned value is never used.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r23682774

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -470,6 +477,7 @@ private[spark] class ExecutorAllocationManager(
   }

   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+    numRunningTasks -= 1
--- End diff --

this also should be synchronized.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r23682749

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -438,6 +444,7 @@ private[spark] class ExecutorAllocationManager(
   }

   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+    numRunningTasks += 1
--- End diff --

I think access to numRunningTasks should be synchronized, because the schedule thread and the listener thread are two different threads.
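[Editor's note] A minimal sketch of the locking being suggested: the listener callbacks and the reader both take the same lock, so updates from the listener thread are visible to the schedule thread. The wrapper class and names below are assumptions for illustration, not the listener in the patch.

```
class RunningTaskBookkeeping {
  private val lock = new Object
  private var numRunningTasks: Int = 0

  // Called from the listener thread.
  def onTaskStart(): Unit = lock.synchronized { numRunningTasks += 1 }
  def onTaskEnd(): Unit = lock.synchronized { numRunningTasks -= 1 }

  // Called from the schedule() thread.
  def totalRunningTasks: Int = lock.synchronized { numRunningTasks }
}
```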
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-71757464

@sryza looks like the test failures are legit?
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-71697091

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26168/
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-71697071

[Test build #26168 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26168/consoleFull) for PR 4168 at commit [`9ba0e01`](https://github.com/apache/spark/commit/9ba0e0161e4554839c6dbf3a097b69af3de263b8).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4168#issuecomment-71687375

[Test build #26168 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26168/consoleFull) for PR 4168 at commit [`9ba0e01`](https://github.com/apache/spark/commit/9ba0e0161e4554839c6dbf3a097b69af3de263b8).

* This patch merges cleanly.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r23566205

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -199,14 +199,31 @@ private[spark] class ExecutorAllocationManager(
   }

   /**
-   * If the add time has expired, request new executors and refresh the add time.
-   * If the remove time for an existing executor has expired, kill the executor.
+   * This is called at a fixed interval to relegate the number of pending executor requests
+   * and number of executors running.
+   *
+   * First, check to see whether our existing allocation and the requests we've made previously
+   * exceed our current needs. If so, let the cluster manager know so that it can cancel pending
+   * requests that are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * Last, if the remove time for an existing executor has expired, kill the executor.
    * This is factored out into its own method for testing.
    */
   private def schedule(): Unit = synchronized {
+    // The maximum number of executors we need under the current load is the total number of
+    // running or pending tasks, divided by the full task capacity of each executor, rounded up.
+    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
+    val maxNumNeededExecutors = (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
+
     val now = clock.getTimeMillis
-    if (addTime != NOT_SET && now >= addTime) {
-      addExecutors()
+    if (maxNumNeededExecutors < numExecutorsPending + executorIds.size) {
--- End diff --

Good point, I think you are right.
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r23502091

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -226,8 +243,11 @@ private[spark] class ExecutorAllocationManager(
   * If the cap on the number of executors is reached, give up and reset the
   * number of executors to add next round instead of continuing to double it.
   * Return the number actually requested.
+  *
+  * @param maxNumNeededExecutors the maximum number of executors all currently running or pending
+  *                              tasks could fill
   */
-  private def addExecutors(): Int = synchronized {
+  private def addExecutors(maxNumNeededExecutors: Int): Int = synchronized {
     // Do not request more executors if we have already reached the upper bound
     val numExistingExecutors = executorIds.size + numExecutorsPending
--- End diff --

Shouldn't numExistingExecutors exclude executorsPendingToRemove.size here?
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r23502082

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -237,39 +257,18 @@ private[spark] class ExecutorAllocationManager(
       return 0
     }

-    // The number of executors needed to satisfy all pending tasks is the number of tasks pending
-    // divided by the number of tasks each executor can fit, rounded up.
-    val maxNumExecutorsPending =
-      (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
-    if (numExecutorsPending >= maxNumExecutorsPending) {
-      logDebug(s"Not adding executors because there are already $numExecutorsPending " +
-        s"pending and pending tasks could only fill $maxNumExecutorsPending")
-      numExecutorsToAdd = 1
-      return 0
-    }
-
-    // It's never useful to request more executors than could satisfy all the pending tasks, so
-    // cap request at that amount.
-    // Also cap request with respect to the configured upper bound.
-    val maxNumExecutorsToAdd = math.min(
-      maxNumExecutorsPending - numExecutorsPending,
-      maxNumExecutors - numExistingExecutors)
-    assert(maxNumExecutorsToAdd > 0)
-
-    val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
-
-    val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-    val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
+    val newTotalExecutors = math.min(numExistingExecutors + numExecutorsToAdd,
--- End diff --

Shouldn't numExistingExecutors exclude executorsPendingToRemove.size here as well?
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r23502033

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -237,39 +257,18 @@ private[spark] class ExecutorAllocationManager(
       return 0
     }

-    // The number of executors needed to satisfy all pending tasks is the number of tasks pending
-    // divided by the number of tasks each executor can fit, rounded up.
-    val maxNumExecutorsPending =
-      (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
-    if (numExecutorsPending >= maxNumExecutorsPending) {
-      logDebug(s"Not adding executors because there are already $numExecutorsPending " +
-        s"pending and pending tasks could only fill $maxNumExecutorsPending")
-      numExecutorsToAdd = 1
-      return 0
-    }
-
-    // It's never useful to request more executors than could satisfy all the pending tasks, so
-    // cap request at that amount.
-    // Also cap request with respect to the configured upper bound.
-    val maxNumExecutorsToAdd = math.min(
-      maxNumExecutorsPending - numExecutorsPending,
-      maxNumExecutors - numExistingExecutors)
-    assert(maxNumExecutorsToAdd > 0)
-
-    val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
-
-    val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-    val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
+    val newTotalExecutors = math.min(numExistingExecutors + numExecutorsToAdd,
+      math.min(maxNumExecutors, maxNumNeededExecutors))
+    val numRequested = newTotalExecutors - numExistingExecutors
+    val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
--- End diff --

Shouldn't we exclude executorsPendingToRemove.size from newTotalExecutors here?
[GitHub] spark pull request: SPARK-4136. Under dynamic allocation, cancel o...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/4168#discussion_r23502021

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -199,14 +199,31 @@ private[spark] class ExecutorAllocationManager(
   }

   /**
-   * If the add time has expired, request new executors and refresh the add time.
-   * If the remove time for an existing executor has expired, kill the executor.
+   * This is called at a fixed interval to relegate the number of pending executor requests
+   * and number of executors running.
+   *
+   * First, check to see whether our existing allocation and the requests we've made previously
+   * exceed our current needs. If so, let the cluster manager know so that it can cancel pending
+   * requests that are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * Last, if the remove time for an existing executor has expired, kill the executor.
    * This is factored out into its own method for testing.
    */
   private def schedule(): Unit = synchronized {
+    // The maximum number of executors we need under the current load is the total number of
+    // running or pending tasks, divided by the full task capacity of each executor, rounded up.
+    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
+    val maxNumNeededExecutors = (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
+
     val now = clock.getTimeMillis
-    if (addTime != NOT_SET && now >= addTime) {
-      addExecutors()
+    if (maxNumNeededExecutors < numExecutorsPending + executorIds.size) {
--- End diff --

Do we need to exclude executorsPendingToRemove.size? The YarnAllocator may already have killed the executors marked for removal while the ExecutorAllocationManager has not yet received the onExecutorRemoved message, so at that point executorIds still contains the removed executors.
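[Editor's note] To make the accounting question concrete, here is a small sketch of the check in schedule() with executors pending removal excluded from the existing allocation before comparing against the load. The names and structure are illustrative assumptions, not a proposed patch.

```
object CancelCheckSketch {
  // Should we ask the cluster manager to cancel surplus pending requests?
  // Executors already marked for removal are not counted as usable capacity.
  def shouldCancelPending(maxNumExecutorsNeeded: Int,
                          numExecutorsPending: Int,
                          numRegistered: Int,
                          numPendingToRemove: Int): Boolean = {
    val effectiveTarget = numExecutorsPending + numRegistered - numPendingToRemove
    maxNumExecutorsNeeded < effectiveTarget
  }

  def main(args: Array[String]): Unit = {
    // Load needs 5; 2 pending + 8 registered - 2 being removed = 8 => cancel the surplus.
    assert(shouldCancelPending(5, 2, 8, 2))
    // Load needs 9 against the same allocation => nothing to cancel.
    assert(!shouldCancelPending(9, 2, 8, 2))
  }
}
```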