[hotfix] Suppress emitting non-causal exceptions from closed checkpointing 
thread

This prevents an exception that is merely caused by closing a running snapshot
from being reported.
This way, users are not confused by their logs, and the follow-up exception can
no longer be reported before its actual cause, which would hide the real cause
in the logs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08d08810
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08d08810
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08d08810

Branch: refs/heads/master
Commit: 08d088103b1b4b2c0c772ec36a8b3e52b2588b5f
Parents: 32e25eb
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Wed Jan 17 18:02:25 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Sun Feb 25 15:59:54 2018 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     | 119 ++++++++++++-------
 .../tasks/StreamTaskTerminationTest.java        |  26 ++--
 2 files changed, 83 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/08d08810/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index dba4c87..7ebbc71 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -814,8 +814,8 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                private final long asyncStartNanos;
 
-               private final 
AtomicReference<CheckpointingOperation.AsynCheckpointState> 
asyncCheckpointState = new AtomicReference<>(
-                       CheckpointingOperation.AsynCheckpointState.RUNNING);
+               private final 
AtomicReference<CheckpointingOperation.AsyncCheckpointState> 
asyncCheckpointState = new AtomicReference<>(
+                       CheckpointingOperation.AsyncCheckpointState.RUNNING);
 
                AsyncCheckpointRunnable(
                        StreamTask<?, ?> owner,
@@ -865,8 +865,8 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                                
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
 
-                               if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
-                                       
CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
+                               if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
+                                       
CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
 
                                        reportCompletedSnapshotStates(
                                                
jobManagerTaskOperatorSubtaskStates,
@@ -917,63 +917,92 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                }
 
                private void handleExecutionException(Exception e) {
-                       // the state is completed if an exception occurred in 
the acknowledgeCheckpoint call
-                       // in order to clean up, we have to set it to RUNNING 
again.
-                       asyncCheckpointState.compareAndSet(
-                               
CheckpointingOperation.AsynCheckpointState.COMPLETED,
-                               
CheckpointingOperation.AsynCheckpointState.RUNNING);
 
-                       try {
-                               cleanup();
-                       } catch (Exception cleanupException) {
-                               e.addSuppressed(cleanupException);
-                       }
+                       boolean didCleanup = false;
+                       CheckpointingOperation.AsyncCheckpointState 
currentState = asyncCheckpointState.get();
 
-                       Exception checkpointException = new Exception(
-                               "Could not materialize checkpoint " + 
checkpointMetaData.getCheckpointId() + " for operator " +
-                                       owner.getName() + '.',
-                               e);
+                       while 
(CheckpointingOperation.AsyncCheckpointState.DISCARDED != currentState) {
 
-                       
owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
-                               checkpointMetaData,
-                               checkpointException);
+                               if (asyncCheckpointState.compareAndSet(
+                                       currentState,
+                                       
CheckpointingOperation.AsyncCheckpointState.DISCARDED)) {
+
+                                       didCleanup = true;
+
+                                       try {
+                                               cleanup();
+                                       } catch (Exception cleanupException) {
+                                               
e.addSuppressed(cleanupException);
+                                       }
+
+                                       Exception checkpointException = new 
Exception(
+                                               "Could not materialize 
checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " +
+                                                       owner.getName() + '.',
+                                               e);
+
+                                       // We only report the exception for the 
original cause of fail and cleanup.
+                                       // Otherwise this followup exception 
could race the original exception in failing the task.
+                                       
owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
+                                               checkpointMetaData,
+                                               checkpointException);
+
+                                       currentState = 
CheckpointingOperation.AsyncCheckpointState.DISCARDED;
+                               } else {
+                                       currentState = 
asyncCheckpointState.get();
+                               }
+                       }
+
+                       if (!didCleanup) {
+                               LOG.trace("Caught followup exception from a 
failed checkpoint thread. This can be ignored.", e);
+                       }
                }
 
                @Override
                public void close() {
-                       try {
-                               cleanup();
-                       } catch (Exception cleanupException) {
-                               LOG.warn("Could not properly clean up the async 
checkpoint runnable.", cleanupException);
+                       if (asyncCheckpointState.compareAndSet(
+                               
CheckpointingOperation.AsyncCheckpointState.RUNNING,
+                               
CheckpointingOperation.AsyncCheckpointState.DISCARDED)) {
+
+                               try {
+                                       cleanup();
+                               } catch (Exception cleanupException) {
+                                       LOG.warn("Could not properly clean up 
the async checkpoint runnable.", cleanupException);
+                               }
+                       } else {
+                               logFailedCleanupAttempt();
                        }
                }
 
                private void cleanup() throws Exception {
-                       if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
 CheckpointingOperation.AsynCheckpointState.DISCARDED)) {
-                               LOG.debug("Cleanup AsyncCheckpointRunnable for 
checkpoint {} of {}.", checkpointMetaData.getCheckpointId(), owner.getName());
-                               Exception exception = null;
+                       LOG.debug(
+                               "Cleanup AsyncCheckpointRunnable for checkpoint 
{} of {}.",
+                               checkpointMetaData.getCheckpointId(),
+                               owner.getName());
 
-                               // clean up ongoing operator snapshot results 
and non partitioned state handles
-                               for (OperatorSnapshotFutures 
operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
-                                       if (operatorSnapshotResult != null) {
-                                               try {
-                                                       
operatorSnapshotResult.cancel();
-                                               } catch (Exception 
cancelException) {
-                                                       exception = 
ExceptionUtils.firstOrSuppressed(cancelException, exception);
-                                               }
+                       Exception exception = null;
+
+                       // clean up ongoing operator snapshot results and non 
partitioned state handles
+                       for (OperatorSnapshotFutures operatorSnapshotResult : 
operatorSnapshotsInProgress.values()) {
+                               if (operatorSnapshotResult != null) {
+                                       try {
+                                               operatorSnapshotResult.cancel();
+                                       } catch (Exception cancelException) {
+                                               exception = 
ExceptionUtils.firstOrSuppressed(cancelException, exception);
                                        }
                                }
+                       }
 
-                               if (null != exception) {
-                                       throw exception;
-                               }
-                       } else {
-                               LOG.debug("{} - asynchronous checkpointing 
operation for checkpoint {} has " +
-                                               "already been completed. Thus, 
the state handles are not cleaned up.",
-                                       owner.getName(),
-                                       checkpointMetaData.getCheckpointId());
+                       if (null != exception) {
+                               throw exception;
                        }
                }
+
+               private void logFailedCleanupAttempt() {
+                       LOG.debug("{} - asynchronous checkpointing operation 
for checkpoint {} has " +
+                                       "already been completed. Thus, the 
state handles are not cleaned up.",
+                               owner.getName(),
+                               checkpointMetaData.getCheckpointId());
+               }
        }
 
        public CloseableRegistry getCancelables() {
@@ -1088,7 +1117,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        }
                }
 
-               private enum AsynCheckpointState {
+               private enum AsyncCheckpointState {
                        RUNNING,
                        DISCARDED,
                        COMPLETED

http://git-wip-us.apache.org/repos/asf/flink/blob/08d08810/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 62a903b..e5558f6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -75,6 +75,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -82,6 +83,7 @@ import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
@@ -97,7 +99,6 @@ public class StreamTaskTerminationTest extends TestLogger {
        public static final OneShotLatch RUN_LATCH = new OneShotLatch();
        public static final OneShotLatch CHECKPOINTING_LATCH = new 
OneShotLatch();
        private static final OneShotLatch CLEANUP_LATCH = new OneShotLatch();
-       private static final OneShotLatch HANDLE_ASYNC_EXCEPTION_LATCH = new 
OneShotLatch();
 
        /**
         * FLINK-6833
@@ -209,8 +210,7 @@ public class StreamTaskTerminationTest extends TestLogger {
                }
 
                @Override
-               protected void init() throws Exception {
-
+               protected void init() {
                }
 
                @Override
@@ -226,24 +226,16 @@ public class StreamTaskTerminationTest extends TestLogger 
{
                        // has been stopped
                        CLEANUP_LATCH.trigger();
 
-                       // wait until handle async exception has been called to 
proceed with the termination of the
-                       // StreamTask
-                       HANDLE_ASYNC_EXCEPTION_LATCH.await();
+                       // wait until all async checkpoint threads are 
terminated, so that no more exceptions can be reported
+                       
Assert.assertTrue(getAsyncOperationsThreadPool().awaitTermination(30L, 
TimeUnit.SECONDS));
                }
 
                @Override
-               protected void cancelTask() throws Exception {
-               }
-
-               @Override
-               public void handleAsyncException(String message, Throwable 
exception) {
-                       super.handleAsyncException(message, exception);
-
-                       HANDLE_ASYNC_EXCEPTION_LATCH.trigger();
+               protected void cancelTask() {
                }
        }
 
-       static class NoOpStreamOperator<T> extends AbstractStreamOperator<T> {
+       private static class NoOpStreamOperator<T> extends 
AbstractStreamOperator<T> {
                private static final long serialVersionUID = 
4517845269225218312L;
        }
 
@@ -252,7 +244,7 @@ public class StreamTaskTerminationTest extends TestLogger {
                private static final long serialVersionUID = 
-5053068148933314100L;
 
                @Override
-               public CompletedCheckpointStorageLocation 
resolveCheckpoint(String pointer) throws IOException {
+               public CompletedCheckpointStorageLocation 
resolveCheckpoint(String pointer) {
                        throw new UnsupportedOperationException();
                }
 
@@ -269,7 +261,7 @@ public class StreamTaskTerminationTest extends TestLogger {
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) throws IOException 
{
+                       TaskKvStateRegistry kvStateRegistry) {
                        return null;
                }
 

Reply via email to