http://git-wip-us.apache.org/repos/asf/flink/blob/f2909293/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
new file mode 100644
index 0000000..3f6cb1c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+public final class HadoopDataOutputStream extends FSDataOutputStream {
+
+       private org.apache.hadoop.fs.FSDataOutputStream fdos;
+
+       public HadoopDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream 
fdos) {
+               this.fdos = fdos;
+       }
+
+       @Override
+       public void write(int b) throws IOException {
+
+               fdos.write(b);
+       }
+
+       @Override
+       public void write(byte[] b, int off, int len) throws IOException {
+               fdos.write(b, off, len);
+       }
+
+       @Override
+       public void close() throws IOException {
+               fdos.close();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2909293/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
new file mode 100644
index 0000000..519791e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.Path;
+
+/**
+ * Concrete implementation of the {@link FileStatus} interface for the
+ * Hadoop Distribution File System.
+ */
+public final class HadoopFileStatus implements FileStatus {
+
+       private org.apache.hadoop.fs.FileStatus fileStatus;
+
+       /**
+        * Creates a new file status from a HDFS file status.
+        * 
+        * @param fileStatus
+        *        the HDFS file status
+        */
+       public HadoopFileStatus(org.apache.hadoop.fs.FileStatus fileStatus) {
+               this.fileStatus = fileStatus;
+       }
+
+       @Override
+       public long getLen() {
+               return fileStatus.getLen();
+       }
+
+       @Override
+       public long getBlockSize() {
+               long blocksize = fileStatus.getBlockSize();
+               if (blocksize > fileStatus.getLen()) {
+                       return fileStatus.getLen();
+               }
+
+               return blocksize;
+       }
+
+       @Override
+       public long getAccessTime() {
+               return fileStatus.getAccessTime();
+       }
+
+       @Override
+       public long getModificationTime() {
+               return fileStatus.getModificationTime();
+       }
+
+       @Override
+       public short getReplication() {
+               return fileStatus.getReplication();
+       }
+
+       public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
+               return this.fileStatus;
+       }
+
+       @Override
+       public Path getPath() {
+               return new Path(fileStatus.getPath().toString());
+       }
+
+       @SuppressWarnings("deprecation")
+       @Override
+       public boolean isDir() {
+               return fileStatus.isDir();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2909293/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
new file mode 100644
index 0000000..e849d32
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.fs.hdfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.UnknownHostException;
+
+import org.apache.flink.core.fs.AbstractHadoopWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Concrete implementation of the {@link FileSystem} base class for the Hadoop 
File System. The
+ * class is a wrapper class which encapsulated the original Hadoop HDFS API.
+ *
+ * If no file system class is specified, the wrapper will automatically load 
the Hadoop
+ * distributed file system (HDFS).
+ *
+ */
+public final class HadoopFileSystem extends FileSystem implements 
AbstractHadoopWrapper {
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(HadoopFileSystem.class);
+       
+       private static final String DEFAULT_HDFS_CLASS = 
"org.apache.hadoop.hdfs.DistributedFileSystem";
+       
+       /**
+        * Configuration value name for the DFS implementation name. Usually 
not specified in hadoop configurations.
+        */
+       private static final String HDFS_IMPLEMENTATION_KEY = "fs.hdfs.impl";
+
+       private final org.apache.hadoop.conf.Configuration conf;
+
+       private final org.apache.hadoop.fs.FileSystem fs;
+
+
+       /**
+        * Creates a new DistributedFileSystem object to access HDFS
+        * 
+        * @throws IOException
+        *         throw if the required HDFS classes cannot be instantiated
+        */
+       public HadoopFileSystem(Class<? extends 
org.apache.hadoop.fs.FileSystem> fsClass) throws IOException {
+               // Create new Hadoop configuration object
+               this.conf = getHadoopConfiguration();
+
+               if(fsClass == null) {
+                       fsClass = getDefaultHDFSClass();
+               }
+
+               this.fs = instantiateFileSystem(fsClass);
+       }
+
+       private Class<? extends org.apache.hadoop.fs.FileSystem> 
getDefaultHDFSClass() throws IOException {
+               Class<? extends org.apache.hadoop.fs.FileSystem> fsClass = null;
+
+               // try to get the FileSystem implementation class Hadoop 2.0.0 
style
+               {
+                       LOG.debug("Trying to load HDFS class Hadoop 2.x 
style.");
+
+                       Object fsHandle = null;
+                       try {
+                               Method newApi = 
org.apache.hadoop.fs.FileSystem.class.getMethod("getFileSystemClass", 
String.class, org.apache.hadoop.conf.Configuration.class);
+                               fsHandle = newApi.invoke(null, "hdfs", conf);
+                       } catch (Exception e) {
+                               // if we can't find the FileSystem class using 
the new API,
+                               // clazz will still be null, we assume we're 
running on an older Hadoop version
+                       }
+
+                       if (fsHandle != null) {
+                               if (fsHandle instanceof Class && 
org.apache.hadoop.fs.FileSystem.class.isAssignableFrom((Class<?>) fsHandle)) {
+                                       fsClass = ((Class<?>) 
fsHandle).asSubclass(org.apache.hadoop.fs.FileSystem.class);
+
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Loaded '" + 
fsClass.getName() + "' as HDFS class.");
+                                       }
+                               }
+                               else {
+                                       LOG.debug("Unexpected return type from 
'org.apache.hadoop.fs.FileSystem.getFileSystemClass(String, Configuration)'.");
+                                       throw new RuntimeException("The value 
returned from org.apache.hadoop.fs.FileSystem.getFileSystemClass(String, 
Configuration) is not a valid subclass of org.apache.hadoop.fs.FileSystem.");
+                               }
+                       }
+               }
+
+               // fall back to an older Hadoop version
+               if (fsClass == null)
+               {
+                       // first of all, check for a user-defined hdfs class
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Falling back to loading HDFS class 
old Hadoop style. Looking for HDFS class configuration entry '"
+                                               + HDFS_IMPLEMENTATION_KEY + 
"'.");
+                       }
+
+                       Class<?> classFromConfig = 
conf.getClass(HDFS_IMPLEMENTATION_KEY, null);
+
+                       if (classFromConfig != null)
+                       {
+                               if 
(org.apache.hadoop.fs.FileSystem.class.isAssignableFrom(classFromConfig)) {
+                                       fsClass = 
classFromConfig.asSubclass(org.apache.hadoop.fs.FileSystem.class);
+
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Loaded HDFS class '" 
+ fsClass.getName() + "' as specified in configuration.");
+                                       }
+                               }
+                               else {
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("HDFS class specified 
by " + HDFS_IMPLEMENTATION_KEY + " is of wrong type.");
+                                       }
+
+                                       throw new IOException("HDFS class 
specified by " + HDFS_IMPLEMENTATION_KEY +
+                                                       " cannot be cast to a 
FileSystem type.");
+                               }
+                       }
+                       else {
+                               // load the default HDFS class
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Trying to load default HDFS 
implementation " + DEFAULT_HDFS_CLASS);
+                               }
+
+                               try {
+                                       Class <?> reflectedClass = 
Class.forName(DEFAULT_HDFS_CLASS);
+                                       if 
(org.apache.hadoop.fs.FileSystem.class.isAssignableFrom(reflectedClass)) {
+                                               fsClass = 
reflectedClass.asSubclass(org.apache.hadoop.fs.FileSystem.class);
+                                       } else {
+                                               if (LOG.isDebugEnabled()) {
+                                                       LOG.debug("Default HDFS 
class is of wrong type.");
+                                               }
+
+                                               throw new IOException("The 
default HDFS class '" + DEFAULT_HDFS_CLASS +
+                                                               "' cannot be 
cast to a FileSystem type.");
+                                       }
+                               }
+                               catch (ClassNotFoundException e) {
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Default HDFS class 
cannot be loaded.");
+                                       }
+
+                                       throw new IOException("No HDFS class 
has been configured and the default class '" +
+                                                       DEFAULT_HDFS_CLASS + "' 
cannot be loaded.");
+                               }
+                       }
+               }
+               return fsClass;
+       }
+
+       /**
+        * Returns a new Hadoop Configuration object using the path to the 
hadoop conf configured 
+        * in the main configuration (flink-conf.yaml).
+        * This method is public because its being used in the HadoopDataSource.
+        */
+       public static org.apache.hadoop.conf.Configuration 
getHadoopConfiguration() {
+               Configuration retConf = new 
org.apache.hadoop.conf.Configuration();
+
+               // We need to load both core-site.xml and hdfs-site.xml to 
determine the default fs path and
+               // the hdfs configuration
+               // Try to load HDFS configuration from Hadoop's own 
configuration files
+               // 1. approach: Flink configuration
+               final String hdfsDefaultPath = 
GlobalConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
+               if (hdfsDefaultPath != null) {
+                       retConf.addResource(new 
org.apache.hadoop.fs.Path(hdfsDefaultPath));
+               } else {
+                       LOG.debug("Cannot find hdfs-default configuration 
file");
+               }
+
+               final String hdfsSitePath = 
GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
+               if (hdfsSitePath != null) {
+                       retConf.addResource(new 
org.apache.hadoop.fs.Path(hdfsSitePath));
+               } else {
+                       LOG.debug("Cannot find hdfs-site configuration file");
+               }
+               
+               // 2. Approach environment variables
+               String[] possibleHadoopConfPaths = new String[4]; 
+               possibleHadoopConfPaths[0] = 
GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+               possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
+               
+               if (System.getenv("HADOOP_HOME") != null) {
+                       possibleHadoopConfPaths[2] = 
System.getenv("HADOOP_HOME")+"/conf";
+                       possibleHadoopConfPaths[3] = 
System.getenv("HADOOP_HOME")+"/etc/hadoop"; // hadoop 2.2
+               }
+
+               for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+                       if (possibleHadoopConfPath != null) {
+                               if (new File(possibleHadoopConfPath).exists()) {
+                                       if (new File(possibleHadoopConfPath + 
"/core-site.xml").exists()) {
+                                               retConf.addResource(new 
org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
+
+                                               if (LOG.isDebugEnabled()) {
+                                                       LOG.debug("Adding " + 
possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
+                                               }
+                                       }
+                                       if (new File(possibleHadoopConfPath + 
"/hdfs-site.xml").exists()) {
+                                               retConf.addResource(new 
org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
+
+                                               if (LOG.isDebugEnabled()) {
+                                                       LOG.debug("Adding " + 
possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
+                                               }
+                                       }
+                               }
+                       }
+               }
+               return retConf;
+       }
+       
+       private org.apache.hadoop.fs.FileSystem instantiateFileSystem(Class<? 
extends org.apache.hadoop.fs.FileSystem> fsClass)
+               throws IOException
+       {
+               try {
+                       return fsClass.newInstance();
+               }
+               catch (ExceptionInInitializerError e) {
+                       throw new IOException("The filesystem class '" + 
fsClass.getName() + "' throw an exception upon initialization.", 
e.getException());
+               }
+               catch (Throwable t) {
+                       String errorMessage = 
InstantiationUtil.checkForInstantiationError(fsClass);
+                       if (errorMessage != null) {
+                               throw new IOException("The filesystem class '" 
+ fsClass.getName() + "' cannot be instantiated: " + errorMessage);
+                       } else {
+                               throw new IOException("An error occurred while 
instantiating the filesystem class '" +
+                                               fsClass.getName() + "'.", t);
+                       }
+               }
+       }
+
+
+       @Override
+       public Path getWorkingDirectory() {
+               return new Path(this.fs.getWorkingDirectory().toUri());
+       }
+
+       @Override
+       public URI getUri() {
+               return fs.getUri();
+       }
+
+       @Override
+       public void initialize(URI path) throws IOException {
+               
+               // For HDFS we have to have an authority
+               if (path.getAuthority() == null && 
path.getScheme().equals("hdfs")) {
+                       
+                       String configEntry = this.conf.get("fs.defaultFS", 
null);
+                       if (configEntry == null) {
+                               // fs.default.name deprecated as of hadoop 
2.2.0 
http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
+                               configEntry = this.conf.get("fs.default.name", 
null);
+                       }
+                       
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("fs.defaultFS is set to " + 
configEntry);
+                       }
+                       
+                       if (configEntry == null) {
+                               throw new 
IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs 
configuration was registered, " +
+                                               "or that configuration did not 
contain an entry for the default hdfs.");
+                       } else {
+                               try {
+                                       URI initURI = URI.create(configEntry);
+                                       
+                                       if (initURI.getAuthority() == null) {
+                                               throw new 
IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs 
configuration was registered, " +
+                                                               "or the 
provided configuration contains no valid hdfs namenode address (fs.default.name 
or fs.defaultFS) describing the hdfs namenode host and port.");
+                                       } else if 
(!initURI.getScheme().equalsIgnoreCase("hdfs")) {
+                                               throw new 
IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs 
configuration was registered, " +
+                                                               "or the 
provided configuration describes a file system with scheme '" + 
initURI.getScheme() + "' other than the Hadoop Distributed File System 
(HDFS).");
+                                       } else {
+                                               try {
+                                                       
this.fs.initialize(initURI, this.conf);
+                                               }
+                                               catch (IOException e) {
+                                                       throw new 
IOException(getMissingAuthorityErrorPrefix(path) +
+                                                                       "Could 
not initialize the file system connection with the given address of the HDFS 
NameNode: " + e.getMessage(), e);
+                                               }
+                                       }
+                               }
+                               catch (IllegalArgumentException e) {
+                                       throw new 
IOException(getMissingAuthorityErrorPrefix(path) +
+                                                       "The configuration 
contains an invalid hdfs default name (fs.default.name or fs.defaultFS): " + 
configEntry);
+                               }
+                       } 
+               }
+               else {
+                       // Initialize file system
+                       try {
+                               this.fs.initialize(path, this.conf);
+                       }
+                       catch (UnknownHostException e) {
+                               String message = "The (HDFS NameNode) host at 
'" + path.getAuthority()
+                                               + "', specified by file path '" 
+ path.toString() + "', cannot be resolved"
+                                               + (e.getMessage() != null ? ": 
" + e.getMessage() : ".");
+                               
+                               if (path.getPort() == -1) {
+                                       message += " Hint: Have you forgotten a 
slash? (correct URI would be 'hdfs:///" + path.getAuthority() + path.getPath() 
+ "' ?)";
+                               }
+                               
+                               throw new IOException(message, e);
+                       }
+                       catch (Exception e) {
+                               throw new IOException("The given file URI (" + 
path.toString() + ") points to the HDFS NameNode at "
+                                               + path.getAuthority() + ", but 
the File System could not be initialized with that address"
+                                       + (e.getMessage() != null ? ": " + 
e.getMessage() : "."), e);
+                       }
+               }
+       }
+       
+       private static String getMissingAuthorityErrorPrefix(URI path) {
+               return "The given HDFS file URI (" + path.toString() + ") did 
not describe the HDFS NameNode." +
+                               " The attempt to use a default HDFS 
configuration, as specified in the '" + ConfigConstants.HDFS_DEFAULT_CONFIG + 
"' or '" + 
+                               ConfigConstants.HDFS_SITE_CONFIG + "' config 
parameter failed due to the following problem: ";
+       }
+
+
+       @Override
+       public FileStatus getFileStatus(final Path f) throws IOException {
+               org.apache.hadoop.fs.FileStatus status = 
this.fs.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
+               return new HadoopFileStatus(status);
+       }
+
+       @Override
+       public BlockLocation[] getFileBlockLocations(final FileStatus file, 
final long start, final long len)
+       throws IOException
+       {
+               if (!(file instanceof HadoopFileStatus)) {
+                       throw new IOException("file is not an instance of 
DistributedFileStatus");
+               }
+
+               final HadoopFileStatus f = (HadoopFileStatus) file;
+
+               final org.apache.hadoop.fs.BlockLocation[] blkLocations = 
fs.getFileBlockLocations(f.getInternalFileStatus(),
+                       start, len);
+
+               // Wrap up HDFS specific block location objects
+               final HadoopBlockLocation[] distBlkLocations = new 
HadoopBlockLocation[blkLocations.length];
+               for (int i = 0; i < distBlkLocations.length; i++) {
+                       distBlkLocations[i] = new 
HadoopBlockLocation(blkLocations[i]);
+               }
+
+               return distBlkLocations;
+       }
+
+       @Override
+       public FSDataInputStream open(final Path f, final int bufferSize) 
throws IOException {
+
+               final org.apache.hadoop.fs.FSDataInputStream fdis = 
this.fs.open(new org.apache.hadoop.fs.Path(f.toString()),
+                       bufferSize);
+
+               return new HadoopDataInputStream(fdis);
+       }
+
+       @Override
+       public FSDataInputStream open(final Path f) throws IOException {
+               final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(new 
org.apache.hadoop.fs.Path(f.toString()));
+               return new HadoopDataInputStream(fdis);
+       }
+
+       @Override
+       public FSDataOutputStream create(final Path f, final boolean overwrite, 
final int bufferSize,
+                       final short replication, final long blockSize)
+       throws IOException
+       {
+               final org.apache.hadoop.fs.FSDataOutputStream fdos = 
this.fs.create(
+                       new org.apache.hadoop.fs.Path(f.toString()), overwrite, 
bufferSize, replication, blockSize);
+               return new HadoopDataOutputStream(fdos);
+       }
+
+
+       @Override
+       public FSDataOutputStream create(final Path f, final boolean overwrite) 
throws IOException {
+               final org.apache.hadoop.fs.FSDataOutputStream 
fsDataOutputStream = this.fs
+                       .create(new org.apache.hadoop.fs.Path(f.toString()), 
overwrite);
+               return new HadoopDataOutputStream(fsDataOutputStream);
+       }
+
+       @Override
+       public boolean delete(final Path f, final boolean recursive) throws 
IOException {
+               return this.fs.delete(new 
org.apache.hadoop.fs.Path(f.toString()), recursive);
+       }
+
+       @Override
+       public FileStatus[] listStatus(final Path f) throws IOException {
+               final org.apache.hadoop.fs.FileStatus[] hadoopFiles = 
this.fs.listStatus(new org.apache.hadoop.fs.Path(f.toString()));
+               final FileStatus[] files = new FileStatus[hadoopFiles.length];
+
+               // Convert types
+               for (int i = 0; i < files.length; i++) {
+                       files[i] = new HadoopFileStatus(hadoopFiles[i]);
+               }
+               
+               return files;
+       }
+
+       @Override
+       public boolean mkdirs(final Path f) throws IOException {
+               return this.fs.mkdirs(new 
org.apache.hadoop.fs.Path(f.toString()));
+       }
+
+       @Override
+       public boolean rename(final Path src, final Path dst) throws 
IOException {
+               return this.fs.rename(new 
org.apache.hadoop.fs.Path(src.toString()),
+                       new org.apache.hadoop.fs.Path(dst.toString()));
+       }
+
+       @SuppressWarnings("deprecation")
+       @Override
+       public long getDefaultBlockSize() {
+               return this.fs.getDefaultBlockSize();
+       }
+
+       @Override
+       public boolean isDistributedFS() {
+               return true;
+       }
+
+       @Override
+       public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) {
+               Configuration hadoopConf = getHadoopConfiguration();
+               Class<? extends org.apache.hadoop.fs.FileSystem> clazz =  null;
+               // We can activate this block once we drop Hadoop1 support 
(only hd2 has the getFileSystemClass-method)
+//             try {
+//                     clazz = 
org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConf);
+//             } catch (IOException e) {
+//                     LOG.info("Flink could not load the Hadoop File system 
implementation for scheme "+scheme);
+//                     return null;
+//             }
+               clazz = hadoopConf.getClass("fs." + scheme + ".impl", null, 
org.apache.hadoop.fs.FileSystem.class);
+
+               if(clazz != null && LOG.isDebugEnabled()) {
+                       LOG.debug("Flink supports "+scheme+" with the Hadoop 
file system wrapper, impl "+clazz);
+               }
+               return clazz;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2909293/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index 616ab69..11216da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -35,10 +35,10 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.fs.hdfs.DistributedBlockLocation;
-import org.apache.flink.runtime.fs.hdfs.DistributedDataInputStream;
-import org.apache.flink.runtime.fs.hdfs.DistributedDataOutputStream;
-import org.apache.flink.runtime.fs.hdfs.DistributedFileStatus;
+import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation;
+import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
+import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileStatus;
 
 /**
  * Concrete implementation of the {@link FileSystem} base class for the MapR
@@ -268,27 +268,27 @@ public final class MapRFileSystem extends FileSystem {
                final org.apache.hadoop.fs.FileStatus status = this.fs
                                .getFileStatus(new 
org.apache.hadoop.fs.Path(f.toString()));
 
-               return new DistributedFileStatus(status);
+               return new HadoopFileStatus(status);
        }
 
        @Override
        public BlockLocation[] getFileBlockLocations(final FileStatus file,
                        final long start, final long len) throws IOException {
 
-               if (!(file instanceof DistributedFileStatus)) {
+               if (!(file instanceof HadoopFileStatus)) {
                        throw new IOException(
                                        "file is not an instance of 
DistributedFileStatus");
                }
 
-               final DistributedFileStatus f = (DistributedFileStatus) file;
+               final HadoopFileStatus f = (HadoopFileStatus) file;
 
                final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs
                                
.getFileBlockLocations(f.getInternalFileStatus(), start, len);
 
                // Wrap up HDFS specific block location objects
-               final DistributedBlockLocation[] distBlkLocations = new 
DistributedBlockLocation[blkLocations.length];
+               final HadoopBlockLocation[] distBlkLocations = new 
HadoopBlockLocation[blkLocations.length];
                for (int i = 0; i < distBlkLocations.length; i++) {
-                       distBlkLocations[i] = new 
DistributedBlockLocation(blkLocations[i]);
+                       distBlkLocations[i] = new 
HadoopBlockLocation(blkLocations[i]);
                }
 
                return distBlkLocations;
@@ -301,7 +301,7 @@ public final class MapRFileSystem extends FileSystem {
                final org.apache.hadoop.fs.FSDataInputStream fdis = 
this.fs.open(
                                new org.apache.hadoop.fs.Path(f.toString()), 
bufferSize);
 
-               return new DistributedDataInputStream(fdis);
+               return new HadoopDataInputStream(fdis);
        }
 
        @Override
@@ -310,7 +310,7 @@ public final class MapRFileSystem extends FileSystem {
                final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs
                                .open(new 
org.apache.hadoop.fs.Path(f.toString()));
 
-               return new DistributedDataInputStream(fdis);
+               return new HadoopDataInputStream(fdis);
        }
 
        @Override
@@ -322,7 +322,7 @@ public final class MapRFileSystem extends FileSystem {
                                new org.apache.hadoop.fs.Path(f.toString()), 
overwrite,
                                bufferSize, replication, blockSize);
 
-               return new DistributedDataOutputStream(fdos);
+               return new HadoopDataOutputStream(fdos);
        }
 
        @Override
@@ -332,7 +332,7 @@ public final class MapRFileSystem extends FileSystem {
                final org.apache.hadoop.fs.FSDataOutputStream fdos = 
this.fs.create(
                                new org.apache.hadoop.fs.Path(f.toString()), 
overwrite);
 
-               return new DistributedDataOutputStream(fdos);
+               return new HadoopDataOutputStream(fdos);
        }
 
        @Override
@@ -352,7 +352,7 @@ public final class MapRFileSystem extends FileSystem {
 
                // Convert types
                for (int i = 0; i < files.length; i++) {
-                       files[i] = new DistributedFileStatus(hadoopFiles[i]);
+                       files[i] = new HadoopFileStatus(hadoopFiles[i]);
                }
 
                return files;

http://git-wip-us.apache.org/repos/asf/flink/blob/f2909293/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 63025af..6c1798f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -307,17 +307,127 @@ under the License.
                                <groupId>org.apache.hadoop</groupId>
                                <artifactId>hadoop-core</artifactId>
                                <version>${hadoop.version}</version>
+                               <exclusions>
+                                       <exclusion>
+                                               <groupId>asm</groupId>
+                                               <artifactId>asm</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               <groupId>tomcat</groupId>
+                                               
<artifactId>jasper-compiler</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               <groupId>tomcat</groupId>
+                                               
<artifactId>jasper-runtime</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               <artifactId>jetty</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               
<artifactId>jsp-api-2.1</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               <artifactId>jsp-2.1</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               
<artifactId>jetty-util</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.eclipse.jdt</groupId>
+                                               <artifactId>core</artifactId>
+                                       </exclusion>
+                               </exclusions>
                        </dependency>
                        <!-- YARN -->
                        <dependency>
                                <groupId>org.apache.hadoop</groupId>
                                <artifactId>hadoop-common</artifactId>
                                <version>${hadoop.version}</version>
+                               <exclusions>
+                                       <exclusion>
+                                               <groupId>asm</groupId>
+                                               <artifactId>asm</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               <groupId>tomcat</groupId>
+                                               
<artifactId>jasper-compiler</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               <groupId>tomcat</groupId>
+                                               
<artifactId>jasper-runtime</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               <artifactId>jetty</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               
<artifactId>jsp-api-2.1</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               <artifactId>jsp-2.1</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               
<artifactId>jetty-util</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.eclipse.jdt</groupId>
+                                               <artifactId>core</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               <groupId>javax.servlet</groupId>
+                                               
<artifactId>servlet-api</artifactId>
+                                       </exclusion>
+                               </exclusions>
                        </dependency>
                        <dependency>
                                <groupId>org.apache.hadoop</groupId>
                                <artifactId>hadoop-hdfs</artifactId>
                                <version>${hadoop.version}</version>
+                               <exclusions>
+                                       <exclusion>
+                                               <groupId>asm</groupId>
+                                               <artifactId>asm</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               <groupId>tomcat</groupId>
+                                               
<artifactId>jasper-compiler</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               <groupId>tomcat</groupId>
+                                               
<artifactId>jasper-runtime</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               <artifactId>jetty</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               
<artifactId>jsp-api-2.1</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               <artifactId>jsp-2.1</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               
<artifactId>jetty-util</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.eclipse.jdt</groupId>
+                                               <artifactId>core</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               <groupId>javax.servlet</groupId>
+                                               
<artifactId>servlet-api</artifactId>
+                                       </exclusion>
+                               </exclusions>
                        </dependency>
                        <dependency>
                                <groupId>org.apache.hadoop</groupId>
@@ -328,11 +438,79 @@ under the License.
                                <groupId>org.apache.hadoop</groupId>
                                
<artifactId>hadoop-mapreduce-client-core</artifactId>
                                <version>${hadoop.version}</version>
+                               <exclusions>
+                                       <exclusion>
+                                               <groupId>asm</groupId>
+                                               <artifactId>asm</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               <groupId>tomcat</groupId>
+                                               
<artifactId>jasper-compiler</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               <groupId>tomcat</groupId>
+                                               
<artifactId>jasper-runtime</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               <artifactId>jetty</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               
<artifactId>jsp-api-2.1</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               <artifactId>jsp-2.1</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               
<artifactId>jetty-util</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.eclipse.jdt</groupId>
+                                               <artifactId>core</artifactId>
+                                       </exclusion>
+                               </exclusions>
                        </dependency>
                        <dependency>
                                <groupId>org.apache.hadoop</groupId>
                                <artifactId>hadoop-yarn-client</artifactId>
                                <version>${hadoop.version}</version>
+                               <exclusions>
+                                       <exclusion>
+                                               <groupId>asm</groupId>
+                                               <artifactId>asm</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               <groupId>tomcat</groupId>
+                                               
<artifactId>jasper-compiler</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               <groupId>tomcat</groupId>
+                                               
<artifactId>jasper-runtime</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               <artifactId>jetty</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               
<artifactId>jsp-api-2.1</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               <artifactId>jsp-2.1</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.mortbay.jetty</groupId>
+                                               
<artifactId>jetty-util</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               
<groupId>org.eclipse.jdt</groupId>
+                                               <artifactId>core</artifactId>
+                                       </exclusion>
+                               </exclusions>
                        </dependency>
                </dependencies>
        </dependencyManagement>

Reply via email to