Repository: spark
Updated Branches:
  refs/heads/branch-1.1 6ec137450 -> 6c64d57fa


[SPARK-2931] In TaskSetManager, reset currentLocalityIndex after recomputing locality levels

This addresses SPARK-2931, a bug where getAllowedLocalityLevel() could throw 
ArrayIndexOutOfBoundsException.  The fix here is to reset currentLocalityIndex 
after recomputing the locality levels.
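
To make the failure mode concrete, here is a minimal, self-contained sketch; localityLevels and currentIndex are stand-in names for TaskSetManager's myLocalityLevels and currentLocalityIndex, not the actual Spark code:

    // Sketch of SPARK-2931: a saved index outlives the array it indexed.
    object StaleLocalityIndexSketch extends App {
      var localityLevels = Array("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
      var currentIndex = 2  // the scheduler has waited its way down to ANY

      // The fix: remember the current level *before* recomputing.
      val previousLevel = localityLevels(currentIndex)

      // Executors come and go, so the valid levels are recomputed and
      // may shrink; the stale index 2 would now be out of bounds.
      localityLevels = Array("ANY")

      // Re-derive the index from the remembered level (clamp if gone).
      currentIndex = localityLevels.indexOf(previousLevel) match {
        case -1 => localityLevels.length - 1
        case i  => i
      }
      assert(localityLevels(currentIndex) == "ANY")  // safely in bounds
    }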

Thanks to kayousterhout, mridulm, and lirui-intel for helping me to debug this.

Author: Josh Rosen <joshro...@apache.org>

Closes #1896 from JoshRosen/SPARK-2931 and squashes the following commits:

48b60b5 [Josh Rosen] Move FakeRackUtil.cleanUp() into beforeEach().
6fec474 [Josh Rosen] Set currentLocalityIndex after recomputing locality levels.
9384897 [Josh Rosen] Update SPARK-2931 test to reflect changes in 63bdb1f41b4895e3a9444f7938094438a94d3007.
9ecd455 [Josh Rosen] Apply @mridulm's patch for reproducing SPARK-2931.

(cherry picked from commit 7712e724ad69dd0b83754e938e9799d13a4d43b9)
Signed-off-by: Josh Rosen <joshro...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c64d57f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c64d57f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c64d57f

Branch: refs/heads/branch-1.1
Commit: 6c64d57fabd8ec08dcc03cdc94381ee7d431fbcf
Parents: 6ec1374
Author: Josh Rosen <joshro...@apache.org>
Authored: Mon Aug 11 19:15:01 2014 -0700
Committer: Josh Rosen <joshro...@apache.org>
Committed: Mon Aug 11 19:15:20 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSetManager.scala | 11 ++++--
 .../spark/scheduler/TaskSetManagerSuite.scala   | 40 +++++++++++++++++++-
 2 files changed, 46 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c64d57f/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 20a4bd1..d9d53fa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -690,8 +690,7 @@ private[spark] class TaskSetManager(
       handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure)
     }
     // recalculate valid locality levels and waits when executor is lost
-    myLocalityLevels = computeValidLocalityLevels()
-    localityWaits = myLocalityLevels.map(getLocalityWait)
+    recomputeLocality()
   }
 
   /**
@@ -775,9 +774,15 @@ private[spark] class TaskSetManager(
     levels.toArray
   }
 
-  def executorAdded() {
+  def recomputeLocality() {
+    val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
     myLocalityLevels = computeValidLocalityLevels()
     localityWaits = myLocalityLevels.map(getLocalityWait)
+    currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
+  }
+
+  def executorAdded() {
+    recomputeLocality()
   }
 }
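
The getLocalityIndex call above is not part of this diff; the method already exists in TaskSetManager. As a hedged sketch of its contract, assuming locality levels are ordered best-to-worst, it behaves roughly like this generic version:

    // Sketch only: find the first level at least as relaxed as `level`,
    // clamping to the last entry so the result always indexes in bounds.
    import scala.math.Ordering.Implicits._

    def getLocalityIndexSketch[T: Ordering](levels: Array[T], level: T): Int = {
      var index = 0
      while (level > levels(index) && index < levels.length - 1) {
        index += 1
      }
      index
    }

This is what makes the reset safe: whatever previousLocalityLevel was, the re-derived index always points inside the freshly computed myLocalityLevels array.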
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c64d57f/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ffd2338..93e8dda 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -154,6 +154,11 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
   val LOCALITY_WAIT = conf.getLong("spark.locality.wait", 3000)
   val MAX_TASK_FAILURES = 4
 
+  override def beforeEach() {
+    super.beforeEach()
+    FakeRackUtil.cleanUp()
+  }
+
   test("TaskSet with no preferences") {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
@@ -471,7 +476,6 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("new executors get added and lost") {
     // Assign host2 to rack2
-    FakeRackUtil.cleanUp()
     FakeRackUtil.assignHostToRack("host2", "rack2")
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc)
@@ -504,7 +508,6 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("test RACK_LOCAL tasks") {
-    FakeRackUtil.cleanUp()
     // Assign host1 to rack1
     FakeRackUtil.assignHostToRack("host1", "rack1")
     // Assign host2 to rack1
@@ -607,6 +610,39 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2)
   }
 
+  test("Ensure TaskSetManager is usable after addition of levels") {
+    // Regression test for SPARK-2931
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc)
+    val taskSet = FakeTask.createTaskSet(2,
+      Seq(TaskLocation("host1", "execA")),
+      Seq(TaskLocation("host2", "execB.1")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    // Only ANY is valid
+    assert(manager.myLocalityLevels.sameElements(Array(ANY)))
+    // Add a new executor
+    sched.addExecutor("execA", "host1")
+    sched.addExecutor("execB.2", "host2")
+    manager.executorAdded()
+    assert(manager.pendingTasksWithNoPrefs.size === 0)
+    // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
+    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
+    assert(manager.resourceOffer("execA", "host1", ANY) !== None)
+    clock.advance(LOCALITY_WAIT * 4)
+    assert(manager.resourceOffer("execB.2", "host2", ANY) !== None)
+    sched.removeExecutor("execA")
+    sched.removeExecutor("execB.2")
+    manager.executorLost("execA", "host1")
+    manager.executorLost("execB.2", "host2")
+    clock.advance(LOCALITY_WAIT * 4)
+    sched.addExecutor("execC", "host3")
+    manager.executorAdded()
+    // Prior to the fix, this line resulted in an ArrayIndexOutOfBoundsException:
+    assert(manager.resourceOffer("execC", "host3", ANY) !== None)
+  }
+
+
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)

