[FLINK-3187] [restart] Introduce RestartStrategy to ExecutionGraph

A RestartStrategy defines how the ExecutionGraph reacts in case of a restart. 
Different strategies
are conceivable. For example, no restart, fixed delay restart, exponential 
backoff restart, scaling
in/out restart, etc.

Expose RestartStrategy to user API

This removes the setNumberExecutionRetries and the setDelayBetweenRetries on 
the ExecutionEnvironment and
the ExecutionConfig. Instead the more general RestartStrategy can be set. In 
order to maintain the
separation between the runtime and api module, one sets a 
RestartStrategyConfiguration which is transformed
into a RestartStrategy on the JobManager.

Replace old execution-retries configuration parameters by restart-strategy.

Add FixedDelayRestartStrategy test case

Reintroduce old configuration values and API calls for the deprecated restart 
mechanism

The old configuration values and API calls will be respected if no explicit
RestartStrategy has been set. The values, if correct, are used to instantiate
a FixedDelayRestartStrategy.

Add deprecation comments to the JavaDocs

Add logging statement for job recovery

Fix JobManagerProcessFailureBatchRecoveryITCase by introducing a job recovery 
timeout

Add proper annotations to RestartStrategies

Let ExecutionGraphRestartTest extend TestLogger

This closes #1470.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5eae47f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5eae47f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5eae47f5

Branch: refs/heads/master
Commit: 5eae47f5d381cd55729660f62c714b52d28413ed
Parents: b17632d
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Dec 17 13:49:10 2015 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Feb 15 15:58:05 2016 +0100

----------------------------------------------------------------------
 docs/apis/streaming/fault_tolerance.md          | 168 +++++++++++++++++++
 docs/internals/monitoring_rest_api.md           |   2 +-
 docs/setup/config.md                            |  16 +-
 .../flink/api/common/ExecutionConfig.java       |  85 +++++++++-
 .../java/org/apache/flink/api/common/Plan.java  |  24 +--
 .../restartstrategy/RestartStrategies.java      | 120 +++++++++++++
 .../flink/configuration/ConfigConstants.java    |  42 ++++-
 .../streaming/examples/kafka/ReadFromKafka.java |   3 +-
 .../examples/kafka/WriteIntoKafka.java          |   3 +-
 .../flink/api/java/ExecutionEnvironment.java    |  32 ++++
 .../flink/python/api/PythonPlanBinder.java      |   4 +-
 .../plantranslate/JobGraphGenerator.java        |   4 +-
 .../webmonitor/handlers/JobConfigHandler.java   |   4 +-
 .../app/partials/jobs/job.config.jade           |   2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  82 +++------
 .../restart/FixedDelayRestartStrategy.java      | 116 +++++++++++++
 .../restart/NoRestartStrategy.java              |  48 ++++++
 .../executiongraph/restart/RestartStrategy.java |  41 +++++
 .../restart/RestartStrategyFactory.java         | 127 ++++++++++++++
 .../apache/flink/runtime/jobgraph/JobGraph.java |  57 ++-----
 .../flink/runtime/jobmanager/JobManager.scala   | 112 ++++++-------
 ...ExecutionGraphCheckpointCoordinatorTest.java |  18 +-
 .../ExecutionGraphConstructionTest.java         |  25 ++-
 .../ExecutionGraphDeploymentTest.java           |   9 +-
 .../ExecutionGraphRestartTest.java              |  56 ++++---
 .../executiongraph/ExecutionGraphTestUtils.java |   4 +-
 .../ExecutionStateProgressTest.java             |   6 +-
 .../executiongraph/LocalInputSplitsTest.java    |   7 +-
 .../executiongraph/PointwisePatternTest.java    |  22 ++-
 .../TerminalStateDeadlockTest.java              |  13 +-
 .../VertexLocationConstraintTest.java           |  19 ++-
 .../executiongraph/VertexSlotSharingTest.java   |   4 +-
 .../restart/FixedDelayRestartStrategyTest.java  |  51 ++++++
 .../partition/PipelinedSubpartitionTest.java    |   4 -
 .../JobManagerLeaderElectionTest.java           |   7 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |   2 +-
 .../TaskManagerLossFailsTasksTest.scala         |   6 +-
 .../runtime/jobmanager/RecoveryITCase.scala     |   9 +-
 .../runtime/testingUtils/TestingCluster.scala   |  12 +-
 .../testingUtils/TestingJobManager.scala        |  13 +-
 .../runtime/testingUtils/TestingUtils.scala     |  12 +-
 .../flink/api/scala/ExecutionEnvironment.scala  |  46 ++++-
 .../connectors/kafka/Kafka08ITCase.java         |  19 +--
 .../connectors/kafka/KafkaConsumerTestBase.java |  27 +--
 .../connectors/kafka/KafkaProducerTestBase.java |   3 +-
 .../connectors/kafka/KafkaTestBase.java         |   2 +-
 .../kafka/testutils/DataGenerators.java         |   5 +-
 .../environment/StreamExecutionEnvironment.java |  32 ++++
 .../api/graph/StreamingJobGraphGenerator.java   |  30 ++--
 .../partitioner/RescalePartitionerTest.java     |   7 +-
 .../runtime/state/StateBackendITCase.java       |   3 +-
 .../streaming/timestamp/TimestampITCase.java    |   1 -
 .../api/scala/StreamExecutionEnvironment.scala  |  49 ++++--
 .../EventTimeAllWindowCheckpointingITCase.java  |  10 +-
 .../EventTimeWindowCheckpointingITCase.java     |  14 +-
 .../test/checkpointing/SavepointITCase.java     |  18 +-
 .../StreamCheckpointNotifierITCase.java         |   2 +-
 .../StreamFaultToleranceTestBase.java           |   1 -
 .../WindowCheckpointingITCase.java              |  10 +-
 .../test/classloading/ClassLoaderITCase.java    |   1 -
 .../jar/CheckpointedStreamingProgram.java       |   3 +-
 .../test/failingPrograms/TaskFailureITCase.java |   4 +-
 ...ctTaskManagerProcessFailureRecoveryTest.java |   2 +-
 .../flink/test/recovery/FastFailuresITCase.java |   6 +-
 ...anagerProcessFailureBatchRecoveryITCase.java |   3 +-
 .../recovery/ProcessFailureCancelingITCase.java |   3 +-
 .../test/recovery/SimpleRecoveryITCase.java     |  13 +-
 .../TaskManagerFailureRecoveryITCase.java       |   3 +-
 ...anagerProcessFailureBatchRecoveryITCase.java |   3 +-
 ...erProcessFailureStreamingRecoveryITCase.java |   3 +-
 .../ZooKeeperLeaderElectionITCase.java          |   2 +-
 .../flink/yarn/TestingYarnJobManager.scala      |  16 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |  16 +-
 73 files changed, 1314 insertions(+), 434 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/docs/apis/streaming/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/fault_tolerance.md 
b/docs/apis/streaming/fault_tolerance.md
index 4b91c61..f1a6803 100644
--- a/docs/apis/streaming/fault_tolerance.md
+++ b/docs/apis/streaming/fault_tolerance.md
@@ -194,3 +194,171 @@ state updates) of Flink coupled with bundled sinks:
 </table>
 
 {% top %}
