Repository: flink Updated Branches: refs/heads/master 0b15bc3c5 -> 0df5601ad
[FLINK-1668] [core] Add a config option to specify delays between restarts Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abbb0a93 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abbb0a93 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abbb0a93 Branch: refs/heads/master Commit: abbb0a93ca67da17197dc5372e6d95edd8149d44 Parents: 500ddff Author: Stephan Ewen <se...@apache.org> Authored: Mon Mar 9 19:28:54 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Mar 9 19:31:05 2015 +0100 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 8 +++++++- .../flink/runtime/jobmanager/JobManager.scala | 18 +++++++++++++++--- .../flink/runtime/jobmanager/RecoveryITCase.scala | 1 + .../flink/test/misc/AutoParallelismITCase.java | 2 -- .../ProcessFailureBatchRecoveryITCase.java | 1 + .../flink/test/recovery/SimpleRecoveryITCase.java | 2 +- .../jobmanager/JobManagerFailsITCase.scala | 1 + .../taskmanager/TaskManagerFailsITCase.scala | 1 + 8 files changed, 27 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 0f42a17..028c258 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -40,6 +40,12 @@ public final class ConfigConstants { * value to 0 effectively disables fault tolerance.
*/ public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default"; + + /** + * Config parameter for the delay between execution retries. The value must be specified in the + * notation "10 s" or "1 min" (style of Scala Finite Durations). If not set, the JobManager falls back to the heartbeat pause ({@link #AKKA_WATCH_HEARTBEAT_PAUSE}), which in turn defaults to {@link #DEFAULT_AKKA_ASK_TIMEOUT}. + */ + public static final String DEFAULT_EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay"; // -------------------------------- Runtime ------------------------------- @@ -339,7 +345,7 @@ public final class ConfigConstants { public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout"; /** - * Timeout for all blocking calls + * Timeout for all blocking calls that look up remote actors */ public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout"; http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index e3e96e5..7ba06e7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -850,9 +850,21 @@ object JobManager { ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES) - val delayBetweenRetries = 2 * Duration(configuration.getString( - ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).toMillis + // configure the delay between execution retries.
+ // unless explicitly specified, the delay falls back to the heartbeat pause (AKKA_WATCH_HEARTBEAT_PAUSE), which itself falls back to DEFAULT_AKKA_ASK_TIMEOUT + val pauseString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, + ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) + val delayString = configuration.getString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, + pauseString) + + val delayBetweenRetries: Long = try { + Duration(delayString).toMillis + } + catch { + case e: IllegalArgumentException => throw new Exception( + s"Invalid config value for ${ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY}: " + + s"${delayString}. Value must be a valid finite duration (such as 100 milli or 1 min)", e) + } val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount) http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala index e7d1d83..c201d08 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala @@ -49,6 +49,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers) config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, heartbeatTimeout) + config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, heartbeatTimeout) new TestingCluster(config) } http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java ---------------------------------------------------------------------- diff --git
a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java index ea79a3a..8ddd7bc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java @@ -56,8 +56,6 @@ public class AutoParallelismITCase { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TM); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s"); - cluster = new ForkableFlinkMiniCluster(config, false); } http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java index 6866fbc..cceeb47 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java @@ -125,6 +125,7 @@ public class ProcessFailureBatchRecoveryITCase { jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms"); jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s"); jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2); + jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4 s"); jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress)); ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java index df6fbba..48afce1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java @@ -51,7 +51,7 @@ public class SimpleRecoveryITCase { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s"); + config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "100 ms"); cluster = new ForkableFlinkMiniCluster(config, false); } http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala index f9b1b4c..625ca07 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala @@ -137,6 +137,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers) config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms") 
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms") + config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "8000 ms") config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5) new ForkableFlinkMiniCluster(config, singleActorSystem = false) http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala index 245bcd9..659262c 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala @@ -232,6 +232,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers) config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms") config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms") + config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4000 ms") config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5) new ForkableFlinkMiniCluster(config, singleActorSystem = false)