[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20203


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-19 Thread attilapiros
Github user attilapiros commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r162716271
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala ---
@@ -59,31 +60,55 @@ class TaskSetBlacklistSuite extends SparkFunSuite with BeforeAndAfterEach with M
   val shouldBeBlacklisted = (executor == "exec1" && index == 0)
   assert(taskSetBlacklist.isExecutorBlacklistedForTask(executor, index) === shouldBeBlacklisted)
 }
+
 assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+verify(listenerBusMock, never())
+  .post(isA(classOf[SparkListenerExecutorBlacklistedForStage]))
+
 assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+verify(listenerBusMock, never())
+  .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))

 // Mark task 1 failed on exec1 -- this pushes the executor into the blacklist
 taskSetBlacklist.updateBlacklistForFailedTask(
   "hostA", exec = "exec1", index = 1, failureReason = "testing")
+
 assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
-assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 verify(listenerBusMock).post(
   SparkListenerExecutorBlacklistedForStage(0, "exec1", 2, 0, attemptId))
+
+assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+verify(listenerBusMock, never())
+  .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))
+
 // Mark one task as failed on exec2 -- not enough for any further blacklisting yet.
 taskSetBlacklist.updateBlacklistForFailedTask(
   "hostA", exec = "exec2", index = 0, failureReason = "testing")
 assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+
 assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
+verify(listenerBusMock, never()).post(
+  SparkListenerNodeBlacklistedForStage(0, "hostA", 2, 0, attemptId))
+
 assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+verify(listenerBusMock, never())
+  .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))
--- End diff --

yes, you are right


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r162714257
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala ---
@@ -59,31 +60,55 @@ class TaskSetBlacklistSuite extends SparkFunSuite with BeforeAndAfterEach with M
   val shouldBeBlacklisted = (executor == "exec1" && index == 0)
   assert(taskSetBlacklist.isExecutorBlacklistedForTask(executor, index) === shouldBeBlacklisted)
 }
+
 assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+verify(listenerBusMock, never())
+  .post(isA(classOf[SparkListenerExecutorBlacklistedForStage]))
+
 assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+verify(listenerBusMock, never())
+  .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))

 // Mark task 1 failed on exec1 -- this pushes the executor into the blacklist
 taskSetBlacklist.updateBlacklistForFailedTask(
   "hostA", exec = "exec1", index = 1, failureReason = "testing")
+
 assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
-assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 verify(listenerBusMock).post(
   SparkListenerExecutorBlacklistedForStage(0, "exec1", 2, 0, attemptId))
+
+assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+verify(listenerBusMock, never())
+  .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))
+
 // Mark one task as failed on exec2 -- not enough for any further blacklisting yet.
 taskSetBlacklist.updateBlacklistForFailedTask(
   "hostA", exec = "exec2", index = 0, failureReason = "testing")
 assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+
 assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
+verify(listenerBusMock, never()).post(
+  SparkListenerNodeBlacklistedForStage(0, "hostA", 2, 0, attemptId))
+
 assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+verify(listenerBusMock, never())
+  .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))
--- End diff --

the `verify` you add just above this is made pointless by this one, right? I think you only need this one.
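
For reference, Mockito's `never()` with an `isA` matcher already rules out any event of that class, so the narrower exact-argument check is implied. A minimal illustration using the names from the diff above:

```scala
// If no SparkListenerNodeBlacklistedForStage was posted at all...
verify(listenerBusMock, never())
  .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))
// ...then this exact-argument variant can never fail either, so it can be dropped:
verify(listenerBusMock, never()).post(
  SparkListenerNodeBlacklistedForStage(0, "hostA", 2, 0, attemptId))
```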


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-17 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r162087946
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -128,13 +130,17 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
 }

 // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
-if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
+val numFailures = execFailures.numUniqueTasksWithFailures
+if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
   if (blacklistedExecs.add(exec)) {
 logInfo(s"Blacklisting executor ${exec} for stage $stageId")
 // This executor has been pushed into the blacklist for this stage.  Let's check if it
 // pushes the whole node into the blacklist.
 val blacklistedExecutorsOnNode =
   execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
+val now = clock.getTimeMillis()
+listenerBus.post(
+  SparkListenerExecutorBlacklistedForStage(now, exec, numFailures, stageId, stageAttemptId))
 if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
   if (blacklistedNodes.add(host)) {
 logInfo(s"Blacklisting ${host} for stage $stageId")
--- End diff --

yes that makes sense to me -- totally agree with your point about handling 
late updates.  After all, another executor can get added to the node at any 
time.


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-17 Thread attilapiros
Github user attilapiros commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r162041751
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -128,13 +130,17 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
 }

 // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
-if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
+val numFailures = execFailures.numUniqueTasksWithFailures
+if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
   if (blacklistedExecs.add(exec)) {
 logInfo(s"Blacklisting executor ${exec} for stage $stageId")
 // This executor has been pushed into the blacklist for this stage.  Let's check if it
 // pushes the whole node into the blacklist.
 val blacklistedExecutorsOnNode =
   execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
+val now = clock.getTimeMillis()
+listenerBus.post(
+  SparkListenerExecutorBlacklistedForStage(now, exec, numFailures, stageId, stageAttemptId))
 if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
   if (blacklistedNodes.add(host)) {
 logInfo(s"Blacklisting ${host} for stage $stageId")
--- End diff --

There are two possible solutions I can see for this right now:

1) Extending org.apache.spark.scheduler.TaskSetBlacklist#updateBlacklistForFailedTask() with the hostToExecutors map, and sending SparkListenerExecutorBlacklistedForStage for all the executors on the node. But this change would propagate to TaskSetManager and even to TaskSchedulerImpl (where this data is available).

2) Introducing a new event, SparkListenerNodeBlacklistedForStage. This is more consistent with the existing solution we have in BlacklistTracker. In this case, in the AppStatusListener I guess I should use **liveExecutors** to iterate over the executors currently available on the blacklisted node and fill in the executor summaries for the stage (node-level blacklisting data is not stored as such, just mapped onto the currently available executors). This way, if the very first metrics update (SparkListenerExecutorMetricsUpdate) arrives for an executor **after** the node blacklisting, the blacklisted flag will still be correct; on the other hand, unused executors will also appear for the stage (this side effect would probably be the very same for the first option too).

I plan to go for the 2nd solution with a new HistoryServerSuite test. A rough sketch of what I mean follows below. What is your opinion? Do you see any problems with this solution?
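
A hedged sketch of what the option-2 handler in AppStatusListener could look like (the event name and fields, and the hostname field on LiveExecutor, are assumptions at this point, not merged code):

```scala
override def onNodeBlacklistedForStage(
    event: SparkListenerNodeBlacklistedForStage): Unit = {
  val now = System.nanoTime()
  // Implicitly blacklist every executor currently known to live on the node;
  // executors registered later are handled by their own events.
  Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
    liveExecutors.values.filter(_.hostname == event.hostId).foreach { exec =>
      val esummary = stage.executorSummary(exec.executorId)
      esummary.isBlacklisted = true
      maybeUpdate(esummary, now)
    }
  }
}
```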


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161885726
  
--- Diff: core/src/test/resources/HistoryServerExpectations/stage_blacklisting_for_stage_expectation.json ---
@@ -0,0 +1,639 @@
+{
--- End diff --

nit: "stage" twice in the filename is confusing, how about just 
"blacklisting_for_stage_expectation.json"


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161884194
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -36,7 +36,9 @@ import org.apache.spark.util.Clock
  * [[TaskSetManager]] this class is designed only to be called from code with a lock on the
  * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
  */
-private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int, val clock: Clock)
+private[scheduler] class TaskSetBlacklist(private val listenerBus: LiveListenerBus,
+  val conf: SparkConf, val stageId: Int,
--- End diff --

style: if it's multiline, put each param on its own line, double-indented
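
Concretely, something like this (a sketch only; the stageAttemptId and clock parameters are taken from elsewhere in this PR):

```scala
private[scheduler] class TaskSetBlacklist(
    private val listenerBus: LiveListenerBus,
    val conf: SparkConf,
    val stageId: Int,
    val stageAttemptId: Int,
    val clock: Clock) extends Logging {
  // ... body unchanged ...
}
```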


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161885207
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -211,6 +211,11 @@ private[spark] class AppStatusListener(
 updateBlackListStatus(event.executorId, true)
   }
 
+  override def onExecutorBlacklistedForStage(
+event: SparkListenerExecutorBlacklistedForStage): Unit = {
--- End diff --

double-indent this line (4 spaces)
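
i.e. the result would look like this (body taken from the later quote of the same diff):

```scala
override def onExecutorBlacklistedForStage(
    event: SparkListenerExecutorBlacklistedForStage): Unit = {
  updateBlackListStatusForStage(event.executorId, event.stageId, event.stageAttemptId)
}
```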


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161884916
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -128,13 +130,17 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
 }

 // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
-if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
+val numFailures = execFailures.numUniqueTasksWithFailures
+if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
   if (blacklistedExecs.add(exec)) {
 logInfo(s"Blacklisting executor ${exec} for stage $stageId")
 // This executor has been pushed into the blacklist for this stage.  Let's check if it
 // pushes the whole node into the blacklist.
 val blacklistedExecutorsOnNode =
   execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
+val now = clock.getTimeMillis()
+listenerBus.post(
+  SparkListenerExecutorBlacklistedForStage(now, exec, numFailures, stageId, stageAttemptId))
 if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
   if (blacklistedNodes.add(host)) {
 logInfo(s"Blacklisting ${host} for stage $stageId")
--- End diff --

if we're going to do this for executors, we should do it for nodes too.  In the UI, you'd just show for each executor that it was blacklisted for the stage; I don't think you need to distinguish whether it was blacklisted because of the entire node or just the one executor.
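
A hedged sketch of what that could look like in the node branch of the hunk above, assuming a SparkListenerNodeBlacklistedForStage event that mirrors the executor-level one:

```scala
if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
  if (blacklistedNodes.add(host)) {
    logInfo(s"Blacklisting ${host} for stage $stageId")
    // Mirror the executor-level event so the UI can mark every executor
    // on this host as blacklisted for the stage.
    listenerBus.post(SparkListenerNodeBlacklistedForStage(
      now, host, blacklistedExecutorsOnNode.size, stageId, stageAttemptId))
  }
}
```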


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-11 Thread attilapiros
Github user attilapiros commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161070422
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -211,6 +211,11 @@ private[spark] class AppStatusListener(
 updateBlackListStatus(event.executorId, true)
   }
 
+  override def onExecutorBlacklistedForStage(
+event: SparkListenerExecutorBlacklistedForStage): Unit = {
+updateBlackListStatusForStage(event.executorId, event.stageId, event.stageAttemptId)
+  }
+
--- End diff --

Good catch. I will add the test tomorrow.


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-11 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161045636
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -223,6 +228,15 @@ private[spark] class AppStatusListener(
 updateNodeBlackList(event.hostId, false)
   }
 
+  def updateBlackListStatusForStage(executorId: String, stageId: Int, stageAttemptId: Int): Unit = {
+Option(liveStages.get((stageId, stageAttemptId))).foreach { stage =>
+  val now = System.nanoTime()
+  val esummary = stage.executorSummary(executorId)
+  esummary.isBlacklisted = true
+  maybeUpdate(esummary, now)
+}
+  }
+
--- End diff --

`liveUpdate` / `maybeUpdate` are optimizations to avoid unnecessary writes to disk. `maybeUpdate` can be called for intermediate updates (which are still reflected in a live application), and the write is only forced at the very last update (so the final data is written for the SHS).
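
The throttling pattern being described is roughly the following (a simplified sketch, not the exact AppStatusListener code; liveUpdatePeriodNs is the configured minimum interval between writes):

```scala
// Skip the disk write if this entity was persisted recently; intermediate
// state stays in memory and is still visible to the live UI.
private def maybeUpdate(entity: LiveEntity, now: Long): Unit = {
  if (liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
    update(entity, now)
  }
}

// Force the write regardless of the interval; used for the final update so
// the data reaches the store that the SHS reads.
private def update(entity: LiveEntity, now: Long): Unit = {
  entity.write(kvstore, now)
}
```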


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-11 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161009520
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala ---
@@ -110,7 +131,13 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
   .set(config.MAX_TASK_ATTEMPTS_PER_NODE, 3)
   .set(config.MAX_FAILURES_PER_EXEC_STAGE, 2)
   .set(config.MAX_FAILED_EXEC_PER_NODE_STAGE, 3)
-val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock())
+val clock = new ManualClock
+
+val attemptId = 0
+val taskSetBlacklist = new TaskSetBlacklist(
+  listenerBusMock, conf, stageId = 0, stageAttemptId = attemptId, clock = clock)
+
+clock.setTime(0)
--- End diff --

You should set the time to a new value before each call of taskSetBlacklist.updateBlacklistForFailedTask, to verify that the events posted to the listener bus have the correct time.
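
In other words, a pattern like this (a sketch; it assumes MAX_FAILURES_PER_EXEC_STAGE is 2 as configured in the diff above, so the second failure triggers the event):

```scala
clock.setTime(10)
taskSetBlacklist.updateBlacklistForFailedTask(
  "hostA", exec = "exec1", index = 0, failureReason = "testing")
clock.setTime(20)  // a distinct time, so the event timestamp is observable
taskSetBlacklist.updateBlacklistForFailedTask(
  "hostA", exec = "exec1", index = 1, failureReason = "testing")
// The second failure blacklists the executor; the posted event must carry t=20.
verify(listenerBusMock).post(
  SparkListenerExecutorBlacklistedForStage(20, "exec1", 2, 0, attemptId))
```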


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-11 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161001141
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -211,6 +211,11 @@ private[spark] class AppStatusListener(
 updateBlackListStatus(event.executorId, true)
   }
 
+  override def onExecutorBlacklistedForStage(
+event: SparkListenerExecutorBlacklistedForStage): Unit = {
+updateBlackListStatusForStage(event.executorId, event.stageId, event.stageAttemptId)
+  }
+
--- End diff --

Consider covering this functionality (updating the status) in AppStatusListenerSuite. We already have a check for blacklisting an executor; we should have the same for a stage.
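
For instance, a check along these lines (a sketch following the suite's store-based style; the ExecutorStageSummaryWrapper lookup is an assumption about the storeTypes layout):

```scala
import scala.collection.JavaConverters._

time += 1
listener.onExecutorBlacklistedForStage(
  SparkListenerExecutorBlacklistedForStage(time, "1", 2, stages.head.stageId, 0))

// The per-stage executor summary should now be flagged as blacklisted.
val stageKey = Array(stages.head.stageId, 0)
val summaries = store.view(classOf[ExecutorStageSummaryWrapper])
  .index("stage").first(stageKey).last(stageKey).asScala
assert(summaries.exists(s => s.executorId == "1" && s.info.isBlacklisted))
```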



---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-11 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161009908
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala ---
@@ -157,13 +187,19 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
 // lead to any node blacklisting
 val conf = new SparkConf().setAppName("test").setMaster("local")
   .set(config.BLACKLIST_ENABLED.key, "true")
-val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock())
+val clock = new ManualClock
+
+val attemptId = 0
+val taskSetBlacklist = new TaskSetBlacklist(
+  listenerBusMock, conf, stageId = 0, stageAttemptId = attemptId, clock = clock)
+clock.setTime(0)
 taskSetBlacklist.updateBlacklistForFailedTask(
   "hostA", exec = "1", index = 0, failureReason = "testing")
 taskSetBlacklist.updateBlacklistForFailedTask(
   "hostA", exec = "1", index = 1, failureReason = "testing")
 assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
 assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
verify(listenerBusMock).post(SparkListenerExecutorBlacklistedForStage(0, "1", 2, 0, attemptId))
 
 taskSetBlacklist.updateBlacklistForFailedTask(
--- End diff --

Set the time to a new value before this call too.


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-11 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161005667
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -223,6 +228,15 @@ private[spark] class AppStatusListener(
 updateNodeBlackList(event.hostId, false)
   }
 
+  def updateBlackListStatusForStage(executorId: String, stageId: Int, stageAttemptId: Int): Unit = {
+Option(liveStages.get((stageId, stageAttemptId))).foreach { stage =>
+  val now = System.nanoTime()
+  val esummary = stage.executorSummary(executorId)
+  esummary.isBlacklisted = true
+  maybeUpdate(esummary, now)
+}
+  }
+
--- End diff --

LiveEntities periodically write an immutable view of the entity to the 
store. LiveExecutor and LiveExecutorStageSummary were modified in this PR to 
maintain blacklisted status and to write it to the state store.
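
The pattern being referenced, roughly (simplified from status/LiveEntity.scala; the real signatures may differ slightly):

```scala
private[spark] abstract class LiveEntity {

  var lastWriteTime = 0L

  // Each subclass builds the immutable view that gets persisted, e.g.
  // LiveExecutorStageSummary builds an ExecutorStageSummary carrying the
  // isBlacklisted flag added in this PR.
  protected def doUpdate(): Any

  def write(store: ElementTrackingStore, now: Long): Unit = {
    store.write(doUpdate())
    lastWriteTime = now
  }
}
```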


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-11 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r160985168
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -223,6 +228,15 @@ private[spark] class AppStatusListener(
 updateNodeBlackList(event.hostId, false)
   }
 
+  def updateBlackListStatusForStage(executorId: String, stageId: Int, stageAttemptId: Int): Unit = {
+Option(liveStages.get((stageId, stageAttemptId))).foreach { stage =>
+  val now = System.nanoTime()
+  val esummary = stage.executorSummary(executorId)
+  esummary.isBlacklisted = true
+  maybeUpdate(esummary, now)
+}
+  }
+
--- End diff --

@vanzin you are more familiar with the new history server.  I am wondering why the updateBlacklistStatus is only done with liveUpdate?  Doesn't that mean it won't show up in the history server for a finished app?


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-09 Thread attilapiros
GitHub user attilapiros opened a pull request:

https://github.com/apache/spark/pull/20203

[SPARK-22577] [core] executor page blacklist status should update with 
TaskSet level blacklisting

## What changes were proposed in this pull request?

In this PR, stage-level blacklisting is propagated to the UI by introducing a new Spark listener event (SparkListenerExecutorBlacklistedForStage) which indicates that an executor is blacklisted for a stage (see the existing configuration spark.blacklist.stage.maxFailedTasksPerExecutor for details). The blacklisting state is propagated to the "Aggregated Metrics by Executor" table's blacklisting column (for the selected stage).

After this change, three possible labels can be seen:
- "for application": when the executor is blacklisted for the application 
(see the configuration spark.blacklist.application.maxFailedTasksPerExecutor 
for details) 
- "for stage": when the executor is **only** blacklisted for the stage 
- "false" : when the executor is not blacklisted at all

## How was this patch tested?

It is tested both manually and with unit tests (including an API test via HistoryServerSuite).

Manually, it was tested on a local cluster by running Spark as:
```
$ bin/spark-shell --master "local-cluster[2,1,1024]" \
  --conf "spark.blacklist.enabled=true" \
  --conf "spark.blacklist.stage.maxFailedTasksPerExecutor=1" \
  --conf "spark.blacklist.application.maxFailedTasksPerExecutor=10" \
  --conf "spark.eventLog.enabled=true"
```

Executing:
``` scala
import org.apache.spark.SparkEnv

sc.parallelize(1 to 10, 10).map { x =>
  if (SparkEnv.get.executorId == "0") throw new RuntimeException("Bad executor")
  else (x % 3, x)
}.reduceByKey((a, b) => a + b).collect()
```

To see the result, check the "Aggregated Metrics by Executor" section at the bottom of the picture:
[UI screenshot for stage-level blacklisting](https://issues.apache.org/jira/secure/attachment/12905283/stage_blacklisting.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/attilapiros/spark SPARK-22577

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20203.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20203


commit 8d736c1cd56e341d4d7da88bae01ac3a47649f80
Author: “attilapiros” 
Date:   2018-01-05T20:45:54Z

Propagate stage blacklisting to UI.




---
