Repository: flink Updated Branches: refs/heads/master d8dbaeeb4 -> 39fb7c945
[FLINK-1266] Properly pass the fs.defaultFS setting when initializing filesystems Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c024d819 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c024d819 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c024d819 Branch: refs/heads/master Commit: c024d819cfd37e05a8bac9db8082c1ef03d3f3bb Parents: 0af4d3a Author: Robert Metzger <[email protected]> Authored: Thu Jan 8 12:28:06 2015 +0100 Committer: Robert Metzger <[email protected]> Committed: Thu Jan 8 16:35:47 2015 +0100 ---------------------------------------------------------------------- .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c024d819/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java index 32e3f2d..37061bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java @@ -263,8 +263,8 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst @Override public void initialize(URI path) throws IOException { - // For HDFS we have to have an authority - if (path.getAuthority() == null && path.getScheme().equals("hdfs")) { + // If the authority is not part of the path, we initialize with the fs.defaultFS entry. 
+ if (path.getAuthority() == null) { String configEntry = this.conf.get("fs.defaultFS", null); if (configEntry == null) { @@ -277,31 +277,29 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst } if (configEntry == null) { - throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs configuration was registered, " + - "or that configuration did not contain an entry for the default hdfs."); + throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default file system (hdfs) configuration was registered, " + + "or that configuration did not contain an entry for the default file system (usually 'fs.defaultFS')."); } else { try { URI initURI = URI.create(configEntry); if (initURI.getAuthority() == null) { - throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs configuration was registered, " + - "or the provided configuration contains no valid hdfs namenode address (fs.default.name or fs.defaultFS) describing the hdfs namenode host and port."); - } else if (!initURI.getScheme().equalsIgnoreCase("hdfs")) { - throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs configuration was registered, " + - "or the provided configuration describes a file system with scheme '" + initURI.getScheme() + "' other than the Hadoop Distributed File System (HDFS)."); + throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default file system was registered, " + + "or the provided configuration contains no valid authority component (fs.default.name or fs.defaultFS) " + + "describing the (hdfs namenode) host and port."); } else { try { this.fs.initialize(initURI, this.conf); } catch (IOException e) { throw new IOException(getMissingAuthorityErrorPrefix(path) + - "Could not initialize the file system connection with the given address of the HDFS NameNode: " + e.getMessage(), e); + "Could not initialize the file system connection with the 
given default file system address: " + e.getMessage(), e); } } } catch (IllegalArgumentException e) { throw new IOException(getMissingAuthorityErrorPrefix(path) + - "The configuration contains an invalid hdfs default name (fs.default.name or fs.defaultFS): " + configEntry); + "The configuration contains an invalid file system default name (fs.default.name or fs.defaultFS): " + configEntry); } } }
