Repository: spark Updated Branches: refs/heads/branch-1.3 52386cf44 -> 9387dc1c8
[SPARK-5593][Core]Replace BlockManagerListener with ExecutorListener in ExecutorAllocationListener More strictly, in ExecutorAllocationListener, we need to replace onBlockManagerAdded, onBlockManagerRemoved with onExecutorAdded,onExecutorRemoved. because at some time, onExecutorAdded and onExecutorRemoved are more accurate to express these meanings. example at SPARK-5529, BlockManager has been removed,but executor is existed. andrewor14 sryza Author: lianhuiwang <lianhuiwan...@gmail.com> Closes #4369 from lianhuiwang/SPARK-5593 and squashes the following commits: 333367c [lianhuiwang] Replace BlockManagerListener with ExecutorListener in ExecutorAllocationListener (cherry picked from commit 6072fcc14ee1a4eba793e725fcb2cb2ffebd5b60) Signed-off-by: Andrew Or <and...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9387dc1c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9387dc1c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9387dc1c Branch: refs/heads/branch-1.3 Commit: 9387dc1c856e706a6da76612240c4c6ab611713d Parents: 52386cf Author: lianhuiwang <lianhuiwan...@gmail.com> Authored: Fri Feb 6 11:09:37 2015 -0800 Committer: Andrew Or <and...@databricks.com> Committed: Fri Feb 6 11:09:43 2015 -0800 ---------------------------------------------------------------------- .../spark/ExecutorAllocationManager.scala | 9 +++--- .../spark/ExecutorAllocationManagerSuite.scala | 32 +++++++++----------- 2 files changed, 19 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9387dc1c/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 8b38366..02d54bf 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -486,8 +486,8 @@ private[spark] class ExecutorAllocationManager( } } - override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { - val executorId = blockManagerAdded.blockManagerId.executorId + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { + val executorId = executorAdded.executorId if (executorId != SparkContext.DRIVER_IDENTIFIER) { // This guards against the race condition in which the `SparkListenerTaskStart` // event is posted before the `SparkListenerBlockManagerAdded` event, which is @@ -498,9 +498,8 @@ private[spark] class ExecutorAllocationManager( } } - override def onBlockManagerRemoved( - blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { - allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId) + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + allocationManager.onExecutorRemoved(executorRemoved.executorId) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/9387dc1c/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 57081dd..c286962 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -22,7 +22,7 @@ import scala.collection.mutable import org.scalatest.{FunSuite, PrivateMethodTester} import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ -import org.apache.spark.storage.BlockManagerId +import org.apache.spark.scheduler.cluster.ExecutorInfo /** * Test add and remove behavior of ExecutorAllocationManager. @@ -144,8 +144,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { // Verify that running a task reduces the cap sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3))) - sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( - 0L, BlockManagerId("executor-1", "host1", 1), 100L)) + sc.listenerBus.postToAll(SparkListenerExecutorAdded( + 0L, "executor-1", new ExecutorInfo("host1", 1))) sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) assert(numExecutorsPending(manager) === 4) assert(addExecutors(manager) === 1) @@ -578,30 +578,28 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { assert(removeTimes(manager).isEmpty) // New executors have registered - sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( - 0L, BlockManagerId("executor-1", "host1", 1), 100L)) + sc.listenerBus.postToAll(SparkListenerExecutorAdded( + 0L, "executor-1", new ExecutorInfo("host1", 1))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(removeTimes(manager).contains("executor-1")) - sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( - 0L, BlockManagerId("executor-2", "host2", 1), 100L)) + sc.listenerBus.postToAll(SparkListenerExecutorAdded( + 0L, "executor-2", new ExecutorInfo("host2", 1))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) assert(removeTimes(manager).size === 2) assert(removeTimes(manager).contains("executor-2")) // Existing executors have disconnected - sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved( - 0L, BlockManagerId("executor-1", "host1", 1))) + sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", "")) assert(executorIds(manager).size === 1) assert(!executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(!removeTimes(manager).contains("executor-1")) // Unknown executor has disconnected - sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved( - 0L, BlockManagerId("executor-3", "host3", 1))) + sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", "")) assert(executorIds(manager).size === 1) assert(removeTimes(manager).size === 1) } @@ -613,8 +611,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { assert(removeTimes(manager).isEmpty) sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( - 0L, BlockManagerId("executor-1", "host1", 1), 100L)) + sc.listenerBus.postToAll(SparkListenerExecutorAdded( + 0L, "executor-1", new ExecutorInfo("host1", 1))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 0) @@ -625,16 +623,16 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { val manager = sc.executorAllocationManager.get assert(executorIds(manager).isEmpty) assert(removeTimes(manager).isEmpty) - sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( - 0L, BlockManagerId("executor-1", "host1", 1), 100L)) + sc.listenerBus.postToAll(SparkListenerExecutorAdded( + 0L, "executor-1", new ExecutorInfo("host1", 1))) sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 0) - sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( - 0L, BlockManagerId("executor-2", "host1", 1), 100L)) + sc.listenerBus.postToAll(SparkListenerExecutorAdded( + 0L, "executor-2", new ExecutorInfo("host1", 1))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) assert(removeTimes(manager).size === 1) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org