tillrohrmann commented on a change in pull request #14948: URL: https://github.com/apache/flink/pull/14948#discussion_r577607349
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java ########## @@ -731,6 +731,29 @@ public void goToFailing( failureCause)); } + @Override + public void goToStopWithSavepoint( + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + CheckpointCoordinator checkpointCoordinator, + String targetDirectory, + boolean advanceToEndOfEventTime, + CompletableFuture<String> operationCompletionFuture) { + transitionToState( + new StopWithSavepoint( + this, + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + LOG, + userCodeClassLoader, + checkpointCoordinator, Review comment: For what do we need the `checkpointCoordinator`? Isn't it already included in the `ExecutionGraph`? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/Executing.java ########## @@ -141,29 +100,54 @@ public void notifyNewResourcesAvailable() { } } - /** Context of the {@link Executing} state. */ - interface Context extends StateWithExecutionGraph.Context { + CompletableFuture<String> stopWithSavepoint( + String targetDirectory, boolean advanceToEndOfEventTime) { Review comment: `@Nullable` seems to be missing. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** State describing a job that is currently taking a savepoint and stopping afterwards. 
*/ +class StopWithSavepoint extends ExecutingStateWithFailureHandler { + private final CompletableFuture<String> operationCompletionFuture; + private final boolean advanceToEndOfEventTime; + private final String targetDirectory; + private final CheckpointCoordinator checkpointCoordinator; + private final Context context; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger, + ClassLoader userCodeClassLoader, + CheckpointCoordinator checkpointCoordinator, + String targetDirectory, + boolean advanceToEndOfEventTime, + CompletableFuture<String> operationCompletionFuture) { + super( + context, + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + logger, + userCodeClassLoader); + this.context = context; + this.checkpointCoordinator = checkpointCoordinator; + this.targetDirectory = targetDirectory; + this.advanceToEndOfEventTime = advanceToEndOfEventTime; + this.operationCompletionFuture = operationCompletionFuture; + } + + @Override + public void onEnter() { + final ExecutionGraph executionGraph = getExecutionGraph(); + // we stop the checkpoint coordinator so that we are guaranteed + // to have only the data of the synchronous savepoint committed. + // in case of failure, and if the job restarts, the coordinator + // will be restarted by the CheckpointCoordinatorDeActivator. 
+ checkpointCoordinator.stopCheckpointScheduler(); + + CompletableFuture<String> savepointFuture = + checkpointCoordinator + .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) + // handle errors while creating the savepoint + .handleAsync( + this::handleSavepointOperationResult, + context.getMainThreadExecutor()) + // Wait for job to be finished + .thenCompose( + path -> + executionGraph + .getTerminationFuture() + .thenApply( + status -> + new JobStatusAndSavepointPath( + status, path))) + // check that the job finished successfully + .handleAsync( + this::handleStopWithSavepointOperationResult, + context.getMainThreadExecutor()); + + FutureUtils.forward(savepointFuture, operationCompletionFuture); + } + + private String handleSavepointOperationResult( + CompletedCheckpoint completedCheckpoint, Throwable throwable) { + if (throwable != null) { + context.runIfState(this, () -> handleAnyFailure(throwable)); + throw new SavepointCreationException( + "Unable to stop with savepoint. Error while creating the savepoint.", + throwable); + } + return completedCheckpoint.getExternalPointer(); + } + + private String handleStopWithSavepointOperationResult( + JobStatusAndSavepointPath statusAndPath, Throwable throwable) { + // filter out exception from first stage to show correct root cause + if (throwable != null) { + if (throwable instanceof SavepointCreationException) { + throw (SavepointCreationException) throwable; + } else { + throw new CompletionException( + "Error while stopping job after creating savepoint", throwable); + } + } + + if (statusAndPath.getStatus() == JobStatus.FINISHED) { + // ensure that all vertices from the execution graph are in state FINISHED. There might + // be cases where only some subgraphs of the job are finished. 
+ if (!areAllVerticesFinished()) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Printing vertex states:"); + for (Execution e : getExecutionGraph().getRegisteredExecutions().values()) { + getLogger().debug("{} = {}", e.getVertex(), e.getState()); + } + } + final FlinkException exception = + new FlinkException( + "Not all vertices of job are finished. Stop with savepoint operation failed."); + context.runIfState(this, () -> handleAnyFailure(exception)); + throw new CompletionException("Unable to stop with savepoint", exception); + } + context.runIfState( + this, + () -> + context.goToFinished( + ArchivedExecutionGraph.createFrom(getExecutionGraph()))); + return statusAndPath.getPath(); + } else { + throw new CompletionException( + new FlinkException( + "Reached state " + + statusAndPath.getStatus() + + " instead of FINISHED ")); + } + } + + private boolean areAllVerticesFinished() { + return getExecutionGraph().getRegisteredExecutions().values().stream() + .noneMatch(e -> e.getState() != ExecutionState.FINISHED); + } + + @Override + protected void handleAnyFailure(@Nonnull Throwable cause) { + Preconditions.checkState(context.isState(this), "Assuming StopWithSavepoint state"); + // restart the checkpoint coordinator if stopWithSavepoint failed. + startCheckpointScheduler(checkpointCoordinator); Review comment: We could say that we do this whenever we leave this state in any other state than `Finished`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** State describing a job that is currently taking a savepoint and stopping afterwards. 
*/ +class StopWithSavepoint extends ExecutingStateWithFailureHandler { + private final CompletableFuture<String> operationCompletionFuture; + private final boolean advanceToEndOfEventTime; + private final String targetDirectory; + private final CheckpointCoordinator checkpointCoordinator; + private final Context context; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger, + ClassLoader userCodeClassLoader, + CheckpointCoordinator checkpointCoordinator, + String targetDirectory, + boolean advanceToEndOfEventTime, + CompletableFuture<String> operationCompletionFuture) { + super( + context, + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + logger, + userCodeClassLoader); + this.context = context; + this.checkpointCoordinator = checkpointCoordinator; + this.targetDirectory = targetDirectory; + this.advanceToEndOfEventTime = advanceToEndOfEventTime; + this.operationCompletionFuture = operationCompletionFuture; + } + + @Override + public void onEnter() { + final ExecutionGraph executionGraph = getExecutionGraph(); + // we stop the checkpoint coordinator so that we are guaranteed + // to have only the data of the synchronous savepoint committed. + // in case of failure, and if the job restarts, the coordinator + // will be restarted by the CheckpointCoordinatorDeActivator. 
+ checkpointCoordinator.stopCheckpointScheduler(); + + CompletableFuture<String> savepointFuture = + checkpointCoordinator + .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) + // handle errors while creating the savepoint + .handleAsync( + this::handleSavepointOperationResult, + context.getMainThreadExecutor()) + // Wait for job to be finished + .thenCompose( + path -> + executionGraph + .getTerminationFuture() + .thenApply( + status -> + new JobStatusAndSavepointPath( + status, path))) + // check that the job finished successfully + .handleAsync( + this::handleStopWithSavepointOperationResult, + context.getMainThreadExecutor()); + + FutureUtils.forward(savepointFuture, operationCompletionFuture); Review comment: Instead of using future callbacks, the `StopWithSavepoint` state could have a small internal state machine which reacts to signals. Two signals we need to handle are the terminal job status and the savepoint status. The former is already available via `onGloballyTerminalState`. The latter could be implemented by having `onSavepointSuccess(String path)` and an `onSavepointFailure(Throwable cause)`. That way we wouldn't have to deal with nested future callbacks. Maybe we could even reuse the improvements from FLINK-21030. ########## File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java ########## @@ -88,6 +103,155 @@ public void testGlobalFailoverCanRecoverState() throws Exception { env.execute(); } + private enum StopWithSavepointTestBehavior { + NO_FAILURE, + FAIL_ON_CHECKPOINT, + FAIL_ON_STOP, + FAIL_ON_FIRST_CHECKPOINT_ONLY + } + + @Test + public void testStopWithSavepointNoError() throws Exception { + testStopWithSavepoint(StopWithSavepointTestBehavior.NO_FAILURE); + } + + /** Expected behavior is that the job fails. 
*/ + @Test + public void testStopWithSavepointFailOnCheckpoint() throws Exception { + try { + testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT); + fail("Expect exception"); + } catch (ExecutionException e) { + assertThat(e.getMessage(), containsString("CheckpointException")); + } + } + + @Test + public void testStopWithSavepointFailOnStop() throws Exception { + try { + testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_STOP); + fail("Expect exception"); + } catch (ExecutionException e) { + assertThat(e.getMessage(), containsString("Reached state FAILED instead of FINISHED")); + } + } + + @Test + public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Exception { + assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L)); + + env.setParallelism(PARALLELISM); + + env.addSource(new DummySource(StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY)) + .addSink(new DiscardingSink<>()); + DummySource.resetForParallelism(PARALLELISM); + + JobClient client = env.executeAsync(); + + DummySource.awaitRunning(); + DummySource.resetForParallelism(PARALLELISM); + final File savepointDirectory = tempFolder.newFolder("savepoint"); + try { + client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get(); + fail("Expect failure of operation"); + } catch (ExecutionException e) { + assertThat(e.getMessage(), containsString("CheckpointException")); + } + + // trigger second savepoint + DummySource.awaitRunning(); + final String savepoint = + client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get(); + assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath())); + } + + /** Tests the stop with savepoint operation */ + private void testStopWithSavepoint(StopWithSavepointTestBehavior behavior) throws 
Exception { + assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + if (behavior == StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY) { + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L)); Review comment: Are we calling `testStopWithSavepoint` with `FAIL_ON_FIRST_CHECKPOINT_ONLY`? ########## File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java ########## @@ -88,6 +103,155 @@ public void testGlobalFailoverCanRecoverState() throws Exception { env.execute(); } + private enum StopWithSavepointTestBehavior { + NO_FAILURE, + FAIL_ON_CHECKPOINT, + FAIL_ON_STOP, + FAIL_ON_FIRST_CHECKPOINT_ONLY + } + + @Test + public void testStopWithSavepointNoError() throws Exception { + testStopWithSavepoint(StopWithSavepointTestBehavior.NO_FAILURE); + } + + /** Expected behavior is that the job fails. */ + @Test + public void testStopWithSavepointFailOnCheckpoint() throws Exception { + try { + testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT); + fail("Expect exception"); + } catch (ExecutionException e) { + assertThat(e.getMessage(), containsString("CheckpointException")); + } + } + + @Test + public void testStopWithSavepointFailOnStop() throws Exception { + try { + testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_STOP); + fail("Expect exception"); + } catch (ExecutionException e) { + assertThat(e.getMessage(), containsString("Reached state FAILED instead of FINISHED")); + } + } + + @Test + public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Exception { + assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L)); + + env.setParallelism(PARALLELISM); + + 
env.addSource(new DummySource(StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY)) + .addSink(new DiscardingSink<>()); + DummySource.resetForParallelism(PARALLELISM); + + JobClient client = env.executeAsync(); + + DummySource.awaitRunning(); + DummySource.resetForParallelism(PARALLELISM); + final File savepointDirectory = tempFolder.newFolder("savepoint"); + try { + client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get(); + fail("Expect failure of operation"); + } catch (ExecutionException e) { + assertThat(e.getMessage(), containsString("CheckpointException")); + } + + // trigger second savepoint + DummySource.awaitRunning(); + final String savepoint = + client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get(); + assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath())); + } + + /** Tests the stop with savepoint operation */ + private void testStopWithSavepoint(StopWithSavepointTestBehavior behavior) throws Exception { + assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + if (behavior == StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY) { + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L)); + } else { + env.setRestartStrategy(RestartStrategies.noRestart()); + } + env.setParallelism(PARALLELISM); + + env.addSource(new DummySource(behavior)).addSink(new DiscardingSink<>()); + DummySource.resetForParallelism(PARALLELISM); + + JobClient client = env.executeAsync(); + + DummySource.awaitRunning(); + final File savepointDirectory = tempFolder.newFolder("savepoint"); + final String savepoint = + client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get(); + assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath())); + assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED)); + } + + private static final class 
DummySource extends RichParallelSourceFunction<Integer> + implements CheckpointedFunction, CheckpointListener { + private final StopWithSavepointTestBehavior behavior; + private volatile boolean running = true; + private static CountDownLatch instancesRunning; + private volatile boolean checkpointComplete = false; + + public DummySource(StopWithSavepointTestBehavior behavior) { + this.behavior = behavior; + } + + private static void resetForParallelism(int para) { + instancesRunning = new CountDownLatch(para); + } + + private static void awaitRunning() throws InterruptedException { + Preconditions.checkNotNull(instancesRunning); + instancesRunning.await(); + } + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + Preconditions.checkNotNull(instancesRunning); + instancesRunning.countDown(); + int i = Integer.MIN_VALUE; + while (running) { + Thread.sleep(10L); + ctx.collect(i++); + } Review comment: I think this should be executed under the checkpoint lock. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** State describing a job that is currently taking a savepoint and stopping afterwards. */ +class StopWithSavepoint extends ExecutingStateWithFailureHandler { + private final CompletableFuture<String> operationCompletionFuture; + private final boolean advanceToEndOfEventTime; + private final String targetDirectory; + private final CheckpointCoordinator checkpointCoordinator; + private final Context context; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger, + ClassLoader userCodeClassLoader, + CheckpointCoordinator checkpointCoordinator, Review comment: E.g. we could have methods for stopping & starting the `CC` and to create a savepoint. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** State describing a job that is currently taking a savepoint and stopping afterwards. 
*/ +class StopWithSavepoint extends ExecutingStateWithFailureHandler { Review comment: I am not sure whether introducing `ExecutingStateWithFailureHandler` is really helpful here. I think it causes us to overlook how to handle certain signals. For example, when `cancel` is called, then we should fail the savepoint operation and complete `operationCompletionFuture` exceptionally. The same actually applies to `suspend`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/Executing.java ########## @@ -141,29 +100,54 @@ public void notifyNewResourcesAvailable() { } } - /** Context of the {@link Executing} state. */ - interface Context extends StateWithExecutionGraph.Context { + CompletableFuture<String> stopWithSavepoint( + String targetDirectory, boolean advanceToEndOfEventTime) { + final ExecutionGraph executionGraph = getExecutionGraph(); + final CheckpointCoordinator checkpointCoordinator = + executionGraph.getCheckpointCoordinator(); + + // check if stop with savepoint is possible: + if (checkpointCoordinator == null) { + return FutureUtils.completedExceptionally( + new IllegalStateException( + String.format( + "Job %s is not a streaming job.", executionGraph.getJobID()))); + } - /** - * Transitions into the {@link Canceling} state. 
- * - * @param executionGraph executionGraph to pass to the {@link Canceling} state - * @param executionGraphHandler executionGraphHandler to pass to the {@link Canceling} state - * @param operatorCoordinatorHandler operatorCoordinatorHandler to pass to the {@link - * Canceling} state - */ - void goToCanceling( - ExecutionGraph executionGraph, - ExecutionGraphHandler executionGraphHandler, - OperatorCoordinatorHandler operatorCoordinatorHandler); + if (targetDirectory == null + && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) { + getLogger() + .info( + "Trying to cancel job {} with savepoint, but no savepoint directory configured.", + executionGraph.getJobID()); + + return FutureUtils.completedExceptionally( + new IllegalStateException( + "No savepoint directory configured. You can either specify a directory " + + "while cancelling via -s :targetDirectory or configure a cluster-wide " + + "default via key '" + + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + + "'.")); + } - /** - * Asks how to handle the failure. - * - * @param failure failure describing the failure cause - * @return {@link FailureResult} which describes how to handle the failure - */ - FailureResult howToHandleFailure(Throwable failure); + getLogger().info("Triggering stop-with-savepoint for job {}.", executionGraph.getJobID()); + + CompletableFuture<String> operationCompletionFuture = new CompletableFuture<>(); Review comment: Maybe instead of creating this future, we could define `goToStopWithSavepoint()` to return this future. That way we would not have to pass the future into the `StopWithSavepoint` state but could make it directly part of it. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** State describing a job that is currently taking a savepoint and stopping afterwards. 
*/ +class StopWithSavepoint extends ExecutingStateWithFailureHandler { + private final CompletableFuture<String> operationCompletionFuture; + private final boolean advanceToEndOfEventTime; + private final String targetDirectory; + private final CheckpointCoordinator checkpointCoordinator; + private final Context context; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger, + ClassLoader userCodeClassLoader, + CheckpointCoordinator checkpointCoordinator, + String targetDirectory, + boolean advanceToEndOfEventTime, + CompletableFuture<String> operationCompletionFuture) { + super( + context, + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + logger, + userCodeClassLoader); + this.context = context; + this.checkpointCoordinator = checkpointCoordinator; + this.targetDirectory = targetDirectory; + this.advanceToEndOfEventTime = advanceToEndOfEventTime; + this.operationCompletionFuture = operationCompletionFuture; + } + + @Override + public void onEnter() { + final ExecutionGraph executionGraph = getExecutionGraph(); + // we stop the checkpoint coordinator so that we are guaranteed + // to have only the data of the synchronous savepoint committed. + // in case of failure, and if the job restarts, the coordinator + // will be restarted by the CheckpointCoordinatorDeActivator. Review comment: Let's not try to rely on things which are far away from this class. It is much easier to maintain something if things are close to each other. Concretely, the `StopWithSavepoint` stops the `CheckpointCoordinator` and hence it should be responsible for activating it again. If we don't do this, then changes to the `CheckpointCoordinatorDeActivator` will surprisingly also affect this class. Having to write an in-code comment explaining this contract should be a good indicator for such a problem. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointFailureManager; +import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider; +import org.apache.flink.runtime.checkpoint.NoOpFailJobCall; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import 
org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener; +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader; +import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex; +import static org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. 
*/ +public class StopWithSavepointTest extends TestLogger { + + /** Test failure during savepoint creation: go to failing */ + @Test + public void testFailureDuringSavepointCreationWithNoRestart() throws Exception { + MockExecutingStateWithFailureHandlerContext ctx = + new MockExecutingStateWithFailureHandlerContext(); + + StopWithSavepointEnvironment stopWithSavepointEnv = createStopWithSavepointEnvironment(ctx); + StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState(); + stopWithSavepoint.onEnter(); + + ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart); + // fail future returned by the CheckpointCoordinator + stopWithSavepointEnv + .getSavepointFuture() + .completeExceptionally(new RuntimeException("Savepoint creation failed")); + + stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED); Review comment: Why is this necessary? A savepoint failure should not require the `ExecutionGraph` (EG) to reach a globally terminal state before we can recover. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** State describing a job that is currently taking a savepoint and stopping afterwards. 
*/ +class StopWithSavepoint extends ExecutingStateWithFailureHandler { + private final CompletableFuture<String> operationCompletionFuture; + private final boolean advanceToEndOfEventTime; + private final String targetDirectory; + private final CheckpointCoordinator checkpointCoordinator; + private final Context context; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger, + ClassLoader userCodeClassLoader, + CheckpointCoordinator checkpointCoordinator, + String targetDirectory, + boolean advanceToEndOfEventTime, + CompletableFuture<String> operationCompletionFuture) { + super( + context, + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + logger, + userCodeClassLoader); + this.context = context; + this.checkpointCoordinator = checkpointCoordinator; + this.targetDirectory = targetDirectory; + this.advanceToEndOfEventTime = advanceToEndOfEventTime; + this.operationCompletionFuture = operationCompletionFuture; + } + + @Override + public void onEnter() { + final ExecutionGraph executionGraph = getExecutionGraph(); + // we stop the checkpoint coordinator so that we are guaranteed + // to have only the data of the synchronous savepoint committed. + // in case of failure, and if the job restarts, the coordinator + // will be restarted by the CheckpointCoordinatorDeActivator. 
+ checkpointCoordinator.stopCheckpointScheduler(); + + CompletableFuture<String> savepointFuture = + checkpointCoordinator + .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) + // handle errors while creating the savepoint + .handleAsync( + this::handleSavepointOperationResult, + context.getMainThreadExecutor()) + // Wait for job to be finished + .thenCompose( + path -> + executionGraph + .getTerminationFuture() + .thenApply( + status -> + new JobStatusAndSavepointPath( + status, path))) + // check that the job finished successfully + .handleAsync( + this::handleStopWithSavepointOperationResult, + context.getMainThreadExecutor()); + + FutureUtils.forward(savepointFuture, operationCompletionFuture); + } + + private String handleSavepointOperationResult( + CompletedCheckpoint completedCheckpoint, Throwable throwable) { + if (throwable != null) { + context.runIfState(this, () -> handleAnyFailure(throwable)); + throw new SavepointCreationException( + "Unable to stop with savepoint. Error while creating the savepoint.", + throwable); + } + return completedCheckpoint.getExternalPointer(); + } + + private String handleStopWithSavepointOperationResult( + JobStatusAndSavepointPath statusAndPath, Throwable throwable) { + // filter out exception from first stage to show correct root cause + if (throwable != null) { + if (throwable instanceof SavepointCreationException) { + throw (SavepointCreationException) throwable; + } else { + throw new CompletionException( + "Error while stopping job after creating savepoint", throwable); + } + } + + if (statusAndPath.getStatus() == JobStatus.FINISHED) { + // ensure that all vertices from the execution graph are in state FINISHED. There might + // be cases where only some subgraphs of the job are finished. 
+ if (!areAllVerticesFinished()) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Printing vertex states:"); + for (Execution e : getExecutionGraph().getRegisteredExecutions().values()) { + getLogger().debug("{} = {}", e.getVertex(), e.getState()); + } + } + final FlinkException exception = + new FlinkException( + "Not all vertices of job are finished. Stop with savepoint operation failed."); + context.runIfState(this, () -> handleAnyFailure(exception)); + throw new CompletionException("Unable to stop with savepoint", exception); + } + context.runIfState( + this, + () -> + context.goToFinished( + ArchivedExecutionGraph.createFrom(getExecutionGraph()))); + return statusAndPath.getPath(); + } else { + throw new CompletionException( + new FlinkException( + "Reached state " + + statusAndPath.getStatus() + + " instead of FINISHED ")); + } + } + + private boolean areAllVerticesFinished() { + return getExecutionGraph().getRegisteredExecutions().values().stream() + .noneMatch(e -> e.getState() != ExecutionState.FINISHED); + } + + @Override + protected void handleAnyFailure(@Nonnull Throwable cause) { + Preconditions.checkState(context.isState(this), "Assuming StopWithSavepoint state"); + // restart the checkpoint coordinator if stopWithSavepoint failed. + startCheckpointScheduler(checkpointCoordinator); + + if (getLogger().isDebugEnabled()) { + getLogger() + .debug( + "Restarting or Failing job because of failure while stopping with savepoint.", + cause); + } else { + getLogger() + .info( + "Restarting or Failing job because of failure while stopping with savepoint. Reason: {}", + cause.getMessage()); + } + super.handleAnyFailure(cause); + } + + @Override + void onGloballyTerminalState(JobStatus globallyTerminalState) { + // handled already Review comment: This should be an indicator that we now have different control paths for one and the same thing. This is usually not ideal and makes maintenance harder. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** State describing a job that is currently taking a savepoint and stopping afterwards. 
*/ +class StopWithSavepoint extends ExecutingStateWithFailureHandler { + private final CompletableFuture<String> operationCompletionFuture; + private final boolean advanceToEndOfEventTime; + private final String targetDirectory; + private final CheckpointCoordinator checkpointCoordinator; + private final Context context; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger, + ClassLoader userCodeClassLoader, + CheckpointCoordinator checkpointCoordinator, + String targetDirectory, + boolean advanceToEndOfEventTime, + CompletableFuture<String> operationCompletionFuture) { + super( + context, + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + logger, + userCodeClassLoader); + this.context = context; + this.checkpointCoordinator = checkpointCoordinator; + this.targetDirectory = targetDirectory; + this.advanceToEndOfEventTime = advanceToEndOfEventTime; + this.operationCompletionFuture = operationCompletionFuture; + } + + @Override + public void onEnter() { + final ExecutionGraph executionGraph = getExecutionGraph(); + // we stop the checkpoint coordinator so that we are guaranteed + // to have only the data of the synchronous savepoint committed. + // in case of failure, and if the job restarts, the coordinator + // will be restarted by the CheckpointCoordinatorDeActivator. 
+ checkpointCoordinator.stopCheckpointScheduler(); + + CompletableFuture<String> savepointFuture = + checkpointCoordinator + .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) + // handle errors while creating the savepoint + .handleAsync( + this::handleSavepointOperationResult, + context.getMainThreadExecutor()) + // Wait for job to be finished + .thenCompose( + path -> + executionGraph + .getTerminationFuture() + .thenApply( + status -> + new JobStatusAndSavepointPath( + status, path))) + // check that the job finished successfully + .handleAsync( + this::handleStopWithSavepointOperationResult, + context.getMainThreadExecutor()); + + FutureUtils.forward(savepointFuture, operationCompletionFuture); + } + + private String handleSavepointOperationResult( + CompletedCheckpoint completedCheckpoint, Throwable throwable) { + if (throwable != null) { + context.runIfState(this, () -> handleAnyFailure(throwable)); + throw new SavepointCreationException( + "Unable to stop with savepoint. Error while creating the savepoint.", + throwable); + } + return completedCheckpoint.getExternalPointer(); + } + + private String handleStopWithSavepointOperationResult( + JobStatusAndSavepointPath statusAndPath, Throwable throwable) { + // filter out exception from first stage to show correct root cause + if (throwable != null) { + if (throwable instanceof SavepointCreationException) { + throw (SavepointCreationException) throwable; + } else { + throw new CompletionException( + "Error while stopping job after creating savepoint", throwable); + } + } + + if (statusAndPath.getStatus() == JobStatus.FINISHED) { + // ensure that all vertices from the execution graph are in state FINISHED. There might + // be cases where only some subgraphs of the job are finished. 
+ if (!areAllVerticesFinished()) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Printing vertex states:"); + for (Execution e : getExecutionGraph().getRegisteredExecutions().values()) { + getLogger().debug("{} = {}", e.getVertex(), e.getState()); + } + } + final FlinkException exception = + new FlinkException( + "Not all vertices of job are finished. Stop with savepoint operation failed."); + context.runIfState(this, () -> handleAnyFailure(exception)); + throw new CompletionException("Unable to stop with savepoint", exception); + } + context.runIfState( + this, + () -> + context.goToFinished( + ArchivedExecutionGraph.createFrom(getExecutionGraph()))); + return statusAndPath.getPath(); + } else { + throw new CompletionException( + new FlinkException( + "Reached state " + + statusAndPath.getStatus() + + " instead of FINISHED ")); + } + } + + private boolean areAllVerticesFinished() { + return getExecutionGraph().getRegisteredExecutions().values().stream() + .noneMatch(e -> e.getState() != ExecutionState.FINISHED); + } + + @Override + protected void handleAnyFailure(@Nonnull Throwable cause) { + Preconditions.checkState(context.isState(this), "Assuming StopWithSavepoint state"); + // restart the checkpoint coordinator if stopWithSavepoint failed. + startCheckpointScheduler(checkpointCoordinator); + + if (getLogger().isDebugEnabled()) { + getLogger() + .debug( + "Restarting or Failing job because of failure while stopping with savepoint.", + cause); + } else { + getLogger() + .info( + "Restarting or Failing job because of failure while stopping with savepoint. Reason: {}", + cause.getMessage()); + } + super.handleAnyFailure(cause); Review comment: This is a bit of personal taste but I find it hard to see what happens in case of a failure in the state `StopWithSavepoint`. If we didn't use the `ExecutingStateWithFailureHandler` and instead have an explicit `context.goToRestarting` then it would be a bit clearer. 
It currently works because all failures are handled as global failovers. But once this changes, this class will break because we want to fail with global failovers explicitly. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** State describing a job that is currently taking a savepoint and stopping afterwards. 
*/ +class StopWithSavepoint extends ExecutingStateWithFailureHandler { + private final CompletableFuture<String> operationCompletionFuture; + private final boolean advanceToEndOfEventTime; + private final String targetDirectory; + private final CheckpointCoordinator checkpointCoordinator; + private final Context context; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger, + ClassLoader userCodeClassLoader, + CheckpointCoordinator checkpointCoordinator, + String targetDirectory, + boolean advanceToEndOfEventTime, + CompletableFuture<String> operationCompletionFuture) { + super( + context, + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + logger, + userCodeClassLoader); + this.context = context; + this.checkpointCoordinator = checkpointCoordinator; + this.targetDirectory = targetDirectory; + this.advanceToEndOfEventTime = advanceToEndOfEventTime; + this.operationCompletionFuture = operationCompletionFuture; + } + + @Override + public void onEnter() { + final ExecutionGraph executionGraph = getExecutionGraph(); + // we stop the checkpoint coordinator so that we are guaranteed + // to have only the data of the synchronous savepoint committed. + // in case of failure, and if the job restarts, the coordinator + // will be restarted by the CheckpointCoordinatorDeActivator. 
+ checkpointCoordinator.stopCheckpointScheduler(); + + CompletableFuture<String> savepointFuture = + checkpointCoordinator + .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) + // handle errors while creating the savepoint + .handleAsync( + this::handleSavepointOperationResult, + context.getMainThreadExecutor()) + // Wait for job to be finished + .thenCompose( + path -> + executionGraph + .getTerminationFuture() + .thenApply( + status -> + new JobStatusAndSavepointPath( + status, path))) + // check that the job finished successfully + .handleAsync( + this::handleStopWithSavepointOperationResult, Review comment: Why are we handling the final job result by accessing the `getTerminationFuture`? Don't we already have `onGloballyTerminalState` which tells us if the `ExecutionGraph` has reached a globally terminal state? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** State describing a job that is currently taking a savepoint and stopping afterwards. 
*/ +class StopWithSavepoint extends ExecutingStateWithFailureHandler { + private final CompletableFuture<String> operationCompletionFuture; + private final boolean advanceToEndOfEventTime; + private final String targetDirectory; + private final CheckpointCoordinator checkpointCoordinator; + private final Context context; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger, + ClassLoader userCodeClassLoader, + CheckpointCoordinator checkpointCoordinator, + String targetDirectory, + boolean advanceToEndOfEventTime, + CompletableFuture<String> operationCompletionFuture) { + super( + context, + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + logger, + userCodeClassLoader); + this.context = context; + this.checkpointCoordinator = checkpointCoordinator; + this.targetDirectory = targetDirectory; + this.advanceToEndOfEventTime = advanceToEndOfEventTime; + this.operationCompletionFuture = operationCompletionFuture; + } + + @Override + public void onEnter() { + final ExecutionGraph executionGraph = getExecutionGraph(); + // we stop the checkpoint coordinator so that we are guaranteed + // to have only the data of the synchronous savepoint committed. + // in case of failure, and if the job restarts, the coordinator + // will be restarted by the CheckpointCoordinatorDeActivator. 
+ checkpointCoordinator.stopCheckpointScheduler(); + + CompletableFuture<String> savepointFuture = + checkpointCoordinator + .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) + // handle errors while creating the savepoint + .handleAsync( + this::handleSavepointOperationResult, + context.getMainThreadExecutor()) + // Wait for job to be finished + .thenCompose( + path -> + executionGraph + .getTerminationFuture() + .thenApply( + status -> + new JobStatusAndSavepointPath( + status, path))) + // check that the job finished successfully + .handleAsync( + this::handleStopWithSavepointOperationResult, + context.getMainThreadExecutor()); + + FutureUtils.forward(savepointFuture, operationCompletionFuture); + } + + private String handleSavepointOperationResult( + CompletedCheckpoint completedCheckpoint, Throwable throwable) { + if (throwable != null) { + context.runIfState(this, () -> handleAnyFailure(throwable)); + throw new SavepointCreationException( + "Unable to stop with savepoint. Error while creating the savepoint.", + throwable); Review comment: Handling the failure and sending the result to the user happens here in two places. The latter is even obfuscated a bit through throwing an exception which is forwarded to the result future at some other place. This is a rather complicated control flow. Couldn't we do this in the failure handling logic? Are we treating savepoint failures as global failures? It looks a bit like this. I think this is wrong. A savepoint failure should only fail the operation and transition us back to the `Executing` state. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** State describing a job that is currently taking a savepoint and stopping afterwards. 
*/ +class StopWithSavepoint extends ExecutingStateWithFailureHandler { + private final CompletableFuture<String> operationCompletionFuture; + private final boolean advanceToEndOfEventTime; + private final String targetDirectory; + private final CheckpointCoordinator checkpointCoordinator; + private final Context context; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger, + ClassLoader userCodeClassLoader, + CheckpointCoordinator checkpointCoordinator, + String targetDirectory, + boolean advanceToEndOfEventTime, + CompletableFuture<String> operationCompletionFuture) { + super( + context, + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + logger, + userCodeClassLoader); + this.context = context; + this.checkpointCoordinator = checkpointCoordinator; + this.targetDirectory = targetDirectory; + this.advanceToEndOfEventTime = advanceToEndOfEventTime; + this.operationCompletionFuture = operationCompletionFuture; + } + + @Override + public void onEnter() { + final ExecutionGraph executionGraph = getExecutionGraph(); + // we stop the checkpoint coordinator so that we are guaranteed + // to have only the data of the synchronous savepoint committed. + // in case of failure, and if the job restarts, the coordinator + // will be restarted by the CheckpointCoordinatorDeActivator. 
+ checkpointCoordinator.stopCheckpointScheduler(); + + CompletableFuture<String> savepointFuture = + checkpointCoordinator + .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) + // handle errors while creating the savepoint + .handleAsync( + this::handleSavepointOperationResult, + context.getMainThreadExecutor()) + // Wait for job to be finished + .thenCompose( + path -> + executionGraph + .getTerminationFuture() + .thenApply( + status -> + new JobStatusAndSavepointPath( + status, path))) + // check that the job finished successfully + .handleAsync( + this::handleStopWithSavepointOperationResult, + context.getMainThreadExecutor()); + + FutureUtils.forward(savepointFuture, operationCompletionFuture); + } + + private String handleSavepointOperationResult( + CompletedCheckpoint completedCheckpoint, Throwable throwable) { + if (throwable != null) { + context.runIfState(this, () -> handleAnyFailure(throwable)); + throw new SavepointCreationException( + "Unable to stop with savepoint. Error while creating the savepoint.", + throwable); + } + return completedCheckpoint.getExternalPointer(); + } + + private String handleStopWithSavepointOperationResult( + JobStatusAndSavepointPath statusAndPath, Throwable throwable) { + // filter out exception from first stage to show correct root cause + if (throwable != null) { + if (throwable instanceof SavepointCreationException) { + throw (SavepointCreationException) throwable; + } else { + throw new CompletionException( + "Error while stopping job after creating savepoint", throwable); + } + } + + if (statusAndPath.getStatus() == JobStatus.FINISHED) { + // ensure that all vertices from the execution graph are in state FINISHED. There might + // be cases where only some subgraphs of the job are finished. 
+ if (!areAllVerticesFinished()) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Printing vertex states:"); + for (Execution e : getExecutionGraph().getRegisteredExecutions().values()) { + getLogger().debug("{} = {}", e.getVertex(), e.getState()); + } + } + final FlinkException exception = + new FlinkException( + "Not all vertices of job are finished. Stop with savepoint operation failed."); + context.runIfState(this, () -> handleAnyFailure(exception)); + throw new CompletionException("Unable to stop with savepoint", exception); Review comment: here again we have one path for the failure handling and another path for communicating the result back to the user. I think this should be one and the same path. If we leave this state in any other state than `Finished`, then we should fail the operation. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointFailureManager; +import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider; +import org.apache.flink.runtime.checkpoint.NoOpFailJobCall; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener; +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader; +import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import 
org.apache.flink.runtime.shuffle.NettyShuffleMaster; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex; +import static org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. */ +public class StopWithSavepointTest extends TestLogger { + + /** Test failure during savepoint creation: go to failing */ + @Test + public void testFailureDuringSavepointCreationWithNoRestart() throws Exception { + MockExecutingStateWithFailureHandlerContext ctx = + new MockExecutingStateWithFailureHandlerContext(); + + StopWithSavepointEnvironment stopWithSavepointEnv = createStopWithSavepointEnvironment(ctx); + StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState(); + stopWithSavepoint.onEnter(); + + ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart); + // fail future returned by the CheckpointCoordinator + stopWithSavepointEnv + .getSavepointFuture() + .completeExceptionally(new RuntimeException("Savepoint creation failed")); + + stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED); + + ctx.setExpectFailing(assertNonNull()); + + ctx.close(); // trigger outstanding executions + + assertThat( + stopWithSavepointEnv.getOperationCompletionFuture().isCompletedExceptionally(), + is(true)); + } + + private static class 
StopWithSavepointEnvironment { + private final StopWithSavepoint state; + private final CompletableFuture<JobStatus> executionGraphTerminationFuture; + private final CompletableFuture<String> operationCompletionFuture; + private final CompletableFuture<CompletedCheckpoint> savepointFuture; + + StopWithSavepointEnvironment( + StopWithSavepoint state, + CompletableFuture<JobStatus> executionGraphTerminationFuture, + CompletableFuture<String> operationCompletionFuture, + CompletableFuture<CompletedCheckpoint> savepointFuture) { + this.state = state; + this.executionGraphTerminationFuture = executionGraphTerminationFuture; + this.operationCompletionFuture = operationCompletionFuture; + this.savepointFuture = savepointFuture; + } + + public StopWithSavepoint getState() { + return state; + } + + public CompletableFuture<JobStatus> getExecutionGraphTerminationFuture() { + return executionGraphTerminationFuture; + } + + public CompletableFuture<String> getOperationCompletionFuture() { + return operationCompletionFuture; + } + + public CompletableFuture<CompletedCheckpoint> getSavepointFuture() { + return savepointFuture; + } + } + + private StopWithSavepointEnvironment createStopWithSavepointEnvironment( + MockExecutingStateWithFailureHandlerContext ctx) throws IOException { + CompletableFuture<JobStatus> executionGraphTerminationFuture = new CompletableFuture<>(); + CompletableFuture<String> operationCompletionFuture = new CompletableFuture<>(); + CompletableFuture<CompletedCheckpoint> savepointFuture = new CompletableFuture<>(); + final ExecutionGraph executionGraph = + new MockExecutionGraph(executionGraphTerminationFuture); + executionGraph.transitionToRunning(); + final ExecutionGraphHandler executionGraphHandler = + new ExecutionGraphHandler( + executionGraph, + log, + ctx.getMainThreadExecutor(), + ctx.getMainThreadExecutor()); + OperatorCoordinatorHandler operatorCoordinatorHandler = + new OperatorCoordinatorHandler( + executionGraph, + (throwable) -> { + throw new 
RuntimeException("Error in test", throwable); + }); + + CheckpointCoordinator checkpointCoordinator = + new MockCheckpointCoordinator(savepointFuture); + + StopWithSavepoint stopWithSavepoint = + new StopWithSavepoint( + ctx, + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + log, + ClassLoader.getSystemClassLoader(), + checkpointCoordinator, + "savepointTargetDir", + false, + operationCompletionFuture); + return new StopWithSavepointEnvironment( + stopWithSavepoint, + executionGraphTerminationFuture, + operationCompletionFuture, + savepointFuture); + } + + private static class MockCheckpointCoordinator extends CheckpointCoordinator { Review comment: Let's not continue with this pattern. Extending concrete classes is a recipe for a lot of rabbit holes. Maybe an easier solution could be to define the interface the `StopWithSavepoint` state requires and then have some adapters to map it to the underlying `CheckpointCoordinator` (or even let the `CheckpointCoordinator` implement this interface). ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** State describing a job that is currently taking a savepoint and stopping afterwards. */ +class StopWithSavepoint extends ExecutingStateWithFailureHandler { + private final CompletableFuture<String> operationCompletionFuture; + private final boolean advanceToEndOfEventTime; + private final String targetDirectory; + private final CheckpointCoordinator checkpointCoordinator; + private final Context context; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger, + ClassLoader userCodeClassLoader, + CheckpointCoordinator checkpointCoordinator, Review comment: How well is the state testable with accessing the `CheckpointCoordinator` explicitly? Would it be easier if we hid the coordinator behind some interface or a `context` method? 
########## File path: flink-tests/src/test/resources/log4j2-test.properties ########## @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level = OFF +rootLogger.level = INFO Review comment: Changing the root logger level from OFF to INFO is probably not intended. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointFailureManager; +import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider; +import org.apache.flink.runtime.checkpoint.NoOpFailJobCall; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener; +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader; +import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import 
org.apache.flink.runtime.shuffle.NettyShuffleMaster; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex; +import static org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. */ +public class StopWithSavepointTest extends TestLogger { + + /** Test failure during savepoint creation: go to failing */ + @Test + public void testFailureDuringSavepointCreationWithNoRestart() throws Exception { + MockExecutingStateWithFailureHandlerContext ctx = + new MockExecutingStateWithFailureHandlerContext(); + + StopWithSavepointEnvironment stopWithSavepointEnv = createStopWithSavepointEnvironment(ctx); + StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState(); + stopWithSavepoint.onEnter(); + + ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart); + // fail future returned by the CheckpointCoordinator + stopWithSavepointEnv + .getSavepointFuture() + .completeExceptionally(new RuntimeException("Savepoint creation failed")); + + stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED); + + ctx.setExpectFailing(assertNonNull()); + + ctx.close(); // trigger outstanding executions + + assertThat( + stopWithSavepointEnv.getOperationCompletionFuture().isCompletedExceptionally(), + is(true)); + } + + private static class 
StopWithSavepointEnvironment { + private final StopWithSavepoint state; + private final CompletableFuture<JobStatus> executionGraphTerminationFuture; + private final CompletableFuture<String> operationCompletionFuture; + private final CompletableFuture<CompletedCheckpoint> savepointFuture; + + StopWithSavepointEnvironment( + StopWithSavepoint state, + CompletableFuture<JobStatus> executionGraphTerminationFuture, + CompletableFuture<String> operationCompletionFuture, + CompletableFuture<CompletedCheckpoint> savepointFuture) { + this.state = state; + this.executionGraphTerminationFuture = executionGraphTerminationFuture; + this.operationCompletionFuture = operationCompletionFuture; + this.savepointFuture = savepointFuture; + } + + public StopWithSavepoint getState() { + return state; + } + + public CompletableFuture<JobStatus> getExecutionGraphTerminationFuture() { + return executionGraphTerminationFuture; + } + + public CompletableFuture<String> getOperationCompletionFuture() { + return operationCompletionFuture; + } + + public CompletableFuture<CompletedCheckpoint> getSavepointFuture() { + return savepointFuture; + } + } + + private StopWithSavepointEnvironment createStopWithSavepointEnvironment( + MockExecutingStateWithFailureHandlerContext ctx) throws IOException { + CompletableFuture<JobStatus> executionGraphTerminationFuture = new CompletableFuture<>(); + CompletableFuture<String> operationCompletionFuture = new CompletableFuture<>(); + CompletableFuture<CompletedCheckpoint> savepointFuture = new CompletableFuture<>(); + final ExecutionGraph executionGraph = + new MockExecutionGraph(executionGraphTerminationFuture); + executionGraph.transitionToRunning(); + final ExecutionGraphHandler executionGraphHandler = + new ExecutionGraphHandler( + executionGraph, + log, + ctx.getMainThreadExecutor(), + ctx.getMainThreadExecutor()); + OperatorCoordinatorHandler operatorCoordinatorHandler = + new OperatorCoordinatorHandler( + executionGraph, + (throwable) -> { + throw new 
RuntimeException("Error in test", throwable); + }); + + CheckpointCoordinator checkpointCoordinator = + new MockCheckpointCoordinator(savepointFuture); + + StopWithSavepoint stopWithSavepoint = + new StopWithSavepoint( + ctx, + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + log, + ClassLoader.getSystemClassLoader(), + checkpointCoordinator, + "savepointTargetDir", + false, + operationCompletionFuture); + return new StopWithSavepointEnvironment( + stopWithSavepoint, + executionGraphTerminationFuture, + operationCompletionFuture, + savepointFuture); + } + + private static class MockCheckpointCoordinator extends CheckpointCoordinator { + private static ExecutionVertex vertex = mockExecutionVertex(new ExecutionAttemptID()); + private static ExecutionVertex[] defaultVertices = new ExecutionVertex[] {vertex}; + private final CompletableFuture<CompletedCheckpoint> savepointFuture; + + MockCheckpointCoordinator(CompletableFuture<CompletedCheckpoint> savepointFuture) { + super( + new JobID(), + new CheckpointCoordinatorConfiguration + .CheckpointCoordinatorConfigurationBuilder() + .build(), + Collections.emptyList(), + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + new MemoryStateBackend(), + Executors.directExecutor(), + new CheckpointsCleaner(), + new ManuallyTriggeredScheduledExecutor(), + SharedStateRegistry.DEFAULT_FACTORY, + new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE), + new CheckpointPlanCalculator( + new JobID(), + Arrays.asList(defaultVertices), + Arrays.asList(defaultVertices), + Arrays.asList(defaultVertices)), + new ExecutionAttemptMappingProvider(Arrays.asList(defaultVertices))); Review comment: The pain to set these things up should be a good indicator that we are doing something wrong here. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointFailureManager; +import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider; +import org.apache.flink.runtime.checkpoint.NoOpFailJobCall; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import 
org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener; +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader; +import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex; +import static org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. 
*/ +public class StopWithSavepointTest extends TestLogger { + + /** Test failure during savepoint creation: go to failing */ + @Test + public void testFailureDuringSavepointCreationWithNoRestart() throws Exception { + MockExecutingStateWithFailureHandlerContext ctx = + new MockExecutingStateWithFailureHandlerContext(); + + StopWithSavepointEnvironment stopWithSavepointEnv = createStopWithSavepointEnvironment(ctx); + StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState(); + stopWithSavepoint.onEnter(); + + ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart); + // fail future returned by the CheckpointCoordinator + stopWithSavepointEnv + .getSavepointFuture() + .completeExceptionally(new RuntimeException("Savepoint creation failed")); + + stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED); + + ctx.setExpectFailing(assertNonNull()); + + ctx.close(); // trigger outstanding executions + + assertThat( + stopWithSavepointEnv.getOperationCompletionFuture().isCompletedExceptionally(), + is(true)); + } + + private static class StopWithSavepointEnvironment { + private final StopWithSavepoint state; + private final CompletableFuture<JobStatus> executionGraphTerminationFuture; + private final CompletableFuture<String> operationCompletionFuture; + private final CompletableFuture<CompletedCheckpoint> savepointFuture; + + StopWithSavepointEnvironment( + StopWithSavepoint state, + CompletableFuture<JobStatus> executionGraphTerminationFuture, + CompletableFuture<String> operationCompletionFuture, + CompletableFuture<CompletedCheckpoint> savepointFuture) { + this.state = state; + this.executionGraphTerminationFuture = executionGraphTerminationFuture; + this.operationCompletionFuture = operationCompletionFuture; + this.savepointFuture = savepointFuture; + } + + public StopWithSavepoint getState() { + return state; + } + + public CompletableFuture<JobStatus> getExecutionGraphTerminationFuture() { + 
return executionGraphTerminationFuture; + } + + public CompletableFuture<String> getOperationCompletionFuture() { + return operationCompletionFuture; + } + + public CompletableFuture<CompletedCheckpoint> getSavepointFuture() { + return savepointFuture; + } + } + + private StopWithSavepointEnvironment createStopWithSavepointEnvironment( + MockExecutingStateWithFailureHandlerContext ctx) throws IOException { + CompletableFuture<JobStatus> executionGraphTerminationFuture = new CompletableFuture<>(); + CompletableFuture<String> operationCompletionFuture = new CompletableFuture<>(); + CompletableFuture<CompletedCheckpoint> savepointFuture = new CompletableFuture<>(); + final ExecutionGraph executionGraph = + new MockExecutionGraph(executionGraphTerminationFuture); + executionGraph.transitionToRunning(); + final ExecutionGraphHandler executionGraphHandler = + new ExecutionGraphHandler( + executionGraph, + log, + ctx.getMainThreadExecutor(), + ctx.getMainThreadExecutor()); + OperatorCoordinatorHandler operatorCoordinatorHandler = + new OperatorCoordinatorHandler( + executionGraph, + (throwable) -> { + throw new RuntimeException("Error in test", throwable); + }); + + CheckpointCoordinator checkpointCoordinator = + new MockCheckpointCoordinator(savepointFuture); + + StopWithSavepoint stopWithSavepoint = + new StopWithSavepoint( + ctx, + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + log, + ClassLoader.getSystemClassLoader(), + checkpointCoordinator, + "savepointTargetDir", + false, + operationCompletionFuture); + return new StopWithSavepointEnvironment( + stopWithSavepoint, + executionGraphTerminationFuture, + operationCompletionFuture, + savepointFuture); + } + + private static class MockCheckpointCoordinator extends CheckpointCoordinator { + private static ExecutionVertex vertex = mockExecutionVertex(new ExecutionAttemptID()); + private static ExecutionVertex[] defaultVertices = new ExecutionVertex[] {vertex}; + private final 
CompletableFuture<CompletedCheckpoint> savepointFuture; + + MockCheckpointCoordinator(CompletableFuture<CompletedCheckpoint> savepointFuture) { + super( + new JobID(), + new CheckpointCoordinatorConfiguration + .CheckpointCoordinatorConfigurationBuilder() + .build(), + Collections.emptyList(), + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + new MemoryStateBackend(), + Executors.directExecutor(), + new CheckpointsCleaner(), + new ManuallyTriggeredScheduledExecutor(), + SharedStateRegistry.DEFAULT_FACTORY, + new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE), + new CheckpointPlanCalculator( + new JobID(), + Arrays.asList(defaultVertices), + Arrays.asList(defaultVertices), + Arrays.asList(defaultVertices)), + new ExecutionAttemptMappingProvider(Arrays.asList(defaultVertices))); + this.savepointFuture = savepointFuture; + } + + @Override + public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint( + boolean advanceToEndOfEventTime, @Nullable String targetLocation) { + return savepointFuture; + } + } + + private static class MockExecutionGraph extends ExecutionGraph { + private final CompletableFuture<JobStatus> mockTerminationFuture; + + private MockExecutionGraph(CompletableFuture<JobStatus> mockTerminationFuture) + throws IOException { + super( + new JobInformation( + new JobID(), + "Test Job", + new SerializedValue<>(new ExecutionConfig()), + new Configuration(), + Collections.emptyList(), + Collections.emptyList()), + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + AkkaUtils.getDefaultTimeout(), + 1, + ExecutionGraph.class.getClassLoader(), + VoidBlobWriter.getInstance(), + PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory( + new Configuration()), + NettyShuffleMaster.INSTANCE, + NoOpJobMasterPartitionTracker.INSTANCE, + ScheduleMode.EAGER, + NoOpExecutionDeploymentListener.get(), + (execution, newState) -> {}, + 0L); + this.mockTerminationFuture = 
mockTerminationFuture; + } + + @Override + public CompletableFuture<JobStatus> getTerminationFuture() { + return mockTerminationFuture; + } + } Review comment: Yet another `EG` sub class... ########## File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java ########## @@ -88,6 +103,155 @@ public void testGlobalFailoverCanRecoverState() throws Exception { env.execute(); } + private enum StopWithSavepointTestBehavior { + NO_FAILURE, + FAIL_ON_CHECKPOINT, + FAIL_ON_STOP, + FAIL_ON_FIRST_CHECKPOINT_ONLY + } + + @Test + public void testStopWithSavepointNoError() throws Exception { + testStopWithSavepoint(StopWithSavepointTestBehavior.NO_FAILURE); + } + + /** Expected behavior is that the job fails. */ + @Test + public void testStopWithSavepointFailOnCheckpoint() throws Exception { + try { + testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT); + fail("Expect exception"); + } catch (ExecutionException e) { + assertThat(e.getMessage(), containsString("CheckpointException")); Review comment: `FlinkMatchers.containsMessage` ########## File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java ########## @@ -88,6 +103,155 @@ public void testGlobalFailoverCanRecoverState() throws Exception { env.execute(); } + private enum StopWithSavepointTestBehavior { + NO_FAILURE, + FAIL_ON_CHECKPOINT, + FAIL_ON_STOP, + FAIL_ON_FIRST_CHECKPOINT_ONLY + } + + @Test + public void testStopWithSavepointNoError() throws Exception { + testStopWithSavepoint(StopWithSavepointTestBehavior.NO_FAILURE); + } + + /** Expected behavior is that the job fails. */ + @Test + public void testStopWithSavepointFailOnCheckpoint() throws Exception { + try { + testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT); + fail("Expect exception"); Review comment: Shouldn't we also assert that the job is still running? 
########## File path: flink-runtime/src/test/resources/log4j2-test.properties ########## @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level = OFF +rootLogger.level = INFO Review comment: This change does not seem to be intended. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** State describing a job that is currently taking a savepoint and stopping afterwards. */ +class StopWithSavepoint extends ExecutingStateWithFailureHandler { Review comment: I think in general one should be very careful when using inheritance. If the base class is not well designed for inheritance, then inheritance is usually a good way to get stuck in rabbit holes in the future. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointFailureManager; +import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider; +import org.apache.flink.runtime.checkpoint.NoOpFailJobCall; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener; +import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader; +import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex; +import static org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. 
*/ +public class StopWithSavepointTest extends TestLogger { + + /** Test failure during savepoint creation: go to failing */ + @Test + public void testFailureDuringSavepointCreationWithNoRestart() throws Exception { + MockExecutingStateWithFailureHandlerContext ctx = + new MockExecutingStateWithFailureHandlerContext(); + + StopWithSavepointEnvironment stopWithSavepointEnv = createStopWithSavepointEnvironment(ctx); + StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState(); + stopWithSavepoint.onEnter(); + + ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart); + // fail future returned by the CheckpointCoordinator + stopWithSavepointEnv + .getSavepointFuture() + .completeExceptionally(new RuntimeException("Savepoint creation failed")); + + stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED); + + ctx.setExpectFailing(assertNonNull()); + + ctx.close(); // trigger outstanding executions + + assertThat( + stopWithSavepointEnv.getOperationCompletionFuture().isCompletedExceptionally(), Review comment: I am not a huge fan of this environment. Looking at this test alone, it is not clear to me why the env has the operation completion future. If the operation future belonged to the state, then it would be clearer, for example. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org