This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 432ea69  [SPARK-26927][CORE] Ensure executor is active when processing events in dynamic allocation manager.
432ea69 is described below

commit 432ea6924142c9688d8b6c64b46a531810691a8c
Author: Liupengcheng <liupengch...@xiaomi.com>
AuthorDate: Tue Mar 12 13:53:42 2019 -0700

    [SPARK-26927][CORE] Ensure executor is active when processing events in dynamic allocation manager.

    There is a race condition in the `ExecutorAllocationManager`: the
    `SparkListenerExecutorRemoved` event may be posted before the
    `SparkListenerTaskStart` event, which leads to an incorrect `executorIds`
    set. Then, when some executor idles, real executors will be removed even
    though the actual executor number equals `minNumExecutors`, because
    `newExecutorTotal` is computed incorrectly (it may be greater than
    `minNumExecutors`), which may finally cause zero available executors bu [...]

    What's more, even the `SparkListenerTaskEnd` event cannot release the fake
    `executorIds`, because later idle events for the fake executors cannot
    trigger their real removal: they have already been removed and no longer
    exist in the `executorDataMap` of `CoarseGrainedSchedulerBackend`, so the
    `onExecutorRemoved` method will never be called again.

    For details see https://issues.apache.org/jira/browse/SPARK-26927

    This PR fixes this problem.

    Tested with existing UTs and an added UT.

    Closes #23842 from liupc/Fix-race-condition-that-casues-dyanmic-allocation-not-working.

    Lead-authored-by: Liupengcheng <liupengch...@xiaomi.com>
    Co-authored-by: liupengcheng <liupengch...@xiaomi.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
    (cherry picked from commit d5cfe08fdc7ad07e948f329c0bdeeca5c2574a18)
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
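As a rough illustration of the idea behind the fix (not Spark's actual code), the self-contained Scala sketch below models the race: an executor-removed event arrives first, then a task-start event for the same executor. The names `AllocationTracker`, `TaskStart`, `ExecutorRemoved`, and `liveExecutors` are simplified stand-ins invented for this sketch; the actual patch (see the diff below) applies the equivalent membership check via `client.getExecutorIds()` inside `ExecutorAllocationManager`'s listener.

import scala.collection.mutable

// Simplified stand-ins, invented for illustration; not Spark's real listener types.
final case class TaskStart(executorId: String)
final case class ExecutorRemoved(executorId: String)

// Tracks known executors the way the allocation manager's listener does, but
// consults a "live executors" source of truth before (re-)adding one on task start.
class AllocationTracker(liveExecutors: () => Set[String]) {
  val executorIds: mutable.Set[String] = mutable.Set.empty

  def onExecutorRemoved(event: ExecutorRemoved): Unit = {
    executorIds -= event.executorId
  }

  def onTaskStart(event: TaskStart): Unit = {
    // Without the second check, a TaskStart arriving after ExecutorRemoved would
    // resurrect a "fake" executor id that can never be removed again.
    if (!executorIds.contains(event.executorId) &&
        liveExecutors().contains(event.executorId)) {
      executorIds += event.executorId
    }
  }
}

object RaceDemo {
  def main(args: Array[String]): Unit = {
    var live = Set("1", "2", "3")                 // what the scheduler backend still knows
    val tracker = new AllocationTracker(() => live)
    live.foreach(tracker.executorIds += _)

    // Executor "3" goes away, but a TaskStart for it is still in flight.
    live -= "3"
    tracker.onExecutorRemoved(ExecutorRemoved("3"))
    tracker.onTaskStart(TaskStart("3"))

    // "3" is not resurrected, so the tracked set matches reality.
    assert(tracker.executorIds == Set("1", "2"))
    println(tracker.executorIds)
  }
}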
---
 .../apache/spark/ExecutorAllocationManager.scala  | 13 +++++++----
 .../spark/ExecutorAllocationManagerSuite.scala    | 26 +++++++++++++++++++++-
 2 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 49fa80c..36819aa 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -719,10 +719,15 @@ private[spark] class ExecutorAllocationManager(
         if (stageIdToNumRunningTask.contains(stageId)) {
           stageIdToNumRunningTask(stageId) += 1
         }
-        // This guards against the race condition in which the `SparkListenerTaskStart`
-        // event is posted before the `SparkListenerBlockManagerAdded` event, which is
-        // possible because these events are posted in different threads. (see SPARK-4951)
-        if (!allocationManager.executorIds.contains(executorId)) {
+        // This guards against the following race condition:
+        // 1. The `SparkListenerTaskStart` event is posted before the
+        //    `SparkListenerExecutorAdded` event
+        // 2. The `SparkListenerExecutorRemoved` event is posted before the
+        //    `SparkListenerTaskStart` event
+        // Above cases are possible because these events are posted in different threads.
+        // (see SPARK-4951 SPARK-26927)
+        if (!allocationManager.executorIds.contains(executorId) &&
+          client.getExecutorIds().contains(executorId)) {
           allocationManager.onExecutorAdded(executorId)
         }
 
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index f50ad78..a69045f 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -420,6 +420,7 @@ class ExecutorAllocationManagerSuite
     // Remove when numExecutorsTarget is the same as the current number of executors
     assert(addExecutors(manager) === 1)
     assert(addExecutors(manager) === 2)
+    (1 to 8).foreach(execId => onExecutorAdded(manager, execId.toString))
     (1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach { info =>
       post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) }
     assert(executorIds(manager).size === 8)
@@ -833,7 +834,7 @@ class ExecutorAllocationManagerSuite
     assert(removeTimes(manager).size === 1)
   }
 
-  test("SPARK-4951: call onTaskStart before onBlockManagerAdded") {
+  test("SPARK-4951: call onTaskStart before onExecutorAdded") {
     sc = createSparkContext(2, 10, 2)
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
@@ -1161,6 +1162,29 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 1)
   }
 
+  test("SPARK-26927 call onExecutorRemoved before onTaskStart") {
+    sc = createSparkContext(2, 5)
+    val manager = sc.executorAllocationManager.get
+    assert(executorIds(manager).isEmpty)
+    post(sc.listenerBus, SparkListenerExecutorAdded(
+      0L, "1", new ExecutorInfo("host1", 1, Map.empty)))
+    post(sc.listenerBus, SparkListenerExecutorAdded(
+      0L, "2", new ExecutorInfo("host2", 1, Map.empty)))
+    post(sc.listenerBus, SparkListenerExecutorAdded(
+      0L, "3", new ExecutorInfo("host3", 1, Map.empty)))
+    assert(executorIds(manager).size === 3)
+
+    post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "3", "disconnected"))
+    assert(executorIds(manager).size === 2)
+    assert(executorIds(manager) === Set("1", "2"))
+
+    val taskInfo1 = createTaskInfo(0, 0, "3")
+    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo1))
+    // Verify taskStart not adding already removed executors.
+    assert(executorIds(manager).size === 2)
+    assert(executorIds(manager) === Set("1", "2"))
+  }
+
   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org