Repository: spark
Updated Branches:
  refs/heads/master de4792605 -> 9af5423ec


[SPARK-12133][STREAMING] Streaming dynamic allocation

## What changes were proposed in this pull request?

Added a new Executor Allocation Manager for the Streaming scheduler to perform Streaming Dynamic Allocation.
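
To make the setup concrete, here is a minimal sketch of how an application would turn the feature on (the configuration keys are the ones introduced by this patch; the app name, values, and object name are illustrative, and it assumes a cluster manager that can grant executors, e.g. YARN, since local mode is rejected unless the testing flag is set):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingDynAllocExample {
  def main(args: Array[String]): Unit = {
    // Streaming dynamic allocation must not be combined with core dynamic
    // allocation (spark.dynamicAllocation.enabled) or a fixed
    // spark.executor.instances; isDynamicAllocationEnabled rejects both.
    val conf = new SparkConf()
      .setAppName("StreamingDynAllocExample")
      .set("spark.streaming.dynamicAllocation.enabled", "true")
      .set("spark.streaming.dynamicAllocation.minExecutors", "2")
      .set("spark.streaming.dynamicAllocation.maxExecutors", "10")
      .set("spark.streaming.dynamicAllocation.scalingInterval", "60s")

    val ssc = new StreamingContext(conf, Seconds(1))
    // ... define input streams and output operations, then:
    ssc.start()
    ssc.awaitTermination()
  }
}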

## How was this patch tested?
Unit tests and cluster tests.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #12154 from tdas/streaming-dynamic-allocation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9af5423e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9af5423e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9af5423e

Branch: refs/heads/master
Commit: 9af5423ec28258becf27dbe89833b4f7d324d26a
Parents: de47926
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Wed Apr 6 15:46:20 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Apr 6 15:46:20 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/ExecutorAllocationClient.scala |   4 +
 .../scala/org/apache/spark/SparkContext.scala   |  10 +
 .../cluster/CoarseGrainedSchedulerBackend.scala |   4 +
 .../spark/streaming/StreamingContext.scala      |   7 +-
 .../scheduler/ExecutorAllocationManager.scala   | 233 +++++++++++
 .../streaming/scheduler/JobScheduler.scala      |  14 +
 .../streaming/scheduler/ReceiverTracker.scala   |  19 +
 .../ExecutorAllocationManagerSuite.scala        | 395 +++++++++++++++++++
 8 files changed, 683 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9af5423e/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 842bfdb..8baddf4 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -23,6 +23,10 @@ package org.apache.spark
  */
 private[spark] trait ExecutorAllocationClient {
 
+
+  /** Get the list of currently active executors */
+  private[spark] def getExecutorIds(): Seq[String]
+
   /**
   * Update the cluster manager on our scheduling needs. Three bits of information are included
    * to help it make decisions.

http://git-wip-us.apache.org/repos/asf/spark/blob/9af5423e/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4b3264c..c40fada 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1360,6 +1360,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     listenerBus.addListener(listener)
   }
 
+  private[spark] override def getExecutorIds(): Seq[String] = {
+    schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.getExecutorIds()
+      case _ =>
+        logWarning("Requesting executors is only supported in coarse-grained 
mode")
+        Nil
+    }
+  }
+
   /**
   * Update the cluster manager on our scheduling needs. Three bits of information are included
    * to help it make decisions.

http://git-wip-us.apache.org/repos/asf/spark/blob/9af5423e/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f71bfd4..e5abf0e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -430,6 +430,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    */
   private def numExistingExecutors: Int = executorDataMap.size
 
+  override def getExecutorIds(): Seq[String] = {
+    executorDataMap.keySet.toSeq
+  }
+
   /**
    * Request an additional number of executors from the cluster manager.
    * @return whether the request is acknowledged.

http://git-wip-us.apache.org/repos/asf/spark/blob/9af5423e/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 83a1092..cc187f5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -43,7 +43,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
+import org.apache.spark.streaming.scheduler.{ExecutorAllocationManager, JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
 import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils}
 
@@ -527,11 +527,12 @@ class StreamingContext private[streaming] (
       }
     }
 
-    if (Utils.isDynamicAllocationEnabled(sc.conf)) {
+    if (Utils.isDynamicAllocationEnabled(sc.conf) ||
+        ExecutorAllocationManager.isDynamicAllocationEnabled(conf)) {
       logWarning("Dynamic Allocation is enabled for this application. " +
         "Enabling Dynamic allocation for Spark Streaming applications can 
cause data loss if " +
         "Write Ahead Log is not enabled for non-replayable sources like Flume. 
" +
-        "See the programming guide for details on how to enable the Write 
Ahead Log")
+        "See the programming guide for details on how to enable the Write 
Ahead Log.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9af5423e/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
new file mode 100644
index 0000000..f7b6584
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.streaming.scheduler
+
+import scala.util.Random
+
+import org.apache.spark.{ExecutorAllocationClient, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.{Clock, Utils}
+
+/**
+ * Class that manages the executors allocated to a StreamingContext, and dynamically requests or
+ * kills executors based on the statistics of the streaming computation. This is different from
+ * the core dynamic allocation policy; the core policy relies on executors being idle for a while,
+ * but the micro-batch model of streaming prevents any particular executor from being idle for a
+ * long time. Instead, the measure of "idle-ness" needs to be based on the time taken to process
+ * each batch.
+ *
+ * At a high level, the policy implemented by this class is as follows:
+ * - Use the StreamingListener interface to get the batch processing times of completed batches
+ * - Periodically take the average batch completion time and compare it with the batch interval
+ * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors.
+ *   The number of executors requested is based on the ratio = (avg. proc. time / batch interval).
+ * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill an executor
+ *   that is not running a receiver.
+ *
+ * This feature should ideally be used in conjunction with backpressure, as backpressure ensures
+ * system stability while executors are being readjusted.
+ */
+private[streaming] class ExecutorAllocationManager(
+    client: ExecutorAllocationClient,
+    receiverTracker: ReceiverTracker,
+    conf: SparkConf,
+    batchDurationMs: Long,
+    clock: Clock) extends StreamingListener with Logging {
+
+  import ExecutorAllocationManager._
+
+  private val scalingIntervalSecs = conf.getTimeAsSeconds(
+    SCALING_INTERVAL_KEY,
+    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
+  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
+  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
+  private val minNumExecutors = conf.getInt(
+    MIN_EXECUTORS_KEY,
+    math.max(1, receiverTracker.numReceivers))
+  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
+  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
+    _ => manageAllocation(), "streaming-executor-allocation-manager")
+
+  @volatile private var batchProcTimeSum = 0L
+  @volatile private var batchProcTimeCount = 0
+
+  validateSettings()
+
+  def start(): Unit = {
+    timer.start()
+    logInfo(s"ExecutorAllocationManager started with " +
+      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = 
$scalingIntervalSecs sec")
+  }
+
+  def stop(): Unit = {
+    timer.stop(interruptTimer = true)
+    logInfo("ExecutorAllocationManager stopped")
+  }
+
+  /**
+   * Manage executor allocation by requesting or killing executors based on the collected
+   * batch statistics.
+   */
+  private def manageAllocation(): Unit = synchronized {
+    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, 
$scalingDownRatio]")
+    if (batchProcTimeCount > 0) {
+      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
+      val ratio = averageBatchProcTime.toDouble / batchDurationMs
+      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
+      if (ratio >= scalingUpRatio) {
+        logDebug("Requesting executors")
+        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
+        requestExecutors(numNewExecutors)
+      } else if (ratio <= scalingDownRatio) {
+        logDebug("Killing executors")
+        killExecutor()
+      }
+    }
+    batchProcTimeSum = 0
+    batchProcTimeCount = 0
+  }
+
+  /** Request the specified number of executors over the currently active ones */
+  private def requestExecutors(numNewExecutors: Int): Unit = {
+    require(numNewExecutors >= 1)
+    val allExecIds = client.getExecutorIds()
+    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
+    val targetTotalExecutors =
+      math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
+    client.requestTotalExecutors(targetTotalExecutors, 0, Map.empty)
+    logInfo(s"Requested total $targetTotalExecutors executors")
+  }
+
+  /** Kill an executor that is not running any receiver, if possible */
+  private def killExecutor(): Unit = {
+    val allExecIds = client.getExecutorIds()
+    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
+
+    if (allExecIds.nonEmpty && allExecIds.size > minNumExecutors) {
+      val execIdsWithReceivers = receiverTracker.allocatedExecutors.values.flatten.toSeq
+      logInfo(s"Executors with receivers (${execIdsWithReceivers.size}): ${execIdsWithReceivers}")
+
+      val removableExecIds = allExecIds.diff(execIdsWithReceivers)
+      logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}")
+      if (removableExecIds.nonEmpty) {
+        val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
+        client.killExecutor(execIdToRemove)
+        logInfo(s"Requested to kill executor $execIdToRemove")
+      } else {
+        logInfo(s"No non-receiver executors to kill")
+      }
+    } else {
+      logInfo("No available executor to kill")
+    }
+  }
+
+  private def addBatchProcTime(timeMs: Long): Unit = synchronized {
+    batchProcTimeSum += timeMs
+    batchProcTimeCount += 1
+    logDebug(
+      s"Added batch processing time $timeMs, sum = $batchProcTimeSum, count = 
$batchProcTimeCount")
+  }
+
+  private def validateSettings(): Unit = {
+    require(
+      scalingIntervalSecs > 0,
+      s"Config $SCALING_INTERVAL_KEY must be more than 0")
+
+    require(
+      scalingUpRatio > 0,
+      s"Config $SCALING_UP_RATIO_KEY must be more than 0")
+
+    require(
+      scalingDownRatio > 0,
+      s"Config $SCALING_DOWN_RATIO_KEY must be more than 0")
+
+    require(
+      minNumExecutors > 0,
+      s"Config $MIN_EXECUTORS_KEY must be more than 0")
+
+    require(
+      maxNumExecutors > 0,
+      s"$MAX_EXECUTORS_KEY must be more than 0")
+
+    require(
+      scalingUpRatio > scalingDownRatio,
+      s"Config $SCALING_UP_RATIO_KEY must be more than config 
$SCALING_DOWN_RATIO_KEY")
+
+    if (conf.contains(MIN_EXECUTORS_KEY) && conf.contains(MAX_EXECUTORS_KEY)) {
+      require(
+        maxNumExecutors >= minNumExecutors,
+        s"Config $MAX_EXECUTORS_KEY must be more than config 
$MIN_EXECUTORS_KEY")
+    }
+  }
+
+  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+    logDebug("onBatchCompleted called: " + batchCompleted)
+    if (!batchCompleted.batchInfo.outputOperationInfos.values.exists(_.failureReason.nonEmpty)) {
+      batchCompleted.batchInfo.processingDelay.foreach(addBatchProcTime)
+    }
+  }
+}
+
+private[streaming] object ExecutorAllocationManager extends Logging {
+  val ENABLED_KEY = "spark.streaming.dynamicAllocation.enabled"
+
+  val SCALING_INTERVAL_KEY = "spark.streaming.dynamicAllocation.scalingInterval"
+  val SCALING_INTERVAL_DEFAULT_SECS = 60
+
+  val SCALING_UP_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingUpRatio"
+  val SCALING_UP_RATIO_DEFAULT = 0.9
+
+  val SCALING_DOWN_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingDownRatio"
+  val SCALING_DOWN_RATIO_DEFAULT = 0.3
+
+  val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors"
+
+  val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
+
+  def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
+    val numExecutor = conf.getInt("spark.executor.instances", 0)
+    val streamingDynamicAllocationEnabled = conf.getBoolean(ENABLED_KEY, false)
+    if (numExecutor != 0 && streamingDynamicAllocationEnabled) {
+      throw new IllegalArgumentException(
+        "Dynamic Allocation for streaming cannot be enabled while 
spark.executor.instances is set.")
+    }
+    if (Utils.isDynamicAllocationEnabled(conf) && 
streamingDynamicAllocationEnabled) {
+      throw new IllegalArgumentException(
+        """
+          |Dynamic Allocation cannot be enabled for both streaming and core at 
the same time.
+          |Please disable core Dynamic Allocation by setting 
spark.dynamicAllocation.enabled to
+          |false to use Dynamic Allocation in streaming.
+        """.stripMargin)
+    }
+    val testing = conf.getBoolean("spark.streaming.dynamicAllocation.testing", 
false)
+    numExecutor == 0 && streamingDynamicAllocationEnabled && 
(!Utils.isLocalMaster(conf) || testing)
+  }
+
+  def createIfEnabled(
+      client: ExecutorAllocationClient,
+      receiverTracker: ReceiverTracker,
+      conf: SparkConf,
+      batchDurationMs: Long,
+      clock: Clock): Option[ExecutorAllocationManager] = {
+    if (isDynamicAllocationEnabled(conf)) {
+      Some(new ExecutorAllocationManager(client, receiverTracker, conf, batchDurationMs, clock))
+    } else None
+  }
+}
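
As a worked sketch of the scaling policy implemented above (illustrative only: ScalingPolicySketch and decide are hypothetical names, while the ratio values match SCALING_UP_RATIO_DEFAULT and SCALING_DOWN_RATIO_DEFAULT from this file):

object ScalingPolicySketch {
  val scalingUpRatio = 0.9    // SCALING_UP_RATIO_DEFAULT
  val scalingDownRatio = 0.3  // SCALING_DOWN_RATIO_DEFAULT

  // Decide what one scaling interval does, given the average batch
  // processing time and the batch interval, both in milliseconds.
  def decide(avgProcTimeMs: Long, batchIntervalMs: Long): String = {
    val ratio = avgProcTimeMs.toDouble / batchIntervalMs
    if (ratio >= scalingUpRatio) {
      // Same arithmetic as manageAllocation: request round(ratio) more, at least 1.
      s"request ${math.max(math.round(ratio).toInt, 1)} more executor(s)"
    } else if (ratio <= scalingDownRatio) {
      "kill one executor that is not running a receiver"
    } else {
      "no change"
    }
  }

  def main(args: Array[String]): Unit = {
    println(decide(1500, 1000)) // ratio 1.5 >= 0.9 -> request 2 more executors
    println(decide(200, 1000))  // ratio 0.2 <= 0.3 -> kill one executor
    println(decide(500, 1000))  // 0.3 < ratio 0.5 < 0.9 -> no change
  }
}

So with a 1000 ms batch interval, an average processing time of 1500 ms gives ratio 1.5 and round(1.5) = 2 more executors are requested, while 200 ms gives ratio 0.2 and one non-receiver executor is killed.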

http://git-wip-us.apache.org/repos/asf/spark/blob/9af5423e/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 61f9e09..303c325 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -57,6 +57,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   // A tracker to track all the input stream information as well as processed record number
   var inputInfoTracker: InputInfoTracker = null
 
+  private var executorAllocationManager: Option[ExecutorAllocationManager] = None
+
   private var eventLoop: EventLoop[JobSchedulerEvent] = null
 
   def start(): Unit = synchronized {
@@ -79,8 +81,16 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     listenerBus.start()
     receiverTracker = new ReceiverTracker(ssc)
     inputInfoTracker = new InputInfoTracker(ssc)
+    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
+      ssc.sparkContext,
+      receiverTracker,
+      ssc.conf,
+      ssc.graph.batchDuration.milliseconds,
+      clock)
+    executorAllocationManager.foreach(ssc.addStreamingListener)
     receiverTracker.start()
     jobGenerator.start()
+    executorAllocationManager.foreach(_.start())
     logInfo("Started JobScheduler")
   }
 
@@ -93,6 +103,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       receiverTracker.stop(processAllReceivedData)
     }
 
+    if (executorAllocationManager != null) {
+      executorAllocationManager.foreach(_.stop())
+    }
+
     // Second, stop generating jobs. If it has to process all received data,
     // then this will wait for all the processing through JobScheduler to be over.
     jobGenerator.stop(processAllReceivedData)

http://git-wip-us.apache.org/repos/asf/spark/blob/9af5423e/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index b3ae287..d67f707 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -92,6 +92,8 @@ private[streaming] case object AllReceiverIds extends ReceiverTrackerLocalMessag
 private[streaming] case class UpdateReceiverRateLimit(streamUID: Int, newRate: Long)
   extends ReceiverTrackerLocalMessage
 
+private[streaming] case object GetAllReceiverInfo extends ReceiverTrackerLocalMessage
+
 /**
  * This class manages the execution of the receivers of ReceiverInputDStreams. Instance of
  * this class must be created after all input streams have been added and StreamingContext.start()
@@ -234,6 +236,20 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     }
   }
 
+  /**
+   * Get the executors allocated to each receiver.
+   * @return a map from receiver ids to optional executor ids.
+   */
+  def allocatedExecutors(): Map[Int, Option[String]] = {
+    endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
+      _.runningExecutor.map { _.executorId }
+    }
+  }
+
+  def numReceivers(): Int = {
+    receiverInputStreams.size
+  }
+
   /** Register a receiver */
   private def registerReceiver(
       streamId: Int,
@@ -506,9 +522,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       case DeregisterReceiver(streamId, message, error) =>
         deregisterReceiver(streamId, message, error)
         context.reply(true)
+
       // Local messages
       case AllReceiverIds =>
         context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
+      case GetAllReceiverInfo =>
+        context.reply(receiverTrackingInfos.toMap)
       case StopAllReceivers =>
         assert(isTrackerStopping || isTrackerStopped)
         stopReceivers()
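
To show the shape of the data the new GetAllReceiverInfo/allocatedExecutors path returns, here is a small sketch of how a consumer such as ExecutorAllocationManager.killExecutor uses it (the map contents and executor ids are hypothetical):

object AllocatedExecutorsSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical result of ReceiverTracker.allocatedExecutors():
    // receiver 1 runs on executor "3"; receiver 2 has not been placed yet.
    val allocated: Map[Int, Option[String]] = Map(1 -> Some("3"), 2 -> None)

    // Executors hosting receivers must not be killed.
    val execIdsWithReceivers: Seq[String] = allocated.values.flatten.toSeq

    // Everything else among the active executors is a removal candidate.
    val allExecIds = Seq("1", "2", "3")
    val removableExecIds = allExecIds.diff(execIdsWithReceivers)
    println(removableExecIds) // List(1, 2)
  }
}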

http://git-wip-us.apache.org/repos/asf/spark/blob/9af5423e/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
new file mode 100644
index 0000000..7630f4a
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.mockito.Matchers.{eq => meq}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, PrivateMethodTester}
+import org.scalatest.concurrent.Eventually.{eventually, timeout}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkFunSuite}
+import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext}
+import org.apache.spark.util.{ManualClock, Utils}
+
+
+class ExecutorAllocationManagerSuite extends SparkFunSuite
+  with BeforeAndAfter with BeforeAndAfterAll with MockitoSugar with PrivateMethodTester {
+
+  import ExecutorAllocationManager._
+
+  private val batchDurationMillis = 1000L
+  private var allocationClient: ExecutorAllocationClient = null
+  private var clock: ManualClock = null
+
+  before {
+    allocationClient = mock[ExecutorAllocationClient]
+    clock = new ManualClock()
+  }
+
+  test("basic functionality") {
+    // Test that adding batch processing time info to allocation manager
+    // causes executors to be requested and killed accordingly
+
+    // There is 1 receiver, and exec 1 has been allocated to it
+    withAllocationManager(numReceivers = 1) { case (receiverTracker, allocationManager) =>
+      when(receiverTracker.allocatedExecutors).thenReturn(Map(1 -> Some("1")))
+
+      /** Add a batch processing time data point and verify executor allocation */
+      def addBatchProcTimeAndVerifyAllocation(batchProcTimeMs: Double)(body: => Unit): Unit = {
+        // 2 active executors
+        reset(allocationClient)
+        when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2"))
+        addBatchProcTime(allocationManager, batchProcTimeMs.toLong)
+        clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1)
+        eventually(timeout(10 seconds)) {
+          body
+        }
+      }
+
+      /** Verify that the expected number of total executors was requested */
+      def verifyTotalRequestedExecs(expectedRequestedTotalExecs: Option[Int]): Unit = {
+        if (expectedRequestedTotalExecs.nonEmpty) {
+          require(expectedRequestedTotalExecs.get > 0)
+          verify(allocationClient, times(1)).requestTotalExecutors(
+            meq(expectedRequestedTotalExecs.get), meq(0), meq(Map.empty))
+        } else {
+          verify(allocationClient, never).requestTotalExecutors(0, 0, Map.empty)
+        }
+      }
+
+      /** Verify that a particular executor was killed */
+      def verifyKilledExec(expectedKilledExec: Option[String]): Unit = {
+        if (expectedKilledExec.nonEmpty) {
+          verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get))
+        } else {
+          verify(allocationClient, never).killExecutor(null)
+        }
+      }
+
+      // Batch proc time = batch interval, should increase allocation by 1
+      addBatchProcTimeAndVerifyAllocation(batchDurationMillis) {
+        verifyTotalRequestedExecs(Some(3)) // one already allocated, increase allocation by 1
+        verifyKilledExec(None)
+      }
+
+      // Batch proc time = batch interval * 2, should increase allocation by 2
+      addBatchProcTimeAndVerifyAllocation(batchDurationMillis * 2) {
+        verifyTotalRequestedExecs(Some(4))
+        verifyKilledExec(None)
+      }
+
+      // Batch proc time slightly more than the scale up ratio, should increase allocation by 1
+      addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_UP_RATIO_DEFAULT + 1) {
+        verifyTotalRequestedExecs(Some(3))
+        verifyKilledExec(None)
+      }
+
+      // Batch proc time slightly less than the scale up ratio, should not change allocation
+      addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_UP_RATIO_DEFAULT - 1) {
+        verifyTotalRequestedExecs(None)
+        verifyKilledExec(None)
+      }
+
+      // Batch proc time slightly more than the scale down ratio, should not change allocation
+      addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_DOWN_RATIO_DEFAULT + 1) {
+        verifyTotalRequestedExecs(None)
+        verifyKilledExec(None)
+      }
+
+      // Batch proc time slightly less than the scale down ratio, should kill an executor
+      addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_DOWN_RATIO_DEFAULT - 1) {
+        verifyTotalRequestedExecs(None)
+        verifyKilledExec(Some("2"))
+      }
+    }
+  }
+
+  test("requestExecutors policy") {
+
+    /** Verify that the expected number of total executors was requested */
+    def verifyRequestedExecs(
+        numExecs: Int,
+        numNewExecs: Int,
+        expectedRequestedTotalExecs: Int)(
+      implicit allocationManager: ExecutorAllocationManager): Unit = {
+      reset(allocationClient)
+      when(allocationClient.getExecutorIds()).thenReturn((1 to numExecs).map(_.toString))
+      requestExecutors(allocationManager, numNewExecs)
+      verify(allocationClient, times(1)).requestTotalExecutors(
+        meq(expectedRequestedTotalExecs), meq(0), meq(Map.empty))
+    }
+
+    withAllocationManager(numReceivers = 1) { case (_, allocationManager) =>
+      implicit val am = allocationManager
+      intercept[IllegalArgumentException] {
+        verifyRequestedExecs(numExecs = 0, numNewExecs = 0, 0)
+      }
+      verifyRequestedExecs(numExecs = 0, numNewExecs = 1, expectedRequestedTotalExecs = 1)
+      verifyRequestedExecs(numExecs = 1, numNewExecs = 1, expectedRequestedTotalExecs = 2)
+      verifyRequestedExecs(numExecs = 2, numNewExecs = 2, expectedRequestedTotalExecs = 4)
+    }
+
+    withAllocationManager(numReceivers = 2) { case(_, allocationManager) =>
+      implicit val am = allocationManager
+
+      verifyRequestedExecs(numExecs = 0, numNewExecs = 1, expectedRequestedTotalExecs = 2)
+      verifyRequestedExecs(numExecs = 1, numNewExecs = 1, expectedRequestedTotalExecs = 2)
+      verifyRequestedExecs(numExecs = 2, numNewExecs = 2, expectedRequestedTotalExecs = 4)
+    }
+
+    withAllocationManager(
+      // Test min 2 executors
+      new SparkConf().set("spark.streaming.dynamicAllocation.minExecutors", 
"2")) {
+      case (_, allocationManager) =>
+        implicit val am = allocationManager
+
+        verifyRequestedExecs(numExecs = 0, numNewExecs = 1, expectedRequestedTotalExecs = 2)
+        verifyRequestedExecs(numExecs = 0, numNewExecs = 3, expectedRequestedTotalExecs = 3)
+        verifyRequestedExecs(numExecs = 1, numNewExecs = 1, expectedRequestedTotalExecs = 2)
+        verifyRequestedExecs(numExecs = 1, numNewExecs = 2, expectedRequestedTotalExecs = 3)
+        verifyRequestedExecs(numExecs = 2, numNewExecs = 1, expectedRequestedTotalExecs = 3)
+        verifyRequestedExecs(numExecs = 2, numNewExecs = 2, expectedRequestedTotalExecs = 4)
+    }
+
+    withAllocationManager(
+      // Test with max 2 executors
+      new SparkConf().set("spark.streaming.dynamicAllocation.maxExecutors", 
"2")) {
+      case (_, allocationManager) =>
+        implicit val am = allocationManager
+
+        verifyRequestedExecs(numExecs = 0, numNewExecs = 1, expectedRequestedTotalExecs = 1)
+        verifyRequestedExecs(numExecs = 0, numNewExecs = 3, expectedRequestedTotalExecs = 2)
+        verifyRequestedExecs(numExecs = 1, numNewExecs = 2, expectedRequestedTotalExecs = 2)
+        verifyRequestedExecs(numExecs = 2, numNewExecs = 1, expectedRequestedTotalExecs = 2)
+        verifyRequestedExecs(numExecs = 2, numNewExecs = 2, expectedRequestedTotalExecs = 2)
+    }
+  }
+
+  test("killExecutor policy") {
+
+    /**
+     * Verify that a particular executor was killed, given active executors and executors
+     * allocated to receivers.
+     */
+    def verifyKilledExec(
+        execIds: Seq[String],
+        receiverExecIds: Map[Int, Option[String]],
+        expectedKilledExec: Option[String])(
+        implicit x: (ReceiverTracker, ExecutorAllocationManager)): Unit = {
+      val (receiverTracker, allocationManager) = x
+
+      reset(allocationClient)
+      when(allocationClient.getExecutorIds()).thenReturn(execIds)
+      when(receiverTracker.allocatedExecutors).thenReturn(receiverExecIds)
+      killExecutor(allocationManager)
+      if (expectedKilledExec.nonEmpty) {
+        verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get))
+      } else {
+        verify(allocationClient, never).killExecutor(null)
+      }
+    }
+
+    withAllocationManager() { case (receiverTracker, allocationManager) =>
+      implicit val rcvrTrackerAndExecAllocMgr = (receiverTracker, allocationManager)
+
+      verifyKilledExec(Nil, Map.empty, None)
+      verifyKilledExec(Seq("1", "2"), Map.empty, None)
+      verifyKilledExec(Seq("1"), Map(1 -> Some("1")), None)
+      verifyKilledExec(Seq("1", "2"), Map(1 -> Some("1")), Some("2"))
+      verifyKilledExec(Seq("1", "2"), Map(1 -> Some("1"), 2 -> Some("2")), 
None)
+    }
+
+    withAllocationManager(
+      new SparkConf().set("spark.streaming.dynamicAllocation.minExecutors", 
"2")) {
+      case (receiverTracker, allocationManager) =>
+        implicit val rcvrTrackerAndExecAllocMgr = (receiverTracker, allocationManager)
+
+        verifyKilledExec(Seq("1", "2"), Map.empty, None)
+        verifyKilledExec(Seq("1", "2", "3"), Map(1 -> Some("1"), 2 -> Some("2")), Some("3"))
+    }
+  }
+
+  test("parameter validation") {
+
+    def validateParams(
+        numReceivers: Int = 1,
+        scalingIntervalSecs: Option[Int] = None,
+        scalingUpRatio: Option[Double] = None,
+        scalingDownRatio: Option[Double] = None,
+        minExecs: Option[Int] = None,
+        maxExecs: Option[Int] = None): Unit = {
+      require(numReceivers > 0)
+      val receiverTracker = mock[ReceiverTracker]
+      when(receiverTracker.numReceivers()).thenReturn(numReceivers)
+      val conf = new SparkConf()
+      if (scalingIntervalSecs.nonEmpty) {
+        conf.set(
+          "spark.streaming.dynamicAllocation.scalingInterval",
+          s"${scalingIntervalSecs.get}s")
+      }
+      if (scalingUpRatio.nonEmpty) {
+        conf.set("spark.streaming.dynamicAllocation.scalingUpRatio", 
scalingUpRatio.get.toString)
+      }
+      if (scalingDownRatio.nonEmpty) {
+        conf.set(
+          "spark.streaming.dynamicAllocation.scalingDownRatio",
+          scalingDownRatio.get.toString)
+      }
+      if (minExecs.nonEmpty) {
+        conf.set("spark.streaming.dynamicAllocation.minExecutors", 
minExecs.get.toString)
+      }
+      if (maxExecs.nonEmpty) {
+        conf.set("spark.streaming.dynamicAllocation.maxExecutors", 
maxExecs.get.toString)
+      }
+      new ExecutorAllocationManager(
+        allocationClient, receiverTracker, conf, batchDurationMillis, clock)
+    }
+
+    validateParams(numReceivers = 1)
+    validateParams(numReceivers = 2, minExecs = Some(1))
+    validateParams(numReceivers = 2, minExecs = Some(3))
+    validateParams(numReceivers = 2, maxExecs = Some(3))
+    validateParams(numReceivers = 2, maxExecs = Some(1))
+    validateParams(minExecs = Some(3), maxExecs = Some(3))
+    validateParams(scalingIntervalSecs = Some(1))
+    validateParams(scalingUpRatio = Some(1.1))
+    validateParams(scalingDownRatio = Some(0.1))
+    validateParams(scalingUpRatio = Some(1.1), scalingDownRatio = Some(0.1))
+
+    intercept[IllegalArgumentException] {
+      validateParams(minExecs = Some(0))
+    }
+    intercept[IllegalArgumentException] {
+      validateParams(minExecs = Some(-1))
+    }
+    intercept[IllegalArgumentException] {
+      validateParams(maxExecs = Some(0))
+    }
+    intercept[IllegalArgumentException] {
+      validateParams(maxExecs = Some(-1))
+    }
+    intercept[IllegalArgumentException] {
+      validateParams(minExecs = Some(4), maxExecs = Some(3))
+    }
+    intercept[IllegalArgumentException] {
+      validateParams(scalingIntervalSecs = Some(-1))
+    }
+    intercept[IllegalArgumentException] {
+      validateParams(scalingIntervalSecs = Some(0))
+    }
+    intercept[IllegalArgumentException] {
+      validateParams(scalingUpRatio = Some(-0.1))
+    }
+    intercept[IllegalArgumentException] {
+      validateParams(scalingUpRatio = Some(0))
+    }
+    intercept[IllegalArgumentException] {
+      validateParams(scalingDownRatio = Some(-0.1))
+    }
+    intercept[IllegalArgumentException] {
+      validateParams(scalingDownRatio = Some(0))
+    }
+    intercept[IllegalArgumentException] {
+      validateParams(scalingUpRatio = Some(0.5), scalingDownRatio = Some(0.5))
+    }
+    intercept[IllegalArgumentException] {
+      validateParams(scalingUpRatio = Some(0.3), scalingDownRatio = Some(0.5))
+    }
+  }
+
+  test("enabling and disabling") {
+    withStreamingContext(new SparkConf()) { ssc =>
+      ssc.start()
+      assert(getExecutorAllocationManager(ssc).isEmpty)
+    }
+
+    withStreamingContext(
+      new SparkConf().set("spark.streaming.dynamicAllocation.enabled", 
"true")) { ssc =>
+      ssc.start()
+      assert(getExecutorAllocationManager(ssc).nonEmpty)
+    }
+
+    val confWithBothDynamicAllocationEnabled = new SparkConf()
+      .set("spark.streaming.dynamicAllocation.enabled", "true")
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.dynamicAllocation.testing", "true")
+    require(Utils.isDynamicAllocationEnabled(confWithBothDynamicAllocationEnabled) === true)
+    withStreamingContext(confWithBothDynamicAllocationEnabled) { ssc =>
+      intercept[IllegalArgumentException] {
+        ssc.start()
+      }
+    }
+  }
+
+  private def withAllocationManager(
+      conf: SparkConf = new SparkConf,
+      numReceivers: Int = 1
+    )(body: (ReceiverTracker, ExecutorAllocationManager) => Unit): Unit = {
+
+    val receiverTracker = mock[ReceiverTracker]
+    when(receiverTracker.numReceivers()).thenReturn(numReceivers)
+
+    val manager = new ExecutorAllocationManager(
+      allocationClient, receiverTracker, conf, batchDurationMillis, clock)
+    try {
+      manager.start()
+      body(receiverTracker, manager)
+    } finally {
+      manager.stop()
+    }
+  }
+
+  private val _addBatchProcTime = PrivateMethod[Unit]('addBatchProcTime)
+  private val _requestExecutors = PrivateMethod[Unit]('requestExecutors)
+  private val _killExecutor = PrivateMethod[Unit]('killExecutor)
+  private val _executorAllocationManager =
+    PrivateMethod[Option[ExecutorAllocationManager]]('executorAllocationManager)
+
+  private def addBatchProcTime(manager: ExecutorAllocationManager, timeMs: Long): Unit = {
+    manager invokePrivate _addBatchProcTime(timeMs)
+  }
+
+  private def requestExecutors(manager: ExecutorAllocationManager, newExecs: Int): Unit = {
+    manager invokePrivate _requestExecutors(newExecs)
+  }
+
+  private def killExecutor(manager: ExecutorAllocationManager): Unit = {
+    manager invokePrivate _killExecutor()
+  }
+
+  private def getExecutorAllocationManager(
+      ssc: StreamingContext): Option[ExecutorAllocationManager] = {
+    ssc.scheduler invokePrivate _executorAllocationManager()
+  }
+
+  private def withStreamingContext(conf: SparkConf)(body: StreamingContext => Unit): Unit = {
+    conf.setMaster("local").setAppName(this.getClass.getSimpleName).set(
+      "spark.streaming.dynamicAllocation.testing", "true")  // to test dynamic 
allocation
+
+    var ssc: StreamingContext = null
+    try {
+      ssc = new StreamingContext(conf, Seconds(1))
+      new DummyInputDStream(ssc).foreachRDD(_ => { })
+      body(ssc)
+    } finally {
+      if (ssc != null) ssc.stop()
+    }
+  }
+}

