Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 0e56f83ce -> c9b2cea81


[GEARPUMP-277] Allow user to configure retry times on app failure

Author: huafengw <[email protected]>

Closes #156 from huafengw/fix277.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/c9b2cea8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/c9b2cea8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/c9b2cea8

Branch: refs/heads/master
Commit: c9b2cea813e5e8262fbbcf0e9e72c13cd463a424
Parents: 0e56f83
Author: huafengw <[email protected]>
Authored: Wed Feb 22 16:28:33 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Wed Feb 22 16:29:01 2017 +0800

----------------------------------------------------------------------
 conf/gear.conf                                  |  3 ++
 core/src/main/resources/geardefault.conf        |  3 ++
 .../gearpump/cluster/master/AppManager.scala    |  8 ++-
 .../org/apache/gearpump/util/Constants.scala    |  2 +
 .../apache/gearpump/util/RestartPolicy.scala    | 19 +++----
 core/src/test/resources/test.conf               |  3 ++
 .../gearpump/util/RestartPolicySpec.scala       | 39 ++++++++++++++
 .../apache/gearpump/streaming/Constants.scala   |  3 --
 .../streaming/appmaster/TaskManager.scala       | 23 +++------
 .../executor/ExecutorRestartPolicy.scala        | 53 --------------------
 .../streaming/appmaster/AppMasterSpec.scala     | 16 +++---
 .../appmaster/ExecutorRestartPolicySpec.scala   | 46 -----------------
 12 files changed, 73 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/conf/gear.conf
