[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r582996083 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** {@code StopWithSavepointContext} implements {@link StopWithSavepointOperations}. 
*/ +public class StopWithSavepointContext implements StopWithSavepointOperations { + +private final Logger log; + +private final SchedulerBase scheduler; +private final CheckpointCoordinator checkpointCoordinator; +private final JobID jobId; + +private final CompletableFuture result = new CompletableFuture<>(); + +private StopWithSavepointState state = StopWithSavepointState.InitialWait; +private String path; +private Set unfinishedStates; + +public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, Logger log) { +this.jobId = jobId; +this.scheduler = scheduler; +this.checkpointCoordinator = scheduler.getCheckpointCoordinator(); +this.log = log; +} + +@Override +public synchronized void handleSavepointCreation(String path, Throwable throwable) { +final StopWithSavepointState oldState = state; +state = state.onSavepointCreation(this, path, throwable); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on savepoint creation handling.", +oldState, +state); +} + +@Override +public synchronized void handleExecutionTermination( +Collection executionStates) { +final StopWithSavepointState oldState = state; +state = state.onExecutionsTermination(this, executionStates); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on execution termination handling.", +oldState, +state); +} + +@Override +public CompletableFuture getResult() { +return result; +} + +private StopWithSavepointState terminateExceptionWithGlobalFailover( +Iterable unfinishedExecutionStates) { +String errorMessage = +String.format( +"Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.", +StringUtils.join(unfinishedExecutionStates, ", "), jobId); +FlinkException inconsistentFinalStateException = new FlinkException(errorMessage); + +scheduler.handleGlobalFailure(inconsistentFinalStateException); +return terminateExceptionally(inconsistentFinalStateException); +} + +private StopWithSavepointState terminateExceptionally(Throwable throwable) { +scheduler.startCheckpointScheduler(checkpointCoordinator); Review comment: This issue was addressed in @rmetzger's [comment](https://github.com/apache/flink/pull/14847#discussion_r581918367). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r582992549 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** {@code StopWithSavepointContext} implements {@link StopWithSavepointOperations}. 
*/ +public class StopWithSavepointContext implements StopWithSavepointOperations { + +private final Logger log; + +private final SchedulerBase scheduler; +private final CheckpointCoordinator checkpointCoordinator; +private final JobID jobId; + +private final CompletableFuture result = new CompletableFuture<>(); + +private StopWithSavepointState state = StopWithSavepointState.InitialWait; +private String path; +private Set unfinishedStates; + +public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, Logger log) { +this.jobId = jobId; +this.scheduler = scheduler; +this.checkpointCoordinator = scheduler.getCheckpointCoordinator(); +this.log = log; +} + +@Override +public synchronized void handleSavepointCreation(String path, Throwable throwable) { +final StopWithSavepointState oldState = state; +state = state.onSavepointCreation(this, path, throwable); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on savepoint creation handling.", +oldState, +state); +} + +@Override +public synchronized void handleExecutionTermination( +Collection executionStates) { +final StopWithSavepointState oldState = state; +state = state.onExecutionsTermination(this, executionStates); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on execution termination handling.", +oldState, +state); +} + +@Override +public CompletableFuture getResult() { +return result; +} + +private StopWithSavepointState terminateExceptionWithGlobalFailover( +Iterable unfinishedExecutionStates) { +String errorMessage = +String.format( +"Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.", +StringUtils.join(unfinishedExecutionStates, ", "), jobId); +FlinkException inconsistentFinalStateException = new FlinkException(errorMessage); + +scheduler.handleGlobalFailure(inconsistentFinalStateException); +return terminateExceptionally(inconsistentFinalStateException); +} + +private StopWithSavepointState terminateExceptionally(Throwable throwable) { +scheduler.startCheckpointScheduler(checkpointCoordinator); +result.completeExceptionally(throwable); + +return StopWithSavepointState.Final; +} + +private StopWithSavepointState terminateSuccessfully(String path) { +result.complete(path); + +return StopWithSavepointState.Final; +} + +private static Set extractUnfinishedStates( +Collection executionStates) { +return executionStates.stream() +.filter(state -> state != ExecutionState.FINISHED) +.collect(Collectors.toSet()); +} + +/** + * {@code StopWithSavepointState} represents the different states during the stop-with-savepoint + * operation. + * + * The state transitions are implemented in the following
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r582991828 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** {@code StopWithSavepointContext} implements {@link StopWithSavepointOperations}. 
*/ +public class StopWithSavepointContext implements StopWithSavepointOperations { + +private final Logger log; + +private final SchedulerBase scheduler; +private final CheckpointCoordinator checkpointCoordinator; +private final JobID jobId; + +private final CompletableFuture result = new CompletableFuture<>(); + +private StopWithSavepointState state = StopWithSavepointState.InitialWait; +private String path; +private Set unfinishedStates; + +public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, Logger log) { +this.jobId = jobId; +this.scheduler = scheduler; +this.checkpointCoordinator = scheduler.getCheckpointCoordinator(); +this.log = log; +} + +@Override +public synchronized void handleSavepointCreation(String path, Throwable throwable) { +final StopWithSavepointState oldState = state; +state = state.onSavepointCreation(this, path, throwable); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on savepoint creation handling.", +oldState, +state); +} + +@Override +public synchronized void handleExecutionTermination( +Collection executionStates) { +final StopWithSavepointState oldState = state; +state = state.onExecutionsTermination(this, executionStates); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on execution termination handling.", +oldState, +state); +} + +@Override +public CompletableFuture getResult() { +return result; +} + +private StopWithSavepointState terminateExceptionWithGlobalFailover( +Iterable unfinishedExecutionStates) { +String errorMessage = +String.format( +"Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.", +StringUtils.join(unfinishedExecutionStates, ", "), jobId); +FlinkException inconsistentFinalStateException = new FlinkException(errorMessage); + +scheduler.handleGlobalFailure(inconsistentFinalStateException); +return terminateExceptionally(inconsistentFinalStateException); +} + +private StopWithSavepointState terminateExceptionally(Throwable throwable) { +scheduler.startCheckpointScheduler(checkpointCoordinator); +result.completeExceptionally(throwable); + +return StopWithSavepointState.Final; +} + +private StopWithSavepointState terminateSuccessfully(String path) { +result.complete(path); + +return StopWithSavepointState.Final; +} + +private static Set extractUnfinishedStates( +Collection executionStates) { +return executionStates.stream() +.filter(state -> state != ExecutionState.FINISHED) +.collect(Collectors.toSet()); +} + +/** + * {@code StopWithSavepointState} represents the different states during the stop-with-savepoint + * operation. + * + * The state transitions are implemented in the following
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r582991307 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java ## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * {@code StopWithSavepointContextTest} tests the stop-with-savepoint functionality of {@link + * SchedulerBase#stopWithSavepoint(String, boolean)}. 
+ */ +public class StopWithSavepointContextTest extends TestLogger { + +private JobGraph jobGraph; +private DefaultScheduler scheduler; + +private StopWithSavepointOperations testInstance; + +@Before +public void setup() throws Exception { +jobGraph = new JobGraph(); + +final JobVertex jobVertex = new JobVertex("vertex #0"); +jobVertex.setInvokableClass(NoOpInvokable.class); +jobGraph.addVertex(jobVertex); + +// checkpointInterval has to be set to a value lower than Long.MAX_VALUE to enable +// periodic checkpointing - only then can we enable/disable the CheckpointCoordinator +SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, Long.MAX_VALUE - 1); +scheduler = +SchedulerTestingUtils.createSchedulerBuilder( +jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread()) +.setFutureExecutor(new DirectScheduledExecutorService()) +.build(); +scheduler.startScheduling(); + +// the checkpoint scheduler is stopped before triggering the stop-with-savepoint +disableCheckpointScheduler(); + +testInstance = new StopWithSavepointContext(jobGraph.getJobID(), scheduler, this.log); +} + +@Test +public void testHappyPathWithSavepointCreationBeforeTermination() throws Exception { +assertHappyPath( +(savepointPath) -> { +testInstance.handleSavepointCreation(savepointPath, null); +testInstance.handleExecutionTermination( + Collections.singletonList(ExecutionState.FINISHED)); +}); +} + +@Test +public void testHappyPathWithSavepointCreationAfterTermination() throws Exception { +assertHappyPath( +(savepointPath) -> { +testInstance.handleExecutionTermination( +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r582990665 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java ## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * {@code StopWithSavepointContextTest} tests the stop-with-savepoint functionality of {@link + * SchedulerBase#stopWithSavepoint(String, boolean)}. 
+ */ +public class StopWithSavepointContextTest extends TestLogger { + +private JobGraph jobGraph; +private DefaultScheduler scheduler; + +private StopWithSavepointOperations testInstance; + +@Before +public void setup() throws Exception { +jobGraph = new JobGraph(); + +final JobVertex jobVertex = new JobVertex("vertex #0"); +jobVertex.setInvokableClass(NoOpInvokable.class); +jobGraph.addVertex(jobVertex); + +// checkpointInterval has to be set to a value lower than Long.MAX_VALUE to enable +// periodic checkpointing - only then can we enable/disable the CheckpointCoordinator +SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, Long.MAX_VALUE - 1); +scheduler = +SchedulerTestingUtils.createSchedulerBuilder( +jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread()) +.setFutureExecutor(new DirectScheduledExecutorService()) +.build(); +scheduler.startScheduling(); + +// the checkpoint scheduler is stopped before triggering the stop-with-savepoint +disableCheckpointScheduler(); + +testInstance = new StopWithSavepointContext(jobGraph.getJobID(), scheduler, this.log); +} + +@Test +public void testHappyPathWithSavepointCreationBeforeTermination() throws Exception { +assertHappyPath( +(savepointPath) -> { +testInstance.handleSavepointCreation(savepointPath, null); +testInstance.handleExecutionTermination( + Collections.singletonList(ExecutionState.FINISHED)); +}); +} + +@Test +public void testHappyPathWithSavepointCreationAfterTermination() throws Exception { +assertHappyPath( +(savepointPath) -> { +testInstance.handleExecutionTermination( +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r582989152 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java ## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.stopwithsavepoint; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl +implements StopWithSavepointTerminationHandler { + +private final Logger log; + +private final SchedulerNG scheduler; +private final CheckpointScheduling checkpointScheduling; +private final JobID jobId; +private final Executor ioExecutor; + +private final CompletableFuture result = new CompletableFuture<>(); + +private State state = new WaitingForSavepoint(); + +public StopWithSavepointTerminationHandlerImpl( +JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, Logger log) { +this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, ioExecutor, log); +} + +@VisibleForTesting +StopWithSavepointTerminationHandlerImpl( +JobID jobId, +SchedulerNG scheduler, +CheckpointScheduling checkpointScheduling, +Executor ioExecutor, +Logger log) { +this.jobId = checkNotNull(jobId); +this.scheduler = checkNotNull(scheduler); +this.checkpointScheduling = checkNotNull(checkpointScheduling); +this.ioExecutor = checkNotNull(ioExecutor); +this.log = checkNotNull(log); +} + +@Override +public CompletableFuture getSavepointPath() { +return result; +} + +@Override +public void handleSavepointCreation( +CompletedCheckpoint completedSavepoint, Throwable throwable) { +if (throwable != null) { +checkArgument( +completedSavepoint == null, +"No savepoint should be provided if a throwable is passed."); +handleSavepointCreationFailure(throwable); +} else { +handleSavepointCreationSuccess(checkNotNull(completedSavepoint)); +} +} + +@Override +public void handleExecutionsTermination(Collection terminatedExecutionStates) { +final Set notFinishedExecutionStates = +checkNotNull(terminatedExecutionStates).stream() +.filter(state -> state != ExecutionState.FINISHED) +.collect(Collectors.toSet()); + +if (notFinishedExecutionStates.isEmpty()) { +handleExecutionsFinished(); +} else { +handleAnyExecutionNotFinished(notFinishedExecutionStates); +} +} + +private void handleSavepointCreationSuccess(CompletedCheckpoint 
completedCheckpoint) { +final State oldState = state; +state = state.onSavepointCreation(completedCheckpoint); + +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r582029660 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -835,8 +840,21 @@ public void updateAccumulators(final AccumulatorSnapshot accumulatorSnapshot) { mainThreadExecutor); } -private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) { +@Override +public void stopCheckpointScheduler() { +Preconditions.checkState( +getCheckpointCoordinator() != null, +"Checkpointing cannot be stopped since it's not enabled."); +getCheckpointCoordinator().stopCheckpointScheduler(); +} + +@Override +public void startCheckpointScheduler() { mainThreadExecutor.assertRunningInMainThread(); +final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(); +Preconditions.checkState( +checkpointCoordinator != null, +"Checkpointing cannot be started since it's not enabled."); Review comment: I went with the first option. Unfortunately, this meant removing the `CheckpointScheduling` interface from the `CheckpointCoordinator`, because the `CheckpointCoordinator` throws an exception once it has been shut down. The alternative, adding a method that checks whether checkpoint scheduling is enabled, wasn't better either: such a check would have exposed the shutdown behavior of the `CheckpointCoordinator`. That, in turn, would have required adding the `shutdown` method to the interface as well, and that method doesn't work for the Scheduler. Hence, I decided to go with the approach that checks the availability of the `CheckpointCoordinator` internally. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r581982820 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -835,8 +840,21 @@ public void updateAccumulators(final AccumulatorSnapshot accumulatorSnapshot) { mainThreadExecutor); } -private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) { +@Override +public void stopCheckpointScheduler() { +Preconditions.checkState( +getCheckpointCoordinator() != null, +"Checkpointing cannot be stopped since it's not enabled."); +getCheckpointCoordinator().stopCheckpointScheduler(); +} + +@Override +public void startCheckpointScheduler() { mainThreadExecutor.assertRunningInMainThread(); +final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(); +Preconditions.checkState( +checkpointCoordinator != null, +"Checkpointing cannot be started since it's not enabled."); Review comment: I agree: that's a valid scenario. I'm now wondering whether we should simply make the call optional in that case. We could just add another condition to the if statement. I don't like that, because it makes the interface less specific. On the other hand, one could argue that starting the checkpoint scheduling was always optional, given the condition that checks whether periodic checkpointing is enabled. Another alternative would be to extend the interface with a method that checks the state of the checkpointing (like `CheckpointCoordinator.shutdown`) and to only re-enable it if it hasn't been shut down. @rmetzger, any thoughts on this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r581982820 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -835,8 +840,21 @@ public void updateAccumulators(final AccumulatorSnapshot accumulatorSnapshot) { mainThreadExecutor); } -private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) { +@Override +public void stopCheckpointScheduler() { +Preconditions.checkState( +getCheckpointCoordinator() != null, +"Checkpointing cannot be stopped since it's not enabled."); +getCheckpointCoordinator().stopCheckpointScheduler(); +} + +@Override +public void startCheckpointScheduler() { mainThreadExecutor.assertRunningInMainThread(); +final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(); +Preconditions.checkState( +checkpointCoordinator != null, +"Checkpointing cannot be started since it's not enabled."); Review comment: I agree: that's a valid scenario. I'm now wondering whether we should simply make the call optional in that case. We could just add another condition to the if statement. I don't like that, because it makes the interface less specific. On the other hand, one could argue that starting the checkpoint scheduling was always optional, given the condition that checks whether periodic checkpointing is enabled. We cannot simply wait for the termination before restarting the scheduling, because of the happy path. Another alternative would be to extend the interface with a method that checks the state of the checkpointing (like `CheckpointCoordinator.shutdown`) and to only re-enable it if it hasn't been shut down. @rmetzger, any thoughts on this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r581945379 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java ## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.stopwithsavepoint; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl +implements StopWithSavepointTerminationHandler { + +private final Logger log; + +private final SchedulerNG scheduler; +private final CheckpointScheduling checkpointScheduling; +private final JobID jobId; +private final Executor ioExecutor; + +private final CompletableFuture result = new CompletableFuture<>(); + +private State state = new WaitingForSavepoint(); + +public StopWithSavepointTerminationHandlerImpl( +JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, Logger log) { +this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, ioExecutor, log); +} + +@VisibleForTesting +StopWithSavepointTerminationHandlerImpl( +JobID jobId, +SchedulerNG scheduler, +CheckpointScheduling checkpointScheduling, +Executor ioExecutor, +Logger log) { +this.jobId = checkNotNull(jobId); +this.scheduler = checkNotNull(scheduler); +this.checkpointScheduling = checkNotNull(checkpointScheduling); +this.ioExecutor = checkNotNull(ioExecutor); +this.log = checkNotNull(log); +} + +@Override +public CompletableFuture getSavepointPath() { +return result; +} + +@Override +public void handleSavepointCreation( +CompletedCheckpoint completedSavepoint, Throwable throwable) { +if (throwable != null) { +checkArgument( +completedSavepoint == null, +"No savepoint should be provided if a throwable is passed."); +handleSavepointCreationFailure(throwable); +} else { +handleSavepointCreationSuccess(checkNotNull(completedSavepoint)); +} +} + +@Override +public void handleExecutionsTermination(Collection terminatedExecutionStates) { +final Set notFinishedExecutionStates = +checkNotNull(terminatedExecutionStates).stream() +.filter(state -> state != ExecutionState.FINISHED) +.collect(Collectors.toSet()); + +if (notFinishedExecutionStates.isEmpty()) { +handleExecutionsFinished(); +} else { +handleAnyExecutionNotFinished(notFinishedExecutionStates); +} +} + +private void handleSavepointCreationSuccess(CompletedCheckpoint 
completedCheckpoint) { +final State oldState = state; +state = state.onSavepointCreation(completedCheckpoint); + +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r581941734 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -541,6 +544,59 @@ public void handleGlobalFailure() { assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId)); } +/** + * This test covers the use-case where a global fail-over is followed by a local task failure. + * It verifies (besides checking the expected deployments) that the assert in the global + * recovery handling of {@link SchedulerBase#restoreState} is not triggered due to version + * updates. + */ +@Test +public void handleGlobalFailureWithLocalFailure() { Review comment: We cannot remove the [assert in SchedulerBase.restoreState](https://github.com/XComp/flink/blob/7cbd97f815d3bfd4715ddd8cb1d88f92f05a282a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L419) as the subsequent operation relies on all vertices being passed for restoring the state. Accepting fewer `ExecutionJobVertices` would mean that not all tasks would get restored. I added this test to verify the case where a local task failure follows a global fail-over before the tasks are actually restarted. The local task failure would not trigger a version update because of the [isNotifiable](https://github.com/XComp/flink/blob/7cbd97f815d3bfd4715ddd8cb1d88f92f05a282a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L654) check in `SchedulerBase.updateTaskExecutionState`. This check only returns `true` (and triggers the internal task state change) if the corresponding `ExecutionVertex` is either in state `FINISHED` or `FAILED`. The previous global fail-over has already switched the state to `CANCELING`. The local task failure will finally switch the state to `CANCELED`. Hence, no internal task state change is processed for the local task failure.
No version update is triggered, and the restart triggered by the global fail-over restarts all vertices as expected. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r581376284 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java ## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.stopwithsavepoint; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl +implements StopWithSavepointTerminationHandler { + +private final Logger log; + +private final SchedulerNG scheduler; +private final CheckpointScheduling checkpointScheduling; +private final JobID jobId; +private final Executor ioExecutor; + +private final CompletableFuture result = new CompletableFuture<>(); + +private State state = new WaitingForSavepoint(); + +public StopWithSavepointTerminationHandlerImpl( +JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, Logger log) { +this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, ioExecutor, log); +} + +@VisibleForTesting +StopWithSavepointTerminationHandlerImpl( +JobID jobId, +SchedulerNG scheduler, +CheckpointScheduling checkpointScheduling, +Executor ioExecutor, +Logger log) { +this.jobId = checkNotNull(jobId); +this.scheduler = checkNotNull(scheduler); +this.checkpointScheduling = checkNotNull(checkpointScheduling); +this.ioExecutor = checkNotNull(ioExecutor); +this.log = checkNotNull(log); +} + +@Override +public CompletableFuture getSavepointPath() { +return result; +} + +@Override +public void handleSavepointCreation( +CompletedCheckpoint completedSavepoint, Throwable throwable) { +if (throwable != null) { +checkArgument( +completedSavepoint == null, +"No savepoint should be provided if a throwable is passed."); +handleSavepointCreationFailure(throwable); +} else { +handleSavepointCreationSuccess(checkNotNull(completedSavepoint)); +} +} + +@Override +public void handleExecutionsTermination(Collection terminatedExecutionStates) { +final Set notFinishedExecutionStates = +checkNotNull(terminatedExecutionStates).stream() +.filter(state -> state != ExecutionState.FINISHED) +.collect(Collectors.toSet()); + +if (notFinishedExecutionStates.isEmpty()) { +handleExecutionsFinished(); +} else { +handleAnyExecutionNotFinished(notFinishedExecutionStates); +} +} + +private void handleSavepointCreationSuccess(CompletedCheckpoint 
completedCheckpoint) { +final State oldState = state; +state = state.onSavepointCreation(completedCheckpoint); + +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r580901980 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java ## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.stopwithsavepoint; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl +implements StopWithSavepointTerminationHandler { + +private final Logger log; + +private final SchedulerNG scheduler; +private final CheckpointScheduling checkpointScheduling; +private final JobID jobId; +private final Executor ioExecutor; + +private final CompletableFuture result = new CompletableFuture<>(); + +private State state = new WaitingForSavepoint(); + +public StopWithSavepointTerminationHandlerImpl( +JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, Logger log) { +this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, ioExecutor, log); +} + +@VisibleForTesting +StopWithSavepointTerminationHandlerImpl( +JobID jobId, +SchedulerNG scheduler, +CheckpointScheduling checkpointScheduling, +Executor ioExecutor, +Logger log) { +this.jobId = checkNotNull(jobId); +this.scheduler = checkNotNull(scheduler); +this.checkpointScheduling = checkNotNull(checkpointScheduling); +this.ioExecutor = checkNotNull(ioExecutor); +this.log = checkNotNull(log); +} + +@Override +public CompletableFuture getSavepointPath() { +return result; +} + +@Override +public void handleSavepointCreation( Review comment: I realized that I need to refactor the corresponding `StopWithSavepointTerminationManagerTest` accordingly. We don't need all the different test cases there. I'm fixing this now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r580542102 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java ## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.stopwithsavepoint; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl +implements StopWithSavepointTerminationHandler { + +private final Logger log; + +private final SchedulerNG scheduler; +private final CheckpointScheduling checkpointScheduling; +private final JobID jobId; +private final Executor ioExecutor; + +private final CompletableFuture result = new CompletableFuture<>(); + +private State state = new WaitingForSavepoint(); + +public StopWithSavepointTerminationHandlerImpl( +JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, Logger log) { +this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, ioExecutor, log); +} + +@VisibleForTesting +StopWithSavepointTerminationHandlerImpl( +JobID jobId, +SchedulerNG scheduler, +CheckpointScheduling checkpointScheduling, +Executor ioExecutor, +Logger log) { +this.jobId = checkNotNull(jobId); +this.scheduler = checkNotNull(scheduler); +this.checkpointScheduling = checkNotNull(checkpointScheduling); +this.ioExecutor = checkNotNull(ioExecutor); +this.log = checkNotNull(log); +} + +@Override +public CompletableFuture getSavepointPath() { +return result; +} + +@Override +public void handleSavepointCreation( +CompletedCheckpoint completedSavepoint, Throwable throwable) { +if (throwable != null) { +checkArgument( +completedSavepoint == null, +"No savepoint should be provided if a throwable is passed."); +handleSavepointCreationFailure(throwable); +} else { +handleSavepointCreationSuccess(checkNotNull(completedSavepoint)); +} +} + +@Override +public void handleExecutionsTermination(Collection terminatedExecutionStates) { +final Set notFinishedExecutionStates = +checkNotNull(terminatedExecutionStates).stream() +.filter(state -> state != ExecutionState.FINISHED) +.collect(Collectors.toSet()); + +if (notFinishedExecutionStates.isEmpty()) { +handleExecutionsFinished(); +} else { +handleAnyExecutionNotFinished(notFinishedExecutionStates); +} +} + +private void handleSavepointCreationSuccess(CompletedCheckpoint 
completedCheckpoint) { +final State oldState = state; +state = state.onSavepointCreation(completedCheckpoint); + +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r580523907 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java ## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.stopwithsavepoint; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl +implements StopWithSavepointTerminationHandler { + +private final Logger log; + +private final SchedulerNG scheduler; +private final CheckpointScheduling checkpointScheduling; +private final JobID jobId; +private final Executor ioExecutor; + +private final CompletableFuture result = new CompletableFuture<>(); + +private State state = new WaitingForSavepoint(); + +public StopWithSavepointTerminationHandlerImpl( +JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, Logger log) { +this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, ioExecutor, log); +} + +@VisibleForTesting +StopWithSavepointTerminationHandlerImpl( +JobID jobId, +SchedulerNG scheduler, +CheckpointScheduling checkpointScheduling, +Executor ioExecutor, +Logger log) { +this.jobId = checkNotNull(jobId); +this.scheduler = checkNotNull(scheduler); +this.checkpointScheduling = checkNotNull(checkpointScheduling); +this.ioExecutor = checkNotNull(ioExecutor); +this.log = checkNotNull(log); +} + +@Override +public CompletableFuture getSavepointPath() { +return result; +} + +@Override +public void handleSavepointCreation( +CompletedCheckpoint completedSavepoint, Throwable throwable) { +if (throwable != null) { +checkArgument( +completedSavepoint == null, +"No savepoint should be provided if a throwable is passed."); +handleSavepointCreationFailure(throwable); +} else { +handleSavepointCreationSuccess(checkNotNull(completedSavepoint)); +} +} + +@Override +public void handleExecutionsTermination(Collection terminatedExecutionStates) { +final Set notFinishedExecutionStates = +checkNotNull(terminatedExecutionStates).stream() +.filter(state -> state != ExecutionState.FINISHED) +.collect(Collectors.toSet()); + +if (notFinishedExecutionStates.isEmpty()) { +handleExecutionsFinished(); +} else { +handleAnyExecutionNotFinished(notFinishedExecutionStates); +} +} + +private void handleSavepointCreationSuccess(CompletedCheckpoint 
completedCheckpoint) { +final State oldState = state; +state = state.onSavepointCreation(completedCheckpoint); + +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r580520576 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java ## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.stopwithsavepoint; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl +implements StopWithSavepointTerminationHandler { + +private final Logger log; + +private final SchedulerNG scheduler; +private final CheckpointScheduling checkpointScheduling; +private final JobID jobId; +private final Executor ioExecutor; + +private final CompletableFuture result = new CompletableFuture<>(); + +private State state = new WaitingForSavepoint(); + +public StopWithSavepointTerminationHandlerImpl( +JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, Logger log) { +this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, ioExecutor, log); +} + +@VisibleForTesting +StopWithSavepointTerminationHandlerImpl( +JobID jobId, +SchedulerNG scheduler, +CheckpointScheduling checkpointScheduling, +Executor ioExecutor, +Logger log) { +this.jobId = checkNotNull(jobId); +this.scheduler = checkNotNull(scheduler); +this.checkpointScheduling = checkNotNull(checkpointScheduling); +this.ioExecutor = checkNotNull(ioExecutor); +this.log = checkNotNull(log); +} + +@Override +public CompletableFuture getSavepointPath() { +return result; +} + +@Override +public void handleSavepointCreation( +CompletedCheckpoint completedSavepoint, Throwable throwable) { +if (throwable != null) { +checkArgument( +completedSavepoint == null, +"No savepoint should be provided if a throwable is passed."); +handleSavepointCreationFailure(throwable); +} else { +handleSavepointCreationSuccess(checkNotNull(completedSavepoint)); +} +} + +@Override +public void handleExecutionsTermination(Collection terminatedExecutionStates) { +final Set notFinishedExecutionStates = +checkNotNull(terminatedExecutionStates).stream() +.filter(state -> state != ExecutionState.FINISHED) +.collect(Collectors.toSet()); + +if (notFinishedExecutionStates.isEmpty()) { +handleExecutionsFinished(); +} else { +handleAnyExecutionNotFinished(notFinishedExecutionStates); +} +} + +private void handleSavepointCreationSuccess(CompletedCheckpoint 
completedCheckpoint) { +final State oldState = state; +state = state.onSavepointCreation(completedCheckpoint); + +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r580518051 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java ## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.stopwithsavepoint; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl +implements StopWithSavepointTerminationHandler { + +private final Logger log; + +private final SchedulerNG scheduler; +private final CheckpointScheduling checkpointScheduling; +private final JobID jobId; +private final Executor ioExecutor; + +private final CompletableFuture result = new CompletableFuture<>(); + +private State state = new WaitingForSavepoint(); + +public StopWithSavepointTerminationHandlerImpl( +JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, Logger log) { +this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, ioExecutor, log); +} + +@VisibleForTesting +StopWithSavepointTerminationHandlerImpl( +JobID jobId, +SchedulerNG scheduler, +CheckpointScheduling checkpointScheduling, +Executor ioExecutor, +Logger log) { +this.jobId = checkNotNull(jobId); +this.scheduler = checkNotNull(scheduler); +this.checkpointScheduling = checkNotNull(checkpointScheduling); +this.ioExecutor = checkNotNull(ioExecutor); +this.log = checkNotNull(log); +} + +@Override +public CompletableFuture getSavepointPath() { +return result; +} + +@Override +public void handleSavepointCreation( Review comment: `StopWithSavepointTerminationHandler` and `StopWithSavepointTerminationManager` are separated such that the manager focuses solely on the order of completion, whereas the handler deals with the result of the termination. Having `StopWithSavepointTerminationHandler` offer two different methods for the success and failure cases would mean that we have to put the logic for differentiating between the two into the manager. That would partially break the separation of responsibilities. As a consequence, we would again have to distinguish between the failure case and the success case in the manager's test class. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r580511819 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java ## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.stopwithsavepoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.scheduler.SchedulerBase; +import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.runtime.scheduler.TestingSchedulerNG; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle; +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * {@code StopWithSavepointTerminationHandlerImplTest} tests the stop-with-savepoint functionality + * of {@link SchedulerBase#stopWithSavepoint(String, boolean)}. 
+ */ +public class StopWithSavepointTerminationHandlerImplTest extends TestLogger { + +private static final JobID JOB_ID = new JobID(); + +private final TestingCheckpointScheduling checkpointScheduling = +new TestingCheckpointScheduling(false); + +private StopWithSavepointTerminationHandlerImpl createTestInstanceFailingOnGlobalFailOver() { +return createTestInstance( +throwableCausingGlobalFailOver -> fail("No global failover should be triggered.")); +} + +private StopWithSavepointTerminationHandlerImpl createTestInstance( +Consumer handleGlobalFailureConsumer) { +// checkpointing should be always stopped before initiating stop-with-savepoint +checkpointScheduling.stopCheckpointScheduler(); + +final SchedulerNG scheduler = +TestingSchedulerNG.newBuilder() + .setHandleGlobalFailureConsumer(handleGlobalFailureConsumer) +.build(); +return new StopWithSavepointTerminationHandlerImpl( +JOB_ID, scheduler, checkpointScheduling, Executors.directExecutor(), log); +} + +@Test +public void testHappyPath() throws ExecutionException, InterruptedException { +final StopWithSavepointTerminationHandlerImpl testInstance = +createTestInstanceFailingOnGlobalFailOver(); + +final EmptyStreamStateHandle streamStateHandle = new EmptyStreamStateHandle(); +final CompletedCheckpoint completedSavepoint = createCompletedSavepoint(streamStateHandle); +testInstance.handleSavepointCreation(completedSavepoint, null); + testInstance.handleExecutionsTermination(Collections.singleton(ExecutionState.FINISHED)); + +assertThat( +testInstance.getSavepointPath().get(), is(completedSavepoint.getExternalPointer())); + +assertFalse( +"The savepoint should not have been discarded.", streamStateHandle.isDisposed()); +assertFalse("Checkpoint scheduling should be disabled.", checkpointScheduling.isEnabled()); +} + +@Test +public void testSavepointCreationFailure() { +final
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r579664779 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. 
+ * + * The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. + */ +public class StopWithSavepointTerminationHandlerImpl +implements StopWithSavepointTerminationHandler { + +private final Logger log; + +private final SchedulerNG scheduler; +private final CheckpointScheduling checkpointScheduling; +private final JobID jobId; + +private final CompletableFuture result = new CompletableFuture<>(); + +private State state = new WaitingForSavepoint(); + +public StopWithSavepointTerminationHandlerImpl( +@Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { +this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log); +} + +@VisibleForTesting +StopWithSavepointTerminationHandlerImpl( +@Nonnull JobID jobId, +@Nonnull SchedulerNG scheduler, +@Nonnull CheckpointScheduling checkpointScheduling, +@Nonnull Logger log) { +this.jobId = checkNotNull(jobId); +this.scheduler = checkNotNull(scheduler); +this.checkpointScheduling = checkNotNull(checkpointScheduling); +this.log = checkNotNull(log); +} + +@Override +public CompletableFuture handlesStopWithSavepointTermination( +CompletableFuture completedSavepointFuture, +CompletableFuture> terminatedExecutionsFuture, +ComponentMainThreadExecutor mainThreadExecutor) { +completedSavepointFuture +.whenCompleteAsync( +(completedSavepoint, throwable) -> { +if (throwable != null) { +handleSavepointCreationFailure(throwable); +} else { +handleSavepointCreation(completedSavepoint); +} +}, +mainThreadExecutor) +.thenCompose( +aVoid -> +terminatedExecutionsFuture.thenAcceptAsync( Review comment: It's not. 
Correct me if I'm wrong: the execution will happen in the main thread if we use a non-async operation (like the `thenRun` you suggested), since the previous operation is already forced to run in the main thread through `handleAsync`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r579664565 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. 
+ * + * The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. + */ +public class StopWithSavepointTerminationHandlerImpl +implements StopWithSavepointTerminationHandler { + +private final Logger log; + +private final SchedulerNG scheduler; +private final CheckpointScheduling checkpointScheduling; +private final JobID jobId; + +private final CompletableFuture result = new CompletableFuture<>(); + +private State state = new WaitingForSavepoint(); + +public StopWithSavepointTerminationHandlerImpl( +@Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { +this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log); +} + +@VisibleForTesting +StopWithSavepointTerminationHandlerImpl( +@Nonnull JobID jobId, +@Nonnull SchedulerNG scheduler, +@Nonnull CheckpointScheduling checkpointScheduling, +@Nonnull Logger log) { +this.jobId = checkNotNull(jobId); +this.scheduler = checkNotNull(scheduler); +this.checkpointScheduling = checkNotNull(checkpointScheduling); +this.log = checkNotNull(log); +} + +@Override +public CompletableFuture handlesStopWithSavepointTermination( +CompletableFuture completedSavepointFuture, +CompletableFuture> terminatedExecutionsFuture, +ComponentMainThreadExecutor mainThreadExecutor) { +completedSavepointFuture +.whenCompleteAsync( +(completedSavepoint, throwable) -> { +if (throwable != null) { +handleSavepointCreationFailure(throwable); +} else { +handleSavepointCreation(completedSavepoint); +} +}, +mainThreadExecutor) +.thenCompose( +aVoid -> +terminatedExecutionsFuture.thenAcceptAsync( +this::handleExecutionsTermination, mainThreadExecutor)); Review comment: Good point. 
I refactored it and introduced a `StopWithSavepointTerminationManager` to enforce the right execution order. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r579663983 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. 
+ * + * The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. + */ +public class StopWithSavepointTerminationHandlerImpl +implements StopWithSavepointTerminationHandler { + +private final Logger log; + +private final SchedulerNG scheduler; +private final CheckpointScheduling checkpointScheduling; +private final JobID jobId; + +private final CompletableFuture result = new CompletableFuture<>(); + +private State state = new WaitingForSavepoint(); + +public StopWithSavepointTerminationHandlerImpl( +@Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { +this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log); +} + +@VisibleForTesting +StopWithSavepointTerminationHandlerImpl( +@Nonnull JobID jobId, +@Nonnull SchedulerNG scheduler, +@Nonnull CheckpointScheduling checkpointScheduling, +@Nonnull Logger log) { +this.jobId = checkNotNull(jobId); +this.scheduler = checkNotNull(scheduler); +this.checkpointScheduling = checkNotNull(checkpointScheduling); +this.log = checkNotNull(log); +} + +@Override +public CompletableFuture handlesStopWithSavepointTermination( +CompletableFuture completedSavepointFuture, +CompletableFuture> terminatedExecutionsFuture, +ComponentMainThreadExecutor mainThreadExecutor) { +completedSavepointFuture +.whenCompleteAsync( +(completedSavepoint, throwable) -> { +if (throwable != null) { +handleSavepointCreationFailure(throwable); +} else { +handleSavepointCreation(completedSavepoint); +} +}, +mainThreadExecutor) +.thenCompose( +aVoid -> +terminatedExecutionsFuture.thenAcceptAsync( +this::handleExecutionsTermination, mainThreadExecutor)); + +return result; +} + +private synchronized void handleSavepointCreation(CompletedCheckpoint completedCheckpoint) { +final 
State oldState = state; +state = state.onSavepointCreation(completedCheckpoint); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on savepoint creation handling for job {}.", +oldState, +state, +jobId); +} + +private synchronized void handleSavepointCreationFailure(Throwable throwable) { +final
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r579298421 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. 
+ * + * The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. + */ +public class StopWithSavepointTerminationHandlerImpl +implements StopWithSavepointTerminationHandler { + +private final Logger log; + +private final SchedulerNG scheduler; +private final CheckpointScheduling checkpointScheduling; +private final JobID jobId; + +private final CompletableFuture result = new CompletableFuture<>(); + +private State state = new WaitingForSavepoint(); + +public StopWithSavepointTerminationHandlerImpl( +@Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { +this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log); +} + +@VisibleForTesting +StopWithSavepointTerminationHandlerImpl( +@Nonnull JobID jobId, +@Nonnull SchedulerNG scheduler, +@Nonnull CheckpointScheduling checkpointScheduling, +@Nonnull Logger log) { +this.jobId = checkNotNull(jobId); +this.scheduler = checkNotNull(scheduler); +this.checkpointScheduling = checkNotNull(checkpointScheduling); +this.log = checkNotNull(log); +} + +@Override +public CompletableFuture handlesStopWithSavepointTermination( +CompletableFuture completedSavepointFuture, +CompletableFuture> terminatedExecutionsFuture, +ComponentMainThreadExecutor mainThreadExecutor) { +completedSavepointFuture +.whenCompleteAsync( +(completedSavepoint, throwable) -> { +if (throwable != null) { +handleSavepointCreationFailure(throwable); +} else { +handleSavepointCreation(completedSavepoint); +} +}, +mainThreadExecutor) +.thenCompose( +aVoid -> +terminatedExecutionsFuture.thenAcceptAsync( +this::handleExecutionsTermination, mainThreadExecutor)); + +return result; +} + +private synchronized void handleSavepointCreation(CompletedCheckpoint completedCheckpoint) { +final 
State oldState = state; +state = state.onSavepointCreation(completedCheckpoint); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on savepoint creation handling for job {}.", +oldState, +state, +jobId); +} + +private synchronized void handleSavepointCreationFailure(Throwable throwable) { +final
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r579292914 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImplTest.java ## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle; +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.QuadConsumer; +import org.apache.flink.util.function.TriConsumer; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * {@code StopWithSavepointTerminationHandlerImplTest} tests the stop-with-savepoint functionality + * of {@link SchedulerBase#stopWithSavepoint(String, boolean)}. 
+ */ +public class StopWithSavepointTerminationHandlerImplTest extends TestLogger { + +@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + +private static final JobID JOB_ID = new JobID(); + +private final TestingCheckpointScheduling checkpointScheduling = +new TestingCheckpointScheduling(false); + +private StopWithSavepointTerminationHandlerImpl createTestInstance( +Consumer handleGlobalFailureConsumer) { +// checkpointing should be always stopped before initiating stop-with-savepoint +checkpointScheduling.stopCheckpointScheduler(); + +final SchedulerNG scheduler = +TestingSchedulerNG.newBuilder() + .setHandleGlobalFailureConsumer(handleGlobalFailureConsumer) +.build(); +return new StopWithSavepointTerminationHandlerImpl( +JOB_ID, scheduler, checkpointScheduling, log); +} + +@Test +public void testHappyPathWithSavepointCreationBeforeSuccessfulTermination() throws Exception { +assertHappyPath( +(completedSavepoint, +completedSavepointFuture, +terminatedExecutionStates, +executionsTerminatedFuture) -> { +completedSavepointFuture.complete(completedSavepoint); + executionsTerminatedFuture.complete(terminatedExecutionStates); +}); +} + +@Test +public void testHappyPathWithSavepointCreationAfterSuccessfulTermination() throws Exception { +assertHappyPath( +(completedSavepoint, +completedSavepointFuture, +terminatedExecutionStates, +executionsTerminatedFuture) -> { + executionsTerminatedFuture.complete(terminatedExecutionStates); +completedSavepointFuture.complete(completedSavepoint); +}); +} + +@Test +public void testSavepointCreationFailureBeforeSuccessfulTermination()
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r578584522 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** {@code StopWithSavepointContext} implements {@link StopWithSavepointOperations}. 
*/ +public class StopWithSavepointContext implements StopWithSavepointOperations { + +private final Logger log; + +private final SchedulerBase scheduler; +private final CheckpointCoordinator checkpointCoordinator; +private final JobID jobId; + +private final CompletableFuture result = new CompletableFuture<>(); + +private StopWithSavepointState state = StopWithSavepointState.InitialWait; +private String path; +private Set unfinishedStates; + +public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, Logger log) { +this.jobId = jobId; +this.scheduler = scheduler; +this.checkpointCoordinator = scheduler.getCheckpointCoordinator(); +this.log = log; +} + +@Override +public synchronized void handleSavepointCreation(String path, Throwable throwable) { +final StopWithSavepointState oldState = state; +state = state.onSavepointCreation(this, path, throwable); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on savepoint creation handling.", +oldState, +state); +} + +@Override +public synchronized void handleExecutionTermination( +Collection executionStates) { +final StopWithSavepointState oldState = state; +state = state.onExecutionsTermination(this, executionStates); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on execution termination handling.", +oldState, +state); +} + +@Override +public CompletableFuture getResult() { +return result; +} + +private StopWithSavepointState terminateExceptionWithGlobalFailover( +Iterable unfinishedExecutionStates) { +String errorMessage = +String.format( +"Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.", +StringUtils.join(unfinishedExecutionStates, ", "), jobId); +FlinkException inconsistentFinalStateException = new FlinkException(errorMessage); + +scheduler.handleGlobalFailure(inconsistentFinalStateException); +return terminateExceptionally(inconsistentFinalStateException); +} + +private StopWithSavepointState terminateExceptionally(Throwable throwable) { +scheduler.startCheckpointScheduler(checkpointCoordinator); +result.completeExceptionally(throwable); + +return StopWithSavepointState.Final; +} + +private StopWithSavepointState terminateSuccessfully(String path) { +result.complete(path); + +return StopWithSavepointState.Final; +} + +private static Set extractUnfinishedStates( +Collection executionStates) { +return executionStates.stream() +.filter(state -> state != ExecutionState.FINISHED) +.collect(Collectors.toSet()); +} + +/** + * {@code StopWithSavepointState} represents the different states during the stop-with-savepoint + * operation. + * + * The state transitions are implemented in the following
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r578584067 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java ## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * {@code StopWithSavepointContextTest} tests the stop-with-savepoint functionality of {@link + * SchedulerBase#stopWithSavepoint(String, boolean)}. 
+ */ +public class StopWithSavepointContextTest extends TestLogger { + +private JobGraph jobGraph; +private DefaultScheduler scheduler; + +private StopWithSavepointOperations testInstance; + +@Before +public void setup() throws Exception { +jobGraph = new JobGraph(); + +final JobVertex jobVertex = new JobVertex("vertex #0"); +jobVertex.setInvokableClass(NoOpInvokable.class); +jobGraph.addVertex(jobVertex); + +// checkpointInterval has to be set to a value lower than Long.MAX_VALUE to enable +// periodic checkpointing - only then can we enable/disable the CheckpointCoordinator +SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, Long.MAX_VALUE - 1); +scheduler = +SchedulerTestingUtils.createSchedulerBuilder( +jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread()) +.setFutureExecutor(new DirectScheduledExecutorService()) +.build(); +scheduler.startScheduling(); + +// the checkpoint scheduler is stopped before triggering the stop-with-savepoint +disableCheckpointScheduler(); + +testInstance = new StopWithSavepointContext(jobGraph.getJobID(), scheduler, this.log); +} + +@Test +public void testHappyPathWithSavepointCreationBeforeTermination() throws Exception { +assertHappyPath( +(savepointPath) -> { +testInstance.handleSavepointCreation(savepointPath, null); +testInstance.handleExecutionTermination( + Collections.singletonList(ExecutionState.FINISHED)); +}); +} + +@Test +public void testHappyPathWithSavepointCreationAfterTermination() throws Exception { +assertHappyPath( +(savepointPath) -> { +testInstance.handleExecutionTermination( +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r578582587 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java ## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * {@code StopWithSavepointContextTest} tests the stop-with-savepoint functionality of {@link + * SchedulerBase#stopWithSavepoint(String, boolean)}. 
+ */ +public class StopWithSavepointContextTest extends TestLogger { + +private JobGraph jobGraph; +private DefaultScheduler scheduler; + +private StopWithSavepointOperations testInstance; + +@Before +public void setup() throws Exception { +jobGraph = new JobGraph(); + +final JobVertex jobVertex = new JobVertex("vertex #0"); +jobVertex.setInvokableClass(NoOpInvokable.class); +jobGraph.addVertex(jobVertex); + +// checkpointInterval has to be set to a value lower than Long.MAX_VALUE to enable +// periodic checkpointing - only then can we enable/disable the CheckpointCoordinator +SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, Long.MAX_VALUE - 1); +scheduler = +SchedulerTestingUtils.createSchedulerBuilder( +jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread()) +.setFutureExecutor(new DirectScheduledExecutorService()) +.build(); +scheduler.startScheduling(); + +// the checkpoint scheduler is stopped before triggering the stop-with-savepoint +disableCheckpointScheduler(); + +testInstance = new StopWithSavepointContext(jobGraph.getJobID(), scheduler, this.log); Review comment: You're right. I refactored the `StopWithSavepointContextTest`. It uses `TestingSchedulerNG` now instead of `DefaultScheduler` which removes some complexity from the test code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r578581744 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointOperations.java ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * {@code StopWithSavepointOperations} collects the steps for creating a savepoint and waiting for + * the job to stop. + */ +public interface StopWithSavepointOperations { + +/** + * Handles the Savepoint creation termination. + * + * @param path the path to the newly created savepoint. + * @param throwable the {@code Throwable} in case of failure. Review comment: I followed your advice and added two methods, one for each case. That helps to differentiate between the two cases! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r578580800 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** {@code StopWithSavepointContext} implements {@link StopWithSavepointOperations}. 
*/ +public class StopWithSavepointContext implements StopWithSavepointOperations { + +private final Logger log; + +private final SchedulerBase scheduler; +private final CheckpointCoordinator checkpointCoordinator; +private final JobID jobId; + +private final CompletableFuture result = new CompletableFuture<>(); + +private StopWithSavepointState state = StopWithSavepointState.InitialWait; +private String path; +private Set unfinishedStates; + +public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, Logger log) { +this.jobId = jobId; +this.scheduler = scheduler; +this.checkpointCoordinator = scheduler.getCheckpointCoordinator(); +this.log = log; +} + +@Override +public synchronized void handleSavepointCreation(String path, Throwable throwable) { +final StopWithSavepointState oldState = state; +state = state.onSavepointCreation(this, path, throwable); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on savepoint creation handling.", +oldState, +state); +} + +@Override +public synchronized void handleExecutionTermination( +Collection executionStates) { +final StopWithSavepointState oldState = state; +state = state.onExecutionsTermination(this, executionStates); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on execution termination handling.", +oldState, +state); +} + +@Override +public CompletableFuture getResult() { +return result; +} + +private StopWithSavepointState terminateExceptionWithGlobalFailover( +Iterable unfinishedExecutionStates) { +String errorMessage = +String.format( +"Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.", +StringUtils.join(unfinishedExecutionStates, ", "), jobId); +FlinkException inconsistentFinalStateException = new FlinkException(errorMessage); + +scheduler.handleGlobalFailure(inconsistentFinalStateException); +return terminateExceptionally(inconsistentFinalStateException); +} + +private StopWithSavepointState terminateExceptionally(Throwable throwable) { +scheduler.startCheckpointScheduler(checkpointCoordinator); +result.completeExceptionally(throwable); + +return StopWithSavepointState.Final; +} + +private StopWithSavepointState terminateSuccessfully(String path) { +result.complete(path); + +return StopWithSavepointState.Final; +} + +private static Set extractUnfinishedStates( +Collection executionStates) { +return executionStates.stream() +.filter(state -> state != ExecutionState.FINISHED) +.collect(Collectors.toSet()); +} + +/** + * {@code StopWithSavepointState} represents the different states during the stop-with-savepoint + * operation. + * + * The state transitions are implemented in the following
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r578581053 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** {@code StopWithSavepointContext} implements {@link StopWithSavepointOperations}. 
*/ +public class StopWithSavepointContext implements StopWithSavepointOperations { + +private final Logger log; + +private final SchedulerBase scheduler; +private final CheckpointCoordinator checkpointCoordinator; +private final JobID jobId; + +private final CompletableFuture result = new CompletableFuture<>(); + +private StopWithSavepointState state = StopWithSavepointState.InitialWait; +private String path; +private Set unfinishedStates; + +public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, Logger log) { +this.jobId = jobId; +this.scheduler = scheduler; +this.checkpointCoordinator = scheduler.getCheckpointCoordinator(); +this.log = log; +} + +@Override +public synchronized void handleSavepointCreation(String path, Throwable throwable) { +final StopWithSavepointState oldState = state; +state = state.onSavepointCreation(this, path, throwable); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on savepoint creation handling.", +oldState, +state); +} + +@Override +public synchronized void handleExecutionTermination( +Collection executionStates) { +final StopWithSavepointState oldState = state; +state = state.onExecutionsTermination(this, executionStates); + +log.debug( +"Stop-with-savepoint transitioned from {} to {} on execution termination handling.", +oldState, +state); +} + +@Override +public CompletableFuture getResult() { +return result; +} + +private StopWithSavepointState terminateExceptionWithGlobalFailover( +Iterable unfinishedExecutionStates) { +String errorMessage = +String.format( +"Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.", +StringUtils.join(unfinishedExecutionStates, ", "), jobId); +FlinkException inconsistentFinalStateException = new FlinkException(errorMessage); + +scheduler.handleGlobalFailure(inconsistentFinalStateException); +return terminateExceptionally(inconsistentFinalStateException); +} + +private StopWithSavepointState terminateExceptionally(Throwable throwable) { +scheduler.startCheckpointScheduler(checkpointCoordinator); +result.completeExceptionally(throwable); + +return StopWithSavepointState.Final; +} + +private StopWithSavepointState terminateSuccessfully(String path) { +result.complete(path); + +return StopWithSavepointState.Final; +} + +private static Set extractUnfinishedStates( +Collection executionStates) { +return executionStates.stream() +.filter(state -> state != ExecutionState.FINISHED) +.collect(Collectors.toSet()); +} + +/** + * {@code StopWithSavepointState} represents the different states during the stop-with-savepoint + * operation. + * + * The state transitions are implemented in the following
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r578568213 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -908,49 +909,38 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); +final CompletableFuture> executionTerminationsFuture = +getCombinedExecutionTerminationFuture(); + final CompletableFuture savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); -final CompletableFuture terminationFuture = -executionGraph -.getTerminationFuture() -.handle( -(jobstatus, throwable) -> { -if (throwable != null) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: {}", -jobGraph.getJobID(), -throwable.getMessage()); -throw new CompletionException(throwable); -} else if (jobstatus != JobStatus.FINISHED) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", -jobGraph.getJobID(), -jobstatus); -throw new CompletionException( -new FlinkException( -"Reached state " -+ jobstatus -+ " instead of FINISHED.")); -} -return jobstatus; -}); - -return savepointFuture -.thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) -.handleAsync( -(path, throwable) -> { -if (throwable != null) { -// restart the checkpoint coordinator if stopWithSavepoint failed. - startCheckpointScheduler(checkpointCoordinator); -throw new CompletionException(throwable); -} +StopWithSavepointContext stopWithSavepointContext = Review comment: I renamed the class into `StopWithSavepointOperationsImpl`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r578567820 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -817,6 +818,126 @@ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING)); } +@Test +public void testStopWithSavepointFailingWithExpiredCheckpoint() throws Exception { +// we allow restarts right from the start since the failure is going to happen in the first +// phase (savepoint creation) of stop-with-savepoint +testRestartBackoffTimeStrategy.setCanRestart(true); + +final JobGraph jobGraph = createTwoVertexJobGraph(); +// set checkpoint timeout to a low value to simulate checkpoint expiration +enableCheckpointing(jobGraph, 10); + +final SimpleAckingTaskManagerGateway taskManagerGateway = +new SimpleAckingTaskManagerGateway(); +final CountDownLatch checkpointTriggeredLatch = +getCheckpointTriggeredLatch(taskManagerGateway); + +// we have to set a listener that checks for the termination of the checkpoint handling +OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch(); +taskManagerGateway.setNotifyCheckpointAbortedConsumer( +(executionAttemptId, jobId, actualCheckpointId, timestamp) -> +checkpointAbortionWasTriggered.trigger()); + +// the failure handling has to happen in the same thread as the checkpoint coordination - +// that's why we have to instantiate a separate ThreadExecutorService here +final ScheduledExecutorService singleThreadExecutorService = +Executors.newSingleThreadScheduledExecutor(); +final ComponentMainThreadExecutor mainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( +singleThreadExecutorService); + +final DefaultScheduler scheduler = +CompletableFuture.supplyAsync( +() -> +createSchedulerAndStartScheduling( +jobGraph, mainThreadExecutor), +mainThreadExecutor) +.get(); + 
+final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) +.getCurrentExecutionAttempt() +.getAttemptId(); +final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); + +final CompletableFuture stopWithSavepointFuture = +CompletableFuture.supplyAsync( +() -> { +// we have to make sure that the tasks are running before +// stop-with-savepoint is triggered +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), +failingExecutionAttemptId, +ExecutionState.RUNNING)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), + succeedingExecutionAttemptId, +ExecutionState.RUNNING)); + +return scheduler.stopWithSavepoint("savepoint-path", false); +}, +mainThreadExecutor) +.get(); + +checkpointTriggeredLatch.await(); + +final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); + +final AcknowledgeCheckpoint acknowledgeCheckpoint = +new AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1); + +checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location"); + +// we need to wait for the expired checkpoint to be handled +checkpointAbortionWasTriggered.await(); + +CompletableFuture.runAsync( +() -> { +scheduler.updateTaskExecutionState( +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r578567121 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -817,6 +818,126 @@ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING)); } +@Test +public void testStopWithSavepointFailingWithExpiredCheckpoint() throws Exception { +// we allow restarts right from the start since the failure is going to happen in the first +// phase (savepoint creation) of stop-with-savepoint +testRestartBackoffTimeStrategy.setCanRestart(true); + +final JobGraph jobGraph = createTwoVertexJobGraph(); +// set checkpoint timeout to a low value to simulate checkpoint expiration +enableCheckpointing(jobGraph, 10); + +final SimpleAckingTaskManagerGateway taskManagerGateway = +new SimpleAckingTaskManagerGateway(); +final CountDownLatch checkpointTriggeredLatch = +getCheckpointTriggeredLatch(taskManagerGateway); + +// we have to set a listener that checks for the termination of the checkpoint handling +OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch(); +taskManagerGateway.setNotifyCheckpointAbortedConsumer( +(executionAttemptId, jobId, actualCheckpointId, timestamp) -> +checkpointAbortionWasTriggered.trigger()); + +// the failure handling has to happen in the same thread as the checkpoint coordination - +// that's why we have to instantiate a separate ThreadExecutorService here +final ScheduledExecutorService singleThreadExecutorService = +Executors.newSingleThreadScheduledExecutor(); +final ComponentMainThreadExecutor mainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( +singleThreadExecutorService); + +final DefaultScheduler scheduler = +CompletableFuture.supplyAsync( +() -> +createSchedulerAndStartScheduling( +jobGraph, mainThreadExecutor), +mainThreadExecutor) +.get(); + 
+final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) +.getCurrentExecutionAttempt() +.getAttemptId(); +final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); + +final CompletableFuture stopWithSavepointFuture = +CompletableFuture.supplyAsync( +() -> { +// we have to make sure that the tasks are running before +// stop-with-savepoint is triggered +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), +failingExecutionAttemptId, +ExecutionState.RUNNING)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), + succeedingExecutionAttemptId, +ExecutionState.RUNNING)); + +return scheduler.stopWithSavepoint("savepoint-path", false); +}, +mainThreadExecutor) +.get(); + +checkpointTriggeredLatch.await(); + +final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); + +final AcknowledgeCheckpoint acknowledgeCheckpoint = +new AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1); + +checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location"); + +// we need to wait for the expired checkpoint to be handled +checkpointAbortionWasTriggered.await(); + +CompletableFuture.runAsync( +() -> { +scheduler.updateTaskExecutionState( +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r578561800 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -835,8 +837,15 @@ public void updateAccumulators(final AccumulatorSnapshot accumulatorSnapshot) { mainThreadExecutor); } -private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) { +@Override Review comment: The `CheckpointCoordinator` does not change during the lifetime of an `ExecutionGraph`. That's why I decided to make this change and introduce the `CheckpointScheduling` interface. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r577158379 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); } +@Test +public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception { +// initially, we don't allow any restarts since the first phase (savepoint creation) +// succeeds without any failures +testRestartBackoffTimeStrategy.setCanRestart(false); + +final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + +final SimpleAckingTaskManagerGateway taskManagerGateway = +new SimpleAckingTaskManagerGateway(); +final CountDownLatch checkpointTriggeredLatch = +getCheckpointTriggeredLatch(taskManagerGateway); + +// collect executions to which the checkpoint completion was confirmed +final List executionAttemptIdsWithCompletedCheckpoint = +new ArrayList<>(); +taskManagerGateway.setNotifyCheckpointCompleteConsumer( +(executionAttemptId, jobId, actualCheckpointId, timestamp) -> + executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId)); +taskManagerGateway.setNotifyCheckpointAbortedConsumer( +(ignored0, ignored1, ignored2, ignored3) -> { +throw new UnsupportedOperationException("notifyCheckpointAborted was called"); +}); + +final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + +final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) +.getCurrentExecutionAttempt() +.getAttemptId(); +final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); + +// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + +final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + +// trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway +final CompletableFuture stopWithSavepointFuture = +scheduler.stopWithSavepoint(savepointFolder, false); +checkpointTriggeredLatch.await(); + +acknowledgePendingCheckpoint(scheduler, 1); + +assertThat( +"Both the executions where notified about the completed checkpoint.", +executionAttemptIdsWithCompletedCheckpoint, +containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId)); + +// The savepoint creation succeeded a failure happens in the second phase when finishing +// the tasks. That's why, the restarting policy is enabled. +testRestartBackoffTimeStrategy.setCanRestart(true); + +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), +succeedingExecutionAttemptId, +ExecutionState.FINISHED)); + +// the restarts due to local failure handling and global job fail-over are triggered +assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2)); +taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + +try { +stopWithSavepointFuture.get(); +fail("An exception is expected."); +} catch (ExecutionException e) { +Optional flinkException = +ExceptionUtils.findThrowable(e, FlinkException.class); + +assertTrue(flinkException.isPresent()); +assertThat( +flinkException.get().getMessage(), +is( +String.format( +"Inconsistent execution state after stopping with savepoint. A
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r577157152 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -908,38 +909,57 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); +final CompletableFuture> executionGraphTerminationFuture = +FutureUtils.combineAll( +StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), +false) + .map(ExecutionVertex::getCurrentExecutionAttempt) +.map(Execution::getTerminalStateFuture) +.collect(Collectors.toList())); + final CompletableFuture savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); -final CompletableFuture terminationFuture = -executionGraph -.getTerminationFuture() -.handle( -(jobstatus, throwable) -> { -if (throwable != null) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: {}", -jobGraph.getJobID(), -throwable.getMessage()); -throw new CompletionException(throwable); -} else if (jobstatus != JobStatus.FINISHED) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", -jobGraph.getJobID(), -jobstatus); -throw new CompletionException( -new FlinkException( -"Reached state " -+ jobstatus -+ " instead of FINISHED.")); -} -return jobstatus; -}); - return savepointFuture -.thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) +.thenCompose( +path -> +executionGraphTerminationFuture +.handleAsync( +(executionStates, throwable) -> { +Set nonFinishedStates = + extractNonFinishedStates( + executionStates); +if (throwable != null) { Review comment: @rmetzger you're right - the Execution never completes exceptionally. I removed this code path in the `StopWithSavepointOperations` refactoring. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r577152942 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); } +@Test +public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception { +// initially, we don't allow any restarts since the first phase (savepoint creation) +// succeeds without any failures +testRestartBackoffTimeStrategy.setCanRestart(false); + +final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + +final SimpleAckingTaskManagerGateway taskManagerGateway = +new SimpleAckingTaskManagerGateway(); +final CountDownLatch checkpointTriggeredLatch = +getCheckpointTriggeredLatch(taskManagerGateway); + +// collect executions to which the checkpoint completion was confirmed +final List executionAttemptIdsWithCompletedCheckpoint = +new ArrayList<>(); +taskManagerGateway.setNotifyCheckpointCompleteConsumer( +(executionAttemptId, jobId, actualCheckpointId, timestamp) -> + executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId)); +taskManagerGateway.setNotifyCheckpointAbortedConsumer( +(ignored0, ignored1, ignored2, ignored3) -> { +throw new UnsupportedOperationException("notifyCheckpointAborted was called"); +}); + +final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + +final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) +.getCurrentExecutionAttempt() +.getAttemptId(); +final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); + +// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + +final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + +// trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway +final CompletableFuture stopWithSavepointFuture = +scheduler.stopWithSavepoint(savepointFolder, false); +checkpointTriggeredLatch.await(); + +acknowledgePendingCheckpoint(scheduler, 1); + +assertThat( +"Both the executions where notified about the completed checkpoint.", +executionAttemptIdsWithCompletedCheckpoint, +containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId)); + +// The savepoint creation succeeded a failure happens in the second phase when finishing +// the tasks. That's why, the restarting policy is enabled. +testRestartBackoffTimeStrategy.setCanRestart(true); + +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), +succeedingExecutionAttemptId, +ExecutionState.FINISHED)); + +// the restarts due to local failure handling and global job fail-over are triggered +assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2)); Review comment: `StopWithSavepointContextTest` replaces this test and focuses more on the actual functionality we want to test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r577148145 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); } +@Test +public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception { +// initially, we don't allow any restarts since the first phase (savepoint creation) +// succeeds without any failures +testRestartBackoffTimeStrategy.setCanRestart(false); + +final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + +final SimpleAckingTaskManagerGateway taskManagerGateway = +new SimpleAckingTaskManagerGateway(); +final CountDownLatch checkpointTriggeredLatch = +getCheckpointTriggeredLatch(taskManagerGateway); + +// collect executions to which the checkpoint completion was confirmed +final List executionAttemptIdsWithCompletedCheckpoint = +new ArrayList<>(); +taskManagerGateway.setNotifyCheckpointCompleteConsumer( +(executionAttemptId, jobId, actualCheckpointId, timestamp) -> + executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId)); +taskManagerGateway.setNotifyCheckpointAbortedConsumer( +(ignored0, ignored1, ignored2, ignored3) -> { +throw new UnsupportedOperationException("notifyCheckpointAborted was called"); +}); + +final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + +final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) +.getCurrentExecutionAttempt() +.getAttemptId(); +final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); + +// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + +final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + +// trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway +final CompletableFuture stopWithSavepointFuture = +scheduler.stopWithSavepoint(savepointFolder, false); +checkpointTriggeredLatch.await(); + +acknowledgePendingCheckpoint(scheduler, 1); + +assertThat( +"Both the executions where notified about the completed checkpoint.", +executionAttemptIdsWithCompletedCheckpoint, +containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId)); + +// The savepoint creation succeeded a failure happens in the second phase when finishing +// the tasks. That's why, the restarting policy is enabled. +testRestartBackoffTimeStrategy.setCanRestart(true); + +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), +succeedingExecutionAttemptId, +ExecutionState.FINISHED)); + +// the restarts due to local failure handling and global job fail-over are triggered +assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2)); +taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + +try { +stopWithSavepointFuture.get(); +fail("An exception is expected."); +} catch (ExecutionException e) { +Optional flinkException = +ExceptionUtils.findThrowable(e, FlinkException.class); + +assertTrue(flinkException.isPresent()); +assertThat( +flinkException.get().getMessage(), +is( +String.format( +"Inconsistent execution state after stopping with savepoint. A
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r577147425 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -908,38 +909,57 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); +final CompletableFuture> executionGraphTerminationFuture = +FutureUtils.combineAll( +StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), +false) + .map(ExecutionVertex::getCurrentExecutionAttempt) +.map(Execution::getTerminalStateFuture) +.collect(Collectors.toList())); + final CompletableFuture savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); -final CompletableFuture terminationFuture = -executionGraph -.getTerminationFuture() -.handle( -(jobstatus, throwable) -> { -if (throwable != null) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: {}", -jobGraph.getJobID(), -throwable.getMessage()); -throw new CompletionException(throwable); -} else if (jobstatus != JobStatus.FINISHED) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", -jobGraph.getJobID(), -jobstatus); -throw new CompletionException( -new FlinkException( -"Reached state " -+ jobstatus -+ " instead of FINISHED.")); -} -return jobstatus; -}); - return savepointFuture -.thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) +.thenCompose( +path -> +executionGraphTerminationFuture +.handleAsync( +(executionStates, throwable) -> { +Set nonFinishedStates = + extractNonFinishedStates( + executionStates); +if (throwable != null) { +log.info( +"Failed during stopping job {} with a savepoint. 
Reason: {}", + jobGraph.getJobID(), + throwable.getMessage()); +throw new CompletionException(throwable); +} else if (!nonFinishedStates.isEmpty()) { +log.info( +"Failed while stopping job {} after successfully creating a savepoint. A global failover is going to be triggered. Reason: One or more states ended up in the following termination states instead of FINISHED: {}", + jobGraph.getJobID(), + nonFinishedStates); +FlinkException + inconsistentFinalStateException = +new FlinkException( +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r576628701 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -817,6 +818,126 @@ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING)); } +@Test +public void testStopWithSavepointFailingWithExpiredCheckpoint() throws Exception { +// we allow restarts right from the start since the failure is going to happen in the first +// phase (savepoint creation) of stop-with-savepoint +testRestartBackoffTimeStrategy.setCanRestart(true); + +final JobGraph jobGraph = createTwoVertexJobGraph(); +// set checkpoint timeout to a low value to simulate checkpoint expiration +enableCheckpointing(jobGraph, 10); + +final SimpleAckingTaskManagerGateway taskManagerGateway = +new SimpleAckingTaskManagerGateway(); +final CountDownLatch checkpointTriggeredLatch = +getCheckpointTriggeredLatch(taskManagerGateway); + +// we have to set a listener that checks for the termination of the checkpoint handling +OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch(); +taskManagerGateway.setNotifyCheckpointAbortedConsumer( +(executionAttemptId, jobId, actualCheckpointId, timestamp) -> +checkpointAbortionWasTriggered.trigger()); + +// the failure handling has to happen in the same thread as the checkpoint coordination - +// that's why we have to instantiate a separate ThreadExecutorService here +final ScheduledExecutorService singleThreadExecutorService = +Executors.newSingleThreadScheduledExecutor(); +final ComponentMainThreadExecutor mainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( +singleThreadExecutorService); + +final DefaultScheduler scheduler = +CompletableFuture.supplyAsync( +() -> +createSchedulerAndStartScheduling( +jobGraph, mainThreadExecutor), +mainThreadExecutor) +.get(); + 
+final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) +.getCurrentExecutionAttempt() +.getAttemptId(); +final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); + +final CompletableFuture stopWithSavepointFuture = +CompletableFuture.supplyAsync( +() -> { +// we have to make sure that the tasks are running before +// stop-with-savepoint is triggered +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), +failingExecutionAttemptId, +ExecutionState.RUNNING)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), + succeedingExecutionAttemptId, +ExecutionState.RUNNING)); + +return scheduler.stopWithSavepoint("savepoint-path", false); +}, +mainThreadExecutor) +.get(); + +checkpointTriggeredLatch.await(); + +final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); + +final AcknowledgeCheckpoint acknowledgeCheckpoint = +new AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1); + +checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location"); + +// we need to wait for the expired checkpoint to be handled +checkpointAbortionWasTriggered.await(); + +CompletableFuture.runAsync( +() -> { +scheduler.updateTaskExecutionState( +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r576617075 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -817,6 +818,126 @@ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING)); } +@Test +public void testStopWithSavepointFailingWithExpiredCheckpoint() throws Exception { +// we allow restarts right from the start since the failure is going to happen in the first +// phase (savepoint creation) of stop-with-savepoint +testRestartBackoffTimeStrategy.setCanRestart(true); + +final JobGraph jobGraph = createTwoVertexJobGraph(); +// set checkpoint timeout to a low value to simulate checkpoint expiration +enableCheckpointing(jobGraph, 10); + +final SimpleAckingTaskManagerGateway taskManagerGateway = +new SimpleAckingTaskManagerGateway(); +final CountDownLatch checkpointTriggeredLatch = +getCheckpointTriggeredLatch(taskManagerGateway); + +// we have to set a listener that checks for the termination of the checkpoint handling +OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch(); +taskManagerGateway.setNotifyCheckpointAbortedConsumer( +(executionAttemptId, jobId, actualCheckpointId, timestamp) -> +checkpointAbortionWasTriggered.trigger()); + +// the failure handling has to happen in the same thread as the checkpoint coordination - +// that's why we have to instantiate a separate ThreadExecutorService here +final ScheduledExecutorService singleThreadExecutorService = +Executors.newSingleThreadScheduledExecutor(); +final ComponentMainThreadExecutor mainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( +singleThreadExecutorService); + +final DefaultScheduler scheduler = +CompletableFuture.supplyAsync( +() -> +createSchedulerAndStartScheduling( +jobGraph, mainThreadExecutor), +mainThreadExecutor) +.get(); + 
+final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) +.getCurrentExecutionAttempt() +.getAttemptId(); +final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); + +final CompletableFuture stopWithSavepointFuture = +CompletableFuture.supplyAsync( +() -> { +// we have to make sure that the tasks are running before +// stop-with-savepoint is triggered +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), +failingExecutionAttemptId, +ExecutionState.RUNNING)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), + succeedingExecutionAttemptId, +ExecutionState.RUNNING)); + +return scheduler.stopWithSavepoint("savepoint-path", false); +}, +mainThreadExecutor) +.get(); + +checkpointTriggeredLatch.await(); + +final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); + +final AcknowledgeCheckpoint acknowledgeCheckpoint = +new AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1); + +checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location"); + +// we need to wait for the expired checkpoint to be handled +checkpointAbortionWasTriggered.await(); + +CompletableFuture.runAsync( +() -> { +scheduler.updateTaskExecutionState( +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r576616131 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); } +@Test +public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception { +// initially, we don't allow any restarts since the first phase (savepoint creation) +// succeeds without any failures +testRestartBackoffTimeStrategy.setCanRestart(false); + +final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + +final SimpleAckingTaskManagerGateway taskManagerGateway = +new SimpleAckingTaskManagerGateway(); +final CountDownLatch checkpointTriggeredLatch = +getCheckpointTriggeredLatch(taskManagerGateway); + +// collect executions to which the checkpoint completion was confirmed +final List executionAttemptIdsWithCompletedCheckpoint = +new ArrayList<>(); +taskManagerGateway.setNotifyCheckpointCompleteConsumer( +(executionAttemptId, jobId, actualCheckpointId, timestamp) -> + executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId)); +taskManagerGateway.setNotifyCheckpointAbortedConsumer( +(ignored0, ignored1, ignored2, ignored3) -> { +throw new UnsupportedOperationException("notifyCheckpointAborted was called"); +}); + +final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + +final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) +.getCurrentExecutionAttempt() +.getAttemptId(); +final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); + +// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + +final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + +// trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway +final CompletableFuture stopWithSavepointFuture = +scheduler.stopWithSavepoint(savepointFolder, false); +checkpointTriggeredLatch.await(); + +acknowledgePendingCheckpoint(scheduler, 1); + +assertThat( +"Both the executions where notified about the completed checkpoint.", +executionAttemptIdsWithCompletedCheckpoint, +containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId)); + +// The savepoint creation succeeded a failure happens in the second phase when finishing +// the tasks. That's why, the restarting policy is enabled. +testRestartBackoffTimeStrategy.setCanRestart(true); + +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), +succeedingExecutionAttemptId, +ExecutionState.FINISHED)); + +// the restarts due to local failure handling and global job fail-over are triggered +assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2)); +taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + +try { +stopWithSavepointFuture.get(); +fail("An exception is expected."); +} catch (ExecutionException e) { +Optional flinkException = +ExceptionUtils.findThrowable(e, FlinkException.class); + +assertTrue(flinkException.isPresent()); +assertThat( +flinkException.get().getMessage(), +is( +String.format( +"Inconsistent execution state after stopping with savepoint. A
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r576616131 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); } +@Test +public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception { +// initially, we don't allow any restarts since the first phase (savepoint creation) +// succeeds without any failures +testRestartBackoffTimeStrategy.setCanRestart(false); + +final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + +final SimpleAckingTaskManagerGateway taskManagerGateway = +new SimpleAckingTaskManagerGateway(); +final CountDownLatch checkpointTriggeredLatch = +getCheckpointTriggeredLatch(taskManagerGateway); + +// collect executions to which the checkpoint completion was confirmed +final List executionAttemptIdsWithCompletedCheckpoint = +new ArrayList<>(); +taskManagerGateway.setNotifyCheckpointCompleteConsumer( +(executionAttemptId, jobId, actualCheckpointId, timestamp) -> + executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId)); +taskManagerGateway.setNotifyCheckpointAbortedConsumer( +(ignored0, ignored1, ignored2, ignored3) -> { +throw new UnsupportedOperationException("notifyCheckpointAborted was called"); +}); + +final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + +final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) +.getCurrentExecutionAttempt() +.getAttemptId(); +final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); + +// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + +final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + +// trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway +final CompletableFuture stopWithSavepointFuture = +scheduler.stopWithSavepoint(savepointFolder, false); +checkpointTriggeredLatch.await(); + +acknowledgePendingCheckpoint(scheduler, 1); + +assertThat( +"Both the executions where notified about the completed checkpoint.", +executionAttemptIdsWithCompletedCheckpoint, +containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId)); + +// The savepoint creation succeeded a failure happens in the second phase when finishing +// the tasks. That's why, the restarting policy is enabled. +testRestartBackoffTimeStrategy.setCanRestart(true); + +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), +succeedingExecutionAttemptId, +ExecutionState.FINISHED)); + +// the restarts due to local failure handling and global job fail-over are triggered +assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2)); +taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + +try { +stopWithSavepointFuture.get(); +fail("An exception is expected."); +} catch (ExecutionException e) { +Optional flinkException = +ExceptionUtils.findThrowable(e, FlinkException.class); + +assertTrue(flinkException.isPresent()); +assertThat( +flinkException.get().getMessage(), +is( +String.format( +"Inconsistent execution state after stopping with savepoint. A
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r576615862 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); } +@Test +public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception { +// initially, we don't allow any restarts since the first phase (savepoint creation) +// succeeds without any failures +testRestartBackoffTimeStrategy.setCanRestart(false); + +final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + +final SimpleAckingTaskManagerGateway taskManagerGateway = +new SimpleAckingTaskManagerGateway(); +final CountDownLatch checkpointTriggeredLatch = +getCheckpointTriggeredLatch(taskManagerGateway); + +// collect executions to which the checkpoint completion was confirmed +final List executionAttemptIdsWithCompletedCheckpoint = +new ArrayList<>(); +taskManagerGateway.setNotifyCheckpointCompleteConsumer( +(executionAttemptId, jobId, actualCheckpointId, timestamp) -> + executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId)); +taskManagerGateway.setNotifyCheckpointAbortedConsumer( +(ignored0, ignored1, ignored2, ignored3) -> { +throw new UnsupportedOperationException("notifyCheckpointAborted was called"); +}); + +final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + +final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) +.getCurrentExecutionAttempt() +.getAttemptId(); +final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); + +// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + +final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + +// trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway +final CompletableFuture stopWithSavepointFuture = +scheduler.stopWithSavepoint(savepointFolder, false); +checkpointTriggeredLatch.await(); + +acknowledgePendingCheckpoint(scheduler, 1); + +assertThat( +"Both the executions where notified about the completed checkpoint.", +executionAttemptIdsWithCompletedCheckpoint, +containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId)); + +// The savepoint creation succeeded a failure happens in the second phase when finishing +// the tasks. That's why, the restarting policy is enabled. +testRestartBackoffTimeStrategy.setCanRestart(true); + +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), +succeedingExecutionAttemptId, +ExecutionState.FINISHED)); + +// the restarts due to local failure handling and global job fail-over are triggered +assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2)); +taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + +try { +stopWithSavepointFuture.get(); +fail("An exception is expected."); +} catch (ExecutionException e) { +Optional flinkException = +ExceptionUtils.findThrowable(e, FlinkException.class); + +assertTrue(flinkException.isPresent()); +assertThat( +flinkException.get().getMessage(), +is( +String.format( +"Inconsistent execution state after stopping with savepoint. A
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r576278350 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -908,38 +909,57 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); +final CompletableFuture> executionGraphTerminationFuture = +FutureUtils.combineAll( +StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), +false) + .map(ExecutionVertex::getCurrentExecutionAttempt) +.map(Execution::getTerminalStateFuture) +.collect(Collectors.toList())); + final CompletableFuture savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); -final CompletableFuture terminationFuture = -executionGraph -.getTerminationFuture() -.handle( -(jobstatus, throwable) -> { -if (throwable != null) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: {}", -jobGraph.getJobID(), -throwable.getMessage()); -throw new CompletionException(throwable); -} else if (jobstatus != JobStatus.FINISHED) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", -jobGraph.getJobID(), -jobstatus); -throw new CompletionException( -new FlinkException( -"Reached state " -+ jobstatus -+ " instead of FINISHED.")); -} -return jobstatus; -}); - return savepointFuture -.thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) +.thenCompose( +path -> +executionGraphTerminationFuture +.handleAsync( +(executionStates, throwable) -> { +Set nonFinishedStates = + extractNonFinishedStates( + executionStates); +if (throwable != null) { +log.info( +"Failed during stopping job {} with a savepoint. 
Reason: {}", + jobGraph.getJobID(), + throwable.getMessage()); +throw new CompletionException(throwable); +} else if (!nonFinishedStates.isEmpty()) { +log.info( +"Failed while stopping job {} after successfully creating a savepoint. A global failover is going to be triggered. Reason: One or more states ended up in the following termination states instead of FINISHED: {}", + jobGraph.getJobID(), + nonFinishedStates); +FlinkException + inconsistentFinalStateException = +new FlinkException( +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r576278350 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -908,38 +909,57 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); +final CompletableFuture> executionGraphTerminationFuture = +FutureUtils.combineAll( +StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), +false) + .map(ExecutionVertex::getCurrentExecutionAttempt) +.map(Execution::getTerminalStateFuture) +.collect(Collectors.toList())); + final CompletableFuture savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); -final CompletableFuture terminationFuture = -executionGraph -.getTerminationFuture() -.handle( -(jobstatus, throwable) -> { -if (throwable != null) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: {}", -jobGraph.getJobID(), -throwable.getMessage()); -throw new CompletionException(throwable); -} else if (jobstatus != JobStatus.FINISHED) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", -jobGraph.getJobID(), -jobstatus); -throw new CompletionException( -new FlinkException( -"Reached state " -+ jobstatus -+ " instead of FINISHED.")); -} -return jobstatus; -}); - return savepointFuture -.thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) +.thenCompose( +path -> +executionGraphTerminationFuture +.handleAsync( +(executionStates, throwable) -> { +Set nonFinishedStates = + extractNonFinishedStates( + executionStates); +if (throwable != null) { +log.info( +"Failed during stopping job {} with a savepoint. 
Reason: {}", + jobGraph.getJobID(), + throwable.getMessage()); +throw new CompletionException(throwable); +} else if (!nonFinishedStates.isEmpty()) { +log.info( +"Failed while stopping job {} after successfully creating a savepoint. A global failover is going to be triggered. Reason: One or more states ended up in the following termination states instead of FINISHED: {}", + jobGraph.getJobID(), + nonFinishedStates); +FlinkException + inconsistentFinalStateException = +new FlinkException( +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r576241481 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); } +@Test +public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception { +// initially, we don't allow any restarts since the first phase (savepoint creation) +// succeeds without any failures +testRestartBackoffTimeStrategy.setCanRestart(false); + +final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + +final SimpleAckingTaskManagerGateway taskManagerGateway = +new SimpleAckingTaskManagerGateway(); +final CountDownLatch checkpointTriggeredLatch = +getCheckpointTriggeredLatch(taskManagerGateway); + +// collect executions to which the checkpoint completion was confirmed +final List executionAttemptIdsWithCompletedCheckpoint = +new ArrayList<>(); +taskManagerGateway.setNotifyCheckpointCompleteConsumer( +(executionAttemptId, jobId, actualCheckpointId, timestamp) -> + executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId)); +taskManagerGateway.setNotifyCheckpointAbortedConsumer( +(ignored0, ignored1, ignored2, ignored3) -> { +throw new UnsupportedOperationException("notifyCheckpointAborted was called"); +}); + +final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + +final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) +.getCurrentExecutionAttempt() +.getAttemptId(); +final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); + +// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + +final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + +// trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway +final CompletableFuture stopWithSavepointFuture = +scheduler.stopWithSavepoint(savepointFolder, false); +checkpointTriggeredLatch.await(); + +acknowledgePendingCheckpoint(scheduler, 1); + +assertThat( +"Both the executions where notified about the completed checkpoint.", +executionAttemptIdsWithCompletedCheckpoint, +containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId)); + +// The savepoint creation succeeded a failure happens in the second phase when finishing +// the tasks. That's why, the restarting policy is enabled. +testRestartBackoffTimeStrategy.setCanRestart(true); + +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), +succeedingExecutionAttemptId, +ExecutionState.FINISHED)); + +// the restarts due to local failure handling and global job fail-over are triggered +assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2)); Review comment: That's what I wanted to make clear with the comment above. But I could try to make the comment more explicit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r576239135 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -908,38 +909,57 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); +final CompletableFuture> executionGraphTerminationFuture = +FutureUtils.combineAll( +StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), +false) + .map(ExecutionVertex::getCurrentExecutionAttempt) +.map(Execution::getTerminalStateFuture) +.collect(Collectors.toList())); + final CompletableFuture savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); -final CompletableFuture terminationFuture = -executionGraph -.getTerminationFuture() -.handle( -(jobstatus, throwable) -> { -if (throwable != null) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: {}", -jobGraph.getJobID(), -throwable.getMessage()); -throw new CompletionException(throwable); -} else if (jobstatus != JobStatus.FINISHED) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", -jobGraph.getJobID(), -jobstatus); -throw new CompletionException( -new FlinkException( -"Reached state " -+ jobstatus -+ " instead of FINISHED.")); -} -return jobstatus; -}); - return savepointFuture -.thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) +.thenCompose( +path -> +executionGraphTerminationFuture +.handleAsync( Review comment: Thanks for the remark, @rmetzger. I thought about it again: We cannot move the `handleAsync` out of the `thenCompose` because the error handling that is triggered by the `handleAsync` should only be called if the savepoint creation succeeds. 
It would also be triggered by a failure during savepoint creation that caused all executions to terminate, because the `CheckpointCoordinator` triggers a restart in case of a savepoint failure. That's why `testStopWithSavepointFailingWithDeclinedCheckpoint` fails. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r576239135 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -908,38 +909,57 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); +final CompletableFuture> executionGraphTerminationFuture = +FutureUtils.combineAll( +StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), +false) + .map(ExecutionVertex::getCurrentExecutionAttempt) +.map(Execution::getTerminalStateFuture) +.collect(Collectors.toList())); + final CompletableFuture savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); -final CompletableFuture terminationFuture = -executionGraph -.getTerminationFuture() -.handle( -(jobstatus, throwable) -> { -if (throwable != null) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: {}", -jobGraph.getJobID(), -throwable.getMessage()); -throw new CompletionException(throwable); -} else if (jobstatus != JobStatus.FINISHED) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", -jobGraph.getJobID(), -jobstatus); -throw new CompletionException( -new FlinkException( -"Reached state " -+ jobstatus -+ " instead of FINISHED.")); -} -return jobstatus; -}); - return savepointFuture -.thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) +.thenCompose( +path -> +executionGraphTerminationFuture +.handleAsync( Review comment: Thanks for the remark, @rmetzger. I thought about it again: We cannot move the `handleAsync` out of the `thenCompose` because the error handling that is triggered by the `handleAsync` should only be called if the savepoint creation succeeds. 
It would also be triggered by a failure during savepoint creation that caused all executions to terminate, because the `CheckpointCoordinator` triggers a restart in case of a savepoint failure. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r575246034 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); } +@Test +public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception { +// initially, we don't allow any restarts since the first phase (savepoint creation) +// succeeds without any failures +testRestartBackoffTimeStrategy.setCanRestart(false); + +final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + +final SimpleAckingTaskManagerGateway taskManagerGateway = +new SimpleAckingTaskManagerGateway(); +final CountDownLatch checkpointTriggeredLatch = +getCheckpointTriggeredLatch(taskManagerGateway); + +// collect executions to which the checkpoint completion was confirmed +final List executionAttemptIdsWithCompletedCheckpoint = +new ArrayList<>(); +taskManagerGateway.setNotifyCheckpointCompleteConsumer( +(executionAttemptId, jobId, actualCheckpointId, timestamp) -> + executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId)); +taskManagerGateway.setNotifyCheckpointAbortedConsumer( +(ignored0, ignored1, ignored2, ignored3) -> { +throw new UnsupportedOperationException("notifyCheckpointAborted was called"); +}); + +final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + +final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) +.getCurrentExecutionAttempt() +.getAttemptId(); +final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); + +// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + +final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + +// trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway +final CompletableFuture stopWithSavepointFuture = +scheduler.stopWithSavepoint(savepointFolder, false); +checkpointTriggeredLatch.await(); + +acknowledgePendingCheckpoint(scheduler, 1); + +assertThat( +"Both the executions where notified about the completed checkpoint.", +executionAttemptIdsWithCompletedCheckpoint, +containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId)); + +// The savepoint creation succeeded a failure happens in the second phase when finishing +// the tasks. That's why, the restarting policy is enabled. +testRestartBackoffTimeStrategy.setCanRestart(true); + +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); +scheduler.updateTaskExecutionState( +new TaskExecutionState( +jobGraph.getJobID(), +succeedingExecutionAttemptId, +ExecutionState.FINISHED)); + +// the restarts due to local failure handling and global job fail-over are triggered +assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2)); +taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + +try { +stopWithSavepointFuture.get(); +fail("An exception is expected."); +} catch (ExecutionException e) { +Optional flinkException = +ExceptionUtils.findThrowable(e, FlinkException.class); + +assertTrue(flinkException.isPresent()); +assertThat( +flinkException.get().getMessage(), +is( +String.format( +"Inconsistent execution state after stopping with savepoint. A
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r575043953 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -908,38 +909,56 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); +final CompletableFuture> executionGraphTerminationFuture = +FutureUtils.combineAll( +StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), Review comment: Yeah, it took me also some digging to find a connection. I found `executionGraph.getSchedulingTopology().getAllPipelinedRegions()`. Not sure, though, whether this works conceptually. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r574578211 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -908,38 +909,56 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); +final CompletableFuture> executionGraphTerminationFuture = +FutureUtils.combineAll( +StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), Review comment: I'm not that familiar with the regions concept, yet. AFAIK, regions are collections of tasks that have to run together, right? Do they share the same `ExecutionState`? I guess, @tillrohrmann could answer that question. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r574569882 ## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ## @@ -600,6 +616,171 @@ public void testSubmitWithUnknownSavepointPath() throws Exception { } } +@Test +public void testStopWithSavepointFailingInSnapshotCreation() throws Exception { +testStopWithFailingSourceInOnePipeline( +new SnapshotFailingInfiniteTestSource(), +folder.newFolder(), +// two restarts expected: +// 1. task failure restart +// 2. job failover triggered by the CheckpointFailureManager +2, +assertInSnapshotCreationFailure()); +} + +@Test +public void testStopWithSavepointFailingAfterSnapshotCreation() throws Exception { Review comment: This is fixed after letting the error handling run in the `mainThreadExecutor` again. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r574568680 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -908,38 +909,56 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); +final CompletableFuture> executionGraphTerminationFuture = +FutureUtils.combineAll( +StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), +false) + .map(ExecutionVertex::getCurrentExecutionAttempt) +.map(Execution::getTerminalStateFuture) +.collect(Collectors.toList())); + final CompletableFuture savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); -final CompletableFuture terminationFuture = -executionGraph -.getTerminationFuture() -.handle( -(jobstatus, throwable) -> { -if (throwable != null) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: {}", -jobGraph.getJobID(), -throwable.getMessage()); -throw new CompletionException(throwable); -} else if (jobstatus != JobStatus.FINISHED) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", -jobGraph.getJobID(), -jobstatus); -throw new CompletionException( -new FlinkException( -"Reached state " -+ jobstatus -+ " instead of FINISHED.")); -} -return jobstatus; -}); - return savepointFuture -.thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) +.thenCompose( +path -> +executionGraphTerminationFuture +.handle( +(executionStates, throwable) -> { +Set nonFinishedStates = + extractNonFinishedStates( + executionStates); +if (throwable != null) { +log.info( +"Failed during stopping job {} with a savepoint. 
Reason: {}", + jobGraph.getJobID(), + throwable.getMessage()); +throw new CompletionException(throwable); +} else if (!nonFinishedStates.isEmpty()) { +log.info( +"Failed while stopping job {} after successfully creating a savepoint. A global failover is going to be triggered. Reason: One or more states ended up in the following termination states instead of FINISHED: {}", + jobGraph.getJobID(), + nonFinishedStates); +FlinkException + inconsistentFinalStateException = +new FlinkException( +
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r574523861 ## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ## @@ -600,6 +616,171 @@ public void testSubmitWithUnknownSavepointPath() throws Exception { } } +@Test +public void testStopWithSavepointFailingInSnapshotCreation() throws Exception { +testStopWithFailingSourceInOnePipeline( +new SnapshotFailingInfiniteTestSource(), +folder.newFolder(), +// two restarts expected: +// 1. task failure restart +// 2. job failover triggered by the CheckpointFailureManager +2, +assertInSnapshotCreationFailure()); +} + +@Test +public void testStopWithSavepointFailingAfterSnapshotCreation() throws Exception { Review comment: Good catch. I missed running them individually. This test failure would be fixed if we apply the change you suggested above to run the error handling in the `mainThreadExecutor`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure
XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r574522921 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -908,38 +909,56 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); +final CompletableFuture> executionGraphTerminationFuture = +FutureUtils.combineAll( +StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), +false) + .map(ExecutionVertex::getCurrentExecutionAttempt) +.map(Execution::getTerminalStateFuture) +.collect(Collectors.toList())); + final CompletableFuture savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); -final CompletableFuture terminationFuture = -executionGraph -.getTerminationFuture() -.handle( -(jobstatus, throwable) -> { -if (throwable != null) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: {}", -jobGraph.getJobID(), -throwable.getMessage()); -throw new CompletionException(throwable); -} else if (jobstatus != JobStatus.FINISHED) { -log.info( -"Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", -jobGraph.getJobID(), -jobstatus); -throw new CompletionException( -new FlinkException( -"Reached state " -+ jobstatus -+ " instead of FINISHED.")); -} -return jobstatus; -}); - return savepointFuture -.thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) +.thenCompose( +path -> +executionGraphTerminationFuture +.handle( +(executionStates, throwable) -> { +Set nonFinishedStates = + extractNonFinishedStates( + executionStates); +if (throwable != null) { +log.info( +"Failed during stopping job {} with a savepoint. 
Reason: {}", + jobGraph.getJobID(), + throwable.getMessage()); +throw new CompletionException(throwable); +} else if (!nonFinishedStates.isEmpty()) { +log.info( +"Failed while stopping job {} after successfully creating a savepoint. A global failover is going to be triggered. Reason: One or more states ended up in the following termination states instead of FINISHED: {}", + jobGraph.getJobID(), + nonFinishedStates); +FlinkException + inconsistentFinalStateException = +new FlinkException( +