This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2a225d613f942ea1aaba455cfe933449af167570 Author: Stephan Ewen <se...@apache.org> AuthorDate: Tue Apr 13 10:37:26 2021 +0200 [hotfix][coordination] Make failed event valve shutting smoother. Failing to shut the event valve is quite common, it happens whenever a checkpoint gets canceled. So we don't handle this as an IllegalState, because it is very much an expected state. --- .../coordination/OperatorCoordinatorHolder.java | 14 ++++++---- .../operators/coordination/OperatorEventValve.java | 13 ++------- .../coordination/OperatorEventValveTest.java | 32 ++++++++++++++++------ 3 files changed, 34 insertions(+), 25 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index 89e1fc4..a6e2317 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -282,13 +282,15 @@ public class OperatorCoordinatorHolder () -> { if (failure != null) { result.completeExceptionally(failure); + } else if (eventValve.tryShutValve(checkpointId)) { + result.complete(success); } else { - try { - eventValve.shutValve(checkpointId); - result.complete(success); - } catch (Exception e) { - result.completeExceptionally(e); - } + // if we cannot shut the valve, this means the checkpoint + // has been aborted before, so the future is already + // completed exceptionally. but we try to complete it here + // again, just in case, as a safety net. + result.completeExceptionally( + new FlinkException("Cannot shut event valve")); } })); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java index 98c90d5..8e71e36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java @@ -128,21 +128,14 @@ final class OperatorEventValve { * Shuts the value. All events sent through this valve are blocked until the valve is re-opened. * If the valve is already shut, this does nothing. */ - public void shutValve(long checkpointId) { + public boolean tryShutValve(long checkpointId) { checkRunsInMainThread(); if (checkpointId == currentCheckpointId) { shut = true; - } else { - throw new IllegalStateException( - String.format( - "Cannot shut valve for non-prepared checkpoint. " - + "Prepared checkpoint = %s, attempting-to-close checkpoint = %d", - (currentCheckpointId == NO_CHECKPOINT - ? "(none)" - : String.valueOf(currentCheckpointId)), - checkpointId)); + return true; } + return false; } public void openValveAndUnmarkCheckpoint(long expectedCheckpointId) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java index bc5f2fe..dbe207d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java @@ -47,19 +47,33 @@ public class OperatorEventValveTest { assertTrue(future.isDone()); } - @Test(expected = IllegalStateException.class) - public void errorShuttingUnmarkedValve() { + @Test + public void shuttingMarkedValve() { + final OperatorEventValve valve = new OperatorEventValve(); + + valve.markForCheckpoint(200L); + final boolean shut = valve.tryShutValve(200L); + + assertTrue(shut); + } + + @Test + public void notShuttingUnmarkedValve() { final OperatorEventValve valve = new OperatorEventValve(); - valve.shutValve(123L); + final boolean shut = valve.tryShutValve(123L); + + assertFalse(shut); } - @Test(expected = IllegalStateException.class) - public void errorShuttingValveForOtherMark() { + @Test + public void notShuttingValveForOtherMark() { final OperatorEventValve valve = new OperatorEventValve(); valve.markForCheckpoint(100L); - valve.shutValve(123L); + final boolean shut = valve.tryShutValve(123L); + + assertFalse(shut); } @Test @@ -68,7 +82,7 @@ public class OperatorEventValveTest { final OperatorEventValve valve = new OperatorEventValve(); valve.markForCheckpoint(1L); - valve.shutValve(1L); + valve.tryShutValve(1L); final CompletableFuture<Acknowledge> future = new CompletableFuture<>(); valve.sendEvent(sender.createSendAction(new TestOperatorEvent(), 1), future); @@ -83,7 +97,7 @@ public class OperatorEventValveTest { final OperatorEventValve valve = new OperatorEventValve(); valve.markForCheckpoint(17L); - valve.shutValve(17L); + valve.tryShutValve(17L); final OperatorEvent event1 = new TestOperatorEvent(); final OperatorEvent event2 = new TestOperatorEvent(); @@ -108,7 +122,7 @@ public class OperatorEventValveTest { final OperatorEventValve valve = new OperatorEventValve(); valve.markForCheckpoint(17L); - valve.shutValve(17L); + valve.tryShutValve(17L); final CompletableFuture<Acknowledge> future = new CompletableFuture<>(); valve.sendEvent(sender.createSendAction(new TestOperatorEvent(), 10), future);