dmvk commented on code in PR #22408:
URL: https://github.com/apache/flink/pull/22408#discussion_r1168855893
##########
flink-tests/src/test/java/org/apache/flink/test/recovery/UpdateJobResourceRequirementsRecoveryITCase.java:
##########
@@ -61,15 +64,25 @@ class UpdateJobResourceRequirementsRecoveryITCase {
/** Tests that a rescaled job graph will be recovered with the latest
parallelism. */
@Test
void testRescaledJobGraphsWillBeRecoveredCorrectly(@TempDir Path
tmpFolder) throws Exception {
+ final Configuration configuration = new Configuration();
+
+ // Only run the test if AdaptiveScheduler is enabled.
+
assumeThat(ClusterOptions.isAdaptiveSchedulerEnabled(configuration)).isTrue();
+
final JobVertex jobVertex = new JobVertex("operator");
jobVertex.setParallelism(1);
jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
final JobGraph jobGraph =
JobGraphTestUtils.streamingJobGraph(jobVertex);
final JobID jobId = jobGraph.getJobID();
- final Configuration configuration = new Configuration();
+ // We need to have a restart strategy set, to prevent the job from
failing during the first
+ // cluster shutdown when TM disconnects.
+ configuration.set(RestartStrategyOptions.RESTART_STRATEGY,
"fixed-delay");
+ configuration.set(
+ RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
Integer.MAX_VALUE);
+ configuration.set(
+ RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
Duration.ofMillis(100));
- configuration.set(JobManagerOptions.SCHEDULER,
JobManagerOptions.SchedulerType.Adaptive);
Review Comment:
This is meant in combination with:
```
assumeThat(ClusterOptions.isAdaptiveSchedulerEnabled(configuration)).isTrue();
```
which ensures this is already set
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]