Repository: incubator-reef
Updated Branches:
  refs/heads/master 0da6b504b -> 37746a044


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
index dbfd1c7..016455f 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
@@ -21,69 +21,38 @@ using System;
 using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Network.Group.Pipelining.Impl;
 using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Interface;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
 {
     /// <summary>
     /// The specification used to define Reduce Group Communication Operators.
     /// </summary>
-    public class ReduceOperatorSpec<T1, T2> : IOperatorSpec<T1, T2> where T2 : 
ICodec<T1>
+    public class ReduceOperatorSpec : IOperatorSpec
     {
         /// <summary>
         /// Creates a new ReduceOperatorSpec.
         /// </summary>
-        /// <param name="receiverId">The identifier of the task that
-        /// will receive and reduce incoming messages.</param>
-        /// <param name="codec">The codec used for serializing 
messages.</param>
-        /// <param name="reduceFunction">The class used to aggregate all 
messages.</param>
-        public ReduceOperatorSpec(
-            string receiverId, 
-            IReduceFunction<T1> reduceFunction)
-        {
-            ReceiverId = receiverId;
-            Codec = typeof(T2);
-            ReduceFunction = reduceFunction;
-            PipelineDataConverter = new DefaultPipelineDataConverter<T1>();
-        }
-
-        /// <summary>
-        /// Creates a new ReduceOperatorSpec.
-        /// </summary>
-        /// <param name="receiverId">The identifier of the task that
-        /// will receive and reduce incoming messages.</param>
-        /// <param name="reduceFunction">The class used to aggregate all 
messages.</param>
-        /// <param name="dataConverter">The converter used to convert original
-        /// message to pipelined ones and vice versa.</param>
+        /// <param name="receiverId">The identifier of the task that will 
receive and reduce incoming messages.</param>
+        /// <param name="configurations">The configurations, merged into one, that supply the Codec, 
ReduceFunction and DataConverter.</param>
         public ReduceOperatorSpec(
             string receiverId,
-            IPipelineDataConverter<T1> dataConverter,
-            IReduceFunction<T1> reduceFunction)
+            params IConfiguration[] configurations)
         {
             ReceiverId = receiverId;
-            Codec = typeof(T2);
-            ReduceFunction = reduceFunction;
-            PipelineDataConverter = dataConverter ?? new 
DefaultPipelineDataConverter<T1>();
+            Configiration = Configurations.Merge(configurations);
         }
 
         /// <summary>
-        /// Returns the IPipelineDataConvert used to convert messages to 
pipeline form and vice-versa
-        /// </summary>
-        public IPipelineDataConverter<T1> PipelineDataConverter { get; private 
set; }
-
-        /// <summary>
         /// Returns the identifier for the task that receives and reduces
         /// incoming messages.
         /// </summary>
         public string ReceiverId { get; private set; }
 
         /// <summary>
-        /// The codec used to serialize and deserialize messages.
-        /// </summary>
-        public Type Codec { get; private set; }
-
-        /// <summary>
-        /// The class used to aggregate incoming messages.
+        /// Returns the Configuration for Codec, ReduceFunction and 
DataConverter
         /// </summary>
-        public IReduceFunction<T1> ReduceFunction { get; private set; }
+        public IConfiguration Configiration { get; private set; }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
index 7807f3b..4c74483 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
@@ -17,61 +17,36 @@
  * under the License.
  */
 
-using System;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Interface;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
 {
     /// <summary>
     /// The specification used to define Scatter Group Communication Operators.
     /// </summary>
-    public class ScatterOperatorSpec<T1, T2> : IOperatorSpec<T1, T2> where T2 
: ICodec<T1>
+    public class ScatterOperatorSpec : IOperatorSpec
     {
         /// <summary>
         /// Creates a new ScatterOperatorSpec.
         /// </summary>
-        /// <param name="senderId">The identifier of the task that will
-        /// be sending messages</param>
-        /// <param name="codec">The codec used to serialize and 
-        /// deserialize messages</param>
-        public ScatterOperatorSpec(string senderId)
+        /// <param name="senderId">The identifier of the task that will be 
sending messages</param>
+        /// <param name="configurations">The configurations, merged into one, for the Codec, 
ReduceFunction and DataConverter used by the group communication client.</param>
+        public ScatterOperatorSpec(string senderId, params IConfiguration[] 
configurations)
         {
             SenderId = senderId;
-            Codec = typeof(T2);
+            Configiration = Configurations.Merge(configurations);
         }
 
         /// <summary>
-        /// Creates a new ScatterOperatorSpec.
-        /// </summary>
-        /// <param name="senderId">The identifier of the task that will
-        /// be sending messages</param>
-        /// deserialize messages</param>
-        /// <param name="dataConverter">The converter used to convert original
-        /// message to pipelined ones and vice versa.</param>
-        public ScatterOperatorSpec(
-            string senderId,
-            IPipelineDataConverter<T1> dataConverter)
-        {
-            SenderId = senderId;
-            Codec = typeof(T2);
-            PipelineDataConverter = dataConverter;
-        }
-
-        /// <summary>
-        /// Returns the IPipelineDataConvert used to convert messages to 
pipeline form and vice-versa
-        /// </summary>
-        public IPipelineDataConverter<T1> PipelineDataConverter { get; private 
set; }
-
-        /// <summary>
         /// Returns the identifier for the task that splits and scatters a list
         /// of messages to other tasks.
         /// </summary>
         public string SenderId { get; private set; }
 
         /// <summary>
-        /// The codec used to serialize and deserialize messages.
+        /// Returns the Configuration for Codec, ReduceFunction and 
DataConverter
         /// </summary>
-        public Type Codec { get; private set; }
+        public IConfiguration Configiration { get; private set; }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/IPipelineDataConverter.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/IPipelineDataConverter.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/IPipelineDataConverter.cs
index fa0d072..f2dc551 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/IPipelineDataConverter.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/IPipelineDataConverter.cs
@@ -47,12 +47,5 @@ namespace Org.Apache.REEF.Network.Group.Pipelining
         /// <param name="pipelineMessage">The enumerator over received 
pipelined messages</param>
         /// <returns>The full constructed message</returns>
         T FullMessage(List<PipelineMessage<T>> pipelineMessage);
-
-        /// <summary>
-        /// Constructs the configuration of the class. Basically the arguments 
of the class like chunksize
-        /// </summary>
-        /// <returns>The configuration for this data converter class</returns>
-        IConfiguration GetConfiguration();
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/DefaultPipelineDataConverter.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/DefaultPipelineDataConverter.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/DefaultPipelineDataConverter.cs
index 5601cb7..fb0e5ad 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/DefaultPipelineDataConverter.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/DefaultPipelineDataConverter.cs
@@ -65,10 +65,5 @@ namespace Org.Apache.REEF.Network.Group.Pipelining.Impl
 
             return pipelineMessage[0].Data;
         }
-
-        public IConfiguration GetConfiguration()
-        {
-            return TangFactory.GetTang().NewConfigurationBuilder().Build();
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
index d5c84c8..e80dea6 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
@@ -27,6 +27,7 @@ using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
 
 namespace Org.Apache.REEF.Network.Group.Topology
 {
@@ -35,7 +36,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
     /// nodes: the root and all children extending from the root.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public class FlatTopology<T1, T2> : ITopology<T1, T2> where T2 : ICodec<T1>
+    public class FlatTopology<T> : ITopology<T> 
     {
         private readonly string _groupName;
         private readonly string _operatorName;
@@ -59,7 +60,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
             string groupName, 
             string rootId,
             string driverId,
-            IOperatorSpec<T1, T2> operatorSpec)
+            IOperatorSpec operatorSpec)
         {
             _groupName = groupName;
             _operatorName = operatorName;
@@ -74,7 +75,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
         /// <summary>
         /// Gets the Operator specification
         /// </summary>
-        public IOperatorSpec<T1, T2> OperatorSpec { get; set; }
+        public IOperatorSpec OperatorSpec { get; set; }
 
         /// <summary>
         /// Gets the task configuration for the operator topology.
@@ -83,11 +84,11 @@ namespace Org.Apache.REEF.Network.Group.Topology
         /// <returns>The task configuration</returns>
         public IConfiguration GetTaskConfiguration(string taskId)
         {
-            var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
-                .BindImplementation(typeof(ICodec<T1>), OperatorSpec.Codec)
+            ICsConfigurationBuilder confBuilder;
+            confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
                 
.BindNamedParameter<GroupCommConfigurationOptions.TopologyRootTaskId, string>(
-                    
GenericType<GroupCommConfigurationOptions.TopologyRootTaskId>.Class,
-                    _rootId);
+                
GenericType<GroupCommConfigurationOptions.TopologyRootTaskId>.Class,
+                _rootId);
 
             if (taskId.Equals(_rootId))
             {
@@ -102,50 +103,41 @@ namespace Org.Apache.REEF.Network.Group.Topology
                 }
             }
 
-            if (OperatorSpec is BroadcastOperatorSpec<T1, T2>)
+            if (OperatorSpec is BroadcastOperatorSpec)
             {
-                var broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T1, 
T2>;
-
-                
confBuilder.AddConfiguration(broadcastSpec.PipelineDataConverter.GetConfiguration());
-                
confBuilder.BindImplementation(typeof(IPipelineDataConverter<T1>), 
broadcastSpec.PipelineDataConverter.GetType())
-                
.BindImplementation(GenericType<ICodec<PipelineMessage<T1>>>.Class, 
GenericType<PipelineMessageCodec<T1>>.Class);
+                var broadcastSpec = OperatorSpec as BroadcastOperatorSpec;
 
                 if (taskId.Equals(broadcastSpec.SenderId))
                 {
-                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T1>>.Class, 
GenericType<BroadcastSender<T1>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, 
GenericType<BroadcastSender<T>>.Class);
                 }
                 else
                 {
-                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T1>>.Class, 
GenericType<BroadcastReceiver<T1>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, 
GenericType<BroadcastReceiver<T>>.Class);
                 }
             }
-            else if (OperatorSpec is ReduceOperatorSpec<T1, T2>)
+            else if (OperatorSpec is ReduceOperatorSpec)
             {
-                var reduceSpec = OperatorSpec as ReduceOperatorSpec<T1, T2>;
-                
confBuilder.AddConfiguration(reduceSpec.PipelineDataConverter.GetConfiguration());
-                
confBuilder.BindImplementation(typeof(IPipelineDataConverter<T1>), 
reduceSpec.PipelineDataConverter.GetType())
-                .BindImplementation(typeof(IReduceFunction<T1>), 
reduceSpec.ReduceFunction.GetType())
-                
.BindImplementation(GenericType<ICodec<PipelineMessage<T1>>>.Class, 
GenericType<PipelineMessageCodec<T1>>.Class);
-                
+                var reduceSpec = OperatorSpec as ReduceOperatorSpec;
                 if (taskId.Equals(reduceSpec.ReceiverId))
                 {
-                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T1>>.Class, 
GenericType<ReduceReceiver<T1>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, 
GenericType<ReduceReceiver<T>>.Class);
                 }
                 else
                 {
-                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T1>>.Class, 
GenericType<ReduceSender<T1>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, 
GenericType<ReduceSender<T>>.Class);
                 }
             }
