[FLINK-6557] [rocksdb] Use File instead of Path for RocksDB local temp directories.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7532cb7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7532cb7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7532cb7

Branch: refs/heads/release-1.5
Commit: b7532cb738398aa3b05c0769705c45fe22eb6a04
Parents: 56e2b0b
Author: Stephan Ewen <[email protected]>
Authored: Mon Apr 30 22:50:24 2018 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Apr 30 23:25:43 2018 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |  69 ++++++---
 .../state/RocksDBStateBackendConfigTest.java    | 139 +++++++++++++++----
 2 files changed, 168 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7532cb7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 9389295..81d6265 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -83,6 +83,7 @@ public class RocksDBStateBackend extends AbstractStateBackend 
implements Configu
        /** The number of (re)tries for loading the RocksDB JNI library. */
        private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
 
+       /** Flag whether the native library has been loaded. */
        private static boolean rocksDbInitialized = false;
 
        // 
------------------------------------------------------------------------
@@ -96,7 +97,7 @@ public class RocksDBStateBackend extends AbstractStateBackend 
implements Configu
         * Null if not yet set, in which case the configuration values will be 
used.
         * The configuration defaults to the TaskManager's temp directories. */
        @Nullable
-       private Path[] localRocksDbDirectories;
+       private File[] localRocksDbDirectories;
 
        /** The pre-configured option settings. */
        private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
@@ -169,6 +170,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         * @param checkpointDataUri The URI describing the filesystem and path 
to the checkpoint data directory.
         * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
         */
+       @SuppressWarnings("deprecation")
        public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
                this(new FsStateBackend(checkpointDataUri));
        }
@@ -186,6 +188,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         * @param enableIncrementalCheckpointing True if incremental 
checkpointing is enabled.
         * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
         */
+       @SuppressWarnings("deprecation")
        public RocksDBStateBackend(URI checkpointDataUri, boolean 
enableIncrementalCheckpointing) throws IOException {
                this(new FsStateBackend(checkpointDataUri), 
enableIncrementalCheckpointing);
        }
@@ -326,16 +329,15 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                }
                else {
                        List<File> dirs = new 
ArrayList<>(localRocksDbDirectories.length);
-                       String errorMessage = "";
+                       StringBuilder errorMessage = new StringBuilder();
 
-                       for (Path path : localRocksDbDirectories) {
-                               File f = new File(path.toUri().getPath());
+                       for (File f : localRocksDbDirectories) {
                                File testDir = new File(f, 
UUID.randomUUID().toString());
                                if (!testDir.mkdirs()) {
-                                       String msg = "Local DB files directory 
'" + path
+                                       String msg = "Local DB files directory 
'" + f
                                                        + "' does not exist and 
cannot be created. ";
                                        LOG.error(msg);
-                                       errorMessage += msg;
+                                       errorMessage.append(msg);
                                } else {
                                        dirs.add(f);
                                }
@@ -455,9 +457,13 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        }
 
        /**
-        * Sets the paths across which the local RocksDB database files are 
distributed on the local
-        * file system. Setting these paths overrides the default behavior, 
where the
-        * files are stored across the configured temp directories.
+        * Sets the directories in which the local RocksDB database puts its 
files (like SST and
+        * metadata files). These directories do not need to be persistent, 
they can be ephemeral,
+        * meaning that they are lost on a machine failure, because state in 
RocksDB is persisted
+        * in checkpoints.
+        *
+        * <p>If nothing is configured, these directories default to the 
TaskManager's local
+        * temporary file directories.
         *
         * <p>Each distinct state will be stored in one path, but when the 
state backend creates
         * multiple states, they will store their files on different paths.
@@ -475,17 +481,41 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                        throw new IllegalArgumentException("empty paths");
                }
                else {
-                       Path[] pp = new Path[paths.length];
+                       File[] pp = new File[paths.length];
 
                        for (int i = 0; i < paths.length; i++) {
-                               if (paths[i] == null) {
+                               final String rawPath = paths[i];
+                               final String path;
+
+                               if (rawPath == null) {
                                        throw new 
IllegalArgumentException("null path");
                                }
+                               else {
+                                       // we need this for backwards 
compatibility, to allow URIs like 'file:///'...
+                                       URI uri = null;
+                                       try {
+                                               uri = new Path(rawPath).toUri();
+                                       }
+                                       catch (Exception e) {
+                                               // cannot parse as a path
+                                       }
 
-                               pp[i] = new Path(paths[i]);
-                               String scheme = pp[i].toUri().getScheme();
-                               if (scheme != null && 
!scheme.equalsIgnoreCase("file")) {
-                                       throw new 
IllegalArgumentException("Path " + paths[i] + " has a non local scheme");
+                                       if (uri != null && uri.getScheme() != 
null) {
+                                               if 
("file".equalsIgnoreCase(uri.getScheme())) {
+                                                       path = uri.getPath();
+                                               }
+                                               else {
+                                                       throw new 
IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
+                                               }
+                                       }
+                                       else {
+                                               path = rawPath;
+                                       }
+                               }
+
+                               pp[i] = new File(path);
+                               if (!pp[i].isAbsolute()) {
+                                       throw new 
IllegalArgumentException("Relative paths are not supported");
                                }
                        }
 
@@ -494,8 +524,15 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        }
 
        /**
+        * Gets the configured local DB storage paths, or null, if none were 
configured.
+        *
+        * <p>Under these directories on the TaskManager, RocksDB stores its 
SST files and
+        * metadata files. These directories do not need to be persistent, they 
can be ephemeral,
+        * meaning that they are lost on a machine failure, because state in 
RocksDB is persisted
+        * in checkpoints.
         *
-        * @return The configured DB storage paths, or null, if none were 
configured.
+        * <p>If nothing is configured, these directories default to the 
TaskManager's local
+        * temporary file directories.
         */
        public String[] getDbStoragePaths() {
                if (localRocksDbDirectories == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b7532cb7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 65d5b2e..4bc2f9f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -23,14 +23,17 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -67,10 +70,10 @@ import static org.mockito.Mockito.when;
 public class RocksDBStateBackendConfigTest {
 
        @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
+       public final TemporaryFolder tempFolder = new TemporaryFolder();
 
        // 
------------------------------------------------------------------------
-       //  RocksDB local file directory
+       //  default values
        // 
------------------------------------------------------------------------
 
        @Test
@@ -81,39 +84,99 @@ public class RocksDBStateBackendConfigTest {
                assertEquals(defaultIncremental, 
backend.isIncrementalCheckpointsEnabled());
        }
 
+       // 
------------------------------------------------------------------------
+       //  RocksDB local file directory
+       // 
------------------------------------------------------------------------
+
+       /**
+        * This test checks the behavior for basic setting of local DB 
directories.
+        */
        @Test
        public void testSetDbPath() throws Exception {
-               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
-               File testDir1 = tempFolder.newFolder();
-               File testDir2 = tempFolder.newFolder();
+               final RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
 
-               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+               final String testDir1 = 
tempFolder.newFolder().getAbsolutePath();
+               final String testDir2 = 
tempFolder.newFolder().getAbsolutePath();
 
                assertNull(rocksDbBackend.getDbStoragePaths());
 
-               rocksDbBackend.setDbStoragePath(testDir1.getAbsolutePath());
-               assertArrayEquals(new String[] { new 
Path(testDir1.getAbsolutePath()).toString() }, 
rocksDbBackend.getDbStoragePaths());
+               rocksDbBackend.setDbStoragePath(testDir1);
+               assertArrayEquals(new String[] { testDir1 }, 
rocksDbBackend.getDbStoragePaths());
 
                rocksDbBackend.setDbStoragePath(null);
                assertNull(rocksDbBackend.getDbStoragePaths());
 
-               rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), 
testDir2.getAbsolutePath());
-               assertArrayEquals(new String[] { new 
Path(testDir1.getAbsolutePath()).toString(), new 
Path(testDir2.getAbsolutePath()).toString() }, 
rocksDbBackend.getDbStoragePaths());
+               rocksDbBackend.setDbStoragePaths(testDir1, testDir2);
+               assertArrayEquals(new String[] { testDir1, testDir2 }, 
rocksDbBackend.getDbStoragePaths());
 
-               Environment env = getMockEnvironment();
-               RocksDBKeyedStateBackend<Integer> keyedBackend = 
(RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
-                               createKeyedStateBackend(
-                                               env,
-                                               env.getJobID(),
-                                               "test_op",
-                                               IntSerializer.INSTANCE,
-                                               1,
-                                               new KeyGroupRange(0, 0),
-                                               env.getTaskKvStateRegistry());
+               final RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend);
+
+               try {
+                       File instanceBasePath = 
keyedBackend.getInstanceBasePath();
+                       assertThat(instanceBasePath.getAbsolutePath(), 
anyOf(startsWith(testDir1), startsWith(testDir2)));
+
+                       //noinspection NullArgumentToVariableArgMethod
+                       rocksDbBackend.setDbStoragePaths(null);
+                       assertNull(rocksDbBackend.getDbStoragePaths());
+               }
+               finally {
+                       IOUtils.closeQuietly(keyedBackend);
+                       keyedBackend.dispose();
+               }
+       }
+
+       @Test
+       public void testStoragePathWithFilePrefix() throws Exception {
+               final File folder = tempFolder.newFolder();
+               final String dbStoragePath = new 
Path(folder.toURI().toString()).toString();
+
+               assertTrue(dbStoragePath.startsWith("file:"));
+
+               testLocalDbPaths(dbStoragePath, folder);
+       }
+
+       @Test
+       public void testWithDefaultFsSchemeNoStoragePath() throws Exception {
+               try {
+                       // set the default file system scheme
+                       Configuration config = new Configuration();
+                       config.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, 
"s3://mydomain.com:8020/flink");
+                       FileSystem.initialize(config);
+
+                       testLocalDbPaths(null, new 
File(CommonTestUtils.getTempDir()));
+               }
+               finally {
+                       FileSystem.initialize(new Configuration());
+               }
+       }
+
+       @Test
+       public void testWithDefaultFsSchemeAbsoluteStoragePath() throws 
Exception {
+               final File folder = tempFolder.newFolder();
+               final String dbStoragePath = folder.getAbsolutePath();
+
+               try {
+                       // set the default file system scheme
+                       Configuration config = new Configuration();
+                       config.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, 
"s3://mydomain.com:8020/flink");
+                       FileSystem.initialize(config);
+
+                       testLocalDbPaths(dbStoragePath, folder);
+               }
+               finally {
+                       FileSystem.initialize(new Configuration());
+               }
+       }
+
+       private void testLocalDbPaths(String configuredPath, File expectedPath) 
throws Exception {
+               final RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+               rocksDbBackend.setDbStoragePath(configuredPath);
+
+               RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend);
 
                try {
                        File instanceBasePath = 
keyedBackend.getInstanceBasePath();
-                       assertThat(instanceBasePath.getAbsolutePath(), 
anyOf(startsWith(testDir1.getAbsolutePath()), 
startsWith(testDir2.getAbsolutePath())));
+                       assertThat(instanceBasePath.getAbsolutePath(), 
startsWith(expectedPath.getAbsolutePath()));
 
                        //noinspection NullArgumentToVariableArgMethod
                        rocksDbBackend.setDbStoragePaths(null);
@@ -124,13 +187,19 @@ public class RocksDBStateBackendConfigTest {
                }
        }
 
+       /**
+        * Validates that empty arguments for the local DB path are invalid.
+        */
        @Test(expected = IllegalArgumentException.class)
-       public void testSetNullPaths() throws Exception {
+       public void testSetEmptyPaths() throws Exception {
                String checkpointPath = 
tempFolder.newFolder().toURI().toString();
                RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
                rocksDbBackend.setDbStoragePaths();
        }
 
+       /**
+        * Validates that schemes other than 'file:/' are not allowed.
+        */
        @Test(expected = IllegalArgumentException.class)
        public void testNonFileSchemePath() throws Exception {
                String checkpointPath = 
tempFolder.newFolder().toURI().toString();
@@ -138,6 +207,12 @@ public class RocksDBStateBackendConfigTest {
                
rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition");
        }
 
+       @Test(expected = IllegalArgumentException.class)
+       public void testDbPathRelativePaths() throws Exception {
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+               rocksDbBackend.setDbStoragePath("relative/path");
+       }
+
        // 
------------------------------------------------------------------------
        //  RocksDB local file automatic from temp directories
        // 
------------------------------------------------------------------------
@@ -381,7 +456,7 @@ public class RocksDBStateBackendConfigTest {
 
        @Test
        public void testCallsForwardedToNonPartitionedBackend() throws 
Exception {
-               AbstractStateBackend storageBackend = new MemoryStateBackend();
+               StateBackend storageBackend = new MemoryStateBackend();
                RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(storageBackend);
                assertEquals(storageBackend, 
rocksDbBackend.getCheckpointBackend());
        }
@@ -390,6 +465,22 @@ public class RocksDBStateBackendConfigTest {
        //  Utilities
        // 
------------------------------------------------------------------------
 
+       static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend(
+                       RocksDBStateBackend rocksDbBackend) throws Exception {
+
+               final Environment env = getMockEnvironment();
+
+               return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
+                               createKeyedStateBackend(
+                                               env,
+                                               env.getJobID(),
+                                               "test_op",
+                                               IntSerializer.INSTANCE,
+                                               1,
+                                               new KeyGroupRange(0, 0),
+                                               env.getTaskKvStateRegistry());
+       }
+
        static Environment getMockEnvironment() {
                return getMockEnvironment(new File[] { new 
File(System.getProperty("java.io.tmpdir")) });
        }

Reply via email to