This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 936489ddda86545392292ee8f82ba99b0bb1b204 Author: Till Rohrmann <[email protected]> AuthorDate: Mon Jan 4 15:44:47 2021 +0100 [hotfix] Remove unused jobStatus parameter from CheckpointCoordinator.shutdown() --- .../runtime/checkpoint/CheckpointCoordinator.java | 3 +- .../runtime/executiongraph/ExecutionGraph.java | 2 +- .../CheckpointCoordinatorMasterHooksTest.java | 3 +- .../CheckpointCoordinatorRestoringTest.java | 2 +- .../checkpoint/CheckpointCoordinatorTest.java | 42 +++++++++++----------- .../CheckpointCoordinatorTriggeringTest.java | 8 ++--- 6 files changed, 29 insertions(+), 31 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 558f16a..09ad7d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; @@ -406,7 +405,7 @@ public class CheckpointCoordinator { * <p>After this method has been called, the coordinator does not accept and further messages * and cannot trigger any further checkpoints. */ - public void shutdown(JobStatus jobStatus) throws Exception { + public void shutdown() throws Exception { synchronized (lock) { if (!shutdown) { shutdown = true; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 13a4c03..aff1f4f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1231,7 +1231,7 @@ public class ExecutionGraph implements AccessExecutionGraph { CheckpointCoordinator coord = this.checkpointCoordinator; this.checkpointCoordinator = null; if (coord != null) { - coord.shutdown(status); + coord.shutdown(); } if (checkpointCoordinatorTimer != null) { checkpointCoordinatorTimer.shutdownNow(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index ba2c4ee..07f3146 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; @@ -142,7 +141,7 @@ public class CheckpointCoordinatorMasterHooksTest { verify(hook2, times(1)).reset(); // shutdown - cc.shutdown(JobStatus.CANCELED); + cc.shutdown(); verify(hook1, times(1)).close(); verify(hook2, times(1)).close(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java index e485d02..e8134cf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java @@ -397,7 +397,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { verify(statefulExec1, times(1)).setInitialState(MockitoHamcrest.argThat(matcher)); verify(statelessExec1, times(0)).setInitialState(Mockito.<JobManagerTaskRestore>any()); - coord.shutdown(JobStatus.FINISHED); + coord.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index c7d66fe..8e68fbf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -201,7 +201,7 @@ public class CheckpointCoordinatorTest extends TestLogger { assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints()); assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -227,7 +227,7 @@ public class CheckpointCoordinatorTest extends TestLogger { assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints()); assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -253,7 +253,7 @@ public class CheckpointCoordinatorTest extends TestLogger { assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints()); assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -314,7 +314,7 @@ public class CheckpointCoordinatorTest extends TestLogger { assertEquals(errorMsg, e.getMessage()); } finally { try { - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -346,7 +346,7 @@ public class CheckpointCoordinatorTest extends TestLogger { assertTrue(e instanceof RuntimeException); assertEquals(errorMsg, e.getMessage()); } finally { - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } } @@ -455,7 +455,7 @@ public class CheckpointCoordinatorTest extends TestLogger { TASK_MANAGER_LOCATION_INFO); assertTrue(checkpoint.isDisposed()); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -604,7 +604,7 @@ public class CheckpointCoordinatorTest extends TestLogger { verify(vertex2.getCurrentExecutionAttempt(), times(1)) .notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class)); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -794,7 +794,7 @@ public class CheckpointCoordinatorTest extends TestLogger { .notifyCheckpointComplete(eq(checkpointIdNew), any(Long.class)); } - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -951,7 +951,7 @@ public class CheckpointCoordinatorTest extends TestLogger { assertEquals(jobId, sc2.getJobId()); assertTrue(sc2.getOperatorStates().isEmpty()); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -1169,7 +1169,7 @@ public class CheckpointCoordinatorTest extends TestLogger { TASK_MANAGER_LOCATION_INFO); verify(subtaskState13, times(1)).discardState(); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); completedCheckpointStore.shutdown( JobStatus.FINISHED, new CheckpointsCleaner(), () -> {}); @@ -1255,7 +1255,7 @@ public class CheckpointCoordinatorTest extends TestLogger { verify(commitVertex.getCurrentExecutionAttempt(), times(0)) .notifyCheckpointComplete(anyLong(), anyLong()); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -1316,7 +1316,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId), TASK_MANAGER_LOCATION_INFO); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -1663,7 +1663,7 @@ public class CheckpointCoordinatorTest extends TestLogger { .notifyCheckpointComplete(eq(checkpointIdNew), any(Long.class)); } - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } /** @@ -1858,7 +1858,7 @@ public class CheckpointCoordinatorTest extends TestLogger { manuallyTriggeredScheduledExecutor.triggerAll(); assertEquals(maxConcurrentAttempts + 1, numCalls.get()); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -1929,7 +1929,7 @@ public class CheckpointCoordinatorTest extends TestLogger { assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(3L)); assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(4L)); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -2110,7 +2110,7 @@ public class CheckpointCoordinatorTest extends TestLogger { } // the now we should have a completed checkpoint - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -2629,7 +2629,7 @@ public class CheckpointCoordinatorTest extends TestLogger { .getMessage() .equals(expectedRootCause.getMessage())); - coordinator.shutdown(JobStatus.FAILING); + coordinator.shutdown(); } /** Tests that do not trigger checkpoint when stop the coordinator after the eager pre-check. */ @@ -2667,7 +2667,7 @@ public class CheckpointCoordinatorTest extends TestLogger { checkpointExceptionOptional.get().getCheckpointFailureReason()); } } finally { - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } } @@ -2719,7 +2719,7 @@ public class CheckpointCoordinatorTest extends TestLogger { assertTrue(props.isSavepoint()); assertFalse(props.forceCheckpoint()); } finally { - coordinator.shutdown(JobStatus.FINISHED); + coordinator.shutdown(); } } @@ -2889,7 +2889,7 @@ public class CheckpointCoordinatorTest extends TestLogger { assertEquals(jobId, success.getJobId()); assertEquals(2, success.getOperatorStates().size()); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } @Test @@ -3110,7 +3110,7 @@ public class CheckpointCoordinatorTest extends TestLogger { assertEquals(Collections.singletonList(1L), context.getAbortedCheckpoints()); assertEquals(Collections.singletonList(2L), context.getCompletedCheckpoints()); } finally { - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java index 5509e71..c999306 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java @@ -172,7 +172,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger { manuallyTriggeredScheduledExecutor.triggerAll(); assertEquals(5, numCalls.get()); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -261,7 +261,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger { } } finally { checkpointCoordinator.stopCheckpointScheduler(); - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); } } @@ -305,7 +305,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger { final CompletableFuture<CompletedCheckpoint> onCompletionPromise = triggerPeriodicCheckpoint(checkpointCoordinator); - checkpointCoordinator.shutdown(JobStatus.FAILED); + checkpointCoordinator.shutdown(); manuallyTriggeredScheduledExecutor.triggerAll(); try { onCompletionPromise.get(); @@ -600,7 +600,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger { instanceOf(CheckpointException.class)); } } finally { - checkpointCoordinator.shutdown(JobStatus.FINISHED); + checkpointCoordinator.shutdown(); ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, scheduledExecutorService); } }
