[FLINK-6557] [rocksdb] Use File instead of Path for RocksDB local temp directories.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7532cb7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7532cb7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7532cb7 Branch: refs/heads/release-1.5 Commit: b7532cb738398aa3b05c0769705c45fe22eb6a04 Parents: 56e2b0b Author: Stephan Ewen <[email protected]> Authored: Mon Apr 30 22:50:24 2018 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Apr 30 23:25:43 2018 +0200 ---------------------------------------------------------------------- .../streaming/state/RocksDBStateBackend.java | 69 ++++++--- .../state/RocksDBStateBackendConfigTest.java | 139 +++++++++++++++---- 2 files changed, 168 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b7532cb7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 9389295..81d6265 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -83,6 +83,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu /** The number of (re)tries for loading the RocksDB JNI library. */ private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3; + /** Flag whether the native library has been loaded. 
*/ private static boolean rocksDbInitialized = false; // ------------------------------------------------------------------------ @@ -96,7 +97,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu * Null if not yet set, in which case the configuration values will be used. * The configuration defaults to the TaskManager's temp directories. */ @Nullable - private Path[] localRocksDbDirectories; + private File[] localRocksDbDirectories; /** The pre-configured option settings. */ private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT; @@ -169,6 +170,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory. * @throws IOException Thrown, if no file system can be found for the scheme in the URI. */ + @SuppressWarnings("deprecation") public RocksDBStateBackend(URI checkpointDataUri) throws IOException { this(new FsStateBackend(checkpointDataUri)); } @@ -186,6 +188,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. * @throws IOException Thrown, if no file system can be found for the scheme in the URI. 
*/ + @SuppressWarnings("deprecation") public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException { this(new FsStateBackend(checkpointDataUri), enableIncrementalCheckpointing); } @@ -326,16 +329,15 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu } else { List<File> dirs = new ArrayList<>(localRocksDbDirectories.length); - String errorMessage = ""; + StringBuilder errorMessage = new StringBuilder(); - for (Path path : localRocksDbDirectories) { - File f = new File(path.toUri().getPath()); + for (File f : localRocksDbDirectories) { File testDir = new File(f, UUID.randomUUID().toString()); if (!testDir.mkdirs()) { - String msg = "Local DB files directory '" + path + String msg = "Local DB files directory '" + f + "' does not exist and cannot be created. "; LOG.error(msg); - errorMessage += msg; + errorMessage.append(msg); } else { dirs.add(f); } @@ -455,9 +457,13 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu } /** - * Sets the paths across which the local RocksDB database files are distributed on the local - * file system. Setting these paths overrides the default behavior, where the - * files are stored across the configured temp directories. + * Sets the directories in which the local RocksDB database puts its files (like SST and + * metadata files). These directories do not need to be persistent, they can be ephemeral, + * meaning that they are lost on a machine failure, because state in RocksDB is persisted + * in checkpoints. + * + * <p>If nothing is configured, these directories default to the TaskManager's local + * temporary file directories. * * <p>Each distinct state will be stored in one path, but when the state backend creates * multiple states, they will store their files on different paths. 
@@ -475,17 +481,41 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu throw new IllegalArgumentException("empty paths"); } else { - Path[] pp = new Path[paths.length]; + File[] pp = new File[paths.length]; for (int i = 0; i < paths.length; i++) { - if (paths[i] == null) { + final String rawPath = paths[i]; + final String path; + + if (rawPath == null) { throw new IllegalArgumentException("null path"); } + else { + // we need this for backwards compatibility, to allow URIs like 'file:///'... + URI uri = null; + try { + uri = new Path(rawPath).toUri(); + } + catch (Exception e) { + // cannot parse as a path + } - pp[i] = new Path(paths[i]); - String scheme = pp[i].toUri().getScheme(); - if (scheme != null && !scheme.equalsIgnoreCase("file")) { - throw new IllegalArgumentException("Path " + paths[i] + " has a non local scheme"); + if (uri != null && uri.getScheme() != null) { + if ("file".equalsIgnoreCase(uri.getScheme())) { + path = uri.getPath(); + } + else { + throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme"); + } + } + else { + path = rawPath; + } + } + + pp[i] = new File(path); + if (!pp[i].isAbsolute()) { + throw new IllegalArgumentException("Relative paths are not supported"); } } @@ -494,8 +524,15 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu } /** + * Gets the configured local DB storage paths, or null, if none were configured. + * + * <p>Under these directories on the TaskManager, RocksDB stores its SST files and + * metadata files. These directories do not need to be persistent, they can be ephemeral, + * meaning that they are lost on a machine failure, because state in RocksDB is persisted + * in checkpoints. * - * @return The configured DB storage paths, or null, if none were configured. + * <p>If nothing is configured, these directories default to the TaskManager's local + * temporary file directories. 
*/ public String[] getDbStoragePaths() { if (localRocksDbDirectories == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/b7532cb7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 65d5b2e..4bc2f9f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -23,14 +23,17 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import 
org.apache.flink.runtime.state.memory.MemoryStateBackend; @@ -67,10 +70,10 @@ import static org.mockito.Mockito.when; public class RocksDBStateBackendConfigTest { @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); + public final TemporaryFolder tempFolder = new TemporaryFolder(); // ------------------------------------------------------------------------ - // RocksDB local file directory + // default values // ------------------------------------------------------------------------ @Test @@ -81,39 +84,99 @@ public class RocksDBStateBackendConfigTest { assertEquals(defaultIncremental, backend.isIncrementalCheckpointsEnabled()); } + // ------------------------------------------------------------------------ + // RocksDB local file directory + // ------------------------------------------------------------------------ + + /** + * This test checks the behavior for basic setting of local DB directories. + */ @Test public void testSetDbPath() throws Exception { - String checkpointPath = tempFolder.newFolder().toURI().toString(); - File testDir1 = tempFolder.newFolder(); - File testDir2 = tempFolder.newFolder(); + final RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString()); - RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + final String testDir1 = tempFolder.newFolder().getAbsolutePath(); + final String testDir2 = tempFolder.newFolder().getAbsolutePath(); assertNull(rocksDbBackend.getDbStoragePaths()); - rocksDbBackend.setDbStoragePath(testDir1.getAbsolutePath()); - assertArrayEquals(new String[] { new Path(testDir1.getAbsolutePath()).toString() }, rocksDbBackend.getDbStoragePaths()); + rocksDbBackend.setDbStoragePath(testDir1); + assertArrayEquals(new String[] { testDir1 }, rocksDbBackend.getDbStoragePaths()); rocksDbBackend.setDbStoragePath(null); assertNull(rocksDbBackend.getDbStoragePaths()); - rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), 
testDir2.getAbsolutePath()); - assertArrayEquals(new String[] { new Path(testDir1.getAbsolutePath()).toString(), new Path(testDir2.getAbsolutePath()).toString() }, rocksDbBackend.getDbStoragePaths()); + rocksDbBackend.setDbStoragePaths(testDir1, testDir2); + assertArrayEquals(new String[] { testDir1, testDir2 }, rocksDbBackend.getDbStoragePaths()); - Environment env = getMockEnvironment(); - RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend. - createKeyedStateBackend( - env, - env.getJobID(), - "test_op", - IntSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - env.getTaskKvStateRegistry()); + final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend); + + try { + File instanceBasePath = keyedBackend.getInstanceBasePath(); + assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1), startsWith(testDir2))); + + //noinspection NullArgumentToVariableArgMethod + rocksDbBackend.setDbStoragePaths(null); + assertNull(rocksDbBackend.getDbStoragePaths()); + } + finally { + IOUtils.closeQuietly(keyedBackend); + keyedBackend.dispose(); + } + } + + @Test + public void testStoragePathWithFilePrefix() throws Exception { + final File folder = tempFolder.newFolder(); + final String dbStoragePath = new Path(folder.toURI().toString()).toString(); + + assertTrue(dbStoragePath.startsWith("file:")); + + testLocalDbPaths(dbStoragePath, folder); + } + + @Test + public void testWithDefaultFsSchemeNoStoragePath() throws Exception { + try { + // set the default file system scheme + Configuration config = new Configuration(); + config.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3://mydomain.com:8020/flink"); + FileSystem.initialize(config); + + testLocalDbPaths(null, new File(CommonTestUtils.getTempDir())); + } + finally { + FileSystem.initialize(new Configuration()); + } + } + + @Test + public void testWithDefaultFsSchemeAbsoluteStoragePath() throws Exception { + final File folder 
= tempFolder.newFolder(); + final String dbStoragePath = folder.getAbsolutePath(); + + try { + // set the default file system scheme + Configuration config = new Configuration(); + config.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3://mydomain.com:8020/flink"); + FileSystem.initialize(config); + + testLocalDbPaths(dbStoragePath, folder); + } + finally { + FileSystem.initialize(new Configuration()); + } + } + + private void testLocalDbPaths(String configuredPath, File expectedPath) throws Exception { + final RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString()); + rocksDbBackend.setDbStoragePath(configuredPath); + + RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend); try { File instanceBasePath = keyedBackend.getInstanceBasePath(); - assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath()))); + assertThat(instanceBasePath.getAbsolutePath(), startsWith(expectedPath.getAbsolutePath())); //noinspection NullArgumentToVariableArgMethod rocksDbBackend.setDbStoragePaths(null); @@ -124,13 +187,19 @@ public class RocksDBStateBackendConfigTest { } } + /** + * Validates that empty arguments for the local DB path are invalid. + */ @Test(expected = IllegalArgumentException.class) - public void testSetNullPaths() throws Exception { + public void testSetEmptyPaths() throws Exception { String checkpointPath = tempFolder.newFolder().toURI().toString(); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); rocksDbBackend.setDbStoragePaths(); } + /** + * Validates that schemes other than 'file:/' are not allowed. 
+ */ @Test(expected = IllegalArgumentException.class) public void testNonFileSchemePath() throws Exception { String checkpointPath = tempFolder.newFolder().toURI().toString(); @@ -138,6 +207,12 @@ public class RocksDBStateBackendConfigTest { rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition"); } + @Test(expected = IllegalArgumentException.class) + public void testDbPathRelativePaths() throws Exception { + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString()); + rocksDbBackend.setDbStoragePath("relative/path"); + } + // ------------------------------------------------------------------------ // RocksDB local file automatic from temp directories // ------------------------------------------------------------------------ @@ -381,7 +456,7 @@ public class RocksDBStateBackendConfigTest { @Test public void testCallsForwardedToNonPartitionedBackend() throws Exception { - AbstractStateBackend storageBackend = new MemoryStateBackend(); + StateBackend storageBackend = new MemoryStateBackend(); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(storageBackend); assertEquals(storageBackend, rocksDbBackend.getCheckpointBackend()); } @@ -390,6 +465,22 @@ public class RocksDBStateBackendConfigTest { // Utilities // ------------------------------------------------------------------------ + static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend( + RocksDBStateBackend rocksDbBackend) throws Exception { + + final Environment env = getMockEnvironment(); + + return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend. + createKeyedStateBackend( + env, + env.getJobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + env.getTaskKvStateRegistry()); + } + static Environment getMockEnvironment() { return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) }); }