-            else if (OperatorSpec is ScatterOperatorSpec<T1, T2>)
+            else if (OperatorSpec is ScatterOperatorSpec)
             {
-                ScatterOperatorSpec<T1, T2> scatterSpec = OperatorSpec as 
ScatterOperatorSpec<T1, T2>;
+                ScatterOperatorSpec scatterSpec = OperatorSpec as 
ScatterOperatorSpec;
                 if (taskId.Equals(scatterSpec.SenderId))
                 {
-                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T1>>.Class, 
GenericType<ScatterSender<T1>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, 
GenericType<ScatterSender<T>>.Class);
                 }
                 else
                 {
-                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T1>>.Class, 
GenericType<ScatterReceiver<T1>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, 
GenericType<ScatterReceiver<T>>.Class);
                 }
             }
             else
@@ -153,7 +145,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
                 throw new NotSupportedException("Spec type not supported");
             }
 
-            return confBuilder.Build();
+            return Configurations.Merge(confBuilder.Build(), 
OperatorSpec.Configiration);
         }
 
         /// <summary>
@@ -205,4 +197,4 @@ namespace Org.Apache.REEF.Network.Group.Topology
             }
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
index 083330d..0c7f164 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
@@ -26,9 +26,9 @@ namespace Org.Apache.REEF.Network.Group.Topology
     /// <summary>
     /// Represents a topology graph for IGroupCommOperators.
     /// </summary>
