[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-20 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r169420133
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -334,6 +336,10 @@ private[spark] class ExecutorAllocationManager(
 
   // If the new target has not changed, avoid sending a message to the cluster manager
   if (numExecutorsTarget < oldNumExecutorsTarget) {
+// We lower the target number of executors but don't actively kill any yet.  We do this
--- End diff --

I'm not sure I follow this comment.

From my reading of it, it's saying that you don't want to kill executors 
because you don't want to immediately get a new one to replace it. But how can 
that happen, if you're also lowering the target number?


---




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-20 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r169420544
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -455,7 +461,12 @@ private[spark] class ExecutorAllocationManager(
 val executorsRemoved = if (testing) {
   executorIdsToBeRemoved
 } else {
-  client.killExecutors(executorIdsToBeRemoved)
+  // We don't want to change our target number of executors, because we already did that
+  // when the task backlog decreased.  Normally there wouldn't be any tasks running on these
+  // executors, but maybe the scheduler *just* decided to run a task there -- in that case,
--- End diff --

Also don't follow this part. If `force = false`, doesn't that mean the 
executor won't be killed if tasks are running on it? So wouldn't 
`countFailures` be meaningless in that context?


---
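
A minimal sketch of the interaction vanzin is asking about, assuming the `force || !busy` filter shape shown later in this thread; the names and types below are illustrative, not Spark's actual code. With `force = false`, busy executors are filtered out before the kill, so `countFailures` never has anything to apply to:

```scala
// Toy model of the force/countFailures interplay, not Spark's implementation.
object KillSemantics {
  final case class Executor(id: String, busy: Boolean)

  // With force = false, busy executors survive the filter, so whether their
  // in-flight task failures "count" can never come into play.
  def executorsToKill(candidates: Seq[Executor], force: Boolean): Seq[Executor] =
    candidates.filter(e => force || !e.busy)

  def main(args: Array[String]): Unit = {
    val execs = Seq(Executor("1", busy = false), Executor("2", busy = true))
    println(executorsToKill(execs, force = false).map(_.id)) // List(1)
    println(executorsToKill(execs, force = true).map(_.id))  // List(1, 2)
  }
}
```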




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-20 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r169421040
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
 schedulerBackend match {
   case b: ExecutorAllocationClient =>
-b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+require(executorAllocationManager.isEmpty,
--- End diff --

This is a developer api, so probably ok, but this is a change in behavior. 
Is it just not possible to support this with dynamic allocation?


---
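
For context, a hedged sketch of the caller-facing effect of the change under discussion. The config key and `sc.killExecutors` signature are real; the executor id is illustrative, and running this needs a cluster manager that supports killing executors:

```scala
// Usage sketch: after this change, sc.killExecutors with dynamic allocation
// enabled would fail fast in the new require(...) guard instead of silently
// fighting the ExecutorAllocationManager.
import org.apache.spark.{SparkConf, SparkContext}

object KillExecutorsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kill-executors-demo")
      .set("spark.dynamicAllocation.enabled", "false") // must now be off
    val sc = new SparkContext(conf)
    // Executor id "1" is illustrative; real ids come from the cluster manager.
    val accepted = sc.killExecutors(Seq("1"))
    println(s"kill request accepted: $accepted")
    sc.stop()
  }
}
```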




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r169436456
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -334,6 +336,10 @@ private[spark] class ExecutorAllocationManager(
 
   // If the new target has not changed, avoid sending a message to the cluster manager
   if (numExecutorsTarget < oldNumExecutorsTarget) {
+// We lower the target number of executors but don't actively kill any yet.  We do this
--- End diff --

I was trying to answer a different question -- if we don't kill the executor now, why even bother lowering the target number? That would be an alternative solution -- don't adjust the target number here at all, and just wait until the executors are killed for being idle. (And really I'm just guessing at the logic.)

Lemme try to reword this some ...


---




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r169437530
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
 schedulerBackend match {
   case b: ExecutorAllocationClient =>
-b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+require(executorAllocationManager.isEmpty,
--- End diff --

What would calling this mean with dynamic allocation on? Note this API explicitly says it's meant to adjust resource usage downwards. If you've got just one executor, and then you kill it, should your app sit with 0 executors? Or even if you've got 10 executors, and you kill one -- when is dynamic allocation allowed to bump the total back up? I can't think of useful, clear semantics for this.

(Though this is not necessary to fix the bug; I could pull this out and move it to a discussion in a new JIRA.)


---




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r169438419
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -455,7 +461,12 @@ private[spark] class ExecutorAllocationManager(
 val executorsRemoved = if (testing) {
   executorIdsToBeRemoved
 } else {
-  client.killExecutors(executorIdsToBeRemoved)
+  // We don't want to change our target number of executors, because we already did that
+  // when the task backlog decreased.  Normally there wouldn't be any tasks running on these
+  // executors, but maybe the scheduler *just* decided to run a task there -- in that case,
--- End diff --

Good point, I didn't look closely enough at the semantics of `force`.


---




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-20 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r169452326
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
 schedulerBackend match {
   case b: ExecutorAllocationClient =>
-b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+require(executorAllocationManager.isEmpty,
--- End diff --

I'm not sure why you'd use this with dynamic allocation, but it's been 
possible in the past. It's probably ok to change this though.


---




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-22 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r170102582
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -334,6 +336,11 @@ private[spark] class ExecutorAllocationManager(
 
   // If the new target has not changed, avoid sending a message to the cluster manager
   if (numExecutorsTarget < oldNumExecutorsTarget) {
+// We lower the target number of executors but don't actively kill any yet.  Killing is
+// controlled separately by an idle timeout.  Its still helpful to reduce the target number
+// in case an executor just happens to get lost (eg., bad hardware, or the cluster manager
+// preempts it) -- in that case, there is no point in trying to immediately  get a new
+// executor, since we couldn't even use it yet.
--- End diff --

s/couldn't/wouldn't


---
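
A runnable toy model of the two decoupled mechanisms the reworded comment describes -- the target drops as soon as the backlog shrinks, while kills are driven by a separate idle timeout. All names below are illustrative, not Spark's internals:

```scala
import scala.collection.mutable

object TargetVsKill {
  var numExecutorsTarget: Int = 10
  val executorIdleSince: mutable.Map[String, Long] = mutable.Map.empty

  // Backlog shrank: lower the target but kill nothing yet. If an executor is
  // lost in the meantime (bad hardware, preemption), the cluster manager
  // won't replace it, since the target already reflects the reduced need.
  def onBacklogDecrease(newTarget: Int): Unit =
    numExecutorsTarget = math.min(numExecutorsTarget, newTarget)

  // Killing is a separate path, driven purely by idleness.
  def expiredExecutors(now: Long, idleTimeoutMs: Long): Seq[String] =
    executorIdleSince.collect { case (id, since) if now - since > idleTimeoutMs => id }.toSeq

  def main(args: Array[String]): Unit = {
    onBacklogDecrease(4)
    executorIdleSince("7") = 0L
    println(numExecutorsTarget)                                     // 4
    println(expiredExecutors(now = 70000L, idleTimeoutMs = 60000L)) // executor "7" is expired
  }
}
```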




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-22 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r170102363
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
* Request that the cluster manager kill the specified executors.
*
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
* @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   * after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
+   *  those failures be counted to task failure limits?
--- End diff --

nit: "whether to count those failures toward task failure limits"


---
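
For reference, a hedged sketch of the method signature this scaladoc documents; only the parameter names come from the diff, while the default on `force` and the `Seq[String]` return type are assumptions:

```scala
package org.apache.spark

// Sketch of the reworked trait method: the old `replace` flag is split into
// adjustTargetNumExecutors and countFailures, with an explicit force flag.
private[spark] trait ExecutorAllocationClient {
  def killExecutors(
      executorIds: Seq[String],
      adjustTargetNumExecutors: Boolean,
      countFailures: Boolean,
      force: Boolean = false): Seq[String] // ids of executors actually killed
}
```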




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-22 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r170103841
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
* Request that the cluster manager kill the specified executors.
*
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
* @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   * after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
--- End diff --

I'm still a little confused about this parameter.

If `force = false`, it's a no op. And all call sites I've seen seem to set 
this parameter to `false`. So is there something I'm missing?


---




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-22 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r170102063
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -334,6 +336,11 @@ private[spark] class ExecutorAllocationManager(
 
   // If the new target has not changed, avoid sending a message to the cluster manager
   if (numExecutorsTarget < oldNumExecutorsTarget) {
+// We lower the target number of executors but don't actively kill any yet.  Killing is
+// controlled separately by an idle timeout.  Its still helpful to reduce the target number
--- End diff --

nit: it's


---




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r170383918
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
* Request that the cluster manager kill the specified executors.
*
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
* @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   * after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
--- End diff --

whoops, I was supposed to set `countFailures = true` in 
`sc.killAndReplaceExecutors`, thanks for catching that.


---
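
A hedged sketch of the fix squito describes; the surrounding method body is assumed, not taken from the diff. The target stays unchanged because a replacement is expected, but failures are counted because the executor dies out from under whatever is running on it:

```scala
// Fragment sketch, assuming the SparkContext internals around it.
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
  schedulerBackend match {
    case b: ExecutorAllocationClient =>
      b.killExecutors(Seq(executorId), adjustTargetNumExecutors = false,
        countFailures = true, force = true).nonEmpty
    case _ =>
      logWarning("Killing executors is not supported by current scheduler.")
      false
  }
}
```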




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r170650225
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -610,20 +611,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   val executorsToKill = knownExecutors
 .filter { id => !executorsPendingToRemove.contains(id) }
 .filter { id => force || !scheduler.isExecutorBusy(id) }
-  executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+  executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures}
--- End diff --

super nit: space before `}`


---




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r170650217
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -610,20 +611,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   val executorsToKill = knownExecutors
 .filter { id => !executorsPendingToRemove.contains(id) }
 .filter { id => force || !scheduler.isExecutorBusy(id) }
-  executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+  executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures}
--- End diff --

super nit: space before `}`


---
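
A hedged reading of the line both nits point at: the boolean stored per pending executor marks its upcoming loss as "expected", so killed tasks are not counted as failures, and the rename tracks the API change:

```scala
// Before: loss was "expected" whenever no replacement was requested.
executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
// After: loss is "expected" exactly when failures should not be counted.
executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }
```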




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20604


---




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-04-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r185005638
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
 schedulerBackend match {
   case b: ExecutorAllocationClient =>
-b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+require(executorAllocationManager.isEmpty,
--- End diff --

My point in general is that the semantics of combining `SparkContext.killExecutors()` (a publicly visible function that the end user can call) with dynamic allocation aren't well defined, and I have no idea what the behavior really should be. I was giving some examples of weird behavior.

>> If you've got just one executor, and then you kill it, should your app sit with 0 executors?

> if the app sits with 0 executors, then pending tasks increase, which leads the ExecutorAllocationManager to increase the target number of executors. So the app will not always sit with 0 executors.

That's true -- but only *when* pending tasks increase. But if you've got 0 executors, how do you expect pending tasks to increase? That would only happen when another taskset gets submitted, but with no executors your Spark program will probably just be blocked.

In the other case, I'm just trying to point out strange interactions between user control and dynamic allocation control. Imagine this sequence:

Dynamic Allocation: 1000 tasks, so 1000 executors
User: I only want 10 executors, so let me tell Spark to kill 990 of them
...
... another taskset is submitted to add 1 more task ...
Dynamic Allocation: 1001 tasks, so 1001 executors
User: ??? I set the target to 10 executors, what happened?

> So, if we are not using the ExecutorAllocationManager, the allocation client will request requestedTotalExecutors = 0 executors from the cluster manager (this is really terrible)

Hmm, from a quick look, I think you're right. It seems that using `sc.killExecutors()` doesn't make sense even with dynamic allocation off. I think `CoarseGrainedSchedulerBackend` should actually initialize `requestedTotalExecutors` with `SchedulerBackendUtils.getInitialTargetExecutorNumber`.


---
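
A hedged sketch of the initialization squito suggests; `getInitialTargetExecutorNumber` is named in the thread, but its exact signature here is an assumption:

```scala
// Fragment sketch for CoarseGrainedSchedulerBackend: seed the field from the
// configured initial target instead of 0, so a kill with
// adjustTargetNumExecutors = true cannot sync a bogus total of 0 when
// dynamic allocation is off.
private var requestedTotalExecutors =
  SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
```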




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-04-30 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r185159109
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
 schedulerBackend match {
   case b: ExecutorAllocationClient =>
-b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+require(executorAllocationManager.isEmpty,
--- End diff --

Hi @squito, thanks for your reply.

> but only *when* pending tasks increase.

`ExecutorAllocationManager` checks pending (or backlogged) tasks periodically, so we do not actually have to wait for an *increment*.

And for the `Dynamic Allocation` & `User` case, yeah, that's hard to define.

And I checked `SchedulerBackendUtils.getInitialTargetExecutorNumber`; it sets `DEFAULT_NUMBER_EXECUTORS` = 2. But this is not consistent with `Master`, which sets `executorLimit` to `Int.MaxValue` when we are not under dynamic allocation mode. Maybe we can just init `requestedTotalExecutors` with `Int.MaxValue` (only when we are not under dynamic allocation mode). Or we could not call `doRequestTotalExecutors` from `requestExecutors` or `killExecutors`, and only call it from `requestTotalExecutors` (again, only when we are not under dynamic allocation mode).


---




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-05-07 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r186425765
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
 schedulerBackend match {
   case b: ExecutorAllocationClient =>
-b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+require(executorAllocationManager.isEmpty,
--- End diff --

@squito any thoughts?


---




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-04-17 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r182086337
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
 schedulerBackend match {
   case b: ExecutorAllocationClient =>
-b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+require(executorAllocationManager.isEmpty,
--- End diff --

Hi @squito, I'm quite puzzled by these cases:

> If you've got just one executor, and then you kill it, should your app sit with 0 executors?

If the app sits with 0 executors, then pending tasks increase, which leads the `ExecutorAllocationManager` to increase the target number of executors. So the app will not always sit with 0 executors.

> Or even if you've got 10 executors, and you kill one -- when is dynamic allocation allowed to bump the total back up?

For this case, to be honest, I really do not get your point. But that must be blamed on my poor English.

And what happens if we use this method without the `ExecutorAllocationManager`? Or do we really need to adjust the target number of executors (set `adjustTargetNumExecutors = true` below) if we are not using the `ExecutorAllocationManager`?

See these lines in `killExecutors()`:
```
if (adjustTargetNumExecutors) {
  requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
  ...
  doRequestTotalExecutors(requestedTotalExecutors)
}
```
Setting `adjustTargetNumExecutors = true` will change `requestedTotalExecutors`. And IIUC, `requestedTotalExecutors` is only used in dynamic allocation mode. So, if we are not using the `ExecutorAllocationManager`, the allocation client will request `requestedTotalExecutors = 0` executors from the cluster manager (this is really terrible). But actually, an app without the `ExecutorAllocationManager` does not have a limit on requesting executors (by default).

Actually, I think this series of methods, including `killAndReplaceExecutor`, `requestExecutors`, etc., was designed with dynamic allocation mode in mind. And if we still want to use these methods while the app does not use the `ExecutorAllocationManager`, we should not change `requestedTotalExecutors`, or even not request a specific number from the cluster manager.

WDYT?




---
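
A runnable toy illustration (not Spark's code) of the failure mode described above: with dynamic allocation off, `requestedTotalExecutors` starts at 0, so killing one executor with `adjustTargetNumExecutors = true` syncs a total of max(0 - 1, 0) = 0 to the cluster manager, capping an app that previously had no limit:

```scala
object AdjustTargetProblem {
  var requestedTotalExecutors: Int = 0 // initial value without dynamic allocation

  def killExecutors(ids: Seq[String], adjustTargetNumExecutors: Boolean): Unit =
    if (adjustTargetNumExecutors) {
      requestedTotalExecutors = math.max(requestedTotalExecutors - ids.size, 0)
      println(s"doRequestTotalExecutors($requestedTotalExecutors)") // pins the app at 0
    }

  def main(args: Array[String]): Unit =
    killExecutors(Seq("3"), adjustTargetNumExecutors = true)
}
```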
