Repository: flink
Updated Branches:
  refs/heads/master d8dbaeeb4 -> 39fb7c945


[FLINK-1266] Properly pass the fs.defaultFS setting when initializing filesystems


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c024d819
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c024d819
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c024d819

Branch: refs/heads/master
Commit: c024d819cfd37e05a8bac9db8082c1ef03d3f3bb
Parents: 0af4d3a
Author: Robert Metzger <[email protected]>
Authored: Thu Jan 8 12:28:06 2015 +0100
Committer: Robert Metzger <[email protected]>
Committed: Thu Jan 8 16:35:47 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c024d819/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 32e3f2d..37061bf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -263,8 +263,8 @@ public final class HadoopFileSystem extends FileSystem 
implements HadoopFileSyst
        @Override
        public void initialize(URI path) throws IOException {
                
-               // For HDFS we have to have an authority
-               if (path.getAuthority() == null && 
path.getScheme().equals("hdfs")) {
+               // If the authority is not part of the path, we initialize with 
the fs.defaultFS entry.
+               if (path.getAuthority() == null) {
                        
                        String configEntry = this.conf.get("fs.defaultFS", 
null);
                        if (configEntry == null) {
@@ -277,31 +277,29 @@ public final class HadoopFileSystem extends FileSystem 
implements HadoopFileSyst
                        }
                        
                        if (configEntry == null) {
-                               throw new 
IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs 
configuration was registered, " +
-                                               "or that configuration did not 
contain an entry for the default hdfs.");
+                               throw new 
IOException(getMissingAuthorityErrorPrefix(path) + "Either no default file 
system (hdfs) configuration was registered, " +
+                                               "or that configuration did not 
contain an entry for the default file system (usually 'fs.defaultFS').");
                        } else {
                                try {
                                        URI initURI = URI.create(configEntry);
                                        
                                        if (initURI.getAuthority() == null) {
-                                               throw new 
IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs 
configuration was registered, " +
-                                                               "or the 
provided configuration contains no valid hdfs namenode address (fs.default.name 
or fs.defaultFS) describing the hdfs namenode host and port.");
-                                       } else if 
(!initURI.getScheme().equalsIgnoreCase("hdfs")) {
-                                               throw new 
IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs 
configuration was registered, " +
-                                                               "or the 
provided configuration describes a file system with scheme '" + 
initURI.getScheme() + "' other than the Hadoop Distributed File System 
(HDFS).");
+                                               throw new 
IOException(getMissingAuthorityErrorPrefix(path) + "Either no default file 
system was registered, " +
+                                                               "or the 
provided configuration contains no valid authority component (fs.default.name 
or fs.defaultFS) " +
+                                                               "describing the 
(hdfs namenode) host and port.");
                                        } else {
                                                try {
                                                        
this.fs.initialize(initURI, this.conf);
                                                }
                                                catch (IOException e) {
                                                        throw new 
IOException(getMissingAuthorityErrorPrefix(path) +
-                                                                       "Could 
not initialize the file system connection with the given address of the HDFS 
NameNode: " + e.getMessage(), e);
+                                                                       "Could 
not initialize the file system connection with the given default file system 
address: " + e.getMessage(), e);
                                                }
                                        }
                                }
                                catch (IllegalArgumentException e) {
                                        throw new 
IOException(getMissingAuthorityErrorPrefix(path) +
-                                                       "The configuration 
contains an invalid hdfs default name (fs.default.name or fs.defaultFS): " + 
configEntry);
+                                                       "The configuration 
contains an invalid file system default name (fs.default.name or fs.defaultFS): 
" + configEntry);
                                }
                        } 
                }

Reply via email to