+
+Restart Strategies
+------------------
+
+Flink supports different restart strategies which control how the jobs are 
restarted in case of a failure.
+The cluster can be started with a default restart strategy which is always 
used when no job specific restart strategy has been defined.
+In case that the job is submitted with a restart strategy, this strategy 
overrides the cluster's default setting.
+ 
+The default restart strategy is set via Flink's configuration file 
`flink-conf.yaml`.
+The configuration parameter *restart-strategy* defines which strategy is taken.
+By default, the no-restart strategy is used.
+See the following list of available restart strategies to learn what values 
are supported.
+
+Each restart strategy comes with its own set of parameters which control its 
behaviour.
+These values are also set in the configuration file.
+The description of each restart strategy contains more information about the 
respective configuration values.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 50%">Restart Strategy</th>
+      <th class="text-left">Value for restart-strategy</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>Fixed delay</td>
+        <td>fixed-delay</td>
+    </tr>
+    <tr>
+        <td>No restart</td>
+        <td>none</td>
+    </tr>
+  </tbody>
+</table>
+
+Apart from defining a default restart strategy, it is possible to define for 
each Flink job a specific restart strategy.
+This restart strategy is set programmatically by calling the 
`setRestartStrategy` method on the `ExecutionEnvironment`.
+Note that this also works for the `StreamExecutionEnvironment`.
+
+The following example shows how we can set a fixed delay restart strategy for 
our job.
+In case of a failure the system tries to restart the job 3 times and waits 10 
seconds in-between successive restart attempts.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+  3, // number of restart attempts 
+  10000 // delay in milliseconds
+));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+  3, // number of restart attempts 
+  10000 // delay in milliseconds
+))
+{% endhighlight %}
+</div>
+</div>
+
+## Fixed Delay Restart Strategy
+
+The fixed delay restart strategy attempts a given number of times to restart 
the job.
+If the maximum number of attempts is exceeded, the job eventually fails.
+In-between two consecutive restart attempts, the restart strategy waits a 
fixed amount of time.
+
+This strategy is enabled as default by setting the following configuration 
parameter in `flink-conf.yaml`.
+
+~~~
+restart-strategy: fixed-delay
+~~~
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Configuration Parameter</th>
+      <th class="text-left" style="width: 40%">Description</th>
+      <th class="text-left">Default Value</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><i>restart-strategy.fixed-delay.attempts</i></td>
+        <td>Number of restart attempts</td>
+        <td>1</td>
+    </tr>
+    <tr>
+        <td><i>restart-strategy.fixed-delay.delay</i></td>
+        <td>Delay between two consecutive restart attempts</td>
+        <td><i>akka.ask.timeout</i></td>
+    </tr>
+  </tbody>
+</table>
+
+~~~
+restart-strategy.fixed-delay.attempts: 3
+restart-strategy.fixed-delay.delay: 10 s
+~~~
+
+The fixed delay restart strategy can also be set programmatically:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+  3, // number of restart attempts 
+  10000 // delay in milliseconds
+));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+  3, // number of restart attempts 
+  10000 // delay in milliseconds
+))
+{% endhighlight %}
+</div>
+</div>
+
+### Restart Attempts
+
+The number of times that Flink retries the execution before the job is 
declared as failed is configurable via the 
*restart-strategy.fixed-delay.attempts* parameter.
+
+The default value is **1**.
+
+### Retry Delays
+
+Execution retries can be configured to be delayed. Delaying the retry means 
that after a failed execution, the re-execution does not start immediately, but 
only after a certain delay.
+
+Delaying the retries can be helpful when the program interacts with external 
systems where for example connections or pending transactions should reach a 
timeout before re-execution is attempted.
+
+The default value is the value of *akka.ask.timeout*.
+
+## No Restart Strategy
+
+The job fails directly and no restart is attempted.
+
+~~~
+restart-strategy: none
+~~~
+
+The no restart strategy can also be set programmatically:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setRestartStrategy(RestartStrategies.noRestart());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setRestartStrategy(RestartStrategies.noRestart())
+{% endhighlight %}
+</div>
+</div>
+
+[Back to top](#top)
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/docs/internals/monitoring_rest_api.md
----------------------------------------------------------------------
diff --git a/docs/internals/monitoring_rest_api.md 
b/docs/internals/monitoring_rest_api.md
index f53d46f..97a5c51 100644
--- a/docs/internals/monitoring_rest_api.md
+++ b/docs/internals/monitoring_rest_api.md
@@ -246,7 +246,7 @@ Sample Result:
   "name": "WordCount Example",
   "execution-config": {
     "execution-mode": "PIPELINED",
-    "max-execution-retries": -1,
+    "restart-strategy": "Restart deactivated",
     "job-parallelism": -1,
     "object-reuse-mode": false
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 343a856..59cccc3 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -118,9 +118,17 @@ If you are on YARN, then it is sufficient to authenticate 
the client with Kerber
 
 - `blob.server.port`: Port definition for the blob server (serving user jar's) 
on the Taskmanagers. By default the port is set to 0, which means that the 
operating system is picking an ephemeral port. Flink also accepts a list of 
ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is 
recommended to set a range of ports to avoid collisions when multiple 
JobManagers are running on the same machine.
 
-- `execution-retries.delay`: Delay between execution retries. Default value "5 
s". Note that values have to be specified as strings with a unit.
-
-- `execution-retries.default`: Default number of execution retries, used by 
jobs that do not explicitly specify that value on the execution environment. 
Default value is zero.
+- `restart-strategy`: Default restart strategy to use in case that no restart 
strategy has been specified for the submitted job.
+Currently, you can choose between the fixed delay restart strategy and no 
restart at all.
+To use the fixed delay strategy you have to specify "fixed-delay".
+To turn the restart behaviour off you have to specify "none".
+Default value "none".
+
+- `restart-strategy.fixed-delay.attempts`: Number of restart attempts, used if 
the default restart strategy is set to "fixed-delay". 
+Default value is 1.
+ 
+- `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used 
if the default restart strategy is set to "fixed-delay". 
+Default value is the `akka.ask.timeout`. 
 
 ## Full Reference
 
@@ -247,6 +255,8 @@ For example when running Flink on YARN on an environment 
with a restrictive fire
 
 - `recovery.zookeeper.client.max-retry-attempts`: (Default '3') Defines the 
number of connection retries before the client gives up.
 
+- `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before 
persisted jobs are recovered in case of a recovery situation. 
+
 ## Background
 
 ### Configuring the Network Buffers

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 8d5211b..b31106f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common;
 import com.esotericsoftware.kryo.Serializer;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 
 import java.io.Serializable;
 import java.util.LinkedHashMap;
@@ -76,7 +77,11 @@ public class ExecutionConfig implements Serializable {
 
        private int parallelism = -1;
 
-       private int numberOfExecutionRetries = -1;
+       /**
+        * @deprecated Should no longer be used because it is subsumed by 
RestartStrategyConfiguration
+        */
+       @Deprecated
+       private int numberOfExecutionRetries = 0;
 
        private boolean forceKryo = false;
 
@@ -96,9 +101,15 @@ public class ExecutionConfig implements Serializable {
        private long autoWatermarkInterval = 0;
 
        private boolean timestampsEnabled = false;
-       
-       private long executionRetryDelay = -1;
 
+       /**
+        * @deprecated Should no longer be used because it is subsumed by 
RestartStrategyConfiguration
+        */
+       @Deprecated
+       private long executionRetryDelay = 0;
+
+       private RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration;
+       
        // Serializers and types registered with Kryo and the PojoSerializer
        // we store them in linked maps/sets to ensure they are registered in 
order in all kryo instances.
 
@@ -245,19 +256,64 @@ public class ExecutionConfig implements Serializable {
        }
 
        /**
+        * Sets the restart strategy to be used for recovery.
+        *
+        * <pre>{@code
+        * ExecutionConfig config = env.getConfig();
+        *
+        * config.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+        *      10,  // number of retries
+        *      1000)); // delay between retries
+        * }</pre>
+        *
+        * @param restartStrategyConfiguration Configuration defining the 
restart strategy to use
+        */
+       @PublicEvolving
+       public void 
setRestartStrategy(RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration) {
+               this.restartStrategyConfiguration = 
restartStrategyConfiguration;
+       }
+
+       /**
+        * Returns the restart strategy which has been set for the current job.
+        *
+        * @return The specified restart configuration
+        */
+       @PublicEvolving
+       public RestartStrategies.RestartStrategyConfiguration 
getRestartStrategy() {
+               if (restartStrategyConfiguration == null) {
+                       // support the old API calls by creating a restart 
strategy from them
+                       if (getNumberOfExecutionRetries() > 0 && 
getExecutionRetryDelay() >= 0) {
+                               return 
RestartStrategies.fixedDelayRestart(getNumberOfExecutionRetries(), 
getExecutionRetryDelay());
+                       } else {
+                               return null;
+                       }
+               } else {
+                       return restartStrategyConfiguration;
+               }
+       }
+
+       /**
         * Gets the number of times the system will try to re-execute failed 
tasks. A value
         * of {@code -1} indicates that the system default value (as defined in 
the configuration)
         * should be used.
         *
         * @return The number of times the system will try to re-execute failed 
tasks.
+        *
+        * @deprecated Should no longer be used because it is subsumed by 
RestartStrategyConfiguration
         */
+       @Deprecated
        public int getNumberOfExecutionRetries() {
                return numberOfExecutionRetries;
        }
 
        /**
         * Returns the delay between execution retries.
+        *
+        * @return The delay between successive execution retries in 
milliseconds.
+        *
+        * @deprecated Should no longer be used because it is subsumed by 
RestartStrategyConfiguration
         */
+       @Deprecated
        public long getExecutionRetryDelay() {
                return executionRetryDelay;
        }
@@ -268,11 +324,18 @@ public class ExecutionConfig implements Serializable {
         * default value (as defined in the configuration) should be used.
         *
         * @param numberOfExecutionRetries The number of times the system will 
try to re-execute failed tasks.
+        *
+        * @return The current execution configuration
+        *
+        * @deprecated This method will be replaced by {@link 
#setRestartStrategy}. The
+        * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} 
contains the number of
+        * execution retries.
         */
+       @Deprecated
        public ExecutionConfig setNumberOfExecutionRetries(int 
numberOfExecutionRetries) {
                if (numberOfExecutionRetries < -1) {
                        throw new IllegalArgumentException(
-                                       "The number of execution retries must 
be non-negative, or -1 (use system default)");
+                               "The number of execution retries must be 
non-negative, or -1 (use system default)");
                }
                this.numberOfExecutionRetries = numberOfExecutionRetries;
                return this;
@@ -282,15 +345,23 @@ public class ExecutionConfig implements Serializable {
         * Sets the delay between executions. A value of {@code -1} indicates 
that the default value
         * should be used.
         * @param executionRetryDelay The number of milliseconds the system 
will wait to retry.
+        *
+        * @return The current execution configuration
+        *
+        * @deprecated This method will be replaced by {@link 
#setRestartStrategy}. The
+        * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} 
contains the delay between
+        * successive execution attempts.
         */
+       @Deprecated
        public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) 
{
                if (executionRetryDelay < -1 ) {
                        throw new IllegalArgumentException(
-                                       "The delay between reties must be 
non-negative, or -1 (use system default)");
+                               "The delay between reties must be non-negative, 
or -1 (use system default)");
                }
                this.executionRetryDelay = executionRetryDelay;
                return this;
        }
+
        /**
         * Sets the execution mode to execute the program. The execution mode 
defines whether
         * data exchanges are performed in a batch or on a pipelined manner.
@@ -614,7 +685,7 @@ public class ExecutionConfig implements Serializable {
                                Objects.equals(executionMode, 
other.executionMode) &&
                                useClosureCleaner == other.useClosureCleaner &&
                                parallelism == other.parallelism &&
-                               numberOfExecutionRetries == 
other.numberOfExecutionRetries &&
+                               
restartStrategyConfiguration.equals(other.restartStrategyConfiguration) &&
                                forceKryo == other.forceKryo &&
                                objectReuse == other.objectReuse &&
                                autoTypeRegistrationEnabled == 
other.autoTypeRegistrationEnabled &&
@@ -640,7 +711,7 @@ public class ExecutionConfig implements Serializable {
                        executionMode,
                        useClosureCleaner,
                        parallelism,
-                       numberOfExecutionRetries,
+                       restartStrategyConfiguration,
                        forceKryo,
                        objectReuse,
                        autoTypeRegistrationEnabled,

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java 
b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index 899b9d6..d81fcd1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -38,6 +38,7 @@ import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Visitable;
@@ -292,24 +293,15 @@ public class Plan implements Visitable<Operator<?>> {
                
                this.defaultParallelism = defaultParallelism;
        }
-       
-       /**
-        * Gets the number of times the system will try to re-execute failed 
tasks. A value
-        * of {@code -1} indicates that the system default value (as defined in 
the configuration)
-        * should be used.
-        * 
-        * @return The number of times the system will try to re-execute failed 
tasks.
-        */
-       public int getNumberOfExecutionRetries() {
-               return getExecutionConfig().getNumberOfExecutionRetries();
-       }
-       
+
        /**
-        * Gets the delay between retry failed task.
-        * @return The delay the system will wait to retry.
+        * Returns the specified restart strategy configuration. This 
configuration defines the used
+        * restart strategy to be used at runtime.
+        *
+        * @return The specified restart strategy configuration
         */
-       public long getExecutionRetryDelay() {
-               return getExecutionConfig().getExecutionRetryDelay();
+       public RestartStrategies.RestartStrategyConfiguration 
getRestartStrategyConfiguration() {
+               return getExecutionConfig().getRestartStrategy();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
new file mode 100644
index 0000000..12f9d08
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.restartstrategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * This class defines methods to generate RestartStrategyConfigurations. These 
configurations are
+ * used to create RestartStrategies at runtime.
+ *
+ * The RestartStrategyConfigurations are used to decouple the core module from 
the runtime module.
+ */
+@PublicEvolving
+public class RestartStrategies {
+
+       /**
+        * Generates NoRestartStrategyConfiguration
+        *
+        * @return NoRestartStrategyConfiguration
+        */
+       public static RestartStrategyConfiguration noRestart() {
+               return new NoRestartStrategyConfiguration();
+       }
+
+       /**
+        * Generates a FixedDelayRestartStrategyConfiguration.
+        *
+        * @param restartAttempts Number of restart attempts for the 
FixedDelayRestartStrategy
+        * @param delayBetweenAttempts Delay in-between restart attempts for 
the FixedDelayRestartStrategy
+        * @return FixedDelayRestartStrategyConfiguration
+        */
+       public static RestartStrategyConfiguration fixedDelayRestart(
+               int restartAttempts,
+               long delayBetweenAttempts) {
+
+               return new 
FixedDelayRestartStrategyConfiguration(restartAttempts, delayBetweenAttempts);
+       }
+
+       public abstract static class RestartStrategyConfiguration implements 
Serializable {
+               private static final long serialVersionUID = 
6285853591578313960L;
+
+               private RestartStrategyConfiguration() {}
+
+               /**
+                * Returns a description which is shown in the web interface
+                *
+                * @return Description of the restart strategy
+                */
+               public abstract String getDescription();
+       }
+
+       final public static class NoRestartStrategyConfiguration extends 
RestartStrategyConfiguration {
+               private static final long serialVersionUID = 
-5894362702943349962L;
+
+               @Override
+               public String getDescription() {
+                       return "Restart deactivated.";
+               }
+       }
+
+       final public static class FixedDelayRestartStrategyConfiguration 
extends RestartStrategyConfiguration {
+               private static final long serialVersionUID = 
4149870149673363190L;
+
+               private final int restartAttempts;
+               private final long delayBetweenAttempts;
+
+               FixedDelayRestartStrategyConfiguration(int restartAttempts, 
long delayBetweenAttempts) {
+                       this.restartAttempts = restartAttempts;
+                       this.delayBetweenAttempts = delayBetweenAttempts;
+               }
+
+               public int getRestartAttempts() {
+                       return restartAttempts;
+               }
+
+               public long getDelayBetweenAttempts() {
+                       return delayBetweenAttempts;
+               }
+
+               @Override
+               public int hashCode() {
+                       return 31 * restartAttempts + 
(int)(delayBetweenAttempts ^ (delayBetweenAttempts >>> 32));
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj instanceof 
FixedDelayRestartStrategyConfiguration) {
+                               FixedDelayRestartStrategyConfiguration other = 
(FixedDelayRestartStrategyConfiguration) obj;
+
+                               return restartAttempts == other.restartAttempts 
&& delayBetweenAttempts == other.delayBetweenAttempts;
+                       } else {
+                               return false;
+                       }
+               }
+
+               @Override
+               public String getDescription() {
+                       return "Restart with fixed delay (" + 
delayBetweenAttempts + " ms). #"
+                               + restartAttempts + " restart attempts.";
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 2b75644..b2bbda1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -19,6 +19,7 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 
 /**
  * This class contains all constants for the configuration. That includes the 
configuration keys and
@@ -38,16 +39,50 @@ public final class ConfigConstants {
         */
        public static final String DEFAULT_PARALLELISM_KEY = 
"parallelism.default";
 
+       // ---------------------------- Restart strategies 
------------------------
+
+       /**
+        * Defines the restart strategy to be used. It can be "off", "none", 
"disable" to be disabled or
+        * it can be "fixeddelay", "fixed-delay" to use the 
FixedDelayRestartStrategy. You can also
+        * specify a class name which implements the RestartStrategy interface 
and has a static
+        * create method which takes a Configuration object.
+        */
+       @PublicEvolving
+       public static final String RESTART_STRATEGY = "restart-strategy";
+
+       /**
+        * Maximum number of attempts the fixed delay restart strategy will try 
before failing a job.
+        */
+       @PublicEvolving
+       public static final String RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS = 
"restart-strategy.fixed-delay.attempts";
+
+       /**
+        * Delay between two consecutive restart attempts. It can be specified 
using Scala's
+        * FiniteDuration notation: "1 min", "20 s"
+        */
+       @PublicEvolving
+       public static final String RESTART_STRATEGY_FIXED_DELAY_DELAY = 
"restart-strategy.fixed-delay.delay";
+
        /**
         * Config parameter for the number of re-tries for failed tasks. 
Setting this
         * value to 0 effectively disables fault tolerance.
+        *
+        * @deprecated The configuration value will be replaced by {@link 
#RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS}
+        * and the corresponding FixedDelayRestartStrategy.
         */
+       @Deprecated
+       @PublicEvolving
        public static final String EXECUTION_RETRIES_KEY = 
"execution-retries.default";
 
        /**
         * Config parameter for the delay between execution retries. The value 
must be specified in the
         * notation "10 s" or "1 min" (style of Scala Finite Durations)
+        *
+        * @deprecated The configuration value will be replaced by {@link 
#RESTART_STRATEGY_FIXED_DELAY_DELAY}
+        * and the corresponding FixedDelayRestartStrategy.
         */
+       @Deprecated
+       @PublicEvolving
        public static final String EXECUTION_RETRY_DELAY_KEY = 
"execution-retries.delay";
        
        // -------------------------------- Runtime 
-------------------------------
@@ -268,8 +303,6 @@ public final class ConfigConstants {
         */
        public static final String YARN_TASK_MANAGER_ENV_PREFIX = 
"yarn.taskmanager.env.";
 
-
-
         /**
         * The config parameter defining the Akka actor system port for the 
ApplicationMaster and
         * JobManager
@@ -471,6 +504,9 @@ public final class ConfigConstants {
        /** Ports used by the job manager if not in standalone recovery mode */
        public static final String RECOVERY_JOB_MANAGER_PORT = 
"recovery.jobmanager.port";
 
+       /** The time before the JobManager recovers persisted jobs */
+       public static final String RECOVERY_JOB_DELAY = "recovery.job.delay";
+
        // --------------------------- ZooKeeper 
----------------------------------
 
        /** ZooKeeper servers. */
@@ -521,7 +557,7 @@ public final class ConfigConstants {
         * The default number of execution retries.
         */
        public static final int DEFAULT_EXECUTION_RETRIES = 0;
-       
+
        // ------------------------------ Runtime 
---------------------------------
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java
index 2179eca..0b6e7f7 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.kafka;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -45,7 +46,7 @@ public class ReadFromKafka {
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
-               env.setNumberOfExecutionRetries(3); // retry if job fails
+               
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
10000));
                env.enableCheckpointing(5000); // create a checkpoint every 5 
secodns
                env.getConfig().setGlobalJobParameters(parameterTool); // make 
parameters available in the web interface
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java
index 0a33265..697b8e0 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.kafka;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -43,7 +44,7 @@ public class WriteIntoKafka {
 
                StreamExecutionEnvironment env 
=StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
-               env.setNumberOfExecutionRetries(3);
+               
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
10000));
 
                // very simple data generator
                DataStream<String> messageStream = env.addSource(new 
SourceFunction<String>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 512fe42..f0006a4 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.operators.OperatorInformation;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
@@ -182,12 +183,38 @@ public abstract class ExecutionEnvironment {
        }
 
        /**
+        * Sets the restart strategy configuration by handing it to this environment's
+        * {@code config}. The configuration specifies which restart strategy will be used
+        * for the execution graph in case of a restart.
+        *
+        * @param restartStrategyConfiguration Restart strategy configuration to be set
+        */
+       @PublicEvolving
+       public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
+               config.setRestartStrategy(restartStrategyConfiguration);
+       }
+
+       /**
+        * Returns the restart strategy configuration currently held by this environment's
+        * {@code config}.
+        *
+        * @return The restart strategy configuration to be used
+        */
+       @PublicEvolving
+       public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
+               final RestartStrategies.RestartStrategyConfiguration configured = config.getRestartStrategy();
+               return configured;
+       }
+
+       /**
         * Sets the number of times that failed tasks are re-executed. A value 
of zero
         * effectively disables fault tolerance. A value of {@code -1} 
indicates that the system
         * default value (as defined in the configuration) should be used.
         *
         * @param numberOfExecutionRetries The number of times the system will 
try to re-execute failed tasks.
+        *
+        * @deprecated This method will be replaced by {@link 
#setRestartStrategy}. The
+        * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} 
contains the number of
+        * execution retries.
         */
+       @Deprecated
        @PublicEvolving
        public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
                config.setNumberOfExecutionRetries(numberOfExecutionRetries);
@@ -199,7 +226,12 @@ public abstract class ExecutionEnvironment {
         * should be used.
         *
         * @return The number of times the system will try to re-execute failed 
tasks.
+        *
+        * @deprecated This method will be replaced by {@link 
#getRestartStrategy}. The
+        * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} 
contains the number of
+        * execution retries.
         */
+       @Deprecated
        @PublicEvolving
        public int getNumberOfExecutionRetries() {
                return config.getNumberOfExecutionRetries();

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 0479c0b..91e2369 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -19,7 +19,9 @@ import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Random;
+
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
@@ -262,7 +264,7 @@ public class PythonPlanBinder {
                                        break;
                                case RETRY:
                                        int retry = (Integer) value.getField(1);
-                                       env.setNumberOfExecutionRetries(retry);
+                                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retry, 10000L));
                                        break;
                                case DEBUG:
                                        DEBUG = (Boolean) value.getField(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 159a94a..f59c347 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -218,8 +218,8 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                
                // create the job graph object
                JobGraph graph = new JobGraph(jobId, program.getJobName());
-               
graph.setNumberOfExecutionRetries(program.getOriginalPlan().getNumberOfExecutionRetries());
-               
graph.setExecutionRetryDelay(program.getOriginalPlan().getExecutionRetryDelay());
+
+               
graph.setRestartStrategyConfiguration(program.getOriginalPlan().getRestartStrategyConfiguration());
                graph.setAllowQueuedScheduling(false);
                
graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index f39bb1a..0f2f514 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -50,7 +50,9 @@ public class JobConfigHandler extends 
AbstractExecutionGraphRequestHandler {
                        gen.writeObjectFieldStart("execution-config");
                        
                        gen.writeStringField("execution-mode", 
ec.getExecutionMode().name());
-                       gen.writeNumberField("max-execution-retries", 
ec.getNumberOfExecutionRetries());
+
+                       final String restartStrategyDescription = 
ec.getRestartStrategy() != null ? ec.getRestartStrategy().getDescription() : 
"default";
+                       gen.writeStringField("restart-strategy", 
restartStrategyDescription);
                        gen.writeNumberField("job-parallelism", 
ec.getParallelism());
                        gen.writeBooleanField("object-reuse-mode", 
ec.isObjectReuseEnabled());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime-web/web-dashboard/app/partials/jobs/job.config.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.config.jade 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.config.jade
index 83b7880..96d0369 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.config.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.config.jade
@@ -28,7 +28,7 @@ table.table.table-properties(ng-if="job['execution-config']")
 
     tr
       td Max. number of execution retries
-      td {{ job['execution-config']['max-execution-retries'] === -1 ? 
'deactivated' : job['execution-config']['max-execution-retries'] }}
+      td {{ job['execution-config']['restart-strategy'] }}
 
     tr
       td Job parallelism

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index c3bfbb0..20288fb 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.UnrecoverableException;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -81,13 +82,10 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.NoSuchElementException;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import static akka.dispatch.Futures.future;
-
 /**
  * The execution graph is the central data structure that coordinates the 
distributed
  * execution of a data flow. It keeps representations of each parallel task, 
each
@@ -183,12 +181,6 @@ public class ExecutionGraph implements Serializable {
 
        // ------ Configuration of the Execution -------
 
-       /** The number of times failed executions should be retried. */
-       private int numberOfRetriesLeft;
-
-       /** The delay that the system should wait before restarting failed 
executions. */
-       private long delayBeforeRetrying;
-
        /** Flag to indicate whether the scheduler may queue tasks for 
execution, or needs to be able
         * to deploy them immediately. */
        private boolean allowQueuedScheduling = false;
@@ -219,6 +211,10 @@ public class ExecutionGraph implements Serializable {
        @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private Scheduler scheduler;
 
+       /** Strategy to use for restarts */
+       @SuppressWarnings("NonSerializableFieldInSerializableClass")
+       private RestartStrategy restartStrategy;
+
        /** The classloader for the user code. Needed for calls into user code 
classes */
        @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private ClassLoader userClassLoader;
@@ -255,13 +251,15 @@ public class ExecutionGraph implements Serializable {
                        JobID jobId,
                        String jobName,
                        Configuration jobConfig,
-                       FiniteDuration timeout) {
+                       FiniteDuration timeout,
+                       RestartStrategy restartStrategy) {
                this(
                        executionContext,
                        jobId,
                        jobName,
                        jobConfig,
                        timeout,
+                       restartStrategy,
                        new ArrayList<BlobKey>(),
                        new ArrayList<URL>(),
                        ExecutionGraph.class.getClassLoader()
@@ -274,6 +272,7 @@ public class ExecutionGraph implements Serializable {
                        String jobName,
                        Configuration jobConfig,
                        FiniteDuration timeout,
+                       RestartStrategy restartStrategy,
                        List<BlobKey> requiredJarFiles,
                        List<URL> requiredClasspaths,
                        ClassLoader userClassLoader) {
@@ -304,6 +303,8 @@ public class ExecutionGraph implements Serializable {
                this.requiredClasspaths = requiredClasspaths;
 
                this.timeout = timeout;
+
+               this.restartStrategy = restartStrategy;
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -317,28 +318,6 @@ public class ExecutionGraph implements Serializable {
        public int getNumberOfExecutionJobVertices() {
                return this.verticesInCreationOrder.size();
        }
-       
-       public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
-               if (numberOfRetriesLeft < -1) {
-                       throw new IllegalArgumentException();
-               }
-               this.numberOfRetriesLeft = numberOfRetriesLeft;
-       }
-
-       public int getNumberOfRetriesLeft() {
-               return numberOfRetriesLeft;
-       }
-
-       public void setDelayBeforeRetrying(long delayBeforeRetrying) {
-               if (delayBeforeRetrying < 0) {
-                       throw new IllegalArgumentException("Delay before retry 
must be non-negative.");
-               }
-               this.delayBeforeRetrying = delayBeforeRetrying;
-       }
-
-       public long getDelayBeforeRetrying() {
-               return delayBeforeRetrying;
-       }
 
        public boolean isQueuedSchedulingAllowed() {
                return this.allowQueuedScheduling;
@@ -477,6 +456,10 @@ public class ExecutionGraph implements Serializable {
                return savepointCoordinator;
        }
 
+       /**
+        * Returns the restart strategy this execution graph was constructed with.
+        *
+        * @return the restart strategy of this execution graph
+        */
+       public RestartStrategy getRestartStrategy() {
+               return this.restartStrategy;
+       }
+
        public CheckpointStatsTracker getCheckpointStatsTracker() {
                return checkpointStatsTracker;
        }
@@ -1029,40 +1012,13 @@ public class ExecutionGraph implements Serializable {
                                        else if (current == JobStatus.FAILING) {
                                                boolean isRecoverable = 
!(failureCause instanceof UnrecoverableException);
 
-                                               if (isRecoverable && 
numberOfRetriesLeft > 0 &&
+                                               if (isRecoverable && 
restartStrategy.canRestart() &&
                                                                
transitionState(current, JobStatus.RESTARTING)) {
-
-                                                       numberOfRetriesLeft--;
-                                                       
-                                                       if (delayBeforeRetrying 
> 0) {
-                                                               future(new 
Callable<Object>() {
-                                                                       
@Override
-                                                                       public 
Object call() throws Exception {
-                                                                               
try {
-                                                                               
        LOG.info("Delaying retry of job execution for {} ms ...", 
delayBeforeRetrying);
-                                                                               
        Thread.sleep(delayBeforeRetrying);
-                                                                               
}
-                                                                               
catch(InterruptedException e){
-                                                                               
        // should only happen on shutdown
-                                                                               
}
-                                                                               
restart();
-                                                                               
return null;
-                                                                       }
-                                                               }, 
executionContext);
-                                                       } else {
-                                                               future(new 
Callable<Object>() {
-                                                                       
@Override
-                                                                       public 
Object call() throws Exception {
-                                                                               
restart();
-                                                                               
return null;
-                                                                       }
-                                                               }, 
executionContext);
-                                                       }
+                                                       
restartStrategy.restart(this);
                                                        break;
-                                               }
-                                               else if ((!isRecoverable || 
numberOfRetriesLeft <= 0) &&
-                                                               
transitionState(current, JobStatus.FAILED, failureCause)) {
 
+                                               } else if ((!isRecoverable || 
!restartStrategy.canRestart()) &&
+                                                       
transitionState(current, JobStatus.FAILED, failureCause)) {
                                                        postRunCleanup();
                                                        break;
                                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
new file mode 100644
index 0000000..b5b00e4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.util.concurrent.Callable;
+
+import static akka.dispatch.Futures.future;
+
+/**
+ * Restart strategy which tries to restart the given {@link ExecutionGraph} a fixed number of times
+ * with a fixed time delay in between.
+ *
+ * <p>The attempt counter starts at zero, is incremented by every call to
+ * {@link #restart(ExecutionGraph)} and is never reset by this class.
+ */
+public class FixedDelayRestartStrategy implements RestartStrategy {
+       private static final Logger LOG = LoggerFactory.getLogger(FixedDelayRestartStrategy.class);
+
+       /** Maximum number of restarts this strategy permits. */
+       private final int maxNumberRestartAttempts;
+
+       /** Time in milliseconds to sleep before each restart is triggered. */
+       private final long delayBetweenRestartAttempts;
+
+       /** Number of restarts that have been requested so far. */
+       private int currentRestartAttempt;
+
+       /**
+        * Creates a strategy with the given limits.
+        *
+        * @param maxNumberRestartAttempts Maximum number of restarts; zero is allowed and means
+        *                                 {@link #canRestart()} never returns {@code true}
+        * @param delayBetweenRestartAttempts Delay in milliseconds before each restart; zero is allowed
+        * @throws IllegalArgumentException if one of the arguments is negative
+        */
+       public FixedDelayRestartStrategy(
+               int maxNumberRestartAttempts,
+               long delayBetweenRestartAttempts) {
+
+               // Zero is a legal value for both arguments, hence "non-negative" rather than "positive".
+               Preconditions.checkArgument(maxNumberRestartAttempts >= 0, "Maximum number of restart attempts must be non-negative.");
+               Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be non-negative.");
+
+               this.maxNumberRestartAttempts = maxNumberRestartAttempts;
+               this.delayBetweenRestartAttempts = delayBetweenRestartAttempts;
+               currentRestartAttempt = 0;
+       }
+
+       /**
+        * Returns how many restarts have been requested through {@link #restart(ExecutionGraph)}.
+        *
+        * @return Number of restart attempts so far
+        */
+       public int getCurrentRestartAttempt() {
+               return currentRestartAttempt;
+       }
+
+       /**
+        * Checks whether the maximum number of restart attempts has not been used up yet.
+        *
+        * @return {@code true} while fewer than the maximum number of restarts have been requested
+        */
+       @Override
+       public boolean canRestart() {
+               return currentRestartAttempt < maxNumberRestartAttempts;
+       }
+
+       /**
+        * Counts one restart attempt and schedules the actual restart on the execution context
+        * of the given graph: the scheduled task first sleeps for the configured delay and then
+        * calls {@link ExecutionGraph#restart()}.
+        *
+        * @param executionGraph The execution graph to restart
+        */
+       @Override
+       public void restart(final ExecutionGraph executionGraph) {
+               currentRestartAttempt++;
+
+               future(new Callable<Object>() {
+                       @Override
+                       public Object call() throws Exception {
+                               try {
+                                       LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttempts);
+                                       // do the delay
+                                       Thread.sleep(delayBetweenRestartAttempts);
+                               } catch(InterruptedException e) {
+                                       // should only happen on shutdown; the restart below is still attempted
+                               }
+                               executionGraph.restart();
+                               return null;
+                       }
+               }, executionGraph.getExecutionContext());
+       }
+
+       /**
+        * Creates a FixedDelayRestartStrategy from the given Configuration.
+        *
+        * <p>The number of attempts is read from
+        * {@link ConfigConstants#RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS} (default 1) and the delay
+        * from {@link ConfigConstants#RESTART_STRATEGY_FIXED_DELAY_DELAY}.
+        *
+        * @param configuration Configuration containing the parameter values for the restart strategy
+        * @return Initialized instance of FixedDelayRestartStrategy
+        * @throws Exception if the configured delay cannot be parsed as a duration; the parsing
+        *                   failure is attached as the cause
+        */
+       public static FixedDelayRestartStrategy create(Configuration configuration) throws Exception {
+               int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+
+               // Fallback used when no explicit delay is configured.
+               // NOTE(review): this reads the heartbeat interval key but defaults to the ask
+               // timeout -- confirm that this key/default pairing is intended.
+               String timeoutString = configuration.getString(
+                       ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL,
+                       ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+
+               String delayString = configuration.getString(
+                       ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
+                       timeoutString
+               );
+
+               long delay;
+
+               try {
+                       delay = Duration.apply(delayString).toMillis();
+               } catch (NumberFormatException nfe) {
+                       // Keep the original parsing failure as the cause so it is not lost.
+                       throw new Exception("Invalid config value for " + ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY +
+                               ": " + delayString + ". Value must be a valid duration (such as 100 s or 1 min).", nfe);
+               }
+
+               return new FixedDelayRestartStrategy(maxAttempts, delay);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
new file mode 100644
index 0000000..6be56ea
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+/**
+ * A {@link RestartStrategy} that never restarts an {@link ExecutionGraph}.
+ *
+ * <p>{@link #canRestart()} always answers {@code false} and {@link #restart(ExecutionGraph)}
+ * always fails with a {@link RuntimeException}.
+ */
+public class NoRestartStrategy implements RestartStrategy {
+
+       /**
+        * Always reports that a restart is not possible.
+        *
+        * @return {@code false}
+        */
+       @Override
+       public boolean canRestart() {
+               return false;
+       }
+
+       /**
+        * Unsupported by this strategy; every call throws.
+        *
+        * @param executionGraph ExecutionGraph for which the restart was requested; not used
+        * @throws RuntimeException on every invocation
+        */
+       @Override
+       public void restart(ExecutionGraph executionGraph) {
+               final String message = "NoRestartStrategy does not support restart.";
+               throw new RuntimeException(message);
+       }
+
+       /**
+        * Factory method returning a fresh NoRestartStrategy.
+        *
+        * @param configuration Configuration object; it is not read
+        * @return a new NoRestartStrategy instance
+        */
+       public static NoRestartStrategy create(Configuration configuration) {
+               final NoRestartStrategy strategy = new NoRestartStrategy();
+               return strategy;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
new file mode 100644
index 0000000..2880c01
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+/**
+ * Strategy for {@link ExecutionGraph} restarts.
+ *
+ * <p>An implementation answers two questions: whether a restart is possible
+ * ({@link #canRestart()}) and how it is carried out ({@link #restart(ExecutionGraph)}).
+ */
+public interface RestartStrategy {
+
+       /**
+        * True if the restart strategy can be applied to restart the {@link ExecutionGraph}.
+        *
+        * @return true if restart is possible, otherwise false
+        */
+       boolean canRestart();
+
+       /**
+        * Restarts the given {@link ExecutionGraph}.
+        *
+        * <p>NOTE(review): presumably callers are expected to check {@link #canRestart()} first;
+        * NoRestartStrategy throws a RuntimeException from this method. Confirm against the caller.
+        *
+        * @param executionGraph The ExecutionGraph to be restarted
+        */
+       void restart(ExecutionGraph executionGraph);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
new file mode 100644
index 0000000..b9da63d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Factory which creates {@link RestartStrategy} instances, either from a
+ * {@link org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}
+ * or from the values stored in a {@link Configuration}.
+ */
+public class RestartStrategyFactory {
+       private static final Logger LOG = LoggerFactory.getLogger(RestartStrategyFactory.class);
+
+       /** Name of the static factory method looked up reflectively on custom strategy classes. */
+       private static final String CREATE_METHOD = "create";
+
+       /**
+        * Creates a {@link RestartStrategy} instance from the given {@link org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}.
+        *
+        * @param restartStrategyConfiguration Restart strategy configuration which specifies which
+        *                                     restart strategy to instantiate
+        * @return RestartStrategy instance
+        * @throws IllegalArgumentException if the configuration is of an unknown type (including null)
+        */
+       public static RestartStrategy createRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
+               if (restartStrategyConfiguration instanceof RestartStrategies.NoRestartStrategyConfiguration) {
+                       return new NoRestartStrategy();
+               } else if (restartStrategyConfiguration instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration) {
+                       RestartStrategies.FixedDelayRestartStrategyConfiguration fixedDelayConfig =
+                               (RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategyConfiguration;
+
+                       return new FixedDelayRestartStrategy(
+                               fixedDelayConfig.getRestartAttempts(),
+                               fixedDelayConfig.getDelayBetweenAttempts());
+               } else {
+                       throw new IllegalArgumentException("Unknown restart strategy configuration " +
+                               restartStrategyConfiguration + ".");
+               }
+       }
+
+       /**
+        * Creates a {@link RestartStrategy} instance from the given {@link Configuration}.
+        *
+        * <p>The value stored under {@link ConfigConstants#RESTART_STRATEGY} selects the strategy. The
+        * built-in names are matched case-insensitively: "none" (the default, which still honours the
+        * deprecated execution retry settings), "off" / "disable" and "fixeddelay" / "fixed-delay".
+        * Any other value is treated as the fully qualified name of a class which offers a static
+        * {@code create(Configuration)} method. If that class cannot be loaded or invoked, a warning
+        * is logged and a {@link NoRestartStrategy} is returned.
+        *
+        * @param configuration Configuration object containing the configuration values.
+        * @return RestartStrategy instance
+        * @throws Exception which indicates that the RestartStrategy could not be instantiated.
+        */
+       public static RestartStrategy createFromConfig(Configuration configuration) throws Exception {
+               // Keep the configured value untouched: for custom strategies it is a class name, and
+               // class names are case sensitive. Only the switch selector is lower-cased.
+               String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, "none");
+
+               // Locale.ROOT keeps the matching independent of the JVM's default locale.
+               switch (restartStrategyName.toLowerCase(java.util.Locale.ROOT)) {
+                       case "none":
+                               // support deprecated ConfigConstants values
+                               final int numberExecutionRetries = configuration.getInteger(ConfigConstants.EXECUTION_RETRIES_KEY,
+                                       ConfigConstants.DEFAULT_EXECUTION_RETRIES);
+                               String pauseString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
+                                       ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+                               String delayString = configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY,
+                                       pauseString);
+
+                               long delay;
+
+                               try {
+                                       delay = Duration.apply(delayString).toMillis();
+                               } catch (NumberFormatException nfe) {
+                                       // Chain the parse failure so the root cause is not lost.
+                                       throw new Exception("Invalid config value for " + ConfigConstants.EXECUTION_RETRY_DELAY_KEY +
+                                               ": " + delayString + ". Value must be a valid duration (such as 100 s or 1 min).", nfe);
+                               }
+
+                               // Only valid deprecated values lead to a fixed delay strategy.
+                               if (numberExecutionRetries > 0 && delay >= 0) {
+                                       return new FixedDelayRestartStrategy(numberExecutionRetries, delay);
+                               } else {
+                                       return NoRestartStrategy.create(configuration);
+                               }
+                       case "off":
+                       case "disable":
+                               return NoRestartStrategy.create(configuration);
+                       case "fixeddelay":
+                       case "fixed-delay":
+                               return FixedDelayRestartStrategy.create(configuration);
+                       default:
+                               try {
+                                       // Class.forName and getMethod either succeed or throw; they never return null.
+                                       Class<?> clazz = Class.forName(restartStrategyName);
+                                       Method method = clazz.getMethod(CREATE_METHOD, Configuration.class);
+                                       Object result = method.invoke(null, configuration);
+
+                                       if (result != null) {
+                                               return (RestartStrategy) result;
+                                       }
+                               } catch (ClassNotFoundException cnfe) {
+                                       LOG.warn("Could not find restart strategy class {}.", restartStrategyName, cnfe);
+                               } catch (NoSuchMethodException nsme) {
+                                       LOG.warn("Class {} does not have a static method {}.", restartStrategyName, CREATE_METHOD, nsme);
+                               } catch (InvocationTargetException ite) {
+                                       LOG.warn("Cannot call static method {} from class {}.", CREATE_METHOD, restartStrategyName, ite);
+                               } catch (IllegalAccessException iae) {
+                                       LOG.warn("Illegal access while calling method {} from class {}.", CREATE_METHOD, restartStrategyName, iae);
+                               }
+
+                               // fallback in case of an error
+                               return NoRestartStrategy.create(configuration);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 403ad67..e20f737 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 /**
  * The JobGraph represents a Flink dataflow program, at the low level that the 
JobManager accepts.
  * All programs from higher level APIs are transformed into JobGraphs.
@@ -78,10 +79,8 @@ public class JobGraph implements Serializable {
        /** Name of this job. */
        private final String jobName;
 
-       /** The number of times that failed tasks should be re-executed */
-       private int numExecutionRetries;
-
-       private long executionRetryDelay;
+       /** Configuration which defines which restart strategy to use for the 
job recovery */
+       private RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration;
 
        /** The number of seconds after which the corresponding ExecutionGraph 
is removed at the
         * job manager after it has been executed. */
@@ -193,54 +192,22 @@ public class JobGraph implements Serializable {
        }
 
        /**
-        * Sets the number of times that failed tasks are re-executed. A value 
of zero
-        * effectively disables fault tolerance. A value of {@code -1} 
indicates that the system
-        * default value (as defined in the configuration) should be used.
-        *
-        * @param numberOfExecutionRetries The number of times the system will 
try to re-execute failed tasks.
-        */
-       public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
-               if (numberOfExecutionRetries < -1) {
-                       throw new IllegalArgumentException(
-                                       "The number of execution retries must 
be non-negative, or -1 (use system default)");
-               }
-               this.numExecutionRetries = numberOfExecutionRetries;
-       }
-
-       /**
-        * Gets the number of times the system will try to re-execute failed 
tasks. A value
-        * of {@code -1} indicates that the system default value (as defined in 
the configuration)
-        * should be used.
+        * Sets the restart strategy configuration. This configuration 
specifies the restart strategy
+        * to be used by the ExecutionGraph in case of a restart.
         *
-        * @return The number of times the system will try to re-execute failed 
tasks.
-        */
-       public int getNumberOfExecutionRetries() {
-               return numExecutionRetries;
-       }
-
-       /**
-        * Gets the delay of time the system will try to re-execute failed 
tasks. A value of
-        * {@code -1} indicates the system default value (as defined in the 
configuration)
-        * should be used.
-        * @return The delay of time in milliseconds the system will try to 
re-execute failed tasks.
+        * @param restartStrategyConfiguration Restart strategy configuration 
to be set
         */
-       public long getExecutionRetryDelay() {
-               return executionRetryDelay;
+       public void 
setRestartStrategyConfiguration(RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration) {
+               this.restartStrategyConfiguration = 
restartStrategyConfiguration;
        }
 
        /**
-        * Sets the delay that failed tasks are re-executed. A value of zero
-        * effectively disables fault tolerance. A value of {@code -1} 
indicates that the system
-        * default value (as defined in the configuration) should be used.
+        * Gets the restart strategy configuration
         *
-        * @param executionRetryDelay The delay of time the system will wait to 
re-execute failed tasks.
+        * @return Restart strategy configuration to be used
         */
-       public void setExecutionRetryDelay(long executionRetryDelay){
-               if (executionRetryDelay < -1) {
-                       throw new IllegalArgumentException(
-                                       "The delay between reties must be 
non-negative, or -1 (use system default)");
-               }
-               this.executionRetryDelay = executionRetryDelay;
+       public RestartStrategies.RestartStrategyConfiguration 
getRestartStrategyConfiguration() {
+               return restartStrategyConfiguration;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 78612c0..bd18160 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import java.io.{File, IOException}
 import java.net.{BindException, ServerSocket, UnknownHostException, 
InetAddress, InetSocketAddress}
 import java.util.UUID
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.{TimeUnit, ExecutorService}
 
 import akka.actor.Status.Failure
 import akka.actor._
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.checkpoint._
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.execution.UnrecoverableException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.{RestartStrategy, 
RestartStrategyFactory}
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, 
ExecutionJobVertex}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
@@ -108,13 +109,13 @@ class JobManager(
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
     protected val archive: ActorRef,
-    protected val defaultExecutionRetries: Int,
-    protected val delayBetweenRetries: Long,
+    protected val defaultRestartStrategy: RestartStrategy,
     protected val timeout: FiniteDuration,
     protected val leaderElectionService: LeaderElectionService,
     protected val submittedJobGraphs : SubmittedJobGraphStore,
     protected val checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    protected val savepointStore: SavepointStore)
+    protected val savepointStore: SavepointStore,
+    protected val jobRecoveryTimeout: FiniteDuration)
   extends FlinkActor
   with LeaderSessionMessageFilter // mixin oder is important, we want 
filtering after logging
   with LogMessages // mixin order is important, we want first logging
@@ -131,7 +132,7 @@ class JobManager(
         log.error("Executor could not execute task", t)
       }
     })
-  
+
   /** Either running or not yet archived jobs (session hasn't been ended). */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, 
(ExecutionGraph, JobInfo)]()
 
@@ -256,7 +257,7 @@ class JobManager(
 
     // shut down the extra thread pool for futures
     executorService.shutdown()
-    
+
     log.debug(s"Job manager ${self.path} is completely stopped.")
   }
 
@@ -280,10 +281,13 @@ class JobManager(
         // TODO (critical next step) This needs to be more flexible and robust 
(e.g. wait for task
         // managers etc.)
         if (recoveryMode != RecoveryMode.STANDALONE) {
-          log.info(s"Delaying recovery of all jobs for $delayBetweenRetries 
ms.")
+          log.info(s"Delaying recovery of all jobs by $jobRecoveryTimeout.")
 
-          context.system.scheduler.scheduleOnce(new 
FiniteDuration(delayBetweenRetries,
-            MILLISECONDS), self, 
decorateMessage(RecoverAllJobs))(context.dispatcher)
+          context.system.scheduler.scheduleOnce(
+            jobRecoveryTimeout,
+            self,
+            decorateMessage(RecoverAllJobs))(
+            context.dispatcher)
         }
       }(context.dispatcher)
 
@@ -903,6 +907,12 @@ class JobManager(
           throw new JobSubmissionException(jobId, "The given job is empty")
         }
 
+        val restartStrategy = 
Option(jobGraph.getRestartStrategyConfiguration())
+          .map(RestartStrategyFactory.createRestartStrategy(_)) match {
+            case Some(strategy) => strategy
+            case None => defaultRestartStrategy
+          }
+
         // see if there already exists an ExecutionGraph for the corresponding 
job ID
         executionGraph = currentJobs.get(jobGraph.getJobID) match {
           case Some((graph, currentJobInfo)) =>
@@ -915,6 +925,7 @@ class JobManager(
               jobGraph.getName,
               jobGraph.getJobConfiguration,
               timeout,
+              restartStrategy,
               jobGraph.getUserJarBlobKeys,
               jobGraph.getClasspaths,
               userCodeLoader)
@@ -923,22 +934,6 @@ class JobManager(
             graph
         }
 
-        // configure the execution graph
-        val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries() >= 
0) {
-          jobGraph.getNumberOfExecutionRetries()
-        } else {
-          defaultExecutionRetries
-        }
-
-        val executionRetryDelay = if (jobGraph.getExecutionRetryDelay() >= 0) {
-          jobGraph.getExecutionRetryDelay()
-        }
-        else {
-          delayBetweenRetries
-        }
-
-        executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
-        executionGraph.setDelayBeforeRetrying(executionRetryDelay)
         executionGraph.setScheduleMode(jobGraph.getScheduleMode())
         
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling())
 
@@ -2032,14 +2027,15 @@ object JobManager {
     InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
-    Int, // execution retries
-    Long, // delay between retries
+    RestartStrategy,
     FiniteDuration, // timeout
     Int, // number of archived jobs
     LeaderElectionService,
     SubmittedJobGraphStore,
     CheckpointRecoveryFactory,
-    SavepointStore) = {
+    SavepointStore,
+    FiniteDuration // timeout for job recovery
+   ) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
@@ -2047,35 +2043,12 @@ object JobManager {
       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
 
-    val executionRetries = configuration.getInteger(
-      ConfigConstants.EXECUTION_RETRIES_KEY,
-      ConfigConstants.DEFAULT_EXECUTION_RETRIES)
+    val restartStrategy = RestartStrategyFactory
+      .createFromConfig(configuration)
 
     val archiveCount = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
       ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
 
-    // configure the delay between execution retries.
-    // unless explicitly specifies, this is dependent on the heartbeat timeout
-    val pauseString = 
configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
-                                              
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
-    val delayString = 
configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY,
-                                              pauseString)
-    
-    val delayBetweenRetries: Long = try {
-        Duration(delayString).toMillis
-      }
-      catch {
-        case n: NumberFormatException =>
-          if (delayString.equals(pauseString)) {
-            throw new Exception(
-              s"Invalid config value for 
${ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE}: " +
-                s"$pauseString. Value must be a valid duration (such as '10 s' 
or '1 min')")
-          } else {
-            throw new Exception(
-              s"Invalid config value for 
${ConfigConstants.EXECUTION_RETRY_DELAY_KEY}: " +
-                s"$delayString. Value must be a valid duration (such as '100 
milli' or '10 s')")
-          }
-      }
 
     var blobServer: BlobServer = null
     var instanceManager: InstanceManager = null
@@ -2139,18 +2112,33 @@ object JobManager {
 
     val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
 
+    val jobRecoveryTimeoutStr = 
configuration.getString(ConfigConstants.RECOVERY_JOB_DELAY, "");
+
+    val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || 
jobRecoveryTimeoutStr.isEmpty) {
+      timeout
+    } else {
+      try {
+        FiniteDuration(Duration(jobRecoveryTimeoutStr).toMillis, 
TimeUnit.MILLISECONDS)
+      } catch {
+        case n: NumberFormatException =>
+          throw new Exception(
+            s"Invalid config value for ${ConfigConstants.RECOVERY_JOB_DELAY}: 
" +
+              s"$jobRecoveryTimeoutStr. Value must be a valid duration (such 
as '10 s' or '1 min')")
+      }
+    }
+
     (executorService,
       instanceManager,
       scheduler,
       libraryCacheManager,
-      executionRetries,
-      delayBetweenRetries,
+      restartStrategy,
       timeout,
       archiveCount,
       leaderElectionService,
       submittedJobGraphs,
       checkpointRecoveryFactory,
-      savepointStore)
+      savepointStore,
+      jobRecoveryTimeout)
   }
 
   /**
@@ -2206,14 +2194,14 @@ object JobManager {
     instanceManager,
     scheduler,
     libraryCacheManager,
-    executionRetries,
-    delayBetweenRetries,
+    restartStrategy,
     timeout,
     archiveCount,
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory,
-    savepointStore) = createJobManagerComponents(
+    savepointStore,
+    jobRecoveryTimeout) = createJobManagerComponents(
       configuration,
       None)
 
@@ -2233,13 +2221,13 @@ object JobManager {
       scheduler,
       libraryCacheManager,
       archive,
-      executionRetries,
-      delayBetweenRetries,
+      restartStrategy,
       timeout,
       leaderElectionService,
       submittedJobGraphs,
       checkpointRecoveryFactory,
-      savepointStore)
+      savepointStore,
+      jobRecoveryTimeout)
 
     val jobManager: ActorRef = jobMangerActorName match {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index e921e92..26a626e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
@@ -43,14 +44,15 @@ public class ExecutionGraphCheckpointCoordinatorTest {
        @Test
        public void 
testCheckpointAndSavepointCoordinatorShareCheckpointIDCounter() throws 
Exception {
                ExecutionGraph executionGraph = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               new JobID(),
-                               "test",
-                               new Configuration(),
-                               new FiniteDuration(1, TimeUnit.DAYS),
-                               Collections.<BlobKey>emptyList(),
-                               Collections.<URL>emptyList(),
-                               ClassLoader.getSystemClassLoader());
+                       TestingUtils.defaultExecutionContext(),
+                       new JobID(),
+                       "test",
+                       new Configuration(),
+                       new FiniteDuration(1, TimeUnit.DAYS),
+                       new NoRestartStrategy(),
+                       Collections.<BlobKey>emptyList(),
+                       Collections.<URL>emptyList(),
+                       ClassLoader.getSystemClassLoader());
 
                ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index a47ea77..2ca51db 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
@@ -105,7 +106,8 @@ public class ExecutionGraphConstructionTest {
                                jobId,
                                jobName,
                                cfg,
-                               AkkaUtils.getDefaultTimeout());
+                               AkkaUtils.getDefaultTimeout(),
+                               new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -148,7 +150,8 @@ public class ExecutionGraphConstructionTest {
                                jobId,
                                jobName,
                                cfg,
-                               AkkaUtils.getDefaultTimeout());
+                               AkkaUtils.getDefaultTimeout(),
+                               new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -214,7 +217,8 @@ public class ExecutionGraphConstructionTest {
                                jobId,
                                jobName,
                                cfg,
-                               AkkaUtils.getDefaultTimeout());
+                               AkkaUtils.getDefaultTimeout(),
+                               new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -467,7 +471,8 @@ public class ExecutionGraphConstructionTest {
                                jobId,
                                jobName,
                                cfg,
-                               AkkaUtils.getDefaultTimeout());
+                               AkkaUtils.getDefaultTimeout(),
+                               new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -522,7 +527,8 @@ public class ExecutionGraphConstructionTest {
                                jobId,
                                jobName,
                                cfg,
-                               AkkaUtils.getDefaultTimeout());
+                               AkkaUtils.getDefaultTimeout(),
+                               new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                        fail("Attached wrong jobgraph");
@@ -582,7 +588,8 @@ public class ExecutionGraphConstructionTest {
                                        jobId,
                                        jobName,
                                        cfg,
-                                       AkkaUtils.getDefaultTimeout());
+                                       AkkaUtils.getDefaultTimeout(),
+                                       new NoRestartStrategy());
                        try {
                                eg.attachJobGraph(ordered);
                        }
@@ -626,7 +633,8 @@ public class ExecutionGraphConstructionTest {
                                        jobId,
                                        jobName,
                                        cfg,
-                                       AkkaUtils.getDefaultTimeout());
+                                       AkkaUtils.getDefaultTimeout(),
+                                       new NoRestartStrategy());
 
                        try {
                                eg.attachJobGraph(ordered);
@@ -696,7 +704,8 @@ public class ExecutionGraphConstructionTest {
                                        jobId,
                                        jobName,
                                        cfg,
-                                       AkkaUtils.getDefaultTimeout());
+                                       AkkaUtils.getDefaultTimeout(),
+                                       new NoRestartStrategy());
                        
eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
                        
                        // check the v1 / v2 co location hints ( assumes parallelism(v1) >= parallelism(v2) )

http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index bea7c22..9221bda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -83,7 +84,8 @@ public class ExecutionGraphDeploymentTest {
                                        jobId,
                                        "some job",
                                        new Configuration(),
-                                       AkkaUtils.getDefaultTimeout());
+                                       AkkaUtils.getDefaultTimeout(),
+                                       new NoRestartStrategy());
 
                        List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
 
@@ -285,7 +287,8 @@ public class ExecutionGraphDeploymentTest {
                                jobId,
                                "some job",
                                new Configuration(),
-                               AkkaUtils.getDefaultTimeout());
+                               AkkaUtils.getDefaultTimeout(),
+                               new NoRestartStrategy());
                eg.setQueuedSchedulingAllowed(false);
 
                List<JobVertex> ordered = Arrays.asList(v1, v2);
@@ -325,4 +328,4 @@ public class ExecutionGraphDeploymentTest {
                        throw new Exception();
                }
        }
-}
\ No newline at end of file
+}

Reply via email to