Repository: incubator-reef Updated Branches: refs/heads/master 14b6d4b59 -> cb31da9f2
[REEF-581] Add example for restart on YARN This adds an example for Driver Restarts in REEF.NET. JIRA: [REEF-581](https://issues.apache.org/jira/browse/REEF-581) Pull Request: This closes #453 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/cb31da9f Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/cb31da9f Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/cb31da9f Branch: refs/heads/master Commit: cb31da9f2d0f5169a0d7583c56d9ff274147d0d0 Parents: 14b6d4b Author: Andrew Chung <[email protected]> Authored: Mon Aug 31 17:32:23 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Tue Sep 1 12:58:46 2015 -0700 ---------------------------------------------------------------------- .../DriverRestart.cs | 54 +++++++++++++++++++- ...rg.Apache.REEF.Examples.DriverRestart.csproj | 47 +++++++++++++++++ .../DriverRestart/HelloRestartDriver.cs | 30 +++++++++-- 3 files changed, 126 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cb31da9f/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs index 55e8889..2db7eb3 100644 --- a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs +++ b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs @@ -17,6 +17,15 @@ * under the License. 
*/ +using System; +using Org.Apache.REEF.Client.API; +using Org.Apache.REEF.Client.YARN; +using Org.Apache.REEF.Common.Evaluator; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Util; + namespace Org.Apache.REEF.Examples.DriverRestart { /// <summary> @@ -26,9 +35,52 @@ namespace Org.Apache.REEF.Examples.DriverRestart /// </summary> public sealed class DriverRestart { + private readonly IREEFClient _reefClient; + private readonly JobSubmissionBuilderFactory _jobSubmissionBuilderFactory; + + [Inject] + private DriverRestart(IREEFClient reefClient, JobSubmissionBuilderFactory jobSubmissionBuilderFactory) + { + _reefClient = reefClient; + _jobSubmissionBuilderFactory = jobSubmissionBuilderFactory; + } + + /// <summary> + /// Runs DriverRestart using the IREEFClient passed into the constructor. + /// </summary> + private void Run() + { + // The driver configuration contains all the needed bindings. 
+ var driverConfiguration = DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<HelloRestartDriver>.Class) + .Set(DriverConfiguration.OnDriverRestarted, GenericType<HelloRestartDriver>.Class) + .Set(DriverConfiguration.OnContextActive, GenericType<HelloRestartDriver>.Class) + .Set(DriverConfiguration.OnTaskRunning, GenericType<HelloRestartDriver>.Class) + .Set(DriverConfiguration.OnTaskFailed, GenericType<HelloRestartDriver>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<HelloRestartDriver>.Class) + .Set(DriverConfiguration.OnDriverRestartCompleted, GenericType<HelloRestartDriver>.Class) + .Set(DriverConfiguration.OnDriverRestartContextActive, GenericType<HelloRestartDriver>.Class) + .Set(DriverConfiguration.OnDriverRestartTaskRunning, GenericType<HelloRestartDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<HelloRestartDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<HelloRestartDriver>.Class) + .Set(DriverConfiguration.OnDriverReconnect, GenericType<DefaultYarnClusterHttpDriverConnection>.Class) + .Set(DriverConfiguration.DriverRestartEvaluatorRecoverySeconds, (5 * 60).ToString()) + .Set(DriverConfiguration.MaxApplicationSubmissions, 2.ToString()) + .Build(); + + // The JobSubmission contains the Driver configuration as well as the files needed on the Driver. + var restartJobSubmission = _jobSubmissionBuilderFactory.GetJobSubmissionBuilder() + .AddDriverConfiguration(driverConfiguration) + .AddGlobalAssemblyForType(typeof(HelloRestartDriver)) + .SetJobIdentifier("DriverRestart") + .Build(); + + _reefClient.Submit(restartJobSubmission); + } + public static void Main(string[] args) { - // TODO[REEF-581]: Fill in method. 
+ TangFactory.GetTang().NewInjector(YARNClientConfiguration.ConfigurationModule.Build()).GetInstance<DriverRestart>().Run(); } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cb31da9f/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj index c874111..6d5d1d5 100644 --- a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj +++ b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj @@ -17,11 +17,58 @@ </PropertyGroup> <Import Project="$(SolutionDir)\build.props" /> <ItemGroup> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Xml.Linq" /> + <Reference Include="System.Data.DataSetExtensions" /> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="System.Data" /> + <Reference Include="System.Xml" /> + </ItemGroup> + <ItemGroup> <Compile Include="DriverRestart.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> </ItemGroup> <ItemGroup> <None Include="Readme.md" /> </ItemGroup> + <ItemGroup> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Bridge\Org.Apache.REEF.Bridge.vcxproj"> + <Project>{4e69d40a-26d6-4d4a-b96d-729946c07fe1}</Project> + <Name>Org.Apache.REEF.Bridge</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj"> + <Project>{5094c35b-4fdb-4322-ac05-45d684501cbf}</Project> + <Name>Org.Apache.REEF.Client</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj"> + <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project> + 
<Name>Org.Apache.REEF.Common</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Driver\Org.Apache.REEF.Driver.csproj"> + <Project>{a6baa2a7-f52f-4329-884e-1bcf711d6805}</Project> + <Name>Org.Apache.REEF.Driver</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Evaluator\Org.Apache.REEF.Evaluator.csproj"> + <Project>{1b983182-9c30-464c-948d-f87eb93a8240}</Project> + <Name>Org.Apache.REEF.Evaluator</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj"> + <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project> + <Name>Org.Apache.REEF.Tang</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj"> + <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project> + <Name>Org.Apache.REEF.Utilities</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj"> + <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project> + <Name>Org.Apache.REEF.Wake</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Examples\Org.Apache.REEF.Examples.csproj"> + <Project>{75503f90-7b82-4762-9997-94b5c68f15db}</Project> + <Name>Org.Apache.REEF.Examples</Name> + </ProjectReference> + </ItemGroup> <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> </Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cb31da9f/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs index 9d25746..63ada39 100644 --- a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs +++ 
b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs @@ -18,6 +18,7 @@ */ using System; +using System.Collections.Generic; using System.Diagnostics; using System.Text; using Org.Apache.REEF.Common.Tasks; @@ -37,19 +38,21 @@ namespace Org.Apache.REEF.Examples.DriverRestart /// The Driver for HelloRestartREEF. /// This driver is meant to run on YARN on HDInsight, with the ability to keep containers /// across application attempts. - /// It requests 5 evaluators and runs a running task on each of the evaluators. + /// It requests 1 evaluator and runs a running task on each of the evaluators. /// Once all tasks are running, the driver kills itself and expects the RM to restart it. - /// On restart, it expects all 5 of the running tasks to report back to it. + /// On restart, it expects all of the running task(s) to report back to it. /// </summary> public sealed class HelloRestartDriver : IObserver<IDriverRestartCompleted>, IObserver<IAllocatedEvaluator>, IObserver<IDriverStarted>, - IObserver<IDriverRestarted>, IObserver<IActiveContext>, IObserver<IRunningTask>, IObserver<ICompletedTask>, IObserver<IFailedTask> + IObserver<IDriverRestarted>, IObserver<IActiveContext>, IObserver<IRunningTask>, IObserver<ICompletedTask>, IObserver<IFailedTask>, + IObserver<IFailedEvaluator> { private static readonly Logger Logger = Logger.GetLogger(typeof(HelloRestartDriver)); private const int NumberOfTasksToSubmit = 1; private readonly IEvaluatorRequestor _evaluatorRequestor; + private readonly ISet<string> _receivedEvaluators = new HashSet<string>(StringComparer.OrdinalIgnoreCase); private readonly object _lockObj; - + private int _runningTaskCount; private int _finishedTaskCount; private bool _restarted; @@ -69,11 +72,17 @@ namespace Org.Apache.REEF.Examples.DriverRestart /// </summary> public void OnNext(IAllocatedEvaluator allocatedEvaluator) { + lock (_lockObj) + { + _receivedEvaluators.Add(allocatedEvaluator.Id); + } + var taskConfiguration = 
TaskConfiguration.ConfigurationModule .Set(TaskConfiguration.Identifier, "HelloRestartTask") .Set(TaskConfiguration.Task, GenericType<HelloRestartTask>.Class) .Set(TaskConfiguration.OnMessage, GenericType<HelloRestartTask>.Class) .Build(); + allocatedEvaluator.SubmitTask(taskConfiguration); } @@ -145,6 +154,19 @@ namespace Org.Apache.REEF.Examples.DriverRestart IncrementFinishedTask(value.GetActiveContext()); } + public void OnNext(IFailedEvaluator value) + { + bool restart; + lock (_lockObj) + { + restart = !this._receivedEvaluators.Contains(value.Id); + } + + var append = restart ? "Restarted recovered " : string.Empty; + + Logger.Log(Level.Info, append + "Evaluator [" + value + "] has failed!"); + } + public void OnError(Exception error) { throw error;
