[FLINK-7619] Improved abstraction of AbstractAsyncIOCallable to better fit the current usage pattern.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5af463a9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5af463a9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5af463a9 Branch: refs/heads/master Commit: 5af463a9c0ff62603bc342a78dfd5483d834e8a7 Parents: 0073204 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Thu Sep 7 11:24:12 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Mon Sep 25 16:04:15 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 198 +++++++++++-------- .../apache/flink/core/fs/CloseableRegistry.java | 4 +- .../flink/core/fs/ClosingFSDataInputStream.java | 4 +- .../core/fs/ClosingFSDataOutputStream.java | 4 +- .../core/fs/SafetyNetCloseableRegistry.java | 8 +- .../flink/util/AbstractCloseableRegistry.java | 11 +- .../core/fs/AbstractCloseableRegistryTest.java | 11 +- .../flink/core/fs/CloseableRegistryTest.java | 2 +- .../AbstractAsyncSnapshotIOCallable.java | 109 ---------- .../AbstractAsyncCallableWithResources.java | 194 ++++++++++++++++++ .../io/async/AbstractAsyncIOCallable.java | 157 --------------- .../flink/runtime/io/async/AsyncStoppable.java | 4 +- .../state/DefaultOperatorStateBackend.java | 66 +++++-- .../state/StateInitializationContextImpl.java | 17 +- .../StateSnapshotContextSynchronousImpl.java | 25 +-- .../state/heap/HeapKeyedStateBackend.java | 85 +++++--- .../streaming/runtime/tasks/StreamTask.java | 8 +- .../streaming/runtime/tasks/StreamTaskTest.java | 2 +- .../util/AbstractStreamOperatorTestHarness.java | 2 +- ...tractEventTimeWindowCheckpointingITCase.java | 3 +- 20 files changed, 468 insertions(+), 446 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index dd5b852..a1500c7 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -43,7 +43,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable; +import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources; import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; @@ -384,8 +384,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final CheckpointStreamFactory streamFactory) throws Exception { long startTime = System.currentTimeMillis(); + final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); + + final RocksDBFullSnapshotOperation<K> snapshotOperation; - final RocksDBFullSnapshotOperation<K> snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory); // hold the db lock while operation on the db to guard us against async db disposal synchronized 
(asyncSnapshotLock) { @@ -399,6 +401,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return DoneFuture.nullValue(); } + snapshotOperation = + new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry); + snapshotOperation.takeDBSnapShot(checkpointId, timestamp); } else { throw new IOException("RocksDB closed."); @@ -406,30 +411,55 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } // implementation of the async IO operation, based on FutureTask - AbstractAsyncIOCallable<KeyedStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable = - new AbstractAsyncIOCallable<KeyedStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() { + AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable = + new AbstractAsyncCallableWithResources<KeyedStateHandle>() { @Override - public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception { + protected void acquireResources() throws Exception { + cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry); snapshotOperation.openCheckpointStream(); - return snapshotOperation.getOutStream(); } @Override - public KeyGroupsStateHandle performOperation() throws Exception { - long startTime = System.currentTimeMillis(); + protected void releaseResources() throws Exception { + closeLocalRegistry(); + releaseSnapshotOperationResources(); + } + + private void releaseSnapshotOperationResources() { + // hold the db lock while operation on the db to guard us against async db disposal synchronized (asyncSnapshotLock) { + snapshotOperation.releaseSnapshotResources(); + } + } + + @Override + protected void stopOperation() throws Exception { + closeLocalRegistry(); + } + + private void closeLocalRegistry() { + if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) { try { - // hold the db lock while operation on the db to guard us against async db disposal - if (db == 
null) { - throw new IOException("RocksDB closed."); - } + snapshotCloseableRegistry.close(); + } catch (Exception ex) { + LOG.warn("Error closing local registry", ex); + } + } + } - snapshotOperation.writeDBSnapshot(); + @Override + public KeyGroupsStateHandle performOperation() throws Exception { + long startTime = System.currentTimeMillis(); - } finally { - snapshotOperation.closeCheckpointStream(); + synchronized (asyncSnapshotLock) { + // hold the db lock while operation on the db to guard us against async db disposal + if (db == null) { + throw new IOException("RocksDB closed."); } + + snapshotOperation.writeDBSnapshot(); + snapshotOperation.createSnapshotResultStateHandleFromOutputStream(); } LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", @@ -437,18 +467,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return snapshotOperation.getSnapshotResultStateHandle(); } - - private void releaseSnapshotOperationResources(boolean canceled) { - // hold the db lock while operation on the db to guard us against async db disposal - synchronized (asyncSnapshotLock) { - snapshotOperation.releaseSnapshotResources(canceled); - } - } - - @Override - public void done(boolean canceled) { - releaseSnapshotOperationResources(canceled); - } }; LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " + @@ -468,6 +486,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final RocksDBKeyedStateBackend<K> stateBackend; private final KeyGroupRangeOffsets keyGroupRangeOffsets; private final CheckpointStreamFactory checkpointStreamFactory; + private final CloseableRegistry snapshotCloseableRegistry; private long checkpointId; private long checkpointTimeStamp; @@ -482,11 +501,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { RocksDBFullSnapshotOperation( RocksDBKeyedStateBackend<K> stateBackend, - 
CheckpointStreamFactory checkpointStreamFactory) { + CheckpointStreamFactory checkpointStreamFactory, + CloseableRegistry registry) { this.stateBackend = stateBackend; this.checkpointStreamFactory = checkpointStreamFactory; this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange); + this.snapshotCloseableRegistry = registry; } /** @@ -510,9 +531,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { */ public void openCheckpointStream() throws Exception { Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set."); - outStream = checkpointStreamFactory. - createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp); - stateBackend.cancelStreamRegistry.registerClosable(outStream); + outStream = checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp); + snapshotCloseableRegistry.registerCloseable(outStream); outputView = new DataOutputViewStreamWrapper(outStream); } @@ -537,18 +557,25 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * * @throws IOException */ - public void closeCheckpointStream() throws IOException { - if (outStream != null) { - snapshotResultStateHandle = closeSnapshotStreamAndGetHandle(); - } else { - snapshotResultStateHandle = null; + public void createSnapshotResultStateHandleFromOutputStream() throws IOException { + + if (snapshotCloseableRegistry.unregisterCloseable(outStream)) { + + StreamStateHandle stateHandle = outStream.closeAndGetHandle(); + outStream = null; + + if (stateHandle != null) { + this.snapshotResultStateHandle = new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); + } } } /** * 5) Release the snapshot object for RocksDB and clean up. 
*/ - public void releaseSnapshotResources(boolean canceled) { + public void releaseSnapshotResources() { + + outStream = null; if (null != kvStateIterators) { for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) { @@ -569,12 +596,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { IOUtils.closeQuietly(readOptions); readOptions = null; } + } - if (canceled) { + /** + * Drop the created snapshot if we have been cancelled. + */ + public void dropSnapshotResult() { + if (null != snapshotResultStateHandle) { try { - if (null != snapshotResultStateHandle) { - snapshotResultStateHandle.discardState(); - } + snapshotResultStateHandle.discardState(); } catch (Exception e) { LOG.warn("Exception occurred during snapshot state handle cleanup.", e); } @@ -719,13 +749,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } - private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() throws IOException { - stateBackend.cancelStreamRegistry.unregisterClosable(outStream); - StreamStateHandle stateHandle = outStream.closeAndGetHandle(); - outStream = null; - return stateHandle != null ? 
new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null; - } - private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException { BytePrimitiveArraySerializer.INSTANCE.serialize(key, out); BytePrimitiveArraySerializer.INSTANCE.serialize(value, out); @@ -805,11 +828,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { FileSystem backupFileSystem = backupPath.getFileSystem(); inputStream = backupFileSystem.open(filePath); - closeableRegistry.registerClosable(inputStream); + closeableRegistry.registerCloseable(inputStream); outputStream = checkpointStreamFactory .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); - closeableRegistry.registerClosable(outputStream); + closeableRegistry.registerCloseable(outputStream); while (true) { int numBytes = inputStream.read(buffer); @@ -821,19 +844,19 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { outputStream.write(buffer, 0, numBytes); } - closeableRegistry.unregisterClosable(outputStream); - StreamStateHandle result = outputStream.closeAndGetHandle(); - outputStream = null; - + StreamStateHandle result = null; + if (closeableRegistry.unregisterCloseable(outputStream)) { + result = outputStream.closeAndGetHandle(); + outputStream = null; + } return result; + } finally { - if (inputStream != null) { - closeableRegistry.unregisterClosable(inputStream); + if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) { inputStream.close(); } - if (outputStream != null) { - closeableRegistry.unregisterClosable(outputStream); + if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) { outputStream.close(); } } @@ -845,7 +868,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { try { outputStream = checkpointStreamFactory .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); - 
closeableRegistry.registerClosable(outputStream); + closeableRegistry.registerCloseable(outputStream); //no need for compression scheme support because sst-files are already compressed KeyedBackendSerializationProxy<K> serializationProxy = @@ -858,15 +881,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { serializationProxy.write(out); - closeableRegistry.unregisterClosable(outputStream); - StreamStateHandle result = outputStream.closeAndGetHandle(); - outputStream = null; - + StreamStateHandle result = null; + if (closeableRegistry.unregisterCloseable(outputStream)) { + result = outputStream.closeAndGetHandle(); + outputStream = null; + } return result; } finally { if (outputStream != null) { - closeableRegistry.unregisterClosable(outputStream); - outputStream.close(); + if (closeableRegistry.unregisterCloseable(outputStream)) { + outputStream.close(); + } } } } @@ -905,7 +930,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { KeyedStateHandle materializeSnapshot() throws Exception { - stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry); + stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry); // write meta data metaStateHandle = materializeMetaData(); @@ -954,15 +979,25 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } void stop() { - try { - closeableRegistry.close(); - } catch (IOException e) { - LOG.warn("Could not properly close io streams.", e); + + if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) { + try { + closeableRegistry.close(); + } catch (IOException e) { + LOG.warn("Could not properly close io streams.", e); + } } } void releaseResources(boolean canceled) { - stateBackend.cancelStreamRegistry.unregisterClosable(closeableRegistry); + + if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) { + try { + closeableRegistry.close(); + } catch (IOException e) { + 
LOG.warn("Exception on closing registry.", e); + } + } if (backupPath != null) { try { @@ -1128,13 +1163,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { throws IOException, StateMigrationException, RocksDBException { try { currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream(); - rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream); + rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream); currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream); restoreKVStateMetaData(); restoreKVStateData(); } finally { - if (currentStateHandleInStream != null) { - rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterClosable(currentStateHandleInStream); + if (currentStateHandleInStream != null + && rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) { IOUtils.closeQuietly(currentStateHandleInStream); } } @@ -1275,7 +1310,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { try { inputStream = metaStateHandle.openInputStream(); - stateBackend.cancelStreamRegistry.registerClosable(inputStream); + stateBackend.cancelStreamRegistry.registerCloseable(inputStream); KeyedBackendSerializationProxy<T> serializationProxy = new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader); @@ -1298,8 +1333,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return serializationProxy.getStateMetaInfoSnapshots(); } finally { - if (inputStream != null) { - stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) { inputStream.close(); } } @@ -1316,10 +1350,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { try { inputStream = remoteFileHandle.openInputStream(); - 
stateBackend.cancelStreamRegistry.registerClosable(inputStream); + stateBackend.cancelStreamRegistry.registerCloseable(inputStream); outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); - stateBackend.cancelStreamRegistry.registerClosable(outputStream); + stateBackend.cancelStreamRegistry.registerCloseable(outputStream); byte[] buffer = new byte[8 * 1024]; while (true) { @@ -1331,13 +1365,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { outputStream.write(buffer, 0, numBytes); } } finally { - if (inputStream != null) { - stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) { inputStream.close(); } - if (outputStream != null) { - stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + if (outputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) { outputStream.close(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java index 29f363c..87d33d2 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java @@ -49,7 +49,7 @@ public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Obje } @Override - protected void doUnRegister(@Nonnull Closeable closeable, @Nonnull Map<Closeable, Object> closeableMap) { - closeableMap.remove(closeable); + protected boolean doUnRegister(@Nonnull Closeable closeable, @Nonnull Map<Closeable, Object> closeableMap) { + return closeableMap.remove(closeable) != null; } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java index 7c97271..173a890 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java @@ -55,7 +55,7 @@ public class ClosingFSDataInputStream public void close() throws IOException { if (!closed) { closed = true; - registry.unregisterClosable(this); + registry.unregisterCloseable(this); inputStream.close(); } } @@ -93,7 +93,7 @@ public class ClosingFSDataInputStream FSDataInputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException{ ClosingFSDataInputStream inputStream = new ClosingFSDataInputStream(delegate, registry, debugInfo); - registry.registerClosable(inputStream); + registry.registerCloseable(inputStream); return inputStream; } } http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java index c517a83..cb7de92 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java @@ -60,7 +60,7 @@ public class ClosingFSDataOutputStream public void close() throws IOException { if (!closed) { closed = true; - registry.unregisterClosable(this); + registry.unregisterCloseable(this); outputStream.close(); 
} } @@ -98,7 +98,7 @@ public class ClosingFSDataOutputStream FSDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException { ClosingFSDataOutputStream inputStream = new ClosingFSDataOutputStream(delegate, registry, debugInfo); - registry.registerClosable(inputStream); + registry.registerCloseable(inputStream); return inputStream; } } http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java index 6097334..9c4272f 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java @@ -98,7 +98,7 @@ public class SafetyNetCloseableRegistry extends } @Override - protected void doUnRegister( + protected boolean doUnRegister( @Nonnull WrappingProxyCloseable<? 
extends Closeable> closeable, @Nonnull Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) { @@ -106,11 +106,7 @@ public class SafetyNetCloseableRegistry extends Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate()); - if (null == innerCloseable) { - return; - } - - closeableMap.remove(innerCloseable); + return null != innerCloseable && closeableMap.remove(innerCloseable) != null; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java index 4527b5e..14e765c 100644 --- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java @@ -68,7 +68,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen * @param closeable Closeable tor register * @throws IOException exception when the registry was closed before */ - public final void registerClosable(C closeable) throws IOException { + public final void registerCloseable(C closeable) throws IOException { if (null == closeable) { return; @@ -89,15 +89,16 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen * Removes a {@link Closeable} from the registry. * * @param closeable instance to remove from the registry. + * @return true if the closeable was previously registered and became unregistered through this call. 
*/ - public final void unregisterClosable(C closeable) { + public final boolean unregisterCloseable(C closeable) { if (null == closeable) { - return; + return false; } synchronized (getSynchronizationLock()) { - doUnRegister(closeable, closeableToRef); + return doUnRegister(closeable, closeableToRef); } } @@ -137,7 +138,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen * Does the actual un-registration of the closeable from the registry map. This should not do any long running or * potentially blocking operations as is is executed under the registry's lock. */ - protected abstract void doUnRegister(@Nonnull C closeable, @Nonnull Map<Closeable, T> closeableMap); + protected abstract boolean doUnRegister(@Nonnull C closeable, @Nonnull Map<Closeable, T> closeableMap); /** * Returns the lock on which manipulations to members closeableToRef and closeable must be synchronized. http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java index 41b69c8..f9425f3 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java @@ -91,7 +91,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> { try { - closeableRegistry.registerClosable(testCloseable); + closeableRegistry.registerCloseable(testCloseable); Assert.fail("Closed registry should not accept closeables!"); @@ -120,7 +120,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> { return null; }).when(spyCloseable).close(); - closeableRegistry.registerClosable(spyCloseable); + 
closeableRegistry.registerCloseable(spyCloseable); Assert.assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables()); @@ -138,7 +138,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> { final C testCloseable = spy(createCloseable()); try { - closeableRegistry.registerClosable(testCloseable); + closeableRegistry.registerCloseable(testCloseable); Assert.fail("Closed registry should not accept closeables!"); }catch (IOException ignore) { } @@ -214,10 +214,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> { @Override public synchronized void close() throws IOException { - if (refCount != null) { - refCount.decrementAndGet(); - refCount = null; - } + refCount.decrementAndGet(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java index eb8d1f4..c3bf6e6 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java @@ -52,7 +52,7 @@ public class CloseableRegistryTest extends AbstractCloseableRegistryTest<Closeab @Override protected void createAndRegisterStream() throws IOException { TestStream testStream = new TestStream(unclosedCounter); - registry.registerClosable(testStream); + registry.registerCloseable(testStream); } }; } http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java deleted file mode 100644 index 1aaa473..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint; - -import org.apache.commons.io.IOUtils; -import org.apache.flink.core.fs.CloseableRegistry; -import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable; -import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.StateObject; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Abstract base class for async IO operations of snapshots against a - * {@link java.util.zip.CheckedOutputStream}. This includes participating in lifecycle management - * through a {@link CloseableRegistry}. 
- */ -public abstract class AbstractAsyncSnapshotIOCallable<H extends StateObject> - extends AbstractAsyncIOCallable<H, CheckpointStreamFactory.CheckpointStateOutputStream> { - - protected final long checkpointId; - protected final long timestamp; - - protected final CheckpointStreamFactory streamFactory; - protected final CloseableRegistry closeStreamOnCancelRegistry; - protected final AtomicBoolean open; - - public AbstractAsyncSnapshotIOCallable( - long checkpointId, - long timestamp, - CheckpointStreamFactory streamFactory, - CloseableRegistry closeStreamOnCancelRegistry) { - - this.streamFactory = Preconditions.checkNotNull(streamFactory); - this.closeStreamOnCancelRegistry = Preconditions.checkNotNull(closeStreamOnCancelRegistry); - this.checkpointId = checkpointId; - this.timestamp = timestamp; - this.open = new AtomicBoolean(false); - } - - @Override - public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception { - if (checkStreamClosedAndDoTransitionToOpen()) { - CheckpointStreamFactory.CheckpointStateOutputStream stream = - streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); - try { - closeStreamOnCancelRegistry.registerClosable(stream); - return stream; - } catch (Exception ex) { - open.set(false); - throw ex; - } - } else { - throw new IOException("Async snapshot: a checkpoint stream was already opened."); - } - } - - @Override - public void done(boolean canceled) { - if (checkStreamOpenAndDoTransitionToClose()) { - CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle(); - if (stream != null) { - closeStreamOnCancelRegistry.unregisterClosable(stream); - IOUtils.closeQuietly(stream); - } - } - } - - protected boolean checkStreamClosedAndDoTransitionToOpen() { - return open.compareAndSet(false, true); - } - - protected boolean checkStreamOpenAndDoTransitionToClose() { - return open.compareAndSet(true, false); - } - - protected StreamStateHandle closeStreamAndGetStateHandle() throws 
IOException { - if (checkStreamOpenAndDoTransitionToClose()) { - final CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle(); - try { - return stream.closeAndGetHandle(); - } finally { - closeStreamOnCancelRegistry.unregisterClosable(stream); - } - } else { - throw new IOException("Checkpoint stream already closed."); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java new file mode 100644 index 0000000..bc0116c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.async; + +import org.apache.flink.util.ExceptionUtils; + +import java.io.IOException; + +/** + * This abstract class encapsulates the lifecycle and execution strategy for asynchronous operations that use resources. + * + * @param <V> return type of the asynchronous call. + */ +public abstract class AbstractAsyncCallableWithResources<V> implements StoppableCallbackCallable<V> { + + /** Tracks if the stop method was called on this object. */ + private volatile boolean stopped; + + /** Tracks if call method was executed (only before stop calls). */ + private volatile boolean called; + + /** Stores a collected exception if there was one during stop. */ + private volatile Exception stopException; + + public AbstractAsyncCallableWithResources() { + this.stopped = false; + this.called = false; + } + + /** + * This method implements the strategy for the actual IO operation: + * <p> + * 1) Acquire resources asynchronously and atomically w.r.t stopping. + * 2) Performs the operation + * 3) Releases resources. + * + * @return Result of the IO operation, e.g. a deserialized object. + * @throws Exception exception that happened during the call. + */ + @Override + public final V call() throws Exception { + + V result = null; + Exception collectedException = null; + + try { + synchronized (this) { + + if (stopped) { + throw new IOException("Task was already stopped."); + } + + called = true; + // Get resources in async part, atomically w.r.t. stopping. + acquireResources(); + } + + // The main work is performed here. + result = performOperation(); + + } catch (Exception ex) { + collectedException = ex; + } finally { + + try { + // Cleanup + releaseResources(); + } catch (Exception relEx) { + collectedException = ExceptionUtils.firstOrSuppressed(relEx, collectedException); + } + + if (collectedException != null) { + throw collectedException; + } + } + + return result; + } + + /** + * Open the IO Handle (e.g. 
a stream) on which the operation will be performed. + * + * @return the opened IO handle that implements #Closeable + * @throws Exception if there was a problem in acquiring. + */ + protected abstract void acquireResources() throws Exception; + + /** + * Implements the actual operation. + * + * @return Result of the operation + * @throws Exception if there was a problem in executing the operation. + */ + protected abstract V performOperation() throws Exception; + + /** + * Releases resources acquired by this object. + * + * @throws Exception if there was a problem in releasing resources. + */ + protected abstract void releaseResources() throws Exception; + + /** + * This method implements how the operation is stopped. Usually this involves interrupting or closing some + * resources like streams to return from blocking calls. + * + * @throws Exception on problems during the stopping. + */ + protected abstract void stopOperation() throws Exception; + + /** + * Stops the I/O operation by closing the I/O handle. If an exception is thrown on close, it can be accessed via + * #getStopException(). + */ + @Override + public final void stop() { + + synchronized (this) { + + // Make sure that call can not enter execution from here. + if (stopped) { + return; + } else { + stopped = true; + } + } + + if (called) { + // Async call is executing -> attempt to stop it and releaseResources() will happen inside the async method. + try { + stopOperation(); + } catch (Exception stpEx) { + this.stopException = stpEx; + } + } else { + // Async call was not executed, so we also need to releaseResources() here. + try { + releaseResources(); + } catch (Exception relEx) { + stopException = relEx; + } + } + } + + /** + * Optional callback that subclasses can implement. This is called when the callable method completed, e.g. because + * it finished or was stopped. 
+ */ + @Override + public void done(boolean canceled) { + //optional callback hook + } + + /** + * True once the async method was called. + */ + public boolean isCalled() { + return called; + } + + /** + * Check if the IO operation is stopped + * + * @return true if stop() was called + */ + @Override + public boolean isStopped() { + return stopped; + } + + /** + * Returns a potential exception that might have been observed while stopping the operation. + */ + @Override + public Exception getStopException() { + return stopException; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java deleted file mode 100644 index 1968d40..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.async; - -import java.io.Closeable; -import java.io.IOException; - -/** - * The abstract class encapsulates the lifecycle and execution strategy for asynchronous IO operations - * - * @param <V> return type of the asynchronous call - * @param <D> type of the IO handle - */ -public abstract class AbstractAsyncIOCallable<V, D extends Closeable> implements StoppableCallbackCallable<V> { - - private volatile boolean stopped; - - /** - * Closable handle to IO, e.g. an InputStream - */ - private volatile D ioHandle; - - /** - * Stores exception that might happen during close - */ - private volatile IOException stopException; - - - public AbstractAsyncIOCallable() { - this.stopped = false; - } - - /** - * This method implements the strategy for the actual IO operation: - * - * 1) Open the IO handle - * 2) Perform IO operation - * 3) Close IO handle - * - * @return Result of the IO operation, e.g. a deserialized object. - * @throws Exception exception that happened during the call. - */ - @Override - public V call() throws Exception { - - synchronized (this) { - if (isStopped()) { - throw new IOException("Task was already stopped. No I/O handle opened."); - } - - ioHandle = openIOHandle(); - } - - try { - - return performOperation(); - - } finally { - closeIOHandle(); - } - - } - - /** - * Open the IO Handle (e.g. a stream) on which the operation will be performed. - * - * @return the opened IO handle that implements #Closeable - * @throws Exception - */ - protected abstract D openIOHandle() throws Exception; - - /** - * Implements the actual IO operation on the opened IO handle. - * - * @return Result of the IO operation - * @throws Exception - */ - protected abstract V performOperation() throws Exception; - - /** - * Stops the I/O operation by closing the I/O handle. If an exception is thrown on close, it can be accessed via - * #getStopException(). 
- */ - @Override - public void stop() { - closeIOHandle(); - } - - private synchronized void closeIOHandle() { - - if (!stopped) { - stopped = true; - - final D handle = ioHandle; - if (handle != null) { - try { - handle.close(); - } catch (IOException ex) { - stopException = ex; - } - } - } - } - - /** - * Returns the IO handle. - * @return the IO handle - */ - protected D getIoHandle() { - return ioHandle; - } - - /** - * Optional callback that subclasses can implement. This is called when the callable method completed, e.g. because - * it finished or was stopped. - */ - @Override - public void done(boolean canceled) { - //optional callback hook - } - - /** - * Check if the IO operation is stopped - * - * @return true if stop() was called - */ - @Override - public boolean isStopped() { - return stopped; - } - - /** - * Returns Exception that might happen on stop. - * - * @return Potential Exception that happened open stopping. - */ - @Override - public IOException getStopException() { - return stopException; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java index 560e56a..8698600 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.io.async; -import java.io.IOException; - /** * An asynchronous operation that can be stopped. 
*/ @@ -42,6 +40,6 @@ public interface AsyncStoppable { * * @return Exception that can happen during stop */ - IOException getStopException(); + Exception getStopException(); } http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index b16ac06..1fb03d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.state; -import org.apache.commons.io.IOUtils; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; @@ -34,11 +33,13 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources; import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; + +import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -225,17 +226,37 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } // implementation of the async IO operation, based on FutureTask - final 
AbstractAsyncSnapshotIOCallable<OperatorStateHandle> ioCallable = - new AbstractAsyncSnapshotIOCallable<OperatorStateHandle>( - checkpointId, - timestamp, - streamFactory, - closeStreamOnCancelRegistry) { + final AbstractAsyncCallableWithResources<OperatorStateHandle> ioCallable = + new AbstractAsyncCallableWithResources<OperatorStateHandle>() { + + CheckpointStreamFactory.CheckpointStateOutputStream out = null; + + @Override + protected void acquireResources() throws Exception { + out = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); + closeStreamOnCancelRegistry.registerCloseable(out); + } + + @Override + protected void releaseResources() throws Exception { + if (closeStreamOnCancelRegistry.unregisterCloseable(out)) { + IOUtils.closeQuietly(out); + } + } + + @Override + protected void stopOperation() throws Exception { + if (closeStreamOnCancelRegistry.unregisterCloseable(out)) { + IOUtils.closeQuietly(out); + } + } @Override public OperatorStateHandle performOperation() throws Exception { long asyncStartTime = System.currentTimeMillis(); + CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out; + final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = new HashMap<>(registeredStatesDeepCopies.size()); @@ -246,8 +267,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { metaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot()); } - CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle(); - DataOutputView dov = new DataOutputViewStreamWrapper(out); + DataOutputView dov = new DataOutputViewStreamWrapper(localOut); OperatorBackendSerializationProxy backendSerializationProxy = new OperatorBackendSerializationProxy(metaInfoSnapshots); @@ -260,25 +280,30 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { registeredStatesDeepCopies.entrySet()) { PartitionableListState<?> value = entry.getValue(); - long[] partitionOffsets = 
value.write(out); + long[] partitionOffsets = value.write(localOut); OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode(); writtenStatesMetaData.put( entry.getKey(), new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode)); } - StreamStateHandle stateHandle = closeStreamAndGetStateHandle(); + OperatorStateHandle retValue = null; + + if (closeStreamOnCancelRegistry.unregisterCloseable(out)) { + + StreamStateHandle stateHandle = out.closeAndGetHandle(); + + if (stateHandle != null) { + retValue = new OperatorStateHandle(writtenStatesMetaData, stateHandle); + } + } if (asynchronousSnapshots) { LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in thread {} took {} ms.", streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime)); } - if (stateHandle == null) { - return null; - } - - return new OperatorStateHandle(writtenStatesMetaData, stateHandle); + return retValue; } }; @@ -308,7 +333,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } FSDataInputStream in = stateHandle.openInputStream(); - closeStreamOnCancelRegistry.registerClosable(in); + closeStreamOnCancelRegistry.registerCloseable(in); ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader(); @@ -370,8 +395,9 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } finally { Thread.currentThread().setContextClassLoader(restoreClassLoader); - closeStreamOnCancelRegistry.unregisterClosable(in); - IOUtils.closeQuietly(in); + if (closeStreamOnCancelRegistry.unregisterCloseable(in)) { + IOUtils.closeQuietly(in); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java index 031d7c7..750d206 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java @@ -254,9 +254,10 @@ public class StateInitializationContextImpl implements StateInitializationContex this.offsets = metaOffsets; this.offPos = 0; - closableRegistry.unregisterClosable(currentStream); - IOUtils.closeQuietly(currentStream); - currentStream = null; + if (closableRegistry.unregisterCloseable(currentStream)) { + IOUtils.closeQuietly(currentStream); + currentStream = null; + } return true; } @@ -308,14 +309,18 @@ public class StateInitializationContextImpl implements StateInitializationContex } protected void openCurrentStream() throws IOException { + + Preconditions.checkState(currentStream == null); + FSDataInputStream stream = currentStateHandle.openInputStream(); - closableRegistry.registerClosable(stream); + closableRegistry.registerCloseable(stream); currentStream = stream; } protected void closeCurrentStream() { - closableRegistry.unregisterClosable(currentStream); - IOUtils.closeQuietly(currentStream); + if (closableRegistry.unregisterCloseable(currentStream)) { + IOUtils.closeQuietly(currentStream); + } currentStream = null; } http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java index 5db0138..6a8a08f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java @@ -88,7 +88,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext CheckpointStreamFactory.CheckpointStateOutputStream cout = streamFactory.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); - closableRegistry.registerClosable(cout); + closableRegistry.registerCloseable(cout); return cout; } @@ -120,22 +120,25 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext } private <T extends StreamStateHandle> T closeAndUnregisterStreamToObtainStateHandle( - NonClosingCheckpointOutputStream<T> stream) throws IOException { - if (null == stream) { + NonClosingCheckpointOutputStream<T> stream) throws IOException { + + if (null != stream && closableRegistry.unregisterCloseable(stream.getDelegate())) { + return stream.closeAndGetHandle(); + } else { return null; } - - closableRegistry.unregisterClosable(stream.getDelegate()); - - // for now we only support synchronous writing - return stream.closeAndGetHandle(); } - private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException { + private <T extends StreamStateHandle> void closeAndUnregisterStream( + NonClosingCheckpointOutputStream<T> stream) throws IOException { + Preconditions.checkNotNull(stream); - closableRegistry.unregisterClosable(stream.getDelegate()); - stream.getDelegate().close(); + CheckpointStreamFactory.CheckpointStateOutputStream delegate = stream.getDelegate(); + + if (closableRegistry.unregisterCloseable(delegate)) { + delegate.close(); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index e235b96..bf92b34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -35,8 +35,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources; import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; @@ -325,30 +325,56 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { //--------------------------------------------------- this becomes the end of sync part // implementation of the async IO operation, based on FutureTask - final AbstractAsyncSnapshotIOCallable<KeyedStateHandle> ioCallable = - new AbstractAsyncSnapshotIOCallable<KeyedStateHandle>( - checkpointId, - timestamp, - streamFactory, - cancelStreamRegistry) { + final AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable = + new AbstractAsyncCallableWithResources<KeyedStateHandle>() { + + CheckpointStreamFactory.CheckpointStateOutputStream stream = null; + + @Override + protected void acquireResources() throws Exception { + stream = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); + cancelStreamRegistry.registerCloseable(stream); + } + + @Override + protected void releaseResources() throws 
Exception { + + if (cancelStreamRegistry.unregisterCloseable(stream)) { + IOUtils.closeQuietly(stream); + stream = null; + } + + for (StateTableSnapshot tableSnapshot : cowStateStableSnapshots.values()) { + tableSnapshot.release(); + } + } + + @Override + protected void stopOperation() throws Exception { + if (cancelStreamRegistry.unregisterCloseable(stream)) { + IOUtils.closeQuietly(stream); + stream = null; + } + } @Override public KeyGroupsStateHandle performOperation() throws Exception { long asyncStartTime = System.currentTimeMillis(); - CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle(); - DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream); + CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.stream; + + DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(localStream); serializationProxy.write(outView); long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()]; for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) { int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos); - keyGroupRangeOffsets[keyGroupPos] = stream.getPos(); + keyGroupRangeOffsets[keyGroupPos] = localStream.getPos(); outView.writeInt(keyGroupId); for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) { - OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(stream); + OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(localStream); DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut); kgCompressionView.writeShort(kVStateToId.get(kvState.getKey())); cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView, keyGroupId); @@ -356,21 +382,29 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } - final StreamStateHandle streamStateHandle = 
closeStreamAndGetStateHandle(); + if (cancelStreamRegistry.unregisterCloseable(stream)) { - if (asynchronousSnapshots) { - LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.", - streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime)); - } + final StreamStateHandle streamStateHandle = stream.closeAndGetHandle(); + stream = null; - if (streamStateHandle == null) { - return null; - } + if (asynchronousSnapshots) { + LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.", + streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime)); + } - KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets); - final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle); + if (streamStateHandle != null) { - return keyGroupsStateHandle; + KeyGroupRangeOffsets offsets = + new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets); + + final KeyGroupsStateHandle keyGroupsStateHandle = + new KeyGroupsStateHandle(offsets, streamStateHandle); + + return keyGroupsStateHandle; + } + } + + return null; } }; @@ -425,7 +459,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream(); - cancelStreamRegistry.registerClosable(fsDataInputStream); + cancelStreamRegistry.registerCloseable(fsDataInputStream); try { DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream); @@ -533,8 +567,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } } finally { - cancelStreamRegistry.unregisterClosable(fsDataInputStream); - IOUtils.closeQuietly(fsDataInputStream); + if (cancelStreamRegistry.unregisterCloseable(fsDataInputStream)) { + IOUtils.closeQuietly(fsDataInputStream); + } 
} } } http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 6089240..631cdfc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -709,7 +709,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> OperatorStateBackend operatorStateBackend = stateBackend.createOperatorStateBackend(env, opId); // let operator state backend participate in the operator lifecycle, i.e. make it responsive to cancelation - cancelables.registerClosable(operatorStateBackend); + cancelables.registerCloseable(operatorStateBackend); // restore if we have some old state if (null != restoreStateHandles) { @@ -742,7 +742,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> getEnvironment().getTaskKvStateRegistry()); // let keyed state backend participate in the operator lifecycle, i.e. 
make it responsive to cancelation - cancelables.registerClosable(keyedStateBackend); + cancelables.registerCloseable(keyedStateBackend); // restore if we have some old state Collection<KeyedStateHandle> restoreKeyedStateHandles = null; @@ -933,7 +933,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException); } finally { - owner.cancelables.unregisterClosable(this); + owner.cancelables.unregisterCloseable(this); FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); } } @@ -1086,7 +1086,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> checkpointMetrics, startAsyncPartNano); - owner.cancelables.registerClosable(asyncCheckpointRunnable); + owner.cancelables.registerCloseable(asyncCheckpointRunnable); owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable); } http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 9bb91ad..811d700 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -1117,7 +1117,7 @@ public class StreamTaskTest extends TestLogger { holder.start(); try { // cancellation should try and cancel this - getCancelables().registerClosable(holder); + getCancelables().registerCloseable(holder); // wait till the lock holder has the lock latch.await(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 9156f34..793e8f6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -220,7 +220,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { environment, operator.getClass().getSimpleName()); - mockTask.getCancelables().registerClosable(osb); + mockTask.getCancelables().registerCloseable(osb); if (null != stateHandles) { osb.restore(stateHandles); http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java index 4d5fa71..829ac93 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -180,7 +180,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog } case ROCKSDB_FULLY_ASYNC: { String rocksDb = tempFolder.newFolder().getAbsolutePath(); - RocksDBStateBackend rdb = new 
RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE)); + String backups = tempFolder.newFolder().getAbsolutePath(); + RocksDBStateBackend rdb = new RocksDBStateBackend(new FsStateBackend("file://" + backups)); rdb.setDbStoragePath(rocksDb); this.stateBackend = rdb; break;