Repository: incubator-reef
Updated Branches:
  refs/heads/master 14b6d4b59 -> cb31da9f2


[REEF-581] Add example for restart on YARN

This adds an example for Driver Restarts in REEF.NET.

JIRA:
  [REEF-581](https://issues.apache.org/jira/browse/REEF-581)

Pull Request:
  This closes #453


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/cb31da9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/cb31da9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/cb31da9f

Branch: refs/heads/master
Commit: cb31da9f2d0f5169a0d7583c56d9ff274147d0d0
Parents: 14b6d4b
Author: Andrew Chung <[email protected]>
Authored: Mon Aug 31 17:32:23 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Tue Sep 1 12:58:46 2015 -0700

----------------------------------------------------------------------
 .../DriverRestart.cs                            | 54 +++++++++++++++++++-
 ...rg.Apache.REEF.Examples.DriverRestart.csproj | 47 +++++++++++++++++
 .../DriverRestart/HelloRestartDriver.cs         | 30 +++++++++--
 3 files changed, 126 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cb31da9f/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs 
b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
index 55e8889..2db7eb3 100644
--- a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
@@ -17,6 +17,15 @@
  * under the License.
  */
 
+using System;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.YARN;
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+
 namespace Org.Apache.REEF.Examples.DriverRestart
 {
     /// <summary>
@@ -26,9 +35,52 @@ namespace Org.Apache.REEF.Examples.DriverRestart
     /// </summary>
     public sealed class DriverRestart
     {
+        /// <summary>
+        /// Value bound to DriverConfiguration.DriverRestartEvaluatorRecoverySeconds (five minutes).
+        /// </summary>
+        private const int EvaluatorRecoverySeconds = 5 * 60;
+
+        /// <summary>
+        /// Value bound to DriverConfiguration.MaxApplicationSubmissions.
+        /// </summary>
+        private const int MaxSubmissions = 2;
+
+        /// <summary>
+        /// The client used to submit the job in <see cref="Run"/>.
+        /// </summary>
+        private readonly IREEFClient _reefClient;
+
+        /// <summary>
+        /// The factory that provides the job submission builder used in <see cref="Run"/>.
+        /// </summary>
+        private readonly JobSubmissionBuilderFactory _jobSubmissionBuilderFactory;
+
+        /// <summary>
+        /// Injectable constructor; keeps the client and the builder factory for later use by <see cref="Run"/>.
+        /// </summary>
+        /// <param name="reefClient">The client the job is submitted through.</param>
+        /// <param name="jobSubmissionBuilderFactory">The factory for the job submission builder.</param>
+        [Inject]
+        private DriverRestart(IREEFClient reefClient, JobSubmissionBuilderFactory jobSubmissionBuilderFactory)
+        {
+            _reefClient = reefClient;
+            _jobSubmissionBuilderFactory = jobSubmissionBuilderFactory;
+        }
+
+        /// <summary>
+        /// Assembles the Driver configuration and the job submission for the DriverRestart example
+        /// and submits it through the IREEFClient received in the constructor.
+        /// </summary>
+        private void Run()
+        {
+            // HelloRestartDriver is bound as the handler for every Driver event configured below,
+            // with the exception of OnDriverReconnect.
+            var restartDriver = GenericType<HelloRestartDriver>.Class;
+
+            var driverConfiguration = DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, restartDriver)
+                .Set(DriverConfiguration.OnDriverRestarted, restartDriver)
+                .Set(DriverConfiguration.OnContextActive, restartDriver)
+                .Set(DriverConfiguration.OnTaskRunning, restartDriver)
+                .Set(DriverConfiguration.OnTaskFailed, restartDriver)
+                .Set(DriverConfiguration.OnTaskCompleted, restartDriver)
+                .Set(DriverConfiguration.OnDriverRestartCompleted, restartDriver)
+                .Set(DriverConfiguration.OnDriverRestartContextActive, restartDriver)
+                .Set(DriverConfiguration.OnDriverRestartTaskRunning, restartDriver)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, restartDriver)
+                .Set(DriverConfiguration.OnEvaluatorFailed, restartDriver)
+                .Set(DriverConfiguration.OnDriverReconnect, GenericType<DefaultYarnClusterHttpDriverConnection>.Class)
+                .Set(DriverConfiguration.DriverRestartEvaluatorRecoverySeconds, EvaluatorRecoverySeconds.ToString())
+                .Set(DriverConfiguration.MaxApplicationSubmissions, MaxSubmissions.ToString())
+                .Build();
+
+            // The submission bundles the Driver configuration with the assembly that declares HelloRestartDriver.
+            var submissionBuilder = _jobSubmissionBuilderFactory.GetJobSubmissionBuilder();
+            var restartJobSubmission = submissionBuilder
+                .AddDriverConfiguration(driverConfiguration)
+                .AddGlobalAssemblyForType(typeof(HelloRestartDriver))
+                .SetJobIdentifier("DriverRestart")
+                .Build();
+
+            _reefClient.Submit(restartJobSubmission);
+        }
+
+        /// <summary>
+        /// Entry point: creates an injector from the YARN client configuration, obtains a
+        /// <see cref="DriverRestart"/> instance from it and runs the example.
+        /// </summary>
+        /// <param name="args">Command line arguments; not used.</param>
         public static void Main(string[] args)
         {
-            // TODO[REEF-581]: Fill in method.
+            var tang = TangFactory.GetTang();
+            var injector = tang.NewInjector(YARNClientConfiguration.ConfigurationModule.Build());
+            var driverRestart = injector.GetInstance<DriverRestart>();
+            driverRestart.Run();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cb31da9f/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj
 
b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj
index c874111..6d5d1d5 100644
--- 
a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj
+++ 
b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj
@@ -17,11 +17,58 @@
   </PropertyGroup>
   <Import Project="$(SolutionDir)\build.props" />
   <ItemGroup>
+    <Reference Include="System" />
+    <Reference Include="System.Core" />
+    <Reference Include="System.Xml.Linq" />
+    <Reference Include="System.Data.DataSetExtensions" />
+    <Reference Include="Microsoft.CSharp" />
+    <Reference Include="System.Data" />
+    <Reference Include="System.Xml" />
+  </ItemGroup>
+  <ItemGroup>
     <Compile Include="DriverRestart.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
   </ItemGroup>
   <ItemGroup>
     <None Include="Readme.md" />
   </ItemGroup>
+  <ItemGroup>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Bridge\Org.Apache.REEF.Bridge.vcxproj">
+      <Project>{4e69d40a-26d6-4d4a-b96d-729946c07fe1}</Project>
+      <Name>Org.Apache.REEF.Bridge</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj">
+      <Project>{5094c35b-4fdb-4322-ac05-45d684501cbf}</Project>
+      <Name>Org.Apache.REEF.Client</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj">
+      <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project>
+      <Name>Org.Apache.REEF.Common</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Driver\Org.Apache.REEF.Driver.csproj">
+      <Project>{a6baa2a7-f52f-4329-884e-1bcf711d6805}</Project>
+      <Name>Org.Apache.REEF.Driver</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Evaluator\Org.Apache.REEF.Evaluator.csproj">
+      <Project>{1b983182-9c30-464c-948d-f87eb93a8240}</Project>
+      <Name>Org.Apache.REEF.Evaluator</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+      <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+      <Name>Org.Apache.REEF.Tang</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj">
+      <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
+      <Name>Org.Apache.REEF.Utilities</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj">
+      <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
+      <Name>Org.Apache.REEF.Wake</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Examples\Org.Apache.REEF.Examples.csproj">
+      <Project>{75503f90-7b82-4762-9997-94b5c68f15db}</Project>
+      <Name>Org.Apache.REEF.Examples</Name>
+    </ProjectReference>
+  </ItemGroup>
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
 </Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cb31da9f/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs 
b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
index 9d25746..63ada39 100644
--- a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
@@ -18,6 +18,7 @@
  */
 
 using System;
+using System.Collections.Generic;
 using System.Diagnostics;
 using System.Text;
 using Org.Apache.REEF.Common.Tasks;
@@ -37,19 +38,21 @@ namespace Org.Apache.REEF.Examples.DriverRestart
     /// The Driver for HelloRestartREEF.
     /// This driver is meant to run on YARN on HDInsight, with the ability to 
keep containers
     /// across application attempts.
-    /// It requests 5 evaluators and runs a running task on each of the 
evaluators.
+    /// It requests 1 evaluator and runs a running task on it.
     /// Once all tasks are running, the driver kills itself and expects the RM 
to restart it.
-    /// On restart, it expects all 5 of the running tasks to report back to it.
+    /// On restart, it expects all of the running task(s) to report back to it.
     /// </summary>
     public sealed class HelloRestartDriver : 
IObserver<IDriverRestartCompleted>, IObserver<IAllocatedEvaluator>, 
IObserver<IDriverStarted>, 
-        IObserver<IDriverRestarted>, IObserver<IActiveContext>, 
IObserver<IRunningTask>, IObserver<ICompletedTask>, IObserver<IFailedTask>
+        IObserver<IDriverRestarted>, IObserver<IActiveContext>, 
IObserver<IRunningTask>, IObserver<ICompletedTask>, IObserver<IFailedTask>,
+        IObserver<IFailedEvaluator>
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(HelloRestartDriver));
         private const int NumberOfTasksToSubmit = 1;
 
         private readonly IEvaluatorRequestor _evaluatorRequestor;
+        private readonly ISet<string> _receivedEvaluators = new 
HashSet<string>(StringComparer.OrdinalIgnoreCase);
         private readonly object _lockObj;
-
+        
         private int _runningTaskCount;
         private int _finishedTaskCount;
         private bool _restarted;
@@ -69,11 +72,17 @@ namespace Org.Apache.REEF.Examples.DriverRestart
         /// </summary>
         public void OnNext(IAllocatedEvaluator allocatedEvaluator)
         {
+            // Record the Id of every Evaluator allocated to this Driver instance;
+            // OnNext(IFailedEvaluator) looks the Id up in this set when a failure is reported.
             lock (_lockObj)
             {
                 _receivedEvaluators.Add(allocatedEvaluator.Id);
             }
 
+            // HelloRestartTask serves both as the Task and as its message handler.
+            var helloRestartTask = GenericType<HelloRestartTask>.Class;
+            var helloTaskConfiguration = TaskConfiguration.ConfigurationModule
                 .Set(TaskConfiguration.Identifier, "HelloRestartTask")
+                .Set(TaskConfiguration.Task, helloRestartTask)
+                .Set(TaskConfiguration.OnMessage, helloRestartTask)
                 .Build();
 
+            allocatedEvaluator.SubmitTask(helloTaskConfiguration);
         }
 
@@ -145,6 +154,19 @@ namespace Org.Apache.REEF.Examples.DriverRestart
             IncrementFinishedTask(value.GetActiveContext());
         }
 
+        /// <summary>
+        /// Logs the failure of an Evaluator. When the Evaluator's Id was never recorded by
+        /// OnNext(IAllocatedEvaluator) on this Driver instance, the message is prefixed with
+        /// "Restarted recovered ".
+        /// </summary>
+        /// <param name="value">The failed Evaluator event.</param>
+        public void OnNext(IFailedEvaluator value)
+        {
+            bool recoveredOnRestart;
+            lock (_lockObj)
+            {
+                // Not in the set means this Driver instance did not allocate the Evaluator;
+                // presumably it was carried over from a previous Driver attempt.
+                recoveredOnRestart = !_receivedEvaluators.Contains(value.Id);
+            }
+
+            var prefix = recoveredOnRestart ? "Restarted recovered " : string.Empty;
+
+            // Log the Evaluator's Id (the same identifier used for the lookup above) rather than the
+            // event object, whose string representation is not guaranteed to identify the Evaluator.
+            Logger.Log(Level.Info, prefix + "Evaluator [" + value.Id + "] has failed!");
+        }
+
         public void OnError(Exception error)
         {
             throw error;

Reply via email to