This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 02d30ace69dc18555a5085eccf70ee884e73a16e
Author: Robert Metzger <rmetz...@apache.org>
AuthorDate: Fri May 7 08:35:04 2021 +0200

    [FLINK-22574] Adaptive Scheduler: Fix cancellation while in Restarting state.
    
    The Canceling state of the Adaptive Scheduler expected the ExecutionGraph to be in state RUNNING when entering the state.
    However, the Restarting state has already started cancelling the ExecutionGraph, so the ExecutionGraph can be in state CANCELING or CANCELED when the Canceling state is entered.
    
    Calling ExecutionGraph.cancel() in the Canceling state while the ExecutionGraph is already in state CANCELING or CANCELED is not a problem.
    
    The change is covered by a new ITCase, as this issue affects the interplay between different Adaptive Scheduler (AS) states.
    
    This closes #15882
---
 .../adaptive/StateWithExecutionGraph.java          |  2 -
 .../adaptive/AdaptiveSchedulerSimpleITCase.java    | 44 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
index 9962c78..91d688f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
@@ -91,8 +91,6 @@ abstract class StateWithExecutionGraph implements State {
         this.operatorCoordinatorHandler = operatorCoordinatorHandler;
         this.kvStateHandler = new KvStateHandler(executionGraph);
         this.logger = logger;
-        Preconditions.checkState(
-                executionGraph.getState() == JobStatus.RUNNING, "Assuming 
running execution graph");
 
         FutureUtils.assertNoException(
                 executionGraph
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java
index 9280dbc..992d2c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.execution.Environment;
@@ -33,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -43,6 +46,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 
 import static org.junit.Assert.assertTrue;
 
@@ -105,6 +109,34 @@ public class AdaptiveSchedulerSimpleITCase extends TestLogger {
     }
 
     @Test
+    public void testJobCancellationWhileRestartingSucceeds() throws Exception {
+        final long timeInRestartingState = 10000L;
+
+        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+        final JobVertex alwaysFailingOperator = new JobVertex("Always failing operator");
+        alwaysFailingOperator.setInvokableClass(AlwaysFailingInvokable.class);
+        alwaysFailingOperator.setParallelism(1);
+
+        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(alwaysFailingOperator);
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        // configure a high delay between attempts: We'll stay in RESTARTING for 10 seconds.
+        executionConfig.setRestartStrategy(
+                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
+        jobGraph.setExecutionConfig(executionConfig);
+
+        miniCluster.submitJob(jobGraph).join();
+
+        // wait until we are in RESTARTING state
+        CommonTestUtils.waitUntilCondition(
+                () -> miniCluster.getJobStatus(jobGraph.getJobID()).get() == JobStatus.RESTARTING,
+                Deadline.fromNow(Duration.of(timeInRestartingState, ChronoUnit.MILLIS)),
+                5);
+
+        // now cancel while in RESTARTING state
+        miniCluster.cancelJob(jobGraph.getJobID()).get();
+    }
+
+    @Test
     public void testGlobalFailoverIfTaskFails() throws Throwable {
         final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
         final JobGraph jobGraph = createOnceFailingJobGraph();
@@ -160,4 +192,16 @@ public class AdaptiveSchedulerSimpleITCase extends TestLogger {
             hasFailed = false;
         }
     }
+
+    /** Always failing {@link AbstractInvokable}. */
+    public static final class AlwaysFailingInvokable extends AbstractInvokable {
+        public AlwaysFailingInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            throw new FlinkRuntimeException("Test failure.");
+        }
+    }
 }

Reply via email to