[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542820#comment-16542820 ]
ASF GitHub Bot commented on FLINK-9143:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202297182

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategy.java ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+/**
+ * Default restart strategy that resolves either to {@link NoRestartStrategy} or {@link FixedDelayRestartStrategy}
+ * depending if checkpointing was enabled.
+ */
+public class NoOrFixedIfCheckpointingEnabledRestartStrategy implements RestartStrategy {
+
+	private static final long DEFAULT_RESTART_DELAY = 0;
+
+	private final RestartStrategy resolvedStrategy;
+
+	/**
+	 * Creates a NoOrFixedIfCheckpointingEnabledRestartStrategyFactory instance.
+	 *
+	 * @param configuration Configuration object which is ignored
+	 * @return NoOrFixedIfCheckpointingEnabledRestartStrategyFactory instance
+	 */
+	public static NoOrFixedIfCheckpointingEnabledRestartStrategyFactory createFactory(Configuration configuration) {
+		return new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
+	}
+
+	/**
+	 * Creates instance of NoOrFixedIfCheckpointingEnabledRestartStrategy
+	 *
+	 * @param isCheckpointingEnabled if true resolves to {@link FixedDelayRestartStrategy}
+	 *                               otherwise to {@link NoRestartStrategy}
+	 */
+	public NoOrFixedIfCheckpointingEnabledRestartStrategy(boolean isCheckpointingEnabled) {
+		if (isCheckpointingEnabled) {
+			resolvedStrategy = new FixedDelayRestartStrategy(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY);
+		} else {
+			resolvedStrategy = new NoRestartStrategy();
+		}
+	}
+
+	@Override
+	public boolean canRestart() {
+		return resolvedStrategy.canRestart();
+	}
+
+	@Override
+	public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+		resolvedStrategy.restart(restarter, executor);
+	}
+
+	public static class NoOrFixedIfCheckpointingEnabledRestartStrategyFactory extends RestartStrategyFactory {
--- End diff --

Wouldn't it be enough to only have this restart strategy factory without the corresponding `RestartStrategy`? We could instantiate the respective strategies in the `createRestartStrategy(boolean isCheckpointingEnabled)` method.

> Restart strategy defined in flink-conf.yaml is ignored
> ------------------------------------------------------
>
>                 Key: FLINK-9143
>                 URL: https://issues.apache.org/jira/browse/FLINK-9143
>             Project: Flink
>          Issue Type: Bug
>          Components: Configuration
>    Affects Versions: 1.4.2
>            Reporter: Alex Smirnov
>            Assignee: yuqi
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> The restart strategy defined in flink-conf.yaml is disregarded when the user
> enables checkpointing.
>
> Steps to reproduce:
>
> 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml:
>
> restart-strategy: none
> state.backend: rocksdb
> state.backend.fs.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-metadata
> state.backend.rocksdb.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-rocksdb
>
> 2. Create a new Java project as described at
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html
> Here's the code:
>
> import java.util.Arrays;
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class FailedJob
> {
>     static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>
>     public static void main( String[] args ) throws Exception
>     {
>         final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         // Enabling checkpointing is what triggers the unexpected restart strategy
>         env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>         DataStream<String> stream =
>             env.fromCollection(Arrays.asList("test"));
>         stream.map(new MapFunction<String, String>() {
>             @Override
>             public String map(String obj)
>             {
>                 // Always fails, so the job has to be restarted (or not)
>                 throw new NullPointerException("NPE");
>             }
>         });
>         env.execute("Failed job");
>     }
> }
>
> 3. Compile with mvn clean package and submit the job to the cluster.
>
> 4. Go to the Job Manager configuration in the web UI and make sure the
> settings from flink-conf.yaml are there (screenshot attached).
>
> 5. Go to the job's configuration and look at the Execution Configuration section.
>
> *Expected result*: the restart strategy defined in flink-conf.yaml.
>
> *Actual result*: Restart with fixed delay (10000 ms). #2147483647 restart
> attempts.
>
>
> See the attached screenshots and the jobmanager log (lines 1 and 31).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
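The reviewer's suggestion on the diff, keeping only the factory and letting `createRestartStrategy(boolean isCheckpointingEnabled)` pick the concrete strategy instead of wrapping it in a delegating `RestartStrategy`, might look roughly like the following minimal sketch. It is an assumption-laden illustration, not the code from the pull request: `RestartStrategy`, `RestartStrategyFactory`, `NoRestartStrategy` and `FixedDelayRestartStrategy` here are hypothetical, simplified stand-ins for the Flink runtime classes (no `RestartCallback`, `ScheduledExecutor` or `Configuration`), so the example compiles on its own.

```java
/**
 * Sketch of a factory-only variant: the choice between "no restart" and
 * "fixed delay" is made once, when the strategy is created.
 * All types below are hypothetical, simplified stand-ins for Flink's classes.
 */
public class RestartStrategySketch {

	/** Simplified stand-in for Flink's RestartStrategy interface. */
	interface RestartStrategy {
		boolean canRestart();
	}

	/** Never allows a restart. */
	static final class NoRestartStrategy implements RestartStrategy {
		@Override
		public boolean canRestart() {
			return false;
		}
	}

	/** Allows restarts with a fixed delay; attempt counting is omitted in this sketch. */
	static final class FixedDelayRestartStrategy implements RestartStrategy {
		final int maxAttempts;
		final long delayMillis;

		FixedDelayRestartStrategy(int maxAttempts, long delayMillis) {
			this.maxAttempts = maxAttempts;
			this.delayMillis = delayMillis;
		}

		@Override
		public boolean canRestart() {
			return maxAttempts > 0;
		}
	}

	/** Simplified stand-in for Flink's RestartStrategyFactory base class. */
	abstract static class RestartStrategyFactory {
		abstract RestartStrategy createRestartStrategy(boolean isCheckpointingEnabled);
	}

	/** Factory that returns the concrete strategy directly, with no wrapper. */
	static final class NoOrFixedIfCheckpointingEnabledRestartStrategyFactory extends RestartStrategyFactory {
		private static final long DEFAULT_RESTART_DELAY = 0;

		@Override
		RestartStrategy createRestartStrategy(boolean isCheckpointingEnabled) {
			if (isCheckpointingEnabled) {
				// Mirrors the values used in the diff: Integer.MAX_VALUE attempts, 0 ms delay
				return new FixedDelayRestartStrategy(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY);
			}
			return new NoRestartStrategy();
		}
	}

	public static void main(String[] args) {
		RestartStrategyFactory factory = new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
		// prints FixedDelayRestartStrategy
		System.out.println(factory.createRestartStrategy(true).getClass().getSimpleName());
		// prints NoRestartStrategy
		System.out.println(factory.createRestartStrategy(false).getClass().getSimpleName());
	}
}
```

Because the decision happens at creation time, no intermediate class has to forward `canRestart()` and `restart(...)` to a resolved strategy, which is the redundancy the review comment points out.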