[FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/854b0537 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/854b0537 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/854b0537 Branch: refs/heads/release-1.3 Commit: 854b05376a459a6197e41e141bb28a9befe481ad Parents: 3569f80 Author: Stephan Ewen <se...@apache.org> Authored: Tue Jul 25 17:19:25 2017 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Jul 28 15:15:30 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flink/core/fs/FileSystem.java | 5 + .../apache/flink/core/fs/FileSystemKind.java | 40 ++++++++ .../core/fs/SafetyNetWrapperFileSystem.java | 5 + .../flink/core/fs/local/LocalFileSystem.java | 6 ++ .../core/fs/local/LocalFileSystemTest.java | 7 ++ .../flink/runtime/fs/hdfs/HdfsKindTest.java | 101 +++++++++++++++++++ .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 47 +++++++++ .../flink/runtime/fs/maprfs/MapRFileSystem.java | 7 +- 8 files changed, 217 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index fab0f4d..e76992d 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -659,6 +659,11 @@ public abstract class FileSystem { */ public abstract boolean isDistributedFS(); + /** + * Gets a description of the characteristics of this file system. 
+ */ + public abstract FileSystemKind getKind(); + // ------------------------------------------------------------------------ // output directory initialization // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java new file mode 100644 index 0000000..52f58ac --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * An enumeration defining the kind and characteristics of a {@link FileSystem}. + */ +@PublicEvolving +public enum FileSystemKind { + + /** + * An actual file system, with files and directories. + */ + FILE_SYSTEM, + + /** + * An Object store. Files correspond to objects. 
+ * There are not really directories, but a directory-like structure may be mimicked + * by hierarchical naming of files. + */ + OBJECT_STORE +} http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java index 1dacafd..63a263a 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java @@ -146,6 +146,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr } @Override + public FileSystemKind getKind() { + return unsafeFileSystem.getKind(); + } + + @Override public FileSystem getWrappedDelegate() { return unsafeFileSystem; } http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java index 0e3e9f3..579f856 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java @@ -31,6 +31,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.Path; import org.apache.flink.util.OperatingSystem; @@ -289,4 +290,9 @@ public class LocalFileSystem extends FileSystem { 
public boolean isDistributedFS() { return false; } + + @Override + public FileSystemKind getKind() { + return FileSystemKind.FILE_SYSTEM; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java index 2312ee9..96c5269 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java @@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.util.FileUtils; @@ -312,4 +313,10 @@ public class LocalFileSystemTest { assertTrue(fs.rename(new Path(srcFolder.toURI()), new Path(dstFolder.toURI()))); assertTrue(new File(dstFolder, srcFile.getName()).exists()); } + + @Test + public void testKind() { + final FileSystem fs = FileSystem.getLocalFileSystem(); + assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java new file mode 100644 index 0000000..69ecdb8 --- /dev/null +++ 
b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.fs.hdfs; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for extracting the {@link FileSystemKind} from file systems that Flink + * accesses through Hadoop's File System interface. + * + * <p>This class needs to be in this package, because it accesses package private methods + * from the HDFS file system wrapper class. 
+ */ +public class HdfsKindTest extends TestLogger { + + @Test + public void testHdfsKind() throws IOException { + final FileSystem fs = new Path("hdfs://localhost:55445/my/file").getFileSystem(); + assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind()); + } + + @Test + public void testS3Kind() throws IOException { + try { + Class.forName("org.apache.hadoop.fs.s3.S3FileSystem"); + } catch (ClassNotFoundException ignored) { + // not in the classpath, cannot run this test + log.info("Skipping test 'testS3Kind()' because the S3 file system is not in the class path"); + return; + } + + final FileSystem s3 = new Path("s3://myId:mySecret@bucket/some/bucket/some/object").getFileSystem(); + assertEquals(FileSystemKind.OBJECT_STORE, s3.getKind()); + } + + @Test + public void testS3nKind() throws IOException { + try { + Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem"); + } catch (ClassNotFoundException ignored) { + // not in the classpath, cannot run this test + log.info("Skipping test 'testS3nKind()' because the Native S3 file system is not in the class path"); + return; + } + + final FileSystem s3n = new Path("s3n://myId:mySecret@bucket/some/bucket/some/object").getFileSystem(); + assertEquals(FileSystemKind.OBJECT_STORE, s3n.getKind()); + } + + @Test + public void testS3aKind() throws IOException { + try { + Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem"); + } catch (ClassNotFoundException ignored) { + // not in the classpath, cannot run this test + log.info("Skipping test 'testS3aKind()' because the S3AFileSystem is not in the class path"); + return; + } + + final FileSystem s3a = new Path("s3a://myId:mySecret@bucket/some/bucket/some/object").getFileSystem(); + assertEquals(FileSystemKind.OBJECT_STORE, s3a.getKind()); + } + + @Test + public void testS3fileSystemSchemes() { + assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3")); + assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3n")); + 
assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3a")); + assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("EMRFS")); + } + + @Test + public void testViewFs() { + assertEquals(FileSystemKind.FILE_SYSTEM, HadoopFileSystem.getKindForScheme("viewfs")); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java index d539b2a..b4f4609 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java @@ -22,10 +22,12 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.BlockLocation; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.HadoopFileSystemWrapper; import org.apache.flink.core.fs.Path; import org.apache.flink.util.InstantiationUtil; import org.apache.hadoop.conf.Configuration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +36,7 @@ import java.io.IOException; import java.lang.reflect.Method; import java.net.URI; import java.net.UnknownHostException; +import java.util.Locale; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -60,6 +63,10 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst private final org.apache.hadoop.fs.FileSystem fs; + /* This field caches the file system kind. It is lazily set because the file system + * URL is lazily initialized. 
*/ + private FileSystemKind fsKind; + /** * Creates a new DistributedFileSystem object to access HDFS, based on a class name @@ -464,6 +471,14 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst } @Override + public FileSystemKind getKind() { + if (fsKind == null) { + fsKind = getKindForScheme(this.fs.getUri().getScheme()); + } + return fsKind; + } + + @Override public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) { Configuration hadoopConf = getHadoopConfiguration(); Class<? extends org.apache.hadoop.fs.FileSystem> clazz = null; @@ -478,4 +493,36 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst } return clazz; } + + /** + * Gets the kind of the file system from its scheme. + * + * <p>Implementation note: Initially, especially within the Flink 1.3.x line + * (in order to not break backwards compatibility), we must only label file systems + * as 'inconsistent' or as 'not proper filesystems' if we are sure about it. + * Otherwise, we risk causing regressions, for example in the performance and cleanup handling + * of checkpoints. + * For that reason, we initially mark some filesystems as 'eventually consistent' or + * as 'object stores', and leave the others as 'consistent file systems'. + */ + static FileSystemKind getKindForScheme(String scheme) { + scheme = scheme.toLowerCase(Locale.US); + + if (scheme.startsWith("s3") || scheme.startsWith("emr")) { + // the Amazon S3 storage + return FileSystemKind.OBJECT_STORE; + } + else if (scheme.startsWith("http") || scheme.startsWith("ftp")) { + // file servers instead of file systems + // they might actually be consistent, but we have no hard guarantees + // currently to rely on that + return FileSystemKind.OBJECT_STORE; + } + else { + // the remainder should include hdfs, kosmos, ceph, ... + // this also includes federated HDFS (viewfs). 
+ return FileSystemKind.FILE_SYSTEM; + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java index 57eea6f..1ec34bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java @@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation; import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream; @@ -381,7 +382,11 @@ public final class MapRFileSystem extends FileSystem { @Override public boolean isDistributedFS() { - return true; } + + @Override + public FileSystemKind getKind() { + return FileSystemKind.FILE_SYSTEM; + } }