[FLINK-7619] Improved abstraction of AbstractAsyncIOCallable to better fit the 
current usage pattern.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5af463a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5af463a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5af463a9

Branch: refs/heads/master
Commit: 5af463a9c0ff62603bc342a78dfd5483d834e8a7
Parents: 0073204
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Thu Sep 7 11:24:12 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Mon Sep 25 16:04:15 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 198 +++++++++++--------
 .../apache/flink/core/fs/CloseableRegistry.java |   4 +-
 .../flink/core/fs/ClosingFSDataInputStream.java |   4 +-
 .../core/fs/ClosingFSDataOutputStream.java      |   4 +-
 .../core/fs/SafetyNetCloseableRegistry.java     |   8 +-
 .../flink/util/AbstractCloseableRegistry.java   |  11 +-
 .../core/fs/AbstractCloseableRegistryTest.java  |  11 +-
 .../flink/core/fs/CloseableRegistryTest.java    |   2 +-
 .../AbstractAsyncSnapshotIOCallable.java        | 109 ----------
 .../AbstractAsyncCallableWithResources.java     | 194 ++++++++++++++++++
 .../io/async/AbstractAsyncIOCallable.java       | 157 ---------------
 .../flink/runtime/io/async/AsyncStoppable.java  |   4 +-
 .../state/DefaultOperatorStateBackend.java      |  66 +++++--
 .../state/StateInitializationContextImpl.java   |  17 +-
 .../StateSnapshotContextSynchronousImpl.java    |  25 +--
 .../state/heap/HeapKeyedStateBackend.java       |  85 +++++---
 .../streaming/runtime/tasks/StreamTask.java     |   8 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   2 +-
 .../util/AbstractStreamOperatorTestHarness.java |   2 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   3 +-
 20 files changed, 468 insertions(+), 446 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index dd5b852..a1500c7 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -43,7 +43,7 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
+import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -384,8 +384,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                final CheckpointStreamFactory streamFactory) throws Exception {
 
                long startTime = System.currentTimeMillis();
+               final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
+
+               final RocksDBFullSnapshotOperation<K> snapshotOperation;
 
-               final RocksDBFullSnapshotOperation<K> snapshotOperation = new 
RocksDBFullSnapshotOperation<>(this, streamFactory);
                // hold the db lock while operation on the db to guard us 
against async db disposal
                synchronized (asyncSnapshotLock) {
 
@@ -399,6 +401,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        return DoneFuture.nullValue();
                                }
 
+                               snapshotOperation =
+                                       new 
RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry);
+
                                snapshotOperation.takeDBSnapShot(checkpointId, 
timestamp);
                        } else {
                                throw new IOException("RocksDB closed.");
@@ -406,30 +411,55 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                // implementation of the async IO operation, based on FutureTask
-               AbstractAsyncIOCallable<KeyedStateHandle, 
CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
-                       new AbstractAsyncIOCallable<KeyedStateHandle, 
CheckpointStreamFactory.CheckpointStateOutputStream>() {
+               AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable 
=
+                       new 
AbstractAsyncCallableWithResources<KeyedStateHandle>() {
 
                                @Override
-                               public 
CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws 
Exception {
+                               protected void acquireResources() throws 
Exception {
+                                       
cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
                                        
snapshotOperation.openCheckpointStream();
-                                       return snapshotOperation.getOutStream();
                                }
 
                                @Override
-                               public KeyGroupsStateHandle performOperation() 
throws Exception {
-                                       long startTime = 
System.currentTimeMillis();
+                               protected void releaseResources() throws 
Exception {
+                                       closeLocalRegistry();
+                                       releaseSnapshotOperationResources();
+                               }
+
+                               private void 
releaseSnapshotOperationResources() {
+                                       // hold the db lock while operation on 
the db to guard us against async db disposal
                                        synchronized (asyncSnapshotLock) {
+                                               
snapshotOperation.releaseSnapshotResources();
+                                       }
+                               }
+
+                               @Override
+                               protected void stopOperation() throws Exception 
{
+                                       closeLocalRegistry();
+                               }
+
+                               private void closeLocalRegistry() {
+                                       if 
(cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
                                                try {
-                                                       // hold the db lock 
while operation on the db to guard us against async db disposal
-                                                       if (db == null) {
-                                                               throw new 
IOException("RocksDB closed.");
-                                                       }
+                                                       
snapshotCloseableRegistry.close();
+                                               } catch (Exception ex) {
+                                                       LOG.warn("Error closing 
local registry", ex);
+                                               }
+                                       }
+                               }
 
-                                                       
snapshotOperation.writeDBSnapshot();
+                               @Override
+                               public KeyGroupsStateHandle performOperation() 
throws Exception {
+                                       long startTime = 
System.currentTimeMillis();
 
-                                               } finally {
-                                                       
snapshotOperation.closeCheckpointStream();
+                                       synchronized (asyncSnapshotLock) {
+                                               // hold the db lock while 
operation on the db to guard us against async db disposal
+                                               if (db == null) {
+                                                       throw new 
IOException("RocksDB closed.");
                                                }
+
+                                               
snapshotOperation.writeDBSnapshot();
+                                               
snapshotOperation.createSnapshotResultStateHandleFromOutputStream();
                                        }
 
                                        LOG.info("Asynchronous RocksDB snapshot 
({}, asynchronous part) in thread {} took {} ms.",
@@ -437,18 +467,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                        return 
snapshotOperation.getSnapshotResultStateHandle();
                                }
-
-                               private void 
releaseSnapshotOperationResources(boolean canceled) {
-                                       // hold the db lock while operation on 
the db to guard us against async db disposal
-                                       synchronized (asyncSnapshotLock) {
-                                               
snapshotOperation.releaseSnapshotResources(canceled);
-                                       }
-                               }
-
-                               @Override
-                               public void done(boolean canceled) {
-                                       
releaseSnapshotOperationResources(canceled);
-                               }
                        };
 
                LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", 
synchronous part) in thread " +
@@ -468,6 +486,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private final RocksDBKeyedStateBackend<K> stateBackend;
                private final KeyGroupRangeOffsets keyGroupRangeOffsets;
                private final CheckpointStreamFactory checkpointStreamFactory;
+               private final CloseableRegistry snapshotCloseableRegistry;
 
                private long checkpointId;
                private long checkpointTimeStamp;
@@ -482,11 +501,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                RocksDBFullSnapshotOperation(
                        RocksDBKeyedStateBackend<K> stateBackend,
-                       CheckpointStreamFactory checkpointStreamFactory) {
+                       CheckpointStreamFactory checkpointStreamFactory,
+                       CloseableRegistry registry) {
 
                        this.stateBackend = stateBackend;
                        this.checkpointStreamFactory = checkpointStreamFactory;
                        this.keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(stateBackend.keyGroupRange);
+                       this.snapshotCloseableRegistry = registry;
                }
 
                /**
@@ -510,9 +531,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 */
                public void openCheckpointStream() throws Exception {
                        Preconditions.checkArgument(outStream == null, "Output 
stream for snapshot is already set.");
-                       outStream = checkpointStreamFactory.
-                               createCheckpointStateOutputStream(checkpointId, 
checkpointTimeStamp);
-                       
stateBackend.cancelStreamRegistry.registerClosable(outStream);
+                       outStream = 
checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, 
checkpointTimeStamp);
+                       snapshotCloseableRegistry.registerCloseable(outStream);
                        outputView = new DataOutputViewStreamWrapper(outStream);
                }
 
@@ -537,18 +557,25 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 *
                 * @throws IOException
                 */
-               public void closeCheckpointStream() throws IOException {
-                       if (outStream != null) {
-                               snapshotResultStateHandle = 
closeSnapshotStreamAndGetHandle();
-                       } else {
-                               snapshotResultStateHandle = null;
+               public void createSnapshotResultStateHandleFromOutputStream() 
throws IOException {
+
+                       if 
(snapshotCloseableRegistry.unregisterCloseable(outStream)) {
+
+                               StreamStateHandle stateHandle = 
outStream.closeAndGetHandle();
+                               outStream = null;
+
+                               if (stateHandle != null) {
+                                       this.snapshotResultStateHandle = new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+                               }
                        }
                }
 
                /**
                 * 5) Release the snapshot object for RocksDB and clean up.
                 */
-               public void releaseSnapshotResources(boolean canceled) {
+               public void releaseSnapshotResources() {
+
+                       outStream = null;
 
                        if (null != kvStateIterators) {
                                for (Tuple2<RocksIterator, Integer> 
kvStateIterator : kvStateIterators) {
@@ -569,12 +596,15 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                IOUtils.closeQuietly(readOptions);
                                readOptions = null;
                        }
+               }
 
-                       if (canceled) {
+               /**
+                * Drop the created snapshot if we have been cancelled.
+                */
+               public void dropSnapshotResult() {
+                       if (null != snapshotResultStateHandle) {
                                try {
-                                       if (null != snapshotResultStateHandle) {
-                                               
snapshotResultStateHandle.discardState();
-                                       }
+                                       
snapshotResultStateHandle.discardState();
                                } catch (Exception e) {
                                        LOG.warn("Exception occurred during 
snapshot state handle cleanup.", e);
                                }
@@ -719,13 +749,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        }
                }
 
-               private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() 
throws IOException {
-                       
stateBackend.cancelStreamRegistry.unregisterClosable(outStream);
-                       StreamStateHandle stateHandle = 
outStream.closeAndGetHandle();
-                       outStream = null;
-                       return stateHandle != null ? new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null;
-               }
-
                private void writeKeyValuePair(byte[] key, byte[] value, 
DataOutputView out) throws IOException {
                        BytePrimitiveArraySerializer.INSTANCE.serialize(key, 
out);
                        BytePrimitiveArraySerializer.INSTANCE.serialize(value, 
out);
@@ -805,11 +828,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                FileSystem backupFileSystem = 
backupPath.getFileSystem();
                                inputStream = backupFileSystem.open(filePath);
-                               closeableRegistry.registerClosable(inputStream);
+                               
closeableRegistry.registerCloseable(inputStream);
 
                                outputStream = checkpointStreamFactory
                                        
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
-                               
closeableRegistry.registerClosable(outputStream);
+                               
closeableRegistry.registerCloseable(outputStream);
 
                                while (true) {
                                        int numBytes = inputStream.read(buffer);
@@ -821,19 +844,19 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        outputStream.write(buffer, 0, numBytes);
                                }
 
-                               
closeableRegistry.unregisterClosable(outputStream);
-                               StreamStateHandle result = 
outputStream.closeAndGetHandle();
-                               outputStream = null;
-
+                               StreamStateHandle result = null;
+                               if 
(closeableRegistry.unregisterCloseable(outputStream)) {
+                                       result = 
outputStream.closeAndGetHandle();
+                                       outputStream = null;
+                               }
                                return result;
+
                        } finally {
-                               if (inputStream != null) {
-                                       
closeableRegistry.unregisterClosable(inputStream);
+                               if (inputStream != null && 
closeableRegistry.unregisterCloseable(inputStream)) {
                                        inputStream.close();
                                }
 
-                               if (outputStream != null) {
-                                       
closeableRegistry.unregisterClosable(outputStream);
+                               if (outputStream != null && 
closeableRegistry.unregisterCloseable(outputStream)) {
                                        outputStream.close();
                                }
                        }
@@ -845,7 +868,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        try {
                                outputStream = checkpointStreamFactory
                                        
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
-                               
closeableRegistry.registerClosable(outputStream);
+                               
closeableRegistry.registerCloseable(outputStream);
 
                                //no need for compression scheme support 
because sst-files are already compressed
                                KeyedBackendSerializationProxy<K> 
serializationProxy =
@@ -858,15 +881,17 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                serializationProxy.write(out);
 
-                               
closeableRegistry.unregisterClosable(outputStream);
-                               StreamStateHandle result = 
outputStream.closeAndGetHandle();
-                               outputStream = null;
-
+                               StreamStateHandle result = null;
+                               if 
(closeableRegistry.unregisterCloseable(outputStream)) {
+                                       result = 
outputStream.closeAndGetHandle();
+                                       outputStream = null;
+                               }
                                return result;
                        } finally {
                                if (outputStream != null) {
-                                       
closeableRegistry.unregisterClosable(outputStream);
-                                       outputStream.close();
+                                       if 
(closeableRegistry.unregisterCloseable(outputStream)) {
+                                               outputStream.close();
+                                       }
                                }
                        }
                }
@@ -905,7 +930,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                KeyedStateHandle materializeSnapshot() throws Exception {
 
-                       
stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry);
+                       
stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
 
                        // write meta data
                        metaStateHandle = materializeMetaData();
@@ -954,15 +979,25 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                void stop() {
-                       try {
-                               closeableRegistry.close();
-                       } catch (IOException e) {
-                               LOG.warn("Could not properly close io 
streams.", e);
+
+                       if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
+                               try {
+                                       closeableRegistry.close();
+                               } catch (IOException e) {
+                                       LOG.warn("Could not properly close io 
streams.", e);
+                               }
                        }
                }
 
                void releaseResources(boolean canceled) {
-                       
stateBackend.cancelStreamRegistry.unregisterClosable(closeableRegistry);
+
+                       if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
+                               try {
+                                       closeableRegistry.close();
+                               } catch (IOException e) {
+                                       LOG.warn("Exception on closing 
registry.", e);
+                               }
+                       }
 
                        if (backupPath != null) {
                                try {
@@ -1128,13 +1163,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        throws IOException, StateMigrationException, 
RocksDBException {
                        try {
                                currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
-                               
rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
+                               
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
                                currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
                                restoreKVStateMetaData();
                                restoreKVStateData();
                        } finally {
-                               if (currentStateHandleInStream != null) {
-                                       
rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterClosable(currentStateHandleInStream);
+                               if (currentStateHandleInStream != null
+                                       && 
rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
 {
                                        
IOUtils.closeQuietly(currentStateHandleInStream);
                                }
                        }
@@ -1275,7 +1310,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        try {
                                inputStream = metaStateHandle.openInputStream();
-                               
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+                               
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
 
                                KeyedBackendSerializationProxy<T> 
serializationProxy =
                                        new 
KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
@@ -1298,8 +1333,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                return 
serializationProxy.getStateMetaInfoSnapshots();
                        } finally {
-                               if (inputStream != null) {
-                                       
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+                               if (inputStream != null && 
stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
                                        inputStream.close();
                                }
                        }
@@ -1316,10 +1350,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        try {
                                inputStream = 
remoteFileHandle.openInputStream();
-                               
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+                               
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
 
                                outputStream = 
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
-                               
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+                               
stateBackend.cancelStreamRegistry.registerCloseable(outputStream);
 
                                byte[] buffer = new byte[8 * 1024];
                                while (true) {
@@ -1331,13 +1365,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        outputStream.write(buffer, 0, numBytes);
                                }
                        } finally {
-                               if (inputStream != null) {
-                                       
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+                               if (inputStream != null && 
stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
                                        inputStream.close();
                                }
 
-                               if (outputStream != null) {
-                                       
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+                               if (outputStream != null && 
stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
                                        outputStream.close();
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
index 29f363c..87d33d2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -49,7 +49,7 @@ public class CloseableRegistry extends 
AbstractCloseableRegistry<Closeable, Obje
        }
 
        @Override
-       protected void doUnRegister(@Nonnull Closeable closeable, @Nonnull 
Map<Closeable, Object> closeableMap) {
-               closeableMap.remove(closeable);
+       protected boolean doUnRegister(@Nonnull Closeable closeable, @Nonnull 
Map<Closeable, Object> closeableMap) {
+               return closeableMap.remove(closeable) != null;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
index 7c97271..173a890 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
@@ -55,7 +55,7 @@ public class ClosingFSDataInputStream
        public void close() throws IOException {
                if (!closed) {
                        closed = true;
-                       registry.unregisterClosable(this);
+                       registry.unregisterCloseable(this);
                        inputStream.close();
                }
        }
@@ -93,7 +93,7 @@ public class ClosingFSDataInputStream
                        FSDataInputStream delegate, SafetyNetCloseableRegistry 
registry, String debugInfo) throws IOException{
 
                ClosingFSDataInputStream inputStream = new 
ClosingFSDataInputStream(delegate, registry, debugInfo);
-               registry.registerClosable(inputStream);
+               registry.registerCloseable(inputStream);
                return inputStream;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
index c517a83..cb7de92 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
@@ -60,7 +60,7 @@ public class ClosingFSDataOutputStream
        public void close() throws IOException {
                if (!closed) {
                        closed = true;
-                       registry.unregisterClosable(this);
+                       registry.unregisterCloseable(this);
                        outputStream.close();
                }
        }
@@ -98,7 +98,7 @@ public class ClosingFSDataOutputStream
                        FSDataOutputStream delegate, SafetyNetCloseableRegistry 
registry, String debugInfo) throws IOException {
 
                ClosingFSDataOutputStream inputStream = new 
ClosingFSDataOutputStream(delegate, registry, debugInfo);
-               registry.registerClosable(inputStream);
+               registry.registerCloseable(inputStream);
                return inputStream;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index 6097334..9c4272f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -98,7 +98,7 @@ public class SafetyNetCloseableRegistry extends
        }
 
        @Override
-       protected void doUnRegister(
+       protected boolean doUnRegister(
                @Nonnull WrappingProxyCloseable<? extends Closeable> closeable,
                @Nonnull Map<Closeable, PhantomDelegatingCloseableRef> 
closeableMap) {
 
@@ -106,11 +106,7 @@ public class SafetyNetCloseableRegistry extends
 
                Closeable innerCloseable = 
WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate());
 
-               if (null == innerCloseable) {
-                       return;
-               }
-
-               closeableMap.remove(innerCloseable);
+               return null != innerCloseable && 
closeableMap.remove(innerCloseable) != null;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java 
b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index 4527b5e..14e765c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -68,7 +68,7 @@ public abstract class AbstractCloseableRegistry<C extends 
Closeable, T> implemen
         * @param closeable Closeable tor register
         * @throws IOException exception when the registry was closed before
         */
-       public final void registerClosable(C closeable) throws IOException {
+       public final void registerCloseable(C closeable) throws IOException {
 
                if (null == closeable) {
                        return;
@@ -89,15 +89,16 @@ public abstract class AbstractCloseableRegistry<C extends 
Closeable, T> implemen
         * Removes a {@link Closeable} from the registry.
         *
         * @param closeable instance to remove from the registry.
+        * @return true if the closeable was previously registered and became 
unregistered through this call.
         */
-       public final void unregisterClosable(C closeable) {
+       public final boolean unregisterCloseable(C closeable) {
 
                if (null == closeable) {
-                       return;
+                       return false;
                }
 
                synchronized (getSynchronizationLock()) {
-                       doUnRegister(closeable, closeableToRef);
+                       return doUnRegister(closeable, closeableToRef);
                }
        }
 
@@ -137,7 +138,7 @@ public abstract class AbstractCloseableRegistry<C extends 
Closeable, T> implemen
         * Does the actual un-registration of the closeable from the registry 
map. This should not do any long running or
         * potentially blocking operations as is is executed under the 
registry's lock.
         */
-       protected abstract void doUnRegister(@Nonnull C closeable, @Nonnull 
Map<Closeable, T> closeableMap);
+       protected abstract boolean doUnRegister(@Nonnull C closeable, @Nonnull 
Map<Closeable, T> closeableMap);
 
        /**
         * Returns the lock on which manipulations to members closeableToRef 
and closeable must be synchronized.

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
index 41b69c8..f9425f3 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
@@ -91,7 +91,7 @@ public abstract class AbstractCloseableRegistryTest<C extends 
Closeable, T> {
 
                try {
 
-                       closeableRegistry.registerClosable(testCloseable);
+                       closeableRegistry.registerCloseable(testCloseable);
 
                        Assert.fail("Closed registry should not accept 
closeables!");
 
@@ -120,7 +120,7 @@ public abstract class AbstractCloseableRegistryTest<C 
extends Closeable, T> {
                        return null;
                }).when(spyCloseable).close();
 
-               closeableRegistry.registerClosable(spyCloseable);
+               closeableRegistry.registerCloseable(spyCloseable);
 
                Assert.assertEquals(1, 
closeableRegistry.getNumberOfRegisteredCloseables());
 
@@ -138,7 +138,7 @@ public abstract class AbstractCloseableRegistryTest<C 
extends Closeable, T> {
                final C testCloseable = spy(createCloseable());
 
                try {
-                       closeableRegistry.registerClosable(testCloseable);
+                       closeableRegistry.registerCloseable(testCloseable);
                        Assert.fail("Closed registry should not accept 
closeables!");
                }catch (IOException ignore) {
                }
@@ -214,10 +214,7 @@ public abstract class AbstractCloseableRegistryTest<C 
extends Closeable, T> {
 
                @Override
                public synchronized void close() throws IOException {
-                       if (refCount != null) {
-                               refCount.decrementAndGet();
-                               refCount = null;
-                       }
+                       refCount.decrementAndGet();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java 
b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
index eb8d1f4..c3bf6e6 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
@@ -52,7 +52,7 @@ public class CloseableRegistryTest extends 
AbstractCloseableRegistryTest<Closeab
                        @Override
                        protected void createAndRegisterStream() throws 
IOException {
                                TestStream testStream = new 
TestStream(unclosedCounter);
-                               registry.registerClosable(testStream);
+                               registry.registerCloseable(testStream);
                        }
                };
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
deleted file mode 100644
index 1aaa473..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.StateObject;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Abstract base class for async IO operations of snapshots against a
- * {@link java.util.zip.CheckedOutputStream}. This includes participating in 
lifecycle management
- * through a {@link CloseableRegistry}.
- */
-public abstract class AbstractAsyncSnapshotIOCallable<H extends StateObject>
-       extends AbstractAsyncIOCallable<H, 
CheckpointStreamFactory.CheckpointStateOutputStream> {
-
-       protected final  long checkpointId;
-       protected final  long timestamp;
-
-       protected final CheckpointStreamFactory streamFactory;
-       protected final CloseableRegistry closeStreamOnCancelRegistry;
-       protected final AtomicBoolean open;
-
-       public AbstractAsyncSnapshotIOCallable(
-               long checkpointId,
-               long timestamp,
-               CheckpointStreamFactory streamFactory,
-               CloseableRegistry closeStreamOnCancelRegistry) {
-
-               this.streamFactory = Preconditions.checkNotNull(streamFactory);
-               this.closeStreamOnCancelRegistry = 
Preconditions.checkNotNull(closeStreamOnCancelRegistry);
-               this.checkpointId = checkpointId;
-               this.timestamp = timestamp;
-               this.open = new AtomicBoolean(false);
-       }
-
-       @Override
-       public CheckpointStreamFactory.CheckpointStateOutputStream 
openIOHandle() throws Exception {
-               if (checkStreamClosedAndDoTransitionToOpen()) {
-                       CheckpointStreamFactory.CheckpointStateOutputStream 
stream =
-                               
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
-                       try {
-                               
closeStreamOnCancelRegistry.registerClosable(stream);
-                               return stream;
-                       } catch (Exception ex) {
-                               open.set(false);
-                               throw ex;
-                       }
-               } else {
-                       throw new IOException("Async snapshot: a checkpoint 
stream was already opened.");
-               }
-       }
-
-       @Override
-       public void done(boolean canceled) {
-               if (checkStreamOpenAndDoTransitionToClose()) {
-                       CheckpointStreamFactory.CheckpointStateOutputStream 
stream = getIoHandle();
-                       if (stream != null) {
-                               
closeStreamOnCancelRegistry.unregisterClosable(stream);
-                               IOUtils.closeQuietly(stream);
-                       }
-               }
-       }
-
-       protected boolean checkStreamClosedAndDoTransitionToOpen() {
-               return open.compareAndSet(false, true);
-       }
-
-       protected boolean checkStreamOpenAndDoTransitionToClose() {
-               return open.compareAndSet(true, false);
-       }
-
-       protected StreamStateHandle closeStreamAndGetStateHandle() throws 
IOException {
-               if (checkStreamOpenAndDoTransitionToClose()) {
-                       final 
CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
-                       try {
-                               return stream.closeAndGetHandle();
-                       } finally {
-                               
closeStreamOnCancelRegistry.unregisterClosable(stream);
-                       }
-               } else {
-                       throw new IOException("Checkpoint stream already 
closed.");
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java
new file mode 100644
index 0000000..bc0116c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.async;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+
+/**
+ * This abstract class encapsulates the lifecycle and execution strategy for 
asynchronous operations that use resources.
+ *
+ * @param <V> return type of the asynchronous call.
+ */
+public abstract class AbstractAsyncCallableWithResources<V> implements 
StoppableCallbackCallable<V> {
+
+       /** Tracks if the stop method was called on this object. */
+       private volatile boolean stopped;
+
+       /** Tracks if call method was executed (only before stop calls). */
+       private volatile boolean called;
+
+       /** Stores a collected exception if there was one during stop. */
+       private volatile Exception stopException;
+
+       public AbstractAsyncCallableWithResources() {
+               this.stopped = false;
+               this.called = false;
+       }
+
+       /**
+        * This method implements the strategy for the actual operation:
+        * <p>
+        * 1) Acquires resources asynchronously and atomically w.r.t. stopping (fails with an IOException if already stopped).
+        * 2) Performs the operation.
+        * 3) Releases the resources, also if one of the previous steps failed.
+        *
+        * @return Result of the IO operation, e.g. a deserialized object.
+        * @throws Exception exception that happened during the call.
+        */
+       @Override
+       public final V call() throws Exception {
+
+               V result = null;
+               Exception collectedException = null;
+
+               try {
+                       synchronized (this) {
+
+                               if (stopped) {
+                                       throw new IOException("Task was already 
stopped.");
+                               }
+
+                               called = true;
+                               // Get resources in async part, atomically 
w.r.t. stopping.
+                               acquireResources();
+                       }
+
+                       // The main work is performed here.
+                       result = performOperation();
+
+               } catch (Exception ex) {
+                       collectedException = ex;
+               } finally {
+
+                       try {
+                               // Cleanup
+                               releaseResources();
+                       } catch (Exception relEx) {
+                               collectedException = 
ExceptionUtils.firstOrSuppressed(relEx, collectedException);
+                       }
+
+                       if (collectedException != null) {
+                               throw collectedException;
+                       }
+               }
+
+               return result;
+       }
+
+       /**
+        * Acquires the resources (e.g. opens a stream) that the operation needs. Nothing is returned, so implementations
+        * keep what they acquired in their own fields. Called from {@link #call()} while holding this object's lock.
+        *
+        * @throws Exception if there was a problem in acquiring.
+        */
+       protected abstract void acquireResources() throws Exception;
+
+       /**
+        * Implements the actual operation.
+        *
+        * @return Result of the operation
+        * @throws Exception if there was a problem in executing the operation.
+        */
+       protected abstract V performOperation() throws Exception;
+
+       /**
+        * Releases resources acquired by this object.
+        *
+        * @throws Exception if there was a problem in releasing resources.
+        */
+       protected abstract void releaseResources() throws Exception;
+
+       /**
+        * This method implements how the operation is stopped. Usually this 
involves interrupting or closing some
+        * resources like streams to return from blocking calls.
+        *
+        * @throws Exception on problems during the stopping.
+        */
+       protected abstract void stopOperation() throws Exception;
+
+       /**
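+        * Stops the operation; only the first invocation has an effect. Runs {@link #stopOperation()} if {@link #call()} has already
+        * started, otherwise just releases the resources. An exception thrown while doing so can be accessed via {@link #getStopException()}.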
+        */
+       @Override
+       public final void stop() {
+
+               synchronized (this) {
+
+                       // Make sure that call can not enter execution from 
here.
+                       if (stopped) {
+                               return;
+                       } else {
+                               stopped = true;
+                       }
+               }
+
+               if (called) {
+                       // Async call is executing -> attempt to stop it and 
releaseResources() will happen inside the async method.
+                       try {
+                               stopOperation();
+                       } catch (Exception stpEx) {
+                               this.stopException = stpEx;
+                       }
+               } else {
+                       // Async call was not executed, so we also need to 
releaseResources() here.
+                       try {
+                               releaseResources();
+                       } catch (Exception relEx) {
+                               stopException = relEx;
+                       }
+               }
+       }
+
+       /**
+        * Optional callback that subclasses can implement. This is called when 
the callable method completed, e.g. because
+        * it finished or was stopped.
+        */
+       @Override
+       public void done(boolean canceled) {
+               //optional callback hook
+       }
+
+       /**
+        * True once the async method was called.
+        */
+       public boolean isCalled() {
+               return called;
+       }
+
+       /**
+        * Check if the IO operation is stopped
+        *
+        * @return true if stop() was called
+        */
+       @Override
+       public boolean isStopped() {
+               return stopped;
+       }
+
+       /**
+        * Returns a potential exception that might have been observed while 
stopping the operation.
+        */
+       @Override
+       public Exception getStopException() {
+               return stopException;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
deleted file mode 100644
index 1968d40..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.async;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * The abstract class encapsulates the lifecycle and execution strategy for 
asynchronous IO operations
- *
- * @param <V> return type of the asynchronous call
- * @param <D> type of the IO handle
- */
-public abstract class AbstractAsyncIOCallable<V, D extends Closeable> 
implements StoppableCallbackCallable<V> {
-
-       private volatile boolean stopped;
-
-       /**
-        * Closable handle to IO, e.g. an InputStream
-        */
-       private volatile D ioHandle;
-
-       /**
-        * Stores exception that might happen during close
-        */
-       private volatile IOException stopException;
-
-
-       public AbstractAsyncIOCallable() {
-               this.stopped = false;
-       }
-
-       /**
-        * This method implements the strategy for the actual IO operation:
-        *
-        * 1) Open the IO handle
-        * 2) Perform IO operation
-        * 3) Close IO handle
-        *
-        * @return Result of the IO operation, e.g. a deserialized object.
-        * @throws Exception exception that happened during the call.
-        */
-       @Override
-       public V call() throws Exception {
-
-               synchronized (this) {
-                       if (isStopped()) {
-                               throw new IOException("Task was already 
stopped. No I/O handle opened.");
-                       }
-
-                       ioHandle = openIOHandle();
-               }
-
-               try {
-
-                       return performOperation();
-
-               } finally {
-                       closeIOHandle();
-               }
-
-       }
-
-       /**
-        * Open the IO Handle (e.g. a stream) on which the operation will be 
performed.
-        *
-        * @return the opened IO handle that implements #Closeable
-        * @throws Exception
-        */
-       protected abstract D openIOHandle() throws Exception;
-
-       /**
-        * Implements the actual IO operation on the opened IO handle.
-        *
-        * @return Result of the IO operation
-        * @throws Exception
-        */
-       protected abstract V performOperation() throws Exception;
-
-       /**
-        * Stops the I/O operation by closing the I/O handle. If an exception 
is thrown on close, it can be accessed via
-        * #getStopException().
-        */
-       @Override
-       public void stop() {
-               closeIOHandle();
-       }
-
-       private synchronized void closeIOHandle() {
-
-               if (!stopped) {
-                       stopped = true;
-
-                       final D handle = ioHandle;
-                       if (handle != null) {
-                               try {
-                                       handle.close();
-                               } catch (IOException ex) {
-                                       stopException = ex;
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Returns the IO handle.
-        * @return the IO handle
-        */
-       protected D getIoHandle() {
-               return ioHandle;
-       }
-
-       /**
-        * Optional callback that subclasses can implement. This is called when 
the callable method completed, e.g. because
-        * it finished or was stopped.
-        */
-       @Override
-       public void done(boolean canceled) {
-               //optional callback hook
-       }
-
-       /**
-        * Check if the IO operation is stopped
-        *
-        * @return true if stop() was called
-        */
-       @Override
-       public boolean isStopped() {
-               return stopped;
-       }
-
-       /**
-        * Returns Exception that might happen on stop.
-        *
-        * @return Potential Exception that happened open stopping.
-        */
-       @Override
-       public IOException getStopException() {
-               return stopException;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
index 560e56a..8698600 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.async;
 
-import java.io.IOException;
-
 /**
  * An asynchronous operation that can be stopped.
  */
@@ -42,6 +40,6 @@ public interface AsyncStoppable {
         *
         * @return Exception that can happen during stop
         */
-       IOException getStopException();
+       Exception getStopException();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index b16ac06..1fb03d7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
@@ -34,11 +33,13 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
+
+import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -225,17 +226,37 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                }
 
                // implementation of the async IO operation, based on FutureTask
-               final AbstractAsyncSnapshotIOCallable<OperatorStateHandle> 
ioCallable =
-                       new 
AbstractAsyncSnapshotIOCallable<OperatorStateHandle>(
-                               checkpointId,
-                               timestamp,
-                               streamFactory,
-                               closeStreamOnCancelRegistry) {
+               final AbstractAsyncCallableWithResources<OperatorStateHandle> 
ioCallable =
+                       new 
AbstractAsyncCallableWithResources<OperatorStateHandle>() {
+
+                               
CheckpointStreamFactory.CheckpointStateOutputStream out = null;
+
+                               @Override
+                               protected void acquireResources() throws 
Exception {
+                                       out = 
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+                                       
closeStreamOnCancelRegistry.registerCloseable(out);
+                               }
+
+                               @Override
+                               protected void releaseResources() throws 
Exception {
+                                       if 
(closeStreamOnCancelRegistry.unregisterCloseable(out)) {
+                                               IOUtils.closeQuietly(out);
+                                       }
+                               }
+
+                               @Override
+                               protected void stopOperation() throws Exception 
{
+                                       if 
(closeStreamOnCancelRegistry.unregisterCloseable(out)) {
+                                               IOUtils.closeQuietly(out);
+                                       }
+                               }
 
                                @Override
                                public OperatorStateHandle performOperation() 
throws Exception {
                                        long asyncStartTime = 
System.currentTimeMillis();
 
+                                       
CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;
+
                                        final Map<String, 
OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
                                                new 
HashMap<>(registeredStatesDeepCopies.size());
 
@@ -246,8 +267,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                                
metaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                                        }
 
-                                       
CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle();
-                                       DataOutputView dov = new 
DataOutputViewStreamWrapper(out);
+                                       DataOutputView dov = new 
DataOutputViewStreamWrapper(localOut);
 
                                        OperatorBackendSerializationProxy 
backendSerializationProxy =
                                                new 
OperatorBackendSerializationProxy(metaInfoSnapshots);
@@ -260,25 +280,30 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
                                                
registeredStatesDeepCopies.entrySet()) {
 
                                                PartitionableListState<?> value 
= entry.getValue();
-                                               long[] partitionOffsets = 
value.write(out);
+                                               long[] partitionOffsets = 
value.write(localOut);
                                                OperatorStateHandle.Mode mode = 
value.getStateMetaInfo().getAssignmentMode();
                                                writtenStatesMetaData.put(
                                                        entry.getKey(),
                                                        new 
OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                                        }
 
-                                       StreamStateHandle stateHandle = 
closeStreamAndGetStateHandle();
+                                       OperatorStateHandle retValue = null;
+
+                                       if 
(closeStreamOnCancelRegistry.unregisterCloseable(out)) {
+
+                                               StreamStateHandle stateHandle = 
out.closeAndGetHandle();
+
+                                               if (stateHandle != null) {
+                                                       retValue = new 
OperatorStateHandle(writtenStatesMetaData, stateHandle);
+                                               }
+                                       }
 
                                        if (asynchronousSnapshots) {
                                                
LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in 
thread {} took {} ms.",
                                                        streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
                                        }
 
-                                       if (stateHandle == null) {
-                                               return null;
-                                       }
-
-                                       return new 
OperatorStateHandle(writtenStatesMetaData, stateHandle);
+                                       return retValue;
                                }
                        };
 
@@ -308,7 +333,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
                        }
 
                        FSDataInputStream in = stateHandle.openInputStream();
-                       closeStreamOnCancelRegistry.registerClosable(in);
+                       closeStreamOnCancelRegistry.registerCloseable(in);
 
                        ClassLoader restoreClassLoader = 
Thread.currentThread().getContextClassLoader();
 
@@ -370,8 +395,9 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
                        } finally {
                                
Thread.currentThread().setContextClassLoader(restoreClassLoader);
-                               
closeStreamOnCancelRegistry.unregisterClosable(in);
-                               IOUtils.closeQuietly(in);
+                               if 
(closeStreamOnCancelRegistry.unregisterCloseable(in)) {
+                                       IOUtils.closeQuietly(in);
+                               }
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index 031d7c7..750d206 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -254,9 +254,10 @@ public class StateInitializationContextImpl implements StateInitializationContex
                                                this.offsets = metaOffsets;
                                                this.offPos = 0;
 
-                                               
closableRegistry.unregisterClosable(currentStream);
-                                               
IOUtils.closeQuietly(currentStream);
-                                               currentStream = null;
+                                               if 
(closableRegistry.unregisterCloseable(currentStream)) {
+                                                       
IOUtils.closeQuietly(currentStream);
+                                                       currentStream = null;
+                                               }
 
                                                return true;
                                        }
@@ -308,14 +309,18 @@ public class StateInitializationContextImpl implements StateInitializationContex
                }
 
                protected void openCurrentStream() throws IOException {
+
+                       Preconditions.checkState(currentStream == null);
+
                        FSDataInputStream stream = 
currentStateHandle.openInputStream();
-                       closableRegistry.registerClosable(stream);
+                       closableRegistry.registerCloseable(stream);
                        currentStream = stream;
                }
 
                protected void closeCurrentStream() {
-                       closableRegistry.unregisterClosable(currentStream);
-                       IOUtils.closeQuietly(currentStream);
+                       if 
(closableRegistry.unregisterCloseable(currentStream)) {
+                               IOUtils.closeQuietly(currentStream);
+                       }
                        currentStream = null;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index 5db0138..6a8a08f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -88,7 +88,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
                CheckpointStreamFactory.CheckpointStateOutputStream cout =
                                
streamFactory.createCheckpointStateOutputStream(checkpointId, 
checkpointTimestamp);
 
-               closableRegistry.registerClosable(cout);
+               closableRegistry.registerCloseable(cout);
                return cout;
        }
 
@@ -120,22 +120,25 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
        }
 
        private <T extends StreamStateHandle> T 
closeAndUnregisterStreamToObtainStateHandle(
-                       NonClosingCheckpointOutputStream<T> stream) throws 
IOException {
-               if (null == stream) {
+               NonClosingCheckpointOutputStream<T> stream) throws IOException {
+
+               if (null != stream && 
closableRegistry.unregisterCloseable(stream.getDelegate())) {
+                       return stream.closeAndGetHandle();
+               } else {
                        return null;
                }
-
-               closableRegistry.unregisterClosable(stream.getDelegate());
-
-               // for now we only support synchronous writing
-               return stream.closeAndGetHandle();
        }
 
-       private <T extends StreamStateHandle> void 
closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws 
IOException {
+       private <T extends StreamStateHandle> void closeAndUnregisterStream(
+               NonClosingCheckpointOutputStream<T> stream) throws IOException {
+
                Preconditions.checkNotNull(stream);
 
-               closableRegistry.unregisterClosable(stream.getDelegate());
-               stream.getDelegate().close();
+               CheckpointStreamFactory.CheckpointStateOutputStream delegate = 
stream.getDelegate();
+
+               if (closableRegistry.unregisterCloseable(delegate)) {
+                       delegate.close();
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index e235b96..bf92b34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -35,8 +35,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -325,30 +325,56 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                //--------------------------------------------------- this 
becomes the end of sync part
 
                // implementation of the async IO operation, based on FutureTask
-               final AbstractAsyncSnapshotIOCallable<KeyedStateHandle> 
ioCallable =
-                       new AbstractAsyncSnapshotIOCallable<KeyedStateHandle>(
-                               checkpointId,
-                               timestamp,
-                               streamFactory,
-                               cancelStreamRegistry) {
+               final AbstractAsyncCallableWithResources<KeyedStateHandle> 
ioCallable =
+                       new 
AbstractAsyncCallableWithResources<KeyedStateHandle>() {
+
+                               
CheckpointStreamFactory.CheckpointStateOutputStream stream = null;
+
+                               @Override
+                               protected void acquireResources() throws 
Exception {
+                                       stream = 
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+                                       
cancelStreamRegistry.registerCloseable(stream);
+                               }
+
+                               @Override
+                               protected void releaseResources() throws 
Exception {
+
+                                       if 
(cancelStreamRegistry.unregisterCloseable(stream)) {
+                                               IOUtils.closeQuietly(stream);
+                                               stream = null;
+                                       }
+
+                                       for (StateTableSnapshot tableSnapshot : 
cowStateStableSnapshots.values()) {
+                                               tableSnapshot.release();
+                                       }
+                               }
+
+                               @Override
+                               protected void stopOperation() throws Exception 
{
+                                       if 
(cancelStreamRegistry.unregisterCloseable(stream)) {
+                                               IOUtils.closeQuietly(stream);
+                                               stream = null;
+                                       }
+                               }
 
                                @Override
                                public KeyGroupsStateHandle performOperation() 
throws Exception {
                                        long asyncStartTime = 
System.currentTimeMillis();
 
-                                       
CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
-                                       DataOutputViewStreamWrapper outView = 
new DataOutputViewStreamWrapper(stream);
+                                       
CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.stream;
+
+                                       DataOutputViewStreamWrapper outView = 
new DataOutputViewStreamWrapper(localStream);
                                        serializationProxy.write(outView);
 
                                        long[] keyGroupRangeOffsets = new 
long[keyGroupRange.getNumberOfKeyGroups()];
 
                                        for (int keyGroupPos = 0; keyGroupPos < 
keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
                                                int keyGroupId = 
keyGroupRange.getKeyGroupId(keyGroupPos);
-                                               
keyGroupRangeOffsets[keyGroupPos] = stream.getPos();
+                                               
keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
                                                outView.writeInt(keyGroupId);
 
                                                for (Map.Entry<String, 
StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
-                                                       OutputStream 
kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(stream);
+                                                       OutputStream 
kgCompressionOut = 
keyGroupCompressionDecorator.decorateWithCompression(localStream);
                                                        
DataOutputViewStreamWrapper kgCompressionView = new 
DataOutputViewStreamWrapper(kgCompressionOut);
                                                        
kgCompressionView.writeShort(kVStateToId.get(kvState.getKey()));
                                                        
cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView,
 keyGroupId);
@@ -356,21 +382,29 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                                                }
                                        }
 
-                                       final StreamStateHandle 
streamStateHandle = closeStreamAndGetStateHandle();
+                                       if 
(cancelStreamRegistry.unregisterCloseable(stream)) {
 
-                                       if (asynchronousSnapshots) {
-                                               LOG.info("Heap backend snapshot 
({}, asynchronous part) in thread {} took {} ms.",
-                                                       streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
-                                       }
+                                               final StreamStateHandle 
streamStateHandle = stream.closeAndGetHandle();
+                                               stream = null;
 
-                                       if (streamStateHandle == null) {
-                                               return null;
-                                       }
+                                               if (asynchronousSnapshots) {
+                                                       LOG.info("Heap backend 
snapshot ({}, asynchronous part) in thread {} took {} ms.",
+                                                               streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
+                                               }
 
-                                       KeyGroupRangeOffsets offsets = new 
KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
-                                       final KeyGroupsStateHandle 
keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
+                                               if (streamStateHandle != null) {
 
-                                       return keyGroupsStateHandle;
+                                                       KeyGroupRangeOffsets 
offsets =
+                                                               new 
KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
+
+                                                       final 
KeyGroupsStateHandle keyGroupsStateHandle =
+                                                               new 
KeyGroupsStateHandle(offsets, streamStateHandle);
+
+                                                       return 
keyGroupsStateHandle;
+                                               }
+                                       }
+
+                                       return null;
                                }
                        };
 
@@ -425,7 +459,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
                        KeyGroupsStateHandle keyGroupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
                        FSDataInputStream fsDataInputStream = 
keyGroupsStateHandle.openInputStream();
-                       
cancelStreamRegistry.registerClosable(fsDataInputStream);
+                       
cancelStreamRegistry.registerCloseable(fsDataInputStream);
 
                        try {
                                DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(fsDataInputStream);
@@ -533,8 +567,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                                        }
                                }
                        } finally {
-                               
cancelStreamRegistry.unregisterClosable(fsDataInputStream);
-                               IOUtils.closeQuietly(fsDataInputStream);
+                               if 
(cancelStreamRegistry.unregisterCloseable(fsDataInputStream)) {
+                                       IOUtils.closeQuietly(fsDataInputStream);
+                               }
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6089240..631cdfc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -709,7 +709,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                OperatorStateBackend operatorStateBackend = 
stateBackend.createOperatorStateBackend(env, opId);
 
                // let operator state backend participate in the operator 
lifecycle, i.e. make it responsive to cancelation
-               cancelables.registerClosable(operatorStateBackend);
+               cancelables.registerCloseable(operatorStateBackend);
 
                // restore if we have some old state
                if (null != restoreStateHandles) {
@@ -742,7 +742,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                getEnvironment().getTaskKvStateRegistry());
 
                // let keyed state backend participate in the operator 
lifecycle, i.e. make it responsive to cancelation
-               cancelables.registerClosable(keyedStateBackend);
+               cancelables.registerCloseable(keyedStateBackend);
 
                // restore if we have some old state
                Collection<KeyedStateHandle> restoreKeyedStateHandles = null;
@@ -933,7 +933,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
                                owner.handleAsyncException("Failure in 
asynchronous checkpoint materialization", asyncException);
                        } finally {
-                               owner.cancelables.unregisterClosable(this);
+                               owner.cancelables.unregisterCloseable(this);
                                
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                        }
                }
@@ -1086,7 +1086,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                        checkpointMetrics,
                                        startAsyncPartNano);
 
-                       
owner.cancelables.registerClosable(asyncCheckpointRunnable);
+                       
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
                        
owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 9bb91ad..811d700 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1117,7 +1117,7 @@ public class StreamTaskTest extends TestLogger {
                        holder.start();
                        try {
                                // cancellation should try and cancel this
-                               getCancelables().registerClosable(holder);
+                               getCancelables().registerCloseable(holder);
 
                                // wait till the lock holder has the lock
                                latch.await();

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 9156f34..793e8f6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -220,7 +220,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
                                                environment,
                                                
operator.getClass().getSimpleName());
 
-                                       
mockTask.getCancelables().registerClosable(osb);
+                                       
mockTask.getCancelables().registerCloseable(osb);
 
                                        if (null != stateHandles) {
                                                osb.restore(stateHandles);

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 4d5fa71..829ac93 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -180,7 +180,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
                        }
                        case ROCKSDB_FULLY_ASYNC: {
                                String rocksDb = 
tempFolder.newFolder().getAbsolutePath();
-                               RocksDBStateBackend rdb = new 
RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
+                               String backups = 
tempFolder.newFolder().getAbsolutePath();
+                               RocksDBStateBackend rdb = new 
RocksDBStateBackend(new FsStateBackend("file://" + backups));
                                rdb.setDbStoragePath(rocksDb);
                                this.stateBackend = rdb;
                                break;

Reply via email to