Repository: incubator-reef Updated Branches: refs/heads/master a620f4630 -> 7f2d6ff74
[REEF-742] A Driver Connection State event handler in Evaluator for C# This addresses the issue by: * Adding IDriverConnectionMessage to allow TaskRuntime to be notified of Driver connection loss. * Adding IDriverConnectionMessageHandler to let users act upon Driver connection loss. JIRA: [REEF-742](https://issues.apache.org/jira/browse/REEF-742) This closes #487 Author: Andrew Chung <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/7f2d6ff7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/7f2d6ff7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/7f2d6ff7 Branch: refs/heads/master Commit: 7f2d6ff74f86e1b73d4cd93152bbe62277261dda Parents: a620f46 Author: Andrew Chung <[email protected]> Authored: Fri Sep 11 16:10:25 2015 -0700 Committer: Julia Wang <[email protected]> Committed: Fri Sep 25 17:09:06 2015 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Common.csproj | 4 +++ .../Runtime/Evaluator/Context/ContextManager.cs | 8 +++++ .../Runtime/Evaluator/Context/ContextRuntime.cs | 17 +++++++++ .../Runtime/Evaluator/HeartBeatManager.cs | 9 ++++- .../Runtime/Evaluator/Task/TaskRuntime.cs | 29 +++++++++++++++ .../Tasks/DriverConnectionMessageImpl.cs | 37 ++++++++++++++++++++ .../Tasks/DriverConnectionState.cs | 30 ++++++++++++++++ .../Tasks/IDriverConnectionMessage.cs | 32 +++++++++++++++++ .../Tasks/IDriverConnectionMessageHandler.cs | 31 ++++++++++++++++ .../Tasks/TaskConfiguration.cs | 7 ++++ .../DriverRestart/HelloRestartDriver.cs | 1 + .../DriverRestart/HelloRestartTask.cs | 28 ++++++++++++++- 12 files changed, 231 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7f2d6ff7/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index ed6ca7f..94c9758 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -137,12 +137,16 @@ under the License. <Compile Include="Services\ServicesConfigurationOptions.cs" /> <Compile Include="Tasks\Defaults\DefaultDriverMessageHandler.cs" /> <Compile Include="Tasks\Defaults\DefaultTaskMessageSource.cs" /> + <Compile Include="Tasks\DriverConnectionState.cs" /> + <Compile Include="Tasks\DriverConnectionMessageImpl.cs" /> <Compile Include="Tasks\Events\ICloseEvent.cs" /> <Compile Include="Tasks\Events\IDriverMessage.cs" /> <Compile Include="Tasks\Events\ISuspendEvent.cs" /> <Compile Include="Tasks\Events\ITaskStart.cs" /> <Compile Include="Tasks\Events\ITaskStop.cs" /> + <Compile Include="Tasks\IDriverConnectionMessageHandler.cs" /> <Compile Include="Tasks\IDriverMessageHandler.cs" /> + <Compile Include="Tasks\IDriverConnectionMessage.cs" /> <Compile Include="Tasks\ITask.cs" /> <Compile Include="Tasks\ITaskMessageSource.cs" /> <Compile Include="Tasks\TaskConfiguration.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7f2d6ff7/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs index 97f2ad2..e3fbb08 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs @@ -237,6 +237,14 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context } /// <summary> + /// Propagates the IDriverConnection 
message to the top level ContextRuntime. + /// </summary> + internal void HandleDriverConnectionMessage(IDriverConnectionMessage message) + { + _contextStack.Peek().HandleDriverConnectionMessage(message); + } + + /// <summary> /// Add a context to the stack. /// </summary> /// <param name="addContextProto"></param> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7f2d6ff7/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs index 5006a66..352a8be 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs @@ -425,6 +425,23 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context return contextStatusProto; } } + + /// <summary> + /// Propagates the IDriverConnection message to the TaskRuntime. + /// </summary> + internal void HandleDriverConnectionMessage(IDriverConnectionMessage message) + { + lock (_contextLifeCycle) + { + if (!_task.IsPresent()) + { + LOGGER.Log(Level.Warning, "Received a IDriverConnectionMessage while there was no task running. 
Ignored"); + return; + } + + _task.Value.HandleDriverConnectionMessage(message); + } + } } } ///// <summary> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7f2d6ff7/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs index 698dcfc..355877f 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs @@ -27,6 +27,7 @@ using System.Threading; using Org.Apache.REEF.Common.Evaluator; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Runtime.Evaluator.Context; +using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Remote; @@ -134,7 +135,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator if (_heartbeatFailures >= _maxHeartbeatRetries) { - LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Heartbeat communications to driver reached max of {0} failures. \n==== Driver is considered dead/unreachable. === \n=========== Entering RECOVERY mode. ===========", _heartbeatFailures)); + LOGGER.Log(Level.Warning, "Heartbeat communications to driver reached max of {0} failures. Driver is considered dead/unreachable", _heartbeatFailures); + LOGGER.Log(Level.Info, "=========== Entering RECOVERY mode. 
==========="); + _contextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Disconnected)); + try { _driverConnection = _evaluatorSettings.Injector.GetInstance<IDriverConnection>(); @@ -343,7 +347,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator Thread.Sleep(500); } } + _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL; + _contextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Reconnected)); + LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. ==========="); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7f2d6ff7/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index 721adf7..f12dcd6 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -48,6 +48,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task private readonly INameClient _nameClient; + private readonly Lazy<IDriverConnectionMessageHandler> _driverConnectionMessageHandler; + public TaskRuntime(IInjector taskInjector, string contextId, string taskId, HeartBeatManager heartBeatManager, string memento = null) { _injector = taskInjector; @@ -83,6 +85,20 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task // do not rethrow since user is not required to provide name client } + _driverConnectionMessageHandler = new Lazy<IDriverConnectionMessageHandler>(() => + { + try + { + return _injector.GetInstance<IDriverConnectionMessageHandler>(); + } + catch (InjectionException) + { + LOGGER.Log(Level.Info, "User did not implement IDriverConnectionMessageHandler."); + } + + return null; + }); + LOGGER.Log(Level.Info, "task message 
source injected"); _currentStatus = new TaskStatus(_heartBeatManager, contextId, taskId, messageSources); _memento = memento == null ? @@ -318,6 +334,19 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } } + /// <summary> + /// Propagates the IDriverConnection message to the Handler as specified by the Task. + /// </summary> + internal void HandleDriverConnectionMessage(IDriverConnectionMessage message) + { + if (_driverConnectionMessageHandler.Value == null) + { + return; + } + + _driverConnectionMessageHandler.Value.OnNext(message); + } + private byte[] RunTask(byte[] memento) { return _task.Call(memento); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7f2d6ff7/lang/cs/Org.Apache.REEF.Common/Tasks/DriverConnectionMessageImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/DriverConnectionMessageImpl.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/DriverConnectionMessageImpl.cs new file mode 100644 index 0000000..4e4aa2f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/DriverConnectionMessageImpl.cs @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +namespace Org.Apache.REEF.Common.Tasks +{ + /// <summary> + /// Implementation of the IDriverConnectionMessage. + /// </summary> + internal sealed class DriverConnectionMessageImpl : IDriverConnectionMessage + { + internal DriverConnectionMessageImpl(DriverConnectionState state) + { + State = state; + } + + /// <summary> + /// State of Driver connection. + /// </summary> + public DriverConnectionState State { get; private set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7f2d6ff7/lang/cs/Org.Apache.REEF.Common/Tasks/DriverConnectionState.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/DriverConnectionState.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/DriverConnectionState.cs new file mode 100644 index 0000000..0f19af5 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/DriverConnectionState.cs @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Common.Tasks +{ + /// <summary> + /// The state of the connection with Driver. 
+ /// </summary> + public enum DriverConnectionState + { + Disconnected, + Reconnected + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7f2d6ff7/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverConnectionMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverConnectionMessage.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverConnectionMessage.cs new file mode 100644 index 0000000..1f5fd94 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverConnectionMessage.cs @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Common.Tasks +{ + /// <summary> + /// The message triggered on a change of status in heartbeat to Driver. + /// </summary> + public interface IDriverConnectionMessage + { + /// <summary> + /// The change in Driver heartbeat connection. 
+ /// </summary> + DriverConnectionState State { get; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7f2d6ff7/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverConnectionMessageHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverConnectionMessageHandler.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverConnectionMessageHandler.cs new file mode 100644 index 0000000..36360df --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverConnectionMessageHandler.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.REEF.Common.Tasks +{ + /// <summary> + /// The handler implementable by users to handle IDriverConnectionMessages, + /// which notifies the Task when there is a change in driver connection state. 
+ /// </summary> + public interface IDriverConnectionMessageHandler : IObserver<IDriverConnectionMessage> + { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7f2d6ff7/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs index 27b965c..8b93e84 100644 --- a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs @@ -62,6 +62,12 @@ namespace Org.Apache.REEF.Common.Tasks public static readonly OptionalImpl<IDriverMessageHandler> OnMessage = new OptionalImpl<IDriverMessageHandler>(); /// <summary> + /// for heartbeat status changes from the Driver. Does not do anything if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IDriverConnectionMessageHandler> OnDriverConnectionChanged = new OptionalImpl<IDriverConnectionMessageHandler>(); + + /// <summary> /// for closure requests from the driver. Defaults to task failure if not bound. 
/// </summary> [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] @@ -125,6 +131,7 @@ namespace Org.Apache.REEF.Common.Tasks .BindImplementation(GenericType<ITask>.Class, Task) .BindImplementation(GenericType<ITaskMessageSource>.Class, OnSendMessage) .BindImplementation(GenericType<IDriverMessageHandler>.Class, OnMessage) + .BindImplementation(GenericType<IDriverConnectionMessageHandler>.Class, OnDriverConnectionChanged) .BindNamedParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, Identifier) .BindNamedParameter(GenericType<TaskConfigurationOptions.Memento>.Class, Memento) .BindNamedParameter(GenericType<TaskConfigurationOptions.CloseHandler>.Class, OnClose) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7f2d6ff7/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs index a5208ff..2ad662a 100644 --- a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs +++ b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs @@ -85,6 +85,7 @@ namespace Org.Apache.REEF.Examples.DriverRestart .Set(TaskConfiguration.Identifier, "HelloRestartTask") .Set(TaskConfiguration.Task, GenericType<HelloRestartTask>.Class) .Set(TaskConfiguration.OnMessage, GenericType<HelloRestartTask>.Class) + .Set(TaskConfiguration.OnDriverConnectionChanged, GenericType<HelloRestartTask>.Class) .Build(); allocatedEvaluator.SubmitTask(taskConfiguration); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7f2d6ff7/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartTask.cs 
b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartTask.cs index 7decbda..11ed148 100644 --- a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartTask.cs +++ b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartTask.cs @@ -29,7 +29,7 @@ namespace Org.Apache.REEF.Examples.DriverRestart /// <summary> /// A Task that merely prints a greeting and exits. /// </summary> - public sealed class HelloRestartTask : ITask, IDriverMessageHandler + public sealed class HelloRestartTask : ITask, IDriverMessageHandler, IDriverConnectionMessageHandler { private static readonly Logger Logger = Logger.GetLogger(typeof(HelloRestartTask)); private bool _exit; @@ -59,5 +59,31 @@ namespace Org.Apache.REEF.Examples.DriverRestart Logger.Log(Level.Verbose, "Receieved a message from driver. We should exit now..."); _exit = true; } + + public void OnNext(IDriverConnectionMessage value) + { + switch (value.State) + { + case DriverConnectionState.Disconnected: + Logger.Log(Level.Warning, "Task lost connection with Driver!"); + break; + case DriverConnectionState.Reconnected: + Logger.Log(Level.Info, "Task reconnected with new Driver!"); + break; + default: + Logger.Log(Level.Warning, "Task driver connection status: " + value.State); + break; + } + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } } } \ No newline at end of file