-    public interface ITopology<T1, T2> where T2 : ICodec<T1>
+    public interface ITopology<T>
     {
-        IOperatorSpec<T1, T2> OperatorSpec { get; }
+        IOperatorSpec OperatorSpec { get; }
 
         IConfiguration GetTaskConfiguration(string taskId);
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
index b129f8a..e8ba6c1 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
@@ -27,11 +27,16 @@ using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Network.Group.Topology
 {
-    public class TreeTopology<T1, T2> : ITopology<T1, T2> where T2 : ICodec<T1>
+    public class TreeTopology<T> : ITopology<T> 
     {
+        private readonly Logger LOGGER = 
Logger.GetLogger(typeof(TreeTopology<T>));
+
         private readonly string _groupName;
         private readonly string _operatorName;
 
@@ -59,7 +64,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
             string groupName, 
             string rootId,
             string driverId,
-            IOperatorSpec<T1, T2> operatorSpec,
+            IOperatorSpec operatorSpec,
             int fanOut)
         {
             _groupName = groupName;
@@ -73,7 +78,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
             _nodes = new Dictionary<string, TaskNode>(); 
         }
 
-        public IOperatorSpec<T1, T2> OperatorSpec { get; set; }
+        public IOperatorSpec OperatorSpec { get; set; }
 
         /// <summary>
         /// Gets the task configuration for the operator topology.
@@ -105,8 +110,8 @@ namespace Org.Apache.REEF.Network.Group.Topology
             }
 
             //add parentid, if no parent, add itself
-            var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
-                .BindImplementation(typeof(ICodec<T1>), OperatorSpec.Codec)
+            ICsConfigurationBuilder confBuilder = 
TangFactory.GetTang().NewConfigurationBuilder()
+                // The ICodec binding is no longer added here; it is expected to come from OperatorSpec.Configiration, which is merged into the result on return.
                 
.BindNamedParameter<GroupCommConfigurationOptions.TopologyRootTaskId, string>(
                     
GenericType<GroupCommConfigurationOptions.TopologyRootTaskId>.Class,
                     parentId);
@@ -119,48 +124,40 @@ namespace Org.Apache.REEF.Network.Group.Topology
                     childNode.TaskId);
             }
 
