Repository: incubator-reef
Updated Branches:
  refs/heads/master 4a5a6672e -> a98a5ecb5


[REEF-492] A HDFS implementation of IFileSystem

This adds an `IFileSystem` implementation using the `hdfs` command
available on Hadoop. It also adds tests for it, which have been disabled
as they only work if HDFS is available.

JIRA:
  [REEF-492](https://issues.apache.org/jira/browse/REEF-492)

Pull Request:
  This closes #339

Author:    Markus Weimer <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/a98a5ecb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/a98a5ecb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/a98a5ecb

Branch: refs/heads/master
Commit: a98a5ecb53ac492be38621fdf3c6c1f456f60fff
Parents: 4a5a667
Author: Markus Weimer <[email protected]>
Authored: Mon Jul 27 10:55:25 2015 -0700
Committer: Julia Wang <[email protected]>
Committed: Mon Aug 17 15:10:57 2015 -0700

----------------------------------------------------------------------
 .../Files/PathUtilities.cs                      |  55 ++++
 .../Org.Apache.REEF.Common.csproj               |   1 +
 .../FileSystemTestUtilities.cs                  |  78 ++++++
 .../Org.Apache.REEF.IO.Tests.csproj             |   2 +
 .../TestHadoopFileSystem.cs                     | 147 +++++++++++
 .../FileSystem/Hadoop/CommandResult.cs          |  51 ++++
 .../FileSystem/Hadoop/HDFSCommandRunner.cs      | 249 +++++++++++++++++++
 .../FileSystem/Hadoop/HadoopFileSystem.cs       | 132 ++++++++++
 .../Hadoop/HadoopFileSystemConfiguration.cs     |  56 +++++
 .../Hadoop/Parameters/CommandTimeOut.cs         |  29 +++
 .../FileSystem/Hadoop/Parameters/HadoopHome.cs  |  30 +++
 .../Hadoop/Parameters/NumberOfRetries.cs        |  29 +++
 .../Org.Apache.REEF.IO.csproj                   |  15 ++
 .../Properties/AssemblyInfo.cs                  |  13 +-
 14 files changed, 886 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.Common/Files/PathUtilities.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Files/PathUtilities.cs 
b/lang/cs/Org.Apache.REEF.Common/Files/PathUtilities.cs
new file mode 100644
index 0000000..863226d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Files/PathUtilities.cs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.IO;
+
namespace Org.Apache.REEF.Common.Files
{
    /// <summary>
    /// Static helpers for comparing file system paths.
    /// </summary>
    public static class PathUtilities
    {
        /// <summary>
        /// Converts a path into a canonical form suitable for equality checks: the path is made absolute
        /// with <see cref="Path.GetFullPath(string)"/>, any trailing <c>/</c> or <c>\</c> is stripped, and
        /// the result is upper-cased because Windows paths are case-insensitive.
        /// </summary>
        /// <param name="path">The path to normalize. A null path makes <see cref="Path.GetFullPath(string)"/> throw.</param>
        /// <returns>The normalized form of <paramref name="path"/>.</returns>
        public static string NormalizePath(string path)
        {
            var absolutePath = Path.GetFullPath(path);
            var withoutTrailingSeparators =
                absolutePath.TrimEnd(Path.DirectorySeparatorChar, Path.AltDirectorySeparatorChar);
            return withoutTrailingSeparators.ToUpperInvariant();
        }

        /// <summary>
        /// Determines whether two paths denote the same location once both have been passed through
        /// <see cref="NormalizePath"/>.
        /// </summary>
        /// <param name="path1">The first path; may be null.</param>
        /// <param name="path2">The second path; may be null.</param>
        /// <returns>
        /// <c>false</c> if either path is null; otherwise whether the two normalized forms are ordinally equal.
        /// </returns>
        public static bool AreNormalizedEquals(string path1, string path2)
        {
            var bothPresent = path1 != null && path2 != null;
            return bothPresent && NormalizePath(path1).Equals(NormalizePath(path2));
        }
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj 
b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 63843c4..91ed394 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -88,6 +88,7 @@ under the License.
     <Compile Include="Exceptions\EvaluatorException.cs" />
     <Compile Include="Exceptions\JobException.cs" />
     <Compile Include="FailedRuntime.cs" />
+    <Compile Include="Files\PathUtilities.cs" />
     <Compile Include="IContextAndTaskSubmittable.cs" />
     <Compile Include="IContextSubmittable.cs" />
     <Compile Include="IJobMessageObserver.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO.Tests/FileSystemTestUtilities.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/FileSystemTestUtilities.cs 
b/lang/cs/Org.Apache.REEF.IO.Tests/FileSystemTestUtilities.cs
new file mode 100644
index 0000000..44d622a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/FileSystemTestUtilities.cs
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.IO;
+
namespace Org.Apache.REEF.IO.Tests
{
    /// <summary>
    /// Static methods helpful when testing IFileSystem implementations.
    /// </summary>
    internal sealed class FileSystemTestUtilities
    {
        /// <summary>
        /// Test content used when creating file system test files.
        /// </summary>
        internal const byte TestByte = 123;

        /// <summary>
        /// Compares the full contents of the two files given, byte by byte.
        /// </summary>
        /// <param name="path1">Path to the first file.</param>
        /// <param name="path2">Path to the second file.</param>
        /// <returns>
        /// <c>true</c> if both files have the same length and identical bytes; <c>false</c> otherwise.
        /// </returns>
        internal static bool HaveSameContent(string path1, string path2)
        {
            using (var s1 = File.OpenRead(path1))
            using (var s2 = File.OpenRead(path2))
            {
                // Cheap early exit: files of different length cannot have the same content.
                if (s1.Length != s2.Length)
                {
                    return false;
                }

                int byte1;
                do
                {
                    byte1 = s1.ReadByte();
                    var byte2 = s2.ReadByte();
                    if (byte1 != byte2)
                    {
                        return false;
                    }
                }
                while (byte1 != -1); // ReadByte returns -1 once the end of the stream is reached.

                return true;
            }
        }

        /// <summary>
        /// Creates a temp file and writes TestByte to it.
        /// </summary>
        /// <returns>The path to the test file.</returns>
        internal static string MakeLocalTempFile()
        {
            var result = Path.GetTempFileName();
            MakeLocalTestFile(result);
            return result;
        }

        /// <summary>
        /// (Re)creates the file at <paramref name="filePath"/> so that it contains exactly one byte, TestByte.
        /// </summary>
        /// <param name="filePath">Path of the file to create; an existing file there is deleted first.</param>
        private static void MakeLocalTestFile(string filePath)
        {
            if (File.Exists(filePath))
            {
                File.Delete(filePath);
            }
            using (var s = File.Create(filePath))
            {
                s.WriteByte(TestByte);
            }
        }
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO.Tests/Org.Apache.REEF.IO.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/Org.Apache.REEF.IO.Tests.csproj 
b/lang/cs/Org.Apache.REEF.IO.Tests/Org.Apache.REEF.IO.Tests.csproj
index 66971a4..704c7b9 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/Org.Apache.REEF.IO.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/Org.Apache.REEF.IO.Tests.csproj
@@ -40,7 +40,9 @@ under the License.
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="FileSystemTestUtilities.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="TestHadoopFileSystem.cs" />
     <Compile Include="TestLocalFileSystem.cs" />
     <Compile Include="TestRandomDataSet.cs" />
   </ItemGroup>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO.Tests/TestHadoopFileSystem.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestHadoopFileSystem.cs 
b/lang/cs/Org.Apache.REEF.IO.Tests/TestHadoopFileSystem.cs
new file mode 100644
index 0000000..7e26231
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestHadoopFileSystem.cs
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.IO;
+using System.Linq;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.IO.FileSystem.Hadoop;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
namespace Org.Apache.REEF.IO.Tests
{
    /// <summary>
    /// Tests for HadoopFileSystem.
    /// </summary>
    /// <see cref="HadoopFileSystem" />
    [TestClass]
    [Ignore] // These tests need to be run in an environment with HDFS installed.
    public sealed class TestHadoopFileSystem
    {
        private HadoopFileSystem _fileSystem;

        /// <summary>
        /// Builds a URI under <c>/tmp</c> on the file system, made distinct by a millisecond timestamp.
        /// The returned URI has no trailing '/'.
        /// </summary>
        private Uri GetTempUri()
        {
            return
                new Uri(_fileSystem.UriPrefix + "/tmp/TestHadoopFileSystem-" +
                        DateTime.Now.ToString("yyyyMMddHHmmssfff"));
        }

        /// <summary>
        /// Sets up the file system instance to be used for the tests.
        /// </summary>
        [TestInitialize]
        public void SetupFileSystem()
        {
            _fileSystem =
                TangFactory.GetTang()
                    .NewInjector(HadoopFileSystemConfiguration.ConfigurationModule.Build())
                    .GetInstance<HadoopFileSystem>();
        }

        /// <summary>
        /// Creates a temp file locally, uploads it to HDFS and downloads it again.
        /// </summary>
        [TestMethod]
        public void TestCopyFromLocalAndBack()
        {
            var localFile = FileSystemTestUtilities.MakeLocalTempFile();
            var localFileDownloaded = localFile + ".2";
            var remoteUri = GetTempUri();

            _fileSystem.CopyFromLocal(localFile, remoteUri);
            _fileSystem.CopyToLocal(remoteUri, localFileDownloaded);

            Assert.IsTrue(message: "A file up and downloaded should exist on the local file system.",
                condition: File.Exists(localFileDownloaded));
            Assert.IsTrue(message: "A file up and downloaded should not have changed content.",
                condition: FileSystemTestUtilities.HaveSameContent(localFile, localFileDownloaded));

            _fileSystem.Delete(remoteUri);
            File.Delete(localFile);
            File.Delete(localFileDownloaded);
        }

        /// <summary>
        /// Tests whether .Exists() works.
        /// </summary>
        [TestMethod]
        public void TestExists()
        {
            var remoteUri = GetTempUri();
            Assert.IsFalse(message: "The file should not exist yet", condition: _fileSystem.Exists(remoteUri));
            var localFile = FileSystemTestUtilities.MakeLocalTempFile();
            _fileSystem.CopyFromLocal(localFile, remoteUri);
            Assert.IsTrue(message: "The file should now exist", condition: _fileSystem.Exists(remoteUri));
            _fileSystem.Delete(remoteUri);
            Assert.IsFalse(message: "The file should no longer exist", condition: _fileSystem.Exists(remoteUri));
            File.Delete(localFile);
        }

        /// <summary>
        /// Tests for .GetChildren().
        /// </summary>
        [TestMethod]
        public void TestGetChildren()
        {
            // Make a directory
            var remoteDirectory = GetTempUri();
            _fileSystem.CreateDirectory(remoteDirectory);
            // Check that it is empty
            Assert.AreEqual(message: "The directory should be empty.", expected: 0,
                actual: _fileSystem.GetChildren(remoteDirectory).Count());

            // Upload some local file into that directory.
            var localTempFile = FileSystemTestUtilities.MakeLocalTempFile();
            // new Uri(baseUri, relative) replaces the last path segment of baseUri unless baseUri ends
            // in '/', so make the directory URI end in exactly one '/' before resolving the file name.
            var directoryWithSlash = new Uri(remoteDirectory.AbsoluteUri.TrimEnd('/') + "/");
            var remoteUri = new Uri(directoryWithSlash, Path.GetFileName(localTempFile));
            _fileSystem.CopyFromLocal(localTempFile, remoteUri);

            // Check that it is on the listing of the directory.
            var uriInResult = _fileSystem.GetChildren(remoteDirectory).First();
            Assert.AreEqual(remoteUri, uriInResult);

            // Download the file and make sure it is the same as before
            var downloadedFileName = localTempFile + ".downloaded";
            _fileSystem.CopyToLocal(uriInResult, downloadedFileName);
            Assert.IsTrue(message: "A file up and downloaded should not have changed content.",
                condition: FileSystemTestUtilities.HaveSameContent(localTempFile, downloadedFileName));
            File.Delete(localTempFile);
            File.Delete(downloadedFileName);

            // Delete the file
            _fileSystem.Delete(remoteUri);
            // Check that the folder is empty again
            Assert.AreEqual(message: "The directory should be empty.", expected: 0,
                actual: _fileSystem.GetChildren(remoteDirectory).Count());
            // Delete the folder
            _fileSystem.DeleteDirectory(remoteDirectory);
        }

        /// <summary>
        /// Open() is expected to throw NotImplementedException.
        /// </summary>
        /// <remarks>
        /// The string passed to ExpectedException is the message MSTest reports when NO exception is thrown;
        /// it is not compared against the exception's message.
        /// </remarks>
        [TestMethod]
        [ExpectedException(typeof(NotImplementedException),
            "Open() is not supported by HadoopFileSystem. Use CopyToLocal and open the local file instead.")]
        public void TestOpen()
        {
            _fileSystem.Open(GetTempUri());
        }

        /// <summary>
        /// Create() is expected to throw NotImplementedException.
        /// </summary>
        /// <remarks>
        /// The string passed to ExpectedException is the message MSTest reports when NO exception is thrown;
        /// it is not compared against the exception's message.
        /// </remarks>
        [TestMethod]
        [ExpectedException(typeof(NotImplementedException),
            "Create() is not supported by HadoopFileSystem. Create a local file and use CopyFromLocal instead.")]
        public void TestCreate()
        {
            _fileSystem.Create(GetTempUri());
        }
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/CommandResult.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/CommandResult.cs 
b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/CommandResult.cs
new file mode 100644
index 0000000..bfd38c7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/CommandResult.cs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+
namespace Org.Apache.REEF.IO.FileSystem.Hadoop
{
    /// <summary>
    /// Captured output of one finished <c>hdfs</c> command invocation.
    /// </summary>
    internal sealed class CommandResult
    {
        /// <summary>
        /// Stores the captured output and exit code. The given lists are kept by reference, not copied.
        /// </summary>
        /// <param name="stdOut">Lines captured from standard output.</param>
        /// <param name="stdErr">Lines captured from standard error.</param>
        /// <param name="exitCode">Exit code reported for the process.</param>
        internal CommandResult(List<string> stdOut, List<string> stdErr, int exitCode)
        {
            StdOut = stdOut;
            StdErr = stdErr;
            ExitCode = exitCode;
        }

        /// <summary>
        /// Lines captured from standard output.
        /// </summary>
        internal IList<string> StdOut { get; private set; }

        /// <summary>
        /// Lines captured from standard error.
        /// </summary>
        internal IList<string> StdErr { get; private set; }

        /// <summary>
        /// Exit code reported for the process.
        /// </summary>
        /// <remarks>
        /// Note: This is almost always 0, as the way we launch the hdfs process doesn't provide an exit code.
        /// </remarks>
        internal int ExitCode { get; private set; }
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HDFSCommandRunner.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HDFSCommandRunner.cs 
b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HDFSCommandRunner.cs
new file mode 100644
index 0000000..7a10a8a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HDFSCommandRunner.cs
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using Org.Apache.REEF.Common.Files;
+using Org.Apache.REEF.IO.FileSystem.Hadoop.Parameters;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
namespace Org.Apache.REEF.IO.FileSystem.Hadoop
{
    /// <summary>
    /// Helper class to execute <code>hdfs</code> commands. Each command runs as an external process with a
    /// per-attempt timeout; an attempt that times out is killed and the command is attempted again, up to
    /// the configured number of attempts.
    /// </summary>
    internal sealed class HdfsCommandRunner
    {
        /// <summary>
        /// The name of the <code>hdfs</code> command.
        /// </summary>
        private const string HdfsCommandName = "hdfs.cmd";

        /// <summary>
        /// The folder within <code>HadoopHome</code> that contains the <code>hdfs</code> command.
        /// </summary>
        private const string BinFolderName = "bin";

        /// <summary>
        /// The name of the HADOOP_HOME environment variable.
        /// </summary>
        private const string HadoopHomeEnvironmentVariableName = "HADOOP_HOME";

        private static readonly Logger Logger = Logger.GetLogger(typeof(HdfsCommandRunner));

        /// <summary>
        /// Path to hdfs.cmd
        /// </summary>
        private readonly string _hdfsCommandPath;

        /// <summary>
        /// The number of attempts made for each HDFS command before giving up.
        /// </summary>
        private readonly int _numberOfRetries;

        /// <summary>
        /// The timeout on each of the attempts.
        /// </summary>
        private readonly int _timeOutInMilliSeconds;

        /// <summary>
        /// Locates the <code>hdfs</code> command. A user-provided Hadoop home folder is preferred; if it is not
        /// provided (i.e. equals the parameter's default value) or does not exist, the HADOOP_HOME environment
        /// variable is used instead.
        /// </summary>
        /// <param name="numberOfRetries">How many times a command is attempted before Run gives up.</param>
        /// <param name="timeOutInMilliSeconds">Timeout applied to each attempt, in milliseconds.</param>
        /// <param name="hadoopHome">The Hadoop installation folder.</param>
        /// <exception cref="FileNotFoundException">If the hdfs command can't be found.</exception>
        /// <exception cref="Exception">If HADOOP_HOME is needed but not set.</exception>
        [Inject]
        private HdfsCommandRunner([Parameter(typeof(NumberOfRetries))] int numberOfRetries,
            [Parameter(typeof(CommandTimeOut))] int timeOutInMilliSeconds,
            [Parameter(typeof(HadoopHome))] string hadoopHome)
        {
            _numberOfRetries = numberOfRetries;
            _timeOutInMilliSeconds = timeOutInMilliSeconds;

            if (!PathUtilities.AreNormalizedEquals(hadoopHome, HadoopHome.DefaultValue))
            {
                // The user provided a Hadoop Home folder.
                if (Directory.Exists(hadoopHome))
                {
                    // The user provided folder does exist.
                    _hdfsCommandPath = GetFullPathToHdfsCommand(hadoopHome);
                }
                else
                {
                    // The user provided folder does not exist. Try the environment variable.
                    Logger.Log(Level.Warning,
                        "The provided hadoop home folder {0} doesn't exist, trying environment variable {1} instead",
                        hadoopHome, HadoopHomeEnvironmentVariableName);
                    _hdfsCommandPath = GetFullPathToHdfsCommandBasedOnEnvironmentVariable();
                }
            }
            else
            {
                // The user did not provide a Hadoop Home folder. Use the Environment variable.
                _hdfsCommandPath = GetFullPathToHdfsCommandBasedOnEnvironmentVariable();
            }

            // Make sure we found the command.
            if (!File.Exists(_hdfsCommandPath))
            {
                throw new FileNotFoundException("HDFS Command not found", _hdfsCommandPath);
            }
        }

        /// <summary>
        /// Runs the <code>hdfs</code> command with the given arguments, retrying attempts that time out.
        /// </summary>
        /// <param name="hdfsCommandLineArguments">Arguments passed verbatim to the <code>hdfs</code> command.</param>
        /// <returns>The result of the first attempt that finished within the timeout.</returns>
        /// <exception cref="Exception">If no attempt finished within the timeout.</exception>
        internal CommandResult Run(string hdfsCommandLineArguments)
        {
            var processStartInfo = new ProcessStartInfo
            {
                FileName = _hdfsCommandPath,
                Arguments = hdfsCommandLineArguments,
                UseShellExecute = false,
                RedirectStandardOutput = true,
                RedirectStandardError = true
            };

            // 1-based so that the logs read "Attempt_1_of_N" ... "Attempt_N_of_N".
            for (var attemptNumber = 1; attemptNumber <= _numberOfRetries; ++attemptNumber)
            {
                var processName = string.Format("HDFS_Attempt_{0}_of_{1}", attemptNumber, _numberOfRetries);
                var result = RunAttempt(processStartInfo, _timeOutInMilliSeconds, processName);
                if (null != result)
                {
                    LogCommandOutput(result);
                    return result;
                }
            }

            // If we reached here, we ran out of retries.
            throw new Exception(
                string.Format("HDFS Cmd {0} {1} could not be executed in the specified timeout & retry settings",
                    _hdfsCommandPath, hdfsCommandLineArguments));
        }

        /// <summary>
        /// Utility method that constructs the full absolute path to the <code>hdfs</code> command.
        /// </summary>
        /// <param name="hadoopHome">The Hadoop installation folder.</param>
        /// <returns><c>{hadoopHome}\bin\hdfs.cmd</c> as an absolute path.</returns>
        private static string GetFullPathToHdfsCommand(string hadoopHome)
        {
            return Path.Combine(Path.GetFullPath(hadoopHome), BinFolderName, HdfsCommandName);
        }

        /// <summary>
        /// Constructs the path to the HDFS binary based on the HADOOP_HOME environment variable.
        /// </summary>
        /// <returns>The absolute path to the <code>hdfs</code> command under HADOOP_HOME.</returns>
        /// <exception cref="Exception">If HADOOP_HOME is unset, empty or whitespace.</exception>
        private static string GetFullPathToHdfsCommandBasedOnEnvironmentVariable()
        {
            var hadoopHomeFromEnv = Environment.GetEnvironmentVariable(HadoopHomeEnvironmentVariableName);
            Logger.Log(Level.Verbose, "{0} evaluated to {1}.", HadoopHomeEnvironmentVariableName, hadoopHomeFromEnv);
            // An empty value would otherwise make Path.GetFullPath throw a less helpful ArgumentException.
            if (string.IsNullOrWhiteSpace(hadoopHomeFromEnv))
            {
                throw new Exception(HadoopHomeEnvironmentVariableName +
                                    " not set and no path to the hadoop installation provided.");
            }
            return GetFullPathToHdfsCommand(hadoopHomeFromEnv);
        }

        /// <summary>
        /// Helper method to log the command result at Verbose level.
        /// </summary>
        /// <param name="result">The result whose output lines are logged.</param>
        private static void LogCommandOutput(CommandResult result)
        {
            using (var messageBuilder = new StringWriter())
            {
                messageBuilder.WriteLine("OUTPUT:");
                messageBuilder.WriteLine("----------------------------------------");
                foreach (var stdOut in result.StdOut)
                {
                    messageBuilder.WriteLine("Out:    " + stdOut);
                }

                messageBuilder.WriteLine("----------------------------------------");
                foreach (var stdErr in result.StdErr)
                {
                    messageBuilder.WriteLine("Err:    " + stdErr);
                }
                messageBuilder.WriteLine("----------------------------------------");
                // Pass the captured text as an argument rather than as the format string: HDFS output may
                // contain '{' or '}' characters. NOTE(review): whether Logger.Log formats a string passed with
                // no arguments is not visible here; this form is safe either way.
                Logger.Log(Level.Verbose, "{0}", messageBuilder.ToString());
            }
        }

        /// <summary>
        /// Attempts to run a process with a timeout.
        /// </summary>
        /// <returns>The result of the attempt or null in case of timeout.</returns>
        /// <param name="processStartInfo">The process start information.</param>
        /// <param name="timeOutInMilliSeconds">Timeout for the process.</param>
        /// <param name="processName">A human readable name used for logging purposes.</param>
        private static CommandResult RunAttempt(ProcessStartInfo processStartInfo, int timeOutInMilliSeconds,
            string processName)
        {
            // Setup the process.
            var outList = new List<string>();
            var errList = new List<string>();
            processStartInfo.RedirectStandardError = true;
            processStartInfo.RedirectStandardOutput = true;

            // Dispose the Process so its OS handles are released on both the success and the timeout path.
            using (var process = new Process { StartInfo = processStartInfo })
            {
                process.OutputDataReceived += delegate(object sender, DataReceivedEventArgs e)
                {
                    if (!string.IsNullOrWhiteSpace(e.Data))
                    {
                        outList.Add(e.Data.Trim());
                    }
                };

                process.ErrorDataReceived += delegate(object sender, DataReceivedEventArgs e)
                {
                    if (!string.IsNullOrWhiteSpace(e.Data))
                    {
                        errList.Add(e.Data.Trim());
                    }
                };

                // Start it
                process.Start();
                process.BeginErrorReadLine();
                process.BeginOutputReadLine();
                Logger.Log(Level.Verbose, "Waiting for {0}ms for process `{1}` to finish", timeOutInMilliSeconds,
                    processName);

                // Deal with timeouts
                process.WaitForExit(timeOutInMilliSeconds);

                if (process.HasExited)
                {
                    // WaitForExit(int) can return before the asynchronous output handlers have processed every
                    // line. The parameterless overload additionally waits for the redirected streams to be
                    // drained, so the lists are complete (and no longer being appended to) when handed out.
                    process.WaitForExit();
                    return new CommandResult(outList, errList, process.ExitCode);
                }

                // If we didn't return above, the process timed out.
                Logger.Log(Level.Info, "The process `{0}` took longer than {1}ms to exit. Killing it.", processName,
                    timeOutInMilliSeconds);
                KillIfStillRunning(process);
                // NOTE(review): Kill() only terminates the process started here (hdfs.cmd). If that process
                // spawned a child (presumably the JVM) which still holds the redirected pipes, this wait may
                // block until the child exits.
                process.WaitForExit();
                Logger.Log(Level.Info, "Killed process `{0}`.", processName);
                return null;
            }
        }

        /// <summary>
        /// Kills the process, tolerating the race in which it exits on its own right before Kill() runs.
        /// Any failure while the process is still running is rethrown.
        /// </summary>
        /// <param name="process">A started process.</param>
        private static void KillIfStillRunning(Process process)
        {
            try
            {
                process.Kill();
            }
            catch (InvalidOperationException)
            {
                if (!process.HasExited)
                {
                    throw;
                }
            }
            catch (System.ComponentModel.Win32Exception)
            {
                if (!process.HasExited)
                {
                    throw;
                }
            }
        }
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystem.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystem.cs 
b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystem.cs
new file mode 100644
index 0000000..ab2f93d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystem.cs
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text.RegularExpressions;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IO.FileSystem.Hadoop
+{
+    /// <summary>
+    /// Implements the IFileSystem interface for HDFS using external commands.
+    /// </summary>
+    /// <remarks>
+    /// Note that operations with this class are enormously slow. If you can, use a more native way to access the file system
+    /// in question.
+    /// NOTE(review): URIs and local paths are appended to the command line unquoted, so paths containing whitespace are
+    /// likely to break; confirm how HdfsCommandRunner tokenizes its argument string before relying on such paths.
+    /// </remarks>
+    /// <seealso href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html" />
+    internal sealed class HadoopFileSystem : IFileSystem
+    {
+        /// <summary>
+        /// Matches the error line `dfs -ls` writes to stderr when the queried path does not exist.
+        /// </summary>
+        private static readonly Regex NoSuchFileOrDirectoryRegEx = new Regex("^ls: `.*': No such file or directory");
+
+        /// <summary>
+        /// Matches the "Found N items" header line that precedes a `dfs -ls` directory listing.
+        /// </summary>
+        private static readonly Regex LsFirstLineRegex = new Regex("^Found .* items");
+
+        private readonly HdfsCommandRunner _commandRunner;
+        private readonly string _uriPrefix;
+
+        [Inject]
+        private HadoopFileSystem(HdfsCommandRunner commandRunner)
+        {
+            _commandRunner = commandRunner;
+            _uriPrefix = GetUriPrefix();
+        }
+
+        /// <summary>
+        /// The Prefix used for URIs on this FileSystem, as read from the Hadoop configuration at construction time.
+        /// </summary>
+        public string UriPrefix
+        {
+            get { return _uriPrefix; }
+        }
+
+        /// <summary>
+        /// Not implemented by this IFileSystem.
+        /// </summary>
+        /// <param name="fileUri">Ignored.</param>
+        /// <returns>Never returns.</returns>
+        /// <exception cref="NotImplementedException">Always thrown.</exception>
+        public Stream Open(Uri fileUri)
+        {
+            throw new NotImplementedException(
+                "Open() is not supported by HadoopFileSystem. Use CopyToLocal and open the local file instead.");
+        }
+
+        /// <summary>
+        /// Not implemented by this IFileSystem.
+        /// </summary>
+        /// <param name="fileUri">Ignored.</param>
+        /// <returns>Never returns.</returns>
+        /// <exception cref="NotImplementedException">Always thrown.</exception>
+        public Stream Create(Uri fileUri)
+        {
+            throw new NotImplementedException(
+                "Create() is not supported by HadoopFileSystem. Create a local file and use CopyFromLocal instead.");
+        }
+
+        /// <summary>
+        /// Deletes the given file via `hdfs dfs -rm` (issued without -r, so it is not recursive).
+        /// </summary>
+        /// <param name="fileUri">The file to delete.</param>
+        public void Delete(Uri fileUri)
+        {
+            _commandRunner.Run("dfs -rm " + fileUri);
+        }
+
+        /// <summary>
+        /// Checks whether the given path exists by running `hdfs dfs -ls` on it.
+        /// </summary>
+        /// <remarks>
+        /// Ideally, we'd use the 'test' command's return value, but we did not find a way to access that.
+        /// Instead, a path is reported as missing only if `ls` prints "No such file or directory" on stderr.
+        /// </remarks>
+        /// <param name="fileUri">The path to check.</param>
+        /// <returns>false if `ls` reported the path as missing, true otherwise.</returns>
+        public bool Exists(Uri fileUri)
+        {
+            return _commandRunner.Run("dfs -ls " + fileUri).StdErr
+                .All(line => !NoSuchFileOrDirectoryRegEx.IsMatch(line));
+        }
+
+        /// <summary>
+        /// Copies a file within the file system via `hdfs dfs -cp`.
+        /// </summary>
+        /// <param name="sourceUri">The file to copy.</param>
+        /// <param name="destinationUri">The copy's location.</param>
+        public void Copy(Uri sourceUri, Uri destinationUri)
+        {
+            _commandRunner.Run("dfs -cp " + sourceUri + " " + destinationUri);
+        }
+
+        /// <summary>
+        /// Downloads a file to the local disk via `hdfs dfs -get`.
+        /// </summary>
+        /// <param name="remoteFileUri">The file to download.</param>
+        /// <param name="localName">The local destination path.</param>
+        public void CopyToLocal(Uri remoteFileUri, string localName)
+        {
+            _commandRunner.Run("dfs -get " + remoteFileUri + " " + localName);
+        }
+
+        /// <summary>
+        /// Uploads a local file via `hdfs dfs -put`.
+        /// </summary>
+        /// <param name="localFileName">The local file to upload.</param>
+        /// <param name="remoteFileUri">The remote destination.</param>
+        public void CopyFromLocal(string localFileName, Uri remoteFileUri)
+        {
+            _commandRunner.Run("dfs -put " + localFileName + " " + remoteFileUri);
+        }
+
+        /// <summary>
+        /// Creates a directory via `hdfs dfs -mkdir` (issued without -p).
+        /// </summary>
+        /// <param name="directoryUri">The directory to create.</param>
+        public void CreateDirectory(Uri directoryUri)
+        {
+            _commandRunner.Run("dfs -mkdir " + directoryUri);
+        }
+
+        /// <summary>
+        /// Removes a directory via `hdfs dfs -rmdir`, which per the Hadoop shell docs only removes empty directories.
+        /// </summary>
+        /// <param name="directoryUri">The directory to remove.</param>
+        public void DeleteDirectory(Uri directoryUri)
+        {
+            _commandRunner.Run("dfs -rmdir " + directoryUri);
+        }
+
+        /// <summary>
+        /// Lists the entries of a directory by parsing the output of `hdfs dfs -ls`.
+        /// </summary>
+        /// <param name="directoryUri">The directory to list.</param>
+        /// <returns>
+        /// One Uri per listed entry, taken from the last whitespace-separated column of each listing line.
+        /// </returns>
+        public IEnumerable<Uri> GetChildren(Uri directoryUri)
+        {
+            // Materialize eagerly: a lazy query would re-run the (very slow) hdfs command on every enumeration.
+            return _commandRunner.Run("dfs -ls " + directoryUri)
+                .StdOut
+                // Skip blank lines (they would produce `new Uri("")`) and the "Found N items" header.
+                .Where(line => !string.IsNullOrWhiteSpace(line) && !LsFirstLineRegex.IsMatch(line))
+                // RemoveEmptyEntries keeps trailing/duplicate whitespace from yielding an empty last column.
+                .Select(line => line.Split((char[])null, StringSplitOptions.RemoveEmptyEntries))
+                .Select(columns => new Uri(columns[columns.Length - 1]))
+                .ToList();
+        }
+
+        /// <summary>
+        /// Reads the default file system URI (typically hdfs://host:port) from the Hadoop configuration.
+        /// </summary>
+        /// <returns>The first line `hdfs getconf` printed to stdout.</returns>
+        /// <exception cref="InvalidOperationException">If the command printed no value.</exception>
+        private string GetUriPrefix()
+        {
+            // NOTE(review): fs.default.name is the deprecated alias of fs.defaultFS in Hadoop 2.x; consider switching.
+            var prefix = _commandRunner.Run("getconf -confKey fs.default.name").StdOut.FirstOrDefault();
+            if (string.IsNullOrWhiteSpace(prefix))
+            {
+                throw new InvalidOperationException(
+                    "Unable to determine the default file system: `hdfs getconf -confKey fs.default.name` returned no value.");
+            }
+            return prefix;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystemConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystemConfiguration.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystemConfiguration.cs
new file mode 100644
index 0000000..0a9dcda
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystemConfiguration.cs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IO.FileSystem.Hadoop.Parameters;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.IO.FileSystem.Hadoop
+{
+    /// <summary>
+    /// Configuration Module for the (command based) Hadoop file system implementation of IFileSystem.
+    /// </summary>
+    /// <remarks>
+    /// This IFileSystem implementation is enormously slow as it spawns a new JVM per API call. Avoid if you have better means
+    /// of file system access.
+    /// Also, Stream-based operations are not supported.
+    /// </remarks>
+    public sealed class HadoopFileSystemConfiguration : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// The number of times each HDFS command will be retried. Defaults to 3.
+        /// </summary>
+        public static readonly OptionalParameter<int> CommandRetries = new OptionalParameter<int>();
+
+        /// <summary>
+        /// The timeout (in milliseconds) for HDFS commands. Defaults to 300000 (5 minutes).
+        /// </summary>
+        public static readonly OptionalParameter<int> CommandTimeOut = new OptionalParameter<int>();
+
+        /// <summary>
+        /// The folder in which Hadoop is installed. Defaults to %HADOOP_HOME%.
+        /// </summary>
+        public static readonly OptionalParameter<string> HadoopHome = new OptionalParameter<string>();
+
+        /// <summary>
+        /// The module binding IFileSystem to the command-based HadoopFileSystem and wiring the optional
+        /// parameters above to their named parameters.
+        /// </summary>
+        /// <remarks>
+        /// Static field initializers run in textual order, so this field must stay below the parameters it references.
+        /// </remarks>
+        public static readonly ConfigurationModule ConfigurationModule = new HadoopFileSystemConfiguration()
+            .BindImplementation(GenericType<IFileSystem>.Class, GenericType<HadoopFileSystem>.Class)
+            .BindNamedParameter(GenericType<NumberOfRetries>.Class, CommandRetries)
+            .BindNamedParameter(GenericType<CommandTimeOut>.Class, CommandTimeOut)
+            .BindNamedParameter(GenericType<HadoopHome>.Class, HadoopHome)
+            .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/CommandTimeOut.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/CommandTimeOut.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/CommandTimeOut.cs
new file mode 100644
index 0000000..315d5eb
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/CommandTimeOut.cs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IO.FileSystem.Hadoop.Parameters
+{
+    /// <summary>
+    /// The timeout (in milliseconds) for HDFS commands. Defaults to 300000 (5 minutes).
+    /// </summary>
+    [NamedParameter("The timeout (in milliseconds) for HDFS commands.", defaultValue: "300000")]
+    internal sealed class CommandTimeOut : Name<int>
+    {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/HadoopHome.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/HadoopHome.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/HadoopHome.cs
new file mode 100644
index 0000000..6b9f56d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/HadoopHome.cs
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IO.FileSystem.Hadoop.Parameters
+{
+    /// <summary>
+    /// Named parameter for the Hadoop installation directory. When it is not bound explicitly it holds the
+    /// sentinel <see cref="DefaultValue"/>, meaning (per the parameter's documentation) that %HADOOP_HOME% is used.
+    /// </summary>
+    [NamedParameter(
+        "Hadoop home to be used. Defaults to using %HADOOP_HOME%",
+        defaultValue: HadoopHome.DefaultValue)]
+    internal sealed class HadoopHome : Name<string>
+    {
+        /// <summary>
+        /// Sentinel marking that no Hadoop home was configured explicitly.
+        /// </summary>
+        public const string DefaultValue = "###NOT_SET###";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/NumberOfRetries.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/NumberOfRetries.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/NumberOfRetries.cs
new file mode 100644
index 0000000..c29cc1e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/NumberOfRetries.cs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IO.FileSystem.Hadoop.Parameters
+{
+    /// <summary>
+    /// Named parameter: how many retries an HDFS command gets. Unless bound otherwise, the value is 3.
+    /// </summary>
+    [NamedParameter(
+        "Number of retries for HDFS commands.",
+        defaultValue: "3")]
+    internal sealed class NumberOfRetries : Name<int>
+    {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
index 6007582..37aef05 100644
--- a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
+++ b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
@@ -41,6 +41,13 @@ under the License.
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="FileSystem\Hadoop\CommandResult.cs" />
+    <Compile Include="FileSystem\Hadoop\HadoopFileSystemConfiguration.cs" />
+    <Compile Include="FileSystem\Hadoop\HDFSCommandRunner.cs" />
+    <Compile Include="FileSystem\Hadoop\HadoopFileSystem.cs" />
+    <Compile Include="FileSystem\Hadoop\Parameters\CommandTimeOut.cs" />
+    <Compile Include="FileSystem\Hadoop\Parameters\HadoopHome.cs" />
+    <Compile Include="FileSystem\Hadoop\Parameters\NumberOfRetries.cs" />
     <Compile Include="FileSystem\IFileSystem.cs" />
     <Compile Include="FileSystem\Local\LocalFileSystem.cs" />
     <Compile Include="FileSystem\Local\LocalFileSystemConfiguration.cs" />
@@ -64,6 +71,14 @@ under the License.
       <Project>{97DBB573-3994-417A-9F69-FFA25F00D2A6}</Project>
       <Name>Org.Apache.REEF.Tang</Name>
     </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj">
+      <Project>{79E7F89A-1DFB-45E1-8D43-D71A954AEB98}</Project>
+      <Name>Org.Apache.REEF.Utilities</Name>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj">
+      <Project>{545A0582-4105-44CE-B99C-B1379514A630}</Project>
+      <Name>Org.Apache.REEF.Common</Name>
+    </ProjectReference>
   </ItemGroup>
   <ItemGroup />
 </Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs
index ca5d750..e0a0f47 100644
--- a/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs
+++ b/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs
@@ -18,6 +18,7 @@
  */
 
 using System.Reflection;
+using System.Runtime.CompilerServices;
 using System.Runtime.InteropServices;
 
 [assembly: AssemblyTitle("Org.Apache.REEF.IO")]
@@ -31,4 +32,14 @@ using System.Runtime.InteropServices;
 [assembly: ComVisible(false)]
 [assembly: Guid("58c49df4-8dc0-4f58-9a7e-a341d33e40ee")]
 [assembly: AssemblyVersion("0.13.0.0")]
-[assembly: AssemblyFileVersion("0.13.0.0")]
\ No newline at end of file
+[assembly: AssemblyFileVersion("0.13.0.0")]
+// Allow the tests project access to `internal` APIs.
+// NOTE(review): the Release variant pins the test assembly's public key, presumably because Release builds are
+// strong-name signed; a signed assembly may only grant InternalsVisibleTo to a friend identified by its public key.
+#if DEBUG
+[assembly: InternalsVisibleTo("Org.Apache.REEF.IO.Tests")]
+#else
+[assembly: InternalsVisibleTo("Org.Apache.REEF.IO.Tests, publickey=" +
+    "00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9" +
+    "9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc" +
+    "b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" +
+    "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")]
+#endif
\ No newline at end of file


Reply via email to