[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2018-09-11 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r216597619
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -373,8 +373,14 @@ private[spark] class ExecutorAllocationManager(
     // If our target has not changed, do not send a message
     // to the cluster manager and reset our exponential growth
     if (delta == 0) {
-      numExecutorsToAdd = 1
-      return 0
+      // Check if there is any speculative jobs pending
+      if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) {
+        numExecutorsTarget =
+          math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
--- End diff --

Also confused by the `+1` here. And I think we have already taken `pendingSpeculativeTasks` into account @advancedxy:

```
def totalPendingTasks(): Int = {
  pendingTasks + pendingSpeculativeTasks
}
```
So this check seems redundant.
And it doesn't sync to the cluster manager if `numExecutorsTarget` changes (after the `+1`).
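
For reference, a minimal sketch of what syncing the changed target could look like, reusing the `client.requestTotalExecutors` call that this file already uses elsewhere (illustrative fragment, not part of the patch; `oldNumExecutorsTarget` is assumed to hold the previous value as in `updateAndSyncNumExecutorsTarget`):

```
// Hypothetical follow-up to the branch above: propagate the new target
// to the cluster manager instead of only mutating the local field.
if (numExecutorsTarget != oldNumExecutorsTarget) {
  client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}
```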



[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2018-04-16 Thread advancedxy
Github user advancedxy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r181706342
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -373,8 +373,14 @@ private[spark] class ExecutorAllocationManager(
     // If our target has not changed, do not send a message
     // to the cluster manager and reset our exponential growth
     if (delta == 0) {
-      numExecutorsToAdd = 1
-      return 0
+      // Check if there is any speculative jobs pending
+      if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) {
+        numExecutorsTarget =
+          math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
--- End diff --

Same here.
`maxNumExecutorsNeeded + 1` doesn't quite make sense.
@janewangfb could you please post/update some comments here? And I wonder why we didn't take `pendingSpeculativeTasks` into account when calculating `maxNumExecutorsNeeded()`.

Or @jerryshao, do you know the rationale?



[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-10-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r146190420
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -373,8 +373,14 @@ private[spark] class ExecutorAllocationManager(
     // If our target has not changed, do not send a message
     // to the cluster manager and reset our exponential growth
     if (delta == 0) {
-      numExecutorsToAdd = 1
-      return 0
+      // Check if there is any speculative jobs pending
+      if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) {
+        numExecutorsTarget =
+          math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
--- End diff --

@janewangfb Would you please explain why we `+ 1` here if there are pending speculative tasks? Should the number of executors be calculated based on the number of pending tasks? Thanks!



[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18492



[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-22 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r134637603
  
--- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors when speculative tasks added") {
+    sc = createSparkContext(0, 10, 0)
+    val manager = sc.executorAllocationManager.get
+
+    // Verify that we're capped at number of tasks including the speculative ones in the stage
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 1)
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 2)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 3)
+    assert(numExecutorsToAdd(manager) === 4)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 5)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Verify that running a task doesn't affect the target
--- End diff --

A speculative task is also a task which needs an executor to execute it, so when we calculate how many executors are needed, we need to count the speculative tasks.
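
For illustration, the ramp-up the quoted test asserts can be reproduced with a small sketch (assuming 1 task slot per executor, so 2 regular + 3 speculative tasks need 5 executors; the doubling/reset rule mirrors `addExecutors`):

```
var target = 0  // numExecutorsTarget
var toAdd = 1   // numExecutorsToAdd
for (_ <- 1 to 3) {
  val needed = 5 // maxNumExecutorsNeeded: 2 regular + 3 speculative tasks
  val delta = math.min(target + toAdd, needed) - target
  target += delta                               // 1, then 3, then 5
  toAdd = if (delta == toAdd) toAdd * 2 else 1  // 2, then 4, then reset to 1
}
```

This matches the assertions above: the target grows 1 -> 3 -> 5, and `numExecutorsToAdd` goes 2 -> 4 -> 1 once the cap is hit.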





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r134634721
  
--- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors when speculative tasks added") {
+    sc = createSparkContext(0, 10, 0)
+    val manager = sc.executorAllocationManager.get
+
+    // Verify that we're capped at number of tasks including the speculative ones in the stage
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 1)
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 2)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 3)
+    assert(numExecutorsToAdd(manager) === 4)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 5)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Verify that running a task doesn't affect the target
--- End diff --

Then why does a speculative task submission add to the running/pending tasks?




[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-22 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r134574304
  
--- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors when speculative tasks added") {
+    sc = createSparkContext(0, 10, 0)
+    val manager = sc.executorAllocationManager.get
+
+    // Verify that we're capped at number of tasks including the speculative ones in the stage
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 1)
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 2)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 3)
+    assert(numExecutorsToAdd(manager) === 4)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 5)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Verify that running a task doesn't affect the target
--- End diff --

It is because we use the sum of (running + pending) tasks to calculate how many executors are needed (`maxNumExecutorsNeeded`). So whether a task is pending or running, the number of executors needed is the same.
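
A sketch of that calculation as it appears in `ExecutorAllocationManager` around this change (paraphrased; treat as illustrative rather than the exact merged code):

```
// Ceiling division: each executor runs up to `tasksPerExecutor` tasks at a
// time, and both pending (incl. speculative) and running tasks are counted.
private def maxNumExecutorsNeeded(): Int = {
  val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
  (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
```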





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r134387763
  
--- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors when speculative tasks added") {
+    sc = createSparkContext(0, 10, 0)
+    val manager = sc.executorAllocationManager.get
+
+    // Verify that we're capped at number of tasks including the speculative ones in the stage
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 1)
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 2)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 3)
+    assert(numExecutorsToAdd(manager) === 4)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 5)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Verify that running a task doesn't affect the target
--- End diff --

Can you explain more about this test? Why can the first 3 `SparkListenerSpeculativeTaskSubmitted` events trigger allocating more executors, but here we don't?





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-21 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r134336371
  
--- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors when speculative tasks added") {
+    sc = createSparkContext(0, 10, 0)
+    val manager = sc.executorAllocationManager.get
+
+    // Verify that we're capped at number of tasks including the speculative ones in the stage
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 1)
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
--- End diff --

In real life, it is possible that a job has multiple stages: one stage is still running some tasks but the next stage has already started. This test tries to mimic that.





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-21 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r134334889
  
--- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors when speculative tasks added") {
+    sc = createSparkContext(0, 10, 0)
+    val manager = sc.executorAllocationManager.get
+
+    // Verify that we're capped at number of tasks including the speculative ones in the stage
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
--- End diff --

That is not likely. A speculative task is only submitted when a certain percentage of the tasks have finished successfully.
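
For context, that trigger is controlled by Spark's speculation settings (the values below are the upstream defaults; a sketch for illustration, not part of this diff):

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")           // enable speculative execution
  .set("spark.speculation.interval", "100ms") // how often to check for slow tasks
  .set("spark.speculation.quantile", "0.75")  // fraction of tasks that must finish first
  .set("spark.speculation.multiplier", "1.5") // how much slower than the median counts as slow
```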





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r134243239
  
--- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors when speculative tasks added") {
+    sc = createSparkContext(0, 10, 0)
+    val manager = sc.executorAllocationManager.get
+
+    // Verify that we're capped at number of tasks including the speculative ones in the stage
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
--- End diff --

Is it a possible case that the first event is a speculative task submitted?





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r134242290
  
--- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors when speculative tasks added") {
+    sc = createSparkContext(0, 10, 0)
+    val manager = sc.executorAllocationManager.get
+
+    // Verify that we're capped at number of tasks including the speculative ones in the stage
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 1)
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
--- End diff --

Why is the stage submitted event posted after the speculative task submitted event?





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-17 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r133786071
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -291,6 +294,11 @@ private[spark] trait SparkListenerInterface {
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
 
   /**
+   * Called when a speculative task is submitted
+   */
+  def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit
--- End diff --

I grepped, and I don't think we have events related to normal task submission.





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r133774907
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -291,6 +294,11 @@ private[spark] trait SparkListenerInterface {
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
 
   /**
+   * Called when a speculative task is submitted
+   */
+  def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit
--- End diff --

Oh I see. So we don't track the submission of normal tasks?





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-17 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r133774366
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -291,6 +294,11 @@ private[spark] trait SparkListenerInterface {
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
 
   /**
+   * Called when a speculative task is submitted
+   */
+  def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit
--- End diff --

I would keep the name `onSpeculativeTaskSubmitted`, because when the event happens, it only submits a speculative task to be launched in the future; the task has not started yet.





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-17 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r133773675
  
--- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -980,10 +1014,12 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
       taskLocalityPreferences = taskLocalityPreferences)
   }
 
-  private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
-    new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative = false)
+  private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String,
--- End diff --

updated.





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r133726365
  
--- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -980,10 +1014,12 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
       taskLocalityPreferences = taskLocalityPreferences)
   }
 
-  private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
-    new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative = false)
+  private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String,
--- End diff --

nit:
```
private def (
    para1: XX,
    para2: XX)
```

4-space indentation for the parameters.





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r133726091
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -291,6 +294,11 @@ private[spark] trait SparkListenerInterface {
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
 
   /**
+   * Called when a speculative task is submitted
+   */
+  def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit
--- End diff --

For normal tasks, we have `onTaskStart`, `onTaskEnd`, etc., but don't have `onTaskSubmitted`. Shall we make the name consistent for speculative tasks?





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-15 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r133355548
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -291,6 +297,16 @@ private[spark] trait SparkListenerInterface {
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
 
   /**
+   * Called when a speculative task is submitted
+   */
+  def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit
+
+  /**
+   * Called when an extra executor is needed
+   */
+  def onExtraExecutorNeeded(): Unit
--- End diff --

@cloud-fan after some thought, yes, I think we can get rid of the extraExecutorNeeded event and handle it in ExecutorAllocationManager.scala.
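
For reference, the existing mechanism the discussion converges on: `ExecutorAllocationManager.start()` already polls on a timer rather than reacting to listener callbacks, roughly like this (simplified sketch; `executor` and `intervalMillis` are fields of the manager):

```
import java.util.concurrent.TimeUnit

// A scheduled task recomputes the target and syncs it with the cluster
// manager every `intervalMillis` (100ms), so a pending speculative task
// recorded by the listener is picked up on the next tick.
val scheduleTask = new Runnable() {
  override def run(): Unit = schedule()
}
executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
```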





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r132628227
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -291,6 +297,16 @@ private[spark] trait SparkListenerInterface {
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
 
   /**
+   * Called when a speculative task is submitted
+   */
+  def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit
+
+  /**
+   * Called when an extra executor is needed
+   */
+  def onExtraExecutorNeeded(): Unit
--- End diff --

Do you know how the executor allocation manager adjusts the number of executors currently? Can we follow that instead of hacking `SparkListener`?





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r132628016
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -291,6 +297,16 @@ private[spark] trait SparkListenerInterface {
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
 
   /**
+   * Called when a speculative task is submitted
+   */
+  def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit
+
+  /**
+   * Called when an extra executor is needed
+   */
+  def onExtraExecutorNeeded(): Unit
--- End diff --

I'm hesitant to add this. `SparkListener` is public and should be the interface for listening to various Spark internal events, letting listeners do whatever they want. However, `onExtraExecutorNeeded` sounds like something Spark asks the listener to do, which is against the pattern.

In other words, `onExtraExecutorNeeded` looks specific to the executor allocation manager, not a general Spark event.





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-07 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r131822909
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -53,6 +53,12 @@ case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: T
 case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
 
 @DeveloperApi
+case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerExtraExecutorNeeded() extends SparkListenerEvent
--- End diff --

A case object won't compile.





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-07 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r131820476
  
--- Diff: core/src/main/java/org/apache/spark/SparkFirehoseListener.java ---
@@ -140,6 +140,16 @@ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
   }
 
   @Override
+  public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted speculativeTask) {
+    onEvent(speculativeTask);
+  }
+
+  @Override
+  public void onExtraExecutorNeeded() {
+    onEvent(null);
--- End diff --

ok. updated.





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r131820095
  
--- Diff: core/src/main/java/org/apache/spark/SparkFirehoseListener.java ---
@@ -140,6 +140,16 @@ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
   }
 
   @Override
+  public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted speculativeTask) {
+    onEvent(speculativeTask);
+  }
+
+  @Override
+  public void onExtraExecutorNeeded() {
+    onEvent(null);
--- End diff --

`SparkFirehoseListener` is a public API and users may assume `onEvent` will never receive null; how about we just do nothing here?





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r131819639
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -53,6 +53,12 @@ case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: T
 case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
 
 @DeveloperApi
+case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerExtraExecutorNeeded() extends SparkListenerEvent
--- End diff --

case object?





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-07 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r131818512
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -572,20 +572,35 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Callback invoked when an extra executor is needed (See SPARK-19326)
+   */
+  private def onExtraExecutorNeeded(): Unit = synchronized {
+    val maxNeeded = math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
+    val addRequestAcknowledged = testing ||
+      client.requestTotalExecutors(maxNeeded, localityAwareTasks, hostToLocalTaskCount)
+    if (addRequestAcknowledged) {
+      logInfo(s"Requesting one new executor because speculative tasks are backlogged")
+    }
+  }
+
+  /**
    * A listener that notifies the given allocation manager of when to add and remove executors.
    *
    * This class is intentionally conservative in its assumptions about the relative ordering
-   * and consistency of events returned by the listener. For simplicity, it does not account
-   * for speculated tasks.
+   * and consistency of events returned by the listener.
   */
   private class ExecutorAllocationListener extends SparkListener {
 
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
     private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
-    // Number of tasks currently running on the cluster.  Should be 0 when no stages are active.
+    // Number of tasks currently running on the cluster including speculative tasks.
+    // Should be 0 when no stages are active.
     private var numRunningTasks: Int = _
 
+    private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
--- End diff --

Added comments





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-08-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r131602232
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -572,20 +572,35 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Callback invoked when an extra executor is needed (See SPARK-19326)
+   */
+  private def onExtraExecutorNeeded(): Unit = synchronized {
+    val maxNeeded = math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
+    val addRequestAcknowledged = testing ||
+      client.requestTotalExecutors(maxNeeded, localityAwareTasks, hostToLocalTaskCount)
+    if (addRequestAcknowledged) {
+      logInfo(s"Requesting one new executor because speculative tasks are backlogged")
+    }
+  }
+
+  /**
    * A listener that notifies the given allocation manager of when to add and remove executors.
    *
    * This class is intentionally conservative in its assumptions about the relative ordering
-   * and consistency of events returned by the listener. For simplicity, it does not account
-   * for speculated tasks.
+   * and consistency of events returned by the listener.
   */
   private class ExecutorAllocationListener extends SparkListener {
 
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
     private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
-    // Number of tasks currently running on the cluster.  Should be 0 when no stages are active.
+    // Number of tasks currently running on the cluster including speculative tasks.
+    // Should be 0 when no stages are active.
     private var numRunningTasks: Int = _
 
+    private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
--- End diff --

At first glance I thought `stageIdToNumSpeculativeTasks` was just `stageIdToSpeculativeTaskIndices.mapValues(_.size)`, but it seems that's not true. Can you add some comments to explain these 2 variables?
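
For readers of the archive: in the merged patch the two maps relate roughly as follows. `stageIdToNumSpeculativeTasks` counts speculative tasks submitted per stage, while `stageIdToSpeculativeTaskIndices` tracks only those that have actually started, so their difference is the pending count (a simplified sketch, not the verbatim source):

```
def pendingSpeculativeTasks(): Int = {
  stageIdToNumSpeculativeTasks.map { case (stageId, numTasks) =>
    // Submitted speculative tasks minus the ones that already started.
    numTasks - stageIdToSpeculativeTaskIndices.get(stageId).map(_.size).getOrElse(0)
  }.sum
}
```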





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-07-20 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r128683971
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -572,20 +572,35 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Callback invoked when an extra executor is needed (See SPARK-19326)
+   */
+  private def onExtraExecutorNeeded(): Unit = synchronized {
+    val maxNeeded = math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
--- End diff --

We cannot just ensure there are more than two active executors left; it depends on whether there are any speculative tasks not yet launched.

`hostToLocalTaskCount` will make the new executor request satisfy the locality requirement. On the other hand, even if the new executor is not guaranteed to be on a different host, it will just idle and then die, and the speculative task will request another executor.





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-07-20 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r128452352
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -572,20 +572,35 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Callback invoked when an extra executor is needed (See SPARK-19326)
+   */
+  private def onExtraExecutorNeeded(): Unit = synchronized {
+    val maxNeeded = math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
--- End diff --

Perhaps what we need is just to ensure there are more than two active 
executors left, so we may meet the locality requirement and launch the 
speculative jobs?





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-07-20 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r128452499
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -572,20 +572,35 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Callback invoked when an extra executor is needed (See SPARK-19326)
+   */
+  private def onExtraExecutorNeeded(): Unit = synchronized {
+    val maxNeeded = math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
--- End diff --

BTW how do we ensure we run new executors on a different host?





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-07-19 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r128339669
  
--- Diff: core/src/main/java/org/apache/spark/SparkFirehoseListener.java ---
@@ -140,6 +140,16 @@ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
   }
 
   @Override
+  public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted speculativeTask) {
+    onEvent(speculativeTask);
+  }
+
+  @Override
+  public void onExtraExecutorNeeded() {
--- End diff --

When there is one executor left but with multiple CPU cores, the task is running on that executor. But due to locality requirements, speculative tasks cannot launch on the same host, so we have to request one extra executor. That is what this event is for.





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-07-19 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r128338765
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -281,6 +281,20 @@ class DAGScheduler(
     eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception))
   }
 
+  /**
+   * Called by the TaskSetManager when it needs a speculative task is needed.
--- End diff --

updated





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-07-19 Thread janewangfb
Github user janewangfb commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r128338491
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -572,20 +572,35 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Callback invokded when an extra executor is needed (See SPARK-19326)
--- End diff --

updated.





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-07-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r128215501
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -572,20 +572,35 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Callback invokded when an extra executor is needed (See SPARK-19326)
--- End diff --

nit: `invokded` -> `invoked`





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-07-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r128215419
  
--- Diff: core/src/main/java/org/apache/spark/SparkFirehoseListener.java ---
@@ -140,6 +140,16 @@ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
   }
 
   @Override
+  public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted speculativeTask) {
+    onEvent(speculativeTask);
+  }
+
+  @Override
+  public void onExtraExecutorNeeded() {
--- End diff --

Maybe I'm missing something obvious, but what's the use of this except for adding some logging?





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-07-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r128215595
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -281,6 +281,20 @@ class DAGScheduler(
     eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception))
   }
 
+  /**
+   * Called by the TaskSetManager when it needs a speculative task is needed.
--- End diff --

nit: `needs` -> `decides`





[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2017-06-30 Thread janewangfb
GitHub user janewangfb opened a pull request:

https://github.com/apache/spark/pull/18492

[SPARK-19326] Speculated task attempts do not get launched in few scenarios

## What changes were proposed in this pull request?

Add a new listener event when a speculative task is created, and notify the ExecutorAllocationManager so it can request more executors.
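
A condensed sketch of that mechanism (simplified from the patch; details illustrative rather than verbatim):

```
// New event posted by the DAGScheduler when the TaskSetManager decides
// a task should be speculated.
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent

// ExecutorAllocationManager's listener records it so the periodic
// schedule() tick counts the speculative task as pending.
override def onSpeculativeTaskSubmitted(
    speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = {
  allocationManager.synchronized {
    stageIdToNumSpeculativeTasks(speculativeTask.stageId) =
      stageIdToNumSpeculativeTasks.getOrElse(speculativeTask.stageId, 0) + 1
    allocationManager.onSchedulerBacklogged()
  }
}
```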

## How was this patch tested?

- Added unit tests.
- For the test snippet in the JIRA:

      val n = 100
      val someRDD = sc.parallelize(1 to n, n)
      someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
        if (index == 1) {
          Thread.sleep(Long.MaxValue) // fake long running task(s)
        }
        it.toList.map(x => index + ", " + x).iterator
      }).collect

  With this code change, Spark indicates 101 tasks (99 succeeded, 2 running, and 1 is a speculative task).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/janewangfb/spark speculated_task_not_launched

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18492.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18492


commit 9d4886e621d17b8e5871e8beebbcf133f009b14b
Author: Jane Wang 
Date:   2017-06-29T18:50:33Z

add speculative job for executor calculation

commit d4dd56c7ef83e3061641e40ae00ef7389d7cee95
Author: Jane Wang 
Date:   2017-06-29T21:49:56Z

Merge branch 'master' into speculated_task_not_launched

commit 210ba70fdad5b1f4355deed1b84feeec6a265281
Author: Jane Wang 
Date:   2017-06-30T00:06:13Z

Add Unittests

commit cb2840496723326796cd2b36e6aa515e836bd3c1
Author: Jane Wang 
Date:   2017-06-30T00:41:00Z

remove debug lines

commit 41170df18bd70f58ff9c4562bc16eddbef5ad191
Author: Jane Wang 
Date:   2017-06-30T00:52:37Z

remove debug

commit 9784b4e6e30066fa03082ba12262fd8b0e2a5694
Author: Jane Wang 
Date:   2017-06-30T00:58:03Z

remove debug lines

commit 5331ab93d80515ce46bbe766036ef9dd8fef6e64
Author: Jane Wang 
Date:   2017-06-30T17:28:23Z

Merge branch 'master' into speculated_task_not_launched



