Repository: flink
Updated Branches:
  refs/heads/master 200612ee0 -> ccf917de2


[FLINK-6505] Proactively cleanup local FS for RocksDBKeyedStateBackend on startup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ccf917de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ccf917de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ccf917de

Branch: refs/heads/master
Commit: ccf917de23ac94b032da11fb536d778f0566792f
Parents: 200612e
Author: Bowen Li <bowenl...@gmail.com>
Authored: Mon Oct 9 22:31:17 2017 -0700
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu Nov 23 12:38:58 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 28 ++++++++++----------
 1 file changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ccf917de/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f67daab..9185ad0 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -235,20 +235,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                this.instanceBasePath = 
Preconditions.checkNotNull(instanceBasePath);
                this.instanceRocksDBPath = new File(instanceBasePath, "db");
 
-               if (!instanceBasePath.exists()) {
-                       if (!instanceBasePath.mkdirs()) {
-                               throw new IOException("Could not create RocksDB 
data directory.");
-                       }
+               if (instanceBasePath.exists()) {
+                       // Clear the base directory when the backend is created
+                       // in case something crashed and the backend never 
reached dispose()
+                       cleanInstanceBasePath();
                }
 
-               // clean it, this will remove the last part of the path but 
RocksDB will recreate it
-               try {
-                       if (instanceRocksDBPath.exists()) {
-                               LOG.warn("Deleting already existing db 
directory {}.", instanceRocksDBPath);
-                               FileUtils.deleteDirectory(instanceRocksDBPath);
-                       }
-               } catch (IOException e) {
-                       throw new IOException("Error cleaning RocksDB data 
directory.", e);
+               if (!instanceBasePath.mkdirs()) {
+                       throw new IOException("Could not create RocksDB data 
directory.");
                }
 
                this.keyGroupPrefixBytes = getNumberOfKeyGroups() > 
(Byte.MAX_VALUE + 1) ? 2 : 1;
@@ -312,10 +306,16 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                IOUtils.closeQuietly(dbOptions);
                IOUtils.closeQuietly(columnOptions);
 
+               cleanInstanceBasePath();
+       }
+
+       private void cleanInstanceBasePath() {
+               LOG.info("Deleting existing instance base directory {}.", 
instanceBasePath);
+
                try {
                        FileUtils.deleteDirectory(instanceBasePath);
-               } catch (IOException ioex) {
-                       LOG.info("Could not delete instace base path for 
RocksDB: " + instanceBasePath, ioex);
+               } catch (IOException ex) {
+                       LOG.warn("Could not delete instance base path for 
RocksDB: " + instanceBasePath, ex);
                }
        }
 

Reply via email to