Repository: spark
Updated Branches:
  refs/heads/branch-1.3 52386cf44 -> 9387dc1c8


[SPARK-5593][Core]Replace BlockManagerListener with ExecutorListener in 
ExecutorAllocationListener

More strictly, in ExecutorAllocationListener, we need to replace 
onBlockManagerAdded, onBlockManagerRemoved with 
onExecutorAdded,onExecutorRemoved. because at some time, onExecutorAdded and 
onExecutorRemoved are more accurate to express these meanings. example at 
SPARK-5529, BlockManager has been removed,but executor is existed.
 andrewor14 sryza

Author: lianhuiwang <lianhuiwan...@gmail.com>

Closes #4369 from lianhuiwang/SPARK-5593 and squashes the following commits:

333367c [lianhuiwang] Replace BlockManagerListener with ExecutorListener in 
ExecutorAllocationListener

(cherry picked from commit 6072fcc14ee1a4eba793e725fcb2cb2ffebd5b60)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9387dc1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9387dc1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9387dc1c

Branch: refs/heads/branch-1.3
Commit: 9387dc1c856e706a6da76612240c4c6ab611713d
Parents: 52386cf
Author: lianhuiwang <lianhuiwan...@gmail.com>
Authored: Fri Feb 6 11:09:37 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Feb 6 11:09:43 2015 -0800

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       |  9 +++---
 .../spark/ExecutorAllocationManagerSuite.scala  | 32 +++++++++-----------
 2 files changed, 19 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9387dc1c/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 8b38366..02d54bf 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -486,8 +486,8 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
-    override def onBlockManagerAdded(blockManagerAdded: 
SparkListenerBlockManagerAdded): Unit = {
-      val executorId = blockManagerAdded.blockManagerId.executorId
+    override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): 
Unit = {
+      val executorId = executorAdded.executorId
       if (executorId != SparkContext.DRIVER_IDENTIFIER) {
         // This guards against the race condition in which the 
`SparkListenerTaskStart`
         // event is posted before the `SparkListenerBlockManagerAdded` event, 
which is
@@ -498,9 +498,8 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
-    override def onBlockManagerRemoved(
-        blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
-      
allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
+    override def onExecutorRemoved(executorRemoved: 
SparkListenerExecutorRemoved): Unit = {
+      allocationManager.onExecutorRemoved(executorRemoved.executorId)
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9387dc1c/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 57081dd..c286962 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
 import org.scalatest.{FunSuite, PrivateMethodTester}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
-import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
@@ -144,8 +144,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with 
LocalSparkContext {
 
     // Verify that running a task reduces the cap
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 
3)))
-    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
-      0L, BlockManagerId("executor-1", "host1", 1), 100L))
+    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+      0L, "executor-1", new ExecutorInfo("host1", 1)))
     sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, 
"executor-1")))
     assert(numExecutorsPending(manager) === 4)
     assert(addExecutors(manager) === 1)
@@ -578,30 +578,28 @@ class ExecutorAllocationManagerSuite extends FunSuite 
with LocalSparkContext {
     assert(removeTimes(manager).isEmpty)
 
     // New executors have registered
-    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
-      0L, BlockManagerId("executor-1", "host1", 1), 100L))
+    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+      0L, "executor-1", new ExecutorInfo("host1", 1)))
     assert(executorIds(manager).size === 1)
     assert(executorIds(manager).contains("executor-1"))
     assert(removeTimes(manager).size === 1)
     assert(removeTimes(manager).contains("executor-1"))
-    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
-      0L, BlockManagerId("executor-2", "host2", 1), 100L))
+    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+      0L, "executor-2", new ExecutorInfo("host2", 1)))
     assert(executorIds(manager).size === 2)
     assert(executorIds(manager).contains("executor-2"))
     assert(removeTimes(manager).size === 2)
     assert(removeTimes(manager).contains("executor-2"))
 
     // Existing executors have disconnected
-    sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
-      0L, BlockManagerId("executor-1", "host1", 1)))
+    sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", 
""))
     assert(executorIds(manager).size === 1)
     assert(!executorIds(manager).contains("executor-1"))
     assert(removeTimes(manager).size === 1)
     assert(!removeTimes(manager).contains("executor-1"))
 
     // Unknown executor has disconnected
-    sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
-      0L, BlockManagerId("executor-3", "host3", 1)))
+    sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", 
""))
     assert(executorIds(manager).size === 1)
     assert(removeTimes(manager).size === 1)
   }
@@ -613,8 +611,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with 
LocalSparkContext {
     assert(removeTimes(manager).isEmpty)
 
     sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, 
"executor-1")))
-    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
-      0L, BlockManagerId("executor-1", "host1", 1), 100L))
+    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+      0L, "executor-1", new ExecutorInfo("host1", 1)))
     assert(executorIds(manager).size === 1)
     assert(executorIds(manager).contains("executor-1"))
     assert(removeTimes(manager).size === 0)
@@ -625,16 +623,16 @@ class ExecutorAllocationManagerSuite extends FunSuite 
with LocalSparkContext {
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
-    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
-      0L, BlockManagerId("executor-1", "host1", 1), 100L))
+    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+      0L, "executor-1", new ExecutorInfo("host1", 1)))
     sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, 
"executor-1")))
 
     assert(executorIds(manager).size === 1)
     assert(executorIds(manager).contains("executor-1"))
     assert(removeTimes(manager).size === 0)
 
-    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
-      0L, BlockManagerId("executor-2", "host1", 1), 100L))
+    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+      0L, "executor-2", new ExecutorInfo("host1", 1)))
     assert(executorIds(manager).size === 2)
     assert(executorIds(manager).contains("executor-2"))
     assert(removeTimes(manager).size === 1)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to