Repository: flink Updated Branches: refs/heads/release-1.4 35517f129 -> 8a052bf09
[FLINK-6505] Proactively cleanup local FS for RocksDBKeyedStateBackend on startup Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8a052bf0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8a052bf0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8a052bf0 Branch: refs/heads/release-1.4 Commit: 8a052bf0948d92d6fccc4d1c6c4bd2aa459032c9 Parents: 35517f1 Author: Bowen Li <bowenl...@gmail.com> Authored: Tue Oct 10 07:31:17 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu Nov 23 13:51:41 2017 +0100 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 28 ++++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8a052bf0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index f67daab..9185ad0 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -235,20 +235,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath); this.instanceRocksDBPath = new File(instanceBasePath, "db"); - if (!instanceBasePath.exists()) { - if (!instanceBasePath.mkdirs()) { - throw new IOException("Could not create RocksDB data directory."); - } + if (instanceBasePath.exists()) { + // Clear the base directory when the backend is created + // in case something crashed and the backend never reached dispose() + cleanInstanceBasePath(); } - // clean it, this will remove the last part of the path but RocksDB will recreate it - try { - if (instanceRocksDBPath.exists()) { - LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath); - FileUtils.deleteDirectory(instanceRocksDBPath); - } - } catch (IOException e) { - throw new IOException("Error cleaning RocksDB data directory.", e); + if (!instanceBasePath.mkdirs()) { + throw new IOException("Could not create RocksDB data directory."); } this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1; @@ -312,10 +306,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { IOUtils.closeQuietly(dbOptions); IOUtils.closeQuietly(columnOptions); + cleanInstanceBasePath(); + } + + private void cleanInstanceBasePath() { + LOG.info("Deleting existing instance base directory {}.", instanceBasePath); + try { FileUtils.deleteDirectory(instanceBasePath); - } catch (IOException ioex) { - LOG.info("Could not delete instace base path for RocksDB: " + instanceBasePath, ioex); + } catch (IOException ex) { + LOG.warn("Could not delete instance base path for RocksDB: " + instanceBasePath, ex); } }