Repository: incubator-reef
Updated Branches:
  refs/heads/master 13ffefeab -> 60b9eb54b


[REEF-479] Introduce time mesuarements in BroadcastReduce and 
PipelineBroadcadtReduce in Network.Examples.Client

This addressed the issue by
  * Introducing time measurements using StopWatch in master and slave tasks.
  * Making the intermediate computations between broadcast and reduce minimal 
so as to measure the time of round trip correctly.

JIRA:
  [479](https://issues.apache.org/jira/browse/REEF-479)

This Closes #299

Author:    Dhruv <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/60b9eb54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/60b9eb54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/60b9eb54

Branch: refs/heads/master
Commit: 60b9eb54bf47b6a0285d98276abb82eb703135b5
Parents: 13ffefe
Author: Dhruv <[email protected]>
Authored: Thu Jul 16 13:18:14 2015 -0700
Committer: Julia Wang <[email protected]>
Committed: Fri Jul 17 15:28:36 2015 -0700

----------------------------------------------------------------------
 .../BroadcastReduceDriverAndTasks/MasterTask.cs | 31 +++++++++++++--
 .../BroadcastReduceDriverAndTasks/SlaveTask.cs  | 26 +++++++++++--
 .../PipelinedMasterTask.cs                      | 41 +++++++++++++++-----
 .../PipelinedSlaveTask.cs                       | 38 +++++++++++++++---
 4 files changed, 114 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/60b9eb54/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
index 21d23f8..02fc403 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
@@ -18,6 +18,7 @@
  */
 
 using System;
+using System.Diagnostics;
 using System.Linq;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Network.Group.Operators;
@@ -29,7 +30,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
 {
     public class MasterTask : ITask
     {
-        private static readonly Logger _logger = 
Logger.GetLogger(typeof(MasterTask));
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(MasterTask));
 
         private readonly int _numIters;
         private readonly int _numReduceSenders;
@@ -45,7 +46,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
             [Parameter(typeof(GroupTestConfig.NumEvaluators))] int 
numEvaluators,
             IGroupCommClient groupCommClient)
         {
-            _logger.Log(Level.Info, "Hello from master task");
+            Logger.Log(Level.Info, "Hello from master task");
             _numIters = numIters;
             _numReduceSenders = numEvaluators - 1;
             _groupCommClient = groupCommClient;
@@ -57,20 +58,42 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
 
         public byte[] Call(byte[] memento)
         {
+            Stopwatch broadcastTime = new Stopwatch();
+            Stopwatch reduceTime = new Stopwatch();
+
             for (int i = 1; i <= _numIters; i++)
             {
+                if (i == 2)
+                {
+                    broadcastTime.Reset();
+                    reduceTime.Reset();
+                }
+
+                broadcastTime.Start();
                 // Each slave task calculates the nth triangle number
                 _broadcastSender.Send(i);
-                
+                broadcastTime.Stop();
+
+                reduceTime.Start();
                 // Sum up all of the calculated triangle numbers
                 int sum = _sumReducer.Reduce();
-                _logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", 
sum, i);
+                reduceTime.Stop();
+
+                Logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", 
sum, i);
 
                 int expected = TriangleNumber(i) * _numReduceSenders;
                 if (sum != TriangleNumber(i) * _numReduceSenders)
                 {
                     throw new Exception("Expected " + expected + " but got " + 
sum);
                 }
+
+                if (i >= 2)
+                {
+                    Logger.Log(Level.Info,
+                        string.Format("Average time (milliseconds) taken for 
broadcast: {0} and reduce: {1}",
+                            broadcastTime.ElapsedMilliseconds/((double) (i - 
1)),
+                            reduceTime.ElapsedMilliseconds/((double) (i - 
1))));
+                }
             }
 
             return null;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/60b9eb54/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
index 6db3a5c..11f8630 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using System.Diagnostics;
 using System.Linq;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Network.Group.Operators;
@@ -28,7 +29,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
 {
     public class SlaveTask : ITask
     {
-        private static readonly Logger _logger = 
Logger.GetLogger(typeof(SlaveTask));
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(SlaveTask));
 
         private readonly int _numIterations;
         private readonly IGroupCommClient _groupCommClient;
@@ -41,7 +42,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
             [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters,
             IGroupCommClient groupCommClient)
         {
-            _logger.Log(Level.Info, "Hello from slave task");
+            Logger.Log(Level.Info, "Hello from slave task");
 
             _numIterations = numIters;
             _groupCommClient = groupCommClient;
@@ -52,16 +53,33 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
 
         public byte[] Call(byte[] memento)
         {
+            Stopwatch broadcastTime = new Stopwatch();
+            Stopwatch reduceTime = new Stopwatch();
+
             for (int i = 0; i < _numIterations; i++)
             {
+                broadcastTime.Start();
                 // Receive n from Master Task
                 int n = _broadcastReceiver.Receive();
-                _logger.Log(Level.Info, "Calculating TriangleNumber({0}) on 
slave task...", n);
+                broadcastTime.Stop();
+
+                Logger.Log(Level.Info, "Calculating TriangleNumber({0}) on 
slave task...", n);
 
                 // Calculate the nth Triangle number and send it back to driver
                 int triangleNum = TriangleNumber(n);
-                _logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", 
triangleNum, i);
+                Logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", 
triangleNum, i);
+                
+                reduceTime.Start();
                 _triangleNumberSender.Send(triangleNum);
+                reduceTime.Stop();
+                
+                if (i >= 1)
+                {
+                    Logger.Log(Level.Info,
+                        string.Format("Average time (milliseconds) taken for 
broadcast: {0} and reduce: {1}",
+                            broadcastTime.ElapsedMilliseconds / ((double)i),
+                            reduceTime.ElapsedMilliseconds / ((double)i)));
+                }
             }
 
             return null;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/60b9eb54/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
index 4656fd9..2259cdb 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
@@ -18,6 +18,7 @@
  */
 
 using System;
+using System.Diagnostics;
 using System.Linq;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Network.Group.Operators;
@@ -63,26 +64,48 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
         {
             int[] intArr = new int[_arraySize];
 
+            for (int j = 0; j < _arraySize; j++)
+            {
+                intArr[j] = j;
+            }
+
+            Stopwatch broadcastTime = new Stopwatch();
+            Stopwatch reduceTime = new Stopwatch();
+
             for (int i = 1; i <= _numIters; i++)
             {
-                for (int j = 0; j < _arraySize; j++)
+                intArr[0] = i;
+
+                if (i == 2)
                 {
-                    intArr[j] = i;
+                    broadcastTime.Reset();
+                    reduceTime.Reset();
                 }
 
+                broadcastTime.Start();
                 _broadcastSender.Send(intArr);
+                broadcastTime.Stop();
+
+                reduceTime.Start();
                 int[] sum = _sumReducer.Reduce();
+                reduceTime.Stop();
+
+                Logger.Log(Level.Info, "Received sum: {0} on iteration: {1} 
with array length: {2}", sum[0], i,
+                    sum.Length);
 
-                Logger.Log(Level.Info, "Received sum: {0} on iteration: {1} 
with array length: {2}", sum[0], i, sum.Length);
+                int expected = TriangleNumber(i)*_numReduceSenders;
 
-                int expected = TriangleNumber(i) * _numReduceSenders;
+                if (sum[0] != TriangleNumber(i)*_numReduceSenders)
+                {
+                    throw new Exception("Expected " + expected + " but got " + 
sum[0]);
+                }
 
-                for (int j = 0; j < intArr.Length; j++)
+                if (i >= 2)
                 {
-                    if (sum[j] != TriangleNumber(i) * _numReduceSenders)
-                    {
-                        throw new Exception("Expected " + expected + " but got 
" + sum);
-                    }
+                    Logger.Log(Level.Info,
+                        string.Format("Average time (milliseconds) taken for 
broadcast: {0} and reduce: {1}",
+                            broadcastTime.ElapsedMilliseconds/((double) (i - 
1)),
+                            reduceTime.ElapsedMilliseconds/((double) (i - 
1))));
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/60b9eb54/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
index 547df28..5ec77bb 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using System.Diagnostics;
 using System.Linq;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Network.Group.Operators;
@@ -35,14 +36,17 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
         private readonly ICommunicationGroupClient _commGroup;
         private readonly IBroadcastReceiver<int[]> _broadcastReceiver;
         private readonly IReduceSender<int[]> _triangleNumberSender;
+        private readonly int _arraySize;
 
         [Inject]
         public PipelinedSlaveTask(
             [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters,
+            [Parameter(typeof(GroupTestConfig.ArraySize))] int arraySize,
             IGroupCommClient groupCommClient)
         {
             Logger.Log(Level.Info, "Hello from slave task");
 
+            _arraySize = arraySize;
             _numIterations = numIters;
             _groupCommClient = groupCommClient;
             _commGroup = 
_groupCommClient.GetCommunicationGroup(GroupTestConstants.GroupName);
@@ -52,25 +56,49 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
 
         public byte[] Call(byte[] memento)
         {
+            int[] resArr = new int[_arraySize];
+
+            for (int j = 0; j < resArr.Length; j++)
+            {
+                resArr[j] = j;
+            }
+
+            Stopwatch broadcastTime = new Stopwatch();
+            Stopwatch reduceTime = new Stopwatch();
+
             for (int i = 0; i < _numIterations; i++)
             {
+                if (i == 1)
+                {
+                    broadcastTime.Reset();
+                    reduceTime.Reset();
+                }
+
+                broadcastTime.Start();
                 // Receive n from Master Task
                 int[] intVec = _broadcastReceiver.Receive();
+                broadcastTime.Stop();
 
                 Logger.Log(Level.Info, "Calculating TriangleNumber({0}) on 
slave task...", intVec[0]);
 
                 // Calculate the nth Triangle number and send it back to driver
                 int triangleNum = TriangleNumber(intVec[0]);
+
                 Logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", 
triangleNum, i);
 
-                int[] resArr = new int[intVec.Length];
+                resArr[0] = triangleNum;
+
+                reduceTime.Start();
+                _triangleNumberSender.Send(resArr);
+                reduceTime.Stop();
 
-                for (int j = 0; j < resArr.Length; j++)
+                if (i >= 1)
                 {
-                    resArr[j] = triangleNum;
+                    Logger.Log(Level.Info,
+                        string.Format("Average time (milliseconds) taken for 
broadcast: {0} and reduce: {1}",
+                            broadcastTime.ElapsedMilliseconds/((double) i),
+                            reduceTime.ElapsedMilliseconds/((double) i)));
                 }
-
-                _triangleNumberSender.Send(resArr);
             }
 
             return null;

Reply via email to