This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2a225d613f942ea1aaba455cfe933449af167570
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Apr 13 10:37:26 2021 +0200

    [hotfix][coordination] Make failed event valve shutting smoother.
    
    Failing to shut the event valve is quite common; it happens whenever a checkpoint gets canceled.
    So we don't treat this as an IllegalStateException, because it is very much an expected state.
---
 .../coordination/OperatorCoordinatorHolder.java    | 14 ++++++----
 .../operators/coordination/OperatorEventValve.java | 13 ++-------
 .../coordination/OperatorEventValveTest.java       | 32 ++++++++++++++++------
 3 files changed, 34 insertions(+), 25 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 89e1fc4..a6e2317 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -282,13 +282,15 @@ public class OperatorCoordinatorHolder
                                 () -> {
                                     if (failure != null) {
                                         result.completeExceptionally(failure);
+                                    } else if 
(eventValve.tryShutValve(checkpointId)) {
+                                        result.complete(success);
                                     } else {
-                                        try {
-                                            eventValve.shutValve(checkpointId);
-                                            result.complete(success);
-                                        } catch (Exception e) {
-                                            result.completeExceptionally(e);
-                                        }
+                                        // if we cannot shut the valve, this 
means the checkpoint
+                                        // has been aborted before, so the 
future is already
+                                        // completed exceptionally. but we try 
to complete it here
+                                        // again, just in case, as a safety 
net.
+                                        result.completeExceptionally(
+                                                new FlinkException("Cannot 
shut event valve"));
                                     }
                                 }));
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
index 98c90d5..8e71e36 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
@@ -128,21 +128,14 @@ final class OperatorEventValve {
      * Shuts the value. All events sent through this valve are blocked until 
the valve is re-opened.
      * If the valve is already shut, this does nothing.
      */
-    public void shutValve(long checkpointId) {
+    public boolean tryShutValve(long checkpointId) {
         checkRunsInMainThread();
 
         if (checkpointId == currentCheckpointId) {
             shut = true;
-        } else {
-            throw new IllegalStateException(
-                    String.format(
-                            "Cannot shut valve for non-prepared checkpoint. "
-                                    + "Prepared checkpoint = %s, 
attempting-to-close checkpoint = %d",
-                            (currentCheckpointId == NO_CHECKPOINT
-                                    ? "(none)"
-                                    : String.valueOf(currentCheckpointId)),
-                            checkpointId));
+            return true;
         }
+        return false;
     }
 
     public void openValveAndUnmarkCheckpoint(long expectedCheckpointId) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
index bc5f2fe..dbe207d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
@@ -47,19 +47,33 @@ public class OperatorEventValveTest {
         assertTrue(future.isDone());
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void errorShuttingUnmarkedValve() {
+    @Test
+    public void shuttingMarkedValve() {
+        final OperatorEventValve valve = new OperatorEventValve();
+
+        valve.markForCheckpoint(200L);
+        final boolean shut = valve.tryShutValve(200L);
+
+        assertTrue(shut);
+    }
+
+    @Test
+    public void notShuttingUnmarkedValve() {
         final OperatorEventValve valve = new OperatorEventValve();
 
-        valve.shutValve(123L);
+        final boolean shut = valve.tryShutValve(123L);
+
+        assertFalse(shut);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void errorShuttingValveForOtherMark() {
+    @Test
+    public void notShuttingValveForOtherMark() {
         final OperatorEventValve valve = new OperatorEventValve();
 
         valve.markForCheckpoint(100L);
-        valve.shutValve(123L);
+        final boolean shut = valve.tryShutValve(123L);
+
+        assertFalse(shut);
     }
 
     @Test
@@ -68,7 +82,7 @@ public class OperatorEventValveTest {
         final OperatorEventValve valve = new OperatorEventValve();
 
         valve.markForCheckpoint(1L);
-        valve.shutValve(1L);
+        valve.tryShutValve(1L);
 
         final CompletableFuture<Acknowledge> future = new 
CompletableFuture<>();
         valve.sendEvent(sender.createSendAction(new TestOperatorEvent(), 1), 
future);
@@ -83,7 +97,7 @@ public class OperatorEventValveTest {
         final OperatorEventValve valve = new OperatorEventValve();
 
         valve.markForCheckpoint(17L);
-        valve.shutValve(17L);
+        valve.tryShutValve(17L);
 
         final OperatorEvent event1 = new TestOperatorEvent();
         final OperatorEvent event2 = new TestOperatorEvent();
@@ -108,7 +122,7 @@ public class OperatorEventValveTest {
         final OperatorEventValve valve = new OperatorEventValve();
 
         valve.markForCheckpoint(17L);
-        valve.shutValve(17L);
+        valve.tryShutValve(17L);
 
         final CompletableFuture<Acknowledge> future = new 
CompletableFuture<>();
         valve.sendEvent(sender.createSendAction(new TestOperatorEvent(), 10), 
future);

Reply via email to