-            if (OperatorSpec is BroadcastOperatorSpec<T1, T2>)
+            if (OperatorSpec is BroadcastOperatorSpec)
             {
-                var broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T1, 
T2>;
-                
confBuilder.AddConfiguration(broadcastSpec.PipelineDataConverter.GetConfiguration());
-                
confBuilder.BindImplementation(typeof(IPipelineDataConverter<T1>), 
broadcastSpec.PipelineDataConverter.GetType())
-                
.BindImplementation(GenericType<ICodec<PipelineMessage<T1>>>.Class, 
GenericType<PipelineMessageCodec<T1>>.Class);
+                var broadcastSpec = OperatorSpec as BroadcastOperatorSpec;
                 if (taskId.Equals(broadcastSpec.SenderId))
                 {
-                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T1>>.Class, 
GenericType<BroadcastSender<T1>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, 
GenericType<BroadcastSender<T>>.Class);
                 }
                 else
                 {
-                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T1>>.Class, 
GenericType<BroadcastReceiver<T1>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, 
GenericType<BroadcastReceiver<T>>.Class);
                 }
             }
-            else if (OperatorSpec is ReduceOperatorSpec<T1, T2>)
+            else if (OperatorSpec is ReduceOperatorSpec)
             {
-                var reduceSpec = OperatorSpec as ReduceOperatorSpec<T1, T2>;
-                
confBuilder.AddConfiguration(reduceSpec.PipelineDataConverter.GetConfiguration());
-                
confBuilder.BindImplementation(typeof(IPipelineDataConverter<T1>), 
reduceSpec.PipelineDataConverter.GetType())
-                .BindImplementation(typeof(IReduceFunction<T1>), 
reduceSpec.ReduceFunction.GetType())
-                
.BindImplementation(GenericType<ICodec<PipelineMessage<T1>>>.Class, 
GenericType<PipelineMessageCodec<T1>>.Class);
-
+                var reduceSpec = OperatorSpec as ReduceOperatorSpec;
                 if (taskId.Equals(reduceSpec.ReceiverId))
                 {
-                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T1>>.Class, 
GenericType<ReduceReceiver<T1>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, 
GenericType<ReduceReceiver<T>>.Class);
                 }
                 else
                 {
-                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T1>>.Class, 
GenericType<ReduceSender<T1>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, 
GenericType<ReduceSender<T>>.Class);
                 }
             }
