Repository: incubator-reef
Updated Branches:
  refs/heads/master 35f07f4e4 -> 9da895cb9


[REEF-232]  Resolving race condition in Communication Group

When CommunicationGroupNetworkObserver receives a message from another
node, the handler for the message's operator may not be registered yet.
This race condition can cause an exception, which results in the socket
being closed. When some nodes are unable to receive messages from their
children or parents, the entire communication gets stuck. This can be
reproduced by adding more nodes to a tree topology.

This PR resolves the issue by waiting and retrying until the operator's
handler is registered before processing the message.

JIRA:
  [REEF-232] https://issues.apache.org/jira/browse/REEF-232

Pull Request:
  This closes #127


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/9da895cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/9da895cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/9da895cb

Branch: refs/heads/master
Commit: 9da895cb93e9abbad3a03d1b413c14ce716f913d
Parents: 35f07f4
Author: Julia Wang <[email protected]>
Authored: Fri Mar 27 18:20:47 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Mar 27 18:36:43 2015 -0700

----------------------------------------------------------------------
 .../Group/Config/MpiConfigurationOptions.cs     | 10 ++++
 .../Group/Task/Impl/CommunicationGroupClient.cs |  5 +-
 .../Impl/CommunicationGroupNetworkObserver.cs   | 56 +++++++++++++++++---
 .../BroadcastReduceTest/BroadcastReduceTest.cs  |  2 +-
 .../Remote/Impl/TransportServer.cs              |  4 +-
 5 files changed, 67 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9da895cb/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
index b7bd357..7ef575b 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
@@ -49,6 +49,16 @@ namespace Org.Apache.REEF.Network.Group.Config
         {
         }
 
+        [NamedParameter("Sleep time in milliseconds between lookups while waiting for operator handlers to be registered", defaultValue: "500")]
+        public class SleepTimeWaitingForHandler : Name<int>
+        {
+        }
+
+        [NamedParameter("Maximum number of lookups while waiting for operator handlers to be registered", defaultValue: "5")]
+        public class RetryCountWaitingForHanler : Name<int>
+        {
+        }
+
         [NamedParameter("Master task identifier")]
         public class MasterTaskId : Name<string>
         {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9da895cb/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
index 3dcce76..a4d9e70 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
@@ -70,7 +70,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             
[Parameter(typeof(MpiConfigurationOptions.SerializedOperatorConfigs))] 
ISet<string> operatorConfigs,
             IMpiNetworkObserver mpiNetworkObserver,
             NetworkService<GroupCommunicationMessage> networkService,
-            AvroConfigurationSerializer configSerializer)
+            AvroConfigurationSerializer configSerializer,
+            CommunicationGroupNetworkObserver commGroupNetworkHandler)
         {
             _taskId = taskId;
             _driverId = driverId;
@@ -81,7 +82,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 
             _networkService = networkService;
             _mpiNetworkHandler = mpiNetworkObserver;
-            _commGroupNetworkHandler = new CommunicationGroupNetworkObserver();
+            _commGroupNetworkHandler = commGroupNetworkHandler;
             _mpiNetworkHandler.Register(groupName, _commGroupNetworkHandler);
 
             // Deserialize operator configuration and store each injector.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9da895cb/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
index e61e14b..20f17ec 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
@@ -19,8 +19,12 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
+using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Network.Group.Task.Impl
 {
@@ -29,15 +33,22 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// </summary>
     public class CommunicationGroupNetworkObserver : 
ICommunicationGroupNetworkObserver
     {
+        private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(CommunicationGroupNetworkObserver));
         private readonly Dictionary<string, 
IObserver<GroupCommunicationMessage>> _handlers;
-            
+        private readonly int _retryCount;
+        private readonly int _sleepTime;
+
         /// <summary>
         /// Creates a new CommunicationGroupNetworkObserver.
         /// </summary>
         [Inject]
-        public CommunicationGroupNetworkObserver()
+        public CommunicationGroupNetworkObserver(
+            
[Parameter(typeof(MpiConfigurationOptions.RetryCountWaitingForHanler))] int 
retryCount,
+            
[Parameter(typeof(MpiConfigurationOptions.SleepTimeWaitingForHandler))] int 
sleepTime)
         {
             _handlers = new Dictionary<string, 
IObserver<GroupCommunicationMessage>>();
+            _retryCount = retryCount;
+            _sleepTime = sleepTime;
         }
 
         /// <summary>
@@ -72,13 +83,46 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         {
             string operatorName = message.OperatorName;
 
-            IObserver<GroupCommunicationMessage> handler;
-            if (!_handlers.TryGetValue(operatorName, out handler))
+            IObserver<GroupCommunicationMessage> handler = 
GetOperatorHandler(operatorName, _retryCount, _sleepTime);
+
+            if (handler == null)
             {
-                throw new ArgumentException("No handler registered with the 
operator name: " + operatorName);
+                Exceptions.Throw(new ArgumentException("No handler registered 
with the operator name: " + operatorName), LOGGER);
             }
+            else
+            {
+                handler.OnNext(message);
+            }
+        }
 
-            handler.OnNext(message);
+        /// <summary>
+        /// Returns the handler registered for operatorName, re-checking while it is not registered yet.
+        /// The dictionary is consulted at least once; the thread sleeps between unsuccessful lookups.
+        /// NOTE(review): _handlers is a plain Dictionary; if handlers are added on another thread while this runs, reads need synchronization - confirm.
+        /// </summary>
+        /// <param name="operatorName">Name of the operator whose handler is needed.</param>
+        /// <param name="retry">Maximum number of lookups to attempt (values below 1 still perform one lookup).</param>
+        /// <param name="sleepTime">Milliseconds passed to Thread.Sleep between unsuccessful lookups.</param>
+        /// <returns>The registered handler, or null if it is still not registered after the last lookup.</returns>
+        private IObserver<GroupCommunicationMessage> GetOperatorHandler(string operatorName, int retry, int sleepTime)
+        {
+            // Registration of the handler might be delayed while the Network Service has already
+            // received messages from other servers, so wait for it instead of failing immediately.
+            IObserver<GroupCommunicationMessage> handler;
+            for (int attempt = 1; !_handlers.TryGetValue(operatorName, out handler); attempt++)
+            {
+                if (attempt >= retry)
+                {
+                    // Out of attempts: skip a pointless final sleep and let the caller report the failure.
+                    return null;
+                }
+
+                // Pass operatorName as a format argument so it fills the {0} placeholder.
+                LOGGER.Log(Level.Info, "handler for operator {0} has not been registered.", operatorName);
+                Thread.Sleep(sleepTime);
+            }
+
+            return handler;
         }
 
         public void OnError(Exception error)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9da895cb/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
index 2224c04..d3e74f8 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
@@ -53,7 +53,7 @@ namespace 
Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
         [TestMethod]
         public void TestBroadcastAndReduce()
         {
-            int numTasks = 4;
+            int numTasks = 9;
 
             IConfiguration driverConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
                 DriverBridgeConfiguration.ConfigurationModule

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9da895cb/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs
index 00ce09b..743bac5 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs
@@ -184,10 +184,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
 
                     if (message == null)
                     {
+                        LOGGER.Log(Level.Error, "ProcessClient, no message 
received, break." + link.RemoteEndpoint + " - " + link.LocalEndpoint);
                         break;
                     }
                 }
+                LOGGER.Log(Level.Error, "ProcessClient close the Link. 
IsCancellationRequested: " + token.IsCancellationRequested);
             }
         }
     }
-}
+}
\ No newline at end of file

Reply via email to