Repository: incubator-gearpump Updated Branches: refs/heads/master 0e56f83ce -> c9b2cea81
[GEARPUMP-277] Allow user to configure retry times on app failure Author: huafengw <[email protected]> Closes #156 from huafengw/fix277. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/c9b2cea8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/c9b2cea8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/c9b2cea8 Branch: refs/heads/master Commit: c9b2cea813e5e8262fbbcf0e9e72c13cd463a424 Parents: 0e56f83 Author: huafengw <[email protected]> Authored: Wed Feb 22 16:28:33 2017 +0800 Committer: manuzhang <[email protected]> Committed: Wed Feb 22 16:29:01 2017 +0800 ---------------------------------------------------------------------- conf/gear.conf | 3 ++ core/src/main/resources/geardefault.conf | 3 ++ .../gearpump/cluster/master/AppManager.scala | 8 ++- .../org/apache/gearpump/util/Constants.scala | 2 + .../apache/gearpump/util/RestartPolicy.scala | 19 +++---- core/src/test/resources/test.conf | 3 ++ .../gearpump/util/RestartPolicySpec.scala | 39 ++++++++++++++ .../apache/gearpump/streaming/Constants.scala | 3 -- .../streaming/appmaster/TaskManager.scala | 23 +++------ .../executor/ExecutorRestartPolicy.scala | 53 -------------------- .../streaming/appmaster/AppMasterSpec.scala | 16 +++--- .../appmaster/ExecutorRestartPolicySpec.scala | 46 ----------------- 12 files changed, 73 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/conf/gear.conf ---------------------------------------------------------------------- diff --git a/conf/gear.conf b/conf/gear.conf index 52c67b8..92d6708 100644 --- a/conf/gear.conf +++ b/conf/gear.conf @@ -70,6 +70,9 @@ gearpump { ## Number of executors to launch when starting an application application.executor-num = 1 + ## Application's total number of times that allowed to be restarted + application.total-retries = 5 + ########################### ### Change the dispather for tasks ### If you don't know what this is about, don't change it http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/core/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/core/src/main/resources/geardefault.conf b/core/src/main/resources/geardefault.conf index e4aaec6..3fe030a 100644 --- a/core/src/main/resources/geardefault.conf +++ b/core/src/main/resources/geardefault.conf @@ -42,6 +42,9 @@ gearpump { ## Number of executors to launch when starting an application application.executor-num = 1 + ## Application's total number of times that allowed to be restarted + application.total-retries = 5 + ## Unique Id to identity this worker instance in low level resource manager like YARN. ## ## This value is typically configured by resource manager integration module, like gearpump-yarn in this case. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala index 24e70dd..049d11d 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.cluster.master import akka.actor._ import akka.pattern.ask -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{Config, ConfigFactory} import org.apache.gearpump._ import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _} import org.apache.gearpump.cluster.AppMasterToWorker._ @@ -49,8 +49,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch private val LOG: Logger = LogUtil.getLogger(getClass) - private val appMasterMaxRetries: Int = 5 - private val appMasterRetryTimeRange: Duration = 20.seconds + private val appTotalRetries: Int = 5 implicit val timeout = FUTURE_TIMEOUT implicit val executionContext = context.dispatcher @@ -102,8 +101,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch } else { context.actorOf(launcher.props(nextAppId, APPMASTER_DEFAULT_EXECUTOR_ID, app, jar, username, context.parent, Some(client)), s"launcher${nextAppId}_${Util.randInt()}") - appMasterRestartPolicies += nextAppId -> - new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange) + appMasterRestartPolicies += nextAppId -> new RestartPolicy(appTotalRetries) val appRuntimeInfo = ApplicationRuntimeInfo(nextAppId, app.name, user = username, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/core/src/main/scala/org/apache/gearpump/util/Constants.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/Constants.scala b/core/src/main/scala/org/apache/gearpump/util/Constants.scala index c98726e..fa5dfa8 100644 --- a/core/src/main/scala/org/apache/gearpump/util/Constants.scala +++ b/core/src/main/scala/org/apache/gearpump/util/Constants.scala @@ -163,5 +163,7 @@ object Constants { val APPLICATION_EXECUTOR_NUMBER = "gearpump.application.executor-num" + val APPLICATION_TOTAL_RETRIES = "gearpump.application.total-retries" + val AKKA_SCHEDULER_TICK_DURATION = "akka.scheduler.tick-duration" } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala b/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala index 97d6dd0..06cb423 100644 --- a/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala +++ b/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala @@ -18,24 +18,19 @@ package org.apache.gearpump.util -import scala.concurrent.duration.Duration - -import akka.actor.ChildRestartStats - /** * When one executor or task fails, Gearpump will try to start. However, if it fails after * multiple retries, then we abort. * - * @param maxNrOfRetries The number of times is allowed to be restarted, negative value means no - * limit, if the limit is exceeded the policy will not allow to restart - * @param withinTimeRange Duration of the time window for maxNrOfRetries. - * Duration.Inf means no window + * @param totalNrOfRetries The total number of times is allowed to be restarted, negative value + * means no limit, if the limit is exceeded the policy will not allow + * to restart */ -class RestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) { - private val status = new ChildRestartStats(null, 0, 0L) - private val retriesWindow = (Some(maxNrOfRetries), Some(withinTimeRange.toMillis.toInt)) +class RestartPolicy(totalNrOfRetries: Int) { + private var historicalRetries: Int = 0 def allowRestart: Boolean = { - status.requestRestartPermission(retriesWindow) + historicalRetries += 1 + totalNrOfRetries < 0 || historicalRetries <= totalNrOfRetries } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/core/src/test/resources/test.conf ---------------------------------------------------------------------- diff --git a/core/src/test/resources/test.conf b/core/src/test/resources/test.conf index 58e5ef1..0ed29ef 100644 --- a/core/src/test/resources/test.conf +++ b/core/src/test/resources/test.conf @@ -14,6 +14,9 @@ gearpump { application.executor-num = 1 + ## Application's total number of times that allowed to be restarted + application.total-retries = 1 + worker.executor-process-launcher = "org.apache.gearpump.cluster.worker.DefaultExecutorProcessLauncher" cluster { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala new file mode 100644 index 0000000..5d0c66d --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.util + +import org.scalatest.{FlatSpec, Matchers} + +import scala.concurrent.duration._ + +class RestartPolicySpec extends FlatSpec with Matchers { + + "RestartPolicy" should "forbid too many restarts" in { + val policy = new RestartPolicy(2) + assert(policy.allowRestart) + assert(policy.allowRestart) + assert(!policy.allowRestart) + } + + "RestartPolicy" should "forbid too many restarts in a window duration" in { + val policy = new RestartPolicy(-1) + assert(policy.allowRestart) + assert(policy.allowRestart) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala index d7582b0..7ac1b74 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala @@ -36,9 +36,6 @@ object Constants { val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT = "gearpump.streaming.ack-once-every-message-count" - val GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW = - "gearpump.streaming.executor-restart-time-window" - // The partitioners provided by Gearpump val BUILTIN_PARTITIONERS = Array( classOf[BroadcastPartitioner], http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala index 085b3f0..81ed79a 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala @@ -32,10 +32,9 @@ import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, import org.apache.gearpump.streaming.appmaster.TaskManager._ import org.apache.gearpump.streaming.appmaster.TaskRegistry.{Accept, TaskLocation} import org.apache.gearpump.streaming.executor.Executor.RestartTasks -import org.apache.gearpump.streaming.executor.ExecutorRestartPolicy import org.apache.gearpump.streaming.task._ import org.apache.gearpump.streaming.util.ActorPathUtil -import org.apache.gearpump.util.{Constants, LogUtil} +import org.apache.gearpump.util.{Constants, LogUtil, RestartPolicy} import org.slf4j.Logger import scala.concurrent.Future @@ -76,15 +75,8 @@ private[appmaster] class TaskManager( private val ids = new SessionIdFactory() - import org.apache.gearpump.streaming.Constants.GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW - // the default 20 seconds is too small for tests - // so that executor will be restarted infinitely - private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries = 5, - withinTimeRange = if (systemConfig.hasPath(GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW)) { - systemConfig.getInt(GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW).seconds - } else { - 20.seconds - }) + private val appTotalRetries: Int = systemConfig.getInt(Constants.APPLICATION_TOTAL_RETRIES) + private val appRestartPolicy = new RestartPolicy(appTotalRetries) private implicit val timeout = Constants.FUTURE_TIMEOUT private implicit val actorSystem = context.system @@ -142,7 +134,7 @@ private[appmaster] class TaskManager( } case MessageLoss(executorId, taskId, cause) => if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) && - executorRestartPolicy.allowRestartExecutor(executorId)) { + appRestartPolicy.allowRestart) { context.become(recovery(recoverState)) } else { val errorMsg = s"Task $taskId fails too many times to recover" @@ -223,11 +215,11 @@ private[appmaster] class TaskManager( LOG.info(s"DynamicDag transit to dag version: ${state.dag.version}...") val onMessageLoss: Receive = { - case executorStopped@ExecutorStopped(executorId) => + case ExecutorStopped(executorId) => context.become(recovery(recoverState)) case MessageLoss(executorId, taskId, cause) => if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) && - executorRestartPolicy.allowRestartExecutor(executorId)) { + appRestartPolicy.allowRestart) { context.become(recovery(recoverState)) } else { val errorMsg = s"Task $taskId fails too many times to recover" @@ -261,7 +253,6 @@ private[appmaster] class TaskManager( LOG.info(s"Start tasks on Executor($executorId), tasks: " + tasks) val launchTasks = LaunchTasks(tasks, state.dag.version, processorDescription, subscribers) executorManager ! UniCast(executorId, launchTasks) - tasks.foreach(executorRestartPolicy.addTaskToExecutor(executorId, _)) case ChangeTasksOnExecutor(executorId, tasks) => LOG.info("change Task on executor: " + executorId + ", tasks: " + tasks) val changeTasks = ChangeTasks(tasks, state.dag.version, processorDescription.life, @@ -321,7 +312,7 @@ private[appmaster] class TaskManager( def onExecutorError: Receive = { case ExecutorStopped(executorId) => - if (executorRestartPolicy.allowRestartExecutor(executorId)) { + if (appRestartPolicy.allowRestart) { jarScheduler.executorFailed(executorId).foreach { resourceRequestDetail => if (resourceRequestDetail.isDefined) { executorManager ! StartExecutors(resourceRequestDetail.get.requests, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala deleted file mode 100644 index 90615ad..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.executor - -import scala.collection.immutable -import scala.concurrent.duration.Duration - -import org.apache.gearpump.streaming.task.TaskId -import org.apache.gearpump.util.RestartPolicy - -/** - * - * Controls how many retries to recover failed executors. - * - * @param maxNrOfRetries the number of times a executor is allowed to be restarted, - * negative value means no limit, if the limit is exceeded the policy - * will not allow to restart the executor - * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf - * means no window - */ -class ExecutorRestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) { - private var executorToTaskIds = Map.empty[Int, Set[TaskId]] - private var taskRestartPolicies = new immutable.HashMap[TaskId, RestartPolicy] - - def addTaskToExecutor(executorId: Int, taskId: TaskId): Unit = { - var taskSetForExecutorId = executorToTaskIds.getOrElse(executorId, Set.empty[TaskId]) - taskSetForExecutorId += taskId - executorToTaskIds += executorId -> taskSetForExecutorId - if (!taskRestartPolicies.contains(taskId)) { - taskRestartPolicies += taskId -> new RestartPolicy(maxNrOfRetries, withinTimeRange) - } - } - - def allowRestartExecutor(executorId: Int): Boolean = { - executorToTaskIds(executorId).forall(taskId => taskRestartPolicies(taskId).allowRestart) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala index 89dea81..c1791aa 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala @@ -18,13 +18,12 @@ package org.apache.gearpump.streaming.appmaster -import akka.actor.{ActorSystem, ActorRef, Props} +import akka.actor.{ActorRef, ActorSystem, Props} import akka.testkit.{TestActorRef, TestProbe} import com.typesafe.config.ConfigFactory -import org.apache.gearpump.Message import org.apache.gearpump.cluster.AppMasterToMaster._ import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor -import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, ShutdownApplication} +import org.apache.gearpump.cluster.ClientToMaster.GetLastFailure import org.apache.gearpump.cluster.MasterToAppMaster._ import org.apache.gearpump.cluster.MasterToClient.LastFailure import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected @@ -37,21 +36,18 @@ import org.apache.gearpump.jarstore.FilePath import org.apache.gearpump.streaming.partitioner.HashPartitioner import org.apache.gearpump.streaming.AppMasterToExecutor.StopTask import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, UnRegisterTask} -import org.apache.gearpump.streaming.appmaster.AppMaster.{TaskActorRef, LookupTaskActorRef} +import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} import org.apache.gearpump.streaming.task.{TaskContext, _} -import org.apache.gearpump.streaming.{Constants, DAG, Processor, StreamApplication} +import org.apache.gearpump.streaming.{DAG, Processor, StreamApplication} import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem -import org.apache.gearpump.util.{ActorUtil, Graph} +import org.apache.gearpump.util.{ActorUtil, Constants, Graph} import org.apache.gearpump.util.Graph._ import org.scalatest._ import scala.concurrent.duration._ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { - protected override def config = { - ConfigFactory.parseString(s"${Constants.GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW} = 60") - .withFallback(TestUtil.DEFAULT_CONFIG) - } + protected override def config = TestUtil.DEFAULT_CONFIG var appMaster: ActorRef = null http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c9b2cea8/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala deleted file mode 100644 index 5f3905f..0000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.appmaster - -import org.apache.gearpump.streaming.executor.ExecutorRestartPolicy -import org.apache.gearpump.streaming.task.TaskId -import org.scalatest.{Matchers, WordSpec} - -import scala.concurrent.duration._ - -class ExecutorRestartPolicySpec extends WordSpec with Matchers { - - "ExecutorRestartPolicy" should { - "decide whether to restart the executor" in { - val executorId1 = 1 - val executorId2 = 2 - val taskId = TaskId(0, 0) - val executorSupervisor = new ExecutorRestartPolicy( - maxNrOfRetries = 3, withinTimeRange = 1.seconds) - executorSupervisor.addTaskToExecutor(executorId1, taskId) - assert(executorSupervisor.allowRestartExecutor(executorId1)) - assert(executorSupervisor.allowRestartExecutor(executorId1)) - executorSupervisor.addTaskToExecutor(executorId2, taskId) - assert(executorSupervisor.allowRestartExecutor(executorId2)) - assert(!executorSupervisor.allowRestartExecutor(executorId2)) - Thread.sleep(1000) - assert(executorSupervisor.allowRestartExecutor(executorId2)) - } - } -}