----------------------------------------------------------------------
diff --git a/conf/gear.conf b/conf/gear.conf
index 52c67b8..92d6708 100644
--- a/conf/gear.conf
+++ b/conf/gear.conf
@@ -70,6 +70,9 @@ gearpump {
   ## Number of executors to launch when starting an application
   application.executor-num = 1
 
+  ## Total number of times an application is allowed to be restarted
+  application.total-retries = 5
+
   ###########################
   ### Change the dispather for tasks
   ### If you don't know what this is about, don't change it

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/core/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/core/src/main/resources/geardefault.conf 
b/core/src/main/resources/geardefault.conf
index e4aaec6..3fe030a 100644
--- a/core/src/main/resources/geardefault.conf
+++ b/core/src/main/resources/geardefault.conf
@@ -42,6 +42,9 @@ gearpump {
   ## Number of executors to launch when starting an application
   application.executor-num = 1
 
+  ## Total number of times an application is allowed to be restarted
+  application.total-retries = 5
+
   ## Unique Id to identity this worker instance in low level resource manager 
like YARN.
   ##
   ## This value is typically configured by resource manager integration 
module, like gearpump-yarn in this case.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
index 24e70dd..049d11d 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.cluster.master
 
 import akka.actor._
 import akka.pattern.ask
-import com.typesafe.config.ConfigFactory
+import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, 
SaveAppDataFailed, _}
 import org.apache.gearpump.cluster.AppMasterToWorker._
@@ -49,8 +49,7 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
-  private val appMasterMaxRetries: Int = 5
-  private val appMasterRetryTimeRange: Duration = 20.seconds
+  private val appTotalRetries: Int = 5
 
   implicit val timeout = FUTURE_TIMEOUT
   implicit val executionContext = context.dispatcher
@@ -102,8 +101,7 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
       } else {
         context.actorOf(launcher.props(nextAppId, 
APPMASTER_DEFAULT_EXECUTOR_ID, app, jar, username,
           context.parent, Some(client)), 
s"launcher${nextAppId}_${Util.randInt()}")
-        appMasterRestartPolicies += nextAppId ->
-          new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
+        appMasterRestartPolicies += nextAppId -> new 
RestartPolicy(appTotalRetries)
 
         val appRuntimeInfo = ApplicationRuntimeInfo(nextAppId, app.name,
           user = username,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/core/src/main/scala/org/apache/gearpump/util/Constants.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Constants.scala 
b/core/src/main/scala/org/apache/gearpump/util/Constants.scala
index c98726e..fa5dfa8 100644
--- a/core/src/main/scala/org/apache/gearpump/util/Constants.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/Constants.scala
@@ -163,5 +163,7 @@ object Constants {
 
   val APPLICATION_EXECUTOR_NUMBER = "gearpump.application.executor-num"
 
+  val APPLICATION_TOTAL_RETRIES = "gearpump.application.total-retries"
+
   val AKKA_SCHEDULER_TICK_DURATION = "akka.scheduler.tick-duration"
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala 
b/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala
index 97d6dd0..06cb423 100644
--- a/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala
@@ -18,24 +18,19 @@
 
 package org.apache.gearpump.util
 
-import scala.concurrent.duration.Duration
-
-import akka.actor.ChildRestartStats
-
 /**
  * When one executor or task fails, Gearpump will try to start. However, if it 
fails after
  * multiple retries, then we abort.
  *
- * @param maxNrOfRetries The number of times is allowed to be restarted, 
negative value means no
- *                       limit, if the limit is exceeded the policy will not 
allow to restart
- * @param withinTimeRange Duration of the time window for maxNrOfRetries.
- *                        Duration.Inf means no window
+ * @param totalNrOfRetries The total number of restarts that are allowed. A negative
+ *                         value means no limit; once the limit is exceeded the
+ *                         policy no longer allows a restart.
  */
-class RestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) {
-  private val status = new ChildRestartStats(null, 0, 0L)
-  private val retriesWindow = (Some(maxNrOfRetries), 
Some(withinTimeRange.toMillis.toInt))
+class RestartPolicy(totalNrOfRetries: Int) {
+  private var historicalRetries: Int = 0
 
   def allowRestart: Boolean = {
-    status.requestRestartPermission(retriesWindow)
+    historicalRetries += 1
+    totalNrOfRetries < 0 || historicalRetries <= totalNrOfRetries
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/core/src/test/resources/test.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/test.conf 
b/core/src/test/resources/test.conf
index 58e5ef1..0ed29ef 100644
--- a/core/src/test/resources/test.conf
+++ b/core/src/test/resources/test.conf
@@ -14,6 +14,9 @@ gearpump {
 
   application.executor-num = 1
 
+  ## Total number of times an application is allowed to be restarted
+  application.total-retries = 1
+
   worker.executor-process-launcher = 
"org.apache.gearpump.cluster.worker.DefaultExecutorProcessLauncher"
 
   cluster {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala 
b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
new file mode 100644
index 0000000..5d0c66d
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.concurrent.duration._
+
+class RestartPolicySpec extends FlatSpec with Matchers {
+
+  "RestartPolicy" should "forbid too many restarts" in {
+    val policy = new RestartPolicy(2)
+    assert(policy.allowRestart)
+    assert(policy.allowRestart)
+    assert(!policy.allowRestart)
+  }
+
+  "RestartPolicy" should "allow unlimited restarts when the limit is negative" in {
+    val policy = new RestartPolicy(-1)
+    assert(policy.allowRestart)
+    assert(policy.allowRestart)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
index d7582b0..7ac1b74 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -36,9 +36,6 @@ object Constants {
   val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT =
     "gearpump.streaming.ack-once-every-message-count"
 
-  val GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW =
-    "gearpump.streaming.executor-restart-time-window"
-
   // The partitioners provided by Gearpump
   val BUILTIN_PARTITIONERS = Array(
     classOf[BroadcastPartitioner],

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
index 085b3f0..81ed79a 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
@@ -32,10 +32,9 @@ import 
org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted,
 import org.apache.gearpump.streaming.appmaster.TaskManager._
 import org.apache.gearpump.streaming.appmaster.TaskRegistry.{Accept, 
TaskLocation}
 import org.apache.gearpump.streaming.executor.Executor.RestartTasks
-import org.apache.gearpump.streaming.executor.ExecutorRestartPolicy
 import org.apache.gearpump.streaming.task._
 import org.apache.gearpump.streaming.util.ActorPathUtil
-import org.apache.gearpump.util.{Constants, LogUtil}
+import org.apache.gearpump.util.{Constants, LogUtil, RestartPolicy}
 import org.slf4j.Logger
 
 import scala.concurrent.Future
@@ -76,15 +75,8 @@ private[appmaster] class TaskManager(
 
   private val ids = new SessionIdFactory()
 
-  import 
org.apache.gearpump.streaming.Constants.GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW
-  // the default 20 seconds is too small for tests
-  // so that executor will be restarted infinitely
-  private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries 
= 5,
-    withinTimeRange = if 
(systemConfig.hasPath(GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW)) {
-      
systemConfig.getInt(GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW).seconds
-    } else {
-      20.seconds
-    })
+  private val appTotalRetries: Int = 
systemConfig.getInt(Constants.APPLICATION_TOTAL_RETRIES)
+  private val appRestartPolicy = new RestartPolicy(appTotalRetries)
 
   private implicit val timeout = Constants.FUTURE_TIMEOUT
   private implicit val actorSystem = context.system
@@ -142,7 +134,7 @@ private[appmaster] class TaskManager(
         }
       case MessageLoss(executorId, taskId, cause) =>
         if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) &&
-          executorRestartPolicy.allowRestartExecutor(executorId)) {
+          appRestartPolicy.allowRestart) {
           context.become(recovery(recoverState))
         } else {
           val errorMsg = s"Task $taskId fails too many times to recover"
@@ -223,11 +215,11 @@ private[appmaster] class TaskManager(
     LOG.info(s"DynamicDag transit to dag version: ${state.dag.version}...")
 
     val onMessageLoss: Receive = {
-      case executorStopped@ExecutorStopped(executorId) =>
+      case ExecutorStopped(executorId) =>
         context.become(recovery(recoverState))
       case MessageLoss(executorId, taskId, cause) =>
         if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) &&
-          executorRestartPolicy.allowRestartExecutor(executorId)) {
+          appRestartPolicy.allowRestart) {
           context.become(recovery(recoverState))
         } else {
           val errorMsg = s"Task $taskId fails too many times to recover"
@@ -261,7 +253,6 @@ private[appmaster] class TaskManager(
           LOG.info(s"Start tasks on Executor($executorId), tasks: " + tasks)
           val launchTasks = LaunchTasks(tasks, state.dag.version, 
processorDescription, subscribers)
           executorManager ! UniCast(executorId, launchTasks)
-          tasks.foreach(executorRestartPolicy.addTaskToExecutor(executorId, _))
         case ChangeTasksOnExecutor(executorId, tasks) =>
           LOG.info("change Task on executor: " + executorId + ", tasks: " + 
tasks)
           val changeTasks = ChangeTasks(tasks, state.dag.version, 
processorDescription.life,
@@ -321,7 +312,7 @@ private[appmaster] class TaskManager(
 
   def onExecutorError: Receive = {
     case ExecutorStopped(executorId) =>
-      if (executorRestartPolicy.allowRestartExecutor(executorId)) {
+      if (appRestartPolicy.allowRestart) {
         jarScheduler.executorFailed(executorId).foreach { 
resourceRequestDetail =>
           if (resourceRequestDetail.isDefined) {
             executorManager ! 
StartExecutors(resourceRequestDetail.get.requests,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala
deleted file mode 100644
index 90615ad..0000000
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.executor
-
-import scala.collection.immutable
-import scala.concurrent.duration.Duration
-
-import org.apache.gearpump.streaming.task.TaskId
-import org.apache.gearpump.util.RestartPolicy
-
-/**
- *
- * Controls how many retries to recover failed executors.
- *
- * @param maxNrOfRetries the number of times a executor is allowed to be 
restarted,
- *                       negative value means no limit, if the limit is 
exceeded the policy
- *                       will not allow to restart the executor
- * @param withinTimeRange duration of the time window for maxNrOfRetries, 
Duration.Inf
- *                        means no window
- */
-class ExecutorRestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) {
-  private var executorToTaskIds = Map.empty[Int, Set[TaskId]]
-  private var taskRestartPolicies = new immutable.HashMap[TaskId, 
RestartPolicy]
-
-  def addTaskToExecutor(executorId: Int, taskId: TaskId): Unit = {
-    var taskSetForExecutorId = executorToTaskIds.getOrElse(executorId, 
Set.empty[TaskId])
-    taskSetForExecutorId += taskId
-    executorToTaskIds += executorId -> taskSetForExecutorId
-    if (!taskRestartPolicies.contains(taskId)) {
-      taskRestartPolicies += taskId -> new RestartPolicy(maxNrOfRetries, 
withinTimeRange)
-    }
-  }
-
-  def allowRestartExecutor(executorId: Int): Boolean = {
-    executorToTaskIds(executorId).forall(taskId => 
taskRestartPolicies(taskId).allowRestart)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
index 89dea81..c1791aa 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -18,13 +18,12 @@
 package org.apache.gearpump.streaming.appmaster
 
 
-import akka.actor.{ActorSystem, ActorRef, Props}
+import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.testkit.{TestActorRef, TestProbe}
 import com.typesafe.config.ConfigFactory
-import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.AppMasterToMaster._
 import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, 
ShutdownApplication}
+import org.apache.gearpump.cluster.ClientToMaster.GetLastFailure
 import org.apache.gearpump.cluster.MasterToAppMaster._
 import org.apache.gearpump.cluster.MasterToClient.LastFailure
 import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
@@ -37,21 +36,18 @@ import org.apache.gearpump.jarstore.FilePath
 import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.AppMasterToExecutor.StopTask
 import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, 
UnRegisterTask}
-import org.apache.gearpump.streaming.appmaster.AppMaster.{TaskActorRef, 
LookupTaskActorRef}
+import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
 import org.apache.gearpump.streaming.task.{TaskContext, _}
-import org.apache.gearpump.streaming.{Constants, DAG, Processor, 
StreamApplication}
+import org.apache.gearpump.streaming.{DAG, Processor, StreamApplication}
 import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem
-import org.apache.gearpump.util.{ActorUtil, Graph}
+import org.apache.gearpump.util.{ActorUtil, Constants, Graph}
 import org.apache.gearpump.util.Graph._
 import org.scalatest._
 
 import scala.concurrent.duration._
 
 class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach 
with MasterHarness {
-  protected override def config = {
-    
ConfigFactory.parseString(s"${Constants.GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW}
 = 60")
-      .withFallback(TestUtil.DEFAULT_CONFIG)
-  }
+  protected override def config = TestUtil.DEFAULT_CONFIG
 
   var appMaster: ActorRef = null
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
deleted file mode 100644
index 5f3905f..0000000
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.appmaster
-
-import org.apache.gearpump.streaming.executor.ExecutorRestartPolicy
-import org.apache.gearpump.streaming.task.TaskId
-import org.scalatest.{Matchers, WordSpec}
-
-import scala.concurrent.duration._
-
-class ExecutorRestartPolicySpec extends WordSpec with Matchers {
-
-  "ExecutorRestartPolicy" should {
-    "decide whether to restart the executor" in {
-      val executorId1 = 1
-      val executorId2 = 2
-      val taskId = TaskId(0, 0)
-      val executorSupervisor = new ExecutorRestartPolicy(
-        maxNrOfRetries = 3, withinTimeRange = 1.seconds)
-      executorSupervisor.addTaskToExecutor(executorId1, taskId)
-      assert(executorSupervisor.allowRestartExecutor(executorId1))
-      assert(executorSupervisor.allowRestartExecutor(executorId1))
-      executorSupervisor.addTaskToExecutor(executorId2, taskId)
-      assert(executorSupervisor.allowRestartExecutor(executorId2))
-      assert(!executorSupervisor.allowRestartExecutor(executorId2))
-      Thread.sleep(1000)
-      assert(executorSupervisor.allowRestartExecutor(executorId2))
-    }
-  }
-}

Reply via email to