This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 936489ddda86545392292ee8f82ba99b0bb1b204
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon Jan 4 15:44:47 2021 +0100

    [hotfix] Remove unused jobStatus parameter from CheckpointCoordinator.shutdown()
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  3 +-
 .../runtime/executiongraph/ExecutionGraph.java     |  2 +-
 .../CheckpointCoordinatorMasterHooksTest.java      |  3 +-
 .../CheckpointCoordinatorRestoringTest.java        |  2 +-
 .../checkpoint/CheckpointCoordinatorTest.java      | 42 +++++++++++-----------
 .../CheckpointCoordinatorTriggeringTest.java       |  8 ++---
 6 files changed, 29 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 558f16a..09ad7d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -406,7 +405,7 @@ public class CheckpointCoordinator {
      * <p>After this method has been called, the coordinator does not accept and further messages
      * and cannot trigger any further checkpoints.
      */
-    public void shutdown(JobStatus jobStatus) throws Exception {
+    public void shutdown() throws Exception {
         synchronized (lock) {
             if (!shutdown) {
                 shutdown = true;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 13a4c03..aff1f4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1231,7 +1231,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
             CheckpointCoordinator coord = this.checkpointCoordinator;
             this.checkpointCoordinator = null;
             if (coord != null) {
-                coord.shutdown(status);
+                coord.shutdown();
             }
             if (checkpointCoordinatorTimer != null) {
                 checkpointCoordinatorTimer.shutdownNow();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index ba2c4ee..07f3146 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -142,7 +141,7 @@ public class CheckpointCoordinatorMasterHooksTest {
         verify(hook2, times(1)).reset();
 
         // shutdown
-        cc.shutdown(JobStatus.CANCELED);
+        cc.shutdown();
         verify(hook1, times(1)).close();
         verify(hook2, times(1)).close();
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index e485d02..e8134cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -397,7 +397,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
             verify(statefulExec1, 
times(1)).setInitialState(MockitoHamcrest.argThat(matcher));
             verify(statelessExec1, 
times(0)).setInitialState(Mockito.<JobManagerTaskRestore>any());
 
-            coord.shutdown(JobStatus.FINISHED);
+            coord.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index c7d66fe..8e68fbf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -201,7 +201,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             assertEquals(0, 
checkpointCoordinator.getNumberOfPendingCheckpoints());
             assertEquals(0, 
checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -227,7 +227,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             assertEquals(0, 
checkpointCoordinator.getNumberOfPendingCheckpoints());
             assertEquals(0, 
checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -253,7 +253,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             assertEquals(0, 
checkpointCoordinator.getNumberOfPendingCheckpoints());
             assertEquals(0, 
checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -314,7 +314,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             assertEquals(errorMsg, e.getMessage());
         } finally {
             try {
-                checkpointCoordinator.shutdown(JobStatus.FINISHED);
+                checkpointCoordinator.shutdown();
             } catch (Exception e) {
                 e.printStackTrace();
                 fail(e.getMessage());
@@ -346,7 +346,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             assertTrue(e instanceof RuntimeException);
             assertEquals(errorMsg, e.getMessage());
         } finally {
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         }
     }
 
@@ -455,7 +455,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                     TASK_MANAGER_LOCATION_INFO);
             assertTrue(checkpoint.isDisposed());
 
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -604,7 +604,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             verify(vertex2.getCurrentExecutionAttempt(), times(1))
                     .notifyCheckpointAborted(eq(checkpoint1Id), 
any(Long.class));
 
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -794,7 +794,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .notifyCheckpointComplete(eq(checkpointIdNew), 
any(Long.class));
             }
 
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -951,7 +951,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             assertEquals(jobId, sc2.getJobId());
             assertTrue(sc2.getOperatorStates().isEmpty());
 
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -1169,7 +1169,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                     TASK_MANAGER_LOCATION_INFO);
             verify(subtaskState13, times(1)).discardState();
 
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
             completedCheckpointStore.shutdown(
                     JobStatus.FINISHED, new CheckpointsCleaner(), () -> {});
 
@@ -1255,7 +1255,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
             verify(commitVertex.getCurrentExecutionAttempt(), times(0))
                     .notifyCheckpointComplete(anyLong(), anyLong());
 
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -1316,7 +1316,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                     new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), 
checkpointId),
                     TASK_MANAGER_LOCATION_INFO);
 
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -1663,7 +1663,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                     .notifyCheckpointComplete(eq(checkpointIdNew), 
any(Long.class));
         }
 
-        checkpointCoordinator.shutdown(JobStatus.FINISHED);
+        checkpointCoordinator.shutdown();
     }
 
     /**
@@ -1858,7 +1858,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
             manuallyTriggeredScheduledExecutor.triggerAll();
             assertEquals(maxConcurrentAttempts + 1, numCalls.get());
 
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -1929,7 +1929,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
             
assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(3L));
             
assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(4L));
 
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -2110,7 +2110,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
             }
 
             // the now we should have a completed checkpoint
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -2629,7 +2629,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                 .getMessage()
                                 .equals(expectedRootCause.getMessage()));
 
-        coordinator.shutdown(JobStatus.FAILING);
+        coordinator.shutdown();
     }
 
     /** Tests that do not trigger checkpoint when stop the coordinator after 
the eager pre-check. */
@@ -2667,7 +2667,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                         
checkpointExceptionOptional.get().getCheckpointFailureReason());
             }
         } finally {
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         }
     }
 
@@ -2719,7 +2719,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
             assertTrue(props.isSavepoint());
             assertFalse(props.forceCheckpoint());
         } finally {
-            coordinator.shutdown(JobStatus.FINISHED);
+            coordinator.shutdown();
         }
     }
 
@@ -2889,7 +2889,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
         assertEquals(jobId, success.getJobId());
         assertEquals(2, success.getOperatorStates().size());
 
-        checkpointCoordinator.shutdown(JobStatus.FINISHED);
+        checkpointCoordinator.shutdown();
     }
 
     @Test
@@ -3110,7 +3110,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
             assertEquals(Collections.singletonList(1L), 
context.getAbortedCheckpoints());
             assertEquals(Collections.singletonList(2L), 
context.getCompletedCheckpoints());
         } finally {
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 5509e71..c999306 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -172,7 +172,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
             manuallyTriggeredScheduledExecutor.triggerAll();
             assertEquals(5, numCalls.get());
 
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -261,7 +261,7 @@ public class CheckpointCoordinatorTriggeringTest extends 
TestLogger {
             }
         } finally {
             checkpointCoordinator.stopCheckpointScheduler();
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
         }
     }
 
@@ -305,7 +305,7 @@ public class CheckpointCoordinatorTriggeringTest extends 
TestLogger {
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise =
                 triggerPeriodicCheckpoint(checkpointCoordinator);
 
-        checkpointCoordinator.shutdown(JobStatus.FAILED);
+        checkpointCoordinator.shutdown();
         manuallyTriggeredScheduledExecutor.triggerAll();
         try {
             onCompletionPromise.get();
@@ -600,7 +600,7 @@ public class CheckpointCoordinatorTriggeringTest extends 
TestLogger {
                         instanceOf(CheckpointException.class));
             }
         } finally {
-            checkpointCoordinator.shutdown(JobStatus.FINISHED);
+            checkpointCoordinator.shutdown();
             ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, 
scheduledExecutorService);
         }
     }

Reply via email to