Repository: flink
Updated Branches:
  refs/heads/master 97f0cac2a -> 2479ff53c


[FLINK-8559][RocksDB] Release resources if snapshot operation fails

This closes #5412.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbb81acb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbb81acb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbb81acb

Branch: refs/heads/master
Commit: dbb81acb5a1d0f2a9521c6eef7eeb2436bb8004d
Parents: 5e41eaa
Author: zentol <[email protected]>
Authored: Mon Feb 5 13:15:29 2018 +0100
Committer: zentol <[email protected]>
Committed: Tue Feb 6 20:20:47 2018 +0100

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java    | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dbb81acb/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 29a0854..5507339 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -377,7 +377,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                checkpointId,
                                checkpointTimestamp);
 
-               snapshotOperation.takeSnapshot();
+               try {
+                       snapshotOperation.takeSnapshot();
+               } catch (Exception e) {
+                       snapshotOperation.stop();
+                       snapshotOperation.releaseResources(true);
+                       throw e;
+               }
 
                return new FutureTask<KeyedStateHandle>(
                        new Callable<KeyedStateHandle>() {

Reply via email to