Repository: incubator-reef
Updated Branches:
  refs/heads/master 547f2d6a8 -> 6e994d25b


[REEF-593] Add a .NET Driver restart example

This addressed the issue by
  * Adding a restartable driver example.
  * Change the way to specify application restartability to use a bool
    to prevent injection of constructor for drivers.

JIRA:
  [REEF-593](https://issues.apache.org/jira/browse/REEF-593)

Pull Request:
  This closes #381


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/6e994d25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/6e994d25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/6e994d25

Branch: refs/heads/master
Commit: 6e994d25ba75ded3d4b724c365b1c25edd89dad5
Parents: 547f2d6
Author: Andrew Chung <[email protected]>
Authored: Tue Aug 18 17:00:41 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Aug 20 14:17:44 2015 -0700

----------------------------------------------------------------------
 .../ClrClient2JavaClientCuratedParameters.cs    |   4 +-
 .../Bridge/DriverBridgeConfiguration.cs         |   7 +
 .../Bridge/DriverBridgeConfigurationOptions.cs  |   5 +
 .../DriverConfiguration.cs                      |   6 +
 .../DriverRestart/HelloRestartDriver.cs         | 174 +++++++++++++++++++
 .../DriverRestart/HelloRestartTask.cs           |  63 +++++++
 .../Org.Apache.REEF.Examples.csproj             |   2 +
 7 files changed, 259 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
 
b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
index 9b5cd9a..897cdb1 100644
--- 
a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
+++ 
b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
@@ -49,14 +49,14 @@ namespace Org.Apache.REEF.Client.Common
             [Parameter(typeof(TcpPortRangeTryCount))] int tcpPortRangeTryCount,
             [Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeSeed,
             
[Parameter(typeof(DriverBridgeConfigurationOptions.MaxApplicationSubmissions))] 
int maxApplicationSubmissions,
-            
[Parameter(typeof(DriverBridgeConfigurationOptions.DriverRestartedHandlers))] 
ISet<IObserver<IDriverRestarted>> restartHandlers)
+            
[Parameter(typeof(DriverBridgeConfigurationOptions.RestartEnabled))] bool 
restartEnabled)
         {
             TcpPortRangeStart = tcpPortRangeStart;
             TcpPortRangeCount = tcpPortRangeCount;
             TcpPortRangeTryCount = tcpPortRangeTryCount;
             TcpPortRangeSeed = tcpPortRangeSeed;
             MaxApplicationSubmissions = maxApplicationSubmissions;
-            EnableRestart = restartHandlers.Any();
+            EnableRestart = restartEnabled;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
index d48eb5e..f4b5b23 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
@@ -180,6 +180,12 @@ namespace Org.Apache.REEF.Driver.Bridge
         [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read 
only mutable reference types", Justification = "not applicable")]
         public static readonly 
OptionalImpl<IObserver<IDriverRestartCompleted>> OnDriverRestartCompleted = new 
OptionalImpl<IObserver<IDriverRestartCompleted>>();
 
+        /// <summary>
+        /// Whether or not the application has restart enabled. Defaults to 
false.
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read 
only mutable reference types", Justification = "not applicable")]
+        public static readonly OptionalParameter<bool> RestartEnabled = new 
OptionalParameter<bool>();
+
         // This is currently not needed in Bridge/Driver model
         ///// <summary>
         ///// The event handler invoked right before the driver shuts down. 
Defaults to ignore.
@@ -233,6 +239,7 @@ namespace Org.Apache.REEF.Driver.Bridge
                 
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class,
 OnDriverRestartTaskRunning)
                 
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartCompletedHandlers>.Class,
 OnDriverRestartCompleted)
                 
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
 CustomTraceLevel)
+                
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.RestartEnabled>.Class,
 RestartEnabled)
                 .Build();
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
index 488b2a7..ee72d37 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
@@ -137,6 +137,11 @@ namespace Org.Apache.REEF.Driver.Bridge
         {
         }
 
