Repository: spark
Updated Branches:
  refs/heads/master d9c25dec8 -> f023aa2fc


[SPARK-10137] [STREAMING] Avoid to restart receivers if scheduleReceivers 
returns balanced results

This PR fixes the following cases for `ReceiverSchedulingPolicy`.

1) Assume there are 4 executors (host1, host2, host3, host4) and 5 receivers 
(r1, r2, r3, r4, r5). Then `ReceiverSchedulingPolicy.scheduleReceivers` will 
return (r1 -> host1, r2 -> host2, r3 -> host3, r4 -> host4, r5 -> host1).
Assume r1 starts first on `host1`, as `scheduleReceivers` suggested, and tries 
to register with ReceiverTracker. The previous 
`ReceiverSchedulingPolicy.rescheduleReceiver` would return (host2, host3, host4) 
according to the current executor weights (host1 -> 1.0, host2 -> 0.5, host3 -> 
0.5, host4 -> 0.5), so ReceiverTracker would reject `r1`. This is unexpected 
since r1 is starting exactly where `scheduleReceivers` suggested.

This case is fixed by ignoring the entry in `receiverTrackingInfoMap` for the 
receiver that is being rescheduled.
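
As a rough illustration of the fix for case 1, the following Scala sketch sums 
executor weights while skipping the receiver that is being rescheduled. `Info`, 
`WeightSketch` and `executorWeights` are hypothetical, simplified stand-ins and 
not the actual Spark classes. The weights it produces come from this simplified 
model and need not match the figures quoted above.

```scala
object WeightSketch {
  // Hypothetical, simplified stand-in for ReceiverTrackingInfo (assumption).
  final case class Info(
      scheduled: Option[Seq[String]], // candidate executors, receiver not yet running
      running: Option[String])        // executor the receiver currently runs on

  // Sum per-executor weights, ignoring the receiver that is being rescheduled.
  // A running receiver adds 1.0 to its executor; a scheduled receiver adds
  // 1.0 / #candidates to each of its candidate executors.
  def executorWeights(receiverId: Int, infos: Map[Int, Info]): Map[String, Double] = {
    val contributions: Seq[(String, Double)] =
      (infos - receiverId).values.toSeq.flatMap { info =>
        info.running match {
          case Some(host) => Seq(host -> 1.0)
          case None =>
            val candidates = info.scheduled.getOrElse(Seq.empty)
            candidates.map(host => host -> 1.0 / candidates.size)
        }
      }
    contributions.groupBy(_._1).map { case (host, ws) => host -> ws.map(_._2).sum }
  }
}
```

In this model, with r1 through r5 scheduled as in case 1, calling 
`WeightSketch.executorWeights(1, infos)` leaves host1 with the same weight as 
the other hosts, so host1 stays eligible for r1. Without the exclusion, r1's own 
entry would push host1's weight above the others.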

2) Assume there are 3 executors (host1, host2, host3), each with 3 cores, and 
3 receivers: r1, r2, r3. Assume r1 is running on host1. When r2 restarts, the 
previous `ReceiverSchedulingPolicy.rescheduleReceiver` would always return 
(host1, host2, host3), so TaskScheduler could schedule r2 to host1. The same 
applies to r3. In the end, all 3 receivers could be running on host1 while 
host2 and host3 are idle.

This issue is fixed by returning only the executors that have the minimum 
weight, rather than returning at least 3 executors.
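
The selection rule for case 2 can be sketched as follows, assuming the 
per-executor weights have already been computed. `MinWeightSketch` and 
`candidates` are hypothetical names used only for illustration and are not part 
of Spark.

```scala
object MinWeightSketch {
  // Return all idle executors (those with no weight entry) if any exist;
  // otherwise return every live executor tied for the minimum weight.
  def candidates(executors: Seq[String], weights: Map[String, Double]): Seq[String] = {
    val idle = executors.filterNot(weights.contains)
    // Only consider weights of executors that are still in the live list.
    val weighted = executors.flatMap(e => weights.get(e).map(w => e -> w))
    if (idle.nonEmpty) {
      idle
    } else if (weighted.isEmpty) {
      Seq.empty
    } else {
      val minWeight = weighted.map(_._2).min
      weighted.collect { case (e, w) if w == minWeight => e }
    }
  }
}
```

With r1 running on host1 and nothing else scheduled, this sketch offers only 
host2 and host3 for r2, so the receivers can no longer pile up on host1.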

Author: zsxwing <zsxw...@gmail.com>

Closes #8340 from zsxwing/fix-receiver-scheduling.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f023aa2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f023aa2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f023aa2f

Branch: refs/heads/master
Commit: f023aa2fcc1d1dbb82aee568be0a8f2457c309ae
Parents: d9c25de
Author: zsxwing <zsxw...@gmail.com>
Authored: Mon Aug 24 23:34:50 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Aug 24 23:34:50 2015 -0700

----------------------------------------------------------------------
 .../scheduler/ReceiverSchedulingPolicy.scala    |  58 +++++++---
 .../streaming/scheduler/ReceiverTracker.scala   | 106 ++++++++++++-------
 .../ReceiverSchedulingPolicySuite.scala         |  13 +--
 3 files changed, 120 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f023aa2f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index ef5b687..10b5a7f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -22,6 +22,36 @@ import scala.collection.mutable
 
 import org.apache.spark.streaming.receiver.Receiver
 
+/**
+ * A class that tries to schedule receivers with evenly distributed. There are two phases for
+ * scheduling receivers.
+ *
+ * - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
+ *   all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
+ *   It will try to schedule receivers with evenly distributed. ReceiverTracker should update its
+ *   receiverTrackingInfoMap according to the results of `scheduleReceivers`.
+ *   `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that
+ *   contains the scheduled locations. Then when a receiver is starting, it will send a register
+ *   request and `ReceiverTracker.registerReceiver` will be called. In
+ *   `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check
+ *   if the location of this receiver is one of the scheduled executors, if not, the register will
+ *   be rejected.
+ * - The second phase is local scheduling when a receiver is restarting. There are two cases of
+ *   receiver restarting:
+ *   - If a receiver is restarting because it's rejected due to the real location and the scheduled
+ *     executors mismatching, in other words, it fails to start in one of the locations that
+ *     `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are
+ *     still alive in the list of scheduled executors, then use them to launch the receiver job.
+ *   - If a receiver is restarting without a scheduled executors list, or the executors in the list
+ *     are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, `ReceiverTracker` should
+ *     not set `ReceiverTrackingInfo.scheduledExecutors` for this executor, instead, it should clear
+ *     it. Then when this receiver is registering, we can know this is a local scheduling, and
+ *     `ReceiverTrackingInfo` should call `rescheduleReceiver` again to check if the launching
+ *     location is matching.
+ *
+ * In conclusion, we should make a global schedule, try to achieve that exactly as long as possible,
+ * otherwise do local scheduling.
+ */
 private[streaming] class ReceiverSchedulingPolicy {
 
   /**
@@ -102,8 +132,7 @@ private[streaming] class ReceiverSchedulingPolicy {
 
   /**
    * Return a list of candidate executors to run the receiver. If the list is empty, the caller can
-   * run this receiver in arbitrary executor. The caller can use `preferredNumExecutors` to require
-   * returning `preferredNumExecutors` executors if possible.
+   * run this receiver in arbitrary executor.
    *
    * This method tries to balance executors' load. Here is the approach to schedule executors
    * for a receiver.
@@ -122,9 +151,8 @@ private[streaming] class ReceiverSchedulingPolicy {
    *         If a receiver is scheduled to an executor but has not yet run, it contributes
    *         `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.</li>
    *     </ul>
-   *     At last, if there are more than `preferredNumExecutors` idle executors (weight = 0),
-   *     returns all idle executors. Otherwise, we only return `preferredNumExecutors` best options
-   *     according to the weights.
+   *     At last, if there are any idle executors (weight = 0), returns all idle executors.
+   *     Otherwise, returns the executors that have the minimum weight.
    *   </li>
    * </ol>
    *
@@ -134,8 +162,7 @@ private[streaming] class ReceiverSchedulingPolicy {
       receiverId: Int,
       preferredLocation: Option[String],
       receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
-      executors: Seq[String],
-      preferredNumExecutors: Int = 3): Seq[String] = {
+      executors: Seq[String]): Seq[String] = {
     if (executors.isEmpty) {
       return Seq.empty
     }
@@ -156,15 +183,18 @@ private[streaming] class ReceiverSchedulingPolicy {
       }
     }.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
 
-    val idleExecutors = (executors.toSet -- executorWeights.keys).toSeq
-    if (idleExecutors.size >= preferredNumExecutors) {
-      // If there are more than `preferredNumExecutors` idle executors, return all of them
+    val idleExecutors = executors.toSet -- executorWeights.keys
+    if (idleExecutors.nonEmpty) {
       scheduledExecutors ++= idleExecutors
     } else {
-      // If there are less than `preferredNumExecutors` idle executors, return 3 best options
-      scheduledExecutors ++= idleExecutors
-      val sortedExecutors = executorWeights.toSeq.sortBy(_._2).map(_._1)
-      scheduledExecutors ++= (idleExecutors ++ sortedExecutors).take(preferredNumExecutors)
+      // There is no idle executor. So select all executors that have the minimum weight.
+      val sortedExecutors = executorWeights.toSeq.sortBy(_._2)
+      if (sortedExecutors.nonEmpty) {
+        val minWeight = sortedExecutors(0)._2
+        scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
+      } else {
+        // This should not happen since "executors" is not empty
+      }
     }
     scheduledExecutors.toSeq
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f023aa2f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 30d25a6..3d532a6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -244,8 +244,21 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     }
 
     if (isTrackerStopping || isTrackerStopped) {
-      false
-    } else if (!scheduleReceiver(streamId).contains(hostPort)) {
+      return false
+    }
+
+    val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors
+    val accetableExecutors = if (scheduledExecutors.nonEmpty) {
+        // This receiver is registering and it's scheduled by
+        // ReceiverSchedulingPolicy.scheduleReceivers. So use 
"scheduledExecutors" to check it.
+        scheduledExecutors.get
+      } else {
+        // This receiver is scheduled by 
"ReceiverSchedulingPolicy.rescheduleReceiver", so calling
+        // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
+        scheduleReceiver(streamId)
+      }
+
+    if (!accetableExecutors.contains(hostPort)) {
       // Refuse it since it's scheduled to a wrong executor
       false
     } else {
@@ -426,12 +439,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
           startReceiver(receiver, executors)
         }
       case RestartReceiver(receiver) =>
-        val scheduledExecutors = schedulingPolicy.rescheduleReceiver(
-          receiver.streamId,
-          receiver.preferredLocation,
-          receiverTrackingInfos,
-          getExecutors)
-        updateReceiverScheduledExecutors(receiver.streamId, scheduledExecutors)
+        // Old scheduled executors minus the ones that are not active any more
+        val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
+        val scheduledExecutors = if (oldScheduledExecutors.nonEmpty) {
+            // Try global scheduling again
+            oldScheduledExecutors
+          } else {
+            val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
+            // Clear "scheduledExecutors" to indicate we are going to do local scheduling
+            val newReceiverInfo = oldReceiverInfo.copy(
+              state = ReceiverState.INACTIVE, scheduledExecutors = None)
+            receiverTrackingInfos(receiver.streamId) = newReceiverInfo
+            schedulingPolicy.rescheduleReceiver(
+              receiver.streamId,
+              receiver.preferredLocation,
+              receiverTrackingInfos,
+              getExecutors)
+          }
+        // Assume there is one receiver restarting at one time, so we don't need to update
+        // receiverTrackingInfos
         startReceiver(receiver, scheduledExecutors)
       case c: CleanupOldBlocks =>
         receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
@@ -465,6 +491,24 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     }
 
     /**
+     * Return the stored scheduled executors that are still alive.
+     */
+    private def getStoredScheduledExecutors(receiverId: Int): Seq[String] = {
+      if (receiverTrackingInfos.contains(receiverId)) {
+        val scheduledExecutors = receiverTrackingInfos(receiverId).scheduledExecutors
+        if (scheduledExecutors.nonEmpty) {
+          val executors = getExecutors.toSet
+          // Only return the alive executors
+          scheduledExecutors.get.filter(executors)
+        } else {
+          Nil
+        }
+      } else {
+        Nil
+      }
+    }
+
+    /**
      * Start a receiver along with its scheduled executors
      */
     private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = {
@@ -484,7 +528,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
 
       // Function to start the receiver on the worker node
-      val startReceiverFunc = new StartReceiverFunc(checkpointDirOption, serializableHadoopConf)
+      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
+        (iterator: Iterator[Receiver[_]]) => {
+          if (!iterator.hasNext) {
+            throw new SparkException(
+              "Could not start receiver as object not found.")
+          }
+          if (TaskContext.get().attemptNumber() == 0) {
+            val receiver = iterator.next()
+            assert(iterator.hasNext == false)
+            val supervisor = new ReceiverSupervisorImpl(
+              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
+            supervisor.start()
+            supervisor.awaitTermination()
+          } else {
+            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
+          }
+        }
 
       // Create the RDD using the scheduledExecutors to run the receiver in a Spark job
       val receiverRDD: RDD[Receiver[_]] =
@@ -541,31 +601,3 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   }
 
 }
-
-/**
- * Function to start the receiver on the worker node. Use a class instead of closure to avoid
- * the serialization issue.
- */
-private[streaming] class StartReceiverFunc(
-    checkpointDirOption: Option[String],
-    serializableHadoopConf: SerializableConfiguration)
-  extends (Iterator[Receiver[_]] => Unit) with Serializable {
-
-  override def apply(iterator: Iterator[Receiver[_]]): Unit = {
-    if (!iterator.hasNext) {
-      throw new SparkException(
-        "Could not start receiver as object not found.")
-    }
-    if (TaskContext.get().attemptNumber() == 0) {
-      val receiver = iterator.next()
-      assert(iterator.hasNext == false)
-      val supervisor = new ReceiverSupervisorImpl(
-        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
-      supervisor.start()
-      supervisor.awaitTermination()
-    } else {
-      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f023aa2f/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
index 0418d77..b2a51d7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
@@ -39,7 +39,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
     assert(scheduledExecutors.toSet === Set("host1", "host2"))
   }
 
-  test("rescheduleReceiver: return all idle executors if more than 3 idle executors") {
+  test("rescheduleReceiver: return all idle executors if there are any idle executors") {
     val executors = Seq("host1", "host2", "host3", "host4", "host5")
     // host3 is idle
     val receiverTrackingInfoMap = Map(
@@ -49,16 +49,16 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
     assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5"))
   }
 
-  test("rescheduleReceiver: return 3 best options if less than 3 idle executors") {
+  test("rescheduleReceiver: return all executors that have minimum weight if no idle executors") {
     val executors = Seq("host1", "host2", "host3", "host4", "host5")
-    // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0
-    // host4 and host5 are idle
+    // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5
     val receiverTrackingInfoMap = Map(
       0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
       1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
-      2 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None))
+      2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None),
+      3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None))
     val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
-      3, None, receiverTrackingInfoMap, executors)
+      4, None, receiverTrackingInfoMap, executors)
     assert(scheduledExecutors.toSet === Set("host2", "host4", "host5"))
   }
 
@@ -127,4 +127,5 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
       assert(executors.isEmpty)
     }
   }
+
 }

