Repository: incubator-reef
Updated Branches:
refs/heads/master 547f2d6a8 -> 6e994d25b
[REEF-593] Add a .NET Driver restart example
This addresses the issue by
* Adding a restartable driver example.
* Changing the way application restartability is specified to use a bool,
so that the driver constructors no longer need to be injected just to check whether restart handlers exist.
JIRA:
[REEF-593](https://issues.apache.org/jira/browse/REEF-593)
Pull Request:
This closes #381
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/6e994d25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/6e994d25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/6e994d25
Branch: refs/heads/master
Commit: 6e994d25ba75ded3d4b724c365b1c25edd89dad5
Parents: 547f2d6
Author: Andrew Chung <[email protected]>
Authored: Tue Aug 18 17:00:41 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Aug 20 14:17:44 2015 -0700
----------------------------------------------------------------------
.../ClrClient2JavaClientCuratedParameters.cs | 4 +-
.../Bridge/DriverBridgeConfiguration.cs | 7 +
.../Bridge/DriverBridgeConfigurationOptions.cs | 5 +
.../DriverConfiguration.cs | 6 +
.../DriverRestart/HelloRestartDriver.cs | 174 +++++++++++++++++++
.../DriverRestart/HelloRestartTask.cs | 63 +++++++
.../Org.Apache.REEF.Examples.csproj | 2 +
7 files changed, 259 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
index 9b5cd9a..897cdb1 100644
---
a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
+++
b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
@@ -49,14 +49,14 @@ namespace Org.Apache.REEF.Client.Common
[Parameter(typeof(TcpPortRangeTryCount))] int tcpPortRangeTryCount,
[Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeSeed,
[Parameter(typeof(DriverBridgeConfigurationOptions.MaxApplicationSubmissions))]
int maxApplicationSubmissions,
-
[Parameter(typeof(DriverBridgeConfigurationOptions.DriverRestartedHandlers))]
ISet<IObserver<IDriverRestarted>> restartHandlers)
+
[Parameter(typeof(DriverBridgeConfigurationOptions.RestartEnabled))] bool
restartEnabled)
{
TcpPortRangeStart = tcpPortRangeStart;
TcpPortRangeCount = tcpPortRangeCount;
TcpPortRangeTryCount = tcpPortRangeTryCount;
TcpPortRangeSeed = tcpPortRangeSeed;
MaxApplicationSubmissions = maxApplicationSubmissions;
- EnableRestart = restartHandlers.Any();
+ EnableRestart = restartEnabled;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
index d48eb5e..f4b5b23 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
@@ -180,6 +180,12 @@ namespace Org.Apache.REEF.Driver.Bridge
[SuppressMessage("Microsoft.Security", "CA2104:Do not declare read
only mutable reference types", Justification = "not applicable")]
public static readonly
OptionalImpl<IObserver<IDriverRestartCompleted>> OnDriverRestartCompleted = new
OptionalImpl<IObserver<IDriverRestartCompleted>>();
+ /// <summary>
+ /// Whether or not the application has restart enabled. Defaults to false.
+ /// Bound to the <see cref="DriverBridgeConfigurationOptions.RestartEnabled"/> named parameter.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read
only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalParameter<bool> RestartEnabled = new
OptionalParameter<bool>();
+
// This is currently not needed in Bridge/Driver model
///// <summary>
///// The event handler invoked right before the driver shuts down.
Defaults to ignore.
@@ -233,6 +239,7 @@ namespace Org.Apache.REEF.Driver.Bridge
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class,
OnDriverRestartTaskRunning)
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartCompletedHandlers>.Class,
OnDriverRestartCompleted)
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
CustomTraceLevel)
+
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.RestartEnabled>.Class,
RestartEnabled)
.Build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
index 488b2a7..ee72d37 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
@@ -137,6 +137,11 @@ namespace Org.Apache.REEF.Driver.Bridge
{
}
+ /// <summary>
+ /// Named parameter indicating whether driver restart is enabled for the application.
+ /// Its declared default value is "false".
+ /// </summary>
+ [NamedParameter("Whether restart should be enabled on the
application", "RestartEnabled", "false")]
+ public class RestartEnabled : Name<bool>
+ {
+ }
+
[NamedParameter("The number of times an application should be
submitted in case of failure.", "MaxApplicationSubmissions", "1")]
public class MaxApplicationSubmissions : Name<int>
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
index e597c10..716647c 100644
--- a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
@@ -173,6 +173,11 @@ namespace Org.Apache.REEF.Driver
/// </summary>
public static readonly OptionalImpl<IDriverConnection>
OnDriverReconnect = new OptionalImpl<IDriverConnection>();
+ /// <summary>
+ /// Whether or not the application has restart enabled. Defaults to false.
+ /// Bound to the <see cref="DriverBridgeConfigurationOptions.RestartEnabled"/> named parameter.
+ /// </summary>
+ public static readonly OptionalParameter<bool> RestartEnabled = new
OptionalParameter<bool>();
+
public static ConfigurationModule ConfigurationModule
{
get
@@ -218,6 +223,7 @@ namespace Org.Apache.REEF.Driver
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
CustomTraceLevel)
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.MaxApplicationSubmissions>.Class,
MaxApplicationSubmissions)
+
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.RestartEnabled>.Class,
RestartEnabled)
.Build()
// TODO: Move this up
.Set(OnDriverStarted,
GenericType<ClassHierarchyGeneratingDriverStartObserver>.Class);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
new file mode 100644
index 0000000..9d25746
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.Text;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using IRunningTask = Org.Apache.REEF.Driver.Task.IRunningTask;
+
+namespace Org.Apache.REEF.Examples.DriverRestart
+{
+    /// <summary>
+    /// The Driver for HelloRestartREEF.
+    /// This driver is meant to run on YARN on HDInsight, with the ability to keep containers
+    /// across application attempts.
+    /// It requests NumberOfTasksToSubmit evaluators and runs a long-running HelloRestartTask on each of them.
+    /// Once all tasks are running, the driver kills its own process and expects the RM to restart it.
+    /// On restart, it expects all of the previously running tasks to report back to it, and sends each
+    /// of them a message (which HelloRestartTask treats as its signal to finish).
+    /// </summary>
+    public sealed class HelloRestartDriver : IObserver<IDriverRestartCompleted>, IObserver<IAllocatedEvaluator>, IObserver<IDriverStarted>,
+        IObserver<IDriverRestarted>, IObserver<IActiveContext>, IObserver<IRunningTask>, IObserver<ICompletedTask>, IObserver<IFailedTask>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(HelloRestartDriver));
+
+        /// <summary>
+        /// Number of evaluators requested, and therefore the number of running tasks the driver waits for
+        /// before killing itself (and expects to hear back from after restarting).
+        /// </summary>
+        private const int NumberOfTasksToSubmit = 1;
+
+        /// <summary>
+        /// Memory requested per evaluator; passed as the second argument of EvaluatorRequest
+        /// (presumably megabytes — confirm against the EvaluatorRequest constructor).
+        /// </summary>
+        private const int EvaluatorMemory = 64;
+
+        private readonly IEvaluatorRequestor _evaluatorRequestor;
+
+        /// <summary>
+        /// Guards all mutable state below, since event handlers may be invoked concurrently.
+        /// </summary>
+        private readonly object _lockObj;
+
+        private int _runningTaskCount;
+        private int _finishedTaskCount;
+        private bool _restarted;
+
+        [Inject]
+        private HelloRestartDriver(IEvaluatorRequestor evaluatorRequestor)
+        {
+            _finishedTaskCount = 0;
+            _runningTaskCount = 0;
+            _evaluatorRequestor = evaluatorRequestor;
+            _restarted = false;
+            _lockObj = new object();
+        }
+
+        /// <summary>
+        /// Submits the HelloRestartTask to the Evaluator.
+        /// The task is also registered as the handler for driver messages, so the driver can tell it to finish.
+        /// </summary>
+        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+        {
+            var taskConfiguration = TaskConfiguration.ConfigurationModule
+                .Set(TaskConfiguration.Identifier, "HelloRestartTask")
+                .Set(TaskConfiguration.Task, GenericType<HelloRestartTask>.Class)
+                .Set(TaskConfiguration.OnMessage, GenericType<HelloRestartTask>.Class)
+                .Build();
+            allocatedEvaluator.SubmitTask(taskConfiguration);
+        }
+
+        /// <summary>
+        /// Called to start the driver; requests NumberOfTasksToSubmit evaluators.
+        /// </summary>
+        public void OnNext(IDriverStarted driverStarted)
+        {
+            Logger.Log(Level.Info, "HelloRestartDriver started at {0}", driverStarted.StartTime);
+            _evaluatorRequestor.Submit(new EvaluatorRequest(NumberOfTasksToSubmit, EvaluatorMemory));
+        }
+
+        /// <summary>
+        /// Prints a restart message and enters the restart codepath.
+        /// </summary>
+        public void OnNext(IDriverRestarted value)
+        {
+            // Written under the same lock that OnNext(IRunningTask) reads it under,
+            // so running-task handlers reliably observe the restarted state.
+            lock (_lockObj)
+            {
+                _restarted = true;
+            }
+
+            Logger.Log(Level.Info, "Hello! HelloRestartDriver has restarted!");
+        }
+
+        /// <summary>
+        /// Logs the arrival of an active context.
+        /// </summary>
+        public void OnNext(IActiveContext value)
+        {
+            Logger.Log(Level.Info, "Received active context {0} from evaluator with ID [{1}].", value.Id, value.EvaluatorId);
+        }
+
+        /// <summary>
+        /// Counts running tasks. Before a restart, the driver kills its own process once all tasks are running.
+        /// After a restart, each reported task is sent a message telling it to finish.
+        /// </summary>
+        public void OnNext(IRunningTask value)
+        {
+            lock (_lockObj)
+            {
+                _runningTaskCount++;
+
+                Logger.Log(Level.Info, "Received running task with ID [{0}] " +
+                    " with restart set to {1} from evaluator with ID [{2}]",
+                    value.Id, _restarted, value.ActiveContext.EvaluatorId);
+
+                if (_restarted)
+                {
+                    value.Send(Encoding.UTF8.GetBytes("Hello from driver!"));
+                }
+
+                if (_runningTaskCount == NumberOfTasksToSubmit)
+                {
+                    if (_restarted)
+                    {
+                        Logger.Log(Level.Info, "Retrieved all running tasks from the previous instance.");
+                    }
+                    else
+                    {
+                        // Kill itself in order for the RM to restart the driver. Recovery presumably requires
+                        // MaxApplicationSubmissions > 1 in the driver configuration — confirm with the launcher.
+                        Process.GetCurrentProcess().Kill();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Logs that driver restart has completed.
+        /// </summary>
+        public void OnNext(IDriverRestartCompleted value)
+        {
+            Logger.Log(Level.Info, "Driver restart has completed.");
+        }
+
+        public void OnNext(ICompletedTask value)
+        {
+            IncrementFinishedTask(Optional<IActiveContext>.Of(value.ActiveContext));
+        }
+
+        public void OnNext(IFailedTask value)
+        {
+            IncrementFinishedTask(value.GetActiveContext());
+        }
+
+        /// <summary>
+        /// Rethrows the reported error while preserving its original stack trace
+        /// (a plain "throw error;" would overwrite it with this method's frame).
+        /// </summary>
+        public void OnError(Exception error)
+        {
+            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(error).Throw();
+        }
+
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>
+        /// Records a finished (completed or failed) task and disposes its active context, if any.
+        /// </summary>
+        /// <param name="activeContext">The context the task ran in, if still available.</param>
+        private void IncrementFinishedTask(Optional<IActiveContext> activeContext)
+        {
+            lock (_lockObj)
+            {
+                _finishedTaskCount++;
+                if (_finishedTaskCount == NumberOfTasksToSubmit)
+                {
+                    Logger.Log(Level.Info, "All tasks are done! Driver should exit now...");
+                }
+
+                if (activeContext.IsPresent())
+                {
+                    activeContext.Value.Dispose();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartTask.cs
b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartTask.cs
new file mode 100644
index 0000000..7decbda
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartTask.cs
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Examples.DriverRestart
+{
+    /// <summary>
+    /// A Task that keeps running until it receives a message from the driver, and then exits.
+    /// HelloRestartDriver relies on this so that tasks are still running across a driver restart.
+    /// </summary>
+    public sealed class HelloRestartTask : ITask, IDriverMessageHandler
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(HelloRestartTask));
+
+        /// <summary>
+        /// How long Call sleeps between checks of the exit flag.
+        /// </summary>
+        private const int PollIntervalMilliseconds = 5000;
+
+        /// <summary>
+        /// Set by Handle and polled by Call. Handle is presumably invoked on a different thread
+        /// than Call, so the field is volatile: without it the JIT may hoist the read out of
+        /// the polling loop and the task would never observe the exit request.
+        /// </summary>
+        private volatile bool _exit;
+
+        [Inject]
+        private HelloRestartTask()
+        {
+            _exit = false;
+        }
+
+        public void Dispose()
+        {
+        }
+
+        /// <summary>
+        /// Blocks, polling every PollIntervalMilliseconds, until a driver message has been received.
+        /// </summary>
+        /// <param name="memento">Unused.</param>
+        /// <returns>Always null.</returns>
+        public byte[] Call(byte[] memento)
+        {
+            while (!_exit)
+            {
+                Thread.Sleep(PollIntervalMilliseconds);
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Any message from the driver is treated as the signal to finish the task.
+        /// </summary>
+        public void Handle(IDriverMessage message)
+        {
+            Logger.Log(Level.Verbose, "Received a message from driver. We should exit now...");
+            _exit = true;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
b/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
index 7730365..f4bd67f 100644
--- a/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
+++ b/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
@@ -35,6 +35,8 @@ under the License.
<Reference Include="System.Runtime.Serialization" />
</ItemGroup>
<ItemGroup>
+ <Compile Include="DriverRestart\HelloRestartDriver.cs" />
+ <Compile Include="DriverRestart\HelloRestartTask.cs" />
<Compile Include="MachineLearning\KMeans\Centroids.cs" />
<Compile Include="MachineLearning\KMeans\codecs\CentroidsCodec.cs" />
<Compile Include="MachineLearning\KMeans\codecs\DataVectorCodec.cs" />