[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-11 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r216788096
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
          } while (launchedTaskAtCurrentMaxLocality)
        }
+
        if (!launchedAnyTask) {
-          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-        }
+          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+            case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+              // If the taskSet is unschedulable we kill an existing blacklisted executor/s and
+              // kick off an abortTimer which after waiting will abort the taskSet if we were
+              // unable to schedule any task from the taskSet.
+              // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+              // per task basis.
+              val executor = hostToExecutors.valuesIterator.next().iterator.next()
+              logDebug("Killing executor because of task unschedulability: " + executor)
--- End diff --

noted.


---




[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-11 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r216788079
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
          } while (launchedTaskAtCurrentMaxLocality)
        }
+
        if (!launchedAnyTask) {
-          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-        }
+          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+            case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+              // If the taskSet is unschedulable we kill an existing blacklisted executor/s and
+              // kick off an abortTimer which after waiting will abort the taskSet if we were
+              // unable to schedule any task from the taskSet.
+              // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+              // per task basis.
+              val executor = hostToExecutors.valuesIterator.next().iterator.next()
+              logDebug("Killing executor because of task unschedulability: " + executor)
+              blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
+
+              if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+                unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
--- End diff --

noted.


---




[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-11 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r216788016
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -623,8 +623,9 @@ private[spark] class TaskSetManager(
    *
    * It is possible that this taskset has become impossible to schedule *anywhere* due to the
    * blacklist.  The most common scenario would be if there are fewer executors than
-   * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
-   * will hang.
+   * spark.task.maxFailures. We need to detect this so we can avoid the job from being hung.
+   * If dynamic allocation is enabled we try to acquire new executor/s by killing the existing one.
+   * In case of static allocation we abort the taskSet immediately to fail the job.
--- End diff --

Yes. The change to remove a single executor takes care of static allocation as well. I will update the comments.
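
A hedged sketch of how the updated scaladoc could read once the static/dynamic distinction is dropped (illustrative wording only, not the PR's final text):

```scala
  /**
   * It is possible that this taskset has become impossible to schedule *anywhere* due to the
   * blacklist. The most common scenario would be if there are fewer executors than
   * spark.task.maxFailures. We need to detect this so we can avoid hanging the job: we kill an
   * existing blacklisted executor to try to acquire a fresh one (a replacement is requested
   * under both dynamic and static allocation) and abort the taskSet if it is still completely
   * unschedulable after a timeout.
   */
```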


---




[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r216724373
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
          } while (launchedTaskAtCurrentMaxLocality)
        }
+
        if (!launchedAnyTask) {
-          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-        }
+          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+            case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+              // If the taskSet is unschedulable we kill an existing blacklisted executor/s and
+              // kick off an abortTimer which after waiting will abort the taskSet if we were
+              // unable to schedule any task from the taskSet.
+              // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+              // per task basis.
+              val executor = hostToExecutors.valuesIterator.next().iterator.next()
+              logDebug("Killing executor because of task unschedulability: " + executor)
--- End diff --

I think this should probably be logInfo (unless there is something else 
similar at INFO level elsewhere)
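
For reference, a minimal sketch of the suggested change in the context of the diff above (same message, just raised to INFO):

```scala
logInfo("Killing executor because of task unschedulability: " + executor)
```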


---




[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r216725731
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
          } while (launchedTaskAtCurrentMaxLocality)
        }
+
        if (!launchedAnyTask) {
-          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-        }
+          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+            case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+              // If the taskSet is unschedulable we kill an existing blacklisted executor/s and
+              // kick off an abortTimer which after waiting will abort the taskSet if we were
+              // unable to schedule any task from the taskSet.
+              // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+              // per task basis.
+              val executor = hostToExecutors.valuesIterator.next().iterator.next()
+              logDebug("Killing executor because of task unschedulability: " + executor)
+              blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
+
+              if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+                unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
--- End diff --

I'd include a logInfo here saying that Spark can't schedule anything because of blacklisting, but it's going to try to kill blacklisted executors and acquire new ones. Also mention how long it will wait before giving up and the associated conf.
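
A hedged sketch of what such a message could look like, reusing `executor` and `UNSCHEDULABLE_TASKSET_TIMEOUT_MS` from the diff above; the wording and the reference to a configurable timeout are illustrative, not the PR's final code:

```scala
logInfo(s"Cannot schedule any task because all executors usable by this taskSet are " +
  s"blacklisted. Killing blacklisted executor $executor and trying to acquire a new one. " +
  s"Will abort the taskSet if it is still unschedulable after $UNSCHEDULABLE_TASKSET_TIMEOUT_MS ms " +
  "(configurable via the corresponding scheduler blacklist timeout conf).")
```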


---




[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r216723175
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
          } while (launchedTaskAtCurrentMaxLocality)
        }
+
        if (!launchedAnyTask) {
-          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-        }
+          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+            case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+              // If the taskSet is unschedulable we kill an existing blacklisted executor/s and
+              // kick off an abortTimer which after waiting will abort the taskSet if we were
+              // unable to schedule any task from the taskSet.
+              // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+              // per task basis.
+              val executor = hostToExecutors.valuesIterator.next().iterator.next()
--- End diff --

`hostToExecutors.head._2.head`

just thinking "aloud" -- I guess taking an arbitrary executor here is OK, as we know there is some task that can't run on any executor. But I wonder if we could have some priority here -- e.g. I'd much rather kill an executor which has been blacklisted for an entire stage or the whole app, rather than one that was blacklisted for just some task. Need to look into whether there is an efficient way to keep that priority list, though.
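
A rough, self-contained sketch of the prioritization idea; the predicate parameters are hypothetical stand-ins for app-level and stage-level blacklist lookups, not APIs from the PR:

```scala
// Prefer killing executors blacklisted at the broadest scope first.
def pickExecutorToKill(
    executors: Seq[String],
    isAppBlacklisted: String => Boolean,    // hypothetical: blacklisted for the whole app
    isStageBlacklisted: String => Boolean   // hypothetical: blacklisted for the entire stage
  ): Option[String] = {
  executors.find(isAppBlacklisted)
    .orElse(executors.find(isStageBlacklisted))
    .orElse(executors.headOption)           // fall back to an arbitrary executor
}
```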


---




[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r216726755
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -623,8 +623,9 @@ private[spark] class TaskSetManager(
    *
    * It is possible that this taskset has become impossible to schedule *anywhere* due to the
    * blacklist.  The most common scenario would be if there are fewer executors than
-   * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
-   * will hang.
+   * spark.task.maxFailures. We need to detect this so we can avoid the job from being hung.
+   * If dynamic allocation is enabled we try to acquire new executor/s by killing the existing one.
+   * In case of static allocation we abort the taskSet immediately to fail the job.
--- End diff --

why do you want something different with static allocation?  If you kill an 
executor, static allocation will also request a replacement.


---




[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r216726323
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
          } while (launchedTaskAtCurrentMaxLocality)
        }
+
        if (!launchedAnyTask) {
-          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-        }
+          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+            case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+              // If the taskSet is unschedulable we kill an existing blacklisted executor/s and
+              // kick off an abortTimer which after waiting will abort the taskSet if we were
+              // unable to schedule any task from the taskSet.
+              // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+              // per task basis.
+              val executor = hostToExecutors.valuesIterator.next().iterator.next()
+              logDebug("Killing executor because of task unschedulability: " + executor)
+              blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
+
+              if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+                unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
+                abortTimer.schedule(new TimerTask() {
+                  override def run() {
+                    if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
+                      (unschedulableTaskSetToExpiryTime(taskSet)
+                        + UNSCHEDULABLE_TASKSET_TIMEOUT_MS)
+                        <= clock.getTimeMillis()
+                    ) {
+                      logInfo("Cannot schedule any task because of complete blacklisting. " +
+                        "Wait time for scheduling expired. Aborting the application.")
+                      taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
+                    } else {
+                      this.cancel()
+                    }
+                  }
+                }, UNSCHEDULABLE_TASKSET_TIMEOUT_MS)
+              }
+            case _ => // Do nothing.
+          }
+        } else {
+          // If a task was scheduled, we clear the expiry time for the taskSet. The abort timer
+          // checks this entry to decide if we want to abort the taskSet.
+          if (unschedulableTaskSetToExpiryTime.contains(taskSet)) {
--- End diff --

you can move this up to the `else` so it's an `else if`. Or you could also just call `remove` without checking `contains`, which avoids probing the map twice.
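
A minimal, self-contained sketch of the second suggestion, assuming the map is a `scala.collection.mutable.HashMap` as elsewhere in the scheduler: `remove` already handles an absent key, so the separate `contains` probe can be dropped.

```scala
import scala.collection.mutable

object RemoveWithoutContains {
  def main(args: Array[String]): Unit = {
    val unschedulableTaskSetToExpiryTime = mutable.HashMap[String, Long]("taskSet1" -> 42L)

    // Key present: removed in a single probe, returns Some(42).
    println(unschedulableTaskSetToExpiryTime.remove("taskSet1"))
    // Key absent: a no-op that returns None, so no prior contains() check is needed.
    println(unschedulableTaskSetToExpiryTime.remove("taskSet2"))
  }
}
```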


---




[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-04 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r215036162
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,54 @@ private[spark] class TaskSchedulerImpl(
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
          } while (launchedTaskAtCurrentMaxLocality)
        }
+
        if (!launchedAnyTask) {
-          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-        }
+          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+            case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+              if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+                // If the taskSet is unschedulable we kill the existing blacklisted executor/s and
+                // kick off an abortTimer which after waiting will abort the taskSet if we were
+                // unable to get new executors and couldn't schedule a task from the taskSet.
+                // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+                // per task basis.
+                if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+                  hostToExecutors.valuesIterator.foreach(executors => executors.foreach({
+                    executor =>
+                      logDebug("Killing executor because of task unschedulability: " + executor)
+                      blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
--- End diff --

- To refresh executors, you need to enable `spark.blacklist.killBlacklistedExecutors`.
- I was thinking about it; killing all the executors is a little too harsh. Killing only a single executor would help mitigate this, although it would still fail the tasks running on that executor.
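
A hedged sketch (not from the PR) of the configuration this code path relies on, so that `killBlacklistedExecutor` actually removes an executor and a replacement can be acquired:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.killBlacklistedExecutors", "true") // allow the BlacklistTracker to kill executors
  .set("spark.dynamicAllocation.enabled", "true")          // so replacement executors are requested
  .set("spark.shuffle.service.enabled", "true")            // required when dynamic allocation is on
```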


---




[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-03 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r214720097
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,54 @@ private[spark] class TaskSchedulerImpl(
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
          } while (launchedTaskAtCurrentMaxLocality)
        }
+
        if (!launchedAnyTask) {
-          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-        }
+          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+            case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+              if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+                // If the taskSet is unschedulable we kill the existing blacklisted executor/s and
+                // kick off an abortTimer which after waiting will abort the taskSet if we were
+                // unable to get new executors and couldn't schedule a task from the taskSet.
+                // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+                // per task basis.
+                if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+                  hostToExecutors.valuesIterator.foreach(executors => executors.foreach({
+                    executor =>
+                      logDebug("Killing executor because of task unschedulability: " + executor)
+                      blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
+                  })
+                  )
+                  unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
+                  abortTimer.schedule(new TimerTask() {
+                    override def run() {
+                      if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
+                        (unschedulableTaskSetToExpiryTime(taskSet)
+                          + UNSCHEDULABLE_TASKSET_TIMEOUT_MS)
+                          <= clock.getTimeMillis()
+                      ) {
+                        logInfo("Cannot schedule any task because of complete blacklisting. " +
+                          "Wait time for scheduling expired. Aborting the application.")
+                        taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
+                      } else {
+                        this.cancel()
+                      }
+                    }
+                  }, UNSCHEDULABLE_TASKSET_TIMEOUT_MS)
+                }
+              } else {
+                // TODO: try acquiring new executors for static allocation before aborting.
--- End diff --

How? By waiting for other tasks to finish and release resources?


---




[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-03 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r214719743
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,54 @@ private[spark] class TaskSchedulerImpl(
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
          } while (launchedTaskAtCurrentMaxLocality)
        }
+
        if (!launchedAnyTask) {
-          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-        }
+          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+            case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+              if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+                // If the taskSet is unschedulable we kill the existing blacklisted executor/s and
+                // kick off an abortTimer which after waiting will abort the taskSet if we were
+                // unable to get new executors and couldn't schedule a task from the taskSet.
+                // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+                // per task basis.
+                if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+                  hostToExecutors.valuesIterator.foreach(executors => executors.foreach({
+                    executor =>
+                      logDebug("Killing executor because of task unschedulability: " + executor)
+                      blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
--- End diff --

Seriously? You killed all the executors? What if other taskSets' tasks are running on them?

BTW, if you want to refresh executors, you also have to enable `spark.blacklist.killBlacklistedExecutors`.


---
