[GitHub] spark pull request #19580: [SPARK-11334][CORE] Fix bug in Executor allocatio...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19580

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19580: [SPARK-11334][CORE] Fix bug in Executor allocatio...
Github user sitalkedia commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19580#discussion_r147328834

    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -267,6 +267,10 @@ private[spark] class ExecutorAllocationManager(
         (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
       }

    +  private def totalRunningTasks(): Int = synchronized {
    --- End diff --

    It's okay to add a method that is used only for unit testing. I am not inclined to use `maxNumExecutorsNeeded` to verify `totalRunningTasks` indirectly, for the following reason: currently, the test case tests exactly what it is supposed to. If you check `maxNumExecutorsNeeded` instead, it might not be clear what we are testing.
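To make the trade-off in this exchange concrete, here is a minimal sketch of the rounding in the expression quoted in the diff context. Only the `(numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor` arithmetic comes from the diff; the object name, the function name, and the input values are hypothetical and chosen for illustration.

```scala
// Hypothetical stand-alone helper; only the arithmetic mirrors the diff context.
object CeilingDivisionSketch {
  // Integer ceiling division: the smallest number of executors that can hold
  // numRunningOrPendingTasks tasks at tasksPerExecutor tasks each.
  def executorsNeeded(numRunningOrPendingTasks: Int, tasksPerExecutor: Int): Int =
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor

  def main(args: Array[String]): Unit = {
    val tasksPerExecutor = 4 // assumed value, for illustration only
    // Several different task counts collapse to the same executor count,
    // so a check on the executor count alone can miss a wrong task count.
    (5 to 8).foreach { n =>
      println(s"tasks=$n -> executors=${executorsNeeded(n, tasksPerExecutor)}")
    }
  }
}
```

With these assumed inputs, task counts 5 through 8 all map to 2 executors, which is one way to read the concern that an indirect check may not make clear what is being tested.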
[GitHub] spark pull request #19580: [SPARK-11334][CORE] Fix bug in Executor allocatio...
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19580#discussion_r147325260

    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -267,6 +267,10 @@ private[spark] class ExecutorAllocationManager(
         (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
       }

    +  private def totalRunningTasks(): Int = synchronized {
    --- End diff --

    I'm not sure why we need to add a method that is used only for unit tests. If we want to verify the behavior of `totalRunningTasks`, I think `maxNumExecutorsNeeded` can also be used to verify it indirectly.
[GitHub] spark pull request #19580: [SPARK-11334][CORE] Fix bug in Executor allocatio...
Github user sitalkedia commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19580#discussion_r147320096

    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -267,6 +267,10 @@ private[spark] class ExecutorAllocationManager(
         (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
       }

    +  private def totalRunningTasks(): Int = synchronized {
    --- End diff --

    This is being called from the test.
[GitHub] spark pull request #19580: [SPARK-11334][CORE] Fix bug in Executor allocatio...
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19580#discussion_r147304200

    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -267,6 +267,10 @@ private[spark] class ExecutorAllocationManager(
         (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
       }

    +  private def totalRunningTasks(): Int = synchronized {
    --- End diff --

    Looks like no one invokes this method?
[GitHub] spark pull request #19580: [SPARK-11334][CORE] Fix bug in Executor allocatio...
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19580#discussion_r147303973

    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -678,7 +679,9 @@ private[spark] class ExecutorAllocationManager(
           val executorId = taskStart.taskInfo.executorId

           allocationManager.synchronized {
    -        numRunningTasks += 1
    +        if (stageIdToNumRunningTask.contains(stageId)) {
    +          stageIdToNumRunningTask(stageId) = stageIdToNumRunningTask(stageId) + 1
    --- End diff --

    nit: this can be changed to `stageIdToNumRunningTask(stageId) += 1`
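As a quick illustration of why the two spellings in this nit are interchangeable, here is a minimal sketch. It assumes `stageIdToNumRunningTask` is a `mutable.HashMap[Int, Int]`, which the diff does not show; the object name, the `bumpTwice` helper, and the map contents are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical demo object; the map type is an assumption about the PR's field.
object CompoundAssignSketch {
  def bumpTwice(): Int = {
    // Stand-in for the per-stage counter, seeded with one made-up stage.
    val stageIdToNumRunningTask = mutable.HashMap[Int, Int](0 -> 0)
    val stageId = 0

    // Long form, as written in the diff.
    stageIdToNumRunningTask(stageId) = stageIdToNumRunningTask(stageId) + 1

    // Short form from the review comment. Int has no `+=` method, so the
    // compiler rewrites this into the long form above, which is
    // update(stageId, apply(stageId) + 1).
    stageIdToNumRunningTask(stageId) += 1

    stageIdToNumRunningTask(stageId)
  }

  def main(args: Array[String]): Unit =
    println(bumpTwice())
}
```

Both forms call `apply` on the map first, so either one throws `NoSuchElementException` for a missing key; the `contains` guard in the diff is what protects against that.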
[GitHub] spark pull request #19580: [SPARK-11334][CORE] Fix bug in Executor allocatio...
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19580#discussion_r147304306

    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -709,7 +712,9 @@ private[spark] class ExecutorAllocationManager(
           val taskIndex = taskEnd.taskInfo.index
           val stageId = taskEnd.stageId
           allocationManager.synchronized {
    -        numRunningTasks -= 1
    +        if (stageIdToNumRunningTask.contains(stageId)) {
    +          stageIdToNumRunningTask(stageId) = stageIdToNumRunningTask(stageId) - 1
    --- End diff --

    ditto.
[GitHub] spark pull request #19580: [SPARK-11334][CORE] Fix bug in Executor allocatio...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19580#discussion_r147291760

    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -787,7 +791,9 @@ private[spark] class ExecutorAllocationManager(
         /**
          * The number of tasks currently running across all stages.
          */
    -    def totalRunningTasks(): Int = numRunningTasks
    +    def totalRunningTasks(): Int = {
    +      stageIdToNumRunningTask.values.sum
    --- End diff --

    It'd be nice to make the other method calling this synchronized, just to be paranoid.
[GitHub] spark pull request #19580: [SPARK-11334][CORE] Fix bug in Executor allocatio...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19580#discussion_r147289166

    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -787,7 +791,9 @@ private[spark] class ExecutorAllocationManager(
         /**
          * The number of tasks currently running across all stages.
          */
    -    def totalRunningTasks(): Int = numRunningTasks
    +    def totalRunningTasks(): Int = {
    +      stageIdToNumRunningTask.values.sum
    --- End diff --

    Never mind, this is called from a synchronized context. Except in your unit tests, that is (which call the private `totalRunningTasks` you added to the manager).
[GitHub] spark pull request #19580: [SPARK-11334][CORE] Fix bug in Executor allocatio...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19580#discussion_r147288373

    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -787,7 +791,9 @@ private[spark] class ExecutorAllocationManager(
         /**
          * The number of tasks currently running across all stages.
          */
    -    def totalRunningTasks(): Int = numRunningTasks
    +    def totalRunningTasks(): Int = {
    +      stageIdToNumRunningTask.values.sum
    --- End diff --

    This needs to be inside `allocationManager.synchronized`, no?
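The locking question in this comment can be sketched with a small stand-alone class. This is not Spark's `ExecutorAllocationManager`: the class name, the handler names, and the way stages are added to and removed from the map are assumptions made for illustration. Only the `contains` guard and the `values.sum` read come from the diffs in this thread.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the listener/manager pair; not Spark's actual class.
class RunningTaskTracker {
  // Guarded by `this`: every read and write happens inside `synchronized`.
  private val stageIdToNumRunningTask = mutable.HashMap[Int, Int]()

  // Assumption: a stage is tracked from submission until completion.
  def onStageSubmitted(stageId: Int): Unit = synchronized {
    stageIdToNumRunningTask(stageId) = 0
  }

  def onStageCompleted(stageId: Int): Unit = synchronized {
    stageIdToNumRunningTask.remove(stageId)
  }

  def onTaskStart(stageId: Int): Unit = synchronized {
    if (stageIdToNumRunningTask.contains(stageId)) {
      stageIdToNumRunningTask(stageId) += 1
    }
  }

  def onTaskEnd(stageId: Int): Unit = synchronized {
    // Events for a stage that is no longer tracked are ignored.
    if (stageIdToNumRunningTask.contains(stageId)) {
      stageIdToNumRunningTask(stageId) -= 1
    }
  }

  // Summing iterates the map, so it takes the same lock as the writers.
  def totalRunningTasks(): Int = synchronized {
    stageIdToNumRunningTask.values.sum
  }
}

object RunningTaskTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new RunningTaskTracker
    tracker.onStageSubmitted(0)
    tracker.onTaskStart(0)
    tracker.onTaskStart(0)
    tracker.onStageCompleted(0)
    tracker.onTaskEnd(0) // late event for a completed stage
    println(tracker.totalRunningTasks())
  }
}
```

Because `values.sum` walks the whole map, an unsynchronized read could race with a handler that is adding or removing an entry, which is the risk a caller outside the lock (such as a test) would be exposed to.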
[GitHub] spark pull request #19580: [SPARK-11334][CORE] Fix bug in Executor allocatio...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19580#discussion_r147288434

    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -491,7 +495,6 @@ private[spark] class ExecutorAllocationManager(
             s"when it is already pending to be removed!")
           return false
         }
    -
    --- End diff --

    nit: no need for this change.
[GitHub] spark pull request #19580: [SPARK-11334][CORE] Fix bug in Executor allocatio...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19580#discussion_r147288495

    --- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
    @@ -227,6 +227,23 @@ class ExecutorAllocationManagerSuite
         assert(numExecutorsToAdd(manager) === 1)
       }

    +  test("Ignore task end events from completed stages") {
    --- End diff --

    nit: lower case "ignore" to match other tests.