[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-25 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r582996083



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+private final Logger log;
+
+private final SchedulerBase scheduler;
+private final CheckpointCoordinator checkpointCoordinator;
+private final JobID jobId;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+private String path;
+private Set unfinishedStates;
+
+public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+this.jobId = jobId;
+this.scheduler = scheduler;
+this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+this.log = log;
+}
+
+@Override
+public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+final StopWithSavepointState oldState = state;
+state = state.onSavepointCreation(this, path, throwable);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+oldState,
+state);
+}
+
+@Override
+public synchronized void handleExecutionTermination(
+Collection executionStates) {
+final StopWithSavepointState oldState = state;
+state = state.onExecutionsTermination(this, executionStates);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+oldState,
+state);
+}
+
+@Override
+public CompletableFuture getResult() {
+return result;
+}
+
+private StopWithSavepointState terminateExceptionWithGlobalFailover(
+Iterable unfinishedExecutionStates) {
+String errorMessage =
+String.format(
+"Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+scheduler.handleGlobalFailure(inconsistentFinalStateException);
+return terminateExceptionally(inconsistentFinalStateException);
+}
+
+private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+scheduler.startCheckpointScheduler(checkpointCoordinator);

Review comment:
   This issue was addressed in @rmetzger's 
[comment](https://github.com/apache/flink/pull/14847#discussion_r581918367).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-25 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r582992549



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+private final Logger log;
+
+private final SchedulerBase scheduler;
+private final CheckpointCoordinator checkpointCoordinator;
+private final JobID jobId;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+private String path;
+private Set unfinishedStates;
+
+public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+this.jobId = jobId;
+this.scheduler = scheduler;
+this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+this.log = log;
+}
+
+@Override
+public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+final StopWithSavepointState oldState = state;
+state = state.onSavepointCreation(this, path, throwable);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+oldState,
+state);
+}
+
+@Override
+public synchronized void handleExecutionTermination(
+Collection executionStates) {
+final StopWithSavepointState oldState = state;
+state = state.onExecutionsTermination(this, executionStates);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+oldState,
+state);
+}
+
+@Override
+public CompletableFuture getResult() {
+return result;
+}
+
+private StopWithSavepointState terminateExceptionWithGlobalFailover(
+Iterable unfinishedExecutionStates) {
+String errorMessage =
+String.format(
+"Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+scheduler.handleGlobalFailure(inconsistentFinalStateException);
+return terminateExceptionally(inconsistentFinalStateException);
+}
+
+private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+scheduler.startCheckpointScheduler(checkpointCoordinator);
+result.completeExceptionally(throwable);
+
+return StopWithSavepointState.Final;
+}
+
+private StopWithSavepointState terminateSuccessfully(String path) {
+result.complete(path);
+
+return StopWithSavepointState.Final;
+}
+
+private static Set extractUnfinishedStates(
+Collection executionStates) {
+return executionStates.stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+}
+
+/**
+ * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+ * operation.
+ *
+ * The state transitions are implemented in the following 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-25 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r582991828



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+private final Logger log;
+
+private final SchedulerBase scheduler;
+private final CheckpointCoordinator checkpointCoordinator;
+private final JobID jobId;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+private String path;
+private Set unfinishedStates;
+
+public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+this.jobId = jobId;
+this.scheduler = scheduler;
+this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+this.log = log;
+}
+
+@Override
+public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+final StopWithSavepointState oldState = state;
+state = state.onSavepointCreation(this, path, throwable);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+oldState,
+state);
+}
+
+@Override
+public synchronized void handleExecutionTermination(
+Collection executionStates) {
+final StopWithSavepointState oldState = state;
+state = state.onExecutionsTermination(this, executionStates);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+oldState,
+state);
+}
+
+@Override
+public CompletableFuture getResult() {
+return result;
+}
+
+private StopWithSavepointState terminateExceptionWithGlobalFailover(
+Iterable unfinishedExecutionStates) {
+String errorMessage =
+String.format(
+"Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+scheduler.handleGlobalFailure(inconsistentFinalStateException);
+return terminateExceptionally(inconsistentFinalStateException);
+}
+
+private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+scheduler.startCheckpointScheduler(checkpointCoordinator);
+result.completeExceptionally(throwable);
+
+return StopWithSavepointState.Final;
+}
+
+private StopWithSavepointState terminateSuccessfully(String path) {
+result.complete(path);
+
+return StopWithSavepointState.Final;
+}
+
+private static Set extractUnfinishedStates(
+Collection executionStates) {
+return executionStates.stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+}
+
+/**
+ * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+ * operation.
+ *
+ * The state transitions are implemented in the following 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-25 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r582991307



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointContextTest} tests the stop-with-savepoint 
functionality of {@link
+ * SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointContextTest extends TestLogger {
+
+private JobGraph jobGraph;
+private DefaultScheduler scheduler;
+
+private StopWithSavepointOperations testInstance;
+
+@Before
+public void setup() throws Exception {
+jobGraph = new JobGraph();
+
+final JobVertex jobVertex = new JobVertex("vertex #0");
+jobVertex.setInvokableClass(NoOpInvokable.class);
+jobGraph.addVertex(jobVertex);
+
+// checkpointInterval has to be set to a value lower than 
Long.MAX_VALUE to enable
+// periodic checkpointing - only then can we enable/disable the 
CheckpointCoordinator
+SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, 
Long.MAX_VALUE - 1);
+scheduler =
+SchedulerTestingUtils.createSchedulerBuilder(
+jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+.setFutureExecutor(new 
DirectScheduledExecutorService())
+.build();
+scheduler.startScheduling();
+
+// the checkpoint scheduler is stopped before triggering the 
stop-with-savepoint
+disableCheckpointScheduler();
+
+testInstance = new StopWithSavepointContext(jobGraph.getJobID(), 
scheduler, this.log);
+}
+
+@Test
+public void testHappyPathWithSavepointCreationBeforeTermination() throws 
Exception {
+assertHappyPath(
+(savepointPath) -> {
+testInstance.handleSavepointCreation(savepointPath, null);
+testInstance.handleExecutionTermination(
+
Collections.singletonList(ExecutionState.FINISHED));
+});
+}
+
+@Test
+public void testHappyPathWithSavepointCreationAfterTermination() throws 
Exception {
+assertHappyPath(
+(savepointPath) -> {
+testInstance.handleExecutionTermination(
+

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-25 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r582990665



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointContextTest} tests the stop-with-savepoint 
functionality of {@link
+ * SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointContextTest extends TestLogger {
+
+private JobGraph jobGraph;
+private DefaultScheduler scheduler;
+
+private StopWithSavepointOperations testInstance;
+
+@Before
+public void setup() throws Exception {
+jobGraph = new JobGraph();
+
+final JobVertex jobVertex = new JobVertex("vertex #0");
+jobVertex.setInvokableClass(NoOpInvokable.class);
+jobGraph.addVertex(jobVertex);
+
+// checkpointInterval has to be set to a value lower than 
Long.MAX_VALUE to enable
+// periodic checkpointing - only then can we enable/disable the 
CheckpointCoordinator
+SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, 
Long.MAX_VALUE - 1);
+scheduler =
+SchedulerTestingUtils.createSchedulerBuilder(
+jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+.setFutureExecutor(new 
DirectScheduledExecutorService())
+.build();
+scheduler.startScheduling();
+
+// the checkpoint scheduler is stopped before triggering the 
stop-with-savepoint
+disableCheckpointScheduler();
+
+testInstance = new StopWithSavepointContext(jobGraph.getJobID(), 
scheduler, this.log);
+}
+
+@Test
+public void testHappyPathWithSavepointCreationBeforeTermination() throws 
Exception {
+assertHappyPath(
+(savepointPath) -> {
+testInstance.handleSavepointCreation(savepointPath, null);
+testInstance.handleExecutionTermination(
+
Collections.singletonList(ExecutionState.FINISHED));
+});
+}
+
+@Test
+public void testHappyPathWithSavepointCreationAfterTermination() throws 
Exception {
+assertHappyPath(
+(savepointPath) -> {
+testInstance.handleExecutionTermination(
+

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-25 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r582989152



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
##
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+implements StopWithSavepointTerminationHandler {
+
+private final Logger log;
+
+private final SchedulerNG scheduler;
+private final CheckpointScheduling checkpointScheduling;
+private final JobID jobId;
+private final Executor ioExecutor;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private State state = new WaitingForSavepoint();
+
+public  
StopWithSavepointTerminationHandlerImpl(
+JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, 
Logger log) {
+this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
ioExecutor, log);
+}
+
+@VisibleForTesting
+StopWithSavepointTerminationHandlerImpl(
+JobID jobId,
+SchedulerNG scheduler,
+CheckpointScheduling checkpointScheduling,
+Executor ioExecutor,
+Logger log) {
+this.jobId = checkNotNull(jobId);
+this.scheduler = checkNotNull(scheduler);
+this.checkpointScheduling = checkNotNull(checkpointScheduling);
+this.ioExecutor = checkNotNull(ioExecutor);
+this.log = checkNotNull(log);
+}
+
+@Override
+public CompletableFuture getSavepointPath() {
+return result;
+}
+
+@Override
+public void handleSavepointCreation(
+CompletedCheckpoint completedSavepoint, Throwable throwable) {
+if (throwable != null) {
+checkArgument(
+completedSavepoint == null,
+"No savepoint should be provided if a throwable is 
passed.");
+handleSavepointCreationFailure(throwable);
+} else {
+handleSavepointCreationSuccess(checkNotNull(completedSavepoint));
+}
+}
+
+@Override
+public void handleExecutionsTermination(Collection 
terminatedExecutionStates) {
+final Set notFinishedExecutionStates =
+checkNotNull(terminatedExecutionStates).stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+
+if (notFinishedExecutionStates.isEmpty()) {
+handleExecutionsFinished();
+} else {
+handleAnyExecutionNotFinished(notFinishedExecutionStates);
+}
+}
+
+private void handleSavepointCreationSuccess(CompletedCheckpoint 
completedCheckpoint) {
+final State oldState = state;
+state = state.onSavepointCreation(completedCheckpoint);
+
+

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-24 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r582029660



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -835,8 +840,21 @@ public void updateAccumulators(final AccumulatorSnapshot 
accumulatorSnapshot) {
 mainThreadExecutor);
 }
 
-private void startCheckpointScheduler(final CheckpointCoordinator 
checkpointCoordinator) {
+@Override
+public void stopCheckpointScheduler() {
+Preconditions.checkState(
+getCheckpointCoordinator() != null,
+"Checkpointing cannot be stopped since it's not enabled.");
+getCheckpointCoordinator().stopCheckpointScheduler();
+}
+
+@Override
+public void startCheckpointScheduler() {
 mainThreadExecutor.assertRunningInMainThread();
+final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator();
+Preconditions.checkState(
+checkpointCoordinator != null,
+"Checkpointing cannot be started since it's not enabled.");

Review comment:
   I went for the first option. Unfortunately, this meant I had to remove the 
`CheckpointScheduling` interface from the `CheckpointCoordinator`, since the 
`CheckpointCoordinator` throws an exception once it has been shut down.
   
   The alternative approach, adding a method that checks whether checkpoint 
scheduling is enabled, wasn't better either: such a check would have exposed the 
shutdown behavior of the `CheckpointCoordinator`, which in turn would have 
required adding the `shutdown` method to the interface as well. That method 
doesn't work for the Scheduler, though. Hence, I decided to go with the approach 
that checks the availability of the `CheckpointCoordinator` internally.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-24 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r581982820



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -835,8 +840,21 @@ public void updateAccumulators(final AccumulatorSnapshot 
accumulatorSnapshot) {
 mainThreadExecutor);
 }
 
-private void startCheckpointScheduler(final CheckpointCoordinator 
checkpointCoordinator) {
+@Override
+public void stopCheckpointScheduler() {
+Preconditions.checkState(
+getCheckpointCoordinator() != null,
+"Checkpointing cannot be stopped since it's not enabled.");
+getCheckpointCoordinator().stopCheckpointScheduler();
+}
+
+@Override
+public void startCheckpointScheduler() {
 mainThreadExecutor.assertRunningInMainThread();
+final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator();
+Preconditions.checkState(
+checkpointCoordinator != null,
+"Checkpointing cannot be started since it's not enabled.");

Review comment:
   I agree: that's a valid scenario. I'm just wondering now whether we 
should simply make the call optional in that case. We could just add another 
condition to the if statement. I don't like that because it makes the interface 
less specific. On the other hand, one could argue that starting the checkpoint 
scheduling was always optional due to the condition that checks whether 
periodic checkpointing is enabled.
   
   Another alternative would be to extend the interface with a method that checks 
the state of the checkpointing (like `CheckpointCoordinator.shutdown`) and to only 
re-enable it if it hasn't been shut down. @rmetzger, any thoughts on this?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-24 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r581982820



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -835,8 +840,21 @@ public void updateAccumulators(final AccumulatorSnapshot 
accumulatorSnapshot) {
 mainThreadExecutor);
 }
 
-private void startCheckpointScheduler(final CheckpointCoordinator 
checkpointCoordinator) {
+@Override
+public void stopCheckpointScheduler() {
+Preconditions.checkState(
+getCheckpointCoordinator() != null,
+"Checkpointing cannot be stopped since it's not enabled.");
+getCheckpointCoordinator().stopCheckpointScheduler();
+}
+
+@Override
+public void startCheckpointScheduler() {
 mainThreadExecutor.assertRunningInMainThread();
+final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator();
+Preconditions.checkState(
+checkpointCoordinator != null,
+"Checkpointing cannot be started since it's not enabled.");

Review comment:
   I agree: that's a valid scenario. I'm now wondering whether we should 
simply make the call optional in that case by adding another condition to the 
if statement. I don't like that, because it makes the interface less specific. 
On the other hand, one could argue that starting the checkpoint scheduling was 
always optional, given the condition that checks whether periodic 
checkpointing is enabled.
   
   We cannot simply wait for the termination before restarting the scheduling, 
because of the happy path. Another alternative would be to extend the 
interface with a method that checks the state of the checkpointing (like 
`CheckpointCoordinator.shutdown`) and to re-enable it only if it hasn't been 
shut down. @rmetzger, any thoughts on this?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-24 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r581945379



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
##
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+implements StopWithSavepointTerminationHandler {
+
+private final Logger log;
+
+private final SchedulerNG scheduler;
+private final CheckpointScheduling checkpointScheduling;
+private final JobID jobId;
+private final Executor ioExecutor;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private State state = new WaitingForSavepoint();
+
+public  
StopWithSavepointTerminationHandlerImpl(
+JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, 
Logger log) {
+this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
ioExecutor, log);
+}
+
+@VisibleForTesting
+StopWithSavepointTerminationHandlerImpl(
+JobID jobId,
+SchedulerNG scheduler,
+CheckpointScheduling checkpointScheduling,
+Executor ioExecutor,
+Logger log) {
+this.jobId = checkNotNull(jobId);
+this.scheduler = checkNotNull(scheduler);
+this.checkpointScheduling = checkNotNull(checkpointScheduling);
+this.ioExecutor = checkNotNull(ioExecutor);
+this.log = checkNotNull(log);
+}
+
+@Override
+public CompletableFuture getSavepointPath() {
+return result;
+}
+
+@Override
+public void handleSavepointCreation(
+CompletedCheckpoint completedSavepoint, Throwable throwable) {
+if (throwable != null) {
+checkArgument(
+completedSavepoint == null,
+"No savepoint should be provided if a throwable is 
passed.");
+handleSavepointCreationFailure(throwable);
+} else {
+handleSavepointCreationSuccess(checkNotNull(completedSavepoint));
+}
+}
+
+@Override
+public void handleExecutionsTermination(Collection 
terminatedExecutionStates) {
+final Set notFinishedExecutionStates =
+checkNotNull(terminatedExecutionStates).stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+
+if (notFinishedExecutionStates.isEmpty()) {
+handleExecutionsFinished();
+} else {
+handleAnyExecutionNotFinished(notFinishedExecutionStates);
+}
+}
+
+private void handleSavepointCreationSuccess(CompletedCheckpoint 
completedCheckpoint) {
+final State oldState = state;
+state = state.onSavepointCreation(completedCheckpoint);
+
+

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-24 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r581941734



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -541,6 +544,59 @@ public void handleGlobalFailure() {
 assertThat(deployedExecutionVertices, contains(executionVertexId, 
executionVertexId));
 }
 
+/**
+ * This test covers the use-case where a global fail-over is followed by a 
local task failure.
+ * It verifies (besides checking the expected deployments) that the assert 
in the global
+ * recovery handling of {@link SchedulerBase#restoreState} is not 
triggered due to version
+ * updates.
+ */
+@Test
+public void handleGlobalFailureWithLocalFailure() {

Review comment:
   We cannot remove the [assert in 
SchedulerBase.restoreState](https://github.com/XComp/flink/blob/7cbd97f815d3bfd4715ddd8cb1d88f92f05a282a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L419)
 because the subsequent operation relies on all vertices being passed for 
restoring the state. Accepting fewer `ExecutionJobVertices` would mean that not 
all tasks get restored.
   
   I added this test to verify the case where a local task failure follows a 
global fail-over before the tasks are actually restarted. The local task 
failure does not trigger a version update because of the 
[isNotifiable](https://github.com/XComp/flink/blob/7cbd97f815d3bfd4715ddd8cb1d88f92f05a282a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L654)
 check in `SchedulerBase.updateTaskExecutionState`. That check only returns 
`true` (and triggers the internal task state change) if the corresponding 
`ExecutionVertex` is in state `FINISHED` or `FAILED`. The preceding global 
fail-over has already switched the state to `CANCELING`, and the local task 
failure finally switches it to `CANCELED`. Hence, no internal task state change 
is processed for the local task failure, no version update is triggered, and 
the global fail-over restarts all vertices as expected.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-23 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r581376284



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
##
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+implements StopWithSavepointTerminationHandler {
+
+private final Logger log;
+
+private final SchedulerNG scheduler;
+private final CheckpointScheduling checkpointScheduling;
+private final JobID jobId;
+private final Executor ioExecutor;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private State state = new WaitingForSavepoint();
+
+public  
StopWithSavepointTerminationHandlerImpl(
+JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, 
Logger log) {
+this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
ioExecutor, log);
+}
+
+@VisibleForTesting
+StopWithSavepointTerminationHandlerImpl(
+JobID jobId,
+SchedulerNG scheduler,
+CheckpointScheduling checkpointScheduling,
+Executor ioExecutor,
+Logger log) {
+this.jobId = checkNotNull(jobId);
+this.scheduler = checkNotNull(scheduler);
+this.checkpointScheduling = checkNotNull(checkpointScheduling);
+this.ioExecutor = checkNotNull(ioExecutor);
+this.log = checkNotNull(log);
+}
+
+@Override
+public CompletableFuture getSavepointPath() {
+return result;
+}
+
+@Override
+public void handleSavepointCreation(
+CompletedCheckpoint completedSavepoint, Throwable throwable) {
+if (throwable != null) {
+checkArgument(
+completedSavepoint == null,
+"No savepoint should be provided if a throwable is 
passed.");
+handleSavepointCreationFailure(throwable);
+} else {
+handleSavepointCreationSuccess(checkNotNull(completedSavepoint));
+}
+}
+
+@Override
+public void handleExecutionsTermination(Collection 
terminatedExecutionStates) {
+final Set notFinishedExecutionStates =
+checkNotNull(terminatedExecutionStates).stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+
+if (notFinishedExecutionStates.isEmpty()) {
+handleExecutionsFinished();
+} else {
+handleAnyExecutionNotFinished(notFinishedExecutionStates);
+}
+}
+
+private void handleSavepointCreationSuccess(CompletedCheckpoint 
completedCheckpoint) {
+final State oldState = state;
+state = state.onSavepointCreation(completedCheckpoint);
+
+

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-23 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r580901980



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
##
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+implements StopWithSavepointTerminationHandler {
+
+private final Logger log;
+
+private final SchedulerNG scheduler;
+private final CheckpointScheduling checkpointScheduling;
+private final JobID jobId;
+private final Executor ioExecutor;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private State state = new WaitingForSavepoint();
+
+public  
StopWithSavepointTerminationHandlerImpl(
+JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, 
Logger log) {
+this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
ioExecutor, log);
+}
+
+@VisibleForTesting
+StopWithSavepointTerminationHandlerImpl(
+JobID jobId,
+SchedulerNG scheduler,
+CheckpointScheduling checkpointScheduling,
+Executor ioExecutor,
+Logger log) {
+this.jobId = checkNotNull(jobId);
+this.scheduler = checkNotNull(scheduler);
+this.checkpointScheduling = checkNotNull(checkpointScheduling);
+this.ioExecutor = checkNotNull(ioExecutor);
+this.log = checkNotNull(log);
+}
+
+@Override
+public CompletableFuture getSavepointPath() {
+return result;
+}
+
+@Override
+public void handleSavepointCreation(

Review comment:
   I realized that I need to refactor the corresponding 
`StopWithSavepointTerminationManagerTest` accordingly. We don't need all the 
different test cases there. I'm fixing this now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-22 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r580542102



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
##
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+implements StopWithSavepointTerminationHandler {
+
+private final Logger log;
+
+private final SchedulerNG scheduler;
+private final CheckpointScheduling checkpointScheduling;
+private final JobID jobId;
+private final Executor ioExecutor;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private State state = new WaitingForSavepoint();
+
+public  
StopWithSavepointTerminationHandlerImpl(
+JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, 
Logger log) {
+this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
ioExecutor, log);
+}
+
+@VisibleForTesting
+StopWithSavepointTerminationHandlerImpl(
+JobID jobId,
+SchedulerNG scheduler,
+CheckpointScheduling checkpointScheduling,
+Executor ioExecutor,
+Logger log) {
+this.jobId = checkNotNull(jobId);
+this.scheduler = checkNotNull(scheduler);
+this.checkpointScheduling = checkNotNull(checkpointScheduling);
+this.ioExecutor = checkNotNull(ioExecutor);
+this.log = checkNotNull(log);
+}
+
+@Override
+public CompletableFuture getSavepointPath() {
+return result;
+}
+
+@Override
+public void handleSavepointCreation(
+CompletedCheckpoint completedSavepoint, Throwable throwable) {
+if (throwable != null) {
+checkArgument(
+completedSavepoint == null,
+"No savepoint should be provided if a throwable is 
passed.");
+handleSavepointCreationFailure(throwable);
+} else {
+handleSavepointCreationSuccess(checkNotNull(completedSavepoint));
+}
+}
+
+@Override
+public void handleExecutionsTermination(Collection 
terminatedExecutionStates) {
+final Set notFinishedExecutionStates =
+checkNotNull(terminatedExecutionStates).stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+
+if (notFinishedExecutionStates.isEmpty()) {
+handleExecutionsFinished();
+} else {
+handleAnyExecutionNotFinished(notFinishedExecutionStates);
+}
+}
+
+private void handleSavepointCreationSuccess(CompletedCheckpoint 
completedCheckpoint) {
+final State oldState = state;
+state = state.onSavepointCreation(completedCheckpoint);
+
+

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-22 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r580523907



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
##
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+implements StopWithSavepointTerminationHandler {
+
+private final Logger log;
+
+private final SchedulerNG scheduler;
+private final CheckpointScheduling checkpointScheduling;
+private final JobID jobId;
+private final Executor ioExecutor;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private State state = new WaitingForSavepoint();
+
+public  
StopWithSavepointTerminationHandlerImpl(
+JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, 
Logger log) {
+this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
ioExecutor, log);
+}
+
+@VisibleForTesting
+StopWithSavepointTerminationHandlerImpl(
+JobID jobId,
+SchedulerNG scheduler,
+CheckpointScheduling checkpointScheduling,
+Executor ioExecutor,
+Logger log) {
+this.jobId = checkNotNull(jobId);
+this.scheduler = checkNotNull(scheduler);
+this.checkpointScheduling = checkNotNull(checkpointScheduling);
+this.ioExecutor = checkNotNull(ioExecutor);
+this.log = checkNotNull(log);
+}
+
+@Override
+public CompletableFuture getSavepointPath() {
+return result;
+}
+
+@Override
+public void handleSavepointCreation(
+CompletedCheckpoint completedSavepoint, Throwable throwable) {
+if (throwable != null) {
+checkArgument(
+completedSavepoint == null,
+"No savepoint should be provided if a throwable is 
passed.");
+handleSavepointCreationFailure(throwable);
+} else {
+handleSavepointCreationSuccess(checkNotNull(completedSavepoint));
+}
+}
+
+@Override
+public void handleExecutionsTermination(Collection 
terminatedExecutionStates) {
+final Set notFinishedExecutionStates =
+checkNotNull(terminatedExecutionStates).stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+
+if (notFinishedExecutionStates.isEmpty()) {
+handleExecutionsFinished();
+} else {
+handleAnyExecutionNotFinished(notFinishedExecutionStates);
+}
+}
+
+private void handleSavepointCreationSuccess(CompletedCheckpoint 
completedCheckpoint) {
+final State oldState = state;
+state = state.onSavepointCreation(completedCheckpoint);
+
+

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-22 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r580520576



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
##
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+implements StopWithSavepointTerminationHandler {
+
+private final Logger log;
+
+private final SchedulerNG scheduler;
+private final CheckpointScheduling checkpointScheduling;
+private final JobID jobId;
+private final Executor ioExecutor;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private State state = new WaitingForSavepoint();
+
+public  
StopWithSavepointTerminationHandlerImpl(
+JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, 
Logger log) {
+this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
ioExecutor, log);
+}
+
+@VisibleForTesting
+StopWithSavepointTerminationHandlerImpl(
+JobID jobId,
+SchedulerNG scheduler,
+CheckpointScheduling checkpointScheduling,
+Executor ioExecutor,
+Logger log) {
+this.jobId = checkNotNull(jobId);
+this.scheduler = checkNotNull(scheduler);
+this.checkpointScheduling = checkNotNull(checkpointScheduling);
+this.ioExecutor = checkNotNull(ioExecutor);
+this.log = checkNotNull(log);
+}
+
+@Override
+public CompletableFuture getSavepointPath() {
+return result;
+}
+
+@Override
+public void handleSavepointCreation(
+CompletedCheckpoint completedSavepoint, Throwable throwable) {
+if (throwable != null) {
+checkArgument(
+completedSavepoint == null,
+"No savepoint should be provided if a throwable is 
passed.");
+handleSavepointCreationFailure(throwable);
+} else {
+handleSavepointCreationSuccess(checkNotNull(completedSavepoint));
+}
+}
+
+@Override
+public void handleExecutionsTermination(Collection 
terminatedExecutionStates) {
+final Set notFinishedExecutionStates =
+checkNotNull(terminatedExecutionStates).stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+
+if (notFinishedExecutionStates.isEmpty()) {
+handleExecutionsFinished();
+} else {
+handleAnyExecutionNotFinished(notFinishedExecutionStates);
+}
+}
+
+private void handleSavepointCreationSuccess(CompletedCheckpoint 
completedCheckpoint) {
+final State oldState = state;
+state = state.onSavepointCreation(completedCheckpoint);
+
+

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-22 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r580518051



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
##
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+implements StopWithSavepointTerminationHandler {
+
+private final Logger log;
+
+private final SchedulerNG scheduler;
+private final CheckpointScheduling checkpointScheduling;
+private final JobID jobId;
+private final Executor ioExecutor;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private State state = new WaitingForSavepoint();
+
+public  
StopWithSavepointTerminationHandlerImpl(
+JobID jobId, S schedulerWithCheckpointing, Executor ioExecutor, 
Logger log) {
+this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
ioExecutor, log);
+}
+
+@VisibleForTesting
+StopWithSavepointTerminationHandlerImpl(
+JobID jobId,
+SchedulerNG scheduler,
+CheckpointScheduling checkpointScheduling,
+Executor ioExecutor,
+Logger log) {
+this.jobId = checkNotNull(jobId);
+this.scheduler = checkNotNull(scheduler);
+this.checkpointScheduling = checkNotNull(checkpointScheduling);
+this.ioExecutor = checkNotNull(ioExecutor);
+this.log = checkNotNull(log);
+}
+
+@Override
+public CompletableFuture getSavepointPath() {
+return result;
+}
+
+@Override
+public void handleSavepointCreation(

Review comment:
   `StopWithSavepointTerminationHandler` and 
`StopWithSavepointTerminationManager` are separated so that the manager 
focuses solely on the order in which the steps complete, whereas the handler 
deals with the result of the termination. Having 
`StopWithSavepointTerminationHandler` offer separate methods for the success 
and failure cases would mean moving the logic that distinguishes between the 
two into the manager, which would partially break this separation of 
responsibilities.
   As a consequence, the manager's test class would again have to distinguish 
between the failure case and the success case.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-22 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r580511819



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
+import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImplTest} tests the 
stop-with-savepoint functionality
+ * of {@link SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointTerminationHandlerImplTest extends TestLogger {
+
+private static final JobID JOB_ID = new JobID();
+
+private final TestingCheckpointScheduling checkpointScheduling =
+new TestingCheckpointScheduling(false);
+
+private StopWithSavepointTerminationHandlerImpl 
createTestInstanceFailingOnGlobalFailOver() {
+return createTestInstance(
+throwableCausingGlobalFailOver -> fail("No global failover 
should be triggered."));
+}
+
+private StopWithSavepointTerminationHandlerImpl createTestInstance(
+Consumer handleGlobalFailureConsumer) {
+// checkpointing should be always stopped before initiating 
stop-with-savepoint
+checkpointScheduling.stopCheckpointScheduler();
+
+final SchedulerNG scheduler =
+TestingSchedulerNG.newBuilder()
+
.setHandleGlobalFailureConsumer(handleGlobalFailureConsumer)
+.build();
+return new StopWithSavepointTerminationHandlerImpl(
+JOB_ID, scheduler, checkpointScheduling, 
Executors.directExecutor(), log);
+}
+
+@Test
+public void testHappyPath() throws ExecutionException, 
InterruptedException {
+final StopWithSavepointTerminationHandlerImpl testInstance =
+createTestInstanceFailingOnGlobalFailOver();
+
+final EmptyStreamStateHandle streamStateHandle = new 
EmptyStreamStateHandle();
+final CompletedCheckpoint completedSavepoint = 
createCompletedSavepoint(streamStateHandle);
+testInstance.handleSavepointCreation(completedSavepoint, null);
+
testInstance.handleExecutionsTermination(Collections.singleton(ExecutionState.FINISHED));
+
+assertThat(
+testInstance.getSavepointPath().get(), 
is(completedSavepoint.getExternalPointer()));
+
+assertFalse(
+"The savepoint should not have been discarded.", 
streamStateHandle.isDisposed());
+assertFalse("Checkpoint scheduling should be disabled.", 
checkpointScheduling.isEnabled());
+}
+
+@Test
+public void testSavepointCreationFailure() {
+final 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-20 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r579664779



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+implements StopWithSavepointTerminationHandler {
+
+private final Logger log;
+
+private final SchedulerNG scheduler;
+private final CheckpointScheduling checkpointScheduling;
+private final JobID jobId;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private State state = new WaitingForSavepoint();
+
+public  
StopWithSavepointTerminationHandlerImpl(
+@Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, 
@Nonnull Logger log) {
+this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
log);
+}
+
+@VisibleForTesting
+StopWithSavepointTerminationHandlerImpl(
+@Nonnull JobID jobId,
+@Nonnull SchedulerNG scheduler,
+@Nonnull CheckpointScheduling checkpointScheduling,
+@Nonnull Logger log) {
+this.jobId = checkNotNull(jobId);
+this.scheduler = checkNotNull(scheduler);
+this.checkpointScheduling = checkNotNull(checkpointScheduling);
+this.log = checkNotNull(log);
+}
+
+@Override
+public CompletableFuture handlesStopWithSavepointTermination(
+CompletableFuture completedSavepointFuture,
+CompletableFuture> 
terminatedExecutionsFuture,
+ComponentMainThreadExecutor mainThreadExecutor) {
+completedSavepointFuture
+.whenCompleteAsync(
+(completedSavepoint, throwable) -> {
+if (throwable != null) {
+handleSavepointCreationFailure(throwable);
+} else {
+handleSavepointCreation(completedSavepoint);
+}
+},
+mainThreadExecutor)
+.thenCompose(
+aVoid ->
+terminatedExecutionsFuture.thenAcceptAsync(

Review comment:
   It's not. Correct me if I'm wrong: the execution will happen in the 
main thread if we use a non-async operation (like the `thenRun` you suggested), 
since the previous operation is already forced to run in the main thread 
through `handleAsync`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-20 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r579664565



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+implements StopWithSavepointTerminationHandler {
+
+private final Logger log;
+
+private final SchedulerNG scheduler;
+private final CheckpointScheduling checkpointScheduling;
+private final JobID jobId;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private State state = new WaitingForSavepoint();
+
+public  
StopWithSavepointTerminationHandlerImpl(
+@Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, 
@Nonnull Logger log) {
+this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
log);
+}
+
+@VisibleForTesting
+StopWithSavepointTerminationHandlerImpl(
+@Nonnull JobID jobId,
+@Nonnull SchedulerNG scheduler,
+@Nonnull CheckpointScheduling checkpointScheduling,
+@Nonnull Logger log) {
+this.jobId = checkNotNull(jobId);
+this.scheduler = checkNotNull(scheduler);
+this.checkpointScheduling = checkNotNull(checkpointScheduling);
+this.log = checkNotNull(log);
+}
+
+@Override
+public CompletableFuture handlesStopWithSavepointTermination(
+CompletableFuture completedSavepointFuture,
+CompletableFuture> 
terminatedExecutionsFuture,
+ComponentMainThreadExecutor mainThreadExecutor) {
+completedSavepointFuture
+.whenCompleteAsync(
+(completedSavepoint, throwable) -> {
+if (throwable != null) {
+handleSavepointCreationFailure(throwable);
+} else {
+handleSavepointCreation(completedSavepoint);
+}
+},
+mainThreadExecutor)
+.thenCompose(
+aVoid ->
+terminatedExecutionsFuture.thenAcceptAsync(
+this::handleExecutionsTermination, 
mainThreadExecutor));

Review comment:
   Good point. I refactored it and introduced a 
`StopWithSavepointTerminationManager` that enforces the correct execution order.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-20 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r579663983



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+implements StopWithSavepointTerminationHandler {
+
+private final Logger log;
+
+private final SchedulerNG scheduler;
+private final CheckpointScheduling checkpointScheduling;
+private final JobID jobId;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private State state = new WaitingForSavepoint();
+
+public  
StopWithSavepointTerminationHandlerImpl(
+@Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, 
@Nonnull Logger log) {
+this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
log);
+}
+
+@VisibleForTesting
+StopWithSavepointTerminationHandlerImpl(
+@Nonnull JobID jobId,
+@Nonnull SchedulerNG scheduler,
+@Nonnull CheckpointScheduling checkpointScheduling,
+@Nonnull Logger log) {
+this.jobId = checkNotNull(jobId);
+this.scheduler = checkNotNull(scheduler);
+this.checkpointScheduling = checkNotNull(checkpointScheduling);
+this.log = checkNotNull(log);
+}
+
+@Override
+public CompletableFuture handlesStopWithSavepointTermination(
+CompletableFuture completedSavepointFuture,
+CompletableFuture> 
terminatedExecutionsFuture,
+ComponentMainThreadExecutor mainThreadExecutor) {
+completedSavepointFuture
+.whenCompleteAsync(
+(completedSavepoint, throwable) -> {
+if (throwable != null) {
+handleSavepointCreationFailure(throwable);
+} else {
+handleSavepointCreation(completedSavepoint);
+}
+},
+mainThreadExecutor)
+.thenCompose(
+aVoid ->
+terminatedExecutionsFuture.thenAcceptAsync(
+this::handleExecutionsTermination, 
mainThreadExecutor));
+
+return result;
+}
+
+private synchronized void handleSavepointCreation(CompletedCheckpoint 
completedCheckpoint) {
+final State oldState = state;
+state = state.onSavepointCreation(completedCheckpoint);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling for job {}.",
+oldState,
+state,
+jobId);
+}
+
+private synchronized void handleSavepointCreationFailure(Throwable 
throwable) {
+final 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-19 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r579298421



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+implements StopWithSavepointTerminationHandler {
+
+private final Logger log;
+
+private final SchedulerNG scheduler;
+private final CheckpointScheduling checkpointScheduling;
+private final JobID jobId;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private State state = new WaitingForSavepoint();
+
+public  
StopWithSavepointTerminationHandlerImpl(
+@Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, 
@Nonnull Logger log) {
+this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
log);
+}
+
+@VisibleForTesting
+StopWithSavepointTerminationHandlerImpl(
+@Nonnull JobID jobId,
+@Nonnull SchedulerNG scheduler,
+@Nonnull CheckpointScheduling checkpointScheduling,
+@Nonnull Logger log) {
+this.jobId = checkNotNull(jobId);
+this.scheduler = checkNotNull(scheduler);
+this.checkpointScheduling = checkNotNull(checkpointScheduling);
+this.log = checkNotNull(log);
+}
+
+@Override
+public CompletableFuture handlesStopWithSavepointTermination(
+CompletableFuture completedSavepointFuture,
+CompletableFuture> 
terminatedExecutionsFuture,
+ComponentMainThreadExecutor mainThreadExecutor) {
+completedSavepointFuture
+.whenCompleteAsync(
+(completedSavepoint, throwable) -> {
+if (throwable != null) {
+handleSavepointCreationFailure(throwable);
+} else {
+handleSavepointCreation(completedSavepoint);
+}
+},
+mainThreadExecutor)
+.thenCompose(
+aVoid ->
+terminatedExecutionsFuture.thenAcceptAsync(
+this::handleExecutionsTermination, 
mainThreadExecutor));
+
+return result;
+}
+
+private synchronized void handleSavepointCreation(CompletedCheckpoint 
completedCheckpoint) {
+final State oldState = state;
+state = state.onSavepointCreation(completedCheckpoint);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling for job {}.",
+oldState,
+state,
+jobId);
+}
+
+private synchronized void handleSavepointCreationFailure(Throwable 
throwable) {
+final 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-19 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r579292914



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImplTest.java
##
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
+import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.QuadConsumer;
+import org.apache.flink.util.function.TriConsumer;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImplTest} tests the 
stop-with-savepoint functionality
+ * of {@link SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointTerminationHandlerImplTest extends TestLogger {
+
+@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+private static final JobID JOB_ID = new JobID();
+
+private final TestingCheckpointScheduling checkpointScheduling =
+new TestingCheckpointScheduling(false);
+
+private StopWithSavepointTerminationHandlerImpl createTestInstance(
+Consumer handleGlobalFailureConsumer) {
+// checkpointing should be always stopped before initiating 
stop-with-savepoint
+checkpointScheduling.stopCheckpointScheduler();
+
+final SchedulerNG scheduler =
+TestingSchedulerNG.newBuilder()
+
.setHandleGlobalFailureConsumer(handleGlobalFailureConsumer)
+.build();
+return new StopWithSavepointTerminationHandlerImpl(
+JOB_ID, scheduler, checkpointScheduling, log);
+}
+
+@Test
+public void 
testHappyPathWithSavepointCreationBeforeSuccessfulTermination() throws 
Exception {
+assertHappyPath(
+(completedSavepoint,
+completedSavepointFuture,
+terminatedExecutionStates,
+executionsTerminatedFuture) -> {
+completedSavepointFuture.complete(completedSavepoint);
+
executionsTerminatedFuture.complete(terminatedExecutionStates);
+});
+}
+
+@Test
+public void testHappyPathWithSavepointCreationAfterSuccessfulTermination() 
throws Exception {
+assertHappyPath(
+(completedSavepoint,
+completedSavepointFuture,
+terminatedExecutionStates,
+executionsTerminatedFuture) -> {
+
executionsTerminatedFuture.complete(terminatedExecutionStates);
+completedSavepointFuture.complete(completedSavepoint);
+});
+}
+
+@Test
+public void testSavepointCreationFailureBeforeSuccessfulTermination() 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578584522



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+private final Logger log;
+
+private final SchedulerBase scheduler;
+private final CheckpointCoordinator checkpointCoordinator;
+private final JobID jobId;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+private String path;
+private Set unfinishedStates;
+
+public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+this.jobId = jobId;
+this.scheduler = scheduler;
+this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+this.log = log;
+}
+
+@Override
+public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+final StopWithSavepointState oldState = state;
+state = state.onSavepointCreation(this, path, throwable);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+oldState,
+state);
+}
+
+@Override
+public synchronized void handleExecutionTermination(
+Collection executionStates) {
+final StopWithSavepointState oldState = state;
+state = state.onExecutionsTermination(this, executionStates);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+oldState,
+state);
+}
+
+@Override
+public CompletableFuture getResult() {
+return result;
+}
+
+private StopWithSavepointState terminateExceptionWithGlobalFailover(
+Iterable unfinishedExecutionStates) {
+String errorMessage =
+String.format(
+"Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+scheduler.handleGlobalFailure(inconsistentFinalStateException);
+return terminateExceptionally(inconsistentFinalStateException);
+}
+
+private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+scheduler.startCheckpointScheduler(checkpointCoordinator);
+result.completeExceptionally(throwable);
+
+return StopWithSavepointState.Final;
+}
+
+private StopWithSavepointState terminateSuccessfully(String path) {
+result.complete(path);
+
+return StopWithSavepointState.Final;
+}
+
+private static Set extractUnfinishedStates(
+Collection executionStates) {
+return executionStates.stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+}
+
+/**
+ * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+ * operation.
+ *
+ * The state transitions are implemented in the following 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578584067



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointContextTest} tests the stop-with-savepoint 
functionality of {@link
+ * SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointContextTest extends TestLogger {
+
+private JobGraph jobGraph;
+private DefaultScheduler scheduler;
+
+private StopWithSavepointOperations testInstance;
+
+@Before
+public void setup() throws Exception {
+jobGraph = new JobGraph();
+
+final JobVertex jobVertex = new JobVertex("vertex #0");
+jobVertex.setInvokableClass(NoOpInvokable.class);
+jobGraph.addVertex(jobVertex);
+
+// checkpointInterval has to be set to a value lower than 
Long.MAX_VALUE to enable
+// periodic checkpointing - only then can we enable/disable the 
CheckpointCoordinator
+SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, 
Long.MAX_VALUE - 1);
+scheduler =
+SchedulerTestingUtils.createSchedulerBuilder(
+jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+.setFutureExecutor(new 
DirectScheduledExecutorService())
+.build();
+scheduler.startScheduling();
+
+// the checkpoint scheduler is stopped before triggering the 
stop-with-savepoint
+disableCheckpointScheduler();
+
+testInstance = new StopWithSavepointContext(jobGraph.getJobID(), 
scheduler, this.log);
+}
+
+@Test
+public void testHappyPathWithSavepointCreationBeforeTermination() throws 
Exception {
+assertHappyPath(
+(savepointPath) -> {
+testInstance.handleSavepointCreation(savepointPath, null);
+testInstance.handleExecutionTermination(
+
Collections.singletonList(ExecutionState.FINISHED));
+});
+}
+
+@Test
+public void testHappyPathWithSavepointCreationAfterTermination() throws 
Exception {
+assertHappyPath(
+(savepointPath) -> {
+testInstance.handleExecutionTermination(
+

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578582587



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointContextTest} tests the stop-with-savepoint 
functionality of {@link
+ * SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointContextTest extends TestLogger {
+
+private JobGraph jobGraph;
+private DefaultScheduler scheduler;
+
+private StopWithSavepointOperations testInstance;
+
+@Before
+public void setup() throws Exception {
+jobGraph = new JobGraph();
+
+final JobVertex jobVertex = new JobVertex("vertex #0");
+jobVertex.setInvokableClass(NoOpInvokable.class);
+jobGraph.addVertex(jobVertex);
+
+// checkpointInterval has to be set to a value lower than 
Long.MAX_VALUE to enable
+// periodic checkpointing - only then can we enable/disable the 
CheckpointCoordinator
+SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, 
Long.MAX_VALUE - 1);
+scheduler =
+SchedulerTestingUtils.createSchedulerBuilder(
+jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+.setFutureExecutor(new 
DirectScheduledExecutorService())
+.build();
+scheduler.startScheduling();
+
+// the checkpoint scheduler is stopped before triggering the 
stop-with-savepoint
+disableCheckpointScheduler();
+
+testInstance = new StopWithSavepointContext(jobGraph.getJobID(), 
scheduler, this.log);

Review comment:
   You're right. I refactored the `StopWithSavepointContextTest`. It now uses 
`TestingSchedulerNG` instead of `DefaultScheduler`, which removes some 
complexity from the test code.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578581744



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointOperations.java
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code StopWithSavepointOperations} collects the steps for creating a 
savepoint and waiting for
+ * the job to stop.
+ */
+public interface StopWithSavepointOperations {
+
+/**
+ * Handles the Savepoint creation termination.
+ *
+ * @param path the path to the newly created savepoint.
+ * @param throwable the {@code Throwable} in case of failure.

Review comment:
   I followed your advice and added two methods for the two cases. That 
helps differentiate between the two cases!  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578580800



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+private final Logger log;
+
+private final SchedulerBase scheduler;
+private final CheckpointCoordinator checkpointCoordinator;
+private final JobID jobId;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+private String path;
+private Set unfinishedStates;
+
+public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+this.jobId = jobId;
+this.scheduler = scheduler;
+this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+this.log = log;
+}
+
+@Override
+public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+final StopWithSavepointState oldState = state;
+state = state.onSavepointCreation(this, path, throwable);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+oldState,
+state);
+}
+
+@Override
+public synchronized void handleExecutionTermination(
+Collection executionStates) {
+final StopWithSavepointState oldState = state;
+state = state.onExecutionsTermination(this, executionStates);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+oldState,
+state);
+}
+
+@Override
+public CompletableFuture getResult() {
+return result;
+}
+
+private StopWithSavepointState terminateExceptionWithGlobalFailover(
+Iterable unfinishedExecutionStates) {
+String errorMessage =
+String.format(
+"Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+scheduler.handleGlobalFailure(inconsistentFinalStateException);
+return terminateExceptionally(inconsistentFinalStateException);
+}
+
+private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+scheduler.startCheckpointScheduler(checkpointCoordinator);
+result.completeExceptionally(throwable);
+
+return StopWithSavepointState.Final;
+}
+
+private StopWithSavepointState terminateSuccessfully(String path) {
+result.complete(path);
+
+return StopWithSavepointState.Final;
+}
+
+private static Set extractUnfinishedStates(
+Collection executionStates) {
+return executionStates.stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+}
+
+/**
+ * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+ * operation.
+ *
+ * The state transitions are implemented in the following 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578581053



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+private final Logger log;
+
+private final SchedulerBase scheduler;
+private final CheckpointCoordinator checkpointCoordinator;
+private final JobID jobId;
+
+private final CompletableFuture result = new CompletableFuture<>();
+
+private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+private String path;
+private Set unfinishedStates;
+
+public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+this.jobId = jobId;
+this.scheduler = scheduler;
+this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+this.log = log;
+}
+
+@Override
+public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+final StopWithSavepointState oldState = state;
+state = state.onSavepointCreation(this, path, throwable);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+oldState,
+state);
+}
+
+@Override
+public synchronized void handleExecutionTermination(
+Collection executionStates) {
+final StopWithSavepointState oldState = state;
+state = state.onExecutionsTermination(this, executionStates);
+
+log.debug(
+"Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+oldState,
+state);
+}
+
+@Override
+public CompletableFuture getResult() {
+return result;
+}
+
+private StopWithSavepointState terminateExceptionWithGlobalFailover(
+Iterable unfinishedExecutionStates) {
+String errorMessage =
+String.format(
+"Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+scheduler.handleGlobalFailure(inconsistentFinalStateException);
+return terminateExceptionally(inconsistentFinalStateException);
+}
+
+private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+scheduler.startCheckpointScheduler(checkpointCoordinator);
+result.completeExceptionally(throwable);
+
+return StopWithSavepointState.Final;
+}
+
+private StopWithSavepointState terminateSuccessfully(String path) {
+result.complete(path);
+
+return StopWithSavepointState.Final;
+}
+
+private static Set extractUnfinishedStates(
+Collection executionStates) {
+return executionStates.stream()
+.filter(state -> state != ExecutionState.FINISHED)
+.collect(Collectors.toSet());
+}
+
+/**
+ * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+ * operation.
+ *
+ * The state transitions are implemented in the following 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578568213



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -908,49 +909,38 @@ public void reportCheckpointMetrics(
 // will be restarted by the CheckpointCoordinatorDeActivator.
 checkpointCoordinator.stopCheckpointScheduler();
 
+final CompletableFuture> 
executionTerminationsFuture =
+getCombinedExecutionTerminationFuture();
+
 final CompletableFuture savepointFuture =
 checkpointCoordinator
 .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
 .thenApply(CompletedCheckpoint::getExternalPointer);
 
-final CompletableFuture terminationFuture =
-executionGraph
-.getTerminationFuture()
-.handle(
-(jobstatus, throwable) -> {
-if (throwable != null) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: {}",
-jobGraph.getJobID(),
-throwable.getMessage());
-throw new 
CompletionException(throwable);
-} else if (jobstatus != 
JobStatus.FINISHED) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
-jobGraph.getJobID(),
-jobstatus);
-throw new CompletionException(
-new FlinkException(
-"Reached state "
-+ jobstatus
-+ " instead of 
FINISHED."));
-}
-return jobstatus;
-});
-
-return savepointFuture
-.thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
-.handleAsync(
-(path, throwable) -> {
-if (throwable != null) {
-// restart the checkpoint coordinator if 
stopWithSavepoint failed.
-
startCheckpointScheduler(checkpointCoordinator);
-throw new CompletionException(throwable);
-}
+StopWithSavepointContext stopWithSavepointContext =

Review comment:
   I renamed the class to `StopWithSavepointOperationsImpl`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578567820



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -817,6 +818,126 @@ public void 
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
 assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
 }
 
+@Test
+public void testStopWithSavepointFailingWithExpiredCheckpoint() throws 
Exception {
+// we allow restarts right from the start since the failure is going 
to happen in the first
+// phase (savepoint creation) of stop-with-savepoint
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+final JobGraph jobGraph = createTwoVertexJobGraph();
+// set checkpoint timeout to a low value to simulate checkpoint 
expiration
+enableCheckpointing(jobGraph, 10);
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// we have to set a listener that checks for the termination of the 
checkpoint handling
+OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+checkpointAbortionWasTriggered.trigger());
+
+// the failure handling has to happen in the same thread as the 
checkpoint coordination -
+// that's why we have to instantiate a separate ThreadExecutorService 
here
+final ScheduledExecutorService singleThreadExecutorService =
+Executors.newSingleThreadScheduledExecutor();
+final ComponentMainThreadExecutor mainThreadExecutor =
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+singleThreadExecutorService);
+
+final DefaultScheduler scheduler =
+CompletableFuture.supplyAsync(
+() ->
+createSchedulerAndStartScheduling(
+jobGraph, mainThreadExecutor),
+mainThreadExecutor)
+.get();
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+final CompletableFuture stopWithSavepointFuture =
+CompletableFuture.supplyAsync(
+() -> {
+// we have to make sure that the tasks are 
running before
+// stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+failingExecutionAttemptId,
+ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+
succeedingExecutionAttemptId,
+ExecutionState.RUNNING));
+
+return 
scheduler.stopWithSavepoint("savepoint-path", false);
+},
+mainThreadExecutor)
+.get();
+
+checkpointTriggeredLatch.await();
+
+final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+final AcknowledgeCheckpoint acknowledgeCheckpoint =
+new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+
+checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+
+// we need to wait for the expired checkpoint to be handled
+checkpointAbortionWasTriggered.await();
+
+CompletableFuture.runAsync(
+() -> {
+scheduler.updateTaskExecutionState(
+ 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578567121



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -817,6 +818,126 @@ public void 
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
 assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
 }
 
+@Test
+public void testStopWithSavepointFailingWithExpiredCheckpoint() throws 
Exception {
+// we allow restarts right from the start since the failure is going 
to happen in the first
+// phase (savepoint creation) of stop-with-savepoint
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+final JobGraph jobGraph = createTwoVertexJobGraph();
+// set checkpoint timeout to a low value to simulate checkpoint 
expiration
+enableCheckpointing(jobGraph, 10);
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// we have to set a listener that checks for the termination of the 
checkpoint handling
+OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+checkpointAbortionWasTriggered.trigger());
+
+// the failure handling has to happen in the same thread as the 
checkpoint coordination -
+// that's why we have to instantiate a separate ThreadExecutorService 
here
+final ScheduledExecutorService singleThreadExecutorService =
+Executors.newSingleThreadScheduledExecutor();
+final ComponentMainThreadExecutor mainThreadExecutor =
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+singleThreadExecutorService);
+
+final DefaultScheduler scheduler =
+CompletableFuture.supplyAsync(
+() ->
+createSchedulerAndStartScheduling(
+jobGraph, mainThreadExecutor),
+mainThreadExecutor)
+.get();
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+final CompletableFuture stopWithSavepointFuture =
+CompletableFuture.supplyAsync(
+() -> {
+// we have to make sure that the tasks are 
running before
+// stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+failingExecutionAttemptId,
+ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+
succeedingExecutionAttemptId,
+ExecutionState.RUNNING));
+
+return 
scheduler.stopWithSavepoint("savepoint-path", false);
+},
+mainThreadExecutor)
+.get();
+
+checkpointTriggeredLatch.await();
+
+final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+final AcknowledgeCheckpoint acknowledgeCheckpoint =
+new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+
+checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+
+// we need to wait for the expired checkpoint to be handled
+checkpointAbortionWasTriggered.await();
+
+CompletableFuture.runAsync(
+() -> {
+scheduler.updateTaskExecutionState(
+ 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578561800



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -835,8 +837,15 @@ public void updateAccumulators(final AccumulatorSnapshot 
accumulatorSnapshot) {
 mainThreadExecutor);
 }
 
-private void startCheckpointScheduler(final CheckpointCoordinator 
checkpointCoordinator) {
+@Override

Review comment:
   The `CheckpointCoordinator` does not change during the lifetime of an 
`ExecutionGraph`. That's why I decided to make this change and introduce the 
`CheckpointScheduling` interface.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-16 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r577158379



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
 assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
 }
 
+@Test
+public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+// initially, we don't allow any restarts since the first phase 
(savepoint creation)
+// succeeds without any failures
+testRestartBackoffTimeStrategy.setCanRestart(false);
+
+final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// collect executions to which the checkpoint completion was confirmed
+final List 
executionAttemptIdsWithCompletedCheckpoint =
+new ArrayList<>();
+taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(ignored0, ignored1, ignored2, ignored3) -> {
+throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+});
+
+final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+// trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+final CompletableFuture stopWithSavepointFuture =
+scheduler.stopWithSavepoint(savepointFolder, false);
+checkpointTriggeredLatch.await();
+
+acknowledgePendingCheckpoint(scheduler, 1);
+
+assertThat(
+"Both the executions where notified about the completed 
checkpoint.",
+executionAttemptIdsWithCompletedCheckpoint,
+containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+// The savepoint creation succeeded a failure happens in the second 
phase when finishing
+// the tasks. That's why, the restarting policy is enabled.
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+succeedingExecutionAttemptId,
+ExecutionState.FINISHED));
+
+// the restarts due to local failure handling and global job fail-over 
are triggered
+assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+try {
+stopWithSavepointFuture.get();
+fail("An exception is expected.");
+} catch (ExecutionException e) {
+Optional flinkException =
+ExceptionUtils.findThrowable(e, FlinkException.class);
+
+assertTrue(flinkException.isPresent());
+assertThat(
+flinkException.get().getMessage(),
+is(
+String.format(
+"Inconsistent execution state after 
stopping with savepoint. A 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-16 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r577157152



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
 // will be restarted by the CheckpointCoordinatorDeActivator.
 checkpointCoordinator.stopCheckpointScheduler();
 
+final CompletableFuture> 
executionGraphTerminationFuture =
+FutureUtils.combineAll(
+StreamSupport.stream(
+
executionGraph.getAllExecutionVertices().spliterator(),
+false)
+
.map(ExecutionVertex::getCurrentExecutionAttempt)
+.map(Execution::getTerminalStateFuture)
+.collect(Collectors.toList()));
+
 final CompletableFuture savepointFuture =
 checkpointCoordinator
 .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
 .thenApply(CompletedCheckpoint::getExternalPointer);
 
-final CompletableFuture terminationFuture =
-executionGraph
-.getTerminationFuture()
-.handle(
-(jobstatus, throwable) -> {
-if (throwable != null) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: {}",
-jobGraph.getJobID(),
-throwable.getMessage());
-throw new 
CompletionException(throwable);
-} else if (jobstatus != 
JobStatus.FINISHED) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
-jobGraph.getJobID(),
-jobstatus);
-throw new CompletionException(
-new FlinkException(
-"Reached state "
-+ jobstatus
-+ " instead of 
FINISHED."));
-}
-return jobstatus;
-});
-
 return savepointFuture
-.thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
+.thenCompose(
+path ->
+executionGraphTerminationFuture
+.handleAsync(
+(executionStates, throwable) 
-> {
+Set 
nonFinishedStates =
+
extractNonFinishedStates(
+
executionStates);
+if (throwable != null) {

Review comment:
   @rmetzger, you're right: the `Execution` never completes exceptionally. I 
removed this code path as part of the `StopWithSavepointOperations` refactoring.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-16 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r577152942



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
 assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
 }
 
+@Test
+public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+// initially, we don't allow any restarts since the first phase 
(savepoint creation)
+// succeeds without any failures
+testRestartBackoffTimeStrategy.setCanRestart(false);
+
+final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// collect executions to which the checkpoint completion was confirmed
+final List 
executionAttemptIdsWithCompletedCheckpoint =
+new ArrayList<>();
+taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(ignored0, ignored1, ignored2, ignored3) -> {
+throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+});
+
+final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+// trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+final CompletableFuture stopWithSavepointFuture =
+scheduler.stopWithSavepoint(savepointFolder, false);
+checkpointTriggeredLatch.await();
+
+acknowledgePendingCheckpoint(scheduler, 1);
+
+assertThat(
+"Both the executions where notified about the completed 
checkpoint.",
+executionAttemptIdsWithCompletedCheckpoint,
+containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+// The savepoint creation succeeded a failure happens in the second 
phase when finishing
+// the tasks. That's why, the restarting policy is enabled.
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+succeedingExecutionAttemptId,
+ExecutionState.FINISHED));
+
+// the restarts due to local failure handling and global job fail-over 
are triggered
+assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));

Review comment:
   `StopWithSavepointContextTest` replaces this test, focusing more on the 
actual functionality we want to test.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-16 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r577148145



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
 assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
 }
 
+@Test
+public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+// initially, we don't allow any restarts since the first phase 
(savepoint creation)
+// succeeds without any failures
+testRestartBackoffTimeStrategy.setCanRestart(false);
+
+final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// collect executions to which the checkpoint completion was confirmed
+final List 
executionAttemptIdsWithCompletedCheckpoint =
+new ArrayList<>();
+taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(ignored0, ignored1, ignored2, ignored3) -> {
+throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+});
+
+final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+// trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+final CompletableFuture stopWithSavepointFuture =
+scheduler.stopWithSavepoint(savepointFolder, false);
+checkpointTriggeredLatch.await();
+
+acknowledgePendingCheckpoint(scheduler, 1);
+
+assertThat(
+"Both the executions where notified about the completed 
checkpoint.",
+executionAttemptIdsWithCompletedCheckpoint,
+containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+// The savepoint creation succeeded a failure happens in the second 
phase when finishing
+// the tasks. That's why, the restarting policy is enabled.
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+succeedingExecutionAttemptId,
+ExecutionState.FINISHED));
+
+// the restarts due to local failure handling and global job fail-over 
are triggered
+assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+try {
+stopWithSavepointFuture.get();
+fail("An exception is expected.");
+} catch (ExecutionException e) {
+Optional flinkException =
+ExceptionUtils.findThrowable(e, FlinkException.class);
+
+assertTrue(flinkException.isPresent());
+assertThat(
+flinkException.get().getMessage(),
+is(
+String.format(
+"Inconsistent execution state after 
stopping with savepoint. A 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-16 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r577147425



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
 // will be restarted by the CheckpointCoordinatorDeActivator.
 checkpointCoordinator.stopCheckpointScheduler();
 
+final CompletableFuture> 
executionGraphTerminationFuture =
+FutureUtils.combineAll(
+StreamSupport.stream(
+
executionGraph.getAllExecutionVertices().spliterator(),
+false)
+
.map(ExecutionVertex::getCurrentExecutionAttempt)
+.map(Execution::getTerminalStateFuture)
+.collect(Collectors.toList()));
+
 final CompletableFuture savepointFuture =
 checkpointCoordinator
 .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
 .thenApply(CompletedCheckpoint::getExternalPointer);
 
-final CompletableFuture terminationFuture =
-executionGraph
-.getTerminationFuture()
-.handle(
-(jobstatus, throwable) -> {
-if (throwable != null) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: {}",
-jobGraph.getJobID(),
-throwable.getMessage());
-throw new 
CompletionException(throwable);
-} else if (jobstatus != 
JobStatus.FINISHED) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
-jobGraph.getJobID(),
-jobstatus);
-throw new CompletionException(
-new FlinkException(
-"Reached state "
-+ jobstatus
-+ " instead of 
FINISHED."));
-}
-return jobstatus;
-});
-
 return savepointFuture
-.thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
+.thenCompose(
+path ->
+executionGraphTerminationFuture
+.handleAsync(
+(executionStates, throwable) 
-> {
+Set 
nonFinishedStates =
+
extractNonFinishedStates(
+
executionStates);
+if (throwable != null) {
+log.info(
+"Failed during 
stopping job {} with a savepoint. Reason: {}",
+
jobGraph.getJobID(),
+
throwable.getMessage());
+throw new 
CompletionException(throwable);
+} else if 
(!nonFinishedStates.isEmpty()) {
+log.info(
+"Failed while 
stopping job {} after successfully creating a savepoint. A global failover is 
going to be triggered. Reason: One or more states ended up in the following 
termination states instead of FINISHED: {}",
+
jobGraph.getJobID(),
+
nonFinishedStates);
+FlinkException
+
inconsistentFinalStateException =
+new 
FlinkException(
+

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-16 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576628701



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -817,6 +818,126 @@ public void 
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
 assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
 }
 
+@Test
+public void testStopWithSavepointFailingWithExpiredCheckpoint() throws 
Exception {
+// we allow restarts right from the start since the failure is going 
to happen in the first
+// phase (savepoint creation) of stop-with-savepoint
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+final JobGraph jobGraph = createTwoVertexJobGraph();
+// set checkpoint timeout to a low value to simulate checkpoint 
expiration
+enableCheckpointing(jobGraph, 10);
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// we have to set a listener that checks for the termination of the 
checkpoint handling
+OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+checkpointAbortionWasTriggered.trigger());
+
+// the failure handling has to happen in the same thread as the 
checkpoint coordination -
+// that's why we have to instantiate a separate ThreadExecutorService 
here
+final ScheduledExecutorService singleThreadExecutorService =
+Executors.newSingleThreadScheduledExecutor();
+final ComponentMainThreadExecutor mainThreadExecutor =
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+singleThreadExecutorService);
+
+final DefaultScheduler scheduler =
+CompletableFuture.supplyAsync(
+() ->
+createSchedulerAndStartScheduling(
+jobGraph, mainThreadExecutor),
+mainThreadExecutor)
+.get();
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+final CompletableFuture stopWithSavepointFuture =
+CompletableFuture.supplyAsync(
+() -> {
+// we have to make sure that the tasks are 
running before
+// stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+failingExecutionAttemptId,
+ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+
succeedingExecutionAttemptId,
+ExecutionState.RUNNING));
+
+return 
scheduler.stopWithSavepoint("savepoint-path", false);
+},
+mainThreadExecutor)
+.get();
+
+checkpointTriggeredLatch.await();
+
+final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+final AcknowledgeCheckpoint acknowledgeCheckpoint =
+new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+
+checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+
+// we need to wait for the expired checkpoint to be handled
+checkpointAbortionWasTriggered.await();
+
+CompletableFuture.runAsync(
+() -> {
+scheduler.updateTaskExecutionState(
+ 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-16 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576617075



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -817,6 +818,126 @@ public void 
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
 assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
 }
 
+@Test
+public void testStopWithSavepointFailingWithExpiredCheckpoint() throws 
Exception {
+// we allow restarts right from the start since the failure is going 
to happen in the first
+// phase (savepoint creation) of stop-with-savepoint
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+final JobGraph jobGraph = createTwoVertexJobGraph();
+// set checkpoint timeout to a low value to simulate checkpoint 
expiration
+enableCheckpointing(jobGraph, 10);
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// we have to set a listener that checks for the termination of the 
checkpoint handling
+OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+checkpointAbortionWasTriggered.trigger());
+
+// the failure handling has to happen in the same thread as the 
checkpoint coordination -
+// that's why we have to instantiate a separate ThreadExecutorService 
here
+final ScheduledExecutorService singleThreadExecutorService =
+Executors.newSingleThreadScheduledExecutor();
+final ComponentMainThreadExecutor mainThreadExecutor =
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+singleThreadExecutorService);
+
+final DefaultScheduler scheduler =
+CompletableFuture.supplyAsync(
+() ->
+createSchedulerAndStartScheduling(
+jobGraph, mainThreadExecutor),
+mainThreadExecutor)
+.get();
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+final CompletableFuture stopWithSavepointFuture =
+CompletableFuture.supplyAsync(
+() -> {
+// we have to make sure that the tasks are 
running before
+// stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+failingExecutionAttemptId,
+ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+
succeedingExecutionAttemptId,
+ExecutionState.RUNNING));
+
+return 
scheduler.stopWithSavepoint("savepoint-path", false);
+},
+mainThreadExecutor)
+.get();
+
+checkpointTriggeredLatch.await();
+
+final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+final AcknowledgeCheckpoint acknowledgeCheckpoint =
+new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+
+checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+
+// we need to wait for the expired checkpoint to be handled
+checkpointAbortionWasTriggered.await();
+
+CompletableFuture.runAsync(
+() -> {
+scheduler.updateTaskExecutionState(
+ 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-16 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576616131



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
 assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
 }
 
+@Test
+public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+// initially, we don't allow any restarts since the first phase 
(savepoint creation)
+// succeeds without any failures
+testRestartBackoffTimeStrategy.setCanRestart(false);
+
+final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// collect executions to which the checkpoint completion was confirmed
+final List 
executionAttemptIdsWithCompletedCheckpoint =
+new ArrayList<>();
+taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(ignored0, ignored1, ignored2, ignored3) -> {
+throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+});
+
+final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+// trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+final CompletableFuture stopWithSavepointFuture =
+scheduler.stopWithSavepoint(savepointFolder, false);
+checkpointTriggeredLatch.await();
+
+acknowledgePendingCheckpoint(scheduler, 1);
+
+assertThat(
+"Both the executions where notified about the completed 
checkpoint.",
+executionAttemptIdsWithCompletedCheckpoint,
+containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+// The savepoint creation succeeded a failure happens in the second 
phase when finishing
+// the tasks. That's why, the restarting policy is enabled.
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+succeedingExecutionAttemptId,
+ExecutionState.FINISHED));
+
+// the restarts due to local failure handling and global job fail-over 
are triggered
+assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+try {
+stopWithSavepointFuture.get();
+fail("An exception is expected.");
+} catch (ExecutionException e) {
+Optional flinkException =
+ExceptionUtils.findThrowable(e, FlinkException.class);
+
+assertTrue(flinkException.isPresent());
+assertThat(
+flinkException.get().getMessage(),
+is(
+String.format(
+"Inconsistent execution state after 
stopping with savepoint. A 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-16 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576616131



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
 assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
 }
 
+@Test
+public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+// initially, we don't allow any restarts since the first phase 
(savepoint creation)
+// succeeds without any failures
+testRestartBackoffTimeStrategy.setCanRestart(false);
+
+final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// collect executions to which the checkpoint completion was confirmed
+final List 
executionAttemptIdsWithCompletedCheckpoint =
+new ArrayList<>();
+taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(ignored0, ignored1, ignored2, ignored3) -> {
+throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+});
+
+final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+// trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+final CompletableFuture stopWithSavepointFuture =
+scheduler.stopWithSavepoint(savepointFolder, false);
+checkpointTriggeredLatch.await();
+
+acknowledgePendingCheckpoint(scheduler, 1);
+
+assertThat(
+"Both the executions where notified about the completed 
checkpoint.",
+executionAttemptIdsWithCompletedCheckpoint,
+containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+// The savepoint creation succeeded a failure happens in the second 
phase when finishing
+// the tasks. That's why, the restarting policy is enabled.
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+succeedingExecutionAttemptId,
+ExecutionState.FINISHED));
+
+// the restarts due to local failure handling and global job fail-over 
are triggered
+assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+try {
+stopWithSavepointFuture.get();
+fail("An exception is expected.");
+} catch (ExecutionException e) {
+Optional flinkException =
+ExceptionUtils.findThrowable(e, FlinkException.class);
+
+assertTrue(flinkException.isPresent());
+assertThat(
+flinkException.get().getMessage(),
+is(
+String.format(
+"Inconsistent execution state after 
stopping with savepoint. A 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-16 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576615862



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
 assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
 }
 
+@Test
+public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+// initially, we don't allow any restarts since the first phase 
(savepoint creation)
+// succeeds without any failures
+testRestartBackoffTimeStrategy.setCanRestart(false);
+
+final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// collect executions to which the checkpoint completion was confirmed
+final List 
executionAttemptIdsWithCompletedCheckpoint =
+new ArrayList<>();
+taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(ignored0, ignored1, ignored2, ignored3) -> {
+throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+});
+
+final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+// trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+final CompletableFuture stopWithSavepointFuture =
+scheduler.stopWithSavepoint(savepointFolder, false);
+checkpointTriggeredLatch.await();
+
+acknowledgePendingCheckpoint(scheduler, 1);
+
+assertThat(
+"Both the executions where notified about the completed 
checkpoint.",
+executionAttemptIdsWithCompletedCheckpoint,
+containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+// The savepoint creation succeeded a failure happens in the second 
phase when finishing
+// the tasks. That's why, the restarting policy is enabled.
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+succeedingExecutionAttemptId,
+ExecutionState.FINISHED));
+
+// the restarts due to local failure handling and global job fail-over 
are triggered
+assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+try {
+stopWithSavepointFuture.get();
+fail("An exception is expected.");
+} catch (ExecutionException e) {
+Optional flinkException =
+ExceptionUtils.findThrowable(e, FlinkException.class);
+
+assertTrue(flinkException.isPresent());
+assertThat(
+flinkException.get().getMessage(),
+is(
+String.format(
+"Inconsistent execution state after 
stopping with savepoint. A 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-15 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576278350



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
 // will be restarted by the CheckpointCoordinatorDeActivator.
 checkpointCoordinator.stopCheckpointScheduler();
 
+final CompletableFuture> 
executionGraphTerminationFuture =
+FutureUtils.combineAll(
+StreamSupport.stream(
+
executionGraph.getAllExecutionVertices().spliterator(),
+false)
+
.map(ExecutionVertex::getCurrentExecutionAttempt)
+.map(Execution::getTerminalStateFuture)
+.collect(Collectors.toList()));
+
 final CompletableFuture savepointFuture =
 checkpointCoordinator
 .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
 .thenApply(CompletedCheckpoint::getExternalPointer);
 
-final CompletableFuture terminationFuture =
-executionGraph
-.getTerminationFuture()
-.handle(
-(jobstatus, throwable) -> {
-if (throwable != null) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: {}",
-jobGraph.getJobID(),
-throwable.getMessage());
-throw new 
CompletionException(throwable);
-} else if (jobstatus != 
JobStatus.FINISHED) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
-jobGraph.getJobID(),
-jobstatus);
-throw new CompletionException(
-new FlinkException(
-"Reached state "
-+ jobstatus
-+ " instead of 
FINISHED."));
-}
-return jobstatus;
-});
-
 return savepointFuture
-.thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
+.thenCompose(
+path ->
+executionGraphTerminationFuture
+.handleAsync(
+(executionStates, throwable) 
-> {
+Set 
nonFinishedStates =
+
extractNonFinishedStates(
+
executionStates);
+if (throwable != null) {
+log.info(
+"Failed during 
stopping job {} with a savepoint. Reason: {}",
+
jobGraph.getJobID(),
+
throwable.getMessage());
+throw new 
CompletionException(throwable);
+} else if 
(!nonFinishedStates.isEmpty()) {
+log.info(
+"Failed while 
stopping job {} after successfully creating a savepoint. A global failover is 
going to be triggered. Reason: One or more states ended up in the following 
termination states instead of FINISHED: {}",
+
jobGraph.getJobID(),
+
nonFinishedStates);
+FlinkException
+
inconsistentFinalStateException =
+new 
FlinkException(
+

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-15 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576278350



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
 // will be restarted by the CheckpointCoordinatorDeActivator.
 checkpointCoordinator.stopCheckpointScheduler();
 
+final CompletableFuture> 
executionGraphTerminationFuture =
+FutureUtils.combineAll(
+StreamSupport.stream(
+
executionGraph.getAllExecutionVertices().spliterator(),
+false)
+
.map(ExecutionVertex::getCurrentExecutionAttempt)
+.map(Execution::getTerminalStateFuture)
+.collect(Collectors.toList()));
+
 final CompletableFuture savepointFuture =
 checkpointCoordinator
 .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
 .thenApply(CompletedCheckpoint::getExternalPointer);
 
-final CompletableFuture terminationFuture =
-executionGraph
-.getTerminationFuture()
-.handle(
-(jobstatus, throwable) -> {
-if (throwable != null) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: {}",
-jobGraph.getJobID(),
-throwable.getMessage());
-throw new 
CompletionException(throwable);
-} else if (jobstatus != 
JobStatus.FINISHED) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
-jobGraph.getJobID(),
-jobstatus);
-throw new CompletionException(
-new FlinkException(
-"Reached state "
-+ jobstatus
-+ " instead of 
FINISHED."));
-}
-return jobstatus;
-});
-
 return savepointFuture
-.thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
+.thenCompose(
+path ->
+executionGraphTerminationFuture
+.handleAsync(
+(executionStates, throwable) 
-> {
+Set 
nonFinishedStates =
+
extractNonFinishedStates(
+
executionStates);
+if (throwable != null) {
+log.info(
+"Failed during 
stopping job {} with a savepoint. Reason: {}",
+
jobGraph.getJobID(),
+
throwable.getMessage());
+throw new 
CompletionException(throwable);
+} else if 
(!nonFinishedStates.isEmpty()) {
+log.info(
+"Failed while 
stopping job {} after successfully creating a savepoint. A global failover is 
going to be triggered. Reason: One or more states ended up in the following 
termination states instead of FINISHED: {}",
+
jobGraph.getJobID(),
+
nonFinishedStates);
+FlinkException
+
inconsistentFinalStateException =
+new 
FlinkException(
+

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-15 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576241481



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
 assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
 }
 
+@Test
+public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+// initially, we don't allow any restarts since the first phase 
(savepoint creation)
+// succeeds without any failures
+testRestartBackoffTimeStrategy.setCanRestart(false);
+
+final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// collect executions to which the checkpoint completion was confirmed
+final List 
executionAttemptIdsWithCompletedCheckpoint =
+new ArrayList<>();
+taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(ignored0, ignored1, ignored2, ignored3) -> {
+throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+});
+
+final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+// trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+final CompletableFuture stopWithSavepointFuture =
+scheduler.stopWithSavepoint(savepointFolder, false);
+checkpointTriggeredLatch.await();
+
+acknowledgePendingCheckpoint(scheduler, 1);
+
+assertThat(
+"Both the executions where notified about the completed 
checkpoint.",
+executionAttemptIdsWithCompletedCheckpoint,
+containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+// The savepoint creation succeeded a failure happens in the second 
phase when finishing
+// the tasks. That's why, the restarting policy is enabled.
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+succeedingExecutionAttemptId,
+ExecutionState.FINISHED));
+
+// the restarts due to local failure handling and global job fail-over 
are triggered
+assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));

Review comment:
   That's what I wanted to make clear with the comment above, but I can 
try to make that comment more explicit.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-15 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576239135



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
 // will be restarted by the CheckpointCoordinatorDeActivator.
 checkpointCoordinator.stopCheckpointScheduler();
 
+final CompletableFuture> 
executionGraphTerminationFuture =
+FutureUtils.combineAll(
+StreamSupport.stream(
+
executionGraph.getAllExecutionVertices().spliterator(),
+false)
+
.map(ExecutionVertex::getCurrentExecutionAttempt)
+.map(Execution::getTerminalStateFuture)
+.collect(Collectors.toList()));
+
 final CompletableFuture savepointFuture =
 checkpointCoordinator
 .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
 .thenApply(CompletedCheckpoint::getExternalPointer);
 
-final CompletableFuture terminationFuture =
-executionGraph
-.getTerminationFuture()
-.handle(
-(jobstatus, throwable) -> {
-if (throwable != null) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: {}",
-jobGraph.getJobID(),
-throwable.getMessage());
-throw new 
CompletionException(throwable);
-} else if (jobstatus != 
JobStatus.FINISHED) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
-jobGraph.getJobID(),
-jobstatus);
-throw new CompletionException(
-new FlinkException(
-"Reached state "
-+ jobstatus
-+ " instead of 
FINISHED."));
-}
-return jobstatus;
-});
-
 return savepointFuture
-.thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
+.thenCompose(
+path ->
+executionGraphTerminationFuture
+.handleAsync(

Review comment:
   Thanks for the remark, @rmetzger. I thought about it again: we cannot 
move the `handleAsync` out of the `thenCompose` because the error handling that 
is triggered by the `handleAsync` should only run if the savepoint 
creation succeeds. Outside the `thenCompose`, the error handling would also be 
triggered by a failure during savepoint creation that caused all executions to 
terminate, because the `CheckpointCoordinator` triggers a restart if savepoint creation fails.
   
   That's why `testStopWithSavepointFailingWithDeclinedCheckpoint` fails.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-15 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576239135



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
 // will be restarted by the CheckpointCoordinatorDeActivator.
 checkpointCoordinator.stopCheckpointScheduler();
 
+final CompletableFuture> 
executionGraphTerminationFuture =
+FutureUtils.combineAll(
+StreamSupport.stream(
+
executionGraph.getAllExecutionVertices().spliterator(),
+false)
+
.map(ExecutionVertex::getCurrentExecutionAttempt)
+.map(Execution::getTerminalStateFuture)
+.collect(Collectors.toList()));
+
 final CompletableFuture savepointFuture =
 checkpointCoordinator
 .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
 .thenApply(CompletedCheckpoint::getExternalPointer);
 
-final CompletableFuture terminationFuture =
-executionGraph
-.getTerminationFuture()
-.handle(
-(jobstatus, throwable) -> {
-if (throwable != null) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: {}",
-jobGraph.getJobID(),
-throwable.getMessage());
-throw new 
CompletionException(throwable);
-} else if (jobstatus != 
JobStatus.FINISHED) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
-jobGraph.getJobID(),
-jobstatus);
-throw new CompletionException(
-new FlinkException(
-"Reached state "
-+ jobstatus
-+ " instead of 
FINISHED."));
-}
-return jobstatus;
-});
-
 return savepointFuture
-.thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
+.thenCompose(
+path ->
+executionGraphTerminationFuture
+.handleAsync(

Review comment:
   Thanks for the remark, @rmetzger. I thought about it again: we cannot 
move the `handleAsync` out of the `thenCompose` because the error handling that 
is triggered by the `handleAsync` should only run if the savepoint 
creation succeeds. Outside the `thenCompose`, the error handling would also be 
triggered by a failure during savepoint creation that caused all executions to 
terminate, because the `CheckpointCoordinator` triggers a restart if savepoint creation fails.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-12 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r575246034



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
 assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
 }
 
+@Test
+public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+// initially, we don't allow any restarts since the first phase 
(savepoint creation)
+// succeeds without any failures
+testRestartBackoffTimeStrategy.setCanRestart(false);
+
+final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+final SimpleAckingTaskManagerGateway taskManagerGateway =
+new SimpleAckingTaskManagerGateway();
+final CountDownLatch checkpointTriggeredLatch =
+getCheckpointTriggeredLatch(taskManagerGateway);
+
+// collect executions to which the checkpoint completion was confirmed
+final List 
executionAttemptIdsWithCompletedCheckpoint =
+new ArrayList<>();
+taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+(executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+(ignored0, ignored1, ignored2, ignored3) -> {
+throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+});
+
+final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+
+// we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+// trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+final CompletableFuture stopWithSavepointFuture =
+scheduler.stopWithSavepoint(savepointFolder, false);
+checkpointTriggeredLatch.await();
+
+acknowledgePendingCheckpoint(scheduler, 1);
+
+assertThat(
+"Both the executions where notified about the completed 
checkpoint.",
+executionAttemptIdsWithCompletedCheckpoint,
+containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+// The savepoint creation succeeded a failure happens in the second 
phase when finishing
+// the tasks. That's why, the restarting policy is enabled.
+testRestartBackoffTimeStrategy.setCanRestart(true);
+
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+scheduler.updateTaskExecutionState(
+new TaskExecutionState(
+jobGraph.getJobID(),
+succeedingExecutionAttemptId,
+ExecutionState.FINISHED));
+
+// the restarts due to local failure handling and global job fail-over 
are triggered
+assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+try {
+stopWithSavepointFuture.get();
+fail("An exception is expected.");
+} catch (ExecutionException e) {
+Optional flinkException =
+ExceptionUtils.findThrowable(e, FlinkException.class);
+
+assertTrue(flinkException.isPresent());
+assertThat(
+flinkException.get().getMessage(),
+is(
+String.format(
+"Inconsistent execution state after 
stopping with savepoint. A 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-12 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r575043953



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -908,38 +909,56 @@ public void reportCheckpointMetrics(
 // will be restarted by the CheckpointCoordinatorDeActivator.
 checkpointCoordinator.stopCheckpointScheduler();
 
+final CompletableFuture> 
executionGraphTerminationFuture =
+FutureUtils.combineAll(
+StreamSupport.stream(
+
executionGraph.getAllExecutionVertices().spliterator(),

Review comment:
   Yeah, it also took me some digging to find a connection. I found 
`executionGraph.getSchedulingTopology().getAllPipelinedRegions()`. I'm not sure, 
though, whether this works conceptually.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-11 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r574578211



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -908,38 +909,56 @@ public void reportCheckpointMetrics(
 // will be restarted by the CheckpointCoordinatorDeActivator.
 checkpointCoordinator.stopCheckpointScheduler();
 
+final CompletableFuture> 
executionGraphTerminationFuture =
+FutureUtils.combineAll(
+StreamSupport.stream(
+
executionGraph.getAllExecutionVertices().spliterator(),

Review comment:
   I'm not that familiar with the regions concept yet. AFAIK, regions are 
collections of tasks that have to run together, right? Do all tasks in a region share the same 
`ExecutionState`? I guess @tillrohrmann could answer that question.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-11 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r574569882



##
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##
@@ -600,6 +616,171 @@ public void testSubmitWithUnknownSavepointPath() throws 
Exception {
 }
 }
 
+@Test
+public void testStopWithSavepointFailingInSnapshotCreation() throws 
Exception {
+testStopWithFailingSourceInOnePipeline(
+new SnapshotFailingInfiniteTestSource(),
+folder.newFolder(),
+// two restarts expected:
+// 1. task failure restart
+// 2. job failover triggered by the CheckpointFailureManager
+2,
+assertInSnapshotCreationFailure());
+}
+
+@Test
+public void testStopWithSavepointFailingAfterSnapshotCreation() throws 
Exception {

Review comment:
   This is fixed after letting the error handling run in the 
`mainThreadExecutor` again.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-11 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r574568680



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -908,38 +909,56 @@ public void reportCheckpointMetrics(
 // will be restarted by the CheckpointCoordinatorDeActivator.
 checkpointCoordinator.stopCheckpointScheduler();
 
+final CompletableFuture> 
executionGraphTerminationFuture =
+FutureUtils.combineAll(
+StreamSupport.stream(
+
executionGraph.getAllExecutionVertices().spliterator(),
+false)
+
.map(ExecutionVertex::getCurrentExecutionAttempt)
+.map(Execution::getTerminalStateFuture)
+.collect(Collectors.toList()));
+
 final CompletableFuture savepointFuture =
 checkpointCoordinator
 .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
 .thenApply(CompletedCheckpoint::getExternalPointer);
 
-final CompletableFuture terminationFuture =
-executionGraph
-.getTerminationFuture()
-.handle(
-(jobstatus, throwable) -> {
-if (throwable != null) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: {}",
-jobGraph.getJobID(),
-throwable.getMessage());
-throw new 
CompletionException(throwable);
-} else if (jobstatus != 
JobStatus.FINISHED) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
-jobGraph.getJobID(),
-jobstatus);
-throw new CompletionException(
-new FlinkException(
-"Reached state "
-+ jobstatus
-+ " instead of 
FINISHED."));
-}
-return jobstatus;
-});
-
 return savepointFuture
-.thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
+.thenCompose(
+path ->
+executionGraphTerminationFuture
+.handle(
+(executionStates, throwable) 
-> {
+Set 
nonFinishedStates =
+
extractNonFinishedStates(
+
executionStates);
+if (throwable != null) {
+log.info(
+"Failed during 
stopping job {} with a savepoint. Reason: {}",
+
jobGraph.getJobID(),
+
throwable.getMessage());
+throw new 
CompletionException(throwable);
+} else if 
(!nonFinishedStates.isEmpty()) {
+log.info(
+"Failed while 
stopping job {} after successfully creating a savepoint. A global failover is 
going to be triggered. Reason: One or more states ended up in the following 
termination states instead of FINISHED: {}",
+
jobGraph.getJobID(),
+
nonFinishedStates);
+FlinkException
+
inconsistentFinalStateException =
+new 
FlinkException(
+   
 

[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-11 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r574523861



##
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##
@@ -600,6 +616,171 @@ public void testSubmitWithUnknownSavepointPath() throws 
Exception {
 }
 }
 
+@Test
+public void testStopWithSavepointFailingInSnapshotCreation() throws 
Exception {
+testStopWithFailingSourceInOnePipeline(
+new SnapshotFailingInfiniteTestSource(),
+folder.newFolder(),
+// two restarts expected:
+// 1. task failure restart
+// 2. job failover triggered by the CheckpointFailureManager
+2,
+assertInSnapshotCreationFailure());
+}
+
+@Test
+public void testStopWithSavepointFailingAfterSnapshotCreation() throws 
Exception {

Review comment:
   Good catch. I missed running them individually. This test failure would 
be fixed if we applied the change you suggested above to run the error handling 
in the `mainThreadExecutor`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14847: [FLINK-21030][runtime] Add global failover in case of a stop-with-savepoint failure

2021-02-11 Thread GitBox


XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r574522921



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -908,38 +909,56 @@ public void reportCheckpointMetrics(
 // will be restarted by the CheckpointCoordinatorDeActivator.
 checkpointCoordinator.stopCheckpointScheduler();
 
+final CompletableFuture> 
executionGraphTerminationFuture =
+FutureUtils.combineAll(
+StreamSupport.stream(
+
executionGraph.getAllExecutionVertices().spliterator(),
+false)
+
.map(ExecutionVertex::getCurrentExecutionAttempt)
+.map(Execution::getTerminalStateFuture)
+.collect(Collectors.toList()));
+
 final CompletableFuture savepointFuture =
 checkpointCoordinator
 .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
 .thenApply(CompletedCheckpoint::getExternalPointer);
 
-final CompletableFuture terminationFuture =
-executionGraph
-.getTerminationFuture()
-.handle(
-(jobstatus, throwable) -> {
-if (throwable != null) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: {}",
-jobGraph.getJobID(),
-throwable.getMessage());
-throw new 
CompletionException(throwable);
-} else if (jobstatus != 
JobStatus.FINISHED) {
-log.info(
-"Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
-jobGraph.getJobID(),
-jobstatus);
-throw new CompletionException(
-new FlinkException(
-"Reached state "
-+ jobstatus
-+ " instead of 
FINISHED."));
-}
-return jobstatus;
-});
-
 return savepointFuture
-.thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
+.thenCompose(
+path ->
+executionGraphTerminationFuture
+.handle(
+(executionStates, throwable) 
-> {
+Set 
nonFinishedStates =
+
extractNonFinishedStates(
+
executionStates);
+if (throwable != null) {
+log.info(
+"Failed during 
stopping job {} with a savepoint. Reason: {}",
+
jobGraph.getJobID(),
+
throwable.getMessage());
+throw new 
CompletionException(throwable);
+} else if 
(!nonFinishedStates.isEmpty()) {
+log.info(
+"Failed while 
stopping job {} after successfully creating a savepoint. A global failover is 
going to be triggered. Reason: One or more states ended up in the following 
termination states instead of FINISHED: {}",
+
jobGraph.getJobID(),
+
nonFinishedStates);
+FlinkException
+
inconsistentFinalStateException =
+new 
FlinkException(
+