Repository: incubator-reef Updated Branches: refs/heads/master 4a5a6672e -> a98a5ecb5
[REEF-492] A HDFS implementation of IFileSystem This adds an `IFileSystem` implementation using the `hdfs` command available on Hadoop. It also adds tests for it, which have been disabled as they only work if HDFS is available. JIRA: [REEF-492](https://issues.apache.org/jira/browse/REEF-492) Pull Request: This closes #339 Author: Markus Weimer <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/a98a5ecb Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/a98a5ecb Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/a98a5ecb Branch: refs/heads/master Commit: a98a5ecb53ac492be38621fdf3c6c1f456f60fff Parents: 4a5a667 Author: Markus Weimer <[email protected]> Authored: Mon Jul 27 10:55:25 2015 -0700 Committer: Julia Wang <[email protected]> Committed: Mon Aug 17 15:10:57 2015 -0700 ---------------------------------------------------------------------- .../Files/PathUtilities.cs | 55 ++++ .../Org.Apache.REEF.Common.csproj | 1 + .../FileSystemTestUtilities.cs | 78 ++++++ .../Org.Apache.REEF.IO.Tests.csproj | 2 + .../TestHadoopFileSystem.cs | 147 +++++++++++ .../FileSystem/Hadoop/CommandResult.cs | 51 ++++ .../FileSystem/Hadoop/HDFSCommandRunner.cs | 249 +++++++++++++++++++ .../FileSystem/Hadoop/HadoopFileSystem.cs | 132 ++++++++++ .../Hadoop/HadoopFileSystemConfiguration.cs | 56 +++++ .../Hadoop/Parameters/CommandTimeOut.cs | 29 +++ .../FileSystem/Hadoop/Parameters/HadoopHome.cs | 30 +++ .../Hadoop/Parameters/NumberOfRetries.cs | 29 +++ .../Org.Apache.REEF.IO.csproj | 15 ++ .../Properties/AssemblyInfo.cs | 13 +- 14 files changed, 886 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.Common/Files/PathUtilities.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Common/Files/PathUtilities.cs b/lang/cs/Org.Apache.REEF.Common/Files/PathUtilities.cs
new file mode 100644
index 0000000..863226d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Files/PathUtilities.cs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.IO;
+
+namespace Org.Apache.REEF.Common.Files
+{
+    /// <summary>
+    /// Utility class for dealing with Paths
+    /// </summary>
+    public static class PathUtilities
+    {
+        /// <summary>
+        /// Normalizes a path for easy comparison.
+        /// </summary>
+        /// <param name="path">The path to normalize. Relative paths are resolved by Path.GetFullPath.</param>
+        /// <returns>The absolute path with trailing '/' and '\' removed, upper-cased with the invariant culture.</returns>
+        /// <exception cref="ArgumentNullException">If <paramref name="path"/> is null (thrown by Path.GetFullPath).</exception>
+        public static string NormalizePath(string path)
+        {
+            return Path
+                .GetFullPath(path) // Get the full path
+                .TrimEnd(Path.DirectorySeparatorChar, Path.AltDirectorySeparatorChar) // Remove trailing `/` and `\`
+                .ToUpperInvariant(); // Windows ignores cases.
+        }
+
+        /// <summary>
+        /// Compares the two paths *after* they have been normalized using NormalizePath.
+        /// </summary>
+        /// <param name="path1">The first path; may be null.</param>
+        /// <param name="path2">The second path; may be null.</param>
+        /// <returns>
+        /// True if both paths are non-null and normalize to the same string; false otherwise, including when either is null.
+        /// </returns>
+        public static bool AreNormalizedEquals(string path1, string path2)
+        {
+            if (null == path1 || null == path2)
+            {
+                return false;
+            }
+            // NormalizePath already folds case, so an exact ordinal comparison is the intended semantics.
+            return string.Equals(NormalizePath(path1), NormalizePath(path2), StringComparison.Ordinal);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 63843c4..91ed394 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -88,6 +88,7 @@ under the License.
     <Compile Include="Exceptions\EvaluatorException.cs" />
     <Compile Include="Exceptions\JobException.cs" />
     <Compile Include="FailedRuntime.cs" />
+    <Compile Include="Files\PathUtilities.cs" />
     <Compile Include="IContextAndTaskSubmittable.cs" />
     <Compile Include="IContextSubmittable.cs" />
     <Compile Include="IJobMessageObserver.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO.Tests/FileSystemTestUtilities.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/FileSystemTestUtilities.cs b/lang/cs/Org.Apache.REEF.IO.Tests/FileSystemTestUtilities.cs
new file mode 100644
index 0000000..44d622a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/FileSystemTestUtilities.cs
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.IO;
+
+namespace Org.Apache.REEF.IO.Tests
+{
+    /// <summary>
+    /// Static methods helpful when testing IFileSystem implementations.
+    /// </summary>
+    internal static class FileSystemTestUtilities
+    {
+        /// <summary>
+        /// Test content used when creating file system test files.
+        /// </summary>
+        internal const byte TestByte = 123;
+
+        /// <summary>
+        /// Compares the full contents of the two files given, byte by byte.
+        /// </summary>
+        /// <param name="path1">Path to the first file.</param>
+        /// <param name="path2">Path to the second file.</param>
+        /// <returns>True if both files have the same length and identical bytes.</returns>
+        internal static bool HaveSameContent(string path1, string path2)
+        {
+            using (var s1 = File.OpenRead(path1))
+            using (var s2 = File.OpenRead(path2))
+            {
+                // Files of different length can never match; this also avoids reading either file.
+                if (s1.Length != s2.Length)
+                {
+                    return false;
+                }
+
+                // Walk both streams to the end; ReadByte returns -1 at end of stream.
+                int byte1;
+                do
+                {
+                    byte1 = s1.ReadByte();
+                    if (byte1 != s2.ReadByte())
+                    {
+                        return false;
+                    }
+                }
+                while (byte1 != -1);
+
+                return true;
+            }
+        }
+
+        /// <summary>
+        /// Creates a temp file and writes TestByte to it.
+        /// </summary>
+        /// <returns>The path to the test file.</returns>
+        internal static string MakeLocalTempFile()
+        {
+            var result = Path.GetTempFileName();
+            MakeLocalTestFile(result);
+            return result;
+        }
+
+        /// <summary>
+        /// (Re)creates the file at <paramref name="filePath"/> so that it contains exactly one byte, TestByte.
+        /// </summary>
+        private static void MakeLocalTestFile(string filePath)
+        {
+            if (File.Exists(filePath))
+            {
+                File.Delete(filePath);
+            }
+            using (var s = File.Create(filePath))
+            {
+                s.WriteByte(TestByte);
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO.Tests/Org.Apache.REEF.IO.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/Org.Apache.REEF.IO.Tests.csproj b/lang/cs/Org.Apache.REEF.IO.Tests/Org.Apache.REEF.IO.Tests.csproj
index 66971a4..704c7b9 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/Org.Apache.REEF.IO.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/Org.Apache.REEF.IO.Tests.csproj
@@ -40,7 +40,9 @@ under the License.
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="FileSystemTestUtilities.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="TestHadoopFileSystem.cs" />
     <Compile Include="TestLocalFileSystem.cs" />
     <Compile Include="TestRandomDataSet.cs" />
   </ItemGroup>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO.Tests/TestHadoopFileSystem.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestHadoopFileSystem.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestHadoopFileSystem.cs
new file mode 100644
index 0000000..7e26231
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestHadoopFileSystem.cs
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.IO;
+using System.Linq;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.IO.FileSystem.Hadoop;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
+namespace Org.Apache.REEF.IO.Tests
+{
+    /// <summary>
+    /// Tests for HadoopFileSystem.
+    /// </summary>
+    /// <see cref="HadoopFileSystem" />
+    [TestClass]
+    [Ignore] // These tests need to be run in an environment with HDFS installed.
+    public sealed class TestHadoopFileSystem
+    {
+        private HadoopFileSystem _fileSystem;
+
+        /// <summary>
+        /// Returns a URI under /tmp on the file system, suffixed with the current time down to milliseconds.
+        /// </summary>
+        private Uri GetTempUri()
+        {
+            return
+                new Uri(_fileSystem.UriPrefix + "/tmp/TestHadoopFileSystem-" +
+                        DateTime.Now.ToString("yyyyMMddHHmmssfff"));
+        }
+
+        /// <summary>
+        /// Builds the URI of the entry called <paramref name="name"/> inside <paramref name="directory"/>.
+        /// </summary>
+        /// <remarks>
+        /// <code>new Uri(directory, name)</code> is not enough: when the base URI has no trailing slash, relative
+        /// resolution replaces its last path segment, which would place the entry next to the directory.
+        /// </remarks>
+        private static Uri GetChildUri(Uri directory, string name)
+        {
+            return new Uri(new Uri(directory.AbsoluteUri.TrimEnd('/') + "/"), name);
+        }
+
+        /// <summary>
+        /// Sets up the file system instance to be used for the tests.
+        /// </summary>
+        [TestInitialize]
+        public void SetupFileSystem()
+        {
+            _fileSystem =
+                TangFactory.GetTang()
+                    .NewInjector(HadoopFileSystemConfiguration.ConfigurationModule.Build())
+                    .GetInstance<HadoopFileSystem>();
+        }
+
+        /// <summary>
+        /// Creates a temp file locally, uploads it to HDFS and downloads it again.
+        /// </summary>
+        [TestMethod]
+        public void TestCopyFromLocalAndBack()
+        {
+            var localFile = FileSystemTestUtilities.MakeLocalTempFile();
+            var localFileDownloaded = localFile + ".2";
+            var remoteUri = GetTempUri();
+
+            _fileSystem.CopyFromLocal(localFile, remoteUri);
+            _fileSystem.CopyToLocal(remoteUri, localFileDownloaded);
+
+            Assert.IsTrue(message: "A file up and downloaded should exist on the local file system.",
+                condition: File.Exists(localFileDownloaded));
+            Assert.IsTrue(message: "A file up and downloaded should not have changed content.",
+                condition: FileSystemTestUtilities.HaveSameContent(localFile, localFileDownloaded));
+
+            _fileSystem.Delete(remoteUri);
+            File.Delete(localFile);
+            File.Delete(localFileDownloaded);
+        }
+
+        /// <summary>
+        /// Tests whether .Exists() works.
+        /// </summary>
+        [TestMethod]
+        public void TestExists()
+        {
+            var remoteUri = GetTempUri();
+            Assert.IsFalse(message: "The file should not exist yet", condition: _fileSystem.Exists(remoteUri));
+            var localFile = FileSystemTestUtilities.MakeLocalTempFile();
+            _fileSystem.CopyFromLocal(localFile, remoteUri);
+            Assert.IsTrue(message: "The file should now exist", condition: _fileSystem.Exists(remoteUri));
+            _fileSystem.Delete(remoteUri);
+            Assert.IsFalse(message: "The file should no longer exist", condition: _fileSystem.Exists(remoteUri));
+            File.Delete(localFile);
+        }
+
+        /// <summary>
+        /// Tests for .GetChildren().
+        /// </summary>
+        [TestMethod]
+        public void TestGetChildren()
+        {
+            // Make a directory
+            var remoteDirectory = GetTempUri();
+            _fileSystem.CreateDirectory(remoteDirectory);
+            // Check that it is empty
+            Assert.AreEqual(message: "The directory should be empty.", expected: 0,
+                actual: _fileSystem.GetChildren(remoteDirectory).Count());
+            // Upload some localfile *into* the directory
+            var localTempFile = FileSystemTestUtilities.MakeLocalTempFile();
+            var remoteUri = GetChildUri(remoteDirectory, Path.GetFileName(localTempFile));
+            _fileSystem.CopyFromLocal(localTempFile, remoteUri);
+            // Check that it is on the directory's listing (list the directory, not the file itself)
+            var children = _fileSystem.GetChildren(remoteDirectory).ToList();
+            Assert.AreEqual(message: "The directory should contain exactly the uploaded file.", expected: 1,
+                actual: children.Count);
+            var uriInResult = children[0];
+            Assert.AreEqual(remoteUri, uriInResult);
+
+            // Download the file and make sure it is the same as before
+            var downloadedFileName = localTempFile + ".downloaded";
+            _fileSystem.CopyToLocal(uriInResult, downloadedFileName);
+            Assert.IsTrue(message: "A file up and downloaded should not have changed content.",
+                condition: FileSystemTestUtilities.HaveSameContent(localTempFile, downloadedFileName));
+            File.Delete(localTempFile);
+            File.Delete(downloadedFileName);
+
+            // Delete the file
+            _fileSystem.Delete(remoteUri);
+            // Check that the folder is empty again
+            Assert.AreEqual(message: "The directory should be empty.", expected: 0,
+                actual: _fileSystem.GetChildren(remoteDirectory).Count());
+            // Delete the folder
+            _fileSystem.DeleteDirectory(remoteDirectory);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(NotImplementedException),
+            "Open() is not supported by HadoopFileSystem. Use CopyToLocal and open the local file instead.")]
+        public void TestOpen()
+        {
+            _fileSystem.Open(GetTempUri());
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(NotImplementedException),
+            "Create() is not supported by HadoopFileSystem. Create a local file and use CopyFromLocal instead.")]
+        public void TestCreate()
+        {
+            _fileSystem.Create(GetTempUri());
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/CommandResult.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/CommandResult.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/CommandResult.cs
new file mode 100644
index 0000000..bfd38c7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/CommandResult.cs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.IO.FileSystem.Hadoop
+{
+    /// <summary>
+    /// Captured outcome of a single hdfs command invocation: its standard output lines,
+    /// its standard error lines and its exit code.
+    /// </summary>
+    internal sealed class CommandResult
+    {
+        internal CommandResult(List<string> stdOut, List<string> stdErr, int exitCode)
+        {
+            ExitCode = exitCode;
+            StdErr = stdErr;
+            StdOut = stdOut;
+        }
+
+        /// <summary>
+        /// The lines the command wrote to standard output, as handed to the constructor.
+        /// </summary>
+        internal IList<string> StdOut { get; private set; }
+
+        /// <summary>
+        /// The lines the command wrote to standard error, as handed to the constructor.
+        /// </summary>
+        internal IList<string> StdErr { get; private set; }
+
+        /// <summary>
+        /// The exit code reported for the command.
+        /// </summary>
+        // Note: This is almost always 0, as the way we launch the hdfs process doesn't provide an exit code.
+        internal int ExitCode { get; private set; }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HDFSCommandRunner.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HDFSCommandRunner.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HDFSCommandRunner.cs
new file mode 100644
index 0000000..7a10a8a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HDFSCommandRunner.cs
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using Org.Apache.REEF.Common.Files;
+using Org.Apache.REEF.IO.FileSystem.Hadoop.Parameters;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IO.FileSystem.Hadoop
+{
+    /// <summary>
+    /// Helper class to execute <code>hdfs</code> commands with a per-attempt timeout and a bounded number of attempts.
+    /// </summary>
+    internal sealed class HdfsCommandRunner
+    {
+        /// <summary>
+        /// The name of the <code>hdfs</code> command.
+        /// </summary>
+        private const string HdfsCommandName = "hdfs.cmd";
+
+        /// <summary>
+        /// The folder within <code>HadoopHome</code> that contains the <code>hdfs</code> command.
+        /// </summary>
+        private const string BinFolderName = "bin";
+
+        /// <summary>
+        /// The name of the HADOOP_HOME environment variable.
+        /// </summary>
+        private const string HadoopHomeEnvironmentVariableName = "HADOOP_HOME";
+
+        private static readonly Logger Logger = Logger.GetLogger(typeof(HdfsCommandRunner));
+
+        /// <summary>
+        /// Path to hdfs.cmd
+        /// </summary>
+        private readonly string _hdfsCommandPath;
+
+        /// <summary>
+        /// The number of retries on HDFS commands. Run() makes at most this many attempts in total.
+        /// </summary>
+        private readonly int _numberOfRetries;
+
+        /// <summary>
+        /// The timeout on each of the retries.
+        /// </summary>
+        private readonly int _timeOutInMilliSeconds;
+
+        /// <summary>
+        /// Locates the <code>hdfs</code> command: in the given Hadoop home if one was configured and exists,
+        /// otherwise in the folder named by the HADOOP_HOME environment variable.
+        /// </summary>
+        /// <param name="numberOfRetries">The maximum number of attempts Run() makes per command.</param>
+        /// <param name="timeOutInMilliSeconds">The timeout for each attempt, in milliseconds.</param>
+        /// <param name="hadoopHome">The Hadoop installation folder, or the parameter's default value to use HADOOP_HOME.</param>
+        /// <exception cref="FileNotFoundException">If the hdfs command can't be found.</exception>
+        /// <exception cref="Exception">If HADOOP_HOME has to be consulted but is not set.</exception>
+        [Inject]
+        private HdfsCommandRunner([Parameter(typeof(NumberOfRetries))] int numberOfRetries,
+            [Parameter(typeof(CommandTimeOut))] int timeOutInMilliSeconds,
+            [Parameter(typeof(HadoopHome))] string hadoopHome)
+        {
+            _numberOfRetries = numberOfRetries;
+            _timeOutInMilliSeconds = timeOutInMilliSeconds;
+
+            if (!PathUtilities.AreNormalizedEquals(hadoopHome, HadoopHome.DefaultValue))
+            {
+                // The user provided a Hadoop Home folder.
+                if (Directory.Exists(hadoopHome))
+                {
+                    // The user provided folder does exist.
+                    _hdfsCommandPath = GetFullPathToHdfsCommand(hadoopHome);
+                }
+                else
+                {
+                    // The user provided folder does not exist. Try the environment variable.
+                    Logger.Log(Level.Warning,
+                        "The provided hadoop home folder {0} doesn't exist, trying environment variable {1} instead",
+                        hadoopHome, HadoopHomeEnvironmentVariableName);
+                    _hdfsCommandPath = GetFullPathToHdfsCommandBasedOnEnvironmentVariable();
+                }
+            }
+            else
+            {
+                // The user did not provide a Hadoop Home folder. Use the Environment variable.
+                _hdfsCommandPath = GetFullPathToHdfsCommandBasedOnEnvironmentVariable();
+            }
+
+            // Make sure we found the command.
+            if (!File.Exists(_hdfsCommandPath))
+            {
+                throw new FileNotFoundException("HDFS Command not found", _hdfsCommandPath);
+            }
+        }
+
+        /// <summary>
+        /// Runs the <code>hdfs</code> command with the given arguments, starting a new attempt whenever one times out.
+        /// </summary>
+        /// <param name="hdfsCommandLineArguments">The arguments, passed as-is to the hdfs command.</param>
+        /// <returns>The captured output of the first attempt that finished within the timeout.</returns>
+        /// <exception cref="TimeoutException">If no attempt finished within the timeout.</exception>
+        internal CommandResult Run(string hdfsCommandLineArguments)
+        {
+            var processStartInfo = new ProcessStartInfo
+            {
+                FileName = _hdfsCommandPath,
+                Arguments = hdfsCommandLineArguments,
+                UseShellExecute = false,
+                RedirectStandardOutput = true,
+                RedirectStandardError = true
+            };
+            // Attempts are numbered from 1 so the labels read "Attempt_1_of_N" ... "Attempt_N_of_N".
+            for (var attemptNumber = 1; attemptNumber <= _numberOfRetries; ++attemptNumber)
+            {
+                var processName = string.Format("HDFS_Attempt_{0}_of_{1}", attemptNumber, _numberOfRetries);
+                var result = RunAttempt(processStartInfo, _timeOutInMilliSeconds, processName);
+                if (null != result)
+                {
+                    LogCommandOutput(result);
+                    return result;
+                }
+            }
+
+            // If we reached here, we ran out of retries.
+            // TimeoutException derives from Exception, so existing catch (Exception) handlers keep working.
+            throw new TimeoutException(
+                string.Format("HDFS Cmd {0} {1} could not be executed in the specified timeout & retry settings",
+                    _hdfsCommandPath, hdfsCommandLineArguments));
+        }
+
+        /// <summary>
+        /// Utility method that constructs the full absolute path to the <code>hdfs</code> command.
+        /// </summary>
+        /// <param name="hadoopHome">The Hadoop installation folder.</param>
+        /// <returns><code>hadoopHome\bin\hdfs.cmd</code> as an absolute path.</returns>
+        private static string GetFullPathToHdfsCommand(string hadoopHome)
+        {
+            return Path.Combine(Path.GetFullPath(hadoopHome), BinFolderName, HdfsCommandName);
+        }
+
+        /// <summary>
+        /// Constructs the path to the HDFS binary based on the HADOOP_HOME environment variable.
+        /// </summary>
+        /// <returns>The absolute path of hdfs.cmd under HADOOP_HOME.</returns>
+        /// <exception cref="Exception">If HADOOP_HOME is not set.</exception>
+        private static string GetFullPathToHdfsCommandBasedOnEnvironmentVariable()
+        {
+            var hadoopHomeFromEnv = Environment.GetEnvironmentVariable(HadoopHomeEnvironmentVariableName);
+            Logger.Log(Level.Verbose, "{0} evaluated to {1}.", HadoopHomeEnvironmentVariableName, hadoopHomeFromEnv);
+            if (null == hadoopHomeFromEnv)
+            {
+                throw new Exception(HadoopHomeEnvironmentVariableName +
+                                    " not set and no path to the hadoop installation provided.");
+            }
+            return GetFullPathToHdfsCommand(hadoopHomeFromEnv);
+        }
+
+        /// <summary>
+        /// Helper method to log the command result.
+        /// </summary>
+        /// <param name="result">The result whose stdout and stderr lines are logged at Verbose level.</param>
+        private static void LogCommandOutput(CommandResult result)
+        {
+            using (var messageBuilder = new StringWriter())
+            {
+                messageBuilder.WriteLine("OUTPUT:");
+                messageBuilder.WriteLine("----------------------------------------");
+                foreach (var stdOut in result.StdOut)
+                {
+                    messageBuilder.WriteLine("Out: " + stdOut);
+                }
+
+                messageBuilder.WriteLine("----------------------------------------");
+                foreach (var stdErr in result.StdErr)
+                {
+                    messageBuilder.WriteLine("Err: " + stdErr);
+                }
+                messageBuilder.WriteLine("----------------------------------------");
+                // Pass the text as an argument, not as the format string, so that braces in the hdfs output cannot
+                // be misread as format placeholders should Logger.Log apply string.Format to its message.
+                Logger.Log(Level.Verbose, "{0}", messageBuilder.ToString());
+            }
+        }
+
+        /// <summary>
+        /// Attempts to run a process with a timeout.
+        /// </summary>
+        /// <returns>The result of the attempt or null in case of timeout.</returns>
+        /// <param name="processStartInfo">The process start information.</param>
+        /// <param name="timeOutInMilliSeconds">Timeout for the process.</param>
+        /// <param name="processName">A human readable name used for logging purposes.</param>
+        private static CommandResult RunAttempt(ProcessStartInfo processStartInfo, int timeOutInMilliSeconds,
+            string processName)
+        {
+            // Setup the process.
+            var outList = new List<string>();
+            var errList = new List<string>();
+            processStartInfo.RedirectStandardError = true;
+            processStartInfo.RedirectStandardOutput = true;
+            using (var process = new Process { StartInfo = processStartInfo })
+            {
+                process.OutputDataReceived += delegate(object sender, DataReceivedEventArgs e)
+                {
+                    if (!string.IsNullOrWhiteSpace(e.Data))
+                    {
+                        outList.Add(e.Data.Trim());
+                    }
+                };
+
+                process.ErrorDataReceived += delegate(object sender, DataReceivedEventArgs e)
+                {
+                    if (!string.IsNullOrWhiteSpace(e.Data))
+                    {
+                        errList.Add(e.Data.Trim());
+                    }
+                };
+                // Start it
+                process.Start();
+                process.BeginErrorReadLine();
+                process.BeginOutputReadLine();
+                Logger.Log(Level.Verbose, "Waiting for {0}ms for process `{1}` to finish", timeOutInMilliSeconds,
+                    processName);
+
+                // Deal with timeouts
+                if (process.WaitForExit(timeOutInMilliSeconds))
+                {
+                    // WaitForExit(int) may return before the asynchronous output handlers have seen every line;
+                    // the parameterless overload additionally waits for the redirected streams to be drained.
+                    process.WaitForExit();
+                    // The happy path: Assemble an output
+                    return new CommandResult(outList, errList, process.ExitCode);
+                }
+
+                // If we didn't return above, the process timed out.
+                Logger.Log(Level.Info, "The process `{0}` took longer than {1}ms to exit. Killing it.", processName,
+                    timeOutInMilliSeconds);
+                try
+                {
+                    process.Kill();
+                }
+                catch (InvalidOperationException)
+                {
+                    // The process exited between the timeout and the Kill() call; there is nothing left to kill.
+                }
+                // NOTE(review): Kill() terminates only this process; if a child it started still holds the
+                // redirected output pipes, this wait can block until that child exits. Confirm for hdfs.cmd.
+                process.WaitForExit();
+                Logger.Log(Level.Info, "Killed process `{0}`.", processName);
+                return null;
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystem.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystem.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystem.cs
new file mode 100644
index 0000000..ab2f93d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystem.cs
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text.RegularExpressions;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IO.FileSystem.Hadoop
+{
+    /// <summary>
+    /// Implements the IFileSystem interface for HDFS using external commands.
+    /// </summary>
+    /// <remarks>
+    /// Note that operations with this class are enormously slow. If you can, use a more native way to access the file system
+    /// in question.
+    /// NOTE(review): URIs and local paths are appended to the hdfs command line without quoting, so a path containing
+    /// spaces (e.g. a temp folder under a user profile with a space in its name) is likely to be split into several
+    /// arguments. Confirm how hdfs.cmd handles quoted arguments before adding quoting.
+    /// </remarks>
+    /// <see cref="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html" />
+    internal sealed class HadoopFileSystem : IFileSystem
+    {
+        private static readonly Regex NoSuchFileOrDirectoryRegEx = new Regex("^ls: `.*': No such file or directory");
+        private static readonly Regex LsFirstLineRegex = new Regex("^Found .* items");
+        private readonly HdfsCommandRunner _commandRunner;
+        private readonly string _uriPrefix;
+
+        [Inject]
+        private HadoopFileSystem(HdfsCommandRunner commandRunner)
+        {
+            _commandRunner = commandRunner;
+            _uriPrefix = GetUriPrefix();
+        }
+
+        /// <summary>
+        /// The Prefix used for URIs on this FileSystem, as reported by <code>hdfs getconf</code>.
+        /// </summary>
+        public string UriPrefix
+        {
+            get { return _uriPrefix; }
+        }
+
+        /// <summary>
+        /// Not implemented by this IFileSystem.
+        /// </summary>
+        /// <param name="fileUri"></param>
+        /// <returns></returns>
+        /// <exception cref="NotImplementedException">Always.</exception>
+        public Stream Open(Uri fileUri)
+        {
+            throw new NotImplementedException(
+                "Open() is not supported by HadoopFileSystem. Use CopyToLocal and open the local file instead.");
+        }
+
+        /// <summary>
+        /// Not implemented by this IFileSystem.
+        /// </summary>
+        /// <param name="fileUri"></param>
+        /// <returns></returns>
+        /// <exception cref="NotImplementedException">Always.</exception>
+        public Stream Create(Uri fileUri)
+        {
+            throw new NotImplementedException(
+                "Create() is not supported by HadoopFileSystem. Create a local file and use CopyFromLocal instead.");
+        }
+
+        /// <summary>
+        /// Deletes the file at <paramref name="fileUri"/> via <code>dfs -rm</code>.
+        /// </summary>
+        public void Delete(Uri fileUri)
+        {
+            // Delete the file via the hdfs command line.
+            _commandRunner.Run("dfs -rm " + fileUri);
+        }
+
+        /// <summary>
+        /// Returns true unless <code>dfs -ls</code> reports "No such file or directory" on stderr.
+        /// </summary>
+        public bool Exists(Uri fileUri)
+        {
+            // This determines the existence of a file based on the 'ls' command.
+            // Ideally, we'd use the 'test' command's return value, but we did not find a way to access that.
+            return
+                _commandRunner.Run("dfs -ls " + fileUri).StdErr
+                    .All(line => !NoSuchFileOrDirectoryRegEx.IsMatch(line));
+        }
+
+        /// <summary>
+        /// Copies a file within HDFS via <code>dfs -cp</code>.
+        /// </summary>
+        public void Copy(Uri sourceUri, Uri destinationUri)
+        {
+            _commandRunner.Run("dfs -cp " + sourceUri + " " + destinationUri);
+        }
+
+        /// <summary>
+        /// Downloads <paramref name="remoteFileUri"/> to the local path <paramref name="localName"/> via <code>dfs -get</code>.
+        /// </summary>
+        public void CopyToLocal(Uri remoteFileUri, string localName)
+        {
+            _commandRunner.Run("dfs -get " + remoteFileUri + " " + localName);
+        }
+
+        /// <summary>
+        /// Uploads the local file <paramref name="localFileName"/> to <paramref name="remoteFileUri"/> via <code>dfs -put</code>.
+        /// </summary>
+        public void CopyFromLocal(string localFileName, Uri remoteFileUri)
+        {
+            _commandRunner.Run("dfs -put " + localFileName + " " + remoteFileUri);
+        }
+
+        /// <summary>
+        /// Creates a directory via <code>dfs -mkdir</code>.
+        /// </summary>
+        public void CreateDirectory(Uri directoryUri)
+        {
+            _commandRunner.Run("dfs -mkdir " + directoryUri);
+        }
+
+        /// <summary>
+        /// Deletes a directory via <code>dfs -rmdir</code>.
+        /// </summary>
+        public void DeleteDirectory(Uri directoryUri)
+        {
+            _commandRunner.Run("dfs -rmdir " + directoryUri);
+        }
+
+        /// <summary>
+        /// Lists the entries of <paramref name="directoryUri"/> via <code>dfs -ls</code>.
+        /// </summary>
+        /// <param name="directoryUri">The directory to list.</param>
+        /// <returns>The URI in the last column of each listing line; empty if ls printed no entries.</returns>
+        /// <exception cref="UriFormatException">If a listing line does not end in a valid URI.</exception>
+        public IEnumerable<Uri> GetChildren(Uri directoryUri)
+        {
+            // Materialized eagerly: parse errors surface here rather than at some later enumeration,
+            // and enumerating the result more than once does not re-parse the listing.
+            return _commandRunner.Run("dfs -ls " + directoryUri)
+                .StdOut.Where(line => !LsFirstLineRegex.IsMatch(line))
+                .Select(line => line.Split())
+                .Select(columns => new Uri(columns[columns.Length - 1]))
+                .ToList();
+        }
+
+        /// <summary>
+        /// Reads the default file system URI from the Hadoop configuration.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">If the getconf command printed nothing on stdout.</exception>
+        private string GetUriPrefix()
+        {
+            // NOTE(review): fs.default.name is the deprecated alias of fs.defaultFS in Hadoop 2; confirm which
+            // key the targeted Hadoop versions support before switching.
+            var prefix = _commandRunner.Run("getconf -confKey fs.default.name").StdOut.FirstOrDefault();
+            if (null == prefix)
+            {
+                throw new InvalidOperationException(
+                    "`hdfs getconf -confKey fs.default.name` produced no output; cannot determine the URI prefix.");
+            }
+            return prefix;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystemConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystemConfiguration.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystemConfiguration.cs
new file mode 100644
index 0000000..0a9dcda
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HadoopFileSystemConfiguration.cs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IO.FileSystem.Hadoop.Parameters;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.IO.FileSystem.Hadoop
+{
+    /// <summary>
+    /// Configuration Module for the (command based) Hadoop file system implementation of IFileSystem.
+    /// </summary>
+    /// <remarks>
+    /// This IFileSystem implementation is enormously slow as it spawns a new JVM per API call. Avoid if you have better means
+    /// of file system access.
+    /// Also, Stream-based operations are not supported.
+    /// </remarks>
+    public sealed class HadoopFileSystemConfiguration : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// The number of times each HDFS command will be retried. Defaults to 3.
+        /// Note: HdfsCommandRunner treats this value as the total number of attempts, not as additional retries.
+        /// </summary>
+        public static readonly OptionalParameter<int> CommandRetries = new OptionalParameter<int>();
+
+        /// <summary>
+        /// The timeout (in milliseconds) for HDFS commands. Defaults to 300000 (5 minutes).
+        /// The timeout applies to each attempt separately.
+        /// </summary>
+        public static readonly OptionalParameter<int> CommandTimeOut = new OptionalParameter<int>();
+
+        /// <summary>
+        /// The folder in which Hadoop is installed. Defaults to %HADOOP_HOME%.
+        /// </summary>
+        public static readonly OptionalParameter<string> HadoopHome = new OptionalParameter<string>();
+
+        /// <summary>
+        /// Binds IFileSystem to HadoopFileSystem and maps each optional parameter above to its named parameter:
+        /// CommandRetries to NumberOfRetries, CommandTimeOut to CommandTimeOut and HadoopHome to HadoopHome.
+        /// </summary>
+        /// <remarks>
+        /// Keep this field declared after the OptionalParameter fields it references: C# runs static field
+        /// initializers in textual order, so declaring it earlier would make this initializer see those fields as null.
+        /// </remarks>
+        public static readonly ConfigurationModule ConfigurationModule = new HadoopFileSystemConfiguration()
+            .BindImplementation(GenericType<IFileSystem>.Class, GenericType<HadoopFileSystem>.Class)
+            .BindNamedParameter(GenericType<NumberOfRetries>.Class, CommandRetries)
+            .BindNamedParameter(GenericType<CommandTimeOut>.Class, CommandTimeOut)
+            .BindNamedParameter(GenericType<HadoopHome>.Class, HadoopHome)
+            .Build();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/CommandTimeOut.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/CommandTimeOut.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/CommandTimeOut.cs
new file mode 100644
index 0000000..315d5eb
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/CommandTimeOut.cs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IO.FileSystem.Hadoop.Parameters +{ + /// <summary> + /// The timeout (in milliseconds) for HDFS commands. Defaults to 300000 (5 minutes). + /// </summary> + [NamedParameter("The timeout (in milliseconds) for HDFS commands.", defaultValue: "300000")] + internal sealed class CommandTimeOut : Name<int> + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/HadoopHome.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/HadoopHome.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/HadoopHome.cs new file mode 100644 index 0000000..6b9f56d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/HadoopHome.cs @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IO.FileSystem.Hadoop.Parameters +{ + /// <summary> + /// Hadoop home to be used. Defaults to using %HADOOP_HOME%. + /// </summary> + [NamedParameter("Hadoop home to be used.
Defaults to using %HADOOP_HOME%", defaultValue: DefaultValue)] + internal sealed class HadoopHome : Name<string> + { + public const string DefaultValue = "###NOT_SET###"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/NumberOfRetries.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/NumberOfRetries.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/NumberOfRetries.cs new file mode 100644 index 0000000..c29cc1e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/Parameters/NumberOfRetries.cs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IO.FileSystem.Hadoop.Parameters +{ + /// <summary> + /// Number of retries for HDFS commands.
+ /// </summary> + [NamedParameter("Number of retries for HDFS commands.", defaultValue: "3")] + internal sealed class NumberOfRetries : Name<int> + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj index 6007582..37aef05 100644 --- a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj +++ b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj @@ -41,6 +41,13 @@ under the License. <Reference Include="System.Xml" /> </ItemGroup> <ItemGroup> + <Compile Include="FileSystem\Hadoop\CommandResult.cs" /> + <Compile Include="FileSystem\Hadoop\HadoopFileSystemConfiguration.cs" /> + <Compile Include="FileSystem\Hadoop\HDFSCommandRunner.cs" /> + <Compile Include="FileSystem\Hadoop\HadoopFileSystem.cs" /> + <Compile Include="FileSystem\Hadoop\Parameters\CommandTimeOut.cs" /> + <Compile Include="FileSystem\Hadoop\Parameters\HadoopHome.cs" /> + <Compile Include="FileSystem\Hadoop\Parameters\NumberOfRetries.cs" /> <Compile Include="FileSystem\IFileSystem.cs" /> <Compile Include="FileSystem\Local\LocalFileSystem.cs" /> <Compile Include="FileSystem\Local\LocalFileSystemConfiguration.cs" /> @@ -64,6 +71,14 @@ under the License.
<Project>{97DBB573-3994-417A-9F69-FFA25F00D2A6}</Project> <Name>Org.Apache.REEF.Tang</Name> </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj"> + <Project>{79E7F89A-1DFB-45E1-8D43-D71A954AEB98}</Project> + <Name>Org.Apache.REEF.Utilities</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj"> + <Project>{545A0582-4105-44CE-B99C-B1379514A630}</Project> + <Name>Org.Apache.REEF.Common</Name> + </ProjectReference> </ItemGroup> <ItemGroup /> </Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a98a5ecb/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs index ca5d750..e0a0f47 100644 --- a/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs +++ b/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs @@ -18,6 +18,7 @@ */ using System.Reflection; +using System.Runtime.CompilerServices; using System.Runtime.InteropServices; [assembly: AssemblyTitle("Org.Apache.REEF.IO")] @@ -31,4 +32,14 @@ using System.Runtime.InteropServices; [assembly: ComVisible(false)] [assembly: Guid("58c49df4-8dc0-4f58-9a7e-a341d33e40ee")] [assembly: AssemblyVersion("0.13.0.0")] -[assembly: AssemblyFileVersion("0.13.0.0")] \ No newline at end of file +[assembly: AssemblyFileVersion("0.13.0.0")] +// Allow the tests project access to `internal` APIs +#if DEBUG + [assembly: InternalsVisibleTo("Org.Apache.REEF.IO.Tests")] +#else +[assembly: InternalsVisibleTo("Org.Apache.REEF.IO.Tests, publickey=" + + "00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9" + + "9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc" + + 
"b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" + + "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")] +#endif \ No newline at end of file
