[FLINK-3187] [restart] Introduce RestartStrategy to ExecutionGraph. A RestartStrategy defines how the ExecutionGraph reacts in case of a failure. Different strategies are conceivable, for example: no restart, fixed delay restart, exponential backoff restart, scaling in/out restart, etc.
Expose RestartStrategy to user API. This removes the setNumberOfExecutionRetries and setExecutionRetryDelay calls on the ExecutionEnvironment and the ExecutionConfig. Instead, the more general RestartStrategy can be set. In order to maintain the separation between the runtime and API modules, one sets a RestartStrategyConfiguration which is transformed into a RestartStrategy on the JobManager. Replace old execution-retries configuration parameters with restart-strategy. Add FixedDelayRestartStrategy test case. Reintroduce old configuration values and API calls for the deprecated restart mechanism: the old configuration values and API calls will be respected if no explicit RestartStrategy has been set. The values, if correct, are used to instantiate a FixedDelayRestartStrategy. Add deprecation comments to the JavaDocs. Add logging statement for job recovery. Fix JobManagerProcessFailureBatchRecoveryITCase by introducing a job recovery timeout. Add proper annotations to RestartStrategies. Let ExecutionGraphRestartTest extend TestLogger. This closes #1470.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5eae47f5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5eae47f5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5eae47f5 Branch: refs/heads/master Commit: 5eae47f5d381cd55729660f62c714b52d28413ed Parents: b17632d Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Dec 17 13:49:10 2015 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Feb 15 15:58:05 2016 +0100 ---------------------------------------------------------------------- docs/apis/streaming/fault_tolerance.md | 168 +++++++++++++++++++ docs/internals/monitoring_rest_api.md | 2 +- docs/setup/config.md | 16 +- .../flink/api/common/ExecutionConfig.java | 85 +++++++++- .../java/org/apache/flink/api/common/Plan.java | 24 +-- .../restartstrategy/RestartStrategies.java | 120 +++++++++++++ .../flink/configuration/ConfigConstants.java | 42 ++++- .../streaming/examples/kafka/ReadFromKafka.java | 3 +- .../examples/kafka/WriteIntoKafka.java | 3 +- .../flink/api/java/ExecutionEnvironment.java | 32 ++++ .../flink/python/api/PythonPlanBinder.java | 4 +- .../plantranslate/JobGraphGenerator.java | 4 +- .../webmonitor/handlers/JobConfigHandler.java | 4 +- .../app/partials/jobs/job.config.jade | 2 +- .../runtime/executiongraph/ExecutionGraph.java | 82 +++------ .../restart/FixedDelayRestartStrategy.java | 116 +++++++++++++ .../restart/NoRestartStrategy.java | 48 ++++++ .../executiongraph/restart/RestartStrategy.java | 41 +++++ .../restart/RestartStrategyFactory.java | 127 ++++++++++++++ .../apache/flink/runtime/jobgraph/JobGraph.java | 57 ++----- .../flink/runtime/jobmanager/JobManager.scala | 112 ++++++------- ...ExecutionGraphCheckpointCoordinatorTest.java | 18 +- .../ExecutionGraphConstructionTest.java | 25 ++- .../ExecutionGraphDeploymentTest.java | 9 +- .../ExecutionGraphRestartTest.java | 56 ++++--- .../executiongraph/ExecutionGraphTestUtils.java | 4 +- 
.../ExecutionStateProgressTest.java | 6 +- .../executiongraph/LocalInputSplitsTest.java | 7 +- .../executiongraph/PointwisePatternTest.java | 22 ++- .../TerminalStateDeadlockTest.java | 13 +- .../VertexLocationConstraintTest.java | 19 ++- .../executiongraph/VertexSlotSharingTest.java | 4 +- .../restart/FixedDelayRestartStrategyTest.java | 51 ++++++ .../partition/PipelinedSubpartitionTest.java | 4 - .../JobManagerLeaderElectionTest.java | 7 +- .../runtime/testutils/ZooKeeperTestUtils.java | 2 +- .../TaskManagerLossFailsTasksTest.scala | 6 +- .../runtime/jobmanager/RecoveryITCase.scala | 9 +- .../runtime/testingUtils/TestingCluster.scala | 12 +- .../testingUtils/TestingJobManager.scala | 13 +- .../runtime/testingUtils/TestingUtils.scala | 12 +- .../flink/api/scala/ExecutionEnvironment.scala | 46 ++++- .../connectors/kafka/Kafka08ITCase.java | 19 +-- .../connectors/kafka/KafkaConsumerTestBase.java | 27 +-- .../connectors/kafka/KafkaProducerTestBase.java | 3 +- .../connectors/kafka/KafkaTestBase.java | 2 +- .../kafka/testutils/DataGenerators.java | 5 +- .../environment/StreamExecutionEnvironment.java | 32 ++++ .../api/graph/StreamingJobGraphGenerator.java | 30 ++-- .../partitioner/RescalePartitionerTest.java | 7 +- .../runtime/state/StateBackendITCase.java | 3 +- .../streaming/timestamp/TimestampITCase.java | 1 - .../api/scala/StreamExecutionEnvironment.scala | 49 ++++-- .../EventTimeAllWindowCheckpointingITCase.java | 10 +- .../EventTimeWindowCheckpointingITCase.java | 14 +- .../test/checkpointing/SavepointITCase.java | 18 +- .../StreamCheckpointNotifierITCase.java | 2 +- .../StreamFaultToleranceTestBase.java | 1 - .../WindowCheckpointingITCase.java | 10 +- .../test/classloading/ClassLoaderITCase.java | 1 - .../jar/CheckpointedStreamingProgram.java | 3 +- .../test/failingPrograms/TaskFailureITCase.java | 4 +- ...ctTaskManagerProcessFailureRecoveryTest.java | 2 +- .../flink/test/recovery/FastFailuresITCase.java | 6 +- ...anagerProcessFailureBatchRecoveryITCase.java | 3 
+- .../recovery/ProcessFailureCancelingITCase.java | 3 +- .../test/recovery/SimpleRecoveryITCase.java | 13 +- .../TaskManagerFailureRecoveryITCase.java | 3 +- ...anagerProcessFailureBatchRecoveryITCase.java | 3 +- ...erProcessFailureStreamingRecoveryITCase.java | 3 +- .../ZooKeeperLeaderElectionITCase.java | 2 +- .../flink/yarn/TestingYarnJobManager.scala | 16 +- .../org/apache/flink/yarn/YarnJobManager.scala | 16 +- 73 files changed, 1314 insertions(+), 434 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/docs/apis/streaming/fault_tolerance.md ---------------------------------------------------------------------- diff --git a/docs/apis/streaming/fault_tolerance.md b/docs/apis/streaming/fault_tolerance.md index 4b91c61..f1a6803 100644 --- a/docs/apis/streaming/fault_tolerance.md +++ b/docs/apis/streaming/fault_tolerance.md @@ -194,3 +194,171 @@ state updates) of Flink coupled with bundled sinks: </table> {% top %} + +Restart Strategies +------------------ + +Flink supports different restart strategies which control how jobs are restarted in case of a failure. +The cluster can be started with a default restart strategy which is always used when no job-specific restart strategy has been defined. +If a job is submitted with its own restart strategy, this strategy overrides the cluster's default setting. + +The default restart strategy is set via Flink's configuration file `flink-conf.yaml`. +The configuration parameter *restart-strategy* defines which strategy is applied. +By default, the no-restart strategy is used. +See the following list of available restart strategies to learn what values are supported. + +Each restart strategy comes with its own set of parameters which control its behaviour. +These values are also set in the configuration file. +The description of each restart strategy contains more information about the respective configuration values.
+ +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 50%">Restart Strategy</th> + <th class="text-left">Value for restart-strategy</th> + </tr> + </thead> + <tbody> + <tr> + <td>Fixed delay</td> + <td>fixed-delay</td> + </tr> + <tr> + <td>No restart</td> + <td>none</td> + </tr> + </tbody> +</table> + +Apart from defining a default restart strategy, it is possible to define a specific restart strategy for each Flink job. +This restart strategy is set programmatically by calling the `setRestartStrategy` method on the `ExecutionEnvironment`. +Note that this also works for the `StreamExecutionEnvironment`. + +The following example shows how we can set a fixed delay restart strategy for our job. +In case of a failure, the system tries to restart the job 3 times and waits 10 seconds between successive restart attempts. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +env.setRestartStrategy(RestartStrategies.fixedDelayRestart( + 3, // number of restart attempts + 10000 // delay in milliseconds +)); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = ExecutionEnvironment.getExecutionEnvironment() +env.setRestartStrategy(RestartStrategies.fixedDelayRestart( + 3, // number of restart attempts + 10000 // delay in milliseconds +)) +{% endhighlight %} +</div> +</div> + +## Fixed Delay Restart Strategy + +The fixed delay restart strategy tries to restart the job a given number of times. +If the maximum number of attempts is exceeded, the job eventually fails. +Between two consecutive restart attempts, the restart strategy waits a fixed amount of time. + +This strategy is enabled as the default by setting the following configuration parameter in `flink-conf.yaml`.
+ +~~~ +restart-strategy: fixed-delay +~~~ + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 40%">Configuration Parameter</th> + <th class="text-left" style="width: 40%">Description</th> + <th class="text-left">Default Value</th> + </tr> + </thead> + <tbody> + <tr> + <td><i>restart-strategy.fixed-delay.attempts</i></td> + <td>Number of restart attempts</td> + <td>1</td> + </tr> + <tr> + <td><i>restart-strategy.fixed-delay.delay</i></td> + <td>Delay between two consecutive restart attempts</td> + <td><i>akka.ask.timeout</i></td> + </tr> + </tbody> +</table> + +~~~ +restart-strategy.fixed-delay.attempts: 3 +restart-strategy.fixed-delay.delay: 10 s +~~~ + +The fixed delay restart strategy can also be set programmatically: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +env.setRestartStrategy(RestartStrategies.fixedDelayRestart( + 3, // number of restart attempts + 10000 // delay in milliseconds +)); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = ExecutionEnvironment.getExecutionEnvironment() +env.setRestartStrategy(RestartStrategies.fixedDelayRestart( + 3, // number of restart attempts + 10000 // delay in milliseconds +)) +{% endhighlight %} +</div> +</div> + +### Restart Attempts + +The number of times that Flink retries the execution before the job is declared as failed is configurable via the *restart-strategy.fixed-delay.attempts* parameter. + +The default value is **1**. + +### Retry Delays + +Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start immediately, but only after a certain delay.
+ +Delaying the retries can be helpful when the program interacts with external systems where, for example, connections or pending transactions should reach a timeout before re-execution is attempted. + +The default value is the value of *akka.ask.timeout*. + +## No Restart Strategy + +The job fails immediately and no restart is attempted. + +~~~ +restart-strategy: none +~~~ + +The no restart strategy can also be set programmatically: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +env.setRestartStrategy(RestartStrategies.noRestart()); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = ExecutionEnvironment.getExecutionEnvironment() +env.setRestartStrategy(RestartStrategies.noRestart()) +{% endhighlight %} +</div> +</div> + +[Back to top](#top) + + http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/docs/internals/monitoring_rest_api.md ---------------------------------------------------------------------- diff --git a/docs/internals/monitoring_rest_api.md b/docs/internals/monitoring_rest_api.md index f53d46f..97a5c51 100644 --- a/docs/internals/monitoring_rest_api.md +++ b/docs/internals/monitoring_rest_api.md @@ -246,7 +246,7 @@ Sample Result: "name": "WordCount Example", "execution-config": { "execution-mode": "PIPELINED", - "max-execution-retries": -1, + "restart-strategy": "Restart deactivated.", "job-parallelism": -1, "object-reuse-mode": false } http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/docs/setup/config.md ---------------------------------------------------------------------- diff --git a/docs/setup/config.md b/docs/setup/config.md index 343a856..59cccc3 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -118,9 +118,17 @@ If you are on YARN, then it is sufficient to authenticate the client with Kerber - `blob.server.port`: Port definition for the blob
server (serving user jar's) on the Taskmanagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine. -- `execution-retries.delay`: Delay between execution retries. Default value "5 s". Note that values have to be specified as strings with a unit. - -- `execution-retries.default`: Default number of execution retries, used by jobs that do not explicitly specify that value on the execution environment. Default value is zero. +- `restart-strategy`: Default restart strategy to use if no restart strategy has been specified for the submitted job. +Currently, you can choose between a fixed delay restart strategy and disabling restarts. +To use the fixed delay strategy, specify "fixed-delay". +To turn the restart behaviour off, specify "none". +Default value "none". + +- `restart-strategy.fixed-delay.attempts`: Number of restart attempts, used if the default restart strategy is set to "fixed-delay". +Default value is 1. + +- `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay". +Default value is the value of `akka.ask.timeout`. ## Full Reference @@ -247,6 +255,8 @@ For example when running Flink on YARN on an environment with a restrictive fire - `recovery.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up. +- `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered during recovery.
+ ## Background ### Configuring the Network Buffers http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 8d5211b..b31106f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common; import com.esotericsoftware.kryo.Serializer; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import java.io.Serializable; import java.util.LinkedHashMap; @@ -76,7 +77,11 @@ public class ExecutionConfig implements Serializable { private int parallelism = -1; - private int numberOfExecutionRetries = -1; + /** + * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration + */ + @Deprecated + private int numberOfExecutionRetries = 0; private boolean forceKryo = false; @@ -96,9 +101,15 @@ public class ExecutionConfig implements Serializable { private long autoWatermarkInterval = 0; private boolean timestampsEnabled = false; - - private long executionRetryDelay = -1; + /** + * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration + */ + @Deprecated + private long executionRetryDelay = 0; + + private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration; + // Serializers and types registered with Kryo and the PojoSerializer // we store them in linked maps/sets to ensure they are registered in order in all kryo instances. 
@@ -245,19 +256,64 @@ public class ExecutionConfig implements Serializable { } /** + * Sets the restart strategy to be used for recovery. + * + * <pre>{@code + * ExecutionConfig config = env.getConfig(); + * + * config.setRestartStrategy(RestartStrategies.fixedDelayRestart( + * 10, // number of retries + * 1000 // delay between retries)); + * }</pre> + * + * @param restartStrategyConfiguration Configuration defining the restart strategy to use + */ + @PublicEvolving + public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) { + this.restartStrategyConfiguration = restartStrategyConfiguration; + } + + /** + * Returns the restart strategy which has been set for the current job. + * + * @return The specified restart configuration + */ + @PublicEvolving + public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() { + if (restartStrategyConfiguration == null) { + // support the old API calls by creating a restart strategy from them + if (getNumberOfExecutionRetries() > 0 && getExecutionRetryDelay() >= 0) { + return RestartStrategies.fixedDelayRestart(getNumberOfExecutionRetries(), getExecutionRetryDelay()); + } else { + return null; + } + } else { + return restartStrategyConfiguration; + } + } + + /** * Gets the number of times the system will try to re-execute failed tasks. A value * of {@code -1} indicates that the system default value (as defined in the configuration) * should be used. * * @return The number of times the system will try to re-execute failed tasks. + * + * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration */ + @Deprecated public int getNumberOfExecutionRetries() { return numberOfExecutionRetries; } /** * Returns the delay between execution retries. + * + * @return The delay between successive execution retries in milliseconds. 
+ * + * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration */ + @Deprecated public long getExecutionRetryDelay() { return executionRetryDelay; } @@ -268,11 +324,18 @@ public class ExecutionConfig implements Serializable { * default value (as defined in the configuration) should be used. * * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks. + * + * @return The current execution configuration + * + * @deprecated This method will be replaced by {@link #setRestartStrategy}. The + * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the number of + * execution retries. */ + @Deprecated public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries) { if (numberOfExecutionRetries < -1) { throw new IllegalArgumentException( - "The number of execution retries must be non-negative, or -1 (use system default)"); + "The number of execution retries must be non-negative, or -1 (use system default)"); } this.numberOfExecutionRetries = numberOfExecutionRetries; return this; @@ -282,15 +345,23 @@ public class ExecutionConfig implements Serializable { * Sets the delay between executions. A value of {@code -1} indicates that the default value * should be used. * @param executionRetryDelay The number of milliseconds the system will wait to retry. + * + * @return The current execution configuration + * + * @deprecated This method will be replaced by {@link #setRestartStrategy}. The + * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the delay between + * successive execution attempts. 
*/ + @Deprecated public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) { if (executionRetryDelay < -1 ) { throw new IllegalArgumentException( - "The delay between reties must be non-negative, or -1 (use system default)"); + "The delay between reties must be non-negative, or -1 (use system default)"); } this.executionRetryDelay = executionRetryDelay; return this; } + /** * Sets the execution mode to execute the program. The execution mode defines whether * data exchanges are performed in a batch or on a pipelined manner. @@ -614,7 +685,7 @@ public class ExecutionConfig implements Serializable { Objects.equals(executionMode, other.executionMode) && useClosureCleaner == other.useClosureCleaner && parallelism == other.parallelism && - numberOfExecutionRetries == other.numberOfExecutionRetries && + restartStrategyConfiguration.equals(other.restartStrategyConfiguration) && forceKryo == other.forceKryo && objectReuse == other.objectReuse && autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled && @@ -640,7 +711,7 @@ public class ExecutionConfig implements Serializable { executionMode, useClosureCleaner, parallelism, - numberOfExecutionRetries, + restartStrategyConfiguration, forceKryo, objectReuse, autoTypeRegistrationEnabled, http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-core/src/main/java/org/apache/flink/api/common/Plan.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java index 899b9d6..d81fcd1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java @@ -38,6 +38,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.api.common.operators.GenericDataSinkBase; import 
org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.util.Visitable; @@ -292,24 +293,15 @@ public class Plan implements Visitable<Operator<?>> { this.defaultParallelism = defaultParallelism; } - - /** - * Gets the number of times the system will try to re-execute failed tasks. A value - * of {@code -1} indicates that the system default value (as defined in the configuration) - * should be used. - * - * @return The number of times the system will try to re-execute failed tasks. - */ - public int getNumberOfExecutionRetries() { - return getExecutionConfig().getNumberOfExecutionRetries(); - } - + /** - * Gets the delay between retry failed task. - * @return The delay the system will wait to retry. + * Returns the specified restart strategy configuration. This configuration defines the used + * restart strategy to be used at runtime. 
+ * + * @return The specified restart strategy configuration */ - public long getExecutionRetryDelay() { - return getExecutionConfig().getExecutionRetryDelay(); + public RestartStrategies.RestartStrategyConfiguration getRestartStrategyConfiguration() { + return getExecutionConfig().getRestartStrategy(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java new file mode 100644 index 0000000..12f9d08 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.restartstrategy; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; + +/** + * This class defines methods to generate RestartStrategyConfigurations. 
These configurations are + * used to create RestartStrategies at runtime. + * + * The RestartStrategyConfigurations are used to decouple the core module from the runtime module. + */ +@PublicEvolving +public class RestartStrategies { + + /** + * Generates NoRestartStrategyConfiguration + * + * @return NoRestartStrategyConfiguration + */ + public static RestartStrategyConfiguration noRestart() { + return new NoRestartStrategyConfiguration(); + } + + /** + * Generates a FixedDelayRestartStrategyConfiguration. + * + * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy + * @param delayBetweenAttempts Delay in-between restart attempts for the FixedDelayRestartStrategy + * @return FixedDelayRestartStrategy + */ + public static RestartStrategyConfiguration fixedDelayRestart( + int restartAttempts, + long delayBetweenAttempts) { + + return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayBetweenAttempts); + } + + public abstract static class RestartStrategyConfiguration implements Serializable { + private static final long serialVersionUID = 6285853591578313960L; + + private RestartStrategyConfiguration() {} + + /** + * Returns a description which is shown in the web interface + * + * @return Description of the restart strategy + */ + public abstract String getDescription(); + } + + final public static class NoRestartStrategyConfiguration extends RestartStrategyConfiguration { + private static final long serialVersionUID = -5894362702943349962L; + + @Override + public String getDescription() { + return "Restart deactivated."; + } + } + + final public static class FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration { + private static final long serialVersionUID = 4149870149673363190L; + + private final int restartAttempts; + private final long delayBetweenAttempts; + + FixedDelayRestartStrategyConfiguration(int restartAttempts, long delayBetweenAttempts) { + this.restartAttempts = restartAttempts; + 
this.delayBetweenAttempts = delayBetweenAttempts; + } + + public int getRestartAttempts() { + return restartAttempts; + } + + public long getDelayBetweenAttempts() { + return delayBetweenAttempts; + } + + @Override + public int hashCode() { + return 31 * restartAttempts + (int)(delayBetweenAttempts ^ (delayBetweenAttempts >>> 32)); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof FixedDelayRestartStrategyConfiguration) { + FixedDelayRestartStrategyConfiguration other = (FixedDelayRestartStrategyConfiguration) obj; + + return restartAttempts == other.restartAttempts && delayBetweenAttempts == other.delayBetweenAttempts; + } else { + return false; + } + } + + @Override + public String getDescription() { + return "Restart with fixed delay (" + delayBetweenAttempts + " ms). #" + + restartAttempts + " restart attempts."; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 2b75644..b2bbda1 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -19,6 +19,7 @@ package org.apache.flink.configuration; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; /** * This class contains all constants for the configuration. That includes the configuration keys and @@ -38,16 +39,50 @@ public final class ConfigConstants { */ public static final String DEFAULT_PARALLELISM_KEY = "parallelism.default"; + // ---------------------------- Restart strategies ------------------------ + + /** + * Defines the restart strategy to be used. 
It can be "off", "none", "disable" to be disabled or + * it can be "fixeddelay", "fixed-delay" to use the FixedDelayRestartStrategy. You can also + * specify a class name which implements the RestartStrategy interface and has a static + * create method which takes a Configuration object. + */ + @PublicEvolving + public static final String RESTART_STRATEGY = "restart-strategy"; + + /** + * Maximum number of attempts the fixed delay restart strategy will try before failing a job. + */ + @PublicEvolving + public static final String RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS = "restart-strategy.fixed-delay.attempts"; + + /** + * Delay between two consecutive restart attempts. It can be specified using Scala's + * FiniteDuration notation: "1 min", "20 s" + */ + @PublicEvolving + public static final String RESTART_STRATEGY_FIXED_DELAY_DELAY = "restart-strategy.fixed-delay.delay"; + /** * Config parameter for the number of re-tries for failed tasks. Setting this * value to 0 effectively disables fault tolerance. + * + * @deprecated The configuration value will be replaced by {@link #RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS} + * and the corresponding FixedDelayRestartStrategy. */ + @Deprecated + @PublicEvolving public static final String EXECUTION_RETRIES_KEY = "execution-retries.default"; /** * Config parameter for the delay between execution retries. The value must be specified in the * notation "10 s" or "1 min" (style of Scala Finite Durations) + * + * @deprecated The configuration value will be replaced by {@link #RESTART_STRATEGY_FIXED_DELAY_DELAY} + * and the corresponding FixedDelayRestartStrategy. 
*/ + @Deprecated + @PublicEvolving public static final String EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay"; // -------------------------------- Runtime ------------------------------- @@ -268,8 +303,6 @@ public final class ConfigConstants { */ public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env."; - - /** * The config parameter defining the Akka actor system port for the ApplicationMaster and * JobManager @@ -471,6 +504,9 @@ public final class ConfigConstants { /** Ports used by the job manager if not in standalone recovery mode */ public static final String RECOVERY_JOB_MANAGER_PORT = "recovery.jobmanager.port"; + /** The time before the JobManager recovers persisted jobs */ + public static final String RECOVERY_JOB_DELAY = "recovery.job.delay"; + // --------------------------- ZooKeeper ---------------------------------- /** ZooKeeper servers. */ @@ -521,7 +557,7 @@ public final class ConfigConstants { * The default number of execution retries. */ public static final int DEFAULT_EXECUTION_RETRIES = 0; - + // ------------------------------ Runtime --------------------------------- /** http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java index 2179eca..0b6e7f7 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.examples.kafka; +import 
org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -45,7 +46,7 @@ public class ReadFromKafka { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().disableSysoutLogging(); - env.setNumberOfExecutionRetries(3); // retry if job fails + env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); env.enableCheckpointing(5000); // create a checkpoint every 5 secodns env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java index 0a33265..697b8e0 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.examples.kafka; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -43,7 +44,7 @@ public class WriteIntoKafka { StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(); 
env.getConfig().disableSysoutLogging(); - env.setNumberOfExecutionRetries(3); + env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); // very simple data generator DataStream<String> messageStream = env.addSource(new SourceFunction<String>() { http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 512fe42..f0006a4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -35,6 +35,7 @@ import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.operators.OperatorInformation; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; @@ -182,12 +183,38 @@ public abstract class ExecutionEnvironment { } /** + * Sets the restart strategy configuration. The configuration specifies which restart strategy + * will be used for the execution graph in case of a restart. + * + * @param restartStrategyConfiguration Restart strategy configuration to be set + */ + @PublicEvolving + public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) { + config.setRestartStrategy(restartStrategyConfiguration); + } + + /** + * Returns the specified restart strategy configuration. 
+ * + * @return The restart strategy configuration to be used + */ + @PublicEvolving + public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() { + return config.getRestartStrategy(); + } + + /** * Sets the number of times that failed tasks are re-executed. A value of zero * effectively disables fault tolerance. A value of {@code -1} indicates that the system * default value (as defined in the configuration) should be used. * * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks. + * + * @deprecated This method will be replaced by {@link #setRestartStrategy}. The + * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the number of + * execution retries. */ + @Deprecated @PublicEvolving public void setNumberOfExecutionRetries(int numberOfExecutionRetries) { config.setNumberOfExecutionRetries(numberOfExecutionRetries); @@ -199,7 +226,12 @@ public abstract class ExecutionEnvironment { * should be used. * * @return The number of times the system will try to re-execute failed tasks. + * + * @deprecated This method will be replaced by {@link #getRestartStrategy}. The + * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the number of + * execution retries. 
*/ + @Deprecated @PublicEvolving public int getNumberOfExecutionRetries() { return config.getNumberOfExecutionRetries(); http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index 0479c0b..91e2369 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -19,7 +19,9 @@ import java.net.URISyntaxException; import java.util.Arrays; import java.util.HashMap; import java.util.Random; + import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.LocalEnvironment; @@ -262,7 +264,7 @@ public class PythonPlanBinder { break; case RETRY: int retry = (Integer) value.getField(1); - env.setNumberOfExecutionRetries(retry); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retry, 10000L)); break; case DEBUG: DEBUG = (Boolean) value.getField(1); http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index 159a94a..f59c347 100644 --- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -218,8 +218,8 @@ public class JobGraphGenerator implements Visitor<PlanNode> { // create the job graph object JobGraph graph = new JobGraph(jobId, program.getJobName()); - graph.setNumberOfExecutionRetries(program.getOriginalPlan().getNumberOfExecutionRetries()); - graph.setExecutionRetryDelay(program.getOriginalPlan().getExecutionRetryDelay()); + + graph.setRestartStrategyConfiguration(program.getOriginalPlan().getRestartStrategyConfiguration()); graph.setAllowQueuedScheduling(false); graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout()); http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java index f39bb1a..0f2f514 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java @@ -50,7 +50,9 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { gen.writeObjectFieldStart("execution-config"); gen.writeStringField("execution-mode", ec.getExecutionMode().name()); - gen.writeNumberField("max-execution-retries", ec.getNumberOfExecutionRetries()); + + final String restartStrategyDescription = ec.getRestartStrategy() != null ? 
ec.getRestartStrategy().getDescription() : "default"; + gen.writeStringField("restart-strategy", restartStrategyDescription); gen.writeNumberField("job-parallelism", ec.getParallelism()); gen.writeBooleanField("object-reuse-mode", ec.isObjectReuseEnabled()); http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime-web/web-dashboard/app/partials/jobs/job.config.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.config.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.config.jade index 83b7880..96d0369 100644 --- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.config.jade +++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.config.jade @@ -28,7 +28,7 @@ table.table.table-properties(ng-if="job['execution-config']") tr td Max. number of execution retries - td {{ job['execution-config']['max-execution-retries'] === -1 ? 'deactivated' : job['execution-config']['max-execution-retries'] }} + td {{ job['execution-config']['restart-strategy'] }} tr td Job parallelism http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index c3bfbb0..20288fb 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker; import org.apache.flink.runtime.execution.ExecutionState; import 
org.apache.flink.runtime.execution.UnrecoverableException; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -81,13 +82,10 @@ import java.util.Collection; import java.util.HashSet; import java.util.NoSuchElementException; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import static akka.dispatch.Futures.future; - /** * The execution graph is the central data structure that coordinates the distributed * execution of a data flow. It keeps representations of each parallel task, each @@ -183,12 +181,6 @@ public class ExecutionGraph implements Serializable { // ------ Configuration of the Execution ------- - /** The number of times failed executions should be retried. */ - private int numberOfRetriesLeft; - - /** The delay that the system should wait before restarting failed executions. */ - private long delayBeforeRetrying; - /** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able * to deploy them immediately. */ private boolean allowQueuedScheduling = false; @@ -219,6 +211,10 @@ public class ExecutionGraph implements Serializable { @SuppressWarnings("NonSerializableFieldInSerializableClass") private Scheduler scheduler; + /** Strategy to use for restarts */ + @SuppressWarnings("NonSerializableFieldInSerializableClass") + private RestartStrategy restartStrategy; + /** The classloader for the user code. 
Needed for calls into user code classes */ @SuppressWarnings("NonSerializableFieldInSerializableClass") private ClassLoader userClassLoader; @@ -255,13 +251,15 @@ public class ExecutionGraph implements Serializable { JobID jobId, String jobName, Configuration jobConfig, - FiniteDuration timeout) { + FiniteDuration timeout, + RestartStrategy restartStrategy) { this( executionContext, jobId, jobName, jobConfig, timeout, + restartStrategy, new ArrayList<BlobKey>(), new ArrayList<URL>(), ExecutionGraph.class.getClassLoader() @@ -274,6 +272,7 @@ public class ExecutionGraph implements Serializable { String jobName, Configuration jobConfig, FiniteDuration timeout, + RestartStrategy restartStrategy, List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths, ClassLoader userClassLoader) { @@ -304,6 +303,8 @@ public class ExecutionGraph implements Serializable { this.requiredClasspaths = requiredClasspaths; this.timeout = timeout; + + this.restartStrategy = restartStrategy; } // -------------------------------------------------------------------------------------------- @@ -317,28 +318,6 @@ public class ExecutionGraph implements Serializable { public int getNumberOfExecutionJobVertices() { return this.verticesInCreationOrder.size(); } - - public void setNumberOfRetriesLeft(int numberOfRetriesLeft) { - if (numberOfRetriesLeft < -1) { - throw new IllegalArgumentException(); - } - this.numberOfRetriesLeft = numberOfRetriesLeft; - } - - public int getNumberOfRetriesLeft() { - return numberOfRetriesLeft; - } - - public void setDelayBeforeRetrying(long delayBeforeRetrying) { - if (delayBeforeRetrying < 0) { - throw new IllegalArgumentException("Delay before retry must be non-negative."); - } - this.delayBeforeRetrying = delayBeforeRetrying; - } - - public long getDelayBeforeRetrying() { - return delayBeforeRetrying; - } public boolean isQueuedSchedulingAllowed() { return this.allowQueuedScheduling; @@ -477,6 +456,10 @@ public class ExecutionGraph implements Serializable { 
return savepointCoordinator; } + public RestartStrategy getRestartStrategy() { + return restartStrategy; + } + public CheckpointStatsTracker getCheckpointStatsTracker() { return checkpointStatsTracker; } @@ -1029,40 +1012,13 @@ public class ExecutionGraph implements Serializable { else if (current == JobStatus.FAILING) { boolean isRecoverable = !(failureCause instanceof UnrecoverableException); - if (isRecoverable && numberOfRetriesLeft > 0 && + if (isRecoverable && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) { - - numberOfRetriesLeft--; - - if (delayBeforeRetrying > 0) { - future(new Callable<Object>() { - @Override - public Object call() throws Exception { - try { - LOG.info("Delaying retry of job execution for {} ms ...", delayBeforeRetrying); - Thread.sleep(delayBeforeRetrying); - } - catch(InterruptedException e){ - // should only happen on shutdown - } - restart(); - return null; - } - }, executionContext); - } else { - future(new Callable<Object>() { - @Override - public Object call() throws Exception { - restart(); - return null; - } - }, executionContext); - } + restartStrategy.restart(this); break; - } - else if ((!isRecoverable || numberOfRetriesLeft <= 0) && - transitionState(current, JobStatus.FAILED, failureCause)) { + } else if ((!isRecoverable || !restartStrategy.canRestart()) && + transitionState(current, JobStatus.FAILED, failureCause)) { postRunCleanup(); break; } http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java new file mode 100644 index 0000000..b5b00e4 --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.restart; + +import com.google.common.base.Preconditions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +import java.util.concurrent.Callable; + +import static akka.dispatch.Futures.future; + +/** + * Restart strategy which tries to restart the given {@link ExecutionGraph} a fixed number of times + * with a fixed time delay in between. 
+ */ +public class FixedDelayRestartStrategy implements RestartStrategy { + private static final Logger LOG = LoggerFactory.getLogger(FixedDelayRestartStrategy.class); + + + private final int maxNumberRestartAttempts; + private final long delayBetweenRestartAttempts; + private int currentRestartAttempt; + + public FixedDelayRestartStrategy( + int maxNumberRestartAttempts, + long delayBetweenRestartAttempts) { + + Preconditions.checkArgument(maxNumberRestartAttempts >= 0, "Maximum number of restart attempts must be positive."); + Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive"); + + this.maxNumberRestartAttempts = maxNumberRestartAttempts; + this.delayBetweenRestartAttempts = delayBetweenRestartAttempts; + currentRestartAttempt = 0; + } + + public int getCurrentRestartAttempt() { + return currentRestartAttempt; + } + + @Override + public boolean canRestart() { + return currentRestartAttempt < maxNumberRestartAttempts; + } + + @Override + public void restart(final ExecutionGraph executionGraph) { + currentRestartAttempt++; + + future(new Callable<Object>() { + @Override + public Object call() throws Exception { + try { + LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttempts); + // do the delay + Thread.sleep(delayBetweenRestartAttempts); + } catch(InterruptedException e) { + // should only happen on shutdown + } + executionGraph.restart(); + return null; + } + }, executionGraph.getExecutionContext()); + } + + /** + * Creates a FixedDelayRestartStrategy from the given Configuration. 
+ * + * @param configuration Configuration containing the parameter values for the restart strategy + * @return Initialized instance of FixedDelayRestartStrategy + * @throws Exception + */ + public static FixedDelayRestartStrategy create(Configuration configuration) throws Exception { + int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); + + String timeoutString = configuration.getString( + ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, + ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); + + String delayString = configuration.getString( + ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, + timeoutString + ); + + long delay; + + try { + delay = Duration.apply(delayString).toMillis(); + } catch (NumberFormatException nfe) { + throw new Exception("Invalid config value for " + ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + + ": " + delayString + ". Value must be a valid duration (such as 100 s or 1 min)."); + } + + return new FixedDelayRestartStrategy(maxAttempts, delay); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java new file mode 100644 index 0000000..6be56ea --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.restart; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; + +/** + * Restart strategy which does not restart an {@link ExecutionGraph}. + */ +public class NoRestartStrategy implements RestartStrategy { + + @Override + public boolean canRestart() { + return false; + } + + @Override + public void restart(ExecutionGraph executionGraph) { + throw new RuntimeException("NoRestartStrategy does not support restart."); + } + + /** + * Creates a NoRestartStrategy instance. 
+ * + * @param configuration Configuration object which is ignored + * @return NoRestartStrategy instance + */ + public static NoRestartStrategy create(Configuration configuration) { + return new NoRestartStrategy(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java new file mode 100644 index 0000000..2880c01 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.restart; + +import org.apache.flink.runtime.executiongraph.ExecutionGraph; + +/** + * Strategy for {@link ExecutionGraph} restarts. + */ +public interface RestartStrategy { + + /** + * True if the restart strategy can be applied to restart the {@link ExecutionGraph}. 
+ * + * @return true if restart is possible, otherwise false + */ + boolean canRestart(); + + /** + * Restarts the given {@link ExecutionGraph}. + * + * @param executionGraph The ExecutionGraph to be restarted + */ + void restart(ExecutionGraph executionGraph); +} http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java new file mode 100644 index 0000000..b9da63d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.restart; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +public class RestartStrategyFactory { + private static final Logger LOG = LoggerFactory.getLogger(RestartStrategyFactory.class); + private static final String CREATE_METHOD = "create"; + + /** + * Creates a {@link RestartStrategy} instance from the given {@link org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}. + * + * @param restartStrategyConfiguration Restart strategy configuration which specifies which + * restart strategy to instantiate + * @return RestartStrategy instance + */ + public static RestartStrategy createRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) { + if (restartStrategyConfiguration instanceof RestartStrategies.NoRestartStrategyConfiguration) { + return new NoRestartStrategy(); + } else if (restartStrategyConfiguration instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration) { + RestartStrategies.FixedDelayRestartStrategyConfiguration fixedDelayConfig = + (RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategyConfiguration; + + return new FixedDelayRestartStrategy( + fixedDelayConfig.getRestartAttempts(), + fixedDelayConfig.getDelayBetweenAttempts()); + } else { + throw new IllegalArgumentException("Unknown restart strategy configuration " + + restartStrategyConfiguration + "."); + } + } + + /** + * Creates a {@link RestartStrategy} instance from the given {@link Configuration}. + * + * @param configuration Configuration object containing the configuration values. 
+ * @return RestartStrategy instance + * @throws Exception which indicates that the RestartStrategy could not be instantiated. + */ + public static RestartStrategy createFromConfig(Configuration configuration) throws Exception { + String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, "none").toLowerCase(); + + switch (restartStrategyName) { + case "none": + // support deprecated ConfigConstants values + final int numberExecutionRetries = configuration.getInteger(ConfigConstants.EXECUTION_RETRIES_KEY, + ConfigConstants.DEFAULT_EXECUTION_RETRIES); + String pauseString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, + ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); + String delayString = configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, + pauseString); + + long delay; + + try { + delay = Duration.apply(delayString).toMillis(); + } catch (NumberFormatException nfe) { + throw new Exception("Invalid config value for " + ConfigConstants.EXECUTION_RETRY_DELAY_KEY + + ": " + delayString + ". 
Value must be a valid duration (such as 100 s or 1 min)."); + } + + if (numberExecutionRetries > 0 && delay >= 0) { + return new FixedDelayRestartStrategy(numberExecutionRetries, delay); + } else { + return NoRestartStrategy.create(configuration); + } + case "off": + case "disable": + return NoRestartStrategy.create(configuration); + case "fixeddelay": + case "fixed-delay": + return FixedDelayRestartStrategy.create(configuration); + default: + try { + Class<?> clazz = Class.forName(restartStrategyName); + + if (clazz != null) { + Method method = clazz.getMethod(CREATE_METHOD, Configuration.class); + + if (method != null) { + Object result = method.invoke(null, configuration); + + if (result != null) { + return (RestartStrategy) result; + } + } + } + } catch (ClassNotFoundException cnfe) { + LOG.warn("Could not find restart strategy class {}.", restartStrategyName); + } catch (NoSuchMethodException nsme) { + LOG.warn("Class {} does not has static method {}.", restartStrategyName, CREATE_METHOD); + } catch (InvocationTargetException ite) { + LOG.warn("Cannot call static method {} from class {}.", CREATE_METHOD, restartStrategyName); + } catch (IllegalAccessException iae) { + LOG.warn("Illegal access while calling method {} from class {}.", CREATE_METHOD, restartStrategyName); + } + + // fallback in case of an error + return NoRestartStrategy.create(configuration); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 403ad67..e20f737 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -41,6 +41,7 @@ import 
java.util.List; import java.util.Map; import java.util.Set; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; /** * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts. * All programs from higher level APIs are transformed into JobGraphs. @@ -78,10 +79,8 @@ public class JobGraph implements Serializable { /** Name of this job. */ private final String jobName; - /** The number of times that failed tasks should be re-executed */ - private int numExecutionRetries; - - private long executionRetryDelay; + /** Configuration which defines which restart strategy to use for the job recovery */ + private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration; /** The number of seconds after which the corresponding ExecutionGraph is removed at the * job manager after it has been executed. */ @@ -193,54 +192,22 @@ public class JobGraph implements Serializable { } /** - * Sets the number of times that failed tasks are re-executed. A value of zero - * effectively disables fault tolerance. A value of {@code -1} indicates that the system - * default value (as defined in the configuration) should be used. - * - * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks. - */ - public void setNumberOfExecutionRetries(int numberOfExecutionRetries) { - if (numberOfExecutionRetries < -1) { - throw new IllegalArgumentException( - "The number of execution retries must be non-negative, or -1 (use system default)"); - } - this.numExecutionRetries = numberOfExecutionRetries; - } - - /** - * Gets the number of times the system will try to re-execute failed tasks. A value - * of {@code -1} indicates that the system default value (as defined in the configuration) - * should be used. + * Sets the restart strategy configuration. This configuration specifies the restart strategy + * to be used by the ExecutionGraph in case of a restart. 
* - * @return The number of times the system will try to re-execute failed tasks. - */ - public int getNumberOfExecutionRetries() { - return numExecutionRetries; - } - - /** - * Gets the delay of time the system will try to re-execute failed tasks. A value of - * {@code -1} indicates the system default value (as defined in the configuration) - * should be used. - * @return The delay of time in milliseconds the system will try to re-execute failed tasks. + * @param restartStrategyConfiguration Restart strategy configuration to be set */ - public long getExecutionRetryDelay() { - return executionRetryDelay; + public void setRestartStrategyConfiguration(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) { + this.restartStrategyConfiguration = restartStrategyConfiguration; } /** - * Sets the delay that failed tasks are re-executed. A value of zero - * effectively disables fault tolerance. A value of {@code -1} indicates that the system - * default value (as defined in the configuration) should be used. + * Gets the restart strategy configuration * - * @param executionRetryDelay The delay of time the system will wait to re-execute failed tasks. 
+ * @return Restart strategy configuration to be used */ - public void setExecutionRetryDelay(long executionRetryDelay){ - if (executionRetryDelay < -1) { - throw new IllegalArgumentException( - "The delay between reties must be non-negative, or -1 (use system default)"); - } - this.executionRetryDelay = executionRetryDelay; + public RestartStrategies.RestartStrategyConfiguration getRestartStrategyConfiguration() { + return restartStrategyConfiguration; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 78612c0..bd18160 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager import java.io.{File, IOException} import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress, InetSocketAddress} import java.util.UUID -import java.util.concurrent.ExecutorService +import java.util.concurrent.{TimeUnit, ExecutorService} import akka.actor.Status.Failure import akka.actor._ @@ -39,6 +39,7 @@ import org.apache.flink.runtime.checkpoint._ import org.apache.flink.runtime.client._ import org.apache.flink.runtime.execution.UnrecoverableException import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.executiongraph.restart.{RestartStrategy, RestartStrategyFactory} import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex} import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager} import 
org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator @@ -108,13 +109,13 @@ class JobManager( protected val scheduler: FlinkScheduler, protected val libraryCacheManager: BlobLibraryCacheManager, protected val archive: ActorRef, - protected val defaultExecutionRetries: Int, - protected val delayBetweenRetries: Long, + protected val defaultRestartStrategy: RestartStrategy, protected val timeout: FiniteDuration, protected val leaderElectionService: LeaderElectionService, protected val submittedJobGraphs : SubmittedJobGraphStore, protected val checkpointRecoveryFactory : CheckpointRecoveryFactory, - protected val savepointStore: SavepointStore) + protected val savepointStore: SavepointStore, + protected val jobRecoveryTimeout: FiniteDuration) extends FlinkActor with LeaderSessionMessageFilter // mixin oder is important, we want filtering after logging with LogMessages // mixin order is important, we want first logging @@ -131,7 +132,7 @@ class JobManager( log.error("Executor could not execute task", t) } }) - + /** Either running or not yet archived jobs (session hasn't been ended). */ protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]() @@ -256,7 +257,7 @@ class JobManager( // shut down the extra thread pool for futures executorService.shutdown() - + log.debug(s"Job manager ${self.path} is completely stopped.") } @@ -280,10 +281,13 @@ class JobManager( // TODO (critical next step) This needs to be more flexible and robust (e.g. wait for task // managers etc.) 
if (recoveryMode != RecoveryMode.STANDALONE) { - log.info(s"Delaying recovery of all jobs for $delayBetweenRetries ms.") + log.info(s"Delaying recovery of all jobs by $jobRecoveryTimeout.") - context.system.scheduler.scheduleOnce(new FiniteDuration(delayBetweenRetries, - MILLISECONDS), self, decorateMessage(RecoverAllJobs))(context.dispatcher) + context.system.scheduler.scheduleOnce( + jobRecoveryTimeout, + self, + decorateMessage(RecoverAllJobs))( + context.dispatcher) } }(context.dispatcher) @@ -903,6 +907,12 @@ class JobManager( throw new JobSubmissionException(jobId, "The given job is empty") } + val restartStrategy = Option(jobGraph.getRestartStrategyConfiguration()) + .map(RestartStrategyFactory.createRestartStrategy(_)) match { + case Some(strategy) => strategy + case None => defaultRestartStrategy + } + // see if there already exists an ExecutionGraph for the corresponding job ID executionGraph = currentJobs.get(jobGraph.getJobID) match { case Some((graph, currentJobInfo)) => @@ -915,6 +925,7 @@ class JobManager( jobGraph.getName, jobGraph.getJobConfiguration, timeout, + restartStrategy, jobGraph.getUserJarBlobKeys, jobGraph.getClasspaths, userCodeLoader) @@ -923,22 +934,6 @@ class JobManager( graph } - // configure the execution graph - val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries() >= 0) { - jobGraph.getNumberOfExecutionRetries() - } else { - defaultExecutionRetries - } - - val executionRetryDelay = if (jobGraph.getExecutionRetryDelay() >= 0) { - jobGraph.getExecutionRetryDelay() - } - else { - delayBetweenRetries - } - - executionGraph.setNumberOfRetriesLeft(jobNumberRetries) - executionGraph.setDelayBeforeRetrying(executionRetryDelay) executionGraph.setScheduleMode(jobGraph.getScheduleMode()) executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling()) @@ -2032,14 +2027,15 @@ object JobManager { InstanceManager, FlinkScheduler, BlobLibraryCacheManager, - Int, // execution retries - Long, // delay between retries + 
RestartStrategy, FiniteDuration, // timeout Int, // number of archived jobs LeaderElectionService, SubmittedJobGraphStore, CheckpointRecoveryFactory, - SavepointStore) = { + SavepointStore, + FiniteDuration // timeout for job recovery + ) = { val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) @@ -2047,35 +2043,12 @@ object JobManager { ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000 - val executionRetries = configuration.getInteger( - ConfigConstants.EXECUTION_RETRIES_KEY, - ConfigConstants.DEFAULT_EXECUTION_RETRIES) + val restartStrategy = RestartStrategyFactory + .createFromConfig(configuration) val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT) - // configure the delay between execution retries. - // unless explicitly specifies, this is dependent on the heartbeat timeout - val pauseString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) - val delayString = configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, - pauseString) - - val delayBetweenRetries: Long = try { - Duration(delayString).toMillis - } - catch { - case n: NumberFormatException => - if (delayString.equals(pauseString)) { - throw new Exception( - s"Invalid config value for ${ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE}: " + - s"$pauseString. Value must be a valid duration (such as '10 s' or '1 min')") - } else { - throw new Exception( - s"Invalid config value for ${ConfigConstants.EXECUTION_RETRY_DELAY_KEY}: " + - s"$delayString. 
Value must be a valid duration (such as '100 milli' or '10 s')") - } - } var blobServer: BlobServer = null var instanceManager: InstanceManager = null @@ -2139,18 +2112,33 @@ object JobManager { val savepointStore = SavepointStoreFactory.createFromConfig(configuration) + val jobRecoveryTimeoutStr = configuration.getString(ConfigConstants.RECOVERY_JOB_DELAY, ""); + + val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || jobRecoveryTimeoutStr.isEmpty) { + timeout + } else { + try { + FiniteDuration(Duration(jobRecoveryTimeoutStr).toMillis, TimeUnit.MILLISECONDS) + } catch { + case n: NumberFormatException => + throw new Exception( + s"Invalid config value for ${ConfigConstants.RECOVERY_JOB_DELAY}: " + + s"$jobRecoveryTimeoutStr. Value must be a valid duration (such as '10 s' or '1 min')") + } + } + (executorService, instanceManager, scheduler, libraryCacheManager, - executionRetries, - delayBetweenRetries, + restartStrategy, timeout, archiveCount, leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, - savepointStore) + savepointStore, + jobRecoveryTimeout) } /** @@ -2206,14 +2194,14 @@ object JobManager { instanceManager, scheduler, libraryCacheManager, - executionRetries, - delayBetweenRetries, + restartStrategy, timeout, archiveCount, leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, - savepointStore) = createJobManagerComponents( + savepointStore, + jobRecoveryTimeout) = createJobManagerComponents( configuration, None) @@ -2233,13 +2221,13 @@ object JobManager { scheduler, libraryCacheManager, archive, - executionRetries, - delayBetweenRetries, + restartStrategy, timeout, leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, - savepointStore) + savepointStore, + jobRecoveryTimeout) val jobManager: ActorRef = jobMangerActorName match { case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName) 
http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index e921e92..26a626e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; @@ -43,14 +44,15 @@ public class ExecutionGraphCheckpointCoordinatorTest { @Test public void testCheckpointAndSavepointCoordinatorShareCheckpointIDCounter() throws Exception { ExecutionGraph executionGraph = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - new JobID(), - "test", - new Configuration(), - new FiniteDuration(1, TimeUnit.DAYS), - Collections.<BlobKey>emptyList(), - Collections.<URL>emptyList(), - ClassLoader.getSystemClassLoader()); + TestingUtils.defaultExecutionContext(), + new JobID(), + "test", + new Configuration(), + new FiniteDuration(1, TimeUnit.DAYS), + new NoRestartStrategy(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + ClassLoader.getSystemClassLoader()); ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index a47ea77..2ca51db 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; @@ -105,7 +106,8 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -148,7 +150,8 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -214,7 +217,8 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -467,7 +471,8 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -522,7 +527,8 @@ public class 
ExecutionGraphConstructionTest { jobId, jobName, cfg, - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); fail("Attached wrong jobgraph"); @@ -582,7 +588,8 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -626,7 +633,8 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); @@ -696,7 +704,8 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources()); // check the v1 / v2 co location hints ( assumes parallelism(v1) >= parallelism(v2) ) http://git-wip-us.apache.org/repos/asf/flink/blob/5eae47f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index bea7c22..9221bda 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; +import 
org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -83,7 +84,8 @@ public class ExecutionGraphDeploymentTest { jobId, "some job", new Configuration(), - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4); @@ -285,7 +287,8 @@ public class ExecutionGraphDeploymentTest { jobId, "some job", new Configuration(), - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); eg.setQueuedSchedulingAllowed(false); List<JobVertex> ordered = Arrays.asList(v1, v2); @@ -325,4 +328,4 @@ public class ExecutionGraphDeploymentTest { throw new Exception(); } } -} \ No newline at end of file +}