Repository: flink
Updated Branches:
  refs/heads/master 0a22acef4 -> 7c63526ad


[FLINK-4809] [checkpoints] Operators should tolerate checkpoint failures.

This closes #4883.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c63526a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c63526a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c63526a

Branch: refs/heads/master
Commit: 7c63526ad6f27c6f15625b8b6c48359d9532890b
Parents: 0a22ace
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Fri Oct 20 16:59:45 2017 +0800
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Mon Nov 20 13:18:55 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/state/checkpointing.md          |   5 +
 .../flink/api/common/ExecutionConfig.java       |  23 +
 .../operators/testutils/DummyEnvironment.java   |   3 +-
 .../api/environment/CheckpointConfig.java       |  20 +
 .../api/graph/StreamingJobGraphGenerator.java   |  10 +-
 .../tasks/CheckpointExceptionHandler.java       |  36 ++
 .../CheckpointExceptionHandlerFactory.java      |  79 +++
 .../streaming/runtime/tasks/StreamTask.java     | 135 ++++--
 .../runtime/tasks/BlockingCheckpointsTest.java  | 314 ------------
 ...kpointExceptionHandlerConfigurationTest.java | 146 ++++++
 .../tasks/CheckpointExceptionHandlerTest.java   |  96 ++++
 .../streaming/runtime/tasks/StreamTaskTest.java |  19 +
 .../tasks/TaskCheckpointingBehaviourTest.java   | 481 +++++++++++++++++++
 13 files changed, 1003 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/docs/dev/stream/state/checkpointing.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/checkpointing.md 
b/docs/dev/stream/state/checkpointing.md
index 2a2edc9..4994ac7 100644
--- a/docs/dev/stream/state/checkpointing.md
+++ b/docs/dev/stream/state/checkpointing.md
@@ -74,6 +74,8 @@ Other parameters for checkpointing include:
 
   - *externalized checkpoints*: You can configure periodic checkpoints to be 
