Repository: incubator-reef Updated Branches: refs/heads/master 6eefbf1cc -> 189c5acba
[REEF-663] Make Http Endpoint available for client after submit - This PR is to return HttpEndPoint to the client after Job is submitted - At Java side, the http endpoint is written to JobSubmission folder in HDFS and Driver folder - At REEFClient side, HttpClientHelper is added to pull the http endpoint from the file and return it in Submit() - E2e test cases are updated to get the URI and verify on it. - The new Submit API is noted as [Unstable] as we might change it in future - The change is part of the change in tlc branch with some modifications JIRA: [REEF-663](https://issues.apache.org/jira/browse/REEF-663) Pull Request: This closes #505 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/189c5acb Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/189c5acb Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/189c5acb Branch: refs/heads/master Commit: 189c5acba1bb5a1f8e1393a02dbe643676f959fe Parents: 6eefbf1 Author: Julia Wang <[email protected]> Authored: Fri Sep 18 12:47:01 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Fri Sep 18 21:10:52 2015 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Client/API/IREEFClient.cs | 13 + .../Common/HttpClientHelper.cs | 344 +++++++++++++++++++ .../Common/IDriverHttpEndpoint.cs | 31 ++ .../Org.Apache.REEF.Client/Local/LocalClient.cs | 53 ++- .../Org.Apache.REEF.Client.csproj | 7 + .../Properties/AssemblyInfo.cs | 7 + .../Org.Apache.REEF.Client/YARN/YARNClient.cs | 33 +- .../Files/REEFFileNames.cs | 8 + .../AllHandlers.cs | 11 +- .../Functional/Bridge/TestBridgeClient.cs | 13 +- .../Functional/ReefFunctionalTest.cs | 38 +- .../apache/reef/bridge/client/LocalClient.java | 8 +- .../bridge/client/LocalSubmissionFromCS.java | 2 + .../bridge/client/YarnJobSubmissionClient.java | 66 +++- .../reef/javabridge/generic/JobDriver.java | 21 ++ 
.../runtime/common/files/REEFFileNames.java | 10 +- .../client/HDInsightDriverConfiguration.java | 2 +- .../yarn/client/YarnSubmissionHelper.java | 8 + .../yarn/driver/UploaderToJobfolder.java | 2 +- .../yarn/driver/YarnContainerManager.java | 17 +- .../yarn/driver/YarnDriverConfiguration.java | 2 +- .../parameters/JobSubmissionDirectory.java | 5 +- 22 files changed, 669 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs b/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs index e4f4c88..74b8d95 100644 --- a/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs @@ -17,6 +17,10 @@ * under the License. */ +using System; +using Org.Apache.REEF.Client.Common; +using Org.Apache.REEF.Common.Attributes; + namespace Org.Apache.REEF.Client.API { /// <summary> @@ -30,5 +34,14 @@ namespace Org.Apache.REEF.Client.API /// </summary> /// <param name="jobSubmission"></param> void Submit(IJobSubmission jobSubmission); + + /// <summary> + /// Submit the job described in jobSubmission to the cluster. + /// Expect IDriverHttpEndpoint returned after the call. 
+ /// </summary> + /// <param name="jobSubmission"></param> + /// <returns>IDriverHttpEndpoint</returns> + [Unstable("0.13", "Working in progress for what to return after submit")] + IDriverHttpEndpoint SubmitAndGetDriverUrl(IJobSubmission jobSubmission); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/Common/HttpClientHelper.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Common/HttpClientHelper.cs b/lang/cs/Org.Apache.REEF.Client/Common/HttpClientHelper.cs new file mode 100644 index 0000000..1a00493 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/Common/HttpClientHelper.cs @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System.Collections.Generic; +using Newtonsoft.Json; +using Org.Apache.REEF.Utilities.Logging; +using System; +using System.IO; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Threading; +using System.Threading.Tasks; + +namespace Org.Apache.REEF.Client.Common +{ + internal class HttpClientHelper : IDriverHttpEndpoint + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof (HttpClientHelper)); + private const int MaxConnectAttemptCount = 20; + private const int MilliSecondsToWaitBeforeNextConnectAttempt = 1000; + private const int SecondsForHttpClientTimeout = 120; + private const string UnAssigned = "UNASSIGNED"; + private const string TrackingUrlKey = "trackingUrl"; + private const string AppKey = "app"; + private const string ThisIsStandbyRm = "This is standby RM"; + private const string AppJson = "application/json"; + + private string _driverUrl; + + private readonly HttpClient _client; + + internal HttpClientHelper() + { + _client = new HttpClient + { + Timeout = TimeSpan.FromSeconds(SecondsForHttpClientTimeout), + }; + _client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue(AppJson)); + } + + public string DriverUrl { get { return _driverUrl; } } + + public string GetUrlResult(string url) + { + var task = Task.Run(() => CallUrl(url)); + task.Wait(); + return task.Result; + } + + enum UrlResultKind + { + WasNotAbleToTalkToRm, + BackupRm, + AppIdNotThereYet, + UrlNotAssignedYet, + GotAppIdUrl, + } + + internal static List<string> GetRmUri(string filePath) + { + using (var sr = new StreamReader(File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))) + { + sr.ReadLine(); // appid + sr.ReadLine(); // trackingUrl + var rmList = new List<string>(); + var rmUri = sr.ReadLine(); + while (rmUri != null) + { + rmList.Add(rmUri); + rmUri = sr.ReadLine(); + } + return rmList; + } + } + + internal static string GetAppId(string filePath) + { + using (var sr = new 
StreamReader(File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))) + { + var appId = sr.ReadLine(); + return appId; + } + } + + internal static string GetTrackingUrl(string filePath) + { + using (var sr = new StreamReader(File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))) + { + sr.ReadLine(); // appid + var trackingUrl = sr.ReadLine(); + return "http://" + trackingUrl + "/"; + } + } + + internal async Task<string> CallUrl (string url) + { + var result = await TryGetUri(url); + if (HasCommandFailed(result)) + { + return null; + } + LOGGER.Log(Level.Warning, "CallUrl result " + result.Item2); + return result.Item2; + } + + internal string GetDriverUrlForYarn(String filePath) + { + _driverUrl = GetTrackingUrl(filePath); + return _driverUrl; + } + + internal string GetDriverUrlForLocalRuntime(string filePath) + { + _driverUrl = null; + for (int i = 0; i < 10; i++) + { + var driverUrl = TryReadHttpServerIpAndPortFromFile(filePath); + if (!string.IsNullOrEmpty(driverUrl)) + { + _driverUrl = "http://" + driverUrl + "/"; + break; + } + Thread.Sleep(1000); + } + return _driverUrl; + } + + private string TryReadHttpServerIpAndPortFromFile(String fileName) + { + string httpServerIpAndPort = null; + try + { + LOGGER.Log(Level.Info, "try open " + fileName); + using (var rdr = new StreamReader(File.OpenRead(fileName))) + { + httpServerIpAndPort = rdr.ReadLine(); + LOGGER.Log(Level.Info, "httpServerIpAndPort is " + httpServerIpAndPort); + } + } + catch (FileNotFoundException) + { + LOGGER.Log(Level.Info, "File does not exist: " + fileName); + } + return httpServerIpAndPort; + } + + internal async Task<string> GetAppIdTrackingUrl(string url) + { + var result = await TryGetUri(url); + if (HasCommandFailed(result) || + result.Item2 == null) + { + return null; + } + + LOGGER.Log(Level.Info, "GetAppIdTrackingUrl: " + result.Item2); + return result.Item2; + } + + private static bool ShouldRetry(HttpRequestException httpRequestException) + { + var 
shouldRetry = false; + if (httpRequestException.Message.IndexOf(((int)(HttpStatusCode.NotFound)).ToString(), StringComparison.Ordinal) != -1 || + httpRequestException.Message.IndexOf(((int)(HttpStatusCode.BadGateway)).ToString(), StringComparison.Ordinal) != -1) + { + shouldRetry = true; + } + else + { + var webException = httpRequestException.InnerException as System.Net.WebException; + if (webException != null) + { + if (webException.Status == System.Net.WebExceptionStatus.ConnectFailure) + { + shouldRetry = true; + } + } + } + return shouldRetry; + } + + private static Tuple<bool, string> CommandFailed(String reason) + { + return new Tuple<bool, string>(false, null); + } + + private static Tuple<bool, string> CommandSucceeded(string commandResult) + { + return new Tuple<bool, string>(true, commandResult); + } + + private bool HasCommandFailed(Tuple<bool, string> httpCallResult) + { + return !httpCallResult.Item1; + } + + internal async Task<Tuple<bool, string>> TryGetUri(string commandUri) + { + var connectAttemptCount = 0; + Tuple<bool, string> result; + + while (true) + { + try + { + string strResult = null; + LOGGER.Log(Level.Warning, "Try url [" + commandUri + "] connectAttemptCount " + connectAttemptCount + "."); + strResult = await _client.GetStringAsync(commandUri); + result = CommandSucceeded(strResult); + LOGGER.Log(Level.Warning, "Connection succeeded. connectAttemptCount was " + connectAttemptCount + "."); + break; + } + catch (HttpRequestException httpRequestException) + { + if (!ShouldRetry(httpRequestException)) + { + LOGGER.Log(Level.Error, + commandUri + " exception " + httpRequestException.Message + "\n" + + httpRequestException.StackTrace); + result = CommandFailed(httpRequestException.Message); + LOGGER.Log(Level.Warning, "Connection failed. 
connectAttemptCount was " + connectAttemptCount + "."); + break; + } + } + catch (Exception ex) + { + LOGGER.Log(Level.Error, commandUri + " exception " + ex.Message + "\n" + ex.StackTrace); + result = CommandFailed(ex.Message); + LOGGER.Log(Level.Warning, "Connection failed. connectAttemptCount was " + connectAttemptCount + "."); + break; + } + + ++connectAttemptCount; + if (connectAttemptCount >= MaxConnectAttemptCount) + { + result = CommandFailed("Could not connect to " + commandUri + " after " + MaxConnectAttemptCount.ToString() + "attempts."); + LOGGER.Log(Level.Warning, "Connection failed. connectAttemptCount was " + connectAttemptCount + "."); + break; + } + + Thread.Sleep(MilliSecondsToWaitBeforeNextConnectAttempt); + } + + return result; + } + + internal async Task<string> TryUntilNoConnection(string commandUri) + { + var connectAttemptCount = 0; + while (true) + { + try + { + var strResult = await _client.GetStringAsync(commandUri); + LOGGER.Log(Level.Info, + "Connection succeeded. 
connectAttemptCount was " + connectAttemptCount + "."); + } + catch (HttpRequestException httpRequestException) + { + LOGGER.Log(Level.Info, httpRequestException.Message); + break; + } + catch (Exception e) + { + LOGGER.Log(Level.Info, e.Message); + break; + } + + ++connectAttemptCount; + if (connectAttemptCount >= MaxConnectAttemptCount) + { + LOGGER.Log(Level.Info, "Can still connect to " + commandUri + " after " + MaxConnectAttemptCount.ToString() + "attempts."); + break; + } + + Thread.Sleep(MilliSecondsToWaitBeforeNextConnectAttempt); + } + + return null; + } + + private static bool ShouldRetry(HttpStatusCode httpStatusCode) + { + return httpStatusCode == HttpStatusCode.NotFound; + } + + private UrlResultKind CheckUrlAttempt(string result) + { + UrlResultKind resultKind = UrlResultKind.WasNotAbleToTalkToRm; + if (string.IsNullOrEmpty(result)) + { + resultKind = UrlResultKind.WasNotAbleToTalkToRm; + } + else if (result.StartsWith(ThisIsStandbyRm)) + { + resultKind = UrlResultKind.BackupRm; + } + else + { + dynamic deserializedValue = JsonConvert.DeserializeObject(result); + var values = deserializedValue[AppKey]; + if (values == null || values[TrackingUrlKey] == null) + { + resultKind = UrlResultKind.AppIdNotThereYet; + } + else + { + _driverUrl = values[TrackingUrlKey].ToString(); + LOGGER.Log(Level.Info, "trackingUrl[" + _driverUrl + "]"); + + if (0 == String.Compare(_driverUrl, UnAssigned)) + { + resultKind = UrlResultKind.UrlNotAssignedYet; + } + else + { + resultKind = UrlResultKind.GotAppIdUrl; + } + + } + } + + LOGGER.Log(Level.Info, "CheckUrlAttempt " + resultKind); + return resultKind; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/Common/IDriverHttpEndpoint.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Common/IDriverHttpEndpoint.cs b/lang/cs/Org.Apache.REEF.Client/Common/IDriverHttpEndpoint.cs new file mode 100644 
index 0000000..c128ad0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/Common/IDriverHttpEndpoint.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.Attributes; + +namespace Org.Apache.REEF.Client.Common +{ + [Unstable("0.13", "Working in progress for what to return after submit")] + public interface IDriverHttpEndpoint + { + string GetUrlResult(string url); + + string DriverUrl { get; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs index 518533e..af2d47b 100644 --- a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs @@ -20,9 +20,11 @@ using System; using System.IO; using System.Linq; +using System.Threading.Tasks; using Org.Apache.REEF.Client.API; using Org.Apache.REEF.Client.Common; using Org.Apache.REEF.Client.Local.Parameters; +using Org.Apache.REEF.Common.Files; using Org.Apache.REEF.Tang.Annotations; using 
Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Utilities.Logging; @@ -49,29 +51,37 @@ namespace Org.Apache.REEF.Client.Local private readonly JavaClientLauncher _javaClientLauncher; private readonly int _numberOfEvaluators; private readonly string _runtimeFolder; + private string _driverUrl; + private REEFFileNames _fileNames; [Inject] private LocalClient(DriverFolderPreparationHelper driverFolderPreparationHelper, [Parameter(typeof(LocalRuntimeDirectory))] string runtimeFolder, - [Parameter(typeof(NumberOfEvaluators))] int numberOfEvaluators, JavaClientLauncher javaClientLauncher) + [Parameter(typeof(NumberOfEvaluators))] int numberOfEvaluators, + JavaClientLauncher javaClientLauncher, + REEFFileNames fileNames) { _driverFolderPreparationHelper = driverFolderPreparationHelper; _runtimeFolder = runtimeFolder; _numberOfEvaluators = numberOfEvaluators; _javaClientLauncher = javaClientLauncher; + _fileNames = fileNames; } /// <summary> /// Uses Path.GetTempPath() as the runtime execution folder. /// </summary> /// <param name="driverFolderPreparationHelper"></param> - /// <param name="reefJarPath"></param> /// <param name="numberOfEvaluators"></param> + /// <param name="javaClientLauncher"></param> + /// <param name="fileNames"></param> [Inject] private LocalClient( DriverFolderPreparationHelper driverFolderPreparationHelper, - [Parameter(typeof(NumberOfEvaluators))] int numberOfEvaluators, JavaClientLauncher javaClientLauncher) - : this(driverFolderPreparationHelper, Path.GetTempPath(), numberOfEvaluators, javaClientLauncher) + [Parameter(typeof(NumberOfEvaluators))] int numberOfEvaluators, + JavaClientLauncher javaClientLauncher, + REEFFileNames fileNames) + : this(driverFolderPreparationHelper, Path.GetTempPath(), numberOfEvaluators, javaClientLauncher, fileNames) { // Intentionally left blank. 
} @@ -99,6 +109,41 @@ namespace Org.Apache.REEF.Client.Local Logger.Log(Level.Info, "Submitted the Driver for execution."); } + public IDriverHttpEndpoint SubmitAndGetDriverUrl(IJobSubmission jobSubmission) + { + // Prepare the job submission folder + var jobFolder = CreateJobFolder(jobSubmission.JobIdentifier); + var driverFolder = Path.Combine(jobFolder, DriverFolderName); + Logger.Log(Level.Info, "Preparing driver folder in " + driverFolder); + + _driverFolderPreparationHelper.PrepareDriverFolder(jobSubmission, driverFolder); + + //TODO: Remove this when we have a generalized way to pass config to java + var javaParams = TangFactory.GetTang() + .NewInjector(jobSubmission.DriverConfigurations.ToArray()) + .GetInstance<ClrClient2JavaClientCuratedParameters>(); + + Task.Run(() => + _javaClientLauncher.Launch(JavaClassName, driverFolder, jobSubmission.JobIdentifier, + _numberOfEvaluators.ToString(), + javaParams.TcpPortRangeStart.ToString(), + javaParams.TcpPortRangeCount.ToString(), + javaParams.TcpPortRangeTryCount.ToString() + )); + + var fileName = Path.Combine(driverFolder, _fileNames.DriverHttpEndpoint); + HttpClientHelper helper = new HttpClientHelper(); + _driverUrl = helper.GetDriverUrlForLocalRuntime(fileName); + + Logger.Log(Level.Info, "Submitted the Driver for execution. Returned driverUrl is: " + _driverUrl); + return helper; + } + + public string DriverUrl + { + get { return _driverUrl; } + } + /// <summary> /// Creates the temporary directory to hold the job submission. 
/// </summary> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj index ee19ca6..a21c2e2 100644 --- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj +++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj @@ -37,10 +37,15 @@ under the License. <UseVSHostingProcess>false</UseVSHostingProcess> </PropertyGroup> <ItemGroup> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="Newtonsoft.Json"> + <HintPath>$(PackagesDir)\Newtonsoft.Json.$(NewtonsoftJsonVersion)\lib\net45\Newtonsoft.Json.dll</HintPath> + </Reference> <Reference Include="System" /> <Reference Include="System.Core" /> <Reference Include="System.IO.Compression" /> <Reference Include="System.IO.Compression.FileSystem" /> + <Reference Include="System.Net.Http" /> </ItemGroup> <ItemGroup> <Compile Include="API\ClientFactory.cs" /> @@ -58,6 +63,8 @@ under the License. 
<Compile Include="Common\ClrClient2JavaClientCuratedParameters.cs" /> <Compile Include="Common\DriverFolderPreparationHelper.cs" /> <Compile Include="Common\FileSets.cs" /> + <Compile Include="Common\HttpClientHelper.cs" /> + <Compile Include="Common\IDriverHttpEndpoint.cs" /> <Compile Include="Common\JavaClientLauncher.cs" /> <Compile Include="Common\ResourceHelper.cs" /> <Compile Include="Local\LocalClient.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Client/Properties/AssemblyInfo.cs index 8ea0bff..b503884 100644 --- a/lang/cs/Org.Apache.REEF.Client/Properties/AssemblyInfo.cs +++ b/lang/cs/Org.Apache.REEF.Client/Properties/AssemblyInfo.cs @@ -18,6 +18,7 @@ */ using System.Reflection; +using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following @@ -52,3 +53,9 @@ using System.Runtime.InteropServices; // [assembly: AssemblyVersion("1.0.*")] [assembly: AssemblyVersion("0.13.0.0")] [assembly: AssemblyFileVersion("0.13.0.0")] + +[assembly: InternalsVisibleTo("Org.Apache.REEF.Tests, publickey=" + + "00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9" + + "9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc" + + "b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" + + "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")] http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs 
b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs index f7e27ad..cda887d 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs @@ -22,6 +22,7 @@ using System.IO; using System.Linq; using Org.Apache.REEF.Client.API; using Org.Apache.REEF.Client.Common; +using Org.Apache.REEF.Common.Files; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Utilities.Logging; @@ -38,15 +39,19 @@ namespace Org.Apache.REEF.Client.YARN private static readonly Logger Logger = Logger.GetLogger(typeof(YARNClient)); private readonly DriverFolderPreparationHelper _driverFolderPreparationHelper; private readonly JavaClientLauncher _javaClientLauncher; + private String _driverUrl; + private REEFFileNames _fileNames; [Inject] internal YARNClient(JavaClientLauncher javaClientLauncher, DriverFolderPreparationHelper driverFolderPreparationHelper, + REEFFileNames fileNames, YarnCommandLineEnvironment yarn) { _javaClientLauncher = javaClientLauncher; _javaClientLauncher.AddToClassPath(yarn.GetYarnClasspathList()); _driverFolderPreparationHelper = driverFolderPreparationHelper; + _fileNames = fileNames; } public void Submit(IJobSubmission jobSubmission) @@ -55,6 +60,27 @@ namespace Org.Apache.REEF.Client.YARN var driverFolderPath = CreateDriverFolder(jobSubmission.JobIdentifier); Logger.Log(Level.Info, "Preparing driver folder in " + driverFolderPath); + Launch(jobSubmission, driverFolderPath); + } + + public IDriverHttpEndpoint SubmitAndGetDriverUrl(IJobSubmission jobSubmission) + { + // Prepare the job submission folder + var driverFolderPath = CreateDriverFolder(jobSubmission.JobIdentifier); + Logger.Log(Level.Info, "Preparing driver folder in " + driverFolderPath); + + Launch(jobSubmission, driverFolderPath); + + var pointerFileName = Path.Combine(driverFolderPath, _fileNames.DriverHttpEndpoint); + + var httpClient = new HttpClientHelper(); + _driverUrl = 
httpClient.GetDriverUrlForYarn(pointerFileName); + + return httpClient; + } + + private void Launch(IJobSubmission jobSubmission, string driverFolderPath) + { _driverFolderPreparationHelper.PrepareDriverFolder(jobSubmission, driverFolderPath); //TODO: Remove this when we have a generalized way to pass config to java @@ -71,7 +97,12 @@ namespace Org.Apache.REEF.Client.YARN javaParams.MaxApplicationSubmissions.ToString(), javaParams.DriverRestartEvaluatorRecoverySeconds.ToString() ); - Logger.Log(Level.Info, "Submitted the Driver for execution."); + Logger.Log(Level.Info, "Submitted the Driver for execution." + jobSubmission.JobIdentifier); + } + + public string DriverUrl + { + get { return _driverUrl; } } /// <summary> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs index 5316301..15ddd02 100644 --- a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs +++ b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs @@ -19,6 +19,7 @@ using System.Diagnostics.CodeAnalysis; using System.IO; +using Org.Apache.REEF.Common.Attributes; using Org.Apache.REEF.Tang.Annotations; namespace Org.Apache.REEF.Common.Files @@ -48,6 +49,7 @@ namespace Org.Apache.REEF.Common.Files private const string DRIVER_CONFIGURATION_NAME = "driver.conf"; private const string EVALUATOR_CONFIGURATION_NAME = "evaluator.conf"; private const string CLR_DRIVER_CONFIGURATION_NAME = "clrdriver.conf"; + private const string DRIVER_HTTP_ENDPOINT_FILE_NAME = "DriverHttpEndpoint.txt"; private const string BRIDGE_EXE_NAME = "Org.Apache.REEF.Bridge.exe"; private const string BRIDGE_EXE_CONFIG_NAME = "Org.Apache.REEF.Bridge.exe.config"; @@ -229,6 +231,12 @@ namespace Org.Apache.REEF.Common.Files return Path.Combine(REEF_BASE_FOLDER, 
BRIDGE_EXE_CONFIG_NAME); } + /// <summary> + /// </summary> + /// <returns>File name that contains the dfs path for the DriverHttpEndpoint</returns> + [Unstable("0.13", "Working in progress for what to return after submit")] + public string DriverHttpEndpoint { get { return DRIVER_HTTP_ENDPOINT_FILE_NAME; } } + private static readonly string GLOBAL_FOLDER_PATH = Path.Combine(REEF_BASE_FOLDER, GLOBAL_FOLDER); private static readonly string LOCAL_FOLDER_PATH = Path.Combine(REEF_BASE_FOLDER, LOCAL_FOLDER); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs index 27a553b..0eeb7d5 100644 --- a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs +++ b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs @@ -20,6 +20,7 @@ using System; using System.IO; using Org.Apache.REEF.Client.API; +using Org.Apache.REEF.Client.Common; using Org.Apache.REEF.Client.Local; using Org.Apache.REEF.Client.YARN; using Org.Apache.REEF.Common.Evaluator; @@ -50,7 +51,7 @@ namespace Org.Apache.REEF.Examples.AllHandlers _jobSubmissionBuilderFactory = jobSubmissionBuilderFactory; } - private void Run() + private IDriverHttpEndpoint Run() { var helloDriverConfiguration = DriverConfiguration.ConfigurationModule .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<HelloAllocatedEvaluatorHandler>.Class) @@ -82,7 +83,8 @@ namespace Org.Apache.REEF.Examples.AllHandlers .SetJobIdentifier("HelloDriver") .Build(); - _reefClient.Submit(helloJobSubmission); + IDriverHttpEndpoint driverHttpEndpoint = _reefClient.SubmitAndGetDriverUrl(helloJobSubmission); + return driverHttpEndpoint; } /// <summary></summary> @@ -122,11 +124,12 @@ namespace Org.Apache.REEF.Examples.AllHandlers /// args[0] specify either running 
local or YARN. Default is local /// args[1] specify running folder. Default is REEF_LOCAL_RUNTIME /// </remarks> - public static void Run(string[] args) + public static IDriverHttpEndpoint Run(string[] args) { string runOnYarn = args.Length > 0 ? args[0] : Local; string runtimeFolder = args.Length > 1 ? args[1] : "REEF_LOCAL_RUNTIME"; - TangFactory.GetTang().NewInjector(GetRuntimeConfiguration(runOnYarn, runtimeFolder)).GetInstance<AllHandlers>().Run(); + IDriverHttpEndpoint driverEndPoint = TangFactory.GetTang().NewInjector(GetRuntimeConfiguration(runOnYarn, runtimeFolder)).GetInstance<AllHandlers>().Run(); + return driverEndPoint; } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs index aa582c4..6a4e15c 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs @@ -19,6 +19,7 @@ using System; using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Client.Common; using Org.Apache.REEF.Examples.AllHandlers; using Org.Apache.REEF.Utilities.Logging; @@ -62,11 +63,19 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge RunClrBridgeClient(false, testRuntimeFolder); } - private void RunClrBridgeClient(bool runOnYarn, string testRuntimeFolder) + private async void RunClrBridgeClient(bool runOnYarn, string testRuntimeFolder) { string[] a = new[] { runOnYarn ? 
"yarn" : "local", testRuntimeFolder }; - AllHandlers.Run(a); + IDriverHttpEndpoint driverHttpEndpoint = AllHandlers.Run(a); + + var uri = driverHttpEndpoint.DriverUrl + "NRT/status?a=1&b=2"; + var strStatus = driverHttpEndpoint.GetUrlResult(uri); + Assert.IsTrue(strStatus.Equals("Byte array returned from HelloHttpHandler in CLR!!!\r\n")); + + await ((HttpClientHelper)driverHttpEndpoint).TryUntilNoConnection(uri); + ValidateSuccessForLocalRuntime(2, testRuntimeFolder); + CleanUp(testRuntimeFolder); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs index e19f53f..0a8935d 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs @@ -51,6 +51,7 @@ namespace Org.Apache.REEF.Tests.Functional private const string Local = "local"; private const string YARN = "yarn"; + private const int SleepTime = 1000; private readonly static Logger Logger = Logger.GetLogger(typeof(ReefFunctionalTest)); private const string StorageAccountKeyEnvironmentVariable = "REEFTestStorageAccountKey"; @@ -139,14 +140,35 @@ namespace Org.Apache.REEF.Tests.Functional const string successIndication = "EXIT: ActiveContextClr2Java::Close"; const string failedTaskIndication = "Java_com_microsoft_reef_javabridge_NativeInterop_clrSystemFailedTaskHandlerOnNext"; const string failedEvaluatorIndication = "Java_com_microsoft_reef_javabridge_NativeInterop_clrSystemFailedEvaluatorHandlerOnNext"; - string[] lines = File.ReadAllLines(GetLogFile(_stdout, testFolder)); - Console.WriteLine("Lines read from log file : " + lines.Count()); - string[] successIndicators = lines.Where(s => s.Contains(successIndication)).ToArray(); - string[] 
failedTaskIndicators = lines.Where(s => s.Contains(failedTaskIndication)).ToArray(); - string[] failedIndicators = lines.Where(s => s.Contains(failedEvaluatorIndication)).ToArray(); - Assert.IsTrue(successIndicators.Count() == numberOfEvaluatorsToClose); - Assert.IsFalse(failedTaskIndicators.Any()); - Assert.IsFalse(failedIndicators.Any()); + string[] lines = null; + for (int i = 0; i < 60; i++) + { + try + { + lines = File.ReadAllLines(GetLogFile(_stdout, testFolder)); + break; + } + catch (Exception) + { + Thread.Sleep(SleepTime); + } + } + + if (lines != null) + { + Console.WriteLine("Lines read from log file : " + lines.Count()); + string[] successIndicators = lines.Where(s => s.Contains(successIndication)).ToArray(); + string[] failedTaskIndicators = lines.Where(s => s.Contains(failedTaskIndication)).ToArray(); + string[] failedIndicators = lines.Where(s => s.Contains(failedEvaluatorIndication)).ToArray(); + Assert.IsTrue(successIndicators.Count() == numberOfEvaluatorsToClose); + Assert.IsFalse(failedTaskIndicators.Any()); + Assert.IsFalse(failedIndicators.Any()); + } + else + { + Console.WriteLine("Cannot read from log file"); + Assert.Fail(); + } } protected void PeriodicUploadLog(object source, ElapsedEventArgs e) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java index 19a4055..7a08ee2 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java @@ -19,6 +19,7 @@ package org.apache.reef.bridge.client; import 
org.apache.reef.client.parameters.DriverConfigurationProviders; +import org.apache.reef.driver.parameters.JobSubmissionDirectory; import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.local.client.DriverConfigurationProvider; @@ -79,8 +80,11 @@ public final class LocalClient { final Configuration providedConfigurations = configurationBuilder.build(); final Configuration driverConfiguration = Configurations.merge( driverConfiguration1, + Tang.Factory.getTang() + .newConfigurationBuilder() + .bindNamedParameter(JobSubmissionDirectory.class, driverFolder.toString()) + .build(), providedConfigurations); - final File driverConfigurationFile = new File(driverFolder, fileNames.getDriverConfigurationPath()); configurationSerializer.toFile(driverConfiguration, driverConfigurationFile); launcher.launch(driverFolder, localSubmissionFromCS.getJobId(), CLIENT_REMOTE_ID); @@ -97,4 +101,4 @@ public final class LocalClient { client.submit(localSubmissionFromCS); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java index 58c5fea..ee37590 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java @@ -20,6 +20,7 @@ package org.apache.reef.bridge.client; import org.apache.commons.lang.Validate; import org.apache.reef.client.parameters.DriverConfigurationProviders; +import 
org.apache.reef.driver.parameters.JobSubmissionDirectory; import org.apache.reef.io.TcpPortConfigurationProvider; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix; @@ -93,6 +94,7 @@ final class LocalSubmissionFromCS { .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(tcpBeginPort)) .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(tcpRangeCount)) .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(tcpTryCount)) + .bindNamedParameter(JobSubmissionDirectory.class, runtimeRootFolder.getAbsolutePath()) .bindList(DriverLaunchCommandPrefix.class, driverLaunchCommandPrefixList) .build(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java index be5c24b..fa90602 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java @@ -18,6 +18,7 @@ */ package org.apache.reef.bridge.client; +import org.apache.hadoop.fs.*; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -43,11 +44,14 @@ import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.tang.formats.ConfigurationSerializer; import org.apache.reef.util.JARFileMaker; - import javax.inject.Inject; import java.io.File; import 
java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.FileWriter; import java.util.List; import java.util.Set; import java.util.logging.Level; @@ -173,7 +177,6 @@ public final class YarnJobSubmissionClient { try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper(yarnConfiguration, fileNames, classpath, tokenProvider, commandPrefixList)) { - // ------------------------------------------------------------------------ // Prepare the JAR final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId()); @@ -183,7 +186,6 @@ public final class YarnJobSubmissionClient { final File jarFile = makeJar(yarnSubmission.getDriverFolder()); LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile); - // ------------------------------------------------------------------------ // Upload the JAR LOG.info("Uploading job submission JAR"); @@ -207,7 +209,65 @@ public final class YarnJobSubmissionClient { } catch (InjectionException ie) { throw new RuntimeException("Unable to submit job due to " + ie); } + writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(), + submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath()); + } + } + + /** + * We leave a file behind in job submission directory so that clr client can figure out + * the applicationId and yarn rest endpoint. 
+ * @param driverFolder + * @param applicationId + * @throws IOException + */ + private void writeDriverHttpEndPoint(final File driverFolder, + final String applicationId, + final Path dfsPath) throws IOException { + final FileSystem fs = FileSystem.get(yarnConfiguration); + final Path httpEndpointPath = new Path(dfsPath, fileNames.getDriverHttpEndpoint()); + + String trackingUri = null; + for (int i = 0; i < 60; i++) { + try { + LOG.log(Level.INFO, "Attempt " + i + " reading " + httpEndpointPath.toString()); + if (fs.exists(httpEndpointPath)) { + FSDataInputStream input = fs.open(httpEndpointPath); + BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8")); + trackingUri = reader.readLine(); + reader.close(); + break; + } + } catch (Exception ex) { + } + try{ + Thread.sleep(1000); + } catch(InterruptedException ex2) { + break; + } + } + + if (null == trackingUri) { + trackingUri = ""; + LOG.log(Level.WARNING, "Failed reading " + httpEndpointPath.toString()); + } + + final File driverHttpEndpointFile = new File(driverFolder, fileNames.getDriverHttpEndpoint()); + BufferedWriter out = new BufferedWriter(new FileWriter(driverHttpEndpointFile)); + out.write(applicationId + "\n"); + out.write(trackingUri + "\n"); + String addr = yarnConfiguration.get("yarn.resourcemanager.webapp.address"); + if (null == addr || addr.startsWith("0.0.0.0")) { + String str2 = yarnConfiguration.get("yarn.resourcemanager.ha.rm-ids"); + if (null != str2) { + for (String rm : str2.split(",")) { + out.write(yarnConfiguration.get("yarn.resourcemanager.webapp.address." 
+ rm) +"\n"); + } + } + } else { + out.write(addr +"\n"); } + out.close(); } /** http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java index 054408e..f619f5f 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java @@ -31,6 +31,7 @@ import org.apache.reef.javabridge.*; import org.apache.reef.driver.restart.DriverRestartCompleted; import org.apache.reef.runtime.common.driver.DriverStatusManager; import org.apache.reef.driver.evaluator.EvaluatorProcess; +import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.tang.annotations.Unit; import org.apache.reef.util.Optional; import org.apache.reef.util.logging.CLRBufferedLogHandler; @@ -48,6 +49,9 @@ import org.apache.reef.webserver.*; import javax.inject.Inject; import javax.servlet.ServletException; import javax.servlet.http.HttpServletResponse; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -110,6 +114,8 @@ public final class JobDriver { */ private final Map<String, ActiveContext> contexts = new HashMap<>(); + private final REEFFileNames reefFileNames; + private final LocalAddressProvider localAddressProvider; /** * Logging scope factory that provides LoggingScope. */ @@ -140,6 +146,7 @@ public final class JobDriver { new HashMap<String, AllocatedEvaluatorBridge>(); private EvaluatorRequestorBridge evaluatorRequestorBridge; + /** * Job driver constructor. 
* All parameters are injected from TANG automatically. @@ -159,6 +166,7 @@ public final class JobDriver { final LoggingScopeFactory loggingScopeFactory, final LocalAddressProvider localAddressProvider, final ActiveContextBridgeFactory activeContextBridgeFactory, + final REEFFileNames reefFileNames, final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory, final CLRProcessFactory clrProcessFactory) { this.clock = clock; @@ -171,6 +179,8 @@ public final class JobDriver { this.allocatedEvaluatorBridgeFactory = allocatedEvaluatorBridgeFactory; this.nameServerInfo = localAddressProvider.getLocalAddress() + ":" + this.nameServer.getPort(); this.loggingScopeFactory = loggingScopeFactory; + this.reefFileNames = reefFileNames; + this.localAddressProvider = localAddressProvider; this.clrProcessFactory = clrProcessFactory; } @@ -188,6 +198,17 @@ public final class JobDriver { } final String portNumber = httpServer == null ? null : Integer.toString((httpServer.getPort())); + if (portNumber != null){ + try { + final File outputFileName = new File(reefFileNames.getDriverHttpEndpoint()); + BufferedWriter out = new BufferedWriter(new FileWriter(outputFileName)); + out.write(localAddressProvider.getLocalAddress() + ":" + portNumber + "\n"); + out.close(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + this.evaluatorRequestorBridge = new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory); final long[] handlers = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java index 
68a6fb0..2c93e22 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java @@ -47,9 +47,9 @@ public final class REEFFileNames { private static final String DRIVER_STDOUT = "driver.stdout"; private static final String EVALUATOR_STDERR = "evaluator.stderr"; private static final String EVALUATOR_STDOUT = "evaluator.stdout"; + private static final String DRIVER_HTTP_ENDPOINT_FILE_NAME = "DriverHttpEndpoint.txt"; private static final String BRIDGE_EXE_NAME = "Org.Apache.REEF.Bridge.exe"; - @Inject public REEFFileNames() { } @@ -208,4 +208,12 @@ public final class REEFFileNames { public String getEvaluatorStdoutFileName() { return EVALUATOR_STDOUT; } + + /** + * @return File name that contains the dfs path for the DriverHttpEndpoint. + */ + public String getDriverHttpEndpoint() { + return DRIVER_HTTP_ENDPOINT_FILE_NAME; + } + } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java index 1a79505..72ec88b 100644 --- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java +++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java @@ -35,7 +35,7 @@ import org.apache.reef.runtime.common.parameters.JVMHeapSlack; import org.apache.reef.runtime.hdinsight.HDInsightClasspathProvider; import 
org.apache.reef.runtime.hdinsight.HDInsightJVMPathProvider; import org.apache.reef.runtime.yarn.driver.*; -import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory; +import org.apache.reef.driver.parameters.JobSubmissionDirectory; import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod; import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor; import org.apache.reef.tang.formats.ConfigurationModule; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java index 8723ac3..e21d81b 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java @@ -102,6 +102,14 @@ public final class YarnSubmissionHelper implements Closeable{ } /** + * + * @return the application ID string representation assigned by YARN. + */ + public String getStringApplicationId() { + return this.applicationId.toString(); + } + + /** * Set the name of the application to be submitted. 
* @param applicationName * @return http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java index c6d8291..b8467d5 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; -import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory; +import org.apache.reef.driver.parameters.JobSubmissionDirectory; import org.apache.reef.tang.annotations.Parameter; import javax.inject.Inject; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java index e088649..028744f 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java @@ -20,6 +20,7 @@ package org.apache.reef.runtime.yarn.driver; import 
com.google.protobuf.ByteString; import org.apache.commons.collections.ListUtils; +import org.apache.hadoop.fs.*; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.AMRMClient; @@ -29,6 +30,7 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.reef.driver.parameters.JobSubmissionDirectory; import org.apache.reef.exception.DriverFatalRuntimeException; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.DriverStatusManager; @@ -36,6 +38,7 @@ import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl; +import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.util.Optional; @@ -75,6 +78,8 @@ final class YarnContainerManager private final ContainerRequestCounter containerRequestCounter; private final DriverStatusManager driverStatusManager; private final TrackingURLProvider trackingURLProvider; + private final String jobSubmissionDirectory; + private final REEFFileNames reefFileNames; private final RackNameFormatter rackNameFormatter; @Inject @@ -86,9 +91,10 @@ final class YarnContainerManager final ApplicationMasterRegistration registration, final ContainerRequestCounter containerRequestCounter, final DriverStatusManager driverStatusManager, + final REEFFileNames reefFileNames, + @Parameter(JobSubmissionDirectory.class) final 
String jobSubmissionDirectory, final TrackingURLProvider trackingURLProvider, final RackNameFormatter rackNameFormatter) throws IOException { - this.reefEventHandlers = reefEventHandlers; this.driverStatusManager = driverStatusManager; @@ -104,6 +110,8 @@ final class YarnContainerManager this.resourceManager = AMRMClientAsync.createAMRMClientAsync(yarnRMHeartbeatPeriod, this); this.nodeManager = new NMClientAsyncImpl(this); + this.jobSubmissionDirectory = jobSubmissionDirectory; + this.reefFileNames = reefFileNames; LOG.log(Level.FINEST, "Instantiated YarnContainerManager"); } @@ -250,7 +258,12 @@ final class YarnContainerManager this.registration.setRegistration(this.resourceManager.registerApplicationMaster( "", 0, this.trackingURLProvider.getTrackingUrl())); LOG.log(Level.FINE, "YARN registration: {0}", registration); - + final FileSystem fs = FileSystem.get(this.yarnConf); + final Path outputFileName = new Path(this.jobSubmissionDirectory, this.reefFileNames.getDriverHttpEndpoint()); + final FSDataOutputStream out = fs.create(outputFileName); + out.writeBytes(this.trackingURLProvider.getTrackingUrl() + "\n"); + out.flush(); + out.close(); } catch (final YarnException | IOException e) { LOG.log(Level.WARNING, "Unable to register application master.", e); onRuntimeError(e); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java index e3cade9..a7de1d3 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java +++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java @@ -30,7 +30,7 @@ import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID; import org.apache.reef.runtime.common.launch.parameters.LaunchID; import org.apache.reef.runtime.common.parameters.JVMHeapSlack; import org.apache.reef.runtime.yarn.YarnClasspathProvider; -import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory; +import org.apache.reef.driver.parameters.JobSubmissionDirectory; import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod; import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor; import org.apache.reef.tang.formats.*; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java index 9c36503..6213bf9 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java @@ -16,10 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.reef.runtime.yarn.driver.parameters; -import org.apache.reef.tang.annotations.Name; +package org.apache.reef.driver.parameters; + import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.tang.annotations.Name; /** * The job submission directory.
