[hotfix] Suppress emitting non-causal exceptions from closed checkpointing thread
This prevents an exception that is merely a consequence of closing a running snapshot from being reported. Such a follow-up exception could confuse users reading their logs, or could be reported before its actual cause and thereby hide the real cause in the logs. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08d08810 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08d08810 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08d08810 Branch: refs/heads/master Commit: 08d088103b1b4b2c0c772ec36a8b3e52b2588b5f Parents: 32e25eb Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Wed Jan 17 18:02:25 2018 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Sun Feb 25 15:59:54 2018 +0100 ---------------------------------------------------------------------- .../streaming/runtime/tasks/StreamTask.java | 119 ++++++++++++------- .../tasks/StreamTaskTerminationTest.java | 26 ++-- 2 files changed, 83 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/08d08810/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index dba4c87..7ebbc71 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -814,8 +814,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private final long asyncStartNanos; - private final AtomicReference<CheckpointingOperation.AsynCheckpointState> asyncCheckpointState = new AtomicReference<>( - 
CheckpointingOperation.AsynCheckpointState.RUNNING); + private final AtomicReference<CheckpointingOperation.AsyncCheckpointState> asyncCheckpointState = new AtomicReference<>( + CheckpointingOperation.AsyncCheckpointState.RUNNING); AsyncCheckpointRunnable( StreamTask<?, ?> owner, @@ -865,8 +865,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis); - if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, - CheckpointingOperation.AsynCheckpointState.COMPLETED)) { + if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING, + CheckpointingOperation.AsyncCheckpointState.COMPLETED)) { reportCompletedSnapshotStates( jobManagerTaskOperatorSubtaskStates, @@ -917,63 +917,92 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } private void handleExecutionException(Exception e) { - // the state is completed if an exception occurred in the acknowledgeCheckpoint call - // in order to clean up, we have to set it to RUNNING again. 
- asyncCheckpointState.compareAndSet( - CheckpointingOperation.AsynCheckpointState.COMPLETED, - CheckpointingOperation.AsynCheckpointState.RUNNING); - try { - cleanup(); - } catch (Exception cleanupException) { - e.addSuppressed(cleanupException); - } + boolean didCleanup = false; + CheckpointingOperation.AsyncCheckpointState currentState = asyncCheckpointState.get(); - Exception checkpointException = new Exception( - "Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + - owner.getName() + '.', - e); + while (CheckpointingOperation.AsyncCheckpointState.DISCARDED != currentState) { - owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException( - checkpointMetaData, - checkpointException); + if (asyncCheckpointState.compareAndSet( + currentState, + CheckpointingOperation.AsyncCheckpointState.DISCARDED)) { + + didCleanup = true; + + try { + cleanup(); + } catch (Exception cleanupException) { + e.addSuppressed(cleanupException); + } + + Exception checkpointException = new Exception( + "Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + + owner.getName() + '.', + e); + + // We only report the exception for the original cause of fail and cleanup. + // Otherwise this followup exception could race the original exception in failing the task. + owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException( + checkpointMetaData, + checkpointException); + + currentState = CheckpointingOperation.AsyncCheckpointState.DISCARDED; + } else { + currentState = asyncCheckpointState.get(); + } + } + + if (!didCleanup) { + LOG.trace("Caught followup exception from a failed checkpoint thread. 
This can be ignored.", e); + } } @Override public void close() { - try { - cleanup(); - } catch (Exception cleanupException) { - LOG.warn("Could not properly clean up the async checkpoint runnable.", cleanupException); + if (asyncCheckpointState.compareAndSet( + CheckpointingOperation.AsyncCheckpointState.RUNNING, + CheckpointingOperation.AsyncCheckpointState.DISCARDED)) { + + try { + cleanup(); + } catch (Exception cleanupException) { + LOG.warn("Could not properly clean up the async checkpoint runnable.", cleanupException); + } + } else { + logFailedCleanupAttempt(); } } private void cleanup() throws Exception { - if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.DISCARDED)) { - LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", checkpointMetaData.getCheckpointId(), owner.getName()); - Exception exception = null; + LOG.debug( + "Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", + checkpointMetaData.getCheckpointId(), + owner.getName()); - // clean up ongoing operator snapshot results and non partitioned state handles - for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) { - if (operatorSnapshotResult != null) { - try { - operatorSnapshotResult.cancel(); - } catch (Exception cancelException) { - exception = ExceptionUtils.firstOrSuppressed(cancelException, exception); - } + Exception exception = null; + + // clean up ongoing operator snapshot results and non partitioned state handles + for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) { + if (operatorSnapshotResult != null) { + try { + operatorSnapshotResult.cancel(); + } catch (Exception cancelException) { + exception = ExceptionUtils.firstOrSuppressed(cancelException, exception); } } + } - if (null != exception) { - throw exception; - } - } else { - LOG.debug("{} - asynchronous checkpointing operation for checkpoint {} 
has " + - "already been completed. Thus, the state handles are not cleaned up.", - owner.getName(), - checkpointMetaData.getCheckpointId()); + if (null != exception) { + throw exception; } } + + private void logFailedCleanupAttempt() { + LOG.debug("{} - asynchronous checkpointing operation for checkpoint {} has " + + "already been completed. Thus, the state handles are not cleaned up.", + owner.getName(), + checkpointMetaData.getCheckpointId()); + } } public CloseableRegistry getCancelables() { @@ -1088,7 +1117,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } - private enum AsynCheckpointState { + private enum AsyncCheckpointState { RUNNING, DISCARDED, COMPLETED http://git-wip-us.apache.org/repos/asf/flink/blob/08d08810/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 62a903b..e5558f6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -75,6 +75,7 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; import java.io.IOException; @@ -82,6 +83,7 @@ import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; @@ -97,7 +99,6 @@ public class 
StreamTaskTerminationTest extends TestLogger { public static final OneShotLatch RUN_LATCH = new OneShotLatch(); public static final OneShotLatch CHECKPOINTING_LATCH = new OneShotLatch(); private static final OneShotLatch CLEANUP_LATCH = new OneShotLatch(); - private static final OneShotLatch HANDLE_ASYNC_EXCEPTION_LATCH = new OneShotLatch(); /** * FLINK-6833 @@ -209,8 +210,7 @@ public class StreamTaskTerminationTest extends TestLogger { } @Override - protected void init() throws Exception { - + protected void init() { } @Override @@ -226,24 +226,16 @@ public class StreamTaskTerminationTest extends TestLogger { // has been stopped CLEANUP_LATCH.trigger(); - // wait until handle async exception has been called to proceed with the termination of the - // StreamTask - HANDLE_ASYNC_EXCEPTION_LATCH.await(); + // wait until all async checkpoint threads are terminated, so that no more exceptions can be reported + Assert.assertTrue(getAsyncOperationsThreadPool().awaitTermination(30L, TimeUnit.SECONDS)); } @Override - protected void cancelTask() throws Exception { - } - - @Override - public void handleAsyncException(String message, Throwable exception) { - super.handleAsyncException(message, exception); - - HANDLE_ASYNC_EXCEPTION_LATCH.trigger(); + protected void cancelTask() { } } - static class NoOpStreamOperator<T> extends AbstractStreamOperator<T> { + private static class NoOpStreamOperator<T> extends AbstractStreamOperator<T> { private static final long serialVersionUID = 4517845269225218312L; } @@ -252,7 +244,7 @@ public class StreamTaskTerminationTest extends TestLogger { private static final long serialVersionUID = -5053068148933314100L; @Override - public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException { + public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) { throw new UnsupportedOperationException(); } @@ -269,7 +261,7 @@ public class StreamTaskTerminationTest extends TestLogger { TypeSerializer<K> 
keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws IOException { + TaskKvStateRegistry kvStateRegistry) { return null; }