[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r186425765

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
```
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+        require(executorAllocationManager.isEmpty,
```
--- End diff --

@squito any thoughts?

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r185159109

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
```
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+        require(executorAllocationManager.isEmpty,
```
--- End diff --

Hi @squito, thanks for your reply.

> but only *when* pending tasks increase.

`ExecutorAllocationManager` checks pending (or backlogged) tasks periodically, so we do not actually have to wait for an *increase*.

As for the `Dynamic Allocation` & `User` case, yeah, that's hard to define.

I also checked `SchedulerBackendUtils.getInitialTargetExecutorNumber`: it sets `DEFAULT_NUMBER_EXECUTORS = 2`. But this is not consistent with `Master`, which sets `executorLimit` to `Int.MaxValue` when we are not in dynamic allocation mode. Maybe we can just initialize `requestedTotalExecutors` with `Int.MaxValue` (only when we are not in dynamic allocation mode). Or we could stop calling `doRequestTotalExecutors` from `requestExecutors` and `killExecutors`, calling it only from `requestTotalExecutors` (only when we are not in dynamic allocation mode).
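The trade-off Ngone51 describes can be sketched with a toy model (hypothetical names, not actual Spark code): initializing the requested total to `Int.MaxValue` when dynamic allocation is off mirrors the standalone `Master`'s `executorLimit`, and keeps a user-initiated kill from collapsing the requested total to a tiny number.

```scala
// Toy model of the proposal above -- hypothetical, not actual Spark code.
object TargetModel {
  // With dynamic allocation off, mirror Master's executorLimit (Int.MaxValue)
  // instead of starting from 0.
  def initialTarget(dynamicAllocation: Boolean, initialExecutors: Int): Int =
    if (dynamicAllocation) initialExecutors else Int.MaxValue

  // How a kill folds into the requested total (clamped at 0), per the
  // snippet from killExecutors quoted earlier in this thread.
  def afterKill(requestedTotal: Int, killed: Int): Int =
    math.max(requestedTotal - killed, 0)
}
```

Starting from 0 means a single kill leaves the requested total at 0, so the backend asks the cluster manager for zero executors; starting from `Int.MaxValue` leaves the effective limit unbounded, matching the non-dynamic-allocation default.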
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r185005638

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
```
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+        require(executorAllocationManager.isEmpty,
```
--- End diff --

My general point is that the semantics of combining `SparkContext.killExecutors()` (a publicly visible function that the end user can call) with dynamic allocation aren't well defined, and I have no idea what the behavior really should be. I was giving some examples of weird behavior.

>> If you've got just one executor, and then you kill it, should your app sit with 0 executors?
> if app sit with 0 executors, then pending tasks increase, which lead to ExecutorAllocationManager increases target number of executors. So, app will not always sit with 0 executors.

That's true -- but only *when* pending tasks increase. If you've got 0 executors, how do you expect pending tasks to increase? That would only happen when another taskset gets submitted, but with no executors your Spark program will probably just be blocked.

In the other case, I'm just trying to point out strange interactions between user control and dynamic allocation control. Imagine this sequence:

1. Dynamic Allocation: 1000 tasks, so 1000 executors
2. User: I only want 10 executors, so let me tell Spark to kill 990 of them
3. ... another taskset is submitted to add 1 more task ...
4. Dynamic Allocation: 1001 tasks, so 1001 executors
5. User: ??? I set the target to 10 executors, what happened?

> So, if we are not using ExecutorAllocationManager, allocation client will request requestedTotalExecutors = 0 number of executors to cluster manager (this is really terrible)

hmm, from a quick look, I think you're right. It seems that using `sc.killExecutors()` doesn't make sense even with dynamic allocation off. I think `CoarseGrainedSchedulerBackend` should actually initialize `requestedTotalExecutors` with `SchedulerBackendUtils.getInitialTargetExecutorNumber`.
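The interleaving squito describes can be played out with a minimal simulation (assumed behavior with hypothetical names, not Spark code): whatever target the user sets by killing executors is silently overwritten the next time dynamic allocation recomputes the target from the task backlog.

```scala
// Minimal simulation of the sequence above -- assumed behavior, not Spark code.
object Interleaving {
  private var target = 0

  // Dynamic allocation recomputes the target from the pending-task count.
  def dynamicAllocationUpdate(pendingTasks: Int): Int = { target = pendingTasks; target }

  // The user manually trims the app down to `keep` executors via killExecutors.
  def userKillDownTo(keep: Int): Int = { target = keep; target }

  def current: Int = target
}
```

Running the sequence 1000 -> user sets 10 -> 1001 shows the user's setting has no lasting effect, which is exactly the "I set the target to 10, what happened?" confusion.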
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r182086337

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
```
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+        require(executorAllocationManager.isEmpty,
```
--- End diff --

Hi @squito, I have some questions about these cases:

> If you've got just one executor, and then you kill it, should your app sit with 0 executors?

If the app sits with 0 executors, then pending tasks increase, which leads `ExecutorAllocationManager` to increase the target number of executors. So the app will not always sit with 0 executors.

> Or even if you've got 10 executors, and you kill one -- when is dynamic allocation allowed to bump the total back up?

To be honest, I do not really get your point for this case; blame my poor English.

Also, what happens if we use this method without `ExecutorAllocationManager`? Do we really need to adjust the target number of executors (setting `adjustTargetNumExecutors = true` below) when we are not using `ExecutorAllocationManager`? See these lines in `killExecutors()`:

```
if (adjustTargetNumExecutors) {
  requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
  ...
  doRequestTotalExecutors(requestedTotalExecutors)
}
```

Setting `adjustTargetNumExecutors = true` changes `requestedTotalExecutors`, and IIUC `requestedTotalExecutors` is only meaningful in dynamic allocation mode. So, if we are not using `ExecutorAllocationManager`, the allocation client will request `requestedTotalExecutors = 0` executors from the cluster manager (this is really terrible). But an app without `ExecutorAllocationManager` has no limit on requesting executors by default.

Actually, I think this series of methods, including `killAndReplaceExecutor`, `requestExecutors`, etc., was designed with dynamic allocation mode in mind. If we still want to use these methods when the app does not use `ExecutorAllocationManager`, we should not change `requestedTotalExecutors`, or perhaps not request a specific number from the cluster manager at all. WDYT?
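One way to express Ngone51's suggestion (a sketch under the assumption that `requestedTotalExecutors` only matters under dynamic allocation; a hypothetical helper, not the fix that was merged): only fold a kill into the requested total when dynamic allocation is actually managing it, and otherwise leave the cluster manager's default limit untouched.

```scala
// Hypothetical guard, not the actual change merged in this PR: skip the
// target adjustment entirely when dynamic allocation is off.
object KillGuard {
  def adjustedTargetAfterKill(
      requestedTotal: Int,
      numKilled: Int,
      dynamicAllocation: Boolean): Option[Int] =
    if (dynamicAllocation) Some(math.max(requestedTotal - numKilled, 0))
    else None // None = do not call doRequestTotalExecutors at all
}
```

Returning `None` models "do not send any total to the cluster manager", which preserves the default unlimited behavior for apps without `ExecutorAllocationManager`.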
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20604
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r170650217

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
```
@@ -610,20 +611,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       val executorsToKill = knownExecutors
         .filter { id => !executorsPendingToRemove.contains(id) }
         .filter { id => force || !scheduler.isExecutorBusy(id) }
-      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+      executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures}
```
--- End diff --

super nit: space before `}`
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r170383918

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
```
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
    * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
```
--- End diff --

whoops, I was supposed to set `countFailures = true` in `sc.killAndReplaceExecutors`, thanks for catching that.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r170103841

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
```
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
   * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
```
--- End diff --

I'm still a little confused about this parameter. If `force = false`, it's a no-op. And all call sites I've seen seem to set this parameter to `false`. So is there something I'm missing?
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r170102063

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
```
@@ -334,6 +336,11 @@ private[spark] class ExecutorAllocationManager(
     // If the new target has not changed, avoid sending a message to the cluster manager
     if (numExecutorsTarget < oldNumExecutorsTarget) {
+      // We lower the target number of executors but don't actively kill any yet. Killing is
+      // controlled separately by an idle timeout. Its still helpful to reduce the target number
```
--- End diff --

nit: it's
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r170102582

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
```
@@ -334,6 +336,11 @@ private[spark] class ExecutorAllocationManager(
     // If the new target has not changed, avoid sending a message to the cluster manager
     if (numExecutorsTarget < oldNumExecutorsTarget) {
+      // We lower the target number of executors but don't actively kill any yet. Killing is
+      // controlled separately by an idle timeout. Its still helpful to reduce the target number
+      // in case an executor just happens to get lost (eg., bad hardware, or the cluster manager
+      // preempts it) -- in that case, there is no point in trying to immediately get a new
+      // executor, since we couldn't even use it yet.
```
--- End diff --

s/couldn't/wouldn't
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r170102363

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
```
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
   * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
+   *                      those failures be counted to task failure limits?
```
--- End diff --

nit: "whether to count those failures toward task failure limits"
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r169452326

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
```
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+        require(executorAllocationManager.isEmpty,
```
--- End diff --

I'm not sure why you'd use this with dynamic allocation, but it's been possible in the past. It's probably ok to change this though.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r169438419

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
```
@@ -455,7 +461,12 @@ private[spark] class ExecutorAllocationManager(
     val executorsRemoved = if (testing) {
       executorIdsToBeRemoved
     } else {
-      client.killExecutors(executorIdsToBeRemoved)
+      // We don't want to change our target number of executors, because we already did that
+      // when the task backlog decreased. Normally there wouldn't be any tasks running on these
+      // executors, but maybe the scheduler *just* decided to run a task there -- in that case,
```
--- End diff --

good point, I didn't look closely enough at the semantics of `force`
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r169437530

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
```
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+        require(executorAllocationManager.isEmpty,
```
--- End diff --

What would calling this mean with dynamic allocation on? Note this API explicitly says it's meant to adjust resource usage downwards. If you've got just one executor, and then you kill it, should your app sit with 0 executors? Or even if you've got 10 executors, and you kill one -- when is dynamic allocation allowed to bump the total back up?

I can't think of useful, clear semantics for this (though this is not necessary to fix the bug; I could pull this out and move the discussion to a new jira).
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r169436456

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
```
@@ -334,6 +336,10 @@ private[spark] class ExecutorAllocationManager(
     // If the new target has not changed, avoid sending a message to the cluster manager
     if (numExecutorsTarget < oldNumExecutorsTarget) {
+      // We lower the target number of executors but don't actively kill any yet. We do this
```
--- End diff --

I was trying to answer a different question -- if we don't kill the executor now, why even bother lowering the target number? That would be an alternative solution: don't adjust the target number here at all, and just wait until the executors are killed for being idle. (And really I'm just guessing at the logic.) Lemme try to reword this some ...
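The reasoning being hashed out above can be illustrated with a toy replacement rule (an assumed model of how the cluster manager refills executors up to the target; not Spark code): lowering the target when the backlog shrinks means a subsequently lost executor triggers no pointless replacement.

```scala
// Assumed model -- the cluster manager spawns replacements up to the target.
object Refill {
  def replacementsNeeded(target: Int, alive: Int): Int =
    math.max(target - alive, 0)
}
```

With 10 alive executors and the target still at 10, losing one to preemption or bad hardware means one executor is immediately (and uselessly) re-requested; if the target was already lowered to 9 when the backlog shrank, nothing is re-requested.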
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r169421040

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
```
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+        require(executorAllocationManager.isEmpty,
```
--- End diff --

This is a developer api, so probably ok, but this is a change in behavior. Is it just not possible to support this with dynamic allocation?
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r169420133

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
```
@@ -334,6 +336,10 @@ private[spark] class ExecutorAllocationManager(
     // If the new target has not changed, avoid sending a message to the cluster manager
     if (numExecutorsTarget < oldNumExecutorsTarget) {
+      // We lower the target number of executors but don't actively kill any yet. We do this
```
--- End diff --

I'm not sure I follow this comment. From my reading of it, it's saying that you don't want to kill executors because you don't want to immediately get a new one to replace it. But how can that happen, if you're also lowering the target number?
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r169420544

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
```
@@ -455,7 +461,12 @@ private[spark] class ExecutorAllocationManager(
     val executorsRemoved = if (testing) {
       executorIdsToBeRemoved
     } else {
-      client.killExecutors(executorIdsToBeRemoved)
+      // We don't want to change our target number of executors, because we already did that
+      // when the task backlog decreased. Normally there wouldn't be any tasks running on these
+      // executors, but maybe the scheduler *just* decided to run a task there -- in that case,
```
--- End diff --

Also don't follow this part. If `force = false`, doesn't that mean the executor won't be killed if tasks are running on it? So wouldn't `countFailures` be meaningless in that context?
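vanzin's point follows directly from the filter chain quoted in the `CoarseGrainedSchedulerBackend` diff earlier in the thread. A self-contained sketch of just that selection logic (simplified, with busy-ness and pending-removal state passed in explicitly rather than read from the scheduler):

```scala
// Simplified from the killExecutors filter chain under review: with
// force = false, busy executors are filtered out before any kill happens,
// so countFailures can never apply to a running task in that mode.
object KillFilter {
  def selectKillable(
      candidates: Seq[String],
      pendingRemoval: Set[String],
      busy: Set[String],
      force: Boolean): Seq[String] =
    candidates
      .filter(id => !pendingRemoval.contains(id))
      .filter(id => force || !busy.contains(id))
}
```

With `force = false` an executor running tasks never reaches the kill step, which is why `countFailures` only has an observable effect when `force = true`.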