-            else if (OperatorSpec is ScatterOperatorSpec<T1, T2>)
+            else if (OperatorSpec is ScatterOperatorSpec)
             {
-                ScatterOperatorSpec<T1, T2> scatterSpec = OperatorSpec as 
ScatterOperatorSpec<T1, T2>;
+                ScatterOperatorSpec scatterSpec = OperatorSpec as 
ScatterOperatorSpec;
                 if (taskId.Equals(scatterSpec.SenderId))
                 {
-                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T1>>.Class, 
GenericType<ScatterSender<T1>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, 
GenericType<ScatterSender<T>>.Class);
                 }
                 else
                 {
-                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T1>>.Class, 
GenericType<ScatterReceiver<T1>>.Class);
+                    
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, 
GenericType<ScatterReceiver<T>>.Class);
                 }
             }
             else
@@ -168,7 +165,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
                 throw new NotSupportedException("Spec type not supported");
             }
 
-            return confBuilder.Build();
+            return Configurations.Merge(confBuilder.Build(), 
OperatorSpec.Configiration);
         }
 
         public void AddTask(string taskId)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj 
b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 9834269..9a47a69 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -52,7 +52,10 @@ under the License.
   <ItemGroup>
     <Compile Include="Group\Codec\GcmMessageProto.cs" />
     <Compile Include="Group\Codec\GroupCommunicationMessageCodec.cs" />
+    <Compile Include="Group\Config\CodecConfiguration.cs" />
     <Compile Include="Group\Config\GroupCommConfigurationOptions.cs" />
+    <Compile Include="Group\Config\PipelineDataConverterConfiguration.cs" />
+    <Compile Include="Group\Config\ReduceFunctionConfiguration.cs" />
     <Compile Include="Group\Driver\ICommunicationGroupDriver.cs" />
     <Compile Include="Group\Driver\IGroupCommDriver.cs" />
     <Compile Include="Group\Driver\Impl\CommunicationGroupDriver.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
index 8044599..765944c 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
@@ -174,9 +174,9 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
         private HashSet<string> AssembliesToCopy()
         {
             HashSet<string> appDlls = new HashSet<string>();
-            appDlls.Add(typeof(KMeansDriverHandlers).Assembly.GetName().Name);
             appDlls.Add(typeof(LegacyKMeansTask).Assembly.GetName().Name);
             appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            appDlls.Add(typeof(KMeansDriverHandlers).Assembly.GetName().Name);
             appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
             return appDlls;
         }

Reply via email to