Repository: flink
Updated Branches:
  refs/heads/master 0b15bc3c5 -> 0df5601ad


[FLINK-1668] [core] Add a config option to specify delays between restarts


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abbb0a93
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abbb0a93
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abbb0a93

Branch: refs/heads/master
Commit: abbb0a93ca67da17197dc5372e6d95edd8149d44
Parents: 500ddff
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 9 19:28:54 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 9 19:31:05 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java      |  8 +++++++-
 .../flink/runtime/jobmanager/JobManager.scala     | 18 +++++++++++++++---
 .../flink/runtime/jobmanager/RecoveryITCase.scala |  1 +
 .../flink/test/misc/AutoParallelismITCase.java    |  2 --
 .../ProcessFailureBatchRecoveryITCase.java        |  1 +
 .../flink/test/recovery/SimpleRecoveryITCase.java |  2 +-
 .../jobmanager/JobManagerFailsITCase.scala        |  1 +
 .../taskmanager/TaskManagerFailsITCase.scala      |  1 +
 8 files changed, 27 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 0f42a17..028c258 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -40,6 +40,12 @@ public final class ConfigConstants {
         * value to 0 effectively disables fault tolerance.
         */
        public static final String DEFAULT_EXECUTION_RETRIES_KEY = 
"execution-retries.default";
+
+       /**
+        * Config parameter for the delay between execution retries. The value must be specified in the
+        * notation "10 s" or "1 min" (the format parsed by Scala's Duration).
+        */
+       public static final String DEFAULT_EXECUTION_RETRY_DELAY_KEY = 
"execution-retries.delay";
        
        // -------------------------------- Runtime 
-------------------------------
        
@@ -339,7 +345,7 @@ public final class ConfigConstants {
        public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
 
        /**
-        * Timeout for all blocking calls
+        * Timeout for all blocking calls that look up remote actors
         */
        public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
        

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e3e96e5..7ba06e7 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -850,9 +850,21 @@ object JobManager {
       ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY,
       ConfigConstants.DEFAULT_EXECUTION_RETRIES)
 
-    val delayBetweenRetries = 2 * Duration(configuration.getString(
-      ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
-      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).toMillis
+    // Configure the delay between execution retries.
+    // Unless explicitly specified, it falls back to the heartbeat pause setting.
+    val pauseString = 
configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
+                                              
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
+    val delayString = 
configuration.getString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY,
+                                              pauseString)
+
+    val delayBetweenRetries: Long = try {
+        Duration(delayString).toMillis
+      }
+      catch {
+        case n: NumberFormatException => throw new Exception(
+          s"Invalid config value for 
${ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY}: " +
+            s"${pauseString}. Value must be a valid duration (such as 100 
milli or 1 min)");
+      }
 
     val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index e7d1d83..c201d08 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -49,6 +49,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
numTaskManagers)
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, 
heartbeatTimeout)
+    config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, 
heartbeatTimeout)
     new TestingCluster(config)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index ea79a3a..8ddd7bc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -56,8 +56,6 @@ public class AutoParallelismITCase {
                Configuration config = new Configuration();
                
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
NUM_TM);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
SLOTS_PER_TM);
-               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 
s");
-
                cluster = new ForkableFlinkMiniCluster(config, false);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
index 6866fbc..cceeb47 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
@@ -125,6 +125,7 @@ public class ProcessFailureBatchRecoveryITCase {
                        
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
                        
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
                        
jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+                       
jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4 s");
 
                        jmActorSystem = AkkaUtils.createActorSystem(jmConfig, 
new Some<Tuple2<String, Object>>(localAddress));
                        ActorRef jmActor = 
JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index df6fbba..48afce1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -51,7 +51,7 @@ public class SimpleRecoveryITCase {
                Configuration config = new Configuration();
                
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
2);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
2);
-               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 
s");
+               
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "100 ms");
 
                cluster = new ForkableFlinkMiniCluster(config, false);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index f9b1b4c..625ca07 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -137,6 +137,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
numTaskmanagers)
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+    config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "8000 
ms")
     config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
 
     new ForkableFlinkMiniCluster(config, singleActorSystem = false)

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 245bcd9..659262c 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -232,6 +232,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
numTaskmanagers)
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+    config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4000 
ms")
     config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
 
     new ForkableFlinkMiniCluster(config, singleActorSystem = false)

Reply via email to