Repository: flink
Updated Branches:
refs/heads/master ccc3e44cb -> 61d69a229
[FLINK-3948] Protect RocksDB cleanup by cleanup lock
Previously, cleanup could run while an asynchronous checkpoint was still
in progress, so the snapshot might iterate over an already-disposed db.
Cleanup and asynchronous checkpointing are now protected by a shared lock.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61d69a22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61d69a22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61d69a22
Branch: refs/heads/master
Commit: 61d69a229b40e52460f26804e4a36cf12e150004
Parents: cfffdc8
Author: Aljoscha Krettek
Authored: Fri May 20 22:37:14 2016 +0200
Committer: Aljoscha Krettek
Committed: Mon Jun 6 09:29:33 2016 +0200
--
.../flink-statebackend-rocksdb/pom.xml | 2 +-
.../streaming/state/RocksDBStateBackend.java| 110 ---
2 files changed, 70 insertions(+), 42 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/flink/blob/61d69a22/flink-contrib/flink-statebackend-rocksdb/pom.xml
--
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml
b/flink-contrib/flink-statebackend-rocksdb/pom.xml
index cccdc20..115752c 100644
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -50,7 +50,7 @@ under the License.
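<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
- <version>4.1.0</version>
+ <version>4.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>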
http://git-wip-us.apache.org/repos/asf/flink/blob/61d69a22/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
--
diff --git
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 4b7d7ee..4c44249 100644
---
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -155,7 +155,14 @@ public class RocksDBStateBackend extends
AbstractStateBackend {
* to store state. The different k/v states that we have don't each
have their own RocksDB
* instance. They all write to this instance but to their own column
family.
*/
- protected transient RocksDB db;
+ protected volatile transient RocksDB db;
+
+ /**
+* Lock for protecting cleanup of the RocksDB db. We acquire this when
doing asynchronous
+* checkpoints and when disposing the db. Otherwise, the asynchronous
snapshot might try
+* iterating over a disposed db.
+*/
+ private Object dbCleanupLock;
/**
* Information about the k/v states as we create them. This is used to
retrieve the
@@ -282,6 +289,8 @@ public class RocksDBStateBackend extends
AbstractStateBackend {
throw new RuntimeException("Error cleaning RocksDB data
directory.", e);
}
+ dbCleanupLock = new Object();
+
List<ColumnFamilyDescriptor> columnFamilyDescriptors = new
ArrayList<>(1);
// RocksDB seems to need this...
columnFamilyDescriptors.add(new
ColumnFamilyDescriptor("default".getBytes()));
@@ -305,28 +314,44 @@ public class RocksDBStateBackend extends
AbstractStateBackend {
super.dispose();
nonPartitionedStateBackend.dispose();
- if (this.dbOptions != null) {
- this.dbOptions.dispose();
- this.dbOptions = null;
- }
- for (Tuple2 column:
kvStateInformation.values()) {
- column.f0.dispose();
+ // we have to lock because we might have an asynchronous
checkpoint going on
+ synchronized (dbCleanupLock) {
+ if (db != null) {
+ if (this.dbOptions != null) {
+ this.dbOptions.dispose();
+ this.dbOptions = null;
+ }
+
+ for (Tuple2 column : kvStateInformation.values()) {
+ column.f0.dispose();
+