persisted externally. Externalized checkpoints write their meta data out to 
persistent storage and are *not* automatically cleaned up when the job fails. 
This way, you will have a checkpoint around to resume from if your job fails. 
There are more details in the [deployment notes on externalized checkpoints]({{ 
site.baseurl }}/ops/state/checkpoints.html#externalized-checkpoints).
 
+  - *fail/continue task on checkpoint errors*: This determines whether a task 
will be failed if an error occurs in the execution of the task's checkpoint 
procedure. Failing the task is the default behaviour. Alternatively, when this 
is disabled, the task will simply decline the checkpoint to the checkpoint 
coordinator and continue running.
+
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -118,6 +120,9 @@ env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
 // checkpoints have to complete within one minute, or are discarded
 env.getCheckpointConfig.setCheckpointTimeout(60000)
 
+// prevent the tasks from failing if an error happens in their checkpointing, 
the checkpoint will just be declined.
+env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
+
 // allow only one checkpoint to be in progress at the same time
 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 {% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 88d524e..9f39c46 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -150,6 +150,9 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
        /** This flag defines if we use compression for the state snapshot data 
or not. Default: false */
        private boolean useSnapshotCompression = false;
 
+       /** Determines if a task fails or not if there is an error in writing 
its checkpoint data. Default: true */
+       private boolean failTaskOnCheckpointError = true;
+
        // ------------------------------- User code values 
--------------------------------------------
 
        private GlobalJobParameters globalJobParameters;
@@ -860,6 +863,26 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
                this.useSnapshotCompression = useSnapshotCompression;
        }
 
+       /**
+        * This method is visible because of the way the configuration is 
currently forwarded from the checkpoint config to
+        * the task. This should not be called by the user, please use 
CheckpointConfig.isFailOnCheckpointingErrors()
+        * instead.
+        */
+       @Internal
+       public boolean isFailTaskOnCheckpointError() {
+               return failTaskOnCheckpointError;
+       }
+
+       /**
+        * This method is visible because of the way the configuration is 
currently forwarded from the checkpoint config to
+        * the task. This should not be called by the user, please use 
CheckpointConfig.setFailOnCheckpointingErrors(...)
+        * instead.
+        */
+       @Internal
+       public void setFailTaskOnCheckpointError(boolean 
failTaskOnCheckpointError) {
+               this.failTaskOnCheckpointError = failTaskOnCheckpointError;
+       }
+
        @Override
        public boolean equals(Object obj) {
                if (obj instanceof ExecutionConfig) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 8ed06b2..0125a5e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -53,6 +53,7 @@ public class DummyEnvironment implements Environment {
        private final ExecutionConfig executionConfig = new ExecutionConfig();
        private final TaskInfo taskInfo;
        private KvStateRegistry kvStateRegistry = new KvStateRegistry();
+       private final AccumulatorRegistry accumulatorRegistry = new 
AccumulatorRegistry(jobId, executionId);
 
        public DummyEnvironment(String taskName, int numSubTasks, int 
subTaskIndex) {
                this.taskInfo = new TaskInfo(taskName, numSubTasks, 
subTaskIndex, numSubTasks, 0);
@@ -143,7 +144,7 @@ public class DummyEnvironment implements Environment {
 
        @Override
        public AccumulatorRegistry getAccumulatorRegistry() {
-               return null;
+               return accumulatorRegistry;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index e1b566e..342d4a7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -70,6 +70,9 @@ public class CheckpointConfig implements java.io.Serializable 
{
        /** Cleanup behaviour for persistent checkpoints. */
        private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;
 
+       /** Determines if tasks are failed or not if there is an error in 
their checkpointing. Default: true */
+       private boolean failOnCheckpointingErrors = true;
+
        // 
------------------------------------------------------------------------
 
        /**
@@ -231,6 +234,23 @@ public class CheckpointConfig implements 
java.io.Serializable {
        }
 
        /**
+        * This determines the behaviour of tasks if there is an error in their 
local checkpointing. If this returns true,
+        * tasks will fail as a reaction. If this returns false, tasks will only 
decline the failed checkpoint.
+        */
+       public boolean isFailOnCheckpointingErrors() {
+               return failOnCheckpointingErrors;
+       }
+
+       /**
+        * Sets the expected behaviour for tasks in case that they encounter an 
error in their checkpointing procedure.
+        * If this is set to true, the task will fail on checkpointing error. 
If this is set to false, the task will only
+        * decline the checkpoint and continue running. The default is true.
+        */
+       public void setFailOnCheckpointingErrors(boolean 
failOnCheckpointingErrors) {
+               this.failOnCheckpointingErrors = failOnCheckpointingErrors;
+       }
+
+       /**
         * Enables checkpoints to be persisted externally.
         *
         * <p>Externalized checkpoints write their meta data out to persistent

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 0364223..fce9dc9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -574,10 +575,15 @@ public class StreamingJobGraphGenerator {
 
                long interval = cfg.getCheckpointInterval();
                if (interval > 0) {
+
+                       ExecutionConfig executionConfig = 
streamGraph.getExecutionConfig();
+                       // propagate the expected behaviour for checkpoint 
errors to task.
+                       
executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
+
                        // check if a restart strategy has been set, if not 
then set the FixedDelayRestartStrategy
-                       if 
(streamGraph.getExecutionConfig().getRestartStrategy() == null) {
+                       if (executionConfig.getRestartStrategy() == null) {
                                // if the user enabled checkpointing, the 
default number of exec retries is infinite.
-                               
streamGraph.getExecutionConfig().setRestartStrategy(
+                               executionConfig.setRestartStrategy(
                                        
RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
                        }
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java
new file mode 100644
index 0000000..0140795
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+
+/**
+ * Handler for exceptions that happen on checkpointing. The handler can reject 
and rethrow the offered exceptions.
+ */
+public interface CheckpointExceptionHandler {
+
+       /**
+        * Offers the exception for handling. If the exception cannot be 
handled by this instance, it is rethrown.
+        *
+        * @param checkpointMetaData metadata for the checkpoint for which the 
exception occurred.
+        * @param exception  the exception to handle.
+        * @throws Exception rethrows the exception if it cannot be handled.
+        */
+       void tryHandleCheckpointException(CheckpointMetaData 
checkpointMetaData, Exception exception) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
new file mode 100644
index 0000000..430f43e
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This factory produces {@link CheckpointExceptionHandler} instances that 
handle exceptions during checkpointing in a
+ * {@link StreamTask}.
+ */
+public class CheckpointExceptionHandlerFactory {
+
+       /**
+        * Returns a {@link CheckpointExceptionHandler} that either causes a 
task to fail completely or just declines
+        * the checkpoint on exception, depending on the parameter flag.
+        */
+       public CheckpointExceptionHandler createCheckpointExceptionHandler(
+               boolean failTaskOnCheckpointException,
+               Environment environment) {
+
+               if (failTaskOnCheckpointException) {
+                       return new FailingCheckpointExceptionHandler();
+               } else {
+                       return new 
DecliningCheckpointExceptionHandler(environment);
+               }
+       }
+
+       /**
+        * This handler makes the task fail by rethrowing a reported exception.
+        */
+       static final class FailingCheckpointExceptionHandler implements 
CheckpointExceptionHandler {
+
+               @Override
+               public void tryHandleCheckpointException(
+                       CheckpointMetaData checkpointMetaData,
+                       Exception exception) throws Exception {
+
+                       throw exception;
+               }
+       }
+
+       /**
+        * This handler makes the task decline the checkpoint as reaction to 
the reported exception. The task is not failed.
+        */
+       static final class DecliningCheckpointExceptionHandler implements 
CheckpointExceptionHandler {
+
+               final Environment environment;
+
+               DecliningCheckpointExceptionHandler(Environment environment) {
+                       this.environment = 
Preconditions.checkNotNull(environment);
+               }
+
+               @Override
+               public void tryHandleCheckpointException(
+                       CheckpointMetaData checkpointMetaData,
+                       Exception exception) throws Exception {
+
+                       
environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), exception);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 68f590e..36e6748 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -172,6 +172,12 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        /** Thread pool for async snapshot workers. */
        private ExecutorService asyncOperationsThreadPool;
 
+       /** Handler for exceptions during checkpointing in the stream task. 
Used in synchronous part of the checkpoint. */
+       private CheckpointExceptionHandler 
synchronousCheckpointExceptionHandler;
+
+       /** Wrapper for synchronousCheckpointExceptionHandler to deal with 
rethrown exceptions. Used in the async part. */
+       private AsyncCheckpointExceptionHandler 
asynchronousCheckpointExceptionHandler;
+
        // 
------------------------------------------------------------------------
        //  Life cycle methods for specific implementations
        // 
------------------------------------------------------------------------
@@ -215,6 +221,14 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                        configuration = new 
StreamConfig(getTaskConfiguration());
 
+                       CheckpointExceptionHandlerFactory 
cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
+
+                       synchronousCheckpointExceptionHandler = 
cpExceptionHandlerFactory.createCheckpointExceptionHandler(
+                               
getExecutionConfig().isFailTaskOnCheckpointError(),
+                               getEnvironment());
+
+                       asynchronousCheckpointExceptionHandler = new 
AsyncCheckpointExceptionHandler(this);
+
                        stateBackend = createStateBackend();
 
                        accumulatorMap = 
getEnvironment().getAccumulatorRegistry().getUserMap();
@@ -784,6 +798,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        targetLocation);
        }
 
+       protected CheckpointExceptionHandlerFactory 
createCheckpointExceptionHandlerFactory() {
+               return new CheckpointExceptionHandlerFactory();
+       }
+
        private String createOperatorIdentifier(StreamOperator<?> operator, int 
vertexId) {
 
                TaskInfo taskInfo = getEnvironment().getTaskInfo();
@@ -850,11 +868,11 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        CheckpointingOperation.AsynCheckpointState.RUNNING);
 
                AsyncCheckpointRunnable(
-                               StreamTask<?, ?> owner,
-                               Map<OperatorID, OperatorSnapshotResult> 
operatorSnapshotsInProgress,
-                               CheckpointMetaData checkpointMetaData,
-                               CheckpointMetrics checkpointMetrics,
-                               long asyncStartNanos) {
+                       StreamTask<?, ?> owner,
+                       Map<OperatorID, OperatorSnapshotResult> 
operatorSnapshotsInProgress,
+                       CheckpointMetaData checkpointMetaData,
+                       CheckpointMetrics checkpointMetrics,
+                       long asyncStartNanos) {
 
                        this.owner = Preconditions.checkNotNull(owner);
                        this.operatorSnapshotsInProgress = 
Preconditions.checkNotNull(operatorSnapshotsInProgress);
@@ -929,14 +947,14 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                        e.addSuppressed(cleanupException);
                                }
 
-                               // registers the exception and tries to fail 
the whole task
-                               AsynchronousException asyncException = new 
AsynchronousException(
-                                       new Exception(
-                                               "Could not materialize 
checkpoint " + checkpointMetaData.getCheckpointId() +
-                                                       " for operator " + 
owner.getName() + '.',
-                                               e));
+                               Exception checkpointException = new Exception(
+                                       "Could not materialize checkpoint " + 
checkpointMetaData.getCheckpointId() + " for operator " +
+                                               owner.getName() + '.',
+                                       e);
 
-                               owner.handleAsyncException("Failure in 
asynchronous checkpoint materialization", asyncException);
+                               
owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
+                                       checkpointMetaData,
+                                       checkpointException);
                        } finally {
                                owner.cancelables.unregisterCloseable(this);
                                
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
@@ -1020,7 +1038,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                public void executeCheckpointing() throws Exception {
                        startSyncPartNano = System.nanoTime();
 
-                       boolean failed = true;
                        try {
                                for (StreamOperator<?> op : allOperators) {
                                        checkpointStreamOperator(op);
@@ -1028,16 +1045,23 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                                if (LOG.isDebugEnabled()) {
                                        LOG.debug("Finished synchronous 
checkpoints for checkpoint {} on task {}",
-                                                       
checkpointMetaData.getCheckpointId(), owner.getName());
+                                               
checkpointMetaData.getCheckpointId(), owner.getName());
                                }
 
                                startAsyncPartNano = System.nanoTime();
 
                                
checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - 
startSyncPartNano) / 1_000_000);
 
-                               // at this point we are transferring ownership 
over snapshotInProgressList for cleanup to the thread
-                               runAsyncCheckpointingAndAcknowledge();
-                               failed = false;
+                               // we are transferring ownership over 
snapshotInProgressList for cleanup to the thread, active on submit
+                               AsyncCheckpointRunnable asyncCheckpointRunnable 
= new AsyncCheckpointRunnable(
+                                       owner,
+                                       operatorSnapshotsInProgress,
+                                       checkpointMetaData,
+                                       checkpointMetrics,
+                                       startAsyncPartNano);
+
+                               
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
+                               
owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
 
                                if (LOG.isDebugEnabled()) {
                                        LOG.debug("{} - finished synchronous 
part of checkpoint {}." +
@@ -1046,27 +1070,27 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                                
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                                                
checkpointMetrics.getSyncDurationMillis());
                                }
-                       } finally {
-                               if (failed) {
-                                       // Cleanup to release resources
-                                       for (OperatorSnapshotResult 
operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
-                                               if (null != 
operatorSnapshotResult) {
-                                                       try {
-                                                               
operatorSnapshotResult.cancel();
-                                                       } catch (Exception e) {
-                                                               LOG.warn("Could 
not properly cancel an operator snapshot result.", e);
-                                                       }
+                       } catch (Exception ex) {
+                               // Cleanup to release resources
+                               for (OperatorSnapshotResult 
operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
+                                       if (null != operatorSnapshotResult) {
+                                               try {
+                                                       
operatorSnapshotResult.cancel();
+                                               } catch (Exception e) {
+                                                       LOG.warn("Could not 
properly cancel an operator snapshot result.", e);
                                                }
                                        }
+                               }
 
-                                       if (LOG.isDebugEnabled()) {
-                                               LOG.debug("{} - did NOT finish 
synchronous part of checkpoint {}." +
-                                                               "Alignment 
duration: {} ms, snapshot duration {} ms",
-                                                       owner.getName(), 
checkpointMetaData.getCheckpointId(),
-                                                       
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
-                                                       
checkpointMetrics.getSyncDurationMillis());
-                                       }
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("{} - did NOT finish 
synchronous part of checkpoint {}." +
+                                                       "Alignment duration: {} 
ms, snapshot duration {} ms",
+                                               owner.getName(), 
checkpointMetaData.getCheckpointId(),
+                                               
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
+                                               
checkpointMetrics.getSyncDurationMillis());
                                }
+
+                               
owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData,
 ex);
                        }
                }
 
@@ -1082,23 +1106,40 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        }
                }
 
-               public void runAsyncCheckpointingAndAcknowledge() throws 
IOException {
-
-                       AsyncCheckpointRunnable asyncCheckpointRunnable = new 
AsyncCheckpointRunnable(
-                                       owner,
-                                       operatorSnapshotsInProgress,
-                                       checkpointMetaData,
-                                       checkpointMetrics,
-                                       startAsyncPartNano);
-
-                       
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
-                       
owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
-               }
-
                private enum AsynCheckpointState {
                        RUNNING,
                        DISCARDED,
                        COMPLETED
                }
        }
+
+       /**
+        * Wrapper for synchronous {@link CheckpointExceptionHandler}. This 
implementation catches unhandled, rethrown
+        * exceptions and reports them through {@link 
#handleAsyncException(String, Throwable)}. As this implementation
+        * always handles the exception in some way, it never rethrows.
+        */
+       static final class AsyncCheckpointExceptionHandler implements 
CheckpointExceptionHandler {
+
+               /** Owning stream task to which we report async exceptions. */
+               final StreamTask<?, ?> owner;
+
+               /** Synchronous exception handler to which we delegate. */
+               final CheckpointExceptionHandler 
synchronousCheckpointExceptionHandler;
+
+               AsyncCheckpointExceptionHandler(StreamTask<?, ?> owner) {
+                       this.owner = Preconditions.checkNotNull(owner);
+                       this.synchronousCheckpointExceptionHandler =
+                               
Preconditions.checkNotNull(owner.synchronousCheckpointExceptionHandler);
+               }
+
+               @Override
+               public void tryHandleCheckpointException(CheckpointMetaData 
checkpointMetaData, Exception exception) {
+                       try {
+                               
synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData,
 exception);
+                       } catch (Exception unhandled) {
+                               AsynchronousException asyncException = new 
AsynchronousException(unhandled);
+                               owner.handleAsyncException("Failure in 
asynchronous checkpoint materialization", asyncException);
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
deleted file mode 100644
index 81b3130..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobCacheService;
-import org.apache.flink.runtime.blob.PermanentBlobCache;
-import org.apache.flink.runtime.blob.TransientBlobCache;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
-import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.memory.MemoryManager;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.taskmanager.CheckpointResponder;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskManagerActions;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamFilter;
-import org.apache.flink.util.SerializedValue;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * This test checks that task checkpoints that block and do not react to 
thread interrupts.
- */
-public class BlockingCheckpointsTest {
-
-       private static final OneShotLatch IN_CHECKPOINT_LATCH = new 
OneShotLatch();
-
-       @Test
-       public void testBlockingNonInterruptibleCheckpoint() throws Exception {
-
-               Configuration taskConfig = new Configuration();
-               StreamConfig cfg = new StreamConfig(taskConfig);
-               cfg.setStreamOperator(new TestOperator());
-               cfg.setOperatorID(new OperatorID());
-               cfg.setStateBackend(new LockingStreamStateBackend());
-
-               Task task = createTask(taskConfig);
-
-               // start the task and wait until it is in "restore"
-               task.startTaskThread();
-               IN_CHECKPOINT_LATCH.await();
-
-               // cancel the task and wait. unless cancellation properly closes
-               // the streams, this will never terminate
-               task.cancelExecution();
-               task.getExecutingThread().join();
-
-               assertEquals(ExecutionState.CANCELED, task.getExecutionState());
-               assertNull(task.getFailureCause());
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       private static Task createTask(Configuration taskConfig) throws 
IOException {
-
-               JobInformation jobInformation = new JobInformation(
-                               new JobID(),
-                               "test job name",
-                               new SerializedValue<>(new ExecutionConfig()),
-                               new Configuration(),
-                               Collections.emptyList(),
-                               Collections.emptyList());
-
-               TaskInformation taskInformation = new TaskInformation(
-                               new JobVertexID(),
-                               "test task name",
-                               1,
-                               11,
-                               TestStreamTask.class.getName(),
-                               taskConfig);
-
-               TaskKvStateRegistry mockKvRegistry = 
mock(TaskKvStateRegistry.class);
-               NetworkEnvironment network = mock(NetworkEnvironment.class);
-               when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class))).thenReturn(mockKvRegistry);
-
-               BlobCacheService blobService =
-                       new BlobCacheService(mock(PermanentBlobCache.class), 
mock(TransientBlobCache.class));
-
-               return new Task(
-                               jobInformation,
-                               taskInformation,
-                               new ExecutionAttemptID(),
-                               new AllocationID(),
-                               0,
-                               0,
-                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                               0,
-                               null,
-                               mock(MemoryManager.class),
-                               mock(IOManager.class),
-                               network,
-                               mock(BroadcastVariableManager.class),
-                               mock(TaskManagerActions.class),
-                               mock(InputSplitProvider.class),
-                               mock(CheckpointResponder.class),
-                               blobService,
-                               new BlobLibraryCacheManager(
-                                       blobService.getPermanentBlobService(),
-                                       
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
-                                       new String[0]),
-                               new FileCache(new String[] { 
EnvironmentInformation.getTemporaryFileDirectory() }),
-                               new TestingTaskManagerRuntimeInfo(),
-                               new UnregisteredTaskMetricsGroup(),
-                               mock(ResultPartitionConsumableNotifier.class),
-                               mock(PartitionProducerStateChecker.class),
-                               Executors.directExecutor());
-       }
-
-       // 
------------------------------------------------------------------------
-       //  state backend with locking output stream
-       // 
------------------------------------------------------------------------
-
-       private static class LockingStreamStateBackend extends 
AbstractStateBackend {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public CheckpointStreamFactory createStreamFactory(JobID jobId, 
String operatorIdentifier) throws IOException {
-                       return new LockingOutputStreamFactory();
-               }
-
-               @Override
-               public CheckpointStreamFactory 
createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String 
targetLocation) throws IOException {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
-                               Environment env, JobID jobID, String 
operatorIdentifier,
-                               TypeSerializer<K> keySerializer, int 
numberOfKeyGroups,
-                               KeyGroupRange keyGroupRange, 
TaskKvStateRegistry kvStateRegistry) {
-
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public OperatorStateBackend 
createOperatorStateBackend(Environment env, String operatorIdentifier) throws 
Exception {
-                       return new DefaultOperatorStateBackend(
-                               getClass().getClassLoader(),
-                               new ExecutionConfig(),
-                               true);
-               }
-       }
-
-       private static final class LockingOutputStreamFactory implements 
CheckpointStreamFactory {
-
-               @Override
-               public CheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp) {
-                       return new LockingOutputStream();
-               }
-
-               @Override
-               public void close() {}
-       }
-
-       private static final class LockingOutputStream extends 
CheckpointStateOutputStream {
-
-               private final Object lock = new Object();
-               private volatile boolean closed;
-
-               @Override
-               public StreamStateHandle closeAndGetHandle() throws IOException 
{
-                       return null;
-               }
-
-               @Override
-               public void write(int b) throws IOException {
-                       // this needs to not react to interrupts until the 
handle is closed
-                       synchronized (lock) {
-                               while (!closed) {
-                                       try {
-                                               lock.wait();
-                                       }
-                                       catch (InterruptedException ignored) {}
-                               }
-                       }
-               }
-
-               @Override
-               public void close() throws IOException {
-                       synchronized (lock) {
-                               closed = true;
-                               lock.notifyAll();
-                       }
-               }
-
-               @Override
-               public long getPos() {
-                       return 0;
-               }
-
-               @Override
-               public void flush() {}
-
-               @Override
-               public void sync() {}
-       }
-
-       // 
------------------------------------------------------------------------
-       //  test source operator that calls into the locking checkpoint output 
stream
-       // 
------------------------------------------------------------------------
-
-       @SuppressWarnings("serial")
-       private static final class TestOperator extends StreamFilter<Object> {
-               private static final long serialVersionUID = 1L;
-
-               public TestOperator() {
-                       super(new FilterFunction<Object>() {
-                               @Override
-                               public boolean filter(Object value) {
-                                       return false;
-                               }
-                       });
-               }
-
-               @Override
-               public void snapshotState(StateSnapshotContext context) throws 
Exception {
-                       OperatorStateCheckpointOutputStream outStream = 
context.getRawOperatorStateOutput();
-
-                       IN_CHECKPOINT_LATCH.trigger();
-
-                       // this should lock
-                       outStream.write(1);
-               }
-       }
-
-       /**
-        * Stream task that simply triggers a checkpoint.
-        */
-       public static final class TestStreamTask extends 
OneInputStreamTask<Object, Object> {
-
-               @Override
-               public void init() {}
-
-               @Override
-               protected void run() throws Exception {
-                       triggerCheckpointOnBarrier(new CheckpointMetaData(11L, 
System.currentTimeMillis()), CheckpointOptions.forCheckpoint(), new 
CheckpointMetrics());
-               }
-
-               @Override
-               protected void cleanup() {}
-
-               @Override
-               protected void cancelTask() {}
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
new file mode 100644
index 0000000..69f6935
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that the "fail task on checkpoint error" setting is forwarded correctly: from the
+ * {@link ExecutionConfig} to the exception handler factory used by a {@link StreamTask}, and from
+ * the checkpoint config of a {@link StreamExecutionEnvironment} to the {@link ExecutionConfig}
+ * that is shipped with the generated {@link JobGraph}.
+ */
+public class CheckpointExceptionHandlerConfigurationTest extends TestLogger {
+
+       // ------------------------------------------------------------------------
+       //  ExecutionConfig -> CheckpointExceptionHandlerFactory
+       // ------------------------------------------------------------------------
+
+       @Test
+       public void testFailIsDefaultConfig() {
+               ExecutionConfig freshConfig = new ExecutionConfig();
+               Assert.assertTrue(freshConfig.isFailTaskOnCheckpointError());
+       }
+
+       @Test
+       public void testConfigurationFailOnException() throws Exception {
+               testConfigForwarding(true);
+       }
+
+       @Test
+       public void testConfigurationDeclineOnException() throws Exception {
+               testConfigForwarding(false);
+       }
+
+       /**
+        * Invokes a no-op stream task whose handler factory asserts that it is asked for the
+        * configured behaviour.
+        *
+        * @param failOnException value written to the execution config and expected by the factory
+        */
+       private void testConfigForwarding(boolean failOnException) throws Exception {
+
+               DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
+               environment.getExecutionConfig().setFailTaskOnCheckpointError(failOnException);
+
+               final CheckpointExceptionHandlerFactory inspectingFactory = flagAssertingFactory(failOnException);
+
+               StreamTask streamTask = new StreamTask() {
+                       @Override
+                       protected void init() throws Exception {
+                               // nothing to initialize
+                       }
+
+                       @Override
+                       protected void run() throws Exception {
+                               // nothing to run
+                       }
+
+                       @Override
+                       protected void cleanup() throws Exception {
+                               // nothing to clean up
+                       }
+
+                       @Override
+                       protected void cancelTask() throws Exception {
+                               // nothing to cancel
+                       }
+
+                       @Override
+                       protected CheckpointExceptionHandlerFactory createCheckpointExceptionHandlerFactory() {
+                               return inspectingFactory;
+                       }
+               };
+
+               streamTask.setEnvironment(environment);
+               streamTask.invoke();
+       }
+
+       /**
+        * Builds a factory that checks the flag it receives before delegating to the default
+        * implementation. The check is only evaluated when the factory is actually asked to
+        * create a handler.
+        *
+        * @param expectedFlag the flag value the factory must be called with
+        */
+       private static CheckpointExceptionHandlerFactory flagAssertingFactory(final boolean expectedFlag) {
+               return new CheckpointExceptionHandlerFactory() {
+
+                       @Override
+                       public CheckpointExceptionHandler createCheckpointExceptionHandler(
+                               boolean failTaskOnCheckpointException,
+                               Environment environment) {
+
+                               Assert.assertEquals(expectedFlag, failTaskOnCheckpointException);
+                               return super.createCheckpointExceptionHandler(failTaskOnCheckpointException, environment);
+                       }
+               };
+       }
+
+       // ------------------------------------------------------------------------
+       //  CheckpointConfig -> ExecutionConfig of the job graph
+       // ------------------------------------------------------------------------
+
+       @Test
+       public void testCheckpointConfigDefault() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               Assert.assertTrue(env.getCheckpointConfig().isFailOnCheckpointingErrors());
+       }
+
+       @Test
+       public void testPropagationFailFromCheckpointConfig() throws Exception {
+               doTestPropagationFromCheckpointConfig(true);
+       }
+
+       @Test
+       public void testPropagationDeclineFromCheckpointConfig() throws Exception {
+               doTestPropagationFromCheckpointConfig(false);
+       }
+
+       /**
+        * Builds a minimal source-to-sink job with the given checkpoint config setting, generates
+        * its job graph, and checks that the deserialized execution config carries the same value.
+        *
+        * @param failTaskOnCheckpointErrors value set on the checkpoint config and expected in the
+        *                                   execution config of the job graph
+        */
+       public void doTestPropagationFromCheckpointConfig(boolean failTaskOnCheckpointErrors) throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.getCheckpointConfig().setCheckpointInterval(1000);
+               env.getCheckpointConfig().setFailOnCheckpointingErrors(failTaskOnCheckpointErrors);
+               env.addSource(idleSource()).addSink(new DiscardingSink<>());
+
+               StreamGraph graph = env.getStreamGraph();
+               JobGraph job = StreamingJobGraphGenerator.createJobGraph(graph);
+
+               SerializedValue<ExecutionConfig> shippedConfig = job.getSerializedExecutionConfig();
+               ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+               ExecutionConfig restoredConfig = shippedConfig.deserializeValue(classLoader);
+
+               Assert.assertEquals(failTaskOnCheckpointErrors, restoredConfig.isFailTaskOnCheckpointError());
+       }
+
+       /**
+        * Creates a source that emits nothing and returns immediately from {@code run}.
+        */
+       private static SourceFunction<Integer> idleSource() {
+               return new SourceFunction<Integer>() {
+
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws Exception {
+                               // emits no elements
+                       }
+
+                       @Override
+                       public void cancel() {
+                               // nothing to cancel
+                       }
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
new file mode 100644
index 0000000..2f58162
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the two handler flavours produced by {@link CheckpointExceptionHandlerFactory}: the one
+ * that rethrows a checkpoint exception and the one that declines the checkpoint on the environment.
+ */
+public class CheckpointExceptionHandlerTest extends TestLogger {
+
+       /**
+        * With the "fail task" flag set, the handler must throw the very exception it was given
+        * and must not decline the checkpoint on the environment.
+        */
+       @Test
+       public void testRethrowingHandler() {
+               final DeclineDummyEnvironment env = new DeclineDummyEnvironment();
+               final CheckpointExceptionHandler handler = createHandler(true, env);
+
+               final CheckpointMetaData failedCheckpoint = new CheckpointMetaData(42L, 4711L);
+               final Exception expected = new Exception("test");
+
+               Exception rethrown = null;
+               try {
+                       handler.tryHandleCheckpointException(failedCheckpoint, expected);
+               } catch (Exception e) {
+                       rethrown = e;
+               }
+
+               Assert.assertNotNull("Exception not rethrown.", rethrown);
+               Assert.assertEquals(expected, rethrown);
+               Assert.assertNull(env.getLastDeclinedCheckpointCause());
+       }
+
+       /**
+        * With the "fail task" flag cleared, the handler must not throw and must decline the
+        * checkpoint on the environment with the original id and cause.
+        */
+       @Test
+       public void testDecliningHandler() {
+               final DeclineDummyEnvironment env = new DeclineDummyEnvironment();
+               final CheckpointExceptionHandler handler = createHandler(false, env);
+
+               final CheckpointMetaData failedCheckpoint = new CheckpointMetaData(42L, 4711L);
+               final Exception cause = new Exception("test");
+
+               try {
+                       handler.tryHandleCheckpointException(failedCheckpoint, cause);
+               } catch (Exception e) {
+                       Assert.fail("Exception not handled, but rethrown.");
+               }
+
+               Assert.assertEquals(failedCheckpoint.getCheckpointId(), env.getLastDeclinedCheckpointId());
+               Assert.assertEquals(cause, env.getLastDeclinedCheckpointCause());
+       }
+
+       /**
+        * Creates a handler through a fresh {@link CheckpointExceptionHandlerFactory}.
+        *
+        * @param failTaskOnCheckpointException flag passed on to the factory
+        * @param environment environment passed on to the factory
+        */
+       private static CheckpointExceptionHandler createHandler(
+               boolean failTaskOnCheckpointException,
+               DeclineDummyEnvironment environment) {
+
+               CheckpointExceptionHandlerFactory factory = new CheckpointExceptionHandlerFactory();
+               return factory.createCheckpointExceptionHandler(failTaskOnCheckpointException, environment);
+       }
+
+       /**
+        * A {@link DummyEnvironment} that records the arguments of the most recent
+        * {@code declineCheckpoint} call so that tests can inspect them.
+        */
+       static final class DeclineDummyEnvironment extends DummyEnvironment {
+
+               /** Id of the last declined checkpoint; {@code Long.MIN_VALUE} until one is declined. */
+               private long lastDeclinedCheckpointId = Long.MIN_VALUE;
+
+               /** Cause of the last declined checkpoint; {@code null} until one is declined. */
+               private Throwable lastDeclinedCheckpointCause = null;
+
+               DeclineDummyEnvironment() {
+                       super("test", 1, 0);
+               }
+
+               @Override
+               public void declineCheckpoint(long checkpointId, Throwable cause) {
+                       lastDeclinedCheckpointId = checkpointId;
+                       lastDeclinedCheckpointCause = cause;
+               }
+
+               long getLastDeclinedCheckpointId() {
+                       return lastDeclinedCheckpointId;
+               }
+
+               Throwable getLastDeclinedCheckpointCause() {
+                       return lastDeclinedCheckpointCause;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 4b1eb3f..b31fb41 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -344,10 +344,20 @@ public class StreamTaskTest extends TestLogger {
                Whitebox.setInternalState(streamTask, "cancelables", new 
CloseableRegistry());
                Whitebox.setInternalState(streamTask, "configuration", new 
StreamConfig(new Configuration()));
 
+               CheckpointExceptionHandlerFactory 
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+               CheckpointExceptionHandler checkpointExceptionHandler =
+                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, 
mockEnvironment);
+               Whitebox.setInternalState(streamTask, 
"synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
+
+               StreamTask.AsyncCheckpointExceptionHandler 
asyncCheckpointExceptionHandler =
+                       new 
StreamTask.AsyncCheckpointExceptionHandler(streamTask);
+               Whitebox.setInternalState(streamTask, 
"asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
+
                try {
                        streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forCheckpoint());
                        fail("Expected test exception here.");
                } catch (Exception e) {
+                       e.printStackTrace();
                        assertEquals(testException, e.getCause());
                }
 
@@ -412,6 +422,15 @@ public class StreamTaskTest extends TestLogger {
                Whitebox.setInternalState(streamTask, 
"asyncOperationsThreadPool", new DirectExecutorService());
                Whitebox.setInternalState(streamTask, "configuration", new 
StreamConfig(new Configuration()));
 
+               CheckpointExceptionHandlerFactory 
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+               CheckpointExceptionHandler checkpointExceptionHandler =
+                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, 
mockEnvironment);
+               Whitebox.setInternalState(streamTask, 
"synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
+
+               StreamTask.AsyncCheckpointExceptionHandler 
asyncCheckpointExceptionHandler =
+                       new 
StreamTask.AsyncCheckpointExceptionHandler(streamTask);
+               Whitebox.setInternalState(streamTask, 
"asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
+
                streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forCheckpoint());
 
                verify(streamTask).handleAsyncException(anyString(), 
any(Throwable.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
new file mode 100644
index 0000000..d755c56
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.TransientBlobCache;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamFilter;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This test checks that tasks whose checkpoints block and do not react to 
thread interrupts can still be canceled. It also checks
+ * the different policies for how tasks deal with checkpoint failures (fail 
the task, or decline the checkpoint and continue).
+ */
+public class TaskCheckpointingBehaviourTest extends TestLogger {
+
+       private static final OneShotLatch IN_CHECKPOINT_LATCH = new 
OneShotLatch();
+
+       @Test
+       public void testDeclineOnCheckpointErrorInSyncPart() throws Exception {
+               runTestDeclineOnCheckpointError(new 
SyncFailureInducingStateBackend());
+       }
+
+       @Test
+       public void testDeclineOnCheckpointErrorInAsyncPart() throws Exception {
+               runTestDeclineOnCheckpointError(new 
AsyncFailureInducingStateBackend());
+       }
+
+       @Test
+       public void testTaskFailingOnCheckpointErrorInSyncPart() throws 
Exception {
+               Throwable failureCause = 
runTestTaskFailingOnCheckpointError(new SyncFailureInducingStateBackend());
+               assertNotNull(failureCause);
+
+               String expectedMessageStart = "Could not perform checkpoint";
+               assertEquals(expectedMessageStart, 
failureCause.getMessage().substring(0, expectedMessageStart.length()));
+       }
+
+       @Test
+       public void testTaskFailingOnCheckpointErrorInAsyncPart() throws 
Exception {
+               Throwable failureCause = 
runTestTaskFailingOnCheckpointError(new AsyncFailureInducingStateBackend());
+               assertEquals(AsynchronousException.class, 
failureCause.getClass());
+       }
+
+       @Test
+       public void testBlockingNonInterruptibleCheckpoint() throws Exception {
+
+               Task task =
+                       createTask(new TestOperator(), new 
LockingStreamStateBackend(), mock(CheckpointResponder.class), true);
+
+               // start the task and wait until it is inside the checkpoint (snapshotState)
+               task.startTaskThread();
+               IN_CHECKPOINT_LATCH.await();
+
+               // cancel the task and wait. unless cancellation properly closes
+               // the streams, this will never terminate
+               task.cancelExecution();
+               task.getExecutingThread().join();
+
+               assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+               assertNull(task.getFailureCause());
+       }
+
+       private void runTestDeclineOnCheckpointError(AbstractStateBackend 
backend) throws Exception{
+
+               TestDeclinedCheckpointResponder checkpointResponder = new 
TestDeclinedCheckpointResponder();
+
+               Task task =
+                       createTask(new FilterOperator(), backend, 
checkpointResponder, false);
+
+               // start the task and wait until the checkpoint has been declined
+               task.startTaskThread();
+
+               checkpointResponder.declinedLatch.await();
+
+               Assert.assertEquals(ExecutionState.RUNNING, 
task.getExecutionState());
+
+               task.cancelExecution();
+               task.getExecutingThread().join();
+       }
+
+       private Throwable 
runTestTaskFailingOnCheckpointError(AbstractStateBackend backend) throws 
Exception {
+
+               Task task =
+                       createTask(new FilterOperator(), backend, 
mock(CheckpointResponder.class), true);
+
+               // start the task and wait until its thread terminates (expected to fail)
+               task.startTaskThread();
+
+               task.getExecutingThread().join();
+
+               assertEquals(ExecutionState.FAILED, task.getExecutionState());
+               return task.getFailureCause();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       private static Task createTask(
+               StreamOperator<?> op,
+               AbstractStateBackend backend,
+               CheckpointResponder checkpointResponder,
+               boolean failOnCheckpointErrors) throws IOException {
+
+               Configuration taskConfig = new Configuration();
+               StreamConfig cfg = new StreamConfig(taskConfig);
+               cfg.setStreamOperator(op);
+               cfg.setOperatorID(new OperatorID());
+               cfg.setStateBackend(backend);
+
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               
executionConfig.setFailTaskOnCheckpointError(failOnCheckpointErrors);
+
+               JobInformation jobInformation = new JobInformation(
+                               new JobID(),
+                               "test job name",
+                               new SerializedValue<>(executionConfig),
+                               new Configuration(),
+                               Collections.emptyList(),
+                               Collections.emptyList());
+
+               TaskInformation taskInformation = new TaskInformation(
+                               new JobVertexID(),
+                               "test task name",
+                               1,
+                               11,
+                               TestStreamTask.class.getName(),
+                               taskConfig);
+
+               TaskKvStateRegistry mockKvRegistry = 
mock(TaskKvStateRegistry.class);
+               NetworkEnvironment network = mock(NetworkEnvironment.class);
+               when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class))).thenReturn(mockKvRegistry);
+
+               BlobCacheService blobService =
+                       new BlobCacheService(mock(PermanentBlobCache.class), 
mock(TransientBlobCache.class));
+
+               return new Task(
+                               jobInformation,
+                               taskInformation,
+                               new ExecutionAttemptID(),
+                               new AllocationID(),
+                               0,
+                               0,
+                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                               0,
+                               null,
+                               mock(MemoryManager.class),
+                               mock(IOManager.class),
+                               network,
+                               mock(BroadcastVariableManager.class),
+                               mock(TaskManagerActions.class),
+                               mock(InputSplitProvider.class),
+                               checkpointResponder,
+                               blobService,
+                               new BlobLibraryCacheManager(
+                                       blobService.getPermanentBlobService(),
+                                       
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+                                       new String[0]),
+                               new FileCache(new String[] { 
EnvironmentInformation.getTemporaryFileDirectory() }),
+                               new TestingTaskManagerRuntimeInfo(),
+                               new UnregisteredTaskMetricsGroup(),
+                               mock(ResultPartitionConsumableNotifier.class),
+                               mock(PartitionProducerStateChecker.class),
+                               Executors.directExecutor());
+       }
+
+       // 
------------------------------------------------------------------------
+       //  checkpoint responder that records a call to decline.
+       // 
------------------------------------------------------------------------
+       private static class TestDeclinedCheckpointResponder implements 
CheckpointResponder {
+
+               OneShotLatch declinedLatch = new OneShotLatch();
+
+               @Override
+               public void acknowledgeCheckpoint(
+                       JobID jobID,
+                       ExecutionAttemptID executionAttemptID,
+                       long checkpointId,
+                       CheckpointMetrics checkpointMetrics,
+                       TaskStateSnapshot subtaskState) {
+
+                       throw new RuntimeException("Unexpected call.");
+               }
+
+               @Override
+               public void declineCheckpoint(
+                       JobID jobID,
+                       ExecutionAttemptID executionAttemptID,
+                       long checkpointId,
+                       Throwable cause) {
+
+                       declinedLatch.trigger();
+               }
+
+               public OneShotLatch getDeclinedLatch() {
+                       return declinedLatch;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  state backends that trigger errors in checkpointing.
+       // 
------------------------------------------------------------------------
+
+       private static class SyncFailureInducingStateBackend extends 
MemoryStateBackend {
+
+               @Override
+               public OperatorStateBackend 
createOperatorStateBackend(Environment env, String operatorIdentifier) throws 
Exception {
+                       return new DefaultOperatorStateBackend(
+                               env.getUserClassLoader(),
+                               env.getExecutionConfig(),
+                               true) {
+                               @Override
+                               public RunnableFuture<OperatorStateHandle> 
snapshot(
+                                       long checkpointId,
+                                       long timestamp,
+                                       CheckpointStreamFactory streamFactory,
+                                       CheckpointOptions checkpointOptions) 
throws Exception {
+
+                                       throw new Exception("Sync part snapshot 
exception.");
+                               }
+                       };
+               }
+       }
+
+       private static class AsyncFailureInducingStateBackend extends 
MemoryStateBackend {
+
+               @Override
+               public OperatorStateBackend 
createOperatorStateBackend(Environment env, String operatorIdentifier) throws 
Exception {
+                       return new DefaultOperatorStateBackend(
+                               env.getUserClassLoader(),
+                               env.getExecutionConfig(),
+                               true) {
+                               @Override
+                               public RunnableFuture<OperatorStateHandle> 
snapshot(
+                                       long checkpointId,
+                                       long timestamp,
+                                       CheckpointStreamFactory streamFactory,
+                                       CheckpointOptions checkpointOptions) 
throws Exception {
+
+                                       return new FutureTask<>(new 
Callable<OperatorStateHandle>() {
+                                               @Override
+                                               public OperatorStateHandle 
call() throws Exception {
+                                                       throw new 
Exception("Async part snapshot exception.");
+                                               }
+                                       });
+                               }
+                       };
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  state backend with locking output stream.
+       // 
------------------------------------------------------------------------
+
+       private static class LockingStreamStateBackend extends 
MemoryStateBackend {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public CheckpointStreamFactory createStreamFactory(JobID jobId, 
String operatorIdentifier) throws IOException {
+                       return new LockingOutputStreamFactory();
+               }
+
+               @Override
+               public OperatorStateBackend 
createOperatorStateBackend(Environment env, String operatorIdentifier) throws 
Exception {
+                       return new DefaultOperatorStateBackend(
+                               getClass().getClassLoader(),
+                               new ExecutionConfig(),
+                               true);
+               }
+       }
+
+       private static final class LockingOutputStreamFactory implements 
CheckpointStreamFactory {
+
+               @Override
+               public CheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp) {
+                       return new LockingOutputStream();
+               }
+
+               @Override
+               public void close() {}
+       }
+
+       private static final class LockingOutputStream extends 
CheckpointStateOutputStream {
+
+               private final Object lock = new Object();
+               private volatile boolean closed;
+
+               @Override
+               public StreamStateHandle closeAndGetHandle() throws IOException 
{
+                       return null;
+               }
+
+               @Override
+               public void write(int b) throws IOException {
+                       // this needs to not react to interrupts until the 
handle is closed
+                       synchronized (lock) {
+                               while (!closed) {
+                                       try {
+                                               lock.wait();
+                                       }
+                                       catch (InterruptedException ignored) {}
+                               }
+                       }
+               }
+
+               @Override
+               public void close() throws IOException {
+                       synchronized (lock) {
+                               closed = true;
+                               lock.notifyAll();
+                       }
+               }
+
+               @Override
+               public long getPos() {
+                       return 0;
+               }
+
+               @Override
+               public void flush() {}
+
+               @Override
+               public void sync() {}
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test filter operators; TestOperator calls into the locking checkpoint output 
stream.
+       // 
------------------------------------------------------------------------
+
+       @SuppressWarnings("serial")
+       private static final class FilterOperator extends StreamFilter<Object> {
+               private static final long serialVersionUID = 1L;
+
+               public FilterOperator() {
+                       super(new FilterFunction<Object>() {
+                               @Override
+                               public boolean filter(Object value) {
+                                       return false;
+                               }
+                       });
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class TestOperator extends StreamFilter<Object> {
+               private static final long serialVersionUID = 1L;
+
+               public TestOperator() {
+                       super(new FilterFunction<Object>() {
+                               @Override
+                               public boolean filter(Object value) {
+                                       return false;
+                               }
+                       });
+               }
+
+               @Override
+               public void snapshotState(StateSnapshotContext context) throws 
Exception {
+                       OperatorStateCheckpointOutputStream outStream = 
context.getRawOperatorStateOutput();
+
+                       IN_CHECKPOINT_LATCH.trigger();
+
+                       // this should lock
+                       outStream.write(1);
+               }
+       }
+
+       /**
+        * Stream task that triggers a single checkpoint and then idles until it is no longer running.
+        */
+       public static final class TestStreamTask extends 
OneInputStreamTask<Object, Object> {
+
+               @Override
+               public void init() {}
+
+               @Override
+               protected void run() throws Exception {
+
+                       triggerCheckpointOnBarrier(
+                               new CheckpointMetaData(
+                                       11L,
+                                       System.currentTimeMillis()),
+                               CheckpointOptions.forCheckpoint(),
+                               new CheckpointMetrics());
+
+                       while (isRunning()) {
+                               Thread.sleep(1L);
+                       }
+               }
+
+               @Override
+               protected void cleanup() {}
+
+               @Override
+               protected void cancelTask() {}
+       }
+}

Reply via email to