[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6283


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202333518
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ---
@@ -567,22 +560,16 @@ private void configureCheckpointing() {
 
long interval = cfg.getCheckpointInterval();
if (interval > 0) {
-
ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
// propagate the expected behaviour for checkpoint errors to task.
executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
-
-   // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
-   if (executionConfig.getRestartStrategy() == null) {
-   // if the user enabled checkpointing, the default number of exec retries is infinite.
-   executionConfig.setRestartStrategy(
-   RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
-   }
} else {
// interval of max value means disable periodic checkpoint
interval = Long.MAX_VALUE;
}
 
+
+
--- End diff --

Please remove these two blank lines.


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-13 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202320783
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ---
@@ -357,6 +362,42 @@ public void testRestoringFromSavepoint() throws 
Exception {
}
}
 
+   /**
+* Tests that in a streaming use case where checkpointing is enabled, a
+* fixed delay with Integer.MAX_VALUE retries is instantiated if no 
other restart
+* strategy has been specified.
+*/
+   @Test
+   public void testAutomaticRestartingWhenCheckpointing() throws Exception 
{
+   // create savepoint data
+   final long savepointId = 42L;
+   final File savepointFile = createSavepoint(savepointId);
+
+   // set savepoint settings
+   final SavepointRestoreSettings savepointRestoreSettings = 
SavepointRestoreSettings.forPath(
+   savepointFile.getAbsolutePath(),
+   true);
+   final JobGraph jobGraph = 
createJobGraphWithCheckpointing(savepointRestoreSettings);
+
+   final StandaloneCompletedCheckpointStore 
completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
+   final TestingCheckpointRecoveryFactory 
testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(
+   completedCheckpointStore,
+   new StandaloneCheckpointIDCounter());
+   
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+   final JobMaster jobMaster = createJobMaster(
+   new Configuration(),
+   jobGraph,
+   haServices,
+   new TestingJobManagerSharedServicesBuilder().build());
--- End diff --

This was caused by `RestartStrategyFactory` handling the default value 
incorrectly. It is fixed now.


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-13 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202319384
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -339,12 +339,28 @@ public void 
setSnapshotSettings(JobCheckpointingSettings settings) {
 * Gets the settings for asynchronous snapshots. This method returns 
null, when
 * checkpointing is not enabled.
 *
-* @return The snapshot settings, or null, if checkpointing is not 
enabled.
+* @return The snapshot settings
 */
public JobCheckpointingSettings getCheckpointingSettings() {
return snapshotSettings;
}
 
+   /**
+* Checks if the checkpointing was enabled for this job graph
+*
+* @return true if checkpointing enabled
+*/
+   public boolean isCheckpointingEnabled() {
+
+   if (snapshotSettings == null) {
+   return false;
+   }
+
+   long checkpointInterval = 
snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
+   return checkpointInterval > 0 &&
+   checkpointInterval < Long.MAX_VALUE;
--- End diff --

I don't think that is true (regarding when checkpointing is enabled). I 
assumed the same based on some Javadocs, but it turns out that 
`snapshotSettings` is always set in 
`org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator#configureCheckpointing`, 
so a null check alone is not enough. That's why I added this method.

The problem with the second approach is that the `CheckpointCoordinator` is 
created while constructing the `ExecutionGraph`, which itself requires the 
restart strategy. I thought adding this method was the least invasive option.
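
A minimal sketch of the check under discussion, assuming a hypothetical 
`SnapshotSettingsSketch` stand-in that carries only the checkpoint interval 
(the real `JobCheckpointingSettings` has more fields). It illustrates why a 
null check alone does not answer the question: the job graph generator stores 
`Long.MAX_VALUE` as the interval when periodic checkpointing is disabled, so 
the settings object exists either way.

```java
// Hypothetical stand-in for JobCheckpointingSettings, reduced to the interval.
final class SnapshotSettingsSketch {
    final long checkpointInterval;

    SnapshotSettingsSketch(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }
}

final class CheckpointingCheckSketch {

    /**
     * Mirrors the condition in the diff: settings must be present and the
     * interval must lie strictly between 0 and Long.MAX_VALUE.
     */
    static boolean isCheckpointingEnabled(SnapshotSettingsSketch settings) {
        if (settings == null) {
            return false;
        }
        long interval = settings.checkpointInterval;
        return interval > 0 && interval < Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        // Settings exist, but the interval marks periodic checkpointing as disabled.
        System.out.println(isCheckpointingEnabled(new SnapshotSettingsSketch(Long.MAX_VALUE)));
        // Settings exist with a real interval (500 ms is an arbitrary example value).
        System.out.println(isCheckpointingEnabled(new SnapshotSettingsSketch(500L)));
    }
}
```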


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-13 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202311125
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
 ---
@@ -149,7 +149,7 @@ public static RestartStrategyFactory 
createRestartStrategyFactory(Configuration
}
 
// fallback in case of an error
-   return 
NoRestartStrategy.createFactory(configuration);
+   return 
NoOrFixedIfCheckpointingEnabledRestartStrategy.createFactory(configuration);
--- End diff --

I don't know why, but I assumed the `default` branch is reached when nothing 
was set in the config. My mistake.

I've fixed it to distinguish the case where `"none"` was set explicitly (this 
value is used throughout the documentation, so I think it should translate 
directly to `NoRestart`) from the case where the config was not set at all.
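
The distinction can be sketched as follows. This is an illustration only: 
`RestartStrategyChoiceSketch` and its `Choice` enum are hypothetical names, 
and every configuration value other than `"none"` is collapsed into 
`EXPLICIT` here, which the real `RestartStrategyFactory` does not do.

```java
final class RestartStrategyChoiceSketch {

    /** Hypothetical outcomes; the real factory returns RestartStrategyFactory instances. */
    enum Choice { NO_RESTART, NO_OR_FIXED_IF_CHECKPOINTING_ENABLED, EXPLICIT }

    /**
     * @param configuredValue value of the `restart-strategy` key, or null if the key is absent
     */
    static Choice choose(String configuredValue) {
        if (configuredValue == null) {
            // Nothing configured: defer the decision until we know whether checkpointing is enabled.
            return Choice.NO_OR_FIXED_IF_CHECKPOINTING_ENABLED;
        }
        if ("none".equals(configuredValue)) {
            // Explicit "none": the operator asked for no restarts, so honour that.
            return Choice.NO_RESTART;
        }
        // Any other value names a concrete strategy (simplified in this sketch).
        return Choice.EXPLICIT;
    }
}
```

With this split, a cluster operator can still switch restarts off for all jobs 
by writing `"none"` explicitly, while an untouched configuration keeps the 
checkpointing-dependent default.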


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202297327
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
 ---
@@ -149,7 +149,7 @@ public static RestartStrategyFactory 
createRestartStrategyFactory(Configuration
}
 
// fallback in case of an error
-   return 
NoRestartStrategy.createFactory(configuration);
+   return 
NoOrFixedIfCheckpointingEnabledRestartStrategy.createFactory(configuration);
--- End diff --

I think we should also create this factory if the `restart-strategy` 
configuration value is `"none"`, which is the default value.


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202298725
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ---
@@ -357,6 +362,42 @@ public void testRestoringFromSavepoint() throws 
Exception {
}
}
 
+   /**
+* Tests that in a streaming use case where checkpointing is enabled, a
+* fixed delay with Integer.MAX_VALUE retries is instantiated if no 
other restart
+* strategy has been specified.
+*/
+   @Test
+   public void testAutomaticRestartingWhenCheckpointing() throws Exception 
{
+   // create savepoint data
+   final long savepointId = 42L;
+   final File savepointFile = createSavepoint(savepointId);
+
+   // set savepoint settings
+   final SavepointRestoreSettings savepointRestoreSettings = 
SavepointRestoreSettings.forPath(
+   savepointFile.getAbsolutePath(),
+   true);
+   final JobGraph jobGraph = 
createJobGraphWithCheckpointing(savepointRestoreSettings);
+
+   final StandaloneCompletedCheckpointStore 
completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
+   final TestingCheckpointRecoveryFactory 
testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(
+   completedCheckpointStore,
+   new StandaloneCheckpointIDCounter());
+   
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+   final JobMaster jobMaster = createJobMaster(
+   new Configuration(),
+   jobGraph,
+   haServices,
+   new TestingJobManagerSharedServicesBuilder().build());
--- End diff --

Changing this line to
```
new TestingJobManagerSharedServicesBuilder()
    .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
    .build()
```
makes the test fail.


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202297182
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategy.java
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+/**
+ * Default restart strategy that resolves either to {@link 
NoRestartStrategy} or {@link FixedDelayRestartStrategy}
+ * depending if checkpointing was enabled.
+ */
+public class NoOrFixedIfCheckpointingEnabledRestartStrategy implements 
RestartStrategy {
+
+   private static final long DEFAULT_RESTART_DELAY = 0;
+
+   private final RestartStrategy resolvedStrategy;
+
+   /**
+* Creates a NoOrFixedIfCheckpointingEnabledRestartStrategyFactory 
instance.
+*
+* @param configuration Configuration object which is ignored
+* @return NoOrFixedIfCheckpointingEnabledRestartStrategyFactory 
instance
+*/
+   public static NoOrFixedIfCheckpointingEnabledRestartStrategyFactory 
createFactory(Configuration configuration) {
+   return new 
NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
+   }
+
+   /**
+* Creates instance of NoOrFixedIfCheckpointingEnabledRestartStrategy
+*
+* @param isCheckpointingEnabled if true resolves to {@link 
FixedDelayRestartStrategy}
+* otherwise to {@link NoRestartStrategy}
+*/
+   public NoOrFixedIfCheckpointingEnabledRestartStrategy(boolean 
isCheckpointingEnabled) {
+   if (isCheckpointingEnabled) {
+   resolvedStrategy = new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY);
+   } else {
+   resolvedStrategy = new NoRestartStrategy();
+   }
+   }
+
+   @Override
+   public boolean canRestart() {
+   return resolvedStrategy.canRestart();
+   }
+
+   @Override
+   public void restart(RestartCallback restarter, ScheduledExecutor 
executor) {
+   resolvedStrategy.restart(restarter, executor);
+   }
+
+   public static class 
NoOrFixedIfCheckpointingEnabledRestartStrategyFactory extends 
RestartStrategyFactory {
--- End diff --

Wouldn't it be enough to only have this restart strategy factory without 
the corresponding `RestartStrategy`? We could instantiate the respective 
strategies in the `createRestartStrategy(boolean isCheckpointingEnabled)` 
method.
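
A rough sketch of that suggestion, assuming simplified stand-in types 
(`StrategySketch`, `NoRestartSketch` and `FixedDelaySketch` are hypothetical 
and omit the real `restart(...)` method). The factory picks the concrete 
strategy directly, so no delegating wrapper strategy is needed.

```java
// Hypothetical, reduced version of the RestartStrategy interface.
interface StrategySketch {
    boolean canRestart();
}

final class NoRestartSketch implements StrategySketch {
    @Override
    public boolean canRestart() {
        return false;
    }
}

final class FixedDelaySketch implements StrategySketch {
    final int maxAttempts;
    final long delayMillis;

    FixedDelaySketch(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    @Override
    public boolean canRestart() {
        // Simplification: the real strategy tracks how many attempts were already used.
        return maxAttempts > 0;
    }
}

final class NoOrFixedIfCheckpointingEnabledFactorySketch {

    // Same default delay as in the diff under review.
    private static final long DEFAULT_RESTART_DELAY = 0L;

    /** Resolves the strategy at creation time instead of wrapping it in a third class. */
    StrategySketch createRestartStrategy(boolean isCheckpointingEnabled) {
        if (isCheckpointingEnabled) {
            return new FixedDelaySketch(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY);
        }
        return new NoRestartSketch();
    }
}
```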


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202304777
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -339,12 +339,28 @@ public void 
setSnapshotSettings(JobCheckpointingSettings settings) {
 * Gets the settings for asynchronous snapshots. This method returns 
null, when
 * checkpointing is not enabled.
 *
-* @return The snapshot settings, or null, if checkpointing is not 
enabled.
+* @return The snapshot settings
 */
public JobCheckpointingSettings getCheckpointingSettings() {
return snapshotSettings;
}
 
+   /**
+* Checks if the checkpointing was enabled for this job graph
+*
+* @return true if checkpointing enabled
+*/
+   public boolean isCheckpointingEnabled() {
+
+   if (snapshotSettings == null) {
+   return false;
+   }
+
+   long checkpointInterval = 
snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
+   return checkpointInterval > 0 &&
+   checkpointInterval < Long.MAX_VALUE;
--- End diff --

I think technically we enable checkpointing, meaning we create a 
`CheckpointCoordinator`, if and only if `snapshotSettings != null`. We could 
also check `CheckpointCoordinator.isPeriodicCheckpointingConfigured` to decide 
whether checkpointing is enabled. Then we would not need to introduce this 
method, which could go out of sync with how we define whether checkpointing is 
enabled. What do you think?


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202103545
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 ---
@@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {

CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
+
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   
executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
--- End diff --

You're right. I'm just wondering whether you would ever want to enable 
checkpointing without a restart strategy. In other words, if you set 
`FallbackRestartStrategy`, enable checkpointing, and set `NoRestartStrategy` as 
the server-side `RestartStrategy`, do you want `FixedDelayRestartStrategy` or 
`NoRestartStrategy`?

On the other hand, you might want to disable restarting for all jobs running 
on your cluster by setting the restart strategy to `NoRestartStrategy`.

Maybe the proper solution would be to set `ExecutionConfig#restartStrategy` 
to `FallbackRestartStrategy` and introduce a new default server-side restart 
strategy, `NoOrFixedIfCheckpointingEnabled`, which resolves to 
`FixedDelayRestartStrategy` if checkpointing is enabled and to 
`NoRestartStrategy` otherwise.

What do you think?


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-12 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202098798
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 ---
@@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {

CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
+
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   
executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
--- End diff --

Right now, null is a bit different from `FallbackRestartStrategy`:
* null - allows a fallback to `FixedDelayRestartStrategy` when checkpointing 
is enabled and `noRestart` was set on the server side
* `FallbackRestartStrategy` - the server-side strategy is always used 
(regardless of checkpointing)

If we set `FallbackRestartStrategy` by default, we have two options:
* we always set `FixedDelayRestartStrategy` if checkpointing is enabled 
and `noRestart` was set on the server side
* we never automatically fall back to `FixedDelayRestartStrategy`, even when 
checkpointing is enabled

Which do you think would be the better option: keep the null, always fall 
back to `FixedDelayRestartStrategy`, or never fall back to it?


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202065050
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link RestartStrategyResolving}.
+ */
+public class RestartStrategyResolvingTest {
--- End diff --

Test classes should extend from `TestLogger` to give better test logging 
output separation.


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202065356
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link RestartStrategyResolving}.
+ */
+public class RestartStrategyResolvingTest {
+
+   @Test
+   public void testClientSideHighestPriority() {
+
+   RestartStrategy resolvedStrategy = 
RestartStrategyResolving.resolve(noRestart(),
+   new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(2, 1000L),
+   true);
+
+   assertTrue(resolvedStrategy instanceof NoRestartStrategy);
--- End diff --

In the future, I would suggest using Hamcrest matchers, because they give 
better failure messages and are more expressive.


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202070020
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
 ---
@@ -33,12 +33,11 @@
 public class RestartStrategyTest extends TestLogger {
 
/**
-* Tests that in a streaming use case where checkpointing is enabled, a
-* fixed delay with Integer.MAX_VALUE retries is instantiated if no 
other restart
-* strategy has been specified.
+* Tests that in a streaming use case where checkpointing is enabled, 
there is no default strategy set on the
+* client side.
 */
@Test
-   public void testAutomaticRestartingWhenCheckpointing() throws Exception 
{
+   public void testNoDefaultStrategyOnClientSideWhenCheckpointing() throws 
Exception {
--- End diff --

Maybe rename it to `testNoDefaultStrategyOnClientSideWhenCheckpointingEnabled`?


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202070710
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 ---
@@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {

CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
+
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   
executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
--- End diff --

Should we maybe set the `FallbackRestartStrategyConfiguration` by default 
in the `ExecutionConfig`? That way, we could also simplify the resolve code.


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202013103
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+
+import javax.annotation.Nullable;
+
+/**
+ * Utility method for resolving {@link RestartStrategy}.
+ */
+public final class RestartStrategyResolving {
+
+   private static final long DEFAULT_RESTART_DELAY = 0;
+
+   /**
+* Resolves which {@link RestartStrategy} to use. It should be used 
only on the server side.
+* The resolving strategy is as follows:
+* 
+* Strategy set within job graph.
+* Strategy set flink-conf.yaml on the server set, unless is set to 
{@link NoRestartStrategy} and checkpointing is enabled.
+* If no strategy was set on client and server side and 
checkpointing was enabled then {@link FixedDelayRestartStrategy} is used
+* 
+*
+* @param clientConfiguration    restart configuration given within the job graph
+* @param serverStrategyFactory  default server side strategy factory
+* @param isCheckpointingEnabled if checkpointing was enabled for the job
+* @return resolved strategy
+*/
+   public static RestartStrategy resolve(
+   @Nullable 
RestartStrategies.RestartStrategyConfiguration clientConfiguration,
+   RestartStrategyFactory serverStrategyFactory,
+   boolean isCheckpointingEnabled) {
+
+   final RestartStrategy serverSideRestartStrategy = 
serverStrategyFactory.createRestartStrategy();
+
+   RestartStrategy clientSideRestartStrategy = null;
--- End diff --

This could be `final`.


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202071547
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+
+import javax.annotation.Nullable;
+
+/**
+ * Utility method for resolving {@link RestartStrategy}.
+ */
+public final class RestartStrategyResolving {
+
+   private static final long DEFAULT_RESTART_DELAY = 0;
+
+   /**
+* Resolves which {@link RestartStrategy} to use. It should be used 
only on the server side.
+* The resolving strategy is as follows:
+* 
+* Strategy set within job graph.
+* Strategy set flink-conf.yaml on the server set, unless is set to 
{@link NoRestartStrategy} and checkpointing is enabled.
+* If no strategy was set on client and server side and 
checkpointing was enabled then {@link FixedDelayRestartStrategy} is used
+* 
+*
+* @param clientConfiguration    restart configuration given within the job graph
+* @param serverStrategyFactory  default server side strategy factory
+* @param isCheckpointingEnabled if checkpointing was enabled for the job
+* @return resolved strategy
+*/
+   public static RestartStrategy resolve(
+   @Nullable 
RestartStrategies.RestartStrategyConfiguration clientConfiguration,
--- End diff --

By setting the default restart strategy to 
`FallbackRestartStrategyConfiguration` in the `ExecutionConfig` we could remove 
the `@Nullable` annotation here and simplify the code by avoiding the null 
checks.


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202012704
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+
+import javax.annotation.Nullable;
+
+/**
+ * Utility method for resolving {@link RestartStrategy}.
+ */
+public final class RestartStrategyResolving {
+
+   private static final long DEFAULT_RESTART_DELAY = 0;
+
+   /**
+* Resolves which {@link RestartStrategy} to use. It should be used 
only on the server side.
+* The resolving strategy is as follows:
+* 
+* Strategy set within job graph.
+* Strategy set flink-conf.yaml on the server set, unless is set to 
{@link NoRestartStrategy} and checkpointing is enabled.
+* If no strategy was set on client and server side and 
checkpointing was enabled then {@link FixedDelayRestartStrategy} is used
+* 
+*
+* @param clientConfiguration    restart configuration given within the job graph
+* @param serverStrategyFactory  default server side strategy factory
+* @param isCheckpointingEnabled if checkpointing was enabled for the job
--- End diff --

Please don't align the Javadoc strings. The problem is that whenever someone 
renames a parameter, they will be tempted to also correct the then-wrong 
indentation, which is unnecessary work.


---


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-09 Thread dawidwys
GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/6283

[FLINK-9143] Use cluster strategy if none was set on client side

## What is the purpose of the change

The goal of this PR is to make the default restart strategy configurable 
from the server-side configuration.


## Brief change log

  * no strategy is set on the client side if none is explicitly specified
  * on the server side the strategy is resolved from, in order: the client 
configuration, then the server-side configuration, with a fallback to 
`FixedDelayRestartStrategy` if checkpointing is enabled, none was set on the 
client side, and `NoRestartStrategy` is set on the server side
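
The resolution order in the change log could look roughly like this. It is a 
sketch, not the code in this PR: strategies are reduced to a hypothetical 
`Kind` enum, and `null` stands for "nothing set on the client side".

```java
final class RestartResolutionSketch {

    /**
     * Hypothetical simplification of the strategies involved.
     * DEFAULT_FIXED_DELAY stands for a fixed delay with Integer.MAX_VALUE attempts.
     */
    enum Kind { NO_RESTART, CONFIGURED_FIXED_DELAY, DEFAULT_FIXED_DELAY }

    /**
     * @param client strategy set within the job graph, or null if none was set
     * @param server strategy configured on the server side
     * @param isCheckpointingEnabled whether checkpointing is enabled for the job
     */
    static Kind resolve(Kind client, Kind server, boolean isCheckpointingEnabled) {
        // 1. An explicit client-side choice always wins.
        if (client != null) {
            return client;
        }
        // 2. No restarts on the server side plus checkpointing: fall back to fixed delay.
        if (server == Kind.NO_RESTART && isCheckpointingEnabled) {
            return Kind.DEFAULT_FIXED_DELAY;
        }
        // 3. Otherwise the server-side strategy applies.
        return server;
    }
}
```

The first branch corresponds to what `testClientSideHighestPriority` in 
`RestartStrategyResolvingTest` checks: a client-side `noRestart()` is kept even 
when the server offers a fixed-delay strategy and checkpointing is enabled.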

## Verifying this change

This change added tests and can be verified as follows:
  - RestartStrategyResolvingTest.java
  - tests using cluster pass


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink FLINK-9143

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6283.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6283


commit 8efded9c1e1a555edd7733b35d9f1f49f8cc7304
Author: Dawid Wysakowicz 
Date:   2018-07-05T11:48:23Z

[FLINK-9143] Use cluster strategy if none was set on client side




---