+        [NamedParameter("Whether restart should be enabled on the 
application", "RestartEnabled", "false")]
+        public class RestartEnabled : Name<bool>
+        {
+        }
+
         [NamedParameter("The number of times an application should be 
submitted in case of failure.", "MaxApplicationSubmissions", "1")]
         public class MaxApplicationSubmissions : Name<int>
         {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs 
b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
index e597c10..716647c 100644
--- a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
@@ -173,6 +173,11 @@ namespace Org.Apache.REEF.Driver
         /// </summary>
         public static readonly OptionalImpl<IDriverConnection> 
OnDriverReconnect = new OptionalImpl<IDriverConnection>();
 
+        /// <summary>
+        /// Whether or not the application has restart enabled. Defaults to 
false.
+        /// </summary>
+        public static readonly OptionalParameter<bool> RestartEnabled = new 
OptionalParameter<bool>();
+
         public static ConfigurationModule ConfigurationModule
         {
             get
@@ -218,6 +223,7 @@ namespace Org.Apache.REEF.Driver
                     
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
 CustomTraceLevel)
                     
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.MaxApplicationSubmissions>.Class,
                         MaxApplicationSubmissions)
+                    
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.RestartEnabled>.Class,
 RestartEnabled)
                     .Build()
                     // TODO: Move this up
                     .Set(OnDriverStarted, 
GenericType<ClassHierarchyGeneratingDriverStartObserver>.Class);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs 
b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
new file mode 100644
index 0000000..9d25746
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.Text;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using IRunningTask = Org.Apache.REEF.Driver.Task.IRunningTask;
+
+namespace Org.Apache.REEF.Examples.DriverRestart
+{
+    /// <summary>
+    /// The Driver for HelloRestartREEF.
+    /// This driver is meant to run on YARN on HDInsight, with the ability to 
keep containers
+    /// across application attempts.
+    /// It requests 5 evaluators and runs a running task on each of the 
evaluators.
+    /// Once all tasks are running, the driver kills itself and expects the RM 
to restart it.
+    /// On restart, it expects all 5 of the running tasks to report back to it.
+    /// </summary>
+    public sealed class HelloRestartDriver : 
IObserver<IDriverRestartCompleted>, IObserver<IAllocatedEvaluator>, 
IObserver<IDriverStarted>, 
+        IObserver<IDriverRestarted>, IObserver<IActiveContext>, 
IObserver<IRunningTask>, IObserver<ICompletedTask>, IObserver<IFailedTask>
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(HelloRestartDriver));
+        private const int NumberOfTasksToSubmit = 1;
+
+        private readonly IEvaluatorRequestor _evaluatorRequestor;
+        private readonly object _lockObj;
+
+        private int _runningTaskCount;
+        private int _finishedTaskCount;
+        private bool _restarted;
+
+        [Inject]
+        private HelloRestartDriver(IEvaluatorRequestor evaluatorRequestor)
+        {
+            _finishedTaskCount = 0;
+            _runningTaskCount = 0;
+            _evaluatorRequestor = evaluatorRequestor;
+            _restarted = false;
+            _lockObj = new object();
+        }
+
+        /// <summary>
+        /// Submits the HelloRestartTask to the Evaluator.
+        /// </summary>
+        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+        {
+            var taskConfiguration = TaskConfiguration.ConfigurationModule
+                .Set(TaskConfiguration.Identifier, "HelloRestartTask")
+                .Set(TaskConfiguration.Task, 
GenericType<HelloRestartTask>.Class)
+                .Set(TaskConfiguration.OnMessage, 
GenericType<HelloRestartTask>.Class)
+                .Build();
+            allocatedEvaluator.SubmitTask(taskConfiguration);
+        }
+
+        /// <summary>
+        /// Called to start the driver.
+        /// </summary>
+        public void OnNext(IDriverStarted driverStarted)
+        {
+            Logger.Log(Level.Info, "HelloRestartDriver started at {0}", 
driverStarted.StartTime);
+            _evaluatorRequestor.Submit(new 
EvaluatorRequest(NumberOfTasksToSubmit, 64));
+        }
+
+        /// <summary>
+        /// Prints a restart message and enters the restart codepath.
+        /// </summary>
+        public void OnNext(IDriverRestarted value)
+        {
+            _restarted = true;
+            Logger.Log(Level.Info, "Hello! HelloRestartDriver has restarted!");
+        }
+
+        public void OnNext(IActiveContext value)
+        {
+            Logger.Log(Level.Info, "Received active context {0} from evaluator 
with ID [{1}].", value.Id, value.EvaluatorId);
+        }
+
+        public void OnNext(IRunningTask value)
+        {
+            lock (_lockObj)
+            {
+                _runningTaskCount++;
+                
+                Logger.Log(Level.Info, "Received running task with ID [{0}] " +
+                              " with restart set to {1} from evaluator with ID 
[{2}]",
+                              value.Id, _restarted, 
value.ActiveContext.EvaluatorId);
+
+                if (_restarted)
+                {
+                    value.Send(Encoding.UTF8.GetBytes("Hello from driver!"));
+                }
+
+                // Kill itself in order for the driver to restart it.
+                if (_runningTaskCount == NumberOfTasksToSubmit)
+                {
+                    if (_restarted)
+                    {
+                        Logger.Log(Level.Info, "Retrieved all running tasks 
from the previous instance.");
+                    }
+                    else
+                    {
+                        Process.GetCurrentProcess().Kill();
+                    }
+                }
+            }
+        }
+
+        public void OnNext(IDriverRestartCompleted value)
+        {
+            Logger.Log(Level.Info, "Driver restart has completed.");
+        }
+
+        public void OnNext(ICompletedTask value)
+        {
+            
IncrementFinishedTask(Optional<IActiveContext>.Of(value.ActiveContext));
+        }
+
+        public void OnNext(IFailedTask value)
+        {
+            IncrementFinishedTask(value.GetActiveContext());
+        }
+
+        public void OnError(Exception error)
+        {
+            throw error;
+        }
+
+        public void OnCompleted()
+        {
+        }
+
+        private void IncrementFinishedTask(Optional<IActiveContext> 
activeContext)
+        {
+            lock (_lockObj)
+            {
+                _finishedTaskCount++;
+                if (_finishedTaskCount == NumberOfTasksToSubmit)
+                {
+                    Logger.Log(Level.Info, "All tasks are done! Driver should 
exit now...");
+                }
+
+                if (activeContext.IsPresent())
+                {
+                    activeContext.Value.Dispose();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartTask.cs 
b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartTask.cs
new file mode 100644
index 0000000..7decbda
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartTask.cs
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Examples.DriverRestart
+{
+    /// <summary>
+    /// A Task that merely prints a greeting and exits.
+    /// </summary>
+    public sealed class HelloRestartTask : ITask, IDriverMessageHandler
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(HelloRestartTask));
+        private bool _exit;
+
+        [Inject]
+        private HelloRestartTask()
+        {
+            _exit = false;
+        }
+
+        public void Dispose()
+        {
+        }
+
+        public byte[] Call(byte[] memento)
+        {
+            while (!_exit)
+            {
+                Thread.Sleep(5000);
+            }
+
+            return null;
+        }
+
+        public void Handle(IDriverMessage message)
+        {
+            Logger.Log(Level.Verbose, "Receieved a message from driver. We 
should exit now...");
+            _exit = true;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6e994d25/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj 
b/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
index 7730365..f4bd67f 100644
--- a/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
+++ b/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
@@ -35,6 +35,8 @@ under the License.
     <Reference Include="System.Runtime.Serialization" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="DriverRestart\HelloRestartDriver.cs" />
+    <Compile Include="DriverRestart\HelloRestartTask.cs" />
     <Compile Include="MachineLearning\KMeans\Centroids.cs" />
     <Compile Include="MachineLearning\KMeans\codecs\CentroidsCodec.cs" />
     <Compile Include="MachineLearning\KMeans\codecs\DataVectorCodec.cs" />

Reply via email to