[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20203 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user attilapiros commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r162716271
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala ---
@@ -59,31 +60,55 @@ class TaskSetBlacklistSuite extends SparkFunSuite with BeforeAndAfterEach with M
       val shouldBeBlacklisted = (executor == "exec1" && index == 0)
       assert(taskSetBlacklist.isExecutorBlacklistedForTask(executor, index) === shouldBeBlacklisted)
     }
+
     assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+    verify(listenerBusMock, never())
+      .post(isA(classOf[SparkListenerExecutorBlacklistedForStage]))
+
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+    verify(listenerBusMock, never())
+      .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))

     // Mark task 1 failed on exec1 -- this pushes the executor into the blacklist
     taskSetBlacklist.updateBlacklistForFailedTask(
       "hostA", exec = "exec1", index = 1, failureReason = "testing")
+
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
-    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
     verify(listenerBusMock).post(
       SparkListenerExecutorBlacklistedForStage(0, "exec1", 2, 0, attemptId))
+
+    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+    verify(listenerBusMock, never())
+      .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))
+
     // Mark one task as failed on exec2 -- not enough for any further blacklisting yet.
     taskSetBlacklist.updateBlacklistForFailedTask(
       "hostA", exec = "exec2", index = 0, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+
     assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
+    verify(listenerBusMock, never()).post(
+      SparkListenerNodeBlacklistedForStage(0, "hostA", 2, 0, attemptId))
+
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+    verify(listenerBusMock, never())
+      .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))
--- End diff --
Yes, you are right.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r162714257
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala ---
@@ -59,31 +60,55 @@ class TaskSetBlacklistSuite extends SparkFunSuite with BeforeAndAfterEach with M
       val shouldBeBlacklisted = (executor == "exec1" && index == 0)
       assert(taskSetBlacklist.isExecutorBlacklistedForTask(executor, index) === shouldBeBlacklisted)
     }
+
     assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+    verify(listenerBusMock, never())
+      .post(isA(classOf[SparkListenerExecutorBlacklistedForStage]))
+
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+    verify(listenerBusMock, never())
+      .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))

     // Mark task 1 failed on exec1 -- this pushes the executor into the blacklist
     taskSetBlacklist.updateBlacklistForFailedTask(
       "hostA", exec = "exec1", index = 1, failureReason = "testing")
+
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
-    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
     verify(listenerBusMock).post(
       SparkListenerExecutorBlacklistedForStage(0, "exec1", 2, 0, attemptId))
+
+    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+    verify(listenerBusMock, never())
+      .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))
+
     // Mark one task as failed on exec2 -- not enough for any further blacklisting yet.
     taskSetBlacklist.updateBlacklistForFailedTask(
       "hostA", exec = "exec2", index = 0, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+
     assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
+    verify(listenerBusMock, never()).post(
+      SparkListenerNodeBlacklistedForStage(0, "hostA", 2, 0, attemptId))
+
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+    verify(listenerBusMock, never())
+      .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))
--- End diff --
The `verify` you added just above this one is redundant given this one, right? I think you only need this one.
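The reviewer's point is that a type-level "never posted" check subsumes an argument-level one for the same event type. The minimal sketch below shows why, using a hand-rolled recording bus in place of a Mockito mock. All class and method names here are hypothetical stand-ins, not Spark's or Mockito's API.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the two listener events (not Spark's classes).
sealed trait BlacklistEvent
final case class ExecutorBlacklistedForStage(
    time: Long, executorId: String, taskFailures: Int, stageId: Int, stageAttemptId: Int)
  extends BlacklistEvent
final case class NodeBlacklistedForStage(
    time: Long, hostId: String, executorFailures: Int, stageId: Int, stageAttemptId: Int)
  extends BlacklistEvent

// A recording bus standing in for a mocked listener bus (hypothetical).
final class RecordingBus {
  private val posted = mutable.ArrayBuffer.empty[BlacklistEvent]
  def post(event: BlacklistEvent): Unit = posted += event

  // Analogue of verify(bus, never()).post(isA(classOf[NodeBlacklistedForStage])):
  // true only if no event of that type was posted at all.
  def noNodeEventPosted: Boolean = !posted.exists(_.isInstanceOf[NodeBlacklistedForStage])

  // Analogue of verify(bus, never()).post(<one specific event>):
  // true if this exact event (same field values) was never posted.
  def notPosted(event: BlacklistEvent): Boolean = !posted.contains(event)
}
```

Whenever `noNodeEventPosted` holds, `notPosted(NodeBlacklistedForStage(...))` must also hold for every possible argument list. That is why the narrower check adds nothing.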
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r162087946
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -128,13 +130,17 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
     }

     // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
-    if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
+    val numFailures = execFailures.numUniqueTasksWithFailures
+    if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
       if (blacklistedExecs.add(exec)) {
         logInfo(s"Blacklisting executor ${exec} for stage $stageId")
         // This executor has been pushed into the blacklist for this stage. Let's check if it
         // pushes the whole node into the blacklist.
         val blacklistedExecutorsOnNode =
           execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
+        val now = clock.getTimeMillis()
+        listenerBus.post(
+          SparkListenerExecutorBlacklistedForStage(now, exec, numFailures, stageId, stageAttemptId))
         if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
           if (blacklistedNodes.add(host)) {
             logInfo(s"Blacklisting ${host} for stage $stageId")
--- End diff --
Yes, that makes sense to me. I totally agree with your point about handling late updates; after all, another executor can be added to the node at any time.
Github user attilapiros commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r162041751
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -128,13 +130,17 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
     }

     // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
-    if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
+    val numFailures = execFailures.numUniqueTasksWithFailures
+    if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
       if (blacklistedExecs.add(exec)) {
         logInfo(s"Blacklisting executor ${exec} for stage $stageId")
         // This executor has been pushed into the blacklist for this stage. Let's check if it
         // pushes the whole node into the blacklist.
         val blacklistedExecutorsOnNode =
           execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
+        val now = clock.getTimeMillis()
+        listenerBus.post(
+          SparkListenerExecutorBlacklistedForStage(now, exec, numFailures, stageId, stageAttemptId))
         if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
           if (blacklistedNodes.add(host)) {
             logInfo(s"Blacklisting ${host} for stage $stageId")
--- End diff --
There are two possible solutions I can see for this right now:

1) Extend updateBlacklistForFailedTask (org.apache.spark.scheduler.TaskSetBlacklist#updateBlacklistForFailedTask()) with the hostToExecutors map, and send SparkListenerExecutorBlacklistedForStage for all the executors on the node. But this change would propagate to TaskSetManager and even to TaskSchedulerImpl (where this data is available).

2) Introduce a new event, SparkListenerNodeBlacklistedForStage. This is more consistent with the existing solution we have in BlacklistTracker. In this case, I guess AppStatusListener should use **liveExecutors** to iterate over the currently available executors on the blacklisted node and fill in their executor summaries for the stage. Node-level data such as blacklisting is not stored on its own; it is only mapped onto the currently available executors. This way, if the very first metrics update (SparkListenerExecutorMetricsUpdate) for an executor arrives **after** the node was blacklisted, the blacklisted flag will still be correct. On the other hand, unused executors will also appear for the stage (this side effect would probably be the same for the first option too).

I plan to go with the 2nd solution, with a new HistoryServerSuite test. What is your opinion? Do you see any problem with this solution?
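Option 2 projects a node-level event onto the executors currently alive on that host. The sketch below models that idea under simplifying assumptions. Plain mutable maps play the roles of `liveExecutors` and the per-stage executor summaries, and every name is a hypothetical stand-in rather than `AppStatusListener`'s real code.

```scala
import scala.collection.mutable

// Hypothetical stand-ins; not Spark's event or status classes.
final case class NodeBlacklistedForStageEvent(
    time: Long, hostId: String, stageId: Int, stageAttemptId: Int)

final class ExecutorStageSummarySketch {
  var isBlacklisted: Boolean = false
}

final class StatusListenerSketch {
  // executorId -> host, for executors that are currently alive (role of liveExecutors).
  val liveExecutorHosts = mutable.Map.empty[String, String]
  // (stageId, stageAttemptId) -> executorId -> per-stage executor summary.
  val stageSummaries =
    mutable.Map.empty[(Int, Int), mutable.Map[String, ExecutorStageSummarySketch]]

  def onNodeBlacklistedForStage(event: NodeBlacklistedForStageEvent): Unit = {
    val summaries = stageSummaries.getOrElseUpdate(
      (event.stageId, event.stageAttemptId), mutable.Map.empty)
    // Node-level state is not stored separately: mark every executor currently alive on
    // the host, creating a summary even if that executor has reported no metrics yet.
    liveExecutorHosts.collect { case (execId, host) if host == event.hostId => execId }
      .foreach { execId =>
        val summary = summaries.getOrElseUpdate(execId, new ExecutorStageSummarySketch)
        summary.isBlacklisted = true
      }
  }
}
```

Creating summaries for executors with no metrics yet is the side effect mentioned above: unused executors on the node show up for the stage. An executor that joins the host after the event is not marked, which is the late-update case squito raises.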
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161885726
--- Diff: core/src/test/resources/HistoryServerExpectations/stage_blacklisting_for_stage_expectation.json ---
@@ -0,0 +1,639 @@
+{
--- End diff --
nit: having "stage" twice in the filename is confusing. How about just "blacklisting_for_stage_expectation.json"?
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161884194
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -36,7 +36,9 @@ import org.apache.spark.util.Clock
  * [[TaskSetManager]] this class is designed only to be called from code with a lock on the
  * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
  */
-private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int, val clock: Clock)
+private[scheduler] class TaskSetBlacklist(private val listenerBus: LiveListenerBus,
+                                          val conf: SparkConf, val stageId: Int,
--- End diff --
style: if it's multiline, put each param on its own line, double-indented.
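The style rule can be illustrated with a compilable sketch. It follows the parameter order used later in the thread (listener bus, conf, stage id, attempt id, clock). The stub types stand in for Spark's `LiveListenerBus`, `SparkConf` and `Clock` so the snippet compiles on its own. All names are hypothetical.

```scala
// Hypothetical stand-ins so the sketch compiles without Spark on the classpath.
final class ListenerBusStub
final class ConfStub
trait ClockStub { def getTimeMillis(): Long }

// Multi-line constructor: one parameter per line, each double-indented (4 spaces),
// with the class body indented by the usual 2 spaces.
class TaskSetBlacklistSketch(
    private val listenerBus: ListenerBusStub,
    val conf: ConfStub,
    val stageId: Int,
    val stageAttemptId: Int,
    val clock: ClockStub) {
  def describe: String = s"stage $stageId (attempt $stageAttemptId)"
}
```

The double indent keeps the parameter list visually separate from the class body, which starts with a single indent.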
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161885207
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -211,6 +211,11 @@ private[spark] class AppStatusListener(
     updateBlackListStatus(event.executorId, true)
   }

+  override def onExecutorBlacklistedForStage(
+    event: SparkListenerExecutorBlacklistedForStage): Unit = {
--- End diff --
double-indent this line (4 spaces)
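The same convention applies to a wrapped method signature. The sketch below uses hypothetical stand-in types for the event and listener. It shows the parameter double-indented relative to `override def`, with the body delegating to a per-stage update helper as in the diff.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the event and listener types.
final case class ExecutorBlacklistedForStageEvent(
    time: Long,
    executorId: String,
    taskFailures: Int,
    stageId: Int,
    stageAttemptId: Int)

trait ListenerSketch {
  def onExecutorBlacklistedForStage(event: ExecutorBlacklistedForStageEvent): Unit = ()
}

class StatusListenerStyleSketch extends ListenerSketch {
  val blacklisted = mutable.Set.empty[(String, Int, Int)]

  // The wrapped parameter is indented 4 spaces past `override def`, so it does not
  // line up with the 2-space-indented method body below it.
  override def onExecutorBlacklistedForStage(
      event: ExecutorBlacklistedForStageEvent): Unit = {
    updateBlackListStatusForStage(event.executorId, event.stageId, event.stageAttemptId)
  }

  private def updateBlackListStatusForStage(
      executorId: String,
      stageId: Int,
      stageAttemptId: Int): Unit = {
    blacklisted += ((executorId, stageId, stageAttemptId))
  }
}
```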
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161884916
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -128,13 +130,17 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
     }

     // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
-    if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
+    val numFailures = execFailures.numUniqueTasksWithFailures
+    if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
       if (blacklistedExecs.add(exec)) {
         logInfo(s"Blacklisting executor ${exec} for stage $stageId")
         // This executor has been pushed into the blacklist for this stage. Let's check if it
         // pushes the whole node into the blacklist.
         val blacklistedExecutorsOnNode =
           execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
+        val now = clock.getTimeMillis()
+        listenerBus.post(
+          SparkListenerExecutorBlacklistedForStage(now, exec, numFailures, stageId, stageAttemptId))
         if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
           if (blacklistedNodes.add(host)) {
             logInfo(s"Blacklisting ${host} for stage $stageId")
--- End diff --
If we're going to do this for executors, we should do it for nodes too. In the UI, you'd just show for each executor that it was blacklisted for the stage. I don't think you would need to distinguish whether it was blacklisted because the entire node was blacklisted or because only that one executor was.
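Doing this "for nodes too" means posting a second event when the node threshold is crossed, right after the executor event. The sketch below is a simplified model of the two thresholds in the diff (unique failed tasks per executor, then blacklisted executors per node). It is not `TaskSetBlacklist` itself, and all names and event shapes are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical event shapes for the sketch.
sealed trait StageBlacklistEvent
final case class ExecBlacklisted(exec: String, numFailures: Int) extends StageBlacklistEvent
final case class NodeBlacklisted(host: String, numBlacklistedExecs: Int)
  extends StageBlacklistEvent

final class StageBlacklistSketch(maxFailuresPerExec: Int, maxFailedExecsPerNode: Int) {
  val posted = mutable.ArrayBuffer.empty[StageBlacklistEvent]
  private val failedTasksByExec = mutable.Map.empty[String, mutable.Set[Int]]
  private val execsWithFailuresOnNode = mutable.Map.empty[String, mutable.Set[String]]
  private val blacklistedExecs = mutable.Set.empty[String]
  private val blacklistedNodes = mutable.Set.empty[String]

  def taskFailed(host: String, exec: String, index: Int): Unit = {
    val failed = failedTasksByExec.getOrElseUpdate(exec, mutable.Set.empty)
    failed += index
    val execsOnNode = execsWithFailuresOnNode.getOrElseUpdate(host, mutable.Set.empty)
    execsOnNode += exec
    val numFailures = failed.size // number of unique task indexes that failed on this executor
    // Set.add returns true only the first time, so each event is posted at most once.
    if (numFailures >= maxFailuresPerExec && blacklistedExecs.add(exec)) {
      posted += ExecBlacklisted(exec, numFailures)
      val blacklistedOnNode = execsOnNode.count(blacklistedExecs.contains)
      if (blacklistedOnNode >= maxFailedExecsPerNode && blacklistedNodes.add(host)) {
        posted += NodeBlacklisted(host, blacklistedOnNode)
      }
    }
  }
}
```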
Github user attilapiros commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161070422
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -211,6 +211,11 @@ private[spark] class AppStatusListener(
     updateBlackListStatus(event.executorId, true)
   }

+  override def onExecutorBlacklistedForStage(
+    event: SparkListenerExecutorBlacklistedForStage): Unit = {
+    updateBlackListStatusForStage(event.executorId, event.stageId, event.stageAttemptId)
+  }
+
--- End diff --
Good catch. I will add the test tomorrow.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161045636
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -223,6 +228,15 @@ private[spark] class AppStatusListener(
     updateNodeBlackList(event.hostId, false)
   }

+  def updateBlackListStatusForStage(executorId: String, stageId: Int, stageAttemptId: Int): Unit = {
+    Option(liveStages.get((stageId, stageAttemptId))).foreach { stage =>
+      val now = System.nanoTime()
+      val esummary = stage.executorSummary(executorId)
+      esummary.isBlacklisted = true
+      maybeUpdate(esummary, now)
+    }
+  }
+
--- End diff --
`liveUpdate` / `maybeUpdate` are optimizations to avoid unnecessary writes to disk. You can use them for intermediate updates (which would be reflected in a live application) and only force the write at the very last update (so the data is written in the SHS).
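The write-throttling idea vanzin describes can be modeled as a small sketch. Conditional writes happen only in a live application (and, for `maybeUpdate`, only after an update period has elapsed). An unconditional final write is what makes the state visible to the history server. The names below mirror the discussion but are simplified assumptions, not Spark's actual implementation; `forceUpdate` in particular is a hypothetical name.

```scala
import scala.collection.mutable

// Hypothetical, simplified model; the real AppStatusListener logic may differ in detail.
final class EntitySketch(val id: String) {
  var isBlacklisted: Boolean = false
  var lastWriteTime: Long = -1L
}

final class WriterSketch(live: Boolean, updatePeriodNs: Long) {
  val store = mutable.Map.empty[String, Boolean] // stands in for the on-disk store
  var writes = 0

  private def update(entity: EntitySketch, now: Long): Unit = {
    store(entity.id) = entity.isBlacklisted
    entity.lastWriteTime = now
    writes += 1
  }

  // Writes only while running inside a live application.
  def liveUpdate(entity: EntitySketch, now: Long): Unit =
    if (live) update(entity, now)

  // Writes only if live and the entity was not written within the update period.
  def maybeUpdate(entity: EntitySketch, now: Long): Unit =
    if (live && now - entity.lastWriteTime > updatePeriodNs) update(entity, now)

  // Unconditional write for the last update (e.g. when the stage ends).
  def forceUpdate(entity: EntitySketch, now: Long): Unit = update(entity, now)
}
```

When replaying an event log (`live = false`), a flag set only through `liveUpdate`/`maybeUpdate` reaches the store only if a later unconditional write happens. That is the concern tgravescs raises further down the thread.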
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161009520
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala ---
@@ -110,7 +131,13 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
       .set(config.MAX_TASK_ATTEMPTS_PER_NODE, 3)
       .set(config.MAX_FAILURES_PER_EXEC_STAGE, 2)
       .set(config.MAX_FAILED_EXEC_PER_NODE_STAGE, 3)
-    val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock())
+    val clock = new ManualClock
+
+    val attemptId = 0
+    val taskSetBlacklist = new TaskSetBlacklist(
+      listenerBusMock, conf, stageId = 0, stageAttemptId = attemptId, clock = clock)
+
+    clock.setTime(0)
--- End diff --
You should set the time to a new value before each call to taskSetBlacklist.updateBlacklistForFailedTask, to verify that the events on the listener bus carry the correct time.
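The reason for advancing the clock is that, if every call happens at time 0, a wrong or stale timestamp in the posted event would go unnoticed. The sketch below shows the pattern with a hand-rolled manual clock. It is modeled loosely on a manual test clock but is not Spark's `ManualClock`, and all names are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a manually driven test clock.
final class ManualClockSketch {
  private var time: Long = 0L
  def setTime(t: Long): Unit = time = t
  def getTimeMillis(): Long = time
}

final case class BlacklistedEvent(time: Long, exec: String)

// Hypothetical component that stamps each posted event with the clock's current time.
final class TimestampingBlacklist(clock: ManualClockSketch) {
  val posted = mutable.ArrayBuffer.empty[BlacklistedEvent]
  def blacklist(exec: String): Unit = posted += BlacklistedEvent(clock.getTimeMillis(), exec)
}
```

In a test, setting a distinct time before each call makes each expected event carry its own timestamp. That way, an implementation that read the clock at the wrong moment would fail the assertion.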
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161001141
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -211,6 +211,11 @@ private[spark] class AppStatusListener(
     updateBlackListStatus(event.executorId, true)
   }

+  override def onExecutorBlacklistedForStage(
+    event: SparkListenerExecutorBlacklistedForStage): Unit = {
+    updateBlackListStatusForStage(event.executorId, event.stageId, event.stageAttemptId)
+  }
+
--- End diff --
Consider covering this functionality (updating the status) in AppStatusListenerSuite. We already have a check for blacklisting an executor; we should have the same for a stage.
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161009908
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala ---
@@ -157,13 +187,19 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
     // lead to any node blacklisting
     val conf = new SparkConf().setAppName("test").setMaster("local")
       .set(config.BLACKLIST_ENABLED.key, "true")
-    val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock())
+    val clock = new ManualClock
+
+    val attemptId = 0
+    val taskSetBlacklist = new TaskSetBlacklist(
+      listenerBusMock, conf, stageId = 0, stageAttemptId = attemptId, clock = clock)
+    clock.setTime(0)
     taskSetBlacklist.updateBlacklistForFailedTask(
       "hostA", exec = "1", index = 0, failureReason = "testing")
     taskSetBlacklist.updateBlacklistForFailedTask(
       "hostA", exec = "1", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+    verify(listenerBusMock).post(SparkListenerExecutorBlacklistedForStage(0, "1", 2, 0, attemptId))
     taskSetBlacklist.updateBlacklistForFailedTask(
--- End diff --
Set the time to a new value before this call.
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161005667
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -223,6 +228,15 @@ private[spark] class AppStatusListener(
     updateNodeBlackList(event.hostId, false)
   }

+  def updateBlackListStatusForStage(executorId: String, stageId: Int, stageAttemptId: Int): Unit = {
+    Option(liveStages.get((stageId, stageAttemptId))).foreach { stage =>
+      val now = System.nanoTime()
+      val esummary = stage.executorSummary(executorId)
+      esummary.isBlacklisted = true
+      maybeUpdate(esummary, now)
+    }
+  }
+
--- End diff --
LiveEntities periodically write an immutable view of the entity to the store. LiveExecutor and LiveExecutorStageSummary were modified in this PR to maintain the blacklisted status and to write it to the store.
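The live-entity pattern can be illustrated with a sketch in which a mutable entity is updated as events arrive and writes immutable snapshots to a store. A new field such as the blacklisted flag therefore has to be part of the snapshot to be persisted. This is a simplified model under that assumption, not Spark's `LiveEntity` classes; every name is hypothetical.

```scala
import scala.collection.mutable

// Hypothetical immutable view written to the store.
final case class ExecutorStageSummaryView(
    executorId: String, failedTasks: Int, isBlacklisted: Boolean)

// Hypothetical mutable "live" entity, updated in place as events arrive.
final class LiveExecutorStageSummarySketch(val executorId: String) {
  var failedTasks: Int = 0
  var isBlacklisted: Boolean = false
  // Builds an immutable snapshot; later mutations do not affect views already written.
  def toView: ExecutorStageSummaryView =
    ExecutorStageSummaryView(executorId, failedTasks, isBlacklisted)
}

// Hypothetical key-value store holding the latest written view per executor.
final class StoreSketch {
  private val data = mutable.Map.empty[String, ExecutorStageSummaryView]
  def write(view: ExecutorStageSummaryView): Unit = data(view.executorId) = view
  def read(executorId: String): Option[ExecutorStageSummaryView] = data.get(executorId)
}
```

Between writes, the store holds the last snapshot. Setting `isBlacklisted` on the live entity is not visible to readers of the store until the next write.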
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r160985168
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -223,6 +228,15 @@ private[spark] class AppStatusListener(
     updateNodeBlackList(event.hostId, false)
   }

+  def updateBlackListStatusForStage(executorId: String, stageId: Int, stageAttemptId: Int): Unit = {
+    Option(liveStages.get((stageId, stageAttemptId))).foreach { stage =>
+      val now = System.nanoTime()
+      val esummary = stage.executorSummary(executorId)
+      esummary.isBlacklisted = true
+      maybeUpdate(esummary, now)
+    }
+  }
+
--- End diff --
@vanzin, you are more familiar with the new history server. I am wondering why updateBlacklistStatus is only done with liveUpdate. Doesn't that mean it won't show up in the history server for a finished app?
GitHub user attilapiros opened a pull request: https://github.com/apache/spark/pull/20203

[SPARK-22577] [core] executor page blacklist status should update with TaskSet level blacklisting

## What changes were proposed in this pull request?

In this PR, stage blacklisting is propagated to the UI by introducing a new Spark listener event (SparkListenerExecutorBlacklistedForStage), which indicates that the executor is blacklisted for a stage (see the existing configuration spark.blacklist.stage.maxFailedTasksPerExecutor for details).

The blacklisting state is propagated to the blacklisting column of the "Aggregated Metrics by Executor" table (for a selected stage). After this change, three possible labels can be seen:
- "for application": when the executor is blacklisted for the application (see the configuration spark.blacklist.application.maxFailedTasksPerExecutor for details)
- "for stage": when the executor is **only** blacklisted for the stage
- "false": when the executor is not blacklisted at all

## How was this patch tested?

It was tested both manually and with unit tests (including an API test via HistoryServerSuite).

For the manual test, Spark was run on a local cluster with:
```
$ bin/spark-shell --master "local-cluster[2,1,1024]" \
    --conf "spark.blacklist.enabled=true" \
    --conf "spark.blacklist.stage.maxFailedTasksPerExecutor=1" \
    --conf "spark.blacklist.application.maxFailedTasksPerExecutor=10" \
    --conf "spark.eventLog.enabled=true"
```
Executing:
```scala
import org.apache.spark.SparkEnv

sc.parallelize(1 to 10, 10).map { x =>
  // Every task that runs on executor "0" fails, so that executor gets blacklisted for the stage.
  if (SparkEnv.get.executorId == "0") throw new RuntimeException("Bad executor")
  else (x % 3, x)
}.reduceByKey((a, b) => a + b).collect()
```
To see the result, check the "Aggregated Metrics by Executor" section at the bottom of the screenshot: [UI screenshot for stage level blacklisting](https://issues.apache.org/jira/secure/attachment/12905283/stage_blacklisting.png)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/attilapiros/spark SPARK-22577

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20203.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20203

commit 8d736c1cd56e341d4d7da88bae01ac3a47649f80
Author: “attilapiros”
Date: 2018-01-05T20:45:54Z

    Propagate stage blacklisting to UI.
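The three labels described in the PR follow a simple precedence: application-level blacklisting wins, then stage-level, otherwise "false". The sketch below is a hypothetical helper that captures that precedence. It is not the actual Spark UI code, and the object and parameter names are assumptions.

```scala
// Hypothetical helper: derives the label for the blacklisting column of the
// "Aggregated Metrics by Executor" table from two flags (not the real UI code).
object BlacklistLabels {
  def label(blacklistedForApp: Boolean, blacklistedForStage: Boolean): String =
    if (blacklistedForApp) "for application" // application-level status takes precedence
    else if (blacklistedForStage) "for stage" // blacklisted only for this stage
    else "false"
}
```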