Repository: incubator-reef
Updated Branches:
  refs/heads/master 940752b9f -> 2a1565ff3


[REEF-174]  Use type instead of instance for Codec in MpiDriver

Currently, a Codec instance is passed to the specifications in
MpiDriver. What we really need is the type that will be bound to the Tang
Configuration. This change passes generic type parameters into MpiDriver
and the operator specifications instead, so that any ICodec type can be
passed in. At the same time, the codec type parameter is constrained to
implement ICodec, so that passing a wrong type is caught at build time.
The IOperatorSpec, ITopology and ICommunicationGroupDriver APIs and their
implementations are modified accordingly.

Test cases are updated as well.

JIRA:
  [REEF-174](https://issues.apache.org/jira/browse/REEF-174)

Pull Request:
  This closes #132


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2a1565ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2a1565ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2a1565ff

Branch: refs/heads/master
Commit: 2a1565ff3c44a97ee4379e2e793e438d62d54460
Parents: 940752b
Author: Julia Wang <[email protected]>
Authored: Tue Mar 31 17:48:59 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Apr 6 16:19:59 2015 -0700

----------------------------------------------------------------------
 .../KMeans/KMeansDriverHandlers.cs              |   6 +-
 .../Group/Driver/ICommunicationGroupDriver.cs   |  50 ++----
 .../Driver/Impl/CommunicationGroupDriver.cs     | 161 +++++--------------
 .../Group/Operators/IOperatorSpec.cs            |   7 +-
 .../Operators/Impl/BroadcastOperatorSpec.cs     |   9 +-
 .../Group/Operators/Impl/ReduceOperatorSpec.cs  |  12 +-
 .../Group/Operators/Impl/ScatterOperatorSpec.cs |   9 +-
 .../Group/Topology/FlatTopology.cs              |  34 ++--
 .../Group/Topology/ITopology.cs                 |   5 +-
 .../Group/Topology/TreeTopology.cs              |  34 ++--
 .../BroadcastReduceDriver.cs                    |   8 +-
 .../ScatterReduceTest/ScatterReduceDriver.cs    |   6 +-
 .../Network/GroupCommunicationTests.cs          |  40 +++--
 .../GroupCommunicationTreeTopologyTests.cs      |  32 ++--
 14 files changed, 147 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
index 0c67777..358e3e1 100644
--- 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
+++ 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
@@ -82,9 +82,9 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             _mpiDriver = mpiDriver;
 
             _commGroup = _mpiDriver.DefaultGroup
-                   
.AddBroadcast(Constants.CentroidsBroadcastOperatorName,Constants.MasterTaskId, 
new CentroidsCodec())
-                   
.AddBroadcast(Constants.ControlMessageBroadcastOperatorName, 
Constants.MasterTaskId, new ControlMessageCodec())
-                   .AddReduce(Constants.MeansReduceOperatorName, 
Constants.MasterTaskId, new ProcessedResultsCodec(), new 
KMeansMasterTask.AggregateMeans())
+                   .AddBroadcast<Centroids, 
CentroidsCodec>(Constants.CentroidsBroadcastOperatorName, 
Constants.MasterTaskId)
+                   .AddBroadcast<ControlMessage, 
ControlMessageCodec>(Constants.ControlMessageBroadcastOperatorName, 
Constants.MasterTaskId)
+                   .AddReduce<ProcessedResults, 
ProcessedResultsCodec>(Constants.MeansReduceOperatorName, 
Constants.MasterTaskId, new KMeansMasterTask.AggregateMeans())
                    .Build();
 
             _mpiTaskStarter = new TaskStarter(_mpiDriver, _totalEvaluators);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
index 5a857e0..0fa4aae 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
@@ -41,28 +41,17 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <summary>
         /// Adds the Broadcast MPI operator to the communication group.
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
-        /// <param name="operatorName">The name of the broadcast 
operator</param>
-        /// <param name="spec">The specification that defines the Broadcast 
operator</param>
-        /// <returns>The same CommunicationGroupDriver with the added 
Broadcast operator info</returns>
-        [System.Obsolete("use AddBroadcast<T>(string operatorName, string 
masterTaskId, ICodec<T> codecType, TopologyTypes topologyType = 
TopologyTypes.Flat)")]
-        ICommunicationGroupDriver AddBroadcast<T>(string operatorName, 
BroadcastOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat);
-
-        /// <summary>
-        /// Adds the Broadcast MPI operator to the communication group.
-        /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
+        /// <typeparam name="TMessage">The type of messages that operators 
will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing 
messages</typeparam>
         /// <param name="operatorName">The name of the broadcast 
operator</param>
         /// <param name="masterTaskId">The master task id in broadcast 
operator</param>
-        /// <param name="codecType">The Codec used for serialization</param>
         /// <param name="topologyType">The topology type for the 
operator</param>
         /// <returns>The same CommunicationGroupDriver with the added 
Broadcast operator info</returns>
-        ICommunicationGroupDriver AddBroadcast<T>(string operatorName, string 
masterTaskId, ICodec<T> codecType, TopologyTypes topologyType = 
TopologyTypes.Flat);
+        ICommunicationGroupDriver AddBroadcast<TMessage, TMessageCodec>(string 
operatorName, string masterTaskId, TopologyTypes topologyType = 
TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>;
 
         /// <summary>
         /// Adds the Broadcast MPI operator to the communication group. 
Default to IntCodec
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
         /// <param name="operatorName">The name of the broadcast 
operator</param>
         /// <param name="masterTaskId">The master task id in broadcast 
operator</param>
         /// <param name="topologyType">The topology type for the 
operator</param>
@@ -72,46 +61,35 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <summary>
         /// Adds the Reduce MPI operator to the communication group.
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
+        /// <typeparam name="TMessage">The type of messages that operators 
will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing 
messages</typeparam>
         /// <param name="operatorName">The name of the reduce operator</param>
-        /// <param name="spec">The specification that defines the Reduce 
operator</param>
+        /// <param name="masterTaskId">The master task id for the 
typology</param>
+        /// <param name="reduceFunction">The class used to aggregate all 
messages.</param>
+        /// <param name="topologyType">The topology for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Reduce 
operator info</returns>
-        [System.Obsolete("use AddReduce<T>(string operatorName, string 
masterTaskId, ICodec<T> codecType, IReduceFunction<T> reduceFunction, 
TopologyTypes topologyType = TopologyTypes.Flat)")]
-        ICommunicationGroupDriver AddReduce<T>(string operatorName, 
ReduceOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat);
+        ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>(string 
operatorName, string masterTaskId, IReduceFunction<TMessage> reduceFunction, 
TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : 
ICodec<TMessage>;
 
         /// <summary>
-        /// Adds the Reduce MPI operator to the communication group.
+        /// Adds the Reduce MPI operator to the communication group with 
default IntCodec
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
         /// <param name="operatorName">The name of the reduce operator</param>
         /// <param name="masterTaskId">The master task id for the 
typology</param>
-        /// <param name="codecType">The codec used for serializing 
messages.</param>
         /// <param name="reduceFunction">The class used to aggregate all 
messages.</param>
         /// <param name="topologyType">The topology for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Reduce 
operator info</returns>
-        ICommunicationGroupDriver AddReduce<T>(string operatorName, string 
masterTaskId, ICodec<T> codecType, IReduceFunction<T> reduceFunction, 
TopologyTypes topologyType = TopologyTypes.Flat);
-
-        /// <summary>
-        /// Adds the Scatter MPI operator to the communication group.
-        /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
-        /// <param name="operatorName">The name of the scatter operator</param>
-        /// <param name="spec">The specification that defines the Scatter 
operator</param>
-        /// <param name="topologyType">type of topology used in the 
operaor</param>
-        /// <returns>The same CommunicationGroupDriver with the added Scatter 
operator info</returns>
-        [System.Obsolete("use AddScatter<T>(string operatorName, string 
senderId, ICodec<T> codecType, TopologyTypes topologyType = 
TopologyTypes.Flat)")]
-        ICommunicationGroupDriver AddScatter<T>(string operatorName, 
ScatterOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat);
+        ICommunicationGroupDriver AddReduce(string operatorName, string 
masterTaskId, IReduceFunction<int> reduceFunction, TopologyTypes topologyType = 
TopologyTypes.Flat);
 
         /// <summary>
         /// Adds the Scatter MPI operator to the communication group.
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
+        /// <typeparam name="TMessage">The type of messages that operators 
will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing 
messages</typeparam>
         /// <param name="operatorName">The name of the scatter operator</param>
         /// <param name="senderId">The sender id</param>
-        /// <param name="codecType">The codec used for serializing 
messages.</param>
         /// <param name="topologyType">type of topology used in the 
operaor</param>
         /// <returns>The same CommunicationGroupDriver with the added Scatter 
operator info</returns>
-        ICommunicationGroupDriver AddScatter<T>(string operatorName, string 
senderId, ICodec<T> codecType, TopologyTypes topologyType = TopologyTypes.Flat);
+        ICommunicationGroupDriver AddScatter<TMessage, TMessageCodec>(string 
operatorName, string senderId, TopologyTypes topologyType = TopologyTypes.Flat) 
where TMessageCodec : ICodec<TMessage>;
 
         /// <summary>
         /// Adds the Scatter MPI operator to the communication group with 
default Codec

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
index 154a0f5..6c07598 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
@@ -91,68 +91,32 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         public List<string> TaskIds { get; private set; }
 
         /// <summary>
-        /// Adds the Broadcast MPI operator to the communication group.
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
-        /// <param name="operatorName">The name of the broadcast 
operator</param>
-        /// <param name="spec">The specification that defines the Broadcast 
operator</param>
-        /// <returns>The same CommunicationGroupDriver with the added 
Broadcast operator info</returns>
-        [System.Obsolete("use AddBroadcast<T>(string operatorName, string 
masterTaskId, ICodec<T> codecType, TopologyTypes topologyType = 
TopologyTypes.Flat)")]
-        public ICommunicationGroupDriver AddBroadcast<T>(
-            string operatorName,
-            BroadcastOperatorSpec<T> spec,
-            TopologyTypes topologyType = TopologyTypes.Flat)
-        {
-            if (_finalized)
-            {
-                throw new IllegalStateException("Can't add operators once the 
spec has been built.");
-            }
-
-            ITopology<T> topology;
-
-            if (topologyType == TopologyTypes.Flat)
-            {
-                topology = new FlatTopology<T>(operatorName, _groupName, 
spec.SenderId, _driverId, spec);
-            }
-            else
-            {
-                topology = new TreeTopology<T>(operatorName, _groupName, 
spec.SenderId, _driverId, spec,
-                    _fanOut);
-            }
-            _topologies[operatorName] = topology;
-            _operatorSpecs[operatorName] = spec;
-
-            return this;
-        }
-
-        /// <summary>
-        /// Adds the Broadcast MPI operator to the communication group.
-        /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
+        /// <typeparam name="TMessage">The type of messages that operators 
will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing 
messages</typeparam>
         /// <param name="operatorName">The name of the broadcast 
operator</param>
         /// <param name="masterTaskId">The master task id in broadcast 
operator</param>
-        /// <param name="codecType">The Codec used for serialization</param>
         /// <param name="topologyType">The topology type for the 
operator</param>
         /// <returns>The same CommunicationGroupDriver with the added 
Broadcast operator info</returns>
-        public ICommunicationGroupDriver AddBroadcast<T>(string operatorName, 
string masterTaskId, ICodec<T> codecType, TopologyTypes topologyType = 
TopologyTypes.Flat)
+        /// <returns></returns>
+        public ICommunicationGroupDriver AddBroadcast<TMessage, 
TMessageCodec>(string operatorName, string masterTaskId, TopologyTypes 
topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>
         {
             if (_finalized)
             {
                 throw new IllegalStateException("Can't add operators once the 
spec has been built.");
             }
 
-            var spec = new BroadcastOperatorSpec<T>(
-                masterTaskId,
-                codecType);
+            var spec = new BroadcastOperatorSpec<TMessage, TMessageCodec>(
+                masterTaskId);
 
-            ITopology<T> topology;
+            ITopology<TMessage, TMessageCodec> topology;
             if (topologyType == TopologyTypes.Flat)
             {
-                topology = new FlatTopology<T>(operatorName, _groupName, 
spec.SenderId, _driverId, spec);
+                topology = new FlatTopology<TMessage, 
TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec);
             }
             else
             {
-                topology = new TreeTopology<T>(operatorName, _groupName, 
spec.SenderId, _driverId, spec,
+                topology = new TreeTopology<TMessage, 
TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec,
                     _fanOut);
             }
 
@@ -165,7 +129,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <summary>
         /// Adds the Broadcast MPI operator to the communication group. 
Default to IntCodec
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
         /// <param name="operatorName">The name of the broadcast 
operator</param>
         /// <param name="masterTaskId">The master task id in broadcast 
operator</param>
         /// <param name="topologyType">The topology type for the 
operator</param>
@@ -173,45 +136,43 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         public ICommunicationGroupDriver AddBroadcast(string operatorName, 
string masterTaskId,
             TopologyTypes topologyType = TopologyTypes.Flat)
         {
-            return AddBroadcast(operatorName, masterTaskId, new IntCodec(), 
topologyType);
+            return AddBroadcast<int,IntCodec>(operatorName, masterTaskId, 
topologyType);
         }
 
         /// <summary>
         /// Adds the Reduce MPI operator to the communication group.
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
+        /// <typeparam name="TMessage">The type of messages that operators 
will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing 
messages</typeparam>
         /// <param name="operatorName">The name of the reduce operator</param>
         /// <param name="masterTaskId">The master task id for the 
typology</param>
-        /// <param name="codecType">The codec used for serializing 
messages.</param>
         /// <param name="reduceFunction">The class used to aggregate all 
messages.</param>
         /// <param name="topologyType">The topology for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Reduce 
operator info</returns>
-        public ICommunicationGroupDriver AddReduce<T>(
+        public ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>(
             string operatorName,
             string masterTaskId,
-            ICodec<T> codecType,
-            IReduceFunction<T> reduceFunction,
-            TopologyTypes topologyType = TopologyTypes.Flat)
+            IReduceFunction<TMessage> reduceFunction,
+            TopologyTypes topologyType = TopologyTypes.Flat) where 
TMessageCodec : ICodec<TMessage>
         {
             if (_finalized)
             {
                 throw new IllegalStateException("Can't add operators once the 
spec has been built.");
             }
 
-            var spec = new ReduceOperatorSpec<T>(
+            var spec = new ReduceOperatorSpec<TMessage, TMessageCodec>(
                 masterTaskId,
-                codecType,
                 reduceFunction);
 
-            ITopology<T> topology;
+            ITopology<TMessage, TMessageCodec> topology;
 
             if (topologyType == TopologyTypes.Flat)
             {
-                topology = new FlatTopology<T>(operatorName, _groupName, 
spec.ReceiverId, _driverId, spec);
+                topology = new FlatTopology<TMessage, 
TMessageCodec>(operatorName, _groupName, spec.ReceiverId, _driverId, spec);
             }
             else
             {
-                topology = new TreeTopology<T>(operatorName, _groupName, 
spec.ReceiverId, _driverId, spec,
+                topology = new TreeTopology<TMessage, 
TMessageCodec>(operatorName, _groupName, spec.ReceiverId, _driverId, spec,
                     _fanOut);
             }
 
@@ -222,99 +183,49 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         }
 
         /// <summary>
-        /// Adds the Reduce MPI operator to the communication group.
+        /// Adds the Reduce MPI operator to the communication group with 
default IntCodec
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
         /// <param name="operatorName">The name of the reduce operator</param>
-        /// <param name="spec">The specification that defines the Reduce 
operator</param>
+        /// <param name="masterTaskId">The master task id for the 
typology</param>
+        /// <param name="reduceFunction">The class used to aggregate all 
messages.</param>
+        /// <param name="topologyType">The topology for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Reduce 
operator info</returns>
-        [System.Obsolete("use AddReduce<T>(string operatorName, string 
masterTaskId, ICodec<T> codecType, IReduceFunction<T> reduceFunction, 
TopologyTypes topologyType = TopologyTypes.Flat)")]
-        public ICommunicationGroupDriver AddReduce<T>(
+        public ICommunicationGroupDriver AddReduce(
             string operatorName,
-            ReduceOperatorSpec<T> spec,
+            string masterTaskId,
+            IReduceFunction<int> reduceFunction,
             TopologyTypes topologyType = TopologyTypes.Flat)
         {
-            if (_finalized)
-            {
-                throw new IllegalStateException("Can't add operators once the 
spec has been built.");
-            }
-
-            ITopology<T> topology;
-
-            if (topologyType == TopologyTypes.Flat)
-            {
-                topology = new FlatTopology<T>(operatorName, _groupName, 
spec.ReceiverId, _driverId, spec);
-            }
-            else
-            {
-                topology = new TreeTopology<T>(operatorName, _groupName, 
spec.ReceiverId, _driverId, spec,
-                    _fanOut);
-            }
-            _topologies[operatorName] = topology;
-            _operatorSpecs[operatorName] = spec;
-
-            return this;
-        }
-
-        /// <summary>
-        /// Adds the Scatter MPI operator to the communication group.
-        /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
-        /// <param name="operatorName">The name of the scatter operator</param>
-        /// <param name="spec">The specification that defines the Scatter 
operator</param>
-        /// <returns>The same CommunicationGroupDriver with the added Scatter 
operator info</returns>
-        [System.Obsolete("use AddScatter<T>(string operatorName, string 
senderId, ICodec<T> codecType, TopologyTypes topologyType = 
TopologyTypes.Flat)")]
-        public ICommunicationGroupDriver AddScatter<T>(string operatorName, 
ScatterOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat)
-        {
-            if (_finalized)
-            {
-                throw new IllegalStateException("Can't add operators once the 
spec has been built.");
-            }
-
-            ITopology<T> topology;
-
-            if (topologyType == TopologyTypes.Flat)
-            {
-                topology = new FlatTopology<T>(operatorName, _groupName, 
spec.SenderId, _driverId, spec);
-            }
-            else
-            {
-                topology = new TreeTopology<T>(operatorName, _groupName, 
spec.SenderId, _driverId, spec,
-                    _fanOut);
-            }
-            _topologies[operatorName] = topology;
-            _operatorSpecs[operatorName] = spec;
-
-            return this;
+            return AddReduce<int, IntCodec>(operatorName, masterTaskId, 
reduceFunction, topologyType);
         }
 
         /// <summary>
         /// Adds the Scatter MPI operator to the communication group.
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
+        /// <typeparam name="TMessage">The type of messages that operators 
will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing 
messages</typeparam>
         /// <param name="operatorName">The name of the scatter operator</param>
         /// <param name="senderId">The sender id</param>
-        /// <param name="codecType">The codec used for serializing 
messages.</param>
         /// <param name="topologyType">type of topology used in the 
operaor</param>
         /// <returns>The same CommunicationGroupDriver with the added Scatter 
operator info</returns>
-        public ICommunicationGroupDriver AddScatter<T>(string operatorName, 
string senderId, ICodec<T> codecType, TopologyTypes topologyType = 
TopologyTypes.Flat)
+        public ICommunicationGroupDriver AddScatter<TMessage, 
TMessageCodec>(string operatorName, string senderId, TopologyTypes topologyType 
= TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>
         {
             if (_finalized)
             {
                 throw new IllegalStateException("Can't add operators once the 
spec has been built.");
             }
 
-            var spec = new ScatterOperatorSpec<T>(senderId, codecType);
+            var spec = new ScatterOperatorSpec<TMessage, 
TMessageCodec>(senderId);
 
-            ITopology<T> topology;
+            ITopology<TMessage, TMessageCodec> topology;
 
             if (topologyType == TopologyTypes.Flat)
             {
-                topology = new FlatTopology<T>(operatorName, _groupName, 
spec.SenderId, _driverId, spec);
+                topology = new FlatTopology<TMessage, 
TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec);
             }
             else
             {
-                topology = new TreeTopology<T>(operatorName, _groupName, 
spec.SenderId, _driverId, spec,
+                topology = new TreeTopology<TMessage, 
TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec,
                     _fanOut);
             }
             _topologies[operatorName] = topology;
@@ -324,7 +235,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         }
 
         /// <summary>
-        /// Adds the Scatter MPI operator to the communication group with 
default Codec
+        /// Adds the Scatter MPI operator to the communication group with 
default IntCodec
         /// </summary>
         /// <param name="operatorName">The name of the scatter operator</param>
         /// <param name="senderId">The sender id</param>
@@ -332,7 +243,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <returns>The same CommunicationGroupDriver with the added Scatter 
operator info</returns>
         public ICommunicationGroupDriver AddScatter(string operatorName, 
string senderId, TopologyTypes topologyType = TopologyTypes.Flat)
         {
-            return AddScatter(operatorName, senderId, new IntCodec(), 
topologyType);
+            return AddScatter<int, IntCodec>(operatorName, senderId, 
topologyType);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
index b1c119c..4a1bfd7 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using System;
 using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Operators
@@ -24,11 +25,11 @@ namespace Org.Apache.REEF.Network.Group.Operators
     /// <summary>
     /// The specification used to define Broadcast Operators.
     /// </summary>
-    public interface IOperatorSpec<T>
+    public interface IOperatorSpec<T1, T2> where T2 : ICodec<T1>
     {
         /// <summary>
-        /// Returns the codec used to serialize and deserialize messages.
+        /// Returns the codec type used to serialize and deserialize messages.
         /// </summary>
-        ICodec<T> Codec { get; }
+        Type Codec { get; }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
index 904e4ef..15a4374 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using System;
 using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
@@ -24,17 +25,17 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// <summary>
     /// The specification used to define Broadcast Operators.
     /// </summary>
-    public class BroadcastOperatorSpec<T> : IOperatorSpec<T>
+    public class BroadcastOperatorSpec<T1, T2> : IOperatorSpec<T1, T2> where 
T2 : ICodec<T1>
     {
         /// <summary>
         /// Create a new BroadcastOperatorSpec.
         /// </summary>
         /// <param name="senderId">The identifier of the root sending 
Task.</param>
         /// <param name="codecType">The codec used to serialize 
messages.</param>
-        public BroadcastOperatorSpec(string senderId, ICodec<T> codecType)
+        public BroadcastOperatorSpec(string senderId)
         {
             SenderId = senderId;
-            Codec = codecType;
+            Codec = typeof(T2);
         }
 
         /// <summary>
@@ -45,6 +46,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// Returns the ICodec used to serialize messages.
         /// </summary>
-        public ICodec<T> Codec { get; private set; }
+        public Type Codec { get; private set; }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
index 37b6ce7..f72cea5 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using System;
 using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
@@ -24,7 +25,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// <summary>
     /// The specification used to define Reduce MPI Operators.
     /// </summary>
-    public class ReduceOperatorSpec<T> : IOperatorSpec<T>
+    public class ReduceOperatorSpec<T1, T2> : IOperatorSpec<T1, T2> where T2 : 
ICodec<T1>
     {
         /// <summary>
         /// Creates a new ReduceOperatorSpec.
@@ -35,11 +36,10 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="reduceFunction">The class used to aggregate all 
messages.</param>
         public ReduceOperatorSpec(
             string receiverId, 
-            ICodec<T> codec, 
-            IReduceFunction<T> reduceFunction)
+            IReduceFunction<T1> reduceFunction)
         {
             ReceiverId = receiverId;
-            Codec = codec;
+            Codec = typeof(T2);
             ReduceFunction = reduceFunction;
         }
 
@@ -52,11 +52,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// The type of the codec used to serialize and deserialize messages.
         /// </summary>
-        public ICodec<T> Codec { get; private set; }
+        public Type Codec { get; private set; }
 
         /// <summary>
         /// The class used to aggregate incoming messages.
         /// </summary>
-        public IReduceFunction<T> ReduceFunction { get; private set; } 
+        public IReduceFunction<T1> ReduceFunction { get; private set; }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
index 57cd3a9..158a6c5 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using System;
 using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
@@ -24,7 +25,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// <summary>
     /// The specification used to define Scatter MPI Operators.
     /// </summary>
-    public class ScatterOperatorSpec<T> : IOperatorSpec<T>
+    public class ScatterOperatorSpec<T1, T2> : IOperatorSpec<T1, T2> where T2 
: ICodec<T1>
     {
         /// <summary>
         /// Creates a new ScatterOperatorSpec.
@@ -33,10 +34,10 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// be sending messages</param>
         /// <param name="codec">The codec used to serialize and 
         /// deserialize messages</param>
-        public ScatterOperatorSpec(string senderId, ICodec<T> codec)
+        public ScatterOperatorSpec(string senderId)
         {
             SenderId = senderId;
-            Codec = codec;
+            Codec = typeof(T2);
         }
 
         /// <summary>
@@ -48,6 +49,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// The type of the codec used to serialize and deserialize messages.
         /// </summary>
-        public ICodec<T> Codec { get; private set; }
+        public Type Codec { get; private set; }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
index 892c82c..83990d7 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
@@ -34,7 +34,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
     /// nodes: the root and all children extending from the root.
     /// </summary>
     /// <typeparam name="T1">The message type</typeparam>
-    public class FlatTopology<T> : ITopology<T>
+    public class FlatTopology<T1, T2> : ITopology<T1, T2> where T2 : ICodec<T1>
     {
         private readonly string _groupName;
         private readonly string _operatorName;
@@ -58,7 +58,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
             string groupName, 
             string rootId,
             string driverId,
-            IOperatorSpec<T> operatorSpec)
+            IOperatorSpec<T1, T2> operatorSpec)
         {
             _groupName = groupName;
             _operatorName = operatorName;
@@ -73,7 +73,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
         /// <summary>
         /// Gets the Operator specification
         /// </summary>
-        public IOperatorSpec<T> OperatorSpec { get; set; }
+        public IOperatorSpec<T1, T2> OperatorSpec { get; set; }
 
         /// <summary>
         /// Gets the task configuration for the operator topology.
@@ -83,7 +83,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
         public IConfiguration GetTaskConfiguration(string taskId)
         {
             var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
-                .BindImplementation(typeof(ICodec<T>), 
OperatorSpec.Codec.GetType())
+                .BindImplementation(typeof(ICodec<T1>), OperatorSpec.Codec)
                 
.BindNamedParameter<MpiConfigurationOptions.TopologyRootTaskId, string>(
                     
GenericType<MpiConfigurationOptions.TopologyRootTaskId>.Class,
                     _rootId);
@@ -101,42 +101,42 @@ namespace Org.Apache.REEF.Network.Group.Topology
                 }
             }
 
-            if (OperatorSpec is BroadcastOperatorSpec<T>)
+            if (OperatorSpec is BroadcastOperatorSpec<T1, T2>)
             {
-                BroadcastOperatorSpec<T> broadcastSpec = OperatorSpec as 
BroadcastOperatorSpec<T>;
+                BroadcastOperatorSpec<T1, T2> broadcastSpec = OperatorSpec as 
BroadcastOperatorSpec<T1, T2>;
                 if (taskId.Equals(broadcastSpec.SenderId))
                 {
-                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, 
GenericType<BroadcastSender<T>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<BroadcastSender<T1>>.Class);
                 }
                 else
                 {
-                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, 
GenericType<BroadcastReceiver<T>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<BroadcastReceiver<T1>>.Class);
                 }
             }
-            else if (OperatorSpec is ReduceOperatorSpec<T>)
+            else if (OperatorSpec is ReduceOperatorSpec<T1, T2>)
             {
-                ReduceOperatorSpec<T> reduceSpec = OperatorSpec as 
ReduceOperatorSpec<T>;
-                confBuilder.BindImplementation(typeof(IReduceFunction<T>), 
reduceSpec.ReduceFunction.GetType());
+                ReduceOperatorSpec<T1, T2> reduceSpec = OperatorSpec as 
ReduceOperatorSpec<T1, T2>;
+                confBuilder.BindImplementation(typeof(IReduceFunction<T1>), 
reduceSpec.ReduceFunction.GetType());
                 
                 if (taskId.Equals(reduceSpec.ReceiverId))
                 {
-                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, 
GenericType<ReduceReceiver<T>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<ReduceReceiver<T1>>.Class);
                 }
                 else
                 {
-                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, 
GenericType<ReduceSender<T>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<ReduceSender<T1>>.Class);
                 }
             }
-            else if (OperatorSpec is ScatterOperatorSpec<T>)
+            else if (OperatorSpec is ScatterOperatorSpec<T1, T2>)
             {
-                ScatterOperatorSpec<T> scatterSpec = OperatorSpec as 
ScatterOperatorSpec<T>;
+                ScatterOperatorSpec<T1, T2> scatterSpec = OperatorSpec as 
ScatterOperatorSpec<T1, T2>;
                 if (taskId.Equals(scatterSpec.SenderId))
                 {
-                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, 
GenericType<ScatterSender<T>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<ScatterSender<T1>>.Class);
                 }
                 else
                 {
-                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, 
GenericType<ScatterReceiver<T>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<ScatterReceiver<T1>>.Class);
                 }
             }
             else

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
index 32fe5cc..3f15318 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
@@ -19,15 +19,16 @@
 
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Topology
 {
     /// <summary>
     /// Represents a topology graph for IMpiOperators.
     /// </summary>
-    public interface ITopology<T>
+    public interface ITopology<T1, T2> where T2 : ICodec<T1>
     {
-        IOperatorSpec<T> OperatorSpec { get; }
+        IOperatorSpec<T1, T2> OperatorSpec { get; }
 
         IConfiguration GetTaskConfiguration(string taskId);
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
index 50b2636..bc324cf 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
@@ -29,7 +29,7 @@ using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Topology
 {
-    public class TreeTopology<T> : ITopology<T>
+    public class TreeTopology<T1, T2> : ITopology<T1, T2> where T2 : ICodec<T1>
     {
         private readonly string _groupName;
         private readonly string _operatorName;
@@ -58,7 +58,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
             string groupName, 
             string rootId,
             string driverId,
-            IOperatorSpec<T> operatorSpec,
+            IOperatorSpec<T1, T2> operatorSpec,
             int fanOut)
         {
             _groupName = groupName;
@@ -72,7 +72,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
             _nodes = new Dictionary<string, TaskNode>(); 
         }
 
-        public IOperatorSpec<T> OperatorSpec { get; set; }
+        public IOperatorSpec<T1, T2> OperatorSpec { get; set; }
 
         /// <summary>
         /// Gets the task configuration for the operator topology.
@@ -105,7 +105,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
 
             //add parentid, if no parent, add itself
             var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
-                .BindImplementation(typeof(ICodec<T>), 
OperatorSpec.Codec.GetType())
+                .BindImplementation(typeof(ICodec<T1>), OperatorSpec.Codec)
                 
.BindNamedParameter<MpiConfigurationOptions.TopologyRootTaskId, string>(
                     
GenericType<MpiConfigurationOptions.TopologyRootTaskId>.Class,
                     parentId);
@@ -118,42 +118,42 @@ namespace Org.Apache.REEF.Network.Group.Topology
                     childNode.TaskId);
             }
 
-            if (OperatorSpec is BroadcastOperatorSpec<T>)
+            if (OperatorSpec is BroadcastOperatorSpec<T1, T2>)
             {
-                BroadcastOperatorSpec<T> broadcastSpec = OperatorSpec as 
BroadcastOperatorSpec<T>;
+                BroadcastOperatorSpec<T1, T2> broadcastSpec = OperatorSpec as 
BroadcastOperatorSpec<T1, T2>;
                 if (taskId.Equals(broadcastSpec.SenderId))
                 {
-                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, 
GenericType<BroadcastSender<T>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<BroadcastSender<T1>>.Class);
                 }
                 else
                 {
-                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, 
GenericType<BroadcastReceiver<T>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<BroadcastReceiver<T1>>.Class);
                 }
             }
-            else if (OperatorSpec is ReduceOperatorSpec<T>)
+            else if (OperatorSpec is ReduceOperatorSpec<T1, T2>)
             {
-                ReduceOperatorSpec<T> reduceSpec = OperatorSpec as 
ReduceOperatorSpec<T>;
-                confBuilder.BindImplementation(typeof(IReduceFunction<T>), 
reduceSpec.ReduceFunction.GetType());
+                ReduceOperatorSpec<T1, T2> reduceSpec = OperatorSpec as 
ReduceOperatorSpec<T1, T2>;
+                confBuilder.BindImplementation(typeof(IReduceFunction<T1>), 
reduceSpec.ReduceFunction.GetType());
 
                 if (taskId.Equals(reduceSpec.ReceiverId))
                 {
-                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, 
GenericType<ReduceReceiver<T>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<ReduceReceiver<T1>>.Class);
                 }
                 else
                 {
-                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, 
GenericType<ReduceSender<T>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<ReduceSender<T1>>.Class);
                 }
             }
-            else if (OperatorSpec is ScatterOperatorSpec<T>)
+            else if (OperatorSpec is ScatterOperatorSpec<T1, T2>)
             {
-                ScatterOperatorSpec<T> scatterSpec = OperatorSpec as 
ScatterOperatorSpec<T>;
+                ScatterOperatorSpec<T1, T2> scatterSpec = OperatorSpec as 
ScatterOperatorSpec<T1, T2>;
                 if (taskId.Equals(scatterSpec.SenderId))
                 {
-                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, 
GenericType<ScatterSender<T>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<ScatterSender<T1>>.Class);
                 }
                 else
                 {
-                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, 
GenericType<ScatterReceiver<T>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<ScatterReceiver<T1>>.Class);
                 }
             }
             else

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
index ed3e7b5..8cc32b8 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
@@ -65,14 +65,12 @@ namespace 
Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
             _numIterations = numIterations;
             _mpiDriver = mpiDriver;
             _commGroup = _mpiDriver.DefaultGroup
-                    .AddBroadcast(
+                    .AddBroadcast<int, IntCodec>(
                         MpiTestConstants.BroadcastOperatorName,
-                       MpiTestConstants.MasterTaskId,
-                            new IntCodec())
-                    .AddReduce(
+                       MpiTestConstants.MasterTaskId)
+                    .AddReduce<int, IntCodec>(
                         MpiTestConstants.ReduceOperatorName,
                             MpiTestConstants.MasterTaskId,
-                            new IntCodec(), 
                             new SumFunction())
                     .Build();
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
index 6029bfe..a71e886 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
@@ -63,15 +63,13 @@ namespace 
Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
             _numEvaluators = numEvaluators;
             _mpiDriver = mpiDriver; 
             _commGroup = _mpiDriver.DefaultGroup
-                    .AddScatter(
+                    .AddScatter<int, IntCodec>(
                         MpiTestConstants.ScatterOperatorName,
                             MpiTestConstants.MasterTaskId,
-                            new IntCodec(),
                             TopologyTypes.Tree)
-                    .AddReduce(
+                    .AddReduce<int, IntCodec>(
                         MpiTestConstants.ReduceOperatorName,
                             MpiTestConstants.MasterTaskId,
-                            new IntCodec(), 
                             new SumFunction())
                     .Build();
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs 
b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
index 2dc9445..5f931f2 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
@@ -102,14 +102,12 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddBroadcast<int>(
+                .AddBroadcast<int, IntCodec>(
                     broadcastOperatorName,
-                    masterTaskId,
-                    new IntCodec())
-                .AddReduce<int>(
+                    masterTaskId)
+                .AddReduce<int, IntCodec>(
                     reduceOperatorName,
                     masterTaskId,
-                    new IntCodec(),
                     new SumFunction())
                 .Build();
 
@@ -159,14 +157,12 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, 
masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddScatter<int>(
+                .AddScatter<int, IntCodec>(
                     scatterOperatorName,
-                    masterTaskId,
-                    new IntCodec())
-                .AddReduce(
+                    masterTaskId)
+                .AddReduce<int, IntCodec>(
                     reduceOperatorName,
                         masterTaskId,
-                        new IntCodec(),
                         new SumFunction())
                 .Build();
 
@@ -224,7 +220,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, 
masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddBroadcast(operatorName, masterTaskId, new IntCodec())
+                .AddBroadcast<int, IntCodec>(operatorName, masterTaskId)
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -292,7 +288,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, 
masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-              .AddBroadcast(operatorName, masterTaskId, new IntCodec())
+              .AddBroadcast<int, IntCodec>(operatorName, masterTaskId)
               .Build();
 
             List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -331,7 +327,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, 
masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddReduce(operatorName, "task0", new IntCodec(), new 
SumFunction())
+                .AddReduce<int, IntCodec>(operatorName, "task0", new 
SumFunction())
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -366,7 +362,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, 
masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddReduce(operatorName, "task0", new IntCodec(), new 
SumFunction())
+                .AddReduce<int, IntCodec>(operatorName, "task0", new 
SumFunction())
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -410,7 +406,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, 
masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec())
+                .AddScatter(operatorName, masterTaskId)
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -488,7 +484,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, 
masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec())
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId)
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -538,7 +534,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, 
masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec())
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId)
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -585,7 +581,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, 
masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec())
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId)
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -622,8 +618,8 @@ namespace Org.Apache.REEF.Tests.Network
         [TestMethod]
         public void TestConfigurationBroadcastSpec()
         {
-            FlatTopology<int> topology = new FlatTopology<int>("Operator", 
"Operator", "task1", "driverid",
-                new BroadcastOperatorSpec<int>("Sender", new IntCodec()));
+            FlatTopology<int, IntCodec> topology = new FlatTopology<int, 
IntCodec>("Operator", "Operator", "task1", "driverid",
+                new BroadcastOperatorSpec<int, IntCodec>("Sender"));
 
             topology.AddTask("task1");
             var conf = topology.GetTaskConfiguration("task1");
@@ -635,8 +631,8 @@ namespace Org.Apache.REEF.Tests.Network
         [TestMethod]
         public void TestConfigurationReduceSpec()
         {
-            FlatTopology<int> topology = new FlatTopology<int>("Operator", 
"Group", "task1", "driverid",
-                new ReduceOperatorSpec<int>("task1", new IntCodec(), new 
SumFunction()));
+            FlatTopology<int, IntCodec> topology = new FlatTopology<int, 
IntCodec>("Operator", "Group", "task1", "driverid",
+                new ReduceOperatorSpec<int, IntCodec>("task1", new 
SumFunction()));
 
             topology.AddTask("task1");
             var conf2 = topology.GetTaskConfiguration("task1");

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs 
b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
index e74c506..667e45f 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
@@ -34,8 +34,8 @@ namespace Org.Apache.REEF.Tests.Network
         [TestMethod]
         public void TestTreeTopology()
         {
-            TreeTopology<int> topology = new TreeTopology<int>("Operator", 
"Operator", "task1", "driverid",
-                new BroadcastOperatorSpec<int>("task1", new IntCodec()), 2);
+            TreeTopology<int, IntCodec> topology = new TreeTopology<int, 
IntCodec>("Operator", "Operator", "task1", "driverid",
+                new BroadcastOperatorSpec<int, IntCodec>("task1"), 2);
             for (int i = 1; i < 8; i++)
             {
                 string taskid = "task" + i;
@@ -61,7 +61,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddReduce(operatorName, masterTaskId, new IntCodec(), new 
SumFunction(), TopologyTypes.Tree)
+                .AddReduce<int, IntCodec>(operatorName, masterTaskId, new 
SumFunction(), TopologyTypes.Tree)
                 .Build();
 
             var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
@@ -117,7 +117,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddBroadcast(operatorName, masterTaskId, new IntCodec(), 
TopologyTypes.Tree)
+                .AddBroadcast<int, IntCodec>(operatorName, masterTaskId, 
TopologyTypes.Tree)
                 .Build();
 
             var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
@@ -193,15 +193,13 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddBroadcast<int>(
+                .AddBroadcast<int, IntCodec>(
                     broadcastOperatorName,
                     masterTaskId,
-                    new IntCodec(), 
                     TopologyTypes.Tree)
-                .AddReduce<int>(
+                .AddReduce<int, IntCodec>(
                     reduceOperatorName,
                     masterTaskId,
-                    new IntCodec(),
                     new SumFunction(),
                     TopologyTypes.Tree)
                 .Build();
@@ -308,7 +306,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec(), 
TopologyTypes.Tree)
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId, 
TopologyTypes.Tree)
                 .Build();
 
             var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
@@ -353,7 +351,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec(), 
TopologyTypes.Tree)
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId, 
TopologyTypes.Tree)
                 .Build();
 
             var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
@@ -407,7 +405,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec(), 
TopologyTypes.Tree)
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId, 
TopologyTypes.Tree)
                 .Build();
 
             var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
@@ -458,7 +456,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec(), 
TopologyTypes.Tree)
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId, 
TopologyTypes.Tree)
                 .Build();
 
             var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
@@ -510,7 +508,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec(), 
TopologyTypes.Tree)
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId, 
TopologyTypes.Tree)
                 .Build();
 
             var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
@@ -574,16 +572,14 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-              .AddScatter(
+              .AddScatter<int, IntCodec>(
                     scatterOperatorName,
                     masterTaskId,
-                    new IntCodec(), 
                     TopologyTypes.Tree)
-                .AddReduce(
+                .AddReduce<int, IntCodec>(
                     reduceOperatorName,
                     masterTaskId,
-                    new IntCodec(),
-                    new SumFunction(), 
+                    new SumFunction(),
                     TopologyTypes.Tree).Build();
 
             var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);

Reply via email to