tillrohrmann commented on a change in pull request #14948:
URL: https://github.com/apache/flink/pull/14948#discussion_r577607349



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
##########
@@ -731,6 +731,29 @@ public void goToFailing(
                         failureCause));
     }
 
+    @Override
+    public void goToStopWithSavepoint(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            CheckpointCoordinator checkpointCoordinator,
+            String targetDirectory,
+            boolean advanceToEndOfEventTime,
+            CompletableFuture<String> operationCompletionFuture) {
+        transitionToState(
+                new StopWithSavepoint(
+                        this,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        LOG,
+                        userCodeClassLoader,
+                        checkpointCoordinator,

Review comment:
       What do we need the `checkpointCoordinator` for? Isn't it already included in
the `ExecutionGraph`?
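
A small sketch of that alternative, assuming the state keeps fetching the coordinator from the `ExecutionGraph` it already receives (this is the same accessor `Executing#stopWithSavepoint` uses in this PR; it may return null for jobs without checkpointing):

```java
// Sketch only: look the coordinator up instead of passing it through the transition.
final CheckpointCoordinator checkpointCoordinator =
        executionGraph.getCheckpointCoordinator();
```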

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/Executing.java
##########
@@ -141,29 +100,54 @@ public void notifyNewResourcesAvailable() {
         }
     }
 
-    /** Context of the {@link Executing} state. */
-    interface Context extends StateWithExecutionGraph.Context {
+    CompletableFuture<String> stopWithSavepoint(
+            String targetDirectory, boolean advanceToEndOfEventTime) {

Review comment:
       `@Nullable` seems to be missing.
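
Presumably the fix is just annotating the parameter, matching the `targetDirectory == null` check further down in the method (sketch, using `javax.annotation.Nullable`):

```java
CompletableFuture<String> stopWithSavepoint(
        @Nullable String targetDirectory, boolean advanceToEndOfEventTime) {
    // ... body unchanged ...
}
```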

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final boolean advanceToEndOfEventTime;
+    private final String targetDirectory;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final Context context;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            CheckpointCoordinator checkpointCoordinator,
+            String targetDirectory,
+            boolean advanceToEndOfEventTime,
+            CompletableFuture<String> operationCompletionFuture) {
+        super(
+                context,
+                executionGraph,
+                executionGraphHandler,
+                operatorCoordinatorHandler,
+                logger,
+                userCodeClassLoader);
+        this.context = context;
+        this.checkpointCoordinator = checkpointCoordinator;
+        this.targetDirectory = targetDirectory;
+        this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+        this.operationCompletionFuture = operationCompletionFuture;
+    }
+
+    @Override
+    public void onEnter() {
+        final ExecutionGraph executionGraph = getExecutionGraph();
+        // we stop the checkpoint coordinator so that we are guaranteed
+        // to have only the data of the synchronous savepoint committed.
+        // in case of failure, and if the job restarts, the coordinator
+        // will be restarted by the CheckpointCoordinatorDeActivator.
+        checkpointCoordinator.stopCheckpointScheduler();
+
+        CompletableFuture<String> savepointFuture =
+                checkpointCoordinator
+                        .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
+                        // handle errors while creating the savepoint
+                        .handleAsync(
+                                this::handleSavepointOperationResult,
+                                context.getMainThreadExecutor())
+                        // Wait for job to be finished
+                        .thenCompose(
+                                path ->
+                                        executionGraph
+                                                .getTerminationFuture()
+                                                .thenApply(
+                                                        status ->
+                                                                new JobStatusAndSavepointPath(
+                                                                        status, path)))
+                        // check that the job finished successfully
+                        .handleAsync(
+                                this::handleStopWithSavepointOperationResult,
+                                context.getMainThreadExecutor());
+
+        FutureUtils.forward(savepointFuture, operationCompletionFuture);
+    }
+
+    private String handleSavepointOperationResult(
+            CompletedCheckpoint completedCheckpoint, Throwable throwable) {
+        if (throwable != null) {
+            context.runIfState(this, () -> handleAnyFailure(throwable));
+            throw new SavepointCreationException(
+                    "Unable to stop with savepoint. Error while creating the savepoint.",
+                    throwable);
+        }
+        return completedCheckpoint.getExternalPointer();
+    }
+
+    private String handleStopWithSavepointOperationResult(
+            JobStatusAndSavepointPath statusAndPath, Throwable throwable) {
+        // filter out exception from first stage to show correct root cause
+        if (throwable != null) {
+            if (throwable instanceof SavepointCreationException) {
+                throw (SavepointCreationException) throwable;
+            } else {
+                throw new CompletionException(
+                        "Error while stopping job after creating savepoint", throwable);
+            }
+        }
+
+        if (statusAndPath.getStatus() == JobStatus.FINISHED) {
+            // ensure that all vertices from the execution graph are in state FINISHED. There might
+            // be cases where only some subgraphs of the job are finished.
+            if (!areAllVerticesFinished()) {
+                if (getLogger().isDebugEnabled()) {
+                    getLogger().debug("Printing vertex states:");
+                    for (Execution e : getExecutionGraph().getRegisteredExecutions().values()) {
+                        getLogger().debug("{} = {}", e.getVertex(), e.getState());
+                    }
+                }
+                final FlinkException exception =
+                        new FlinkException(
+                                "Not all vertices of job are finished. Stop with savepoint operation failed.");
+                context.runIfState(this, () -> handleAnyFailure(exception));
+                throw new CompletionException("Unable to stop with savepoint", exception);
+            }
+            context.runIfState(
+                    this,
+                    () ->
+                            context.goToFinished(
+                                    ArchivedExecutionGraph.createFrom(getExecutionGraph())));
+            return statusAndPath.getPath();
+        } else {
+            throw new CompletionException(
+                    new FlinkException(
+                            "Reached state "
+                                    + statusAndPath.getStatus()
+                                    + " instead of FINISHED "));
+        }
+    }
+
+    private boolean areAllVerticesFinished() {
+        return getExecutionGraph().getRegisteredExecutions().values().stream()
+                .noneMatch(e -> e.getState() != ExecutionState.FINISHED);
+    }
+
+    @Override
+    protected void handleAnyFailure(@Nonnull Throwable cause) {
+        Preconditions.checkState(context.isState(this), "Assuming StopWithSavepoint state");
+        // restart the checkpoint coordinator if stopWithSavepoint failed.
+        startCheckpointScheduler(checkpointCoordinator);

Review comment:
       We could say that we do this whenever we leave this state in any state other
than `Finished`.
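
A sketch of that idea, assuming a hypothetical `onLeave` hook on the state (the hook and its signature are illustrative, not the existing API):

```java
// Hypothetical sketch: restart periodic checkpointing whenever StopWithSavepoint is
// left for any state other than Finished.
@Override
public void onLeave(Class<? extends State> newState) {
    if (newState != Finished.class) {
        // we stopped the scheduler in onEnter(), so this state restarts it
        startCheckpointScheduler(checkpointCoordinator);
    }
}
```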

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final boolean advanceToEndOfEventTime;
+    private final String targetDirectory;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final Context context;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            CheckpointCoordinator checkpointCoordinator,
+            String targetDirectory,
+            boolean advanceToEndOfEventTime,
+            CompletableFuture<String> operationCompletionFuture) {
+        super(
+                context,
+                executionGraph,
+                executionGraphHandler,
+                operatorCoordinatorHandler,
+                logger,
+                userCodeClassLoader);
+        this.context = context;
+        this.checkpointCoordinator = checkpointCoordinator;
+        this.targetDirectory = targetDirectory;
+        this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+        this.operationCompletionFuture = operationCompletionFuture;
+    }
+
+    @Override
+    public void onEnter() {
+        final ExecutionGraph executionGraph = getExecutionGraph();
+        // we stop the checkpoint coordinator so that we are guaranteed
+        // to have only the data of the synchronous savepoint committed.
+        // in case of failure, and if the job restarts, the coordinator
+        // will be restarted by the CheckpointCoordinatorDeActivator.
+        checkpointCoordinator.stopCheckpointScheduler();
+
+        CompletableFuture<String> savepointFuture =
+                checkpointCoordinator
+                        .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
+                        // handle errors while creating the savepoint
+                        .handleAsync(
+                                this::handleSavepointOperationResult,
+                                context.getMainThreadExecutor())
+                        // Wait for job to be finished
+                        .thenCompose(
+                                path ->
+                                        executionGraph
+                                                .getTerminationFuture()
+                                                .thenApply(
+                                                        status ->
+                                                                new JobStatusAndSavepointPath(
+                                                                        status, path)))
+                        // check that the job finished successfully
+                        .handleAsync(
+                                this::handleStopWithSavepointOperationResult,
+                                context.getMainThreadExecutor());
+
+        FutureUtils.forward(savepointFuture, operationCompletionFuture);

Review comment:
       Instead of using future callbacks, the `StopWithSavepoint` state could have a
small internal state machine which reacts to signals. The two signals we need to
handle are the terminal job status and the savepoint status. The former is already
available via `onGloballyTerminalState`. The latter could be implemented by having an
`onSavepointSuccess(String path)` and an `onSavepointFailure(Throwable cause)`. That
way we wouldn't have to deal with nested future callbacks. Maybe we could even reuse
the improvements from FLINK-21030.
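
A rough sketch of that signal-based variant; everything except `onGloballyTerminalState` (which already exists in this PR) is illustrative, and the non-FINISHED terminal path is omitted for brevity:

```java
// Hypothetical sketch: two independent signals instead of nested future callbacks.
@Nullable private String savepointPath; // set once the savepoint succeeded
private boolean jobFinished;            // set once the job reached FINISHED

void onSavepointSuccess(String path) {
    this.savepointPath = path;
    completeIfBothSignalsArrived();
}

void onSavepointFailure(Throwable cause) {
    operationCompletionFuture.completeExceptionally(cause);
    handleAnyFailure(cause);
}

@Override
void onGloballyTerminalState(JobStatus globallyTerminalState) {
    this.jobFinished = globallyTerminalState == JobStatus.FINISHED;
    completeIfBothSignalsArrived();
}

private void completeIfBothSignalsArrived() {
    if (savepointPath != null && jobFinished) {
        operationCompletionFuture.complete(savepointPath);
        context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
    }
}
```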

##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
##########
@@ -88,6 +103,155 @@ public void testGlobalFailoverCanRecoverState() throws Exception {
         env.execute();
     }
 
+    private enum StopWithSavepointTestBehavior {
+        NO_FAILURE,
+        FAIL_ON_CHECKPOINT,
+        FAIL_ON_STOP,
+        FAIL_ON_FIRST_CHECKPOINT_ONLY
+    }
+
+    @Test
+    public void testStopWithSavepointNoError() throws Exception {
+        testStopWithSavepoint(StopWithSavepointTestBehavior.NO_FAILURE);
+    }
+
+    /** Expected behavior is that the job fails. */
+    @Test
+    public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+        try {
+            testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("CheckpointException"));
+        }
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnStop() throws Exception {
+        try {
+            testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_STOP);
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("Reached state FAILED instead of FINISHED"));
+        }
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+
+        env.setParallelism(PARALLELISM);
+
+        env.addSource(new DummySource(StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY))
+                .addSink(new DiscardingSink<>());
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        DummySource.resetForParallelism(PARALLELISM);
+        final File savepointDirectory = tempFolder.newFolder("savepoint");
+        try {
+            client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
+            fail("Expect failure of operation");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("CheckpointException"));
+        }
+
+        // trigger second savepoint
+        DummySource.awaitRunning();
+        final String savepoint =
+                client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
+        assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
+    }
+
+    /** Tests the stop with savepoint operation */
+    private void testStopWithSavepoint(StopWithSavepointTestBehavior behavior) throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        if (behavior == StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY) {
+            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));

Review comment:
       Are we calling `testStopWithSavepoint` with 
`FAIL_ON_FIRST_CHECKPOINT_ONLY`?

##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
##########
@@ -88,6 +103,155 @@ public void testGlobalFailoverCanRecoverState() throws Exception {
         env.execute();
     }
 
+    private enum StopWithSavepointTestBehavior {
+        NO_FAILURE,
+        FAIL_ON_CHECKPOINT,
+        FAIL_ON_STOP,
+        FAIL_ON_FIRST_CHECKPOINT_ONLY
+    }
+
+    @Test
+    public void testStopWithSavepointNoError() throws Exception {
+        testStopWithSavepoint(StopWithSavepointTestBehavior.NO_FAILURE);
+    }
+
+    /** Expected behavior is that the job fails. */
+    @Test
+    public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+        try {
+            testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("CheckpointException"));
+        }
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnStop() throws Exception {
+        try {
+            testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_STOP);
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("Reached state FAILED instead of FINISHED"));
+        }
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+
+        env.setParallelism(PARALLELISM);
+
+        env.addSource(new DummySource(StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY))
+                .addSink(new DiscardingSink<>());
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        DummySource.resetForParallelism(PARALLELISM);
+        final File savepointDirectory = tempFolder.newFolder("savepoint");
+        try {
+            client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
+            fail("Expect failure of operation");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("CheckpointException"));
+        }
+
+        // trigger second savepoint
+        DummySource.awaitRunning();
+        final String savepoint =
+                client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
+        assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
+    }
+
+    /** Tests the stop with savepoint operation */
+    private void testStopWithSavepoint(StopWithSavepointTestBehavior behavior) throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        if (behavior == StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY) {
+            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+        } else {
+            env.setRestartStrategy(RestartStrategies.noRestart());
+        }
+        env.setParallelism(PARALLELISM);
+
+        env.addSource(new DummySource(behavior)).addSink(new DiscardingSink<>());
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        final File savepointDirectory = tempFolder.newFolder("savepoint");
+        final String savepoint =
+                client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
+        assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
+        assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED));
+    }
+
+    private static final class DummySource extends RichParallelSourceFunction<Integer>
+            implements CheckpointedFunction, CheckpointListener {
+        private final StopWithSavepointTestBehavior behavior;
+        private volatile boolean running = true;
+        private static CountDownLatch instancesRunning;
+        private volatile boolean checkpointComplete = false;
+
+        public DummySource(StopWithSavepointTestBehavior behavior) {
+            this.behavior = behavior;
+        }
+
+        private static void resetForParallelism(int para) {
+            instancesRunning = new CountDownLatch(para);
+        }
+
+        private static void awaitRunning() throws InterruptedException {
+            Preconditions.checkNotNull(instancesRunning);
+            instancesRunning.await();
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            Preconditions.checkNotNull(instancesRunning);
+            instancesRunning.countDown();
+            int i = Integer.MIN_VALUE;
+            while (running) {
+                Thread.sleep(10L);
+                ctx.collect(i++);
+            }

Review comment:
       I think this should be executed under the checkpoint lock.
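
A sketch of the suggested change, using the lock exposed by the `SourceContext` so that emission cannot interleave with a checkpoint or savepoint being taken:

```java
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
    Preconditions.checkNotNull(instancesRunning);
    instancesRunning.countDown();
    int i = Integer.MIN_VALUE;
    while (running) {
        Thread.sleep(10L);
        // emit under the checkpoint lock
        synchronized (ctx.getCheckpointLock()) {
            ctx.collect(i++);
        }
    }
}
```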

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final boolean advanceToEndOfEventTime;
+    private final String targetDirectory;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final Context context;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            CheckpointCoordinator checkpointCoordinator,

Review comment:
       E.g. we could have methods for stopping and starting the `CC` and for creating
a savepoint.
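
One possible shape for such a `Context` extension (method names are illustrative); it would also let tests stub the checkpointing interactions without a real `CheckpointCoordinator`:

```java
// Hypothetical sketch of the suggested Context methods.
interface Context extends StateWithExecutionGraph.Context {

    /** Stops periodic checkpointing while the savepoint is being taken. */
    void stopCheckpointScheduler();

    /** Restarts periodic checkpointing, e.g. after a failed savepoint. */
    void startCheckpointScheduler();

    /** Triggers the synchronous savepoint and returns its future. */
    CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
            boolean advanceToEndOfEventTime, @Nullable String targetDirectory);
}
```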

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {

Review comment:
       I am not sure whether introducing `ExecutingStateWithFailureHandler` is 
really helpful here. I think it causes us to overlook how to handle certain 
signals. For example, when `cancel` is called, then we should fail the 
savepoint operation and complete `operationCompletionFuture` exceptionally. The 
same actually applies to `suspend`.
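
A sketch of handling those signals explicitly (the exact hook names on the state may differ; this is illustrative only):

```java
// Hypothetical sketch: fail the pending operation when the job is cancelled or
// suspended while stopping with a savepoint.
@Override
public void cancel() {
    operationCompletionFuture.completeExceptionally(
            new FlinkException("Stop with savepoint aborted: the job was cancelled."));
    super.cancel();
}

@Override
public void suspend(Throwable cause) {
    operationCompletionFuture.completeExceptionally(
            new FlinkException("Stop with savepoint aborted: the job was suspended.", cause));
    super.suspend(cause);
}
```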

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/Executing.java
##########
@@ -141,29 +100,54 @@ public void notifyNewResourcesAvailable() {
         }
     }
 
-    /** Context of the {@link Executing} state. */
-    interface Context extends StateWithExecutionGraph.Context {
+    CompletableFuture<String> stopWithSavepoint(
+            String targetDirectory, boolean advanceToEndOfEventTime) {
+        final ExecutionGraph executionGraph = getExecutionGraph();
+        final CheckpointCoordinator checkpointCoordinator =
+                executionGraph.getCheckpointCoordinator();
+
+        // check if stop with savepoint is possible:
+        if (checkpointCoordinator == null) {
+            return FutureUtils.completedExceptionally(
+                    new IllegalStateException(
+                            String.format(
+                                    "Job %s is not a streaming job.", executionGraph.getJobID())));
+        }
 
-        /**
-         * Transitions into the {@link Canceling} state.
-         *
-         * @param executionGraph executionGraph to pass to the {@link Canceling} state
-         * @param executionGraphHandler executionGraphHandler to pass to the {@link Canceling} state
-         * @param operatorCoordinatorHandler operatorCoordinatorHandler to pass to the {@link
-         *     Canceling} state
-         */
-        void goToCanceling(
-                ExecutionGraph executionGraph,
-                ExecutionGraphHandler executionGraphHandler,
-                OperatorCoordinatorHandler operatorCoordinatorHandler);
+        if (targetDirectory == null
+                && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
+            getLogger()
+                    .info(
+                            "Trying to cancel job {} with savepoint, but no savepoint directory configured.",
+                            executionGraph.getJobID());
+
+            return FutureUtils.completedExceptionally(
+                    new IllegalStateException(
+                            "No savepoint directory configured. You can either specify a directory "
+                                    + "while cancelling via -s :targetDirectory or configure a cluster-wide "
+                                    + "default via key '"
+                                    + CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
+                                    + "'."));
+        }
 
-        /**
-         * Asks how to handle the failure.
-         *
-         * @param failure failure describing the failure cause
-         * @return {@link FailureResult} which describes how to handle the failure
-         */
-        FailureResult howToHandleFailure(Throwable failure);
+        getLogger().info("Triggering stop-with-savepoint for job {}.", executionGraph.getJobID());
+
+        CompletableFuture<String> operationCompletionFuture = new CompletableFuture<>();

Review comment:
       Maybe instead of creating this future, we could define `goToStopWithSavepoint()`
to return this future. That way we would not have to pass the future into the
`StopWithSavepoint` state but could make it directly part of it.
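
Roughly (a sketch of the suggested signature change, with the parameters the current transition already takes):

```java
// Hypothetical sketch: the transition returns the operation future, so Executing
// never has to create it and thread it through.
CompletableFuture<String> goToStopWithSavepoint(
        ExecutionGraph executionGraph,
        ExecutionGraphHandler executionGraphHandler,
        OperatorCoordinatorHandler operatorCoordinatorHandler,
        CheckpointCoordinator checkpointCoordinator,
        @Nullable String targetDirectory,
        boolean advanceToEndOfEventTime);
```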

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final boolean advanceToEndOfEventTime;
+    private final String targetDirectory;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final Context context;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            CheckpointCoordinator checkpointCoordinator,
+            String targetDirectory,
+            boolean advanceToEndOfEventTime,
+            CompletableFuture<String> operationCompletionFuture) {
+        super(
+                context,
+                executionGraph,
+                executionGraphHandler,
+                operatorCoordinatorHandler,
+                logger,
+                userCodeClassLoader);
+        this.context = context;
+        this.checkpointCoordinator = checkpointCoordinator;
+        this.targetDirectory = targetDirectory;
+        this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+        this.operationCompletionFuture = operationCompletionFuture;
+    }
+
+    @Override
+    public void onEnter() {
+        final ExecutionGraph executionGraph = getExecutionGraph();
+        // we stop the checkpoint coordinator so that we are guaranteed
+        // to have only the data of the synchronous savepoint committed.
+        // in case of failure, and if the job restarts, the coordinator
+        // will be restarted by the CheckpointCoordinatorDeActivator.

Review comment:
       Let's not try to rely on things which are far away from this class. It 
is much easier to maintain something if things are close to each other. 
Concretely, the `StopWithSavepoint` stops the `CheckpointCoordinator` and hence 
it should be responsible for activating it again. If we don't do this, then 
changes to the `CheckpointCoordinatorDeActivator` will surprisingly also affect 
this class.
   
   Having to write an in-code comment explaining this contract should be a good
indicator of such a problem.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
+import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
+import static org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link StopWithSavepoint} state. */
+public class StopWithSavepointTest extends TestLogger {
+
+    /** Test failure during savepoint creation: go to failing */
+    @Test
+    public void testFailureDuringSavepointCreationWithNoRestart() throws Exception {
+        MockExecutingStateWithFailureHandlerContext ctx =
+                new MockExecutingStateWithFailureHandlerContext();
+
+        StopWithSavepointEnvironment stopWithSavepointEnv = createStopWithSavepointEnvironment(ctx);
+        StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState();
+        stopWithSavepoint.onEnter();
+
+        ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart);
+        // fail future returned by the CheckpointCoordinator
+        stopWithSavepointEnv
+                .getSavepointFuture()
+                .completeExceptionally(new RuntimeException("Savepoint creation failed"));
+
+        stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED);

Review comment:
       Why is this necessary? A savepoint failure should not require the EG to 
reach a globally terminal state before we can recover?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final boolean advanceToEndOfEventTime;
+    private final String targetDirectory;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final Context context;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            CheckpointCoordinator checkpointCoordinator,
+            String targetDirectory,
+            boolean advanceToEndOfEventTime,
+            CompletableFuture<String> operationCompletionFuture) {
+        super(
+                context,
+                executionGraph,
+                executionGraphHandler,
+                operatorCoordinatorHandler,
+                logger,
+                userCodeClassLoader);
+        this.context = context;
+        this.checkpointCoordinator = checkpointCoordinator;
+        this.targetDirectory = targetDirectory;
+        this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+        this.operationCompletionFuture = operationCompletionFuture;
+    }
+
+    @Override
+    public void onEnter() {
+        final ExecutionGraph executionGraph = getExecutionGraph();
+        // we stop the checkpoint coordinator so that we are guaranteed
+        // to have only the data of the synchronous savepoint committed.
+        // in case of failure, and if the job restarts, the coordinator
+        // will be restarted by the CheckpointCoordinatorDeActivator.
+        checkpointCoordinator.stopCheckpointScheduler();
+
+        CompletableFuture<String> savepointFuture =
+                checkpointCoordinator
+                        .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
+                        // handle errors while creating the savepoint
+                        .handleAsync(
+                                this::handleSavepointOperationResult,
+                                context.getMainThreadExecutor())
+                        // Wait for job to be finished
+                        .thenCompose(
+                                path ->
+                                        executionGraph
+                                                .getTerminationFuture()
+                                                .thenApply(
+                                                        status ->
+                                                                new JobStatusAndSavepointPath(
+                                                                        status, path)))
+                        // check that the job finished successfully
+                        .handleAsync(
+                                this::handleStopWithSavepointOperationResult,
+                                context.getMainThreadExecutor());
+
+        FutureUtils.forward(savepointFuture, operationCompletionFuture);
+    }
+
+    private String handleSavepointOperationResult(
+            CompletedCheckpoint completedCheckpoint, Throwable throwable) {
+        if (throwable != null) {
+            context.runIfState(this, () -> handleAnyFailure(throwable));
+            throw new SavepointCreationException(
+                    "Unable to stop with savepoint. Error while creating the savepoint.",
+                    throwable);
+        }
+        return completedCheckpoint.getExternalPointer();
+    }
+
+    private String handleStopWithSavepointOperationResult(
+            JobStatusAndSavepointPath statusAndPath, Throwable throwable) {
+        // filter out exception from first stage to show correct root cause
+        if (throwable != null) {
+            if (throwable instanceof SavepointCreationException) {
+                throw (SavepointCreationException) throwable;
+            } else {
+                throw new CompletionException(
+                        "Error while stopping job after creating savepoint", throwable);
+            }
+        }
+
+        if (statusAndPath.getStatus() == JobStatus.FINISHED) {
+            // ensure that all vertices from the execution graph are in state FINISHED. There might
+            // be cases where only some subgraphs of the job are finished.
+            if (!areAllVerticesFinished()) {
+                if (getLogger().isDebugEnabled()) {
+                    getLogger().debug("Printing vertex states:");
+                    for (Execution e : getExecutionGraph().getRegisteredExecutions().values()) {
+                        getLogger().debug("{} = {}", e.getVertex(), e.getState());
+                    }
+                }
+                final FlinkException exception =
+                        new FlinkException(
+                                "Not all vertices of job are finished. Stop with savepoint operation failed.");
+                context.runIfState(this, () -> handleAnyFailure(exception));
+                throw new CompletionException("Unable to stop with savepoint", exception);
+            }
+            context.runIfState(
+                    this,
+                    () ->
+                            context.goToFinished(
+                                    ArchivedExecutionGraph.createFrom(getExecutionGraph())));
+            return statusAndPath.getPath();
+        } else {
+            throw new CompletionException(
+                    new FlinkException(
+                            "Reached state "
+                                    + statusAndPath.getStatus()
+                                    + " instead of FINISHED "));
+        }
+    }
+
+    private boolean areAllVerticesFinished() {
+        return getExecutionGraph().getRegisteredExecutions().values().stream()
+                .noneMatch(e -> e.getState() != ExecutionState.FINISHED);
+    }
+
+    @Override
+    protected void handleAnyFailure(@Nonnull Throwable cause) {
+        Preconditions.checkState(context.isState(this), "Assuming StopWithSavepoint state");
+        // restart the checkpoint coordinator if stopWithSavepoint failed.
+        startCheckpointScheduler(checkpointCoordinator);
+
+        if (getLogger().isDebugEnabled()) {
+            getLogger()
+                    .debug(
+                            "Restarting or Failing job because of failure while stopping with savepoint.",
+                            cause);
+        } else {
+            getLogger()
+                    .info(
+                            "Restarting or Failing job because of failure while stopping with savepoint. Reason: {}",
+                            cause.getMessage());
+        }
+        super.handleAnyFailure(cause);
+    }
+
+    @Override
+    void onGloballyTerminalState(JobStatus globallyTerminalState) {
+        // handled already

Review comment:
       This should be an indicator that we now have different control paths for 
one and the same thing. This is usually not ideal and makes maintenance harder.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final boolean advanceToEndOfEventTime;
+    private final String targetDirectory;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final Context context;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            CheckpointCoordinator checkpointCoordinator,
+            String targetDirectory,
+            boolean advanceToEndOfEventTime,
+            CompletableFuture<String> operationCompletionFuture) {
+        super(
+                context,
+                executionGraph,
+                executionGraphHandler,
+                operatorCoordinatorHandler,
+                logger,
+                userCodeClassLoader);
+        this.context = context;
+        this.checkpointCoordinator = checkpointCoordinator;
+        this.targetDirectory = targetDirectory;
+        this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+        this.operationCompletionFuture = operationCompletionFuture;
+    }
+
+    @Override
+    public void onEnter() {
+        final ExecutionGraph executionGraph = getExecutionGraph();
+        // we stop the checkpoint coordinator so that we are guaranteed
+        // to have only the data of the synchronous savepoint committed.
+        // in case of failure, and if the job restarts, the coordinator
+        // will be restarted by the CheckpointCoordinatorDeActivator.
+        checkpointCoordinator.stopCheckpointScheduler();
+
+        CompletableFuture<String> savepointFuture =
+                checkpointCoordinator
+                        .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
+                        // handle errors while creating the savepoint
+                        .handleAsync(
+                                this::handleSavepointOperationResult,
+                                context.getMainThreadExecutor())
+                        // Wait for job to be finished
+                        .thenCompose(
+                                path ->
+                                        executionGraph
+                                                .getTerminationFuture()
+                                                .thenApply(
+                                                        status ->
+                                                                new JobStatusAndSavepointPath(
+                                                                        status, path)))
+                        // check that the job finished successfully
+                        .handleAsync(
+                                this::handleStopWithSavepointOperationResult,
+                                context.getMainThreadExecutor());
+
+        FutureUtils.forward(savepointFuture, operationCompletionFuture);
+    }
+
+    private String handleSavepointOperationResult(
+            CompletedCheckpoint completedCheckpoint, Throwable throwable) {
+        if (throwable != null) {
+            context.runIfState(this, () -> handleAnyFailure(throwable));
+            throw new SavepointCreationException(
+                    "Unable to stop with savepoint. Error while creating the savepoint.",
+                    throwable);
+        }
+        return completedCheckpoint.getExternalPointer();
+    }
+
+    private String handleStopWithSavepointOperationResult(
+            JobStatusAndSavepointPath statusAndPath, Throwable throwable) {
+        // filter out exception from first stage to show correct root cause
+        if (throwable != null) {
+            if (throwable instanceof SavepointCreationException) {
+                throw (SavepointCreationException) throwable;
+            } else {
+                throw new CompletionException(
+                        "Error while stopping job after creating savepoint", throwable);
+            }
+        }
+
+        if (statusAndPath.getStatus() == JobStatus.FINISHED) {
+            // ensure that all vertices from the execution graph are in state FINISHED. There might
+            // be cases where only some subgraphs of the job are finished.
+            if (!areAllVerticesFinished()) {
+                if (getLogger().isDebugEnabled()) {
+                    getLogger().debug("Printing vertex states:");
+                    for (Execution e : getExecutionGraph().getRegisteredExecutions().values()) {
+                        getLogger().debug("{} = {}", e.getVertex(), e.getState());
+                    }
+                }
+                final FlinkException exception =
+                        new FlinkException(
+                                "Not all vertices of job are finished. Stop with savepoint operation failed.");
+                context.runIfState(this, () -> handleAnyFailure(exception));
+                throw new CompletionException("Unable to stop with savepoint", exception);
+            }
+            context.runIfState(
+                    this,
+                    () ->
+                            context.goToFinished(
+                                    ArchivedExecutionGraph.createFrom(getExecutionGraph())));
+            return statusAndPath.getPath();
+        } else {
+            throw new CompletionException(
+                    new FlinkException(
+                            "Reached state "
+                                    + statusAndPath.getStatus()
+                                    + " instead of FINISHED "));
+        }
+    }
+
+    private boolean areAllVerticesFinished() {
+        return getExecutionGraph().getRegisteredExecutions().values().stream()
+                .noneMatch(e -> e.getState() != ExecutionState.FINISHED);
+    }
+
+    @Override
+    protected void handleAnyFailure(@Nonnull Throwable cause) {
+        Preconditions.checkState(context.isState(this), "Assuming StopWithSavepoint state");
+        // restart the checkpoint coordinator if stopWithSavepoint failed.
+        startCheckpointScheduler(checkpointCoordinator);
+
+        if (getLogger().isDebugEnabled()) {
+            getLogger()
+                    .debug(
+                            "Restarting or Failing job because of failure while stopping with savepoint.",
+                            cause);
+        } else {
+            getLogger()
+                    .info(
+                            "Restarting or Failing job because of failure while stopping with savepoint. Reason: {}",
+                            cause.getMessage());
+        }
+        super.handleAnyFailure(cause);

Review comment:
       This is a bit of personal taste, but I find it hard to see what happens in case of a failure in the `StopWithSavepoint` state. If we didn't use the `ExecutingStateWithFailureHandler` and instead had an explicit `context.goToRestarting` call, it would be clearer. It currently works because all failures are handled as global failovers, but once that changes, this class will break, because at that point we will want to trigger global failovers explicitly.
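   To illustrate, a rough, hypothetical sketch of the explicit variant (method names such as `howToHandleFailure`, `canRestart`, `getBackoffTime` and the handler accessors are assumptions here, not the existing API):

```java
// Hypothetical: handle failures explicitly in this state instead of inheriting
// the behaviour from ExecutingStateWithFailureHandler.
private void handleStopWithSavepointFailure(Throwable cause) {
    // restart the periodic checkpointing that was paused for the savepoint
    checkpointCoordinator.startCheckpointScheduler();

    // the stop-with-savepoint operation itself always fails
    operationCompletionFuture.completeExceptionally(cause);

    final FailureResult failureResult = context.howToHandleFailure(cause);
    if (failureResult.canRestart()) {
        context.goToRestarting(
                getExecutionGraph(),
                getExecutionGraphHandler(),
                getOperatorCoordinatorHandler(),
                failureResult.getBackoffTime());
    } else {
        context.goToFailing(
                getExecutionGraph(),
                getExecutionGraphHandler(),
                getOperatorCoordinatorHandler(),
                failureResult.getFailureCause());
    }
}
```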

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final boolean advanceToEndOfEventTime;
+    private final String targetDirectory;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final Context context;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            CheckpointCoordinator checkpointCoordinator,
+            String targetDirectory,
+            boolean advanceToEndOfEventTime,
+            CompletableFuture<String> operationCompletionFuture) {
+        super(
+                context,
+                executionGraph,
+                executionGraphHandler,
+                operatorCoordinatorHandler,
+                logger,
+                userCodeClassLoader);
+        this.context = context;
+        this.checkpointCoordinator = checkpointCoordinator;
+        this.targetDirectory = targetDirectory;
+        this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+        this.operationCompletionFuture = operationCompletionFuture;
+    }
+
+    @Override
+    public void onEnter() {
+        final ExecutionGraph executionGraph = getExecutionGraph();
+        // we stop the checkpoint coordinator so that we are guaranteed
+        // to have only the data of the synchronous savepoint committed.
+        // in case of failure, and if the job restarts, the coordinator
+        // will be restarted by the CheckpointCoordinatorDeActivator.
+        checkpointCoordinator.stopCheckpointScheduler();
+
+        CompletableFuture<String> savepointFuture =
+                checkpointCoordinator
+                        .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
+                        // handle errors while creating the savepoint
+                        .handleAsync(
+                                this::handleSavepointOperationResult,
+                                context.getMainThreadExecutor())
+                        // Wait for job to be finished
+                        .thenCompose(
+                                path ->
+                                        executionGraph
+                                                .getTerminationFuture()
+                                                .thenApply(
+                                                        status ->
+                                                                new JobStatusAndSavepointPath(
+                                                                        status, path)))
+                        // check that the job finished successfully
+                        .handleAsync(
+                                this::handleStopWithSavepointOperationResult,

Review comment:
       Why are we handling the final job result by composing on `getTerminationFuture`? Don't we already have `onGloballyTerminalState`, which tells us when the `ExecutionGraph` has reached a globally terminal state?
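   As an illustration only (the signature is assumed from the other `StateWithExecutionGraph` subclasses, and `savepointPath` would be a field set once the savepoint completes), reacting to the callback could look roughly like this:

```java
// Hypothetical sketch: complete the operation from the globally terminal state callback
// instead of composing on ExecutionGraph#getTerminationFuture.
@Override
void onGloballyTerminalState(JobStatus globallyTerminalState) {
    if (globallyTerminalState == JobStatus.FINISHED) {
        operationCompletionFuture.complete(savepointPath);
        context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
    } else {
        operationCompletionFuture.completeExceptionally(
                new FlinkException(
                        "Job reached state " + globallyTerminalState + " instead of FINISHED."));
    }
}
```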

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping 
afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final boolean advanceToEndOfEventTime;
+    private final String targetDirectory;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final Context context;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            CheckpointCoordinator checkpointCoordinator,
+            String targetDirectory,
+            boolean advanceToEndOfEventTime,
+            CompletableFuture<String> operationCompletionFuture) {
+        super(
+                context,
+                executionGraph,
+                executionGraphHandler,
+                operatorCoordinatorHandler,
+                logger,
+                userCodeClassLoader);
+        this.context = context;
+        this.checkpointCoordinator = checkpointCoordinator;
+        this.targetDirectory = targetDirectory;
+        this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+        this.operationCompletionFuture = operationCompletionFuture;
+    }
+
+    @Override
+    public void onEnter() {
+        final ExecutionGraph executionGraph = getExecutionGraph();
+        // we stop the checkpoint coordinator so that we are guaranteed
+        // to have only the data of the synchronous savepoint committed.
+        // in case of failure, and if the job restarts, the coordinator
+        // will be restarted by the CheckpointCoordinatorDeActivator.
+        checkpointCoordinator.stopCheckpointScheduler();
+
+        CompletableFuture<String> savepointFuture =
+                checkpointCoordinator
+                        .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
+                        // handle errors while creating the savepoint
+                        .handleAsync(
+                                this::handleSavepointOperationResult,
+                                context.getMainThreadExecutor())
+                        // Wait for job to be finished
+                        .thenCompose(
+                                path ->
+                                        executionGraph
+                                                .getTerminationFuture()
+                                                .thenApply(
+                                                        status ->
+                                                                new 
JobStatusAndSavepointPath(
+                                                                        
status, path)))
+                        // check that the job finished successfully
+                        .handleAsync(
+                                this::handleStopWithSavepointOperationResult,
+                                context.getMainThreadExecutor());
+
+        FutureUtils.forward(savepointFuture, operationCompletionFuture);
+    }
+
+    private String handleSavepointOperationResult(
+            CompletedCheckpoint completedCheckpoint, Throwable throwable) {
+        if (throwable != null) {
+            context.runIfState(this, () -> handleAnyFailure(throwable));
+            throw new SavepointCreationException(
+                    "Unable to stop with savepoint. Error while creating the savepoint.",
+                    throwable);

Review comment:
       Handling the failure and sending the result to the user happen in two places here. The latter is even obscured a bit, because the exception thrown here is forwarded to the result future somewhere else. This is a rather complicated control flow. Couldn't we do this in the failure handling logic?
   
   Are we treating savepoint failures as global failures? It looks like it, and I think that is wrong. A savepoint failure should only fail the operation and transition us back to the `Executing` state.
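   Sketched under those semantics (hypothetical — `goToExecuting` and its parameters are assumptions mirroring the other transitions), the single failure path could look like:

```java
// Hypothetical: a failed savepoint only fails the operation and returns to Executing,
// it does not trigger a global failover.
savepointFuture.whenCompleteAsync(
        (completedCheckpoint, throwable) -> {
            if (throwable != null) {
                checkpointCoordinator.startCheckpointScheduler();
                operationCompletionFuture.completeExceptionally(
                        new FlinkException("Savepoint creation failed.", throwable));
                context.runIfState(
                        this,
                        () ->
                                context.goToExecuting(
                                        getExecutionGraph(),
                                        getExecutionGraphHandler(),
                                        getOperatorCoordinatorHandler()));
            }
        },
        context.getMainThreadExecutor());
```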

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final boolean advanceToEndOfEventTime;
+    private final String targetDirectory;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final Context context;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            CheckpointCoordinator checkpointCoordinator,
+            String targetDirectory,
+            boolean advanceToEndOfEventTime,
+            CompletableFuture<String> operationCompletionFuture) {
+        super(
+                context,
+                executionGraph,
+                executionGraphHandler,
+                operatorCoordinatorHandler,
+                logger,
+                userCodeClassLoader);
+        this.context = context;
+        this.checkpointCoordinator = checkpointCoordinator;
+        this.targetDirectory = targetDirectory;
+        this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+        this.operationCompletionFuture = operationCompletionFuture;
+    }
+
+    @Override
+    public void onEnter() {
+        final ExecutionGraph executionGraph = getExecutionGraph();
+        // we stop the checkpoint coordinator so that we are guaranteed
+        // to have only the data of the synchronous savepoint committed.
+        // in case of failure, and if the job restarts, the coordinator
+        // will be restarted by the CheckpointCoordinatorDeActivator.
+        checkpointCoordinator.stopCheckpointScheduler();
+
+        CompletableFuture<String> savepointFuture =
+                checkpointCoordinator
+                        .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
+                        // handle errors while creating the savepoint
+                        .handleAsync(
+                                this::handleSavepointOperationResult,
+                                context.getMainThreadExecutor())
+                        // Wait for job to be finished
+                        .thenCompose(
+                                path ->
+                                        executionGraph
+                                                .getTerminationFuture()
+                                                .thenApply(
+                                                        status ->
+                                                                new JobStatusAndSavepointPath(
+                                                                        status, path)))
+                        // check that the job finished successfully
+                        .handleAsync(
+                                this::handleStopWithSavepointOperationResult,
+                                context.getMainThreadExecutor());
+
+        FutureUtils.forward(savepointFuture, operationCompletionFuture);
+    }
+
+    private String handleSavepointOperationResult(
+            CompletedCheckpoint completedCheckpoint, Throwable throwable) {
+        if (throwable != null) {
+            context.runIfState(this, () -> handleAnyFailure(throwable));
+            throw new SavepointCreationException(
+                    "Unable to stop with savepoint. Error while creating the savepoint.",
+                    throwable);
+        }
+        return completedCheckpoint.getExternalPointer();
+    }
+
+    private String handleStopWithSavepointOperationResult(
+            JobStatusAndSavepointPath statusAndPath, Throwable throwable) {
+        // filter out exception from first stage to show correct root cause
+        if (throwable != null) {
+            if (throwable instanceof SavepointCreationException) {
+                throw (SavepointCreationException) throwable;
+            } else {
+                throw new CompletionException(
+                        "Error while stopping job after creating savepoint", throwable);
+            }
+        }
+
+        if (statusAndPath.getStatus() == JobStatus.FINISHED) {
+            // ensure that all vertices from the execution graph are in state FINISHED. There might
+            // be cases where only some subgraphs of the job are finished.
+            if (!areAllVerticesFinished()) {
+                if (getLogger().isDebugEnabled()) {
+                    getLogger().debug("Printing vertex states:");
+                    for (Execution e : getExecutionGraph().getRegisteredExecutions().values()) {
+                        getLogger().debug("{} = {}", e.getVertex(), e.getState());
+                    }
+                }
+                final FlinkException exception =
+                        new FlinkException(
+                                "Not all vertices of job are finished. Stop with savepoint operation failed.");
+                context.runIfState(this, () -> handleAnyFailure(exception));
+                throw new CompletionException("Unable to stop with savepoint", exception);

Review comment:
       Here again we have one path for the failure handling and another path for communicating the result back to the user. I think this should be one and the same path. If we leave this state for any state other than `Finished`, then we should fail the operation.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
+import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
+import static org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link StopWithSavepoint} state. */
+public class StopWithSavepointTest extends TestLogger {
+
+    /** Test failure during savepoint creation: go to failing */
+    @Test
+    public void testFailureDuringSavepointCreationWithNoRestart() throws Exception {
+        MockExecutingStateWithFailureHandlerContext ctx =
+                new MockExecutingStateWithFailureHandlerContext();
+
+        StopWithSavepointEnvironment stopWithSavepointEnv = createStopWithSavepointEnvironment(ctx);
+        StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState();
+        stopWithSavepoint.onEnter();
+
+        ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart);
+        // fail future returned by the CheckpointCoordinator
+        stopWithSavepointEnv
+                .getSavepointFuture()
+                .completeExceptionally(new RuntimeException("Savepoint creation failed"));
+
+        stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED);
+
+        ctx.setExpectFailing(assertNonNull());
+
+        ctx.close(); // trigger outstanding executions
+
+        assertThat(
+                stopWithSavepointEnv.getOperationCompletionFuture().isCompletedExceptionally(),
+                is(true));
+    }
+
+    private static class StopWithSavepointEnvironment {
+        private final StopWithSavepoint state;
+        private final CompletableFuture<JobStatus> executionGraphTerminationFuture;
+        private final CompletableFuture<String> operationCompletionFuture;
+        private final CompletableFuture<CompletedCheckpoint> savepointFuture;
+
+        StopWithSavepointEnvironment(
+                StopWithSavepoint state,
+                CompletableFuture<JobStatus> executionGraphTerminationFuture,
+                CompletableFuture<String> operationCompletionFuture,
+                CompletableFuture<CompletedCheckpoint> savepointFuture) {
+            this.state = state;
+            this.executionGraphTerminationFuture = executionGraphTerminationFuture;
+            this.operationCompletionFuture = operationCompletionFuture;
+            this.savepointFuture = savepointFuture;
+        }
+
+        public StopWithSavepoint getState() {
+            return state;
+        }
+
+        public CompletableFuture<JobStatus> getExecutionGraphTerminationFuture() {
+            return executionGraphTerminationFuture;
+        }
+
+        public CompletableFuture<String> getOperationCompletionFuture() {
+            return operationCompletionFuture;
+        }
+
+        public CompletableFuture<CompletedCheckpoint> getSavepointFuture() {
+            return savepointFuture;
+        }
+    }
+
+    private StopWithSavepointEnvironment createStopWithSavepointEnvironment(
+            MockExecutingStateWithFailureHandlerContext ctx) throws IOException {
+        CompletableFuture<JobStatus> executionGraphTerminationFuture = new CompletableFuture<>();
+        CompletableFuture<String> operationCompletionFuture = new CompletableFuture<>();
+        CompletableFuture<CompletedCheckpoint> savepointFuture = new CompletableFuture<>();
+        final ExecutionGraph executionGraph =
+                new MockExecutionGraph(executionGraphTerminationFuture);
+        executionGraph.transitionToRunning();
+        final ExecutionGraphHandler executionGraphHandler =
+                new ExecutionGraphHandler(
+                        executionGraph,
+                        log,
+                        ctx.getMainThreadExecutor(),
+                        ctx.getMainThreadExecutor());
+        OperatorCoordinatorHandler operatorCoordinatorHandler =
+                new OperatorCoordinatorHandler(
+                        executionGraph,
+                        (throwable) -> {
+                            throw new RuntimeException("Error in test", throwable);
+                        });
+
+        CheckpointCoordinator checkpointCoordinator =
+                new MockCheckpointCoordinator(savepointFuture);
+
+        StopWithSavepoint stopWithSavepoint =
+                new StopWithSavepoint(
+                        ctx,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        log,
+                        ClassLoader.getSystemClassLoader(),
+                        checkpointCoordinator,
+                        "savepointTargetDir",
+                        false,
+                        operationCompletionFuture);
+        return new StopWithSavepointEnvironment(
+                stopWithSavepoint,
+                executionGraphTerminationFuture,
+                operationCompletionFuture,
+                savepointFuture);
+    }
+
+    private static class MockCheckpointCoordinator extends CheckpointCoordinator {

Review comment:
       Let's not continue with this pattern. Extending concrete classes is a 
recipe for a lot of rabbit holes.
   
   Maybe an easier solution could be to define the interface the 
`StopWithSavepoint` state requires and then have some adapters to map it to the 
underlying `CheckpointCoordinator` (or even let the `CheckpointCoordinator` 
implement this interface).
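   For example (all names are purely illustrative, not existing API):

```java
/** The minimal surface the StopWithSavepoint state needs from the checkpointing machinery. */
interface CheckpointScheduling {
    void startCheckpointScheduler();

    void stopCheckpointScheduler();

    CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
            boolean advanceToEndOfEventTime, @Nullable String targetDirectory);
}
```

   The production code could let the `CheckpointCoordinator` implement this directly (or wrap it in a thin adapter), while tests provide a trivial implementation backed by a `CompletableFuture`.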

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final boolean advanceToEndOfEventTime;
+    private final String targetDirectory;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final Context context;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            CheckpointCoordinator checkpointCoordinator,

Review comment:
       How well can this state be tested when it accesses the `CheckpointCoordinator` directly? Would it be easier if we hid the coordinator behind some interface or a `context` method?
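   For instance, the state's `Context` could expose the operation itself (again just a sketch, the method is an assumption):

```java
interface Context extends StateWithExecutionGraph.Context {
    /** Triggers the synchronous savepoint without exposing the CheckpointCoordinator. */
    CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
            boolean advanceToEndOfEventTime, @Nullable String targetDirectory);
}
```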

##########
File path: flink-tests/src/test/resources/log4j2-test.properties
##########
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO

Review comment:
       Probably not intended.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
+import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
+import static org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link StopWithSavepoint} state. */
+public class StopWithSavepointTest extends TestLogger {
+
+    /** Test failure during savepoint creation: go to failing */
+    @Test
+    public void testFailureDuringSavepointCreationWithNoRestart() throws 
Exception {
+        MockExecutingStateWithFailureHandlerContext ctx =
+                new MockExecutingStateWithFailureHandlerContext();
+
+        StopWithSavepointEnvironment stopWithSavepointEnv = 
createStopWithSavepointEnvironment(ctx);
+        StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState();
+        stopWithSavepoint.onEnter();
+
+        
ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart);
+        // fail future returned by the CheckpointCoordinator
+        stopWithSavepointEnv
+                .getSavepointFuture()
+                .completeExceptionally(new RuntimeException("Savepoint 
creation failed"));
+
+        
stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED);
+
+        ctx.setExpectFailing(assertNonNull());
+
+        ctx.close(); // trigger outstanding executions
+
+        assertThat(
+                
stopWithSavepointEnv.getOperationCompletionFuture().isCompletedExceptionally(),
+                is(true));
+    }
+
+    private static class StopWithSavepointEnvironment {
+        private final StopWithSavepoint state;
+        private final CompletableFuture<JobStatus> 
executionGraphTerminationFuture;
+        private final CompletableFuture<String> operationCompletionFuture;
+        private final CompletableFuture<CompletedCheckpoint> savepointFuture;
+
+        StopWithSavepointEnvironment(
+                StopWithSavepoint state,
+                CompletableFuture<JobStatus> executionGraphTerminationFuture,
+                CompletableFuture<String> operationCompletionFuture,
+                CompletableFuture<CompletedCheckpoint> savepointFuture) {
+            this.state = state;
+            this.executionGraphTerminationFuture = 
executionGraphTerminationFuture;
+            this.operationCompletionFuture = operationCompletionFuture;
+            this.savepointFuture = savepointFuture;
+        }
+
+        public StopWithSavepoint getState() {
+            return state;
+        }
+
+        public CompletableFuture<JobStatus> 
getExecutionGraphTerminationFuture() {
+            return executionGraphTerminationFuture;
+        }
+
+        public CompletableFuture<String> getOperationCompletionFuture() {
+            return operationCompletionFuture;
+        }
+
+        public CompletableFuture<CompletedCheckpoint> getSavepointFuture() {
+            return savepointFuture;
+        }
+    }
+
+    private StopWithSavepointEnvironment createStopWithSavepointEnvironment(
+            MockExecutingStateWithFailureHandlerContext ctx) throws 
IOException {
+        CompletableFuture<JobStatus> executionGraphTerminationFuture = new 
CompletableFuture<>();
+        CompletableFuture<String> operationCompletionFuture = new 
CompletableFuture<>();
+        CompletableFuture<CompletedCheckpoint> savepointFuture = new 
CompletableFuture<>();
+        final ExecutionGraph executionGraph =
+                new MockExecutionGraph(executionGraphTerminationFuture);
+        executionGraph.transitionToRunning();
+        final ExecutionGraphHandler executionGraphHandler =
+                new ExecutionGraphHandler(
+                        executionGraph,
+                        log,
+                        ctx.getMainThreadExecutor(),
+                        ctx.getMainThreadExecutor());
+        OperatorCoordinatorHandler operatorCoordinatorHandler =
+                new OperatorCoordinatorHandler(
+                        executionGraph,
+                        (throwable) -> {
+                            throw new RuntimeException("Error in test", 
throwable);
+                        });
+
+        CheckpointCoordinator checkpointCoordinator =
+                new MockCheckpointCoordinator(savepointFuture);
+
+        StopWithSavepoint stopWithSavepoint =
+                new StopWithSavepoint(
+                        ctx,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        log,
+                        ClassLoader.getSystemClassLoader(),
+                        checkpointCoordinator,
+                        "savepointTargetDir",
+                        false,
+                        operationCompletionFuture);
+        return new StopWithSavepointEnvironment(
+                stopWithSavepoint,
+                executionGraphTerminationFuture,
+                operationCompletionFuture,
+                savepointFuture);
+    }
+
+    private static class MockCheckpointCoordinator extends CheckpointCoordinator {
+        private static ExecutionVertex vertex = mockExecutionVertex(new ExecutionAttemptID());
+        private static ExecutionVertex[] defaultVertices = new ExecutionVertex[] {vertex};
+        private final CompletableFuture<CompletedCheckpoint> savepointFuture;
+
+        MockCheckpointCoordinator(CompletableFuture<CompletedCheckpoint> savepointFuture) {
+            super(
+                    new JobID(),
+                    new CheckpointCoordinatorConfiguration
+                                    .CheckpointCoordinatorConfigurationBuilder()
+                            .build(),
+                    Collections.emptyList(),
+                    new StandaloneCheckpointIDCounter(),
+                    new StandaloneCompletedCheckpointStore(1),
+                    new MemoryStateBackend(),
+                    Executors.directExecutor(),
+                    new CheckpointsCleaner(),
+                    new ManuallyTriggeredScheduledExecutor(),
+                    SharedStateRegistry.DEFAULT_FACTORY,
+                    new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE),
+                    new CheckpointPlanCalculator(
+                            new JobID(),
+                            Arrays.asList(defaultVertices),
+                            Arrays.asList(defaultVertices),
+                            Arrays.asList(defaultVertices)),
+                    new ExecutionAttemptMappingProvider(Arrays.asList(defaultVertices)));

Review comment:
       The pain of setting these things up should be a good indicator that we are doing something wrong here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
+import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
+import static 
org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link StopWithSavepoint} state. */
+public class StopWithSavepointTest extends TestLogger {
+
+    /** Test failure during savepoint creation: go to failing */
+    @Test
+    public void testFailureDuringSavepointCreationWithNoRestart() throws 
Exception {
+        MockExecutingStateWithFailureHandlerContext ctx =
+                new MockExecutingStateWithFailureHandlerContext();
+
+        StopWithSavepointEnvironment stopWithSavepointEnv = 
createStopWithSavepointEnvironment(ctx);
+        StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState();
+        stopWithSavepoint.onEnter();
+
+        
ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart);
+        // fail future returned by the CheckpointCoordinator
+        stopWithSavepointEnv
+                .getSavepointFuture()
+                .completeExceptionally(new RuntimeException("Savepoint 
creation failed"));
+
+        
stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED);
+
+        ctx.setExpectFailing(assertNonNull());
+
+        ctx.close(); // trigger outstanding executions
+
+        assertThat(
+                
stopWithSavepointEnv.getOperationCompletionFuture().isCompletedExceptionally(),
+                is(true));
+    }
+
+    private static class StopWithSavepointEnvironment {
+        private final StopWithSavepoint state;
+        private final CompletableFuture<JobStatus> 
executionGraphTerminationFuture;
+        private final CompletableFuture<String> operationCompletionFuture;
+        private final CompletableFuture<CompletedCheckpoint> savepointFuture;
+
+        StopWithSavepointEnvironment(
+                StopWithSavepoint state,
+                CompletableFuture<JobStatus> executionGraphTerminationFuture,
+                CompletableFuture<String> operationCompletionFuture,
+                CompletableFuture<CompletedCheckpoint> savepointFuture) {
+            this.state = state;
+            this.executionGraphTerminationFuture = 
executionGraphTerminationFuture;
+            this.operationCompletionFuture = operationCompletionFuture;
+            this.savepointFuture = savepointFuture;
+        }
+
+        public StopWithSavepoint getState() {
+            return state;
+        }
+
+        public CompletableFuture<JobStatus> 
getExecutionGraphTerminationFuture() {
+            return executionGraphTerminationFuture;
+        }
+
+        public CompletableFuture<String> getOperationCompletionFuture() {
+            return operationCompletionFuture;
+        }
+
+        public CompletableFuture<CompletedCheckpoint> getSavepointFuture() {
+            return savepointFuture;
+        }
+    }
+
+    private StopWithSavepointEnvironment createStopWithSavepointEnvironment(
+            MockExecutingStateWithFailureHandlerContext ctx) throws 
IOException {
+        CompletableFuture<JobStatus> executionGraphTerminationFuture = new 
CompletableFuture<>();
+        CompletableFuture<String> operationCompletionFuture = new 
CompletableFuture<>();
+        CompletableFuture<CompletedCheckpoint> savepointFuture = new 
CompletableFuture<>();
+        final ExecutionGraph executionGraph =
+                new MockExecutionGraph(executionGraphTerminationFuture);
+        executionGraph.transitionToRunning();
+        final ExecutionGraphHandler executionGraphHandler =
+                new ExecutionGraphHandler(
+                        executionGraph,
+                        log,
+                        ctx.getMainThreadExecutor(),
+                        ctx.getMainThreadExecutor());
+        OperatorCoordinatorHandler operatorCoordinatorHandler =
+                new OperatorCoordinatorHandler(
+                        executionGraph,
+                        (throwable) -> {
+                            throw new RuntimeException("Error in test", 
throwable);
+                        });
+
+        CheckpointCoordinator checkpointCoordinator =
+                new MockCheckpointCoordinator(savepointFuture);
+
+        StopWithSavepoint stopWithSavepoint =
+                new StopWithSavepoint(
+                        ctx,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        log,
+                        ClassLoader.getSystemClassLoader(),
+                        checkpointCoordinator,
+                        "savepointTargetDir",
+                        false,
+                        operationCompletionFuture);
+        return new StopWithSavepointEnvironment(
+                stopWithSavepoint,
+                executionGraphTerminationFuture,
+                operationCompletionFuture,
+                savepointFuture);
+    }
+
+    private static class MockCheckpointCoordinator extends CheckpointCoordinator {
+        private static ExecutionVertex vertex = mockExecutionVertex(new ExecutionAttemptID());
+        private static ExecutionVertex[] defaultVertices = new ExecutionVertex[] {vertex};
+        private final CompletableFuture<CompletedCheckpoint> savepointFuture;
+
+        MockCheckpointCoordinator(CompletableFuture<CompletedCheckpoint> savepointFuture) {
+            super(
+                    new JobID(),
+                    new CheckpointCoordinatorConfiguration
+                                    .CheckpointCoordinatorConfigurationBuilder()
+                            .build(),
+                    Collections.emptyList(),
+                    new StandaloneCheckpointIDCounter(),
+                    new StandaloneCompletedCheckpointStore(1),
+                    new MemoryStateBackend(),
+                    Executors.directExecutor(),
+                    new CheckpointsCleaner(),
+                    new ManuallyTriggeredScheduledExecutor(),
+                    SharedStateRegistry.DEFAULT_FACTORY,
+                    new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE),
+                    new CheckpointPlanCalculator(
+                            new JobID(),
+                            Arrays.asList(defaultVertices),
+                            Arrays.asList(defaultVertices),
+                            Arrays.asList(defaultVertices)),
+                    new ExecutionAttemptMappingProvider(Arrays.asList(defaultVertices)));
+            this.savepointFuture = savepointFuture;
+        }
+
+        @Override
+        public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+                boolean advanceToEndOfEventTime, @Nullable String targetLocation) {
+            return savepointFuture;
+        }
+    }
+
+    private static class MockExecutionGraph extends ExecutionGraph {
+        private final CompletableFuture<JobStatus> mockTerminationFuture;
+
+        private MockExecutionGraph(CompletableFuture<JobStatus> mockTerminationFuture)
+                throws IOException {
+            super(
+                    new JobInformation(
+                            new JobID(),
+                            "Test Job",
+                            new SerializedValue<>(new ExecutionConfig()),
+                            new Configuration(),
+                            Collections.emptyList(),
+                            Collections.emptyList()),
+                    TestingUtils.defaultExecutor(),
+                    TestingUtils.defaultExecutor(),
+                    AkkaUtils.getDefaultTimeout(),
+                    1,
+                    ExecutionGraph.class.getClassLoader(),
+                    VoidBlobWriter.getInstance(),
+                    
PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(
+                            new Configuration()),
+                    NettyShuffleMaster.INSTANCE,
+                    NoOpJobMasterPartitionTracker.INSTANCE,
+                    ScheduleMode.EAGER,
+                    NoOpExecutionDeploymentListener.get(),
+                    (execution, newState) -> {},
+                    0L);
+            this.mockTerminationFuture = mockTerminationFuture;
+        }
+
+        @Override
+        public CompletableFuture<JobStatus> getTerminationFuture() {
+            return mockTerminationFuture;
+        }
+    }

Review comment:
       Yet another `EG` subclass...

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
##########
@@ -88,6 +103,155 @@ public void testGlobalFailoverCanRecoverState() throws Exception {
         env.execute();
     }
 
+    private enum StopWithSavepointTestBehavior {
+        NO_FAILURE,
+        FAIL_ON_CHECKPOINT,
+        FAIL_ON_STOP,
+        FAIL_ON_FIRST_CHECKPOINT_ONLY
+    }
+
+    @Test
+    public void testStopWithSavepointNoError() throws Exception {
+        testStopWithSavepoint(StopWithSavepointTestBehavior.NO_FAILURE);
+    }
+
+    /** Expected behavior is that the job fails. */
+    @Test
+    public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+        try {
+            testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("CheckpointException"));

Review comment:
       You could use `FlinkMatchers.containsMessage` here instead of matching on the message string directly.
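       Something along these lines, for example (untested sketch; assuming
`containsMessage` lives in `org.apache.flink.core.testutils.FlinkMatchers`):

```java
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;

@Test
public void testStopWithSavepointFailOnCheckpoint() throws Exception {
    try {
        testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
        fail("Expect exception");
    } catch (ExecutionException e) {
        // containsMessage also inspects the messages of the causes,
        // so the assertion does not depend on how the exception is wrapped
        assertThat(e, containsMessage("CheckpointException"));
    }
}
```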

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
##########
@@ -88,6 +103,155 @@ public void testGlobalFailoverCanRecoverState() throws Exception {
         env.execute();
     }
 
+    private enum StopWithSavepointTestBehavior {
+        NO_FAILURE,
+        FAIL_ON_CHECKPOINT,
+        FAIL_ON_STOP,
+        FAIL_ON_FIRST_CHECKPOINT_ONLY
+    }
+
+    @Test
+    public void testStopWithSavepointNoError() throws Exception {
+        testStopWithSavepoint(StopWithSavepointTestBehavior.NO_FAILURE);
+    }
+
+    /** Expected behavior is that the job fails. */
+    @Test
+    public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+        try {
+            testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+            fail("Expect exception");

Review comment:
       Shouldn't we also assert that the job is still running?
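       Roughly like this, for example (untested; assumes the test can get hold of
a `ClusterClient` and the `JobID`, which may not match the current setup):

```java
// after the failed stop-with-savepoint, the job should not have terminated
JobStatus status = clusterClient.getJobStatus(jobId).get();
assertThat(status, is(JobStatus.RUNNING));
```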

##########
File path: flink-runtime/src/test/resources/log4j2-test.properties
##########
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO

Review comment:
       This change seems unintended.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {

Review comment:
       I think in general one should be very careful when using inheritance. If 
the base class is not well designed for inheritance, then inheritance is 
usually a good way to get stuck in rabbit holes in the future.
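       Roughly what I mean, as a sketch (all names made up, just to illustrate the
shape; not a concrete proposal for this PR):

```java
/** Hypothetical collaborator that encapsulates the shared failure handling logic. */
interface FailureHandler {
    void handleFailure(Throwable cause);
}

/** Sketch: the state owns a FailureHandler instead of inheriting the behaviour. */
class StopWithSavepointSketch {

    private final FailureHandler failureHandler;

    StopWithSavepointSketch(FailureHandler failureHandler) {
        this.failureHandler = failureHandler;
    }

    void handleGlobalFailure(Throwable cause) {
        // delegate instead of relying on a base class that was not designed for inheritance
        failureHandler.handleFailure(cause);
    }
}
```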

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
+import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
+import static org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link StopWithSavepoint} state. */
+public class StopWithSavepointTest extends TestLogger {
+
+    /** Test failure during savepoint creation: go to failing */
+    @Test
+    public void testFailureDuringSavepointCreationWithNoRestart() throws Exception {
+        MockExecutingStateWithFailureHandlerContext ctx =
+                new MockExecutingStateWithFailureHandlerContext();
+
+        StopWithSavepointEnvironment stopWithSavepointEnv = createStopWithSavepointEnvironment(ctx);
+        StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState();
+        stopWithSavepoint.onEnter();
+
+        ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart);
+        // fail future returned by the CheckpointCoordinator
+        stopWithSavepointEnv
+                .getSavepointFuture()
+                .completeExceptionally(new RuntimeException("Savepoint creation failed"));
+
+        stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED);
+
+        ctx.setExpectFailing(assertNonNull());
+
+        ctx.close(); // trigger outstanding executions
+
+        assertThat(
+                stopWithSavepointEnv.getOperationCompletionFuture().isCompletedExceptionally(),

Review comment:
       I am not a huge fan of this environment. Looking at this test alone, it is
not clear to me why the env holds the operation completion future. It would be
clearer if, for example, the operation future belonged to the state.
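       For illustration, something like this would read more naturally to me
(hypothetical accessor on the state; not what the PR currently has):

```java
// the state itself owns and exposes the operation's completion future
StopWithSavepoint stopWithSavepoint = createStopWithSavepoint(ctx, savepointFuture);
stopWithSavepoint.onEnter();

savepointFuture.completeExceptionally(new RuntimeException("Savepoint creation failed"));

assertThat(
        stopWithSavepoint.getOperationCompletionFuture().isCompletedExceptionally(),
        is(true));
```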




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

