Repository: incubator-reef Updated Branches: refs/heads/master 35f07f4e4 -> 9da895cb9
[REEF-232] Resolving race condition in Communication Group When CommunicationGroupNetworkObserver receives a message from other nodes, handlers for operators may not be registered yet. This race condition may cause an exception, which results in sockets being closed. When some nodes are not able to receive messages from their children or parents, the entire communication gets stuck. This can be reproduced by adding more nodes with a tree topology. This PR resolves the issue by waiting and retrying until handlers are registered before processing the message. JIRA: [REEF-232] https://issues.apache.org/jira/browse/REEF-232 Pull Request: This closes #127 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/9da895cb Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/9da895cb Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/9da895cb Branch: refs/heads/master Commit: 9da895cb93e9abbad3a03d1b413c14ce716f913d Parents: 35f07f4 Author: Julia Wang <[email protected]> Authored: Fri Mar 27 18:20:47 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Fri Mar 27 18:36:43 2015 -0700 ---------------------------------------------------------------------- .../Group/Config/MpiConfigurationOptions.cs | 10 ++++ .../Group/Task/Impl/CommunicationGroupClient.cs | 5 +- .../Impl/CommunicationGroupNetworkObserver.cs | 56 +++++++++++++++++--- .../BroadcastReduceTest/BroadcastReduceTest.cs | 2 +- .../Remote/Impl/TransportServer.cs | 4 +- 5 files changed, 67 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9da895cb/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs index b7bd357..7ef575b 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs @@ -49,6 +49,16 @@ namespace Org.Apache.REEF.Network.Group.Config { } + [NamedParameter("sleep time to wait for handlers to be registered", defaultValue: "500")] + public class SleepTimeWaitingForHandler : Name<int> + { + } + + [NamedParameter("Retry times to wait for handlers to be registered", defaultValue: "5")] + public class RetryCountWaitingForHanler : Name<int> + { + } + [NamedParameter("Master task identifier")] public class MasterTaskId : Name<string> { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9da895cb/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs index 3dcce76..a4d9e70 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs @@ -70,7 +70,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl [Parameter(typeof(MpiConfigurationOptions.SerializedOperatorConfigs))] ISet<string> operatorConfigs, IMpiNetworkObserver mpiNetworkObserver, NetworkService<GroupCommunicationMessage> networkService, - AvroConfigurationSerializer configSerializer) + AvroConfigurationSerializer configSerializer, + CommunicationGroupNetworkObserver commGroupNetworkHandler) { _taskId = taskId; _driverId = driverId; @@ -81,7 +82,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl _networkService = networkService; _mpiNetworkHandler = mpiNetworkObserver; - _commGroupNetworkHandler = new CommunicationGroupNetworkObserver(); + 
_commGroupNetworkHandler = commGroupNetworkHandler; _mpiNetworkHandler.Register(groupName, _commGroupNetworkHandler); // Deserialize operator configuration and store each injector. http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9da895cb/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs index e61e14b..20f17ec 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs @@ -19,8 +19,12 @@ using System; using System.Collections.Generic; +using System.Threading; +using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Network.Group.Task.Impl { @@ -29,15 +33,22 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// </summary> public class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupNetworkObserver)); private readonly Dictionary<string, IObserver<GroupCommunicationMessage>> _handlers; - + private readonly int _retryCount; + private readonly int _sleepTime; + /// <summary> /// Creates a new CommunicationGroupNetworkObserver. 
/// </summary> [Inject] - public CommunicationGroupNetworkObserver() + public CommunicationGroupNetworkObserver( + [Parameter(typeof(MpiConfigurationOptions.RetryCountWaitingForHanler))] int retryCount, + [Parameter(typeof(MpiConfigurationOptions.SleepTimeWaitingForHandler))] int sleepTime) { _handlers = new Dictionary<string, IObserver<GroupCommunicationMessage>>(); + _retryCount = retryCount; + _sleepTime = sleepTime; } /// <summary> @@ -72,13 +83,46 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl { string operatorName = message.OperatorName; - IObserver<GroupCommunicationMessage> handler; - if (!_handlers.TryGetValue(operatorName, out handler)) + IObserver<GroupCommunicationMessage> handler = GetOperatorHandler(operatorName, _retryCount, _sleepTime); + + if (handler == null) { - throw new ArgumentException("No handler registered with the operator name: " + operatorName); + Exceptions.Throw(new ArgumentException("No handler registered with the operator name: " + operatorName), LOGGER); } + else + { + handler.OnNext(message); + } + } - handler.OnNext(message); + /// <summary> + /// GetOperatorHandler for operatorName + /// </summary> + /// <param name="operatorName"></param> + /// <param name="retry"></param> + /// <param name="sleepTime"></param> + /// <returns></returns> + private IObserver<GroupCommunicationMessage> GetOperatorHandler(string operatorName, int retry, int sleepTime) + { + //registration of handler might be delayed while the Network Service has received message from other servers + for (int i = 0; i < retry; i++) + { + if (!_handlers.ContainsKey(operatorName)) + { + LOGGER.Log(Level.Info, "handler for operator {0} has not been registered." 
+ operatorName); + Thread.Sleep(sleepTime); + } + else + { + IObserver<GroupCommunicationMessage> handler; + if (!_handlers.TryGetValue(operatorName, out handler)) + { + Exceptions.Throw(new ArgumentException("No handler registered yet with the operator name: " + operatorName), LOGGER); + } + return handler; + } + } + return null; } public void OnError(Exception error) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9da895cb/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs index 2224c04..d3e74f8 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs @@ -53,7 +53,7 @@ namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest [TestMethod] public void TestBroadcastAndReduce() { - int numTasks = 4; + int numTasks = 9; IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder( DriverBridgeConfiguration.ConfigurationModule http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9da895cb/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs index 00ce09b..743bac5 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs @@ -184,10 +184,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl if (message == null) { + LOGGER.Log(Level.Error, "ProcessClient, no message received, break." 
+ link.RemoteEndpoint + " - " + link.LocalEndpoint); break; } } + LOGGER.Log(Level.Error, "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested); } } } -} +} \ No newline at end of file
