Repository: incubator-reef
Updated Branches:
  refs/heads/master 6eefbf1cc -> 189c5acba


[REEF-663] Make Http Endpoint available for client after submit

 - This PR is to return HttpEndPoint to the client after Job is submitted
 - At Java side, the http endpoint is written to JobSubmission folder in HDFS
   and Driver folder
 - At REEFClient side, HttpClientHelper is added to pull the http endpoint from
   the file and return it in Submit()
 - E2e test cases are updated to get the URI and verify on it.
 - The new Submit API is marked as [Unstable] as we might change it in the future
 - The change is part of the change in tlc branch with some modifications

JIRA:
  [REEF-663](https://issues.apache.org/jira/browse/REEF-663)

Pull Request:
  This closes #505


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/189c5acb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/189c5acb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/189c5acb

Branch: refs/heads/master
Commit: 189c5acba1bb5a1f8e1393a02dbe643676f959fe
Parents: 6eefbf1
Author: Julia Wang <[email protected]>
Authored: Fri Sep 18 12:47:01 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Sep 18 21:10:52 2015 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Client/API/IREEFClient.cs   |  13 +
 .../Common/HttpClientHelper.cs                  | 344 +++++++++++++++++++
 .../Common/IDriverHttpEndpoint.cs               |  31 ++
 .../Org.Apache.REEF.Client/Local/LocalClient.cs |  53 ++-
 .../Org.Apache.REEF.Client.csproj               |   7 +
 .../Properties/AssemblyInfo.cs                  |   7 +
 .../Org.Apache.REEF.Client/YARN/YARNClient.cs   |  33 +-
 .../Files/REEFFileNames.cs                      |   8 +
 .../AllHandlers.cs                              |  11 +-
 .../Functional/Bridge/TestBridgeClient.cs       |  13 +-
 .../Functional/ReefFunctionalTest.cs            |  38 +-
 .../apache/reef/bridge/client/LocalClient.java  |   8 +-
 .../bridge/client/LocalSubmissionFromCS.java    |   2 +
 .../bridge/client/YarnJobSubmissionClient.java  |  66 +++-
 .../reef/javabridge/generic/JobDriver.java      |  21 ++
 .../runtime/common/files/REEFFileNames.java     |  10 +-
 .../client/HDInsightDriverConfiguration.java    |   2 +-
 .../yarn/client/YarnSubmissionHelper.java       |   8 +
 .../yarn/driver/UploaderToJobfolder.java        |   2 +-
 .../yarn/driver/YarnContainerManager.java       |  17 +-
 .../yarn/driver/YarnDriverConfiguration.java    |   2 +-
 .../parameters/JobSubmissionDirectory.java      |   5 +-
 22 files changed, 669 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs 
b/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs
index e4f4c88..74b8d95 100644
--- a/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs
@@ -17,6 +17,10 @@
  * under the License.
  */
 
+using System;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Common.Attributes;
+
 namespace Org.Apache.REEF.Client.API
 {
     /// <summary>
@@ -30,5 +34,14 @@ namespace Org.Apache.REEF.Client.API
         /// </summary>
         /// <param name="jobSubmission"></param>
         void Submit(IJobSubmission jobSubmission);
+
+        /// <summary>
+        /// Submit the job described in jobSubmission to the cluster.
+        /// Expect IDriverHttpEndpoint returned after the call.
+        /// </summary>
+        /// <param name="jobSubmission">Description of the job to submit.</param>
+        /// <returns>
+        /// An IDriverHttpEndpoint for the submitted job; its DriverUrl property
+        /// carries the URL of the Driver's HTTP endpoint.
+        /// </returns>
+        [Unstable("0.13", "Working in progress for what to return after 
submit")]
+        IDriverHttpEndpoint SubmitAndGetDriverUrl(IJobSubmission 
jobSubmission);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/Common/HttpClientHelper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/HttpClientHelper.cs 
b/lang/cs/Org.Apache.REEF.Client/Common/HttpClientHelper.cs
new file mode 100644
index 0000000..1a00493
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Common/HttpClientHelper.cs
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Newtonsoft.Json;
+using Org.Apache.REEF.Utilities.Logging;
+using System;
+using System.IO;
+using System.Net;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Client.Common
+{
+    internal class HttpClientHelper : IDriverHttpEndpoint
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof 
(HttpClientHelper));
+        // Upper bound on the number of requests TryGetUri and TryUntilNoConnection issue.
+        private const int MaxConnectAttemptCount = 20;
+        // Pause, in milliseconds, between two consecutive connection attempts.
+        private const int MilliSecondsToWaitBeforeNextConnectAttempt = 1000;
+        // Value, in seconds, assigned to _client.Timeout in the constructor.
+        private const int SecondsForHttpClientTimeout = 120;
+        // Tracking URL value that CheckUrlAttempt reports as UrlResultKind.UrlNotAssignedYet.
+        private const string UnAssigned = "UNASSIGNED";
+        // Keys CheckUrlAttempt looks up in the deserialized JSON response.
+        private const string TrackingUrlKey = "trackingUrl";
+        private const string AppKey = "app";
+        // Response prefix that CheckUrlAttempt reports as UrlResultKind.BackupRm.
+        private const string ThisIsStandbyRm = "This is standby RM";
+        // Media type added to the default Accept header of _client.
+        private const string AppJson = "application/json";
+
+        // Most recently determined Driver URL; exposed through the DriverUrl property.
+        private string _driverUrl;
+
+        // HTTP client created once in the constructor and used for every request of this instance.
+        private readonly HttpClient _client;
+
+        /// <summary>
+        /// Creates the helper together with its HttpClient, which uses a
+        /// SecondsForHttpClientTimeout timeout and asks for JSON responses.
+        /// </summary>
+        internal HttpClientHelper()
+        {
+            _client = new HttpClient();
+            _client.Timeout = TimeSpan.FromSeconds(SecondsForHttpClientTimeout);
+
+            var jsonMediaType = new MediaTypeWithQualityHeaderValue(AppJson);
+            _client.DefaultRequestHeaders.Accept.Add(jsonMediaType);
+        }
+
+        /// <summary>
+        /// The Driver URL most recently determined by this helper; null when none has been determined.
+        /// </summary>
+        public string DriverUrl
+        {
+            get { return _driverUrl; }
+        }
+
+        /// <summary>
+        /// Synchronously fetches the given URL by running CallUrl on the thread pool
+        /// and blocking until it completes.
+        /// </summary>
+        /// <param name="url">URL to request.</param>
+        /// <returns>The value produced by CallUrl: the response body, or null if the call failed.</returns>
+        public string GetUrlResult(string url)
+        {
+            Task<string> pendingCall = Task.Run(() => CallUrl(url));
+
+            // Reading Result blocks until the task has finished.
+            return pendingCall.Result;
+        }
+
+        // Classification of a response string, as assigned by CheckUrlAttempt.
+        enum UrlResultKind
+        {
+            // The response was null or empty.
+            WasNotAbleToTalkToRm,
+            // The response starts with the ThisIsStandbyRm text.
+            BackupRm,
+            // The JSON response has no AppKey entry, or that entry has no TrackingUrlKey value.
+            AppIdNotThereYet,
+            // The tracking URL in the response equals the UnAssigned marker.
+            UrlNotAssignedYet,
+            // The response carried a tracking URL other than the UnAssigned marker.
+            GotAppIdUrl,
+        }
+
+        /// <summary>
+        /// Reads the entries that follow the first two lines (appid and trackingUrl)
+        /// of the given file.
+        /// </summary>
+        /// <param name="filePath">Path of the file to read.</param>
+        /// <returns>Every line after the second one, in file order; empty if there are none.</returns>
+        internal static List<string> GetRmUri(string filePath)
+        {
+            using (var reader = new StreamReader(File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read)))
+            {
+                reader.ReadLine(); // appid
+                reader.ReadLine(); // trackingUrl
+
+                var resourceManagerUris = new List<string>();
+                string line;
+                while ((line = reader.ReadLine()) != null)
+                {
+                    resourceManagerUris.Add(line);
+                }
+                return resourceManagerUris;
+            }
+        }
+
+        /// <summary>
+        /// Reads the application id, which is the first line of the given file.
+        /// </summary>
+        /// <param name="filePath">Path of the file to read.</param>
+        /// <returns>The first line of the file, or null if the file is empty.</returns>
+        internal static string GetAppId(string filePath)
+        {
+            using (var reader = new StreamReader(File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read)))
+            {
+                return reader.ReadLine();
+            }
+        }
+
+        /// <summary>
+        /// Builds the Driver tracking URL from the second line of the given file.
+        /// </summary>
+        /// <param name="filePath">Path of the file whose first line is the appid and second line the tracking address.</param>
+        /// <returns>The second line of the file wrapped as "http://{line}/".</returns>
+        internal static string GetTrackingUrl(string filePath)
+        {
+            using (var sr = new StreamReader(File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read)))
+            {
+                sr.ReadLine(); // appid
+                var trackingUrl = sr.ReadLine();
+                // The scheme literal and the concatenation form one expression;
+                // a ';' directly after the literal would break the statement.
+                return "http://" + trackingUrl + "/";
+            }
+        }
+
+        /// <summary>
+        /// Requests the given URL through TryGetUri and unwraps the outcome.
+        /// </summary>
+        /// <param name="url">URL to request.</param>
+        /// <returns>The response body on success; null if TryGetUri reported a failure.</returns>
+        internal async Task<string> CallUrl(string url)
+        {
+            var response = await TryGetUri(url);
+            if (!HasCommandFailed(response))
+            {
+                LOGGER.Log(Level.Warning, "CallUrl result " + response.Item2);
+                return response.Item2;
+            }
+            return null;
+        }
+
+        /// <summary>
+        /// Reads the tracking URL from the given file via GetTrackingUrl, stores it
+        /// as the current Driver URL and returns it.
+        /// </summary>
+        /// <param name="filePath">Path of the file to read.</param>
+        /// <returns>The URL produced by GetTrackingUrl.</returns>
+        internal string GetDriverUrlForYarn(string filePath)
+        {
+            var trackingUrl = GetTrackingUrl(filePath);
+            _driverUrl = trackingUrl;
+            return trackingUrl;
+        }
+
+        /// <summary>
+        /// Polls the given file until it yields the Driver's HTTP server address,
+        /// then stores and returns the resulting URL.
+        /// </summary>
+        /// <param name="filePath">Path of the file to poll.</param>
+        /// <returns>
+        /// "http://{address}/" built from the first line of the file, or null if no
+        /// non-empty address could be read within the allowed number of attempts.
+        /// </returns>
+        internal string GetDriverUrlForLocalRuntime(string filePath)
+        {
+            const int maxReadAttempts = 10;
+            const int millisecondsBetweenReadAttempts = 1000;
+
+            // Reset first so a failed poll is reported as null rather than a stale URL.
+            _driverUrl = null;
+            for (int attempt = 0; attempt < maxReadAttempts; attempt++)
+            {
+                var driverUrl = TryReadHttpServerIpAndPortFromFile(filePath);
+                if (!string.IsNullOrEmpty(driverUrl))
+                {
+                    _driverUrl = "http://" + driverUrl + "/";
+                    break;
+                }
+                Thread.Sleep(millisecondsBetweenReadAttempts);
+            }
+            return _driverUrl;
+        }
+
+        /// <summary>
+        /// Attempts to read the first line of the given file.
+        /// </summary>
+        /// <param name="fileName">Path of the file to read.</param>
+        /// <returns>
+        /// The first line of the file, or null if the file does not exist, cannot be
+        /// read at the moment, or is empty.
+        /// </returns>
+        private string TryReadHttpServerIpAndPortFromFile(String fileName)
+        {
+            string httpServerIpAndPort = null;
+            try
+            {
+                LOGGER.Log(Level.Info, "try open " + fileName);
+                using (var rdr = new StreamReader(File.OpenRead(fileName)))
+                {
+                    httpServerIpAndPort = rdr.ReadLine();
+                    LOGGER.Log(Level.Info, "httpServerIpAndPort is " + httpServerIpAndPort);
+                }
+            }
+            catch (FileNotFoundException)
+            {
+                LOGGER.Log(Level.Info, "File does not exist: " + fileName);
+            }
+            catch (IOException e)
+            {
+                // File.OpenRead can also fail with other I/O errors, e.g. a sharing
+                // violation while the file is still open for writing. Report "nothing
+                // read" so the caller's polling loop retries instead of aborting.
+                LOGGER.Log(Level.Info, "File could not be read yet: " + fileName + " (" + e.Message + ")");
+            }
+            return httpServerIpAndPort;
+        }
+
+        /// <summary>
+        /// Requests the given URL through TryGetUri and returns the response body.
+        /// </summary>
+        /// <param name="url">URL to request.</param>
+        /// <returns>The response body, or null if the request failed or produced no body.</returns>
+        internal async Task<string> GetAppIdTrackingUrl(string url)
+        {
+            var response = await TryGetUri(url);
+            var body = HasCommandFailed(response) ? null : response.Item2;
+            if (body == null)
+            {
+                return null;
+            }
+
+            LOGGER.Log(Level.Info, "GetAppIdTrackingUrl: " + body);
+            return body;
+        }
+
+        /// <summary>
+        /// Decides whether a failed request is worth repeating.
+        /// </summary>
+        /// <param name="httpRequestException">Exception raised by the failed request.</param>
+        /// <returns>
+        /// True if the exception message contains the numeric NotFound or BadGateway
+        /// status code, or if the inner exception is a WebException with status
+        /// ConnectFailure; false otherwise.
+        /// </returns>
+        private static bool ShouldRetry(HttpRequestException httpRequestException)
+        {
+            var message = httpRequestException.Message;
+            var notFoundCode = ((int)(HttpStatusCode.NotFound)).ToString();
+            var badGatewayCode = ((int)(HttpStatusCode.BadGateway)).ToString();
+
+            if (message.IndexOf(notFoundCode, StringComparison.Ordinal) != -1 ||
+                message.IndexOf(badGatewayCode, StringComparison.Ordinal) != -1)
+            {
+                return true;
+            }
+
+            var webException = httpRequestException.InnerException as System.Net.WebException;
+            return webException != null &&
+                   webException.Status == System.Net.WebExceptionStatus.ConnectFailure;
+        }
+
+        /// <summary>
+        /// Builds the tuple that represents a failed call: (false, null).
+        /// </summary>
+        /// <param name="reason">Description of the failure; not included in the returned tuple.</param>
+        /// <returns>A tuple whose Item1 is false and whose Item2 is null.</returns>
+        private static Tuple<bool, string> CommandFailed(String reason)
+        {
+            return Tuple.Create<bool, string>(false, null);
+        }
+
+        /// <summary>
+        /// Builds the tuple that represents a successful call: (true, commandResult).
+        /// </summary>
+        /// <param name="commandResult">Result to carry in Item2.</param>
+        /// <returns>A tuple whose Item1 is true and whose Item2 is commandResult.</returns>
+        private static Tuple<bool, string> CommandSucceeded(string commandResult)
+        {
+            return Tuple.Create<bool, string>(true, commandResult);
+        }
+
+        /// <summary>
+        /// Tells whether a call result tuple represents a failure.
+        /// </summary>
+        /// <param name="httpCallResult">Tuple produced by CommandFailed or CommandSucceeded.</param>
+        /// <returns>True when Item1 of the tuple is false.</returns>
+        private bool HasCommandFailed(Tuple<bool, string> httpCallResult)
+        {
+            var succeeded = httpCallResult.Item1;
+            return succeeded == false;
+        }
+
+        /// <summary>
+        /// Issues a GET request for the given URI, repeating it while the failure is
+        /// considered retryable by ShouldRetry, up to MaxConnectAttemptCount attempts.
+        /// </summary>
+        /// <param name="commandUri">URI to request.</param>
+        /// <returns>
+        /// CommandSucceeded(body) once a request succeeds; CommandFailed(...) if a
+        /// non-retryable error occurs or the attempt limit is reached.
+        /// </returns>
+        internal async Task<Tuple<bool, string>> TryGetUri(string commandUri)
+        {
+            var connectAttemptCount = 0;
+
+            while (true)
+            {
+                try
+                {
+                    LOGGER.Log(Level.Warning, "Try url [" + commandUri + "] connectAttemptCount " + connectAttemptCount + ".");
+                    var strResult = await _client.GetStringAsync(commandUri);
+                    LOGGER.Log(Level.Warning, "Connection succeeded. connectAttemptCount was " + connectAttemptCount + ".");
+                    return CommandSucceeded(strResult);
+                }
+                catch (HttpRequestException httpRequestException)
+                {
+                    if (!ShouldRetry(httpRequestException))
+                    {
+                        LOGGER.Log(Level.Error,
+                            commandUri + " exception " + httpRequestException.Message + "\n" +
+                            httpRequestException.StackTrace);
+                        LOGGER.Log(Level.Warning, "Connection failed. connectAttemptCount was " + connectAttemptCount + ".");
+                        return CommandFailed(httpRequestException.Message);
+                    }
+                    // Retryable failure: fall through to the attempt accounting below.
+                }
+                catch (Exception ex)
+                {
+                    LOGGER.Log(Level.Error, commandUri + " exception " + ex.Message + "\n" + ex.StackTrace);
+                    LOGGER.Log(Level.Warning, "Connection failed. connectAttemptCount was " + connectAttemptCount + ".");
+                    return CommandFailed(ex.Message);
+                }
+
+                ++connectAttemptCount;
+                if (connectAttemptCount >= MaxConnectAttemptCount)
+                {
+                    LOGGER.Log(Level.Warning, "Connection failed. connectAttemptCount was " + connectAttemptCount + ".");
+                    return CommandFailed("Could not connect to " + commandUri + " after " + MaxConnectAttemptCount.ToString() + " attempts.");
+                }
+
+                // Wait asynchronously; Thread.Sleep would block the thread running this async method.
+                await Task.Delay(MilliSecondsToWaitBeforeNextConnectAttempt);
+            }
+        }
+
+        /// <summary>
+        /// Repeatedly requests the given URI until a request fails or
+        /// MaxConnectAttemptCount requests have succeeded.
+        /// </summary>
+        /// <param name="commandUri">URI to request.</param>
+        /// <returns>Always null; the outcome is only written to the log.</returns>
+        internal async Task<string> TryUntilNoConnection(string commandUri)
+        {
+            var connectAttemptCount = 0;
+            while (true)
+            {
+                try
+                {
+                    // Only success or failure of the request matters; the body is discarded.
+                    await _client.GetStringAsync(commandUri);
+                    LOGGER.Log(Level.Info,
+                        "Connection succeeded. connectAttemptCount was " + connectAttemptCount + ".");
+                }
+                catch (HttpRequestException httpRequestException)
+                {
+                    LOGGER.Log(Level.Info, httpRequestException.Message);
+                    break;
+                }
+                catch (Exception e)
+                {
+                    LOGGER.Log(Level.Info, e.Message);
+                    break;
+                }
+
+                ++connectAttemptCount;
+                if (connectAttemptCount >= MaxConnectAttemptCount)
+                {
+                    LOGGER.Log(Level.Info, "Can still connect to " + commandUri + " after " + MaxConnectAttemptCount.ToString() + " attempts.");
+                    break;
+                }
+
+                // Wait asynchronously; Thread.Sleep would block the thread running this async method.
+                await Task.Delay(MilliSecondsToWaitBeforeNextConnectAttempt);
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Decides from an HTTP status code whether a request should be repeated.
+        /// </summary>
+        /// <param name="httpStatusCode">Status code of the response.</param>
+        /// <returns>True only for HttpStatusCode.NotFound.</returns>
+        private static bool ShouldRetry(HttpStatusCode httpStatusCode)
+        {
+            switch (httpStatusCode)
+            {
+                case HttpStatusCode.NotFound:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
+        /// <summary>
+        /// Classifies a response string and, when it carries a tracking URL, stores
+        /// that URL as the current Driver URL.
+        /// </summary>
+        /// <param name="result">Response text to classify; may be null or empty.</param>
+        /// <returns>The UrlResultKind that describes the response.</returns>
+        private UrlResultKind CheckUrlAttempt(string result)
+        {
+            UrlResultKind resultKind;
+            if (string.IsNullOrEmpty(result))
+            {
+                resultKind = UrlResultKind.WasNotAbleToTalkToRm;
+            }
+            else if (result.StartsWith(ThisIsStandbyRm, StringComparison.Ordinal))
+            {
+                // Ordinal: this is a fixed marker text, not culture-dependent prose.
+                resultKind = UrlResultKind.BackupRm;
+            }
+            else
+            {
+                dynamic deserializedValue = JsonConvert.DeserializeObject(result);
+                var values = deserializedValue[AppKey];
+                if (values == null || values[TrackingUrlKey] == null)
+                {
+                    resultKind = UrlResultKind.AppIdNotThereYet;
+                }
+                else
+                {
+                    // The URL is stored even when it is still the UnAssigned marker.
+                    _driverUrl = values[TrackingUrlKey].ToString();
+                    LOGGER.Log(Level.Info, "trackingUrl[" + _driverUrl + "]");
+
+                    resultKind = string.Equals(_driverUrl, UnAssigned, StringComparison.Ordinal)
+                        ? UrlResultKind.UrlNotAssignedYet
+                        : UrlResultKind.GotAppIdUrl;
+                }
+            }
+
+            LOGGER.Log(Level.Info, "CheckUrlAttempt " + resultKind);
+            return resultKind;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/Common/IDriverHttpEndpoint.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/IDriverHttpEndpoint.cs 
b/lang/cs/Org.Apache.REEF.Client/Common/IDriverHttpEndpoint.cs
new file mode 100644
index 0000000..c128ad0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Common/IDriverHttpEndpoint.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Attributes;
+
+namespace Org.Apache.REEF.Client.Common
+{
+    /// <summary>
+    /// Client-side handle for the HTTP endpoint of a submitted job's Driver, as
+    /// returned by IREEFClient.SubmitAndGetDriverUrl.
+    /// </summary>
+    [Unstable("0.13", "Working in progress for what to return after submit")]
+    public interface IDriverHttpEndpoint
+    {
+        /// <summary>
+        /// Requests the given URL and returns the result as a string.
+        /// NOTE(review): the HttpClientHelper implementation returns null when the
+        /// request fails - confirm whether that is part of the contract.
+        /// </summary>
+        /// <param name="url">URL to request.</param>
+        string GetUrlResult(string url);
+
+        /// <summary>
+        /// URL of the Driver's HTTP endpoint.
+        /// NOTE(review): HttpClientHelper can leave this null when no URL was determined.
+        /// </summary>
+        string DriverUrl { get; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs 
b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
index 518533e..af2d47b 100644
--- a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
@@ -20,9 +20,11 @@
 using System;
 using System.IO;
 using System.Linq;
+using System.Threading.Tasks;
 using Org.Apache.REEF.Client.API;
 using Org.Apache.REEF.Client.Common;
 using Org.Apache.REEF.Client.Local.Parameters;
+using Org.Apache.REEF.Common.Files;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Utilities.Logging;
@@ -49,29 +51,37 @@ namespace Org.Apache.REEF.Client.Local
         private readonly JavaClientLauncher _javaClientLauncher;
         private readonly int _numberOfEvaluators;
         private readonly string _runtimeFolder;
+        private string _driverUrl;
+        private REEFFileNames _fileNames;
 
         [Inject]
         private LocalClient(DriverFolderPreparationHelper 
driverFolderPreparationHelper,
             [Parameter(typeof(LocalRuntimeDirectory))] string runtimeFolder,
-            [Parameter(typeof(NumberOfEvaluators))] int numberOfEvaluators, 
JavaClientLauncher javaClientLauncher)
+            [Parameter(typeof(NumberOfEvaluators))] int numberOfEvaluators,
+            JavaClientLauncher javaClientLauncher,
+            REEFFileNames fileNames)
         {
             _driverFolderPreparationHelper = driverFolderPreparationHelper;
             _runtimeFolder = runtimeFolder;
             _numberOfEvaluators = numberOfEvaluators;
             _javaClientLauncher = javaClientLauncher;
+            _fileNames = fileNames;
         }
 
         /// <summary>
         /// Uses Path.GetTempPath() as the runtime execution folder.
         /// </summary>
         /// <param name="driverFolderPreparationHelper"></param>
-        /// <param name="reefJarPath"></param>
         /// <param name="numberOfEvaluators"></param>
+        /// <param name="javaClientLauncher"></param>
+        /// <param name="fileNames"></param>
         [Inject]
         private LocalClient(
             DriverFolderPreparationHelper driverFolderPreparationHelper,
-            [Parameter(typeof(NumberOfEvaluators))] int numberOfEvaluators, 
JavaClientLauncher javaClientLauncher)
-            : this(driverFolderPreparationHelper, Path.GetTempPath(), 
numberOfEvaluators, javaClientLauncher)
+            [Parameter(typeof(NumberOfEvaluators))] int numberOfEvaluators,
+            JavaClientLauncher javaClientLauncher,
+            REEFFileNames fileNames)
+            : this(driverFolderPreparationHelper, Path.GetTempPath(), 
numberOfEvaluators, javaClientLauncher, fileNames)
         {
             // Intentionally left blank.
         }
@@ -99,6 +109,41 @@ namespace Org.Apache.REEF.Client.Local
             Logger.Log(Level.Info, "Submitted the Driver for execution.");
         }
 
+        /// <summary>
+        /// Prepares the driver folder, starts the Java client launcher in the
+        /// background and then waits for the Driver's HTTP endpoint file.
+        /// </summary>
+        /// <param name="jobSubmission">Description of the job to submit.</param>
+        /// <returns>
+        /// The HttpClientHelper used to read the endpoint; its DriverUrl is null
+        /// if the endpoint file yielded no address in time.
+        /// </returns>
+        public IDriverHttpEndpoint SubmitAndGetDriverUrl(IJobSubmission jobSubmission)
+        {
+            // Prepare the job submission folder
+            var driverFolder = Path.Combine(CreateJobFolder(jobSubmission.JobIdentifier), DriverFolderName);
+            Logger.Log(Level.Info, "Preparing driver folder in " + driverFolder);
+
+            _driverFolderPreparationHelper.PrepareDriverFolder(jobSubmission, driverFolder);
+
+            //TODO: Remove this when we have a generalized way to pass config to java
+            var injector = TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray());
+            var javaParams = injector.GetInstance<ClrClient2JavaClientCuratedParameters>();
+
+            // The launch runs on the thread pool and the resulting task is not
+            // awaited, so this method goes straight on to polling for the endpoint file.
+            Task.Run(() =>
+            _javaClientLauncher.Launch(JavaClassName, driverFolder, jobSubmission.JobIdentifier,
+                _numberOfEvaluators.ToString(),
+                javaParams.TcpPortRangeStart.ToString(),
+                javaParams.TcpPortRangeCount.ToString(),
+                javaParams.TcpPortRangeTryCount.ToString()
+                ));
+
+            var endpointFile = Path.Combine(driverFolder, _fileNames.DriverHttpEndpoint);
+            var endpoint = new HttpClientHelper();
+            _driverUrl = endpoint.GetDriverUrlForLocalRuntime(endpointFile);
+
+            Logger.Log(Level.Info, "Submitted the Driver for execution. Returned driverUrl is: " + _driverUrl);
+            return endpoint;
+        }
+
+        /// <summary>
+        /// Driver URL stored by the most recent SubmitAndGetDriverUrl call; may be
+        /// null if that call could not read a URL.
+        /// </summary>
+        public string DriverUrl
+        {
+            get { return _driverUrl; }
+        }
+
         /// <summary>
         /// Creates the temporary directory to hold the job submission.
         /// </summary>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj 
b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
index ee19ca6..a21c2e2 100644
--- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
+++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
@@ -37,10 +37,15 @@ under the License.
     <UseVSHostingProcess>false</UseVSHostingProcess>
   </PropertyGroup>
   <ItemGroup>
+    <Reference Include="Microsoft.CSharp" />
+    <Reference Include="Newtonsoft.Json">
+      
<HintPath>$(PackagesDir)\Newtonsoft.Json.$(NewtonsoftJsonVersion)\lib\net45\Newtonsoft.Json.dll</HintPath>
+    </Reference>
     <Reference Include="System" />
     <Reference Include="System.Core" />
     <Reference Include="System.IO.Compression" />
     <Reference Include="System.IO.Compression.FileSystem" />
+    <Reference Include="System.Net.Http" />
   </ItemGroup>
   <ItemGroup>
     <Compile Include="API\ClientFactory.cs" />
@@ -58,6 +63,8 @@ under the License.
     <Compile Include="Common\ClrClient2JavaClientCuratedParameters.cs" />
     <Compile Include="Common\DriverFolderPreparationHelper.cs" />
     <Compile Include="Common\FileSets.cs" />
+    <Compile Include="Common\HttpClientHelper.cs" />
+    <Compile Include="Common\IDriverHttpEndpoint.cs" />
     <Compile Include="Common\JavaClientLauncher.cs" />
     <Compile Include="Common\ResourceHelper.cs" />
     <Compile Include="Local\LocalClient.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Properties/AssemblyInfo.cs 
b/lang/cs/Org.Apache.REEF.Client/Properties/AssemblyInfo.cs
index 8ea0bff..b503884 100644
--- a/lang/cs/Org.Apache.REEF.Client/Properties/AssemblyInfo.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Properties/AssemblyInfo.cs
@@ -18,6 +18,7 @@
  */
 
 using System.Reflection;
+using System.Runtime.CompilerServices;
 using System.Runtime.InteropServices;
 
 // General Information about an assembly is controlled through the following 
@@ -52,3 +53,9 @@ using System.Runtime.InteropServices;
 // [assembly: AssemblyVersion("1.0.*")]
 [assembly: AssemblyVersion("0.13.0.0")]
 [assembly: AssemblyFileVersion("0.13.0.0")]
+
+[assembly: InternalsVisibleTo("Org.Apache.REEF.Tests, publickey=" +
+ 
"00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9"
 +
+ 
"9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc"
 +
+ 
"b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17"
 +
+ 
"618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")]

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs 
b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
index f7e27ad..cda887d 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
@@ -22,6 +22,7 @@ using System.IO;
 using System.Linq;
 using Org.Apache.REEF.Client.API;
 using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Common.Files;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Utilities.Logging;
@@ -38,15 +39,19 @@ namespace Org.Apache.REEF.Client.YARN
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(YARNClient));
         private readonly DriverFolderPreparationHelper 
_driverFolderPreparationHelper;
         private readonly JavaClientLauncher _javaClientLauncher;
+        private String _driverUrl;
+        private REEFFileNames _fileNames;
 
         [Inject]
         internal YARNClient(JavaClientLauncher javaClientLauncher,
             DriverFolderPreparationHelper driverFolderPreparationHelper,
+            REEFFileNames fileNames,
             YarnCommandLineEnvironment yarn)
         {
             _javaClientLauncher = javaClientLauncher;
             _javaClientLauncher.AddToClassPath(yarn.GetYarnClasspathList());
             _driverFolderPreparationHelper = driverFolderPreparationHelper;
+            _fileNames = fileNames;
         }
 
         public void Submit(IJobSubmission jobSubmission)
@@ -55,6 +60,27 @@ namespace Org.Apache.REEF.Client.YARN
             var driverFolderPath = 
CreateDriverFolder(jobSubmission.JobIdentifier);
             Logger.Log(Level.Info, "Preparing driver folder in " + 
driverFolderPath);
 
+            Launch(jobSubmission, driverFolderPath);
+        }
+
+        /// <summary>
+        /// Prepares the driver folder, launches the job and then reads the Driver
+        /// URL from the endpoint file in the driver folder.
+        /// </summary>
+        /// <param name="jobSubmission">Description of the job to submit.</param>
+        /// <returns>The HttpClientHelper that read the Driver URL.</returns>
+        public IDriverHttpEndpoint SubmitAndGetDriverUrl(IJobSubmission jobSubmission)
+        {
+            // Prepare the job submission folder
+            var driverFolderPath = CreateDriverFolder(jobSubmission.JobIdentifier);
+            Logger.Log(Level.Info, "Preparing driver folder in " + driverFolderPath);
+
+            Launch(jobSubmission, driverFolderPath);
+
+            var endpoint = new HttpClientHelper();
+            var endpointFile = Path.Combine(driverFolderPath, _fileNames.DriverHttpEndpoint);
+            _driverUrl = endpoint.GetDriverUrlForYarn(endpointFile);
+
+            return endpoint;
+        }
+
+        private void Launch(IJobSubmission jobSubmission, string 
driverFolderPath)
+        {
             _driverFolderPreparationHelper.PrepareDriverFolder(jobSubmission, 
driverFolderPath);
 
             //TODO: Remove this when we have a generalized way to pass config 
to java
@@ -71,7 +97,12 @@ namespace Org.Apache.REEF.Client.YARN
                 javaParams.MaxApplicationSubmissions.ToString(),
                 javaParams.DriverRestartEvaluatorRecoverySeconds.ToString()
                 );
-            Logger.Log(Level.Info, "Submitted the Driver for execution.");
+            Logger.Log(Level.Info, "Submitted the Driver for execution." + 
jobSubmission.JobIdentifier);
+        }
+
+        /// <summary>
+        /// Driver URL stored by the most recent SubmitAndGetDriverUrl call.
+        /// </summary>
+        public string DriverUrl
+        {
+            get { return _driverUrl; }
+        }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs 
b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
index 5316301..15ddd02 100644
--- a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
@@ -19,6 +19,7 @@
 
 using System.Diagnostics.CodeAnalysis;
 using System.IO;
+using Org.Apache.REEF.Common.Attributes;
 using Org.Apache.REEF.Tang.Annotations;
 
 namespace Org.Apache.REEF.Common.Files
@@ -48,6 +49,7 @@ namespace Org.Apache.REEF.Common.Files
         private const string DRIVER_CONFIGURATION_NAME = "driver.conf";
         private const string EVALUATOR_CONFIGURATION_NAME = "evaluator.conf";
         private const string CLR_DRIVER_CONFIGURATION_NAME = "clrdriver.conf";
+        private const string DRIVER_HTTP_ENDPOINT_FILE_NAME = 
"DriverHttpEndpoint.txt";
         private const string BRIDGE_EXE_NAME = "Org.Apache.REEF.Bridge.exe";
         private const string BRIDGE_EXE_CONFIG_NAME = 
"Org.Apache.REEF.Bridge.exe.config";
 
@@ -229,6 +231,12 @@ namespace Org.Apache.REEF.Common.Files
             return Path.Combine(REEF_BASE_FOLDER, BRIDGE_EXE_CONFIG_NAME);
         }
 
+        /// <summary>
+        /// Name of the driver HTTP endpoint file (the value of DRIVER_HTTP_ENDPOINT_FILE_NAME).
+        /// </summary>
+        /// <returns>File name that contains the dfs path for the 
DriverHttpEndpoint</returns>
+        [Unstable("0.13", "Working in progress for what to return after 
submit")]
+        public string DriverHttpEndpoint { get { return 
DRIVER_HTTP_ENDPOINT_FILE_NAME; } }
+
         private static readonly string GLOBAL_FOLDER_PATH = 
Path.Combine(REEF_BASE_FOLDER, GLOBAL_FOLDER);
         private static readonly string LOCAL_FOLDER_PATH = 
Path.Combine(REEF_BASE_FOLDER, LOCAL_FOLDER);
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs 
b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs
index 27a553b..0eeb7d5 100644
--- a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs
@@ -20,6 +20,7 @@
 using System;
 using System.IO;
 using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Common;
 using Org.Apache.REEF.Client.Local;
 using Org.Apache.REEF.Client.YARN;
 using Org.Apache.REEF.Common.Evaluator;
@@ -50,7 +51,7 @@ namespace Org.Apache.REEF.Examples.AllHandlers
             _jobSubmissionBuilderFactory = jobSubmissionBuilderFactory;
         }
 
-        private void Run()
+        private IDriverHttpEndpoint Run()
         {
             var helloDriverConfiguration = 
DriverConfiguration.ConfigurationModule
                 .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<HelloAllocatedEvaluatorHandler>.Class)
@@ -82,7 +83,8 @@ namespace Org.Apache.REEF.Examples.AllHandlers
                 .SetJobIdentifier("HelloDriver")
                 .Build();
 
-            _reefClient.Submit(helloJobSubmission);
+            IDriverHttpEndpoint driverHttpEndpoint = 
_reefClient.SubmitAndGetDriverUrl(helloJobSubmission);
+            return driverHttpEndpoint;
         }
 
         /// <summary></summary>
@@ -122,11 +124,12 @@ namespace Org.Apache.REEF.Examples.AllHandlers
         /// args[0] specify either running local or YARN. Default is local
         /// args[1] specify running folder. Default is REEF_LOCAL_RUNTIME
         /// </remarks>
-        public static void Run(string[] args)
+        public static IDriverHttpEndpoint Run(string[] args)
         {
             string runOnYarn = args.Length > 0 ? args[0] : Local;
             string runtimeFolder = args.Length > 1 ? args[1] : 
"REEF_LOCAL_RUNTIME";
-            
TangFactory.GetTang().NewInjector(GetRuntimeConfiguration(runOnYarn, 
runtimeFolder)).GetInstance<AllHandlers>().Run();
+            IDriverHttpEndpoint driverEndPoint = 
TangFactory.GetTang().NewInjector(GetRuntimeConfiguration(runOnYarn, 
runtimeFolder)).GetInstance<AllHandlers>().Run();
+            return driverEndPoint;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs
index aa582c4..6a4e15c 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs
@@ -19,6 +19,7 @@
 
 using System;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Client.Common;
 using Org.Apache.REEF.Examples.AllHandlers;
 using Org.Apache.REEF.Utilities.Logging;
 
@@ -62,11 +63,19 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             RunClrBridgeClient(false, testRuntimeFolder);
         }
 
+        /// <summary>
+        /// Runs the AllHandlers example, checks the driver's HTTP status page, then validates
+        /// the run and cleans up the test folder.
+        /// </summary>
+        /// <param name="runOnYarn">true passes "yarn" to AllHandlers.Run, false passes "local"</param>
+        /// <param name="testRuntimeFolder">running folder handed to AllHandlers.Run and to the validation/cleanup steps</param>
         private void RunClrBridgeClient(bool runOnYarn, string testRuntimeFolder)
         {
             string[] a = new[] { runOnYarn ? "yarn" : "local", testRuntimeFolder };
-            AllHandlers.Run(a);
+            IDriverHttpEndpoint driverHttpEndpoint = AllHandlers.Run(a);
+
+            var uri = driverHttpEndpoint.DriverUrl + "NRT/status?a=1&b=2";
+            var strStatus = driverHttpEndpoint.GetUrlResult(uri);
+            Assert.IsTrue(strStatus.Equals("Byte array returned from HelloHttpHandler in CLR!!!\r\n"));
+
+            // Wait synchronously instead of making this helper 'async void': the calling test
+            // method cannot await an async void helper, so it would return at the first incomplete
+            // await and the validation below would run after the test had already been reported.
+            // NOTE(review): presumably this returns once the driver no longer accepts connections - confirm.
+            ((HttpClientHelper)driverHttpEndpoint).TryUntilNoConnection(uri).GetAwaiter().GetResult();
+
             ValidateSuccessForLocalRuntime(2, testRuntimeFolder);
+
             CleanUp(testRuntimeFolder);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
index e19f53f..0a8935d 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
@@ -51,6 +51,7 @@ namespace Org.Apache.REEF.Tests.Functional
 
         private const string Local = "local";
         private const string YARN = "yarn";
+        private const int SleepTime = 1000;
 
         private readonly static Logger Logger = 
Logger.GetLogger(typeof(ReefFunctionalTest));
         private const string StorageAccountKeyEnvironmentVariable = 
"REEFTestStorageAccountKey";
@@ -139,14 +140,35 @@ namespace Org.Apache.REEF.Tests.Functional
             const string successIndication = "EXIT: 
ActiveContextClr2Java::Close";
             const string failedTaskIndication = 
"Java_com_microsoft_reef_javabridge_NativeInterop_clrSystemFailedTaskHandlerOnNext";
             const string failedEvaluatorIndication = 
"Java_com_microsoft_reef_javabridge_NativeInterop_clrSystemFailedEvaluatorHandlerOnNext";
-            string[] lines = File.ReadAllLines(GetLogFile(_stdout, 
testFolder));
-            Console.WriteLine("Lines read from log file : " + lines.Count());
-            string[] successIndicators = lines.Where(s => 
s.Contains(successIndication)).ToArray();
-            string[] failedTaskIndicators = lines.Where(s => 
s.Contains(failedTaskIndication)).ToArray();
-            string[] failedIndicators = lines.Where(s => 
s.Contains(failedEvaluatorIndication)).ToArray();
-            Assert.IsTrue(successIndicators.Count() == 
numberOfEvaluatorsToClose);
-            Assert.IsFalse(failedTaskIndicators.Any());
-            Assert.IsFalse(failedIndicators.Any());
+            string[] lines = null;
+            for (int i = 0; i < 60; i++)
+            {
+                try
+                {
+                    lines = File.ReadAllLines(GetLogFile(_stdout, testFolder));
+                    break;
+                }
+                catch (Exception)
+                {
+                    Thread.Sleep(SleepTime);
+                }
+            }
+
+            if (lines != null)
+            {
+                Console.WriteLine("Lines read from log file : " + 
lines.Count());
+                string[] successIndicators = lines.Where(s => 
s.Contains(successIndication)).ToArray();
+                string[] failedTaskIndicators = lines.Where(s => 
s.Contains(failedTaskIndication)).ToArray();
+                string[] failedIndicators = lines.Where(s => 
s.Contains(failedEvaluatorIndication)).ToArray();
+                Assert.IsTrue(successIndicators.Count() == 
numberOfEvaluatorsToClose);
+                Assert.IsFalse(failedTaskIndicators.Any());
+                Assert.IsFalse(failedIndicators.Any());
+            }
+            else
+            {
+                Console.WriteLine("Cannot read from log file");
+                Assert.Fail();
+            }
         }
 
         protected void PeriodicUploadLog(object source, ElapsedEventArgs e)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
index 19a4055..7a08ee2 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
@@ -19,6 +19,7 @@
 package org.apache.reef.bridge.client;
 
 import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.driver.parameters.JobSubmissionDirectory;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.local.client.DriverConfigurationProvider;
@@ -79,8 +80,11 @@ public final class LocalClient {
     final Configuration providedConfigurations = configurationBuilder.build();
     final Configuration driverConfiguration = Configurations.merge(
         driverConfiguration1,
+        Tang.Factory.getTang()
+            .newConfigurationBuilder()
+            .bindNamedParameter(JobSubmissionDirectory.class, 
driverFolder.toString())
+            .build(),
         providedConfigurations);
-
     final File driverConfigurationFile = new File(driverFolder, 
fileNames.getDriverConfigurationPath());
     configurationSerializer.toFile(driverConfiguration, 
driverConfigurationFile);
     launcher.launch(driverFolder, localSubmissionFromCS.getJobId(), 
CLIENT_REMOTE_ID);
@@ -97,4 +101,4 @@ public final class LocalClient {
 
     client.submit(localSubmissionFromCS);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
index 58c5fea..ee37590 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
@@ -20,6 +20,7 @@ package org.apache.reef.bridge.client;
 
 import org.apache.commons.lang.Validate;
 import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.driver.parameters.JobSubmissionDirectory;
 import org.apache.reef.io.TcpPortConfigurationProvider;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import 
org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
@@ -93,6 +94,7 @@ final class LocalSubmissionFromCS {
         .bindNamedParameter(TcpPortRangeBegin.class, 
Integer.toString(tcpBeginPort))
         .bindNamedParameter(TcpPortRangeCount.class, 
Integer.toString(tcpRangeCount))
         .bindNamedParameter(TcpPortRangeTryCount.class, 
Integer.toString(tcpTryCount))
+        .bindNamedParameter(JobSubmissionDirectory.class, 
runtimeRootFolder.getAbsolutePath())
         .bindList(DriverLaunchCommandPrefix.class, 
driverLaunchCommandPrefixList)
         .build();
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index be5c24b..fa90602 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.bridge.client;
 
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -43,11 +44,14 @@ import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.tang.formats.ConfigurationSerializer;
 import org.apache.reef.util.JARFileMaker;
-
 import javax.inject.Inject;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileWriter;
 import java.util.List;
 import java.util.Set;
 import java.util.logging.Level;
@@ -173,7 +177,6 @@ public final class YarnJobSubmissionClient {
     try (final YarnSubmissionHelper submissionHelper =
              new YarnSubmissionHelper(yarnConfiguration, fileNames, classpath, 
tokenProvider, commandPrefixList)) {
 
-
       // 
------------------------------------------------------------------------
       // Prepare the JAR
       final JobFolder jobFolderOnDFS = 
this.uploader.createJobFolder(submissionHelper.getApplicationId());
@@ -183,7 +186,6 @@ public final class YarnJobSubmissionClient {
       final File jarFile = makeJar(yarnSubmission.getDriverFolder());
       LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
 
-
       // 
------------------------------------------------------------------------
       // Upload the JAR
       LOG.info("Uploading job submission JAR");
@@ -207,7 +209,65 @@ public final class YarnJobSubmissionClient {
       } catch (InjectionException ie) {
         throw new RuntimeException("Unable to submit job due to " + ie);
       }
+      writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(),
+          submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath());
+    }
+  }
+
+  /**
+   * We leave a file behind in the local driver folder so that the CLR client can figure out
+   * the applicationId and the YARN REST endpoint.
+   * <p>
+   * The file holds one entry per line: the application id, the tracking URI read from the
+   * endpoint file in the job folder on DFS (an empty line if it could not be read within
+   * 60 attempts spaced one second apart), followed by the resource manager web application
+   * address(es) found in the YARN configuration.
+   *
+   * @param driverFolder the local folder the endpoint file is written to
+   * @param applicationId the string form of the application id, written as the first line
+   * @param dfsPath the job folder on DFS that is polled for the endpoint file
+   * @throws IOException if the file system cannot be obtained or the local file cannot be written
+   */
+  private void writeDriverHttpEndPoint(final File driverFolder,
+                                       final String applicationId,
+                                       final Path dfsPath) throws IOException {
+    final FileSystem fs = FileSystem.get(yarnConfiguration);
+    final Path httpEndpointPath = new Path(dfsPath, fileNames.getDriverHttpEndpoint());
+
+    String trackingUri = null;
+    for (int i = 0; i < 60; i++) {
+      try {
+        LOG.log(Level.INFO, "Attempt " + i + " reading " + httpEndpointPath.toString());
+        if (fs.exists(httpEndpointPath)) {
+          // try-with-resources closes the reader (and the DFS stream under it) even if readLine fails.
+          try (final BufferedReader reader = new BufferedReader(
+              new InputStreamReader(fs.open(httpEndpointPath), "UTF-8"))) {
+            trackingUri = reader.readLine();
+          }
+          break;
+        }
+      } catch (Exception ex) {
+        // Best effort: keep polling, but leave a trace of why this attempt failed.
+        LOG.log(Level.FINE, "Attempt " + i + " failed reading " + httpEndpointPath.toString(), ex);
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ex2) {
+        // Preserve the interrupt for the caller before giving up on polling.
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+
+    if (null == trackingUri) {
+      trackingUri = "";
+      LOG.log(Level.WARNING, "Failed reading " + httpEndpointPath.toString());
+    }
+
+    final File driverHttpEndpointFile = new File(driverFolder, fileNames.getDriverHttpEndpoint());
+    try (final BufferedWriter out = new BufferedWriter(new FileWriter(driverHttpEndpointFile))) {
+      out.write(applicationId + "\n");
+      out.write(trackingUri + "\n");
+      final String addr = yarnConfiguration.get("yarn.resourcemanager.webapp.address");
+      if (null == addr || addr.startsWith("0.0.0.0")) {
+        // No usable single address: fall back to the per-RM addresses of an HA setup.
+        final String rmIds = yarnConfiguration.get("yarn.resourcemanager.ha.rm-ids");
+        if (null != rmIds) {
+          for (final String rm : rmIds.split(",")) {
+            final String rmAddr = yarnConfiguration.get("yarn.resourcemanager.webapp.address." + rm.trim());
+            // Skip unset entries instead of writing the literal text "null" into the file.
+            if (null != rmAddr) {
+              out.write(rmAddr + "\n");
+            }
+          }
+        }
+      } else {
+        out.write(addr + "\n");
+      }
     }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
index 054408e..f619f5f 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -31,6 +31,7 @@ import org.apache.reef.javabridge.*;
 import org.apache.reef.driver.restart.DriverRestartCompleted;
 import org.apache.reef.runtime.common.driver.DriverStatusManager;
 import org.apache.reef.driver.evaluator.EvaluatorProcess;
+import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.tang.annotations.Unit;
 import org.apache.reef.util.Optional;
 import org.apache.reef.util.logging.CLRBufferedLogHandler;
@@ -48,6 +49,9 @@ import org.apache.reef.webserver.*;
 import javax.inject.Inject;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletResponse;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -110,6 +114,8 @@ public final class JobDriver {
    */
   private final Map<String, ActiveContext> contexts = new HashMap<>();
 
+  private final REEFFileNames reefFileNames;
+  private final LocalAddressProvider localAddressProvider;
   /**
    * Logging scope factory that provides LoggingScope.
    */
@@ -140,6 +146,7 @@ public final class JobDriver {
       new HashMap<String, AllocatedEvaluatorBridge>();
   private EvaluatorRequestorBridge evaluatorRequestorBridge;
 
+
   /**
    * Job driver constructor.
    * All parameters are injected from TANG automatically.
@@ -159,6 +166,7 @@ public final class JobDriver {
             final LoggingScopeFactory loggingScopeFactory,
             final LocalAddressProvider localAddressProvider,
             final ActiveContextBridgeFactory activeContextBridgeFactory,
+            final REEFFileNames reefFileNames,
             final AllocatedEvaluatorBridgeFactory 
allocatedEvaluatorBridgeFactory,
             final CLRProcessFactory clrProcessFactory) {
     this.clock = clock;
@@ -171,6 +179,8 @@ public final class JobDriver {
     this.allocatedEvaluatorBridgeFactory = allocatedEvaluatorBridgeFactory;
     this.nameServerInfo = localAddressProvider.getLocalAddress() + ":" + 
this.nameServer.getPort();
     this.loggingScopeFactory = loggingScopeFactory;
+    this.reefFileNames = reefFileNames;
+    this.localAddressProvider = localAddressProvider;
     this.clrProcessFactory = clrProcessFactory;
   }
 
@@ -188,6 +198,17 @@ public final class JobDriver {
       }
 
       final String portNumber = httpServer == null ? null : 
Integer.toString((httpServer.getPort()));
+      if (portNumber != null){
+        try {
+          final File outputFileName = new 
File(reefFileNames.getDriverHttpEndpoint());
+          BufferedWriter out = new BufferedWriter(new 
FileWriter(outputFileName));
+          out.write(localAddressProvider.getLocalAddress() + ":" + portNumber 
+ "\n");
+          out.close();
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+
       this.evaluatorRequestorBridge =
           new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, 
false, loggingScopeFactory);
       final long[] handlers = initializer.getClrHandlers(portNumber, 
evaluatorRequestorBridge);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
index 68a6fb0..2c93e22 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
@@ -47,9 +47,9 @@ public final class REEFFileNames {
   private static final String DRIVER_STDOUT = "driver.stdout";
   private static final String EVALUATOR_STDERR = "evaluator.stderr";
   private static final String EVALUATOR_STDOUT = "evaluator.stdout";
+  private static final String DRIVER_HTTP_ENDPOINT_FILE_NAME = 
"DriverHttpEndpoint.txt";
   private static final String BRIDGE_EXE_NAME = "Org.Apache.REEF.Bridge.exe";
 
-
   @Inject
   public REEFFileNames() {
   }
@@ -208,4 +208,12 @@ public final class REEFFileNames {
   public String getEvaluatorStdoutFileName() {
     return EVALUATOR_STDOUT;
   }
+
+  /**
+   * Name of the driver HTTP endpoint file (the value of DRIVER_HTTP_ENDPOINT_FILE_NAME).
+   *
+   * @return File name that contains the dfs path for the DriverHttpEndpoint.
+   */
+  public String getDriverHttpEndpoint() {
+    return DRIVER_HTTP_ENDPOINT_FILE_NAME;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java
 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java
index 1a79505..72ec88b 100644
--- 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java
+++ 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java
@@ -35,7 +35,7 @@ import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
 import org.apache.reef.runtime.hdinsight.HDInsightClasspathProvider;
 import org.apache.reef.runtime.hdinsight.HDInsightJVMPathProvider;
 import org.apache.reef.runtime.yarn.driver.*;
-import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.driver.parameters.JobSubmissionDirectory;
 import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
 import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
 import org.apache.reef.tang.formats.ConfigurationModule;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index 8723ac3..e21d81b 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -102,6 +102,14 @@ public final class YarnSubmissionHelper implements 
Closeable{
   }
 
   /**
+   * Get the application ID in its string form (the result of applicationId.toString()).
+   *
+   * @return the application ID string representation assigned by YARN.
+   */
+  public String getStringApplicationId() {
+    return this.applicationId.toString();
+  }
+
+  /**
    * Set the name of the application to be submitted.
    * @param applicationName
    * @return

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
index c6d8291..b8467d5 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
@@ -28,7 +28,7 @@ import 
org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.driver.parameters.JobSubmissionDirectory;
 import org.apache.reef.tang.annotations.Parameter;
 
 import javax.inject.Inject;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index e088649..028744f 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.yarn.driver;
 
 import com.google.protobuf.ByteString;
 import org.apache.commons.collections.ListUtils;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -29,6 +30,7 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.driver.parameters.JobSubmissionDirectory;
 import org.apache.reef.exception.DriverFatalRuntimeException;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.DriverStatusManager;
@@ -36,6 +38,7 @@ import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent
 import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
+import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.util.Optional;
@@ -75,6 +78,8 @@ final class YarnContainerManager
   private final ContainerRequestCounter containerRequestCounter;
   private final DriverStatusManager driverStatusManager;
   private final TrackingURLProvider trackingURLProvider;
+  private final String jobSubmissionDirectory;
+  private final REEFFileNames reefFileNames;
   private final RackNameFormatter rackNameFormatter;
 
   @Inject
@@ -86,9 +91,10 @@ final class YarnContainerManager
       final ApplicationMasterRegistration registration,
       final ContainerRequestCounter containerRequestCounter,
       final DriverStatusManager driverStatusManager,
+      final REEFFileNames reefFileNames,
+      @Parameter(JobSubmissionDirectory.class) final String 
jobSubmissionDirectory,
       final TrackingURLProvider trackingURLProvider,
       final RackNameFormatter rackNameFormatter) throws IOException {
-
     this.reefEventHandlers = reefEventHandlers;
     this.driverStatusManager = driverStatusManager;
 
@@ -104,6 +110,8 @@ final class YarnContainerManager
 
     this.resourceManager = 
AMRMClientAsync.createAMRMClientAsync(yarnRMHeartbeatPeriod, this);
     this.nodeManager = new NMClientAsyncImpl(this);
+    this.jobSubmissionDirectory = jobSubmissionDirectory;
+    this.reefFileNames = reefFileNames;
     LOG.log(Level.FINEST, "Instantiated YarnContainerManager");
   }
 
@@ -250,7 +258,12 @@ final class YarnContainerManager
       
this.registration.setRegistration(this.resourceManager.registerApplicationMaster(
           "", 0, this.trackingURLProvider.getTrackingUrl()));
       LOG.log(Level.FINE, "YARN registration: {0}", registration);
-
+      final FileSystem fs = FileSystem.get(this.yarnConf);
+      final Path outputFileName = new Path(this.jobSubmissionDirectory, 
this.reefFileNames.getDriverHttpEndpoint());
+      final FSDataOutputStream out = fs.create(outputFileName);
+      out.writeBytes(this.trackingURLProvider.getTrackingUrl() + "\n");
+      out.flush();
+      out.close();
     } catch (final YarnException | IOException e) {
       LOG.log(Level.WARNING, "Unable to register application master.", e);
       onRuntimeError(e);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java
index e3cade9..a7de1d3 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java
@@ -30,7 +30,7 @@ import 
org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
 import org.apache.reef.runtime.common.launch.parameters.LaunchID;
 import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
 import org.apache.reef.runtime.yarn.YarnClasspathProvider;
-import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.driver.parameters.JobSubmissionDirectory;
 import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
 import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
 import org.apache.reef.tang.formats.*;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/189c5acb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java
index 9c36503..6213bf9 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.reef.runtime.yarn.driver.parameters;
 
-import org.apache.reef.tang.annotations.Name;
+package org.apache.reef.driver.parameters;
+
 import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Name;
 
 /**
  * The job submission directory.

Reply via email to