[REEF-260]: Make the GroupCommunicationLayer memory efficient by introducing 
streaming and building on top of StreamingNetworkService Layer
This addressed the issue by
  * Adding Writable GroupCommunicationMessage. There are two classes one 
GeneralGroupCommunicationMessage which is independent of message type and is 
visible to Network service and other type specific communication message 
visible to operators.
  * Making the current operators Writable.

JIRA: [REEF-260](https://issues.apache.org/jira/browse/REEF-260)

This CLoses #234

Author:    Dhruv <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/1d200334
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/1d200334
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/1d200334

Branch: refs/heads/master
Commit: 1d2003346d5dd6a4a9cf39c46178bf0f1f755c76
Parents: b3c1517
Author: Dhruv <[email protected]>
Authored: Tue Jun 23 12:34:25 2015 -0700
Committer: Julia Wang <[email protected]>
Committed: Fri Jun 26 15:14:22 2015 -0700

----------------------------------------------------------------------
 .../KMeans/KMeansDriverHandlers.cs              |  22 +-
 .../Run.cs                                      |   3 +-
 .../BroadcastReduceDriver.cs                    |  14 +-
 .../GroupCommunication/GroupTestConfig.cs       |   4 +-
 .../PipelinedBroadcastReduceDriver.cs           |  39 ++-
 .../ScatterReduceDriver.cs                      |  11 +-
 .../GroupCommunicationTests.cs                  | 253 +++++++++----------
 .../GroupCommunicationTreeTopologyTests.cs      |  62 +++--
 .../GroupCommunication/StreamingCodecTests.cs   |  11 +-
 .../Group/Codec/GcmMessageProto.cs              |  70 -----
 .../Codec/GroupCommunicationMessageCodec.cs     |  70 -----
 .../CodecToStreamingCodecConfiguration.cs       |  44 ++++
 .../Group/Driver/ICommunicationGroupDriver.cs   |   5 -
 .../Driver/Impl/CommunicationGroupDriver.cs     |  20 +-
 .../Impl/GeneralGroupCommunicationMessage.cs    | 114 +++++++++
 .../Group/Driver/Impl/GroupCommDriver.cs        |  41 ++-
 .../Driver/Impl/GroupCommunicationMessage.cs    | 172 ++++++++++---
 .../Group/Operators/Impl/BroadcastReceiver.cs   |   4 +-
 .../Group/Operators/Impl/BroadcastSender.cs     |   4 +-
 .../Group/Operators/Impl/ReduceReceiver.cs      |   4 +-
 .../Group/Operators/Impl/ReduceSender.cs        |   4 +-
 .../Group/Operators/Impl/ScatterReceiver.cs     |   4 +-
 .../Group/Operators/Impl/ScatterSender.cs       |   4 +-
 .../Group/Operators/Impl/Sender.cs              |  12 +-
 .../Task/ICommunicationGroupNetworkObserver.cs  |   8 +-
 .../Group/Task/IGroupCommNetworkObserver.cs     |   6 +-
 .../Impl/CommunicationGroupNetworkObserver.cs   |  28 +-
 .../Group/Task/Impl/GroupCommClient.cs          |  20 +-
 .../Group/Task/Impl/GroupCommNetworkObserver.cs |  22 +-
 .../Group/Task/Impl/NodeStruct.cs               |  15 +-
 .../Group/Task/Impl/OperatorTopology.cs         | 155 ++++++------
 .../Group/Topology/FlatTopology.cs              |   2 -
 .../Group/Topology/ITopology.cs                 |   1 -
 .../Group/Topology/TreeTopology.cs              |   6 -
 .../NetworkService/WritableNsMessage.cs         |   8 +-
 .../Org.Apache.REEF.Network.csproj              |   5 +-
 .../StringStreamingCodec.cs                     |  82 ++++++
 37 files changed, 772 insertions(+), 577 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
index f4c1ca8..ae22701 100644
--- 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
+++ 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
@@ -33,12 +33,10 @@ using 
Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Network.Group.Operators.Impl;
 using Org.Apache.REEF.Network.Group.Pipelining.Impl;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Network.NetworkService.Codec;
 using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
@@ -65,6 +63,10 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
         private readonly IGroupCommDriver _groupCommDriver;
         private readonly ICommunicationGroupDriver _commGroup;
         private readonly TaskStarter _groupCommTaskStarter;
+        private readonly IConfiguration _centroidCodecConf;
+        private readonly IConfiguration _controlMessageCodecConf;
+        private readonly IConfiguration _processedResultsCodecConf;
+
 
         [Inject]
         public KMeansDriverHandlers([Parameter(typeof(NumPartitions))] int 
numPartitions, GroupCommDriver groupCommDriver)
@@ -83,7 +85,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
 
             _groupCommDriver = groupCommDriver;
 
-            IConfiguration conf1 = CodecConfiguration<Centroids>.Conf
+            _centroidCodecConf = 
CodecToStreamingCodecConfiguration<Centroids>.Conf
                 .Set(CodecConfiguration<Centroids>.Codec, 
GenericType<CentroidsCodec>.Class)
                 .Build();
 
@@ -91,7 +93,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
                 
.Set(PipelineDataConverterConfiguration<Centroids>.DataConverter, 
GenericType<DefaultPipelineDataConverter<Centroids>>.Class)
                 .Build();
 
-            IConfiguration conf2 = CodecConfiguration<ControlMessage>.Conf
+            _controlMessageCodecConf = 
CodecToStreamingCodecConfiguration<ControlMessage>.Conf
                 .Set(CodecConfiguration<ControlMessage>.Codec, 
GenericType<ControlMessageCodec>.Class)
                 .Build();
 
@@ -99,7 +101,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
                 
.Set(PipelineDataConverterConfiguration<ControlMessage>.DataConverter, 
GenericType<DefaultPipelineDataConverter<ControlMessage>>.Class)
                 .Build();
 
-            IConfiguration conf3 = CodecConfiguration<ProcessedResults>.Conf
+            _processedResultsCodecConf = 
CodecToStreamingCodecConfiguration<ProcessedResults>.Conf
                 .Set(CodecConfiguration<ProcessedResults>.Codec, 
GenericType<ProcessedResultsCodec>.Class)
                 .Build();
 
@@ -112,9 +114,9 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
                 .Build();
 
             _commGroup = _groupCommDriver.DefaultGroup
-                   
.AddBroadcast<Centroids>(Constants.CentroidsBroadcastOperatorName, 
Constants.MasterTaskId, TopologyTypes.Flat, conf1, dataConverterConfig1)
-                   
.AddBroadcast<ControlMessage>(Constants.ControlMessageBroadcastOperatorName, 
Constants.MasterTaskId, TopologyTypes.Flat, conf2, dataConverterConfig2)
-                   
.AddReduce<ProcessedResults>(Constants.MeansReduceOperatorName, 
Constants.MasterTaskId, TopologyTypes.Flat, conf3, reduceFunctionConfig, 
dataConverterConfig3)
+                   
.AddBroadcast<Centroids>(Constants.CentroidsBroadcastOperatorName, 
Constants.MasterTaskId, TopologyTypes.Flat, dataConverterConfig1)
+                   
.AddBroadcast<ControlMessage>(Constants.ControlMessageBroadcastOperatorName, 
Constants.MasterTaskId, TopologyTypes.Flat, dataConverterConfig2)
+                   
.AddReduce<ProcessedResults>(Constants.MeansReduceOperatorName, 
Constants.MasterTaskId, TopologyTypes.Flat, reduceFunctionConfig, 
dataConverterConfig3)
                    .Build();
 
             _groupCommTaskStarter = new TaskStarter(_groupCommDriver, 
_totalEvaluators);
@@ -149,10 +151,10 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
                     partitionNum = _partitionInex;
                     _partitionInex++;
                 }
-            } 
+            }
 
             IConfiguration gcServiceConfiguration = 
_groupCommDriver.GetServiceConfiguration();
-
+            gcServiceConfiguration = 
Configurations.Merge(gcServiceConfiguration, _centroidCodecConf, 
_controlMessageCodecConf, _processedResultsCodecConf);
             IConfiguration commonServiceConfiguration = 
TangFactory.GetTang().NewConfigurationBuilder(gcServiceConfiguration)
                 .BindNamedParameter<DataPartitionCache.PartitionIndex, 
int>(GenericType<DataPartitionCache.PartitionIndex>.Class, 
partitionNum.ToString(CultureInfo.InvariantCulture))
                 
.BindNamedParameter<KMeansConfiguratioinOptions.ExecutionDirectory, 
string>(GenericType<KMeansConfiguratioinOptions.ExecutionDirectory>.Class, 
_executionDirectory)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
index e4d10da..639335b 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
@@ -34,7 +34,8 @@ namespace Org.Apache.REEF.Network.Examples.Client
             int startPort = 8900;
             int portRange = 1000;
             string testToRun = "RunBroadcastAndReduce";
-            
+            testToRun = testToRun.ToLower();
+
             if (args != null)
             {
                 if (args.Length > 0)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
index 1d91e63..85ae3c0 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
@@ -27,6 +27,7 @@ using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
@@ -57,6 +58,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
         private readonly ICommunicationGroupDriver _commGroup;
         private readonly TaskStarter _groupCommTaskStarter;
         private readonly IConfiguration _tcpPortProviderConfig;
+        private readonly IConfiguration _codecConfig;
 
         [Inject]
         public BroadcastReduceDriver(
@@ -78,8 +80,9 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
                     portRange.ToString(CultureInfo.InvariantCulture))
                 .Build();
 
-            IConfiguration codecConfig = CodecConfiguration<int>.Conf
-                .Set(CodecConfiguration<int>.Codec, 
GenericType<IntCodec>.Class)
+
+            _codecConfig = StreamingCodecConfiguration<int>.Conf
+                .Set(StreamingCodecConfiguration<int>.Codec, 
GenericType<IntStreamingCodec>.Class)
                 .Build();
 
             IConfiguration reduceFunctionConfig = 
ReduceFunctionConfiguration<int>.Conf
@@ -95,13 +98,11 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
                         GroupTestConstants.BroadcastOperatorName,
                         GroupTestConstants.MasterTaskId, 
                         TopologyTypes.Tree,
-                        codecConfig,
                         dataConverterConfig)
                     .AddReduce<int>(
                         GroupTestConstants.ReduceOperatorName,
                         GroupTestConstants.MasterTaskId,
                         TopologyTypes.Tree,
-                        codecConfig,
                         reduceFunctionConfig,
                         dataConverterConfig)
                     .Build();
@@ -123,12 +124,14 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
         {
             IConfiguration contextConf = 
_groupCommDriver.GetContextConfiguration();
             IConfiguration serviceConf = 
_groupCommDriver.GetServiceConfiguration();
-            serviceConf = Configurations.Merge(serviceConf, 
_tcpPortProviderConfig);
+            serviceConf = Configurations.Merge(serviceConf, 
_tcpPortProviderConfig, _codecConfig);
             allocatedEvaluator.SubmitContextAndService(contextConf, 
serviceConf);
         }
 
         public void OnNext(IActiveContext activeContext)
         {
+            IConfiguration serviceConfig = 
_groupCommDriver.GetServiceConfiguration();
+
             if (_groupCommDriver.IsMasterTaskContext(activeContext))
             {
                 // Configure Master Task
@@ -190,7 +193,6 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
             clrDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name);
             clrDlls.Add(typeof(INameClient).Assembly.GetName().Name);
             clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
-
             ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
index e329788..8a53dc8 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
@@ -43,12 +43,12 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication
         {
         }
 
-        [NamedParameter("Size of the array")]
+        [NamedParameter(Documentation = "Size of the array", DefaultValue = 
"6")]
         public class ArraySize : Name<int>
         {
         }
 
-        [NamedParameter("Chunk size for pipelining")]
+        [NamedParameter(Documentation = "Chunk size for pipelining", 
DefaultValue = "2")]
         public class ChunkSize : Name<int>
         {
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
index 4cf8486..f32de61 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
@@ -27,6 +27,8 @@ using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs;
+using 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
@@ -56,6 +58,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
         private readonly int _numEvaluators;
         private readonly int _numIterations;
         private readonly IConfiguration _tcpPortProviderConfig;
+        private readonly IConfiguration _codecConfig;
 
         [Inject]
         public PipelinedBroadcastReduceDriver(
@@ -81,8 +84,8 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
                     portRange.ToString(CultureInfo.InvariantCulture))
                 .Build();
 
-            var codecConfig = CodecConfiguration<int[]>.Conf
-                .Set(CodecConfiguration<int[]>.Codec, 
GenericType<IntArrayCodec>.Class)
+            _codecConfig = StreamingCodecConfiguration<int[]>.Conf
+                .Set(StreamingCodecConfiguration<int[]>.Codec, 
GenericType<IntArrayStreamingCodec>.Class)
                 .Build();
 
             var reduceFunctionConfig = ReduceFunctionConfiguration<int[]>.Conf
@@ -106,13 +109,11 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
                     GroupTestConstants.BroadcastOperatorName,
                     GroupTestConstants.MasterTaskId,
                     TopologyTypes.Tree,
-                    codecConfig,
                     dataConverterConfig)
                 .AddReduce<int[]>(
                     GroupTestConstants.ReduceOperatorName,
                     GroupTestConstants.MasterTaskId,
                     TopologyTypes.Tree,
-                    codecConfig,
                     reduceFunctionConfig,
                     dataConverterConfig)
                 .Build();
@@ -121,7 +122,21 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
 
             CreateClassHierarchy();
         }
+        public string Identifier { get; set; }
+
+        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
+        {
+            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 
512, 2, "WonderlandRack", "BroadcastEvaluator");
+            evaluatorRequestor.Submit(request);
+        }
 
+        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+        {
+            IConfiguration contextConf = 
_groupCommDriver.GetContextConfiguration();
+            IConfiguration serviceConf = 
_groupCommDriver.GetServiceConfiguration();
+            serviceConf = Configurations.Merge(serviceConf, _codecConfig, 
_tcpPortProviderConfig);
+            allocatedEvaluator.SubmitContextAndService(contextConf, 
serviceConf);
+        }
         public void OnNext(IActiveContext activeContext)
         {
             if (_groupCommDriver.IsMasterTaskContext(activeContext))
@@ -173,20 +188,6 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
             }
         }
 
-        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
-        {
-            var contextConf = _groupCommDriver.GetContextConfiguration();
-            var serviceConf = _groupCommDriver.GetServiceConfiguration();
-            serviceConf = Configurations.Merge(serviceConf, 
_tcpPortProviderConfig);
-            allocatedEvaluator.SubmitContextAndService(contextConf, 
serviceConf);
-        }
-
-        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
-        {
-            var request = new EvaluatorRequest(_numEvaluators, 512, 2, 
"WonderlandRack", "BroadcastEvaluator");
-            evaluatorRequestor.Submit(request);
-        }
-
         public void OnError(Exception error)
         {
         }
@@ -199,8 +200,6 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
         {
         }
 
-        public string Identifier { get; set; }
-
         private void CreateClassHierarchy()
         {
             var clrDlls = new HashSet<string>();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
index ba812e3..8a9d3e2 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
@@ -27,6 +27,7 @@ using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs;
 using 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
 using 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks;
 using Org.Apache.REEF.Network.Group.Config;
@@ -55,6 +56,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive
         private readonly IGroupCommDriver _groupCommDriver;
         private readonly ICommunicationGroupDriver _commGroup;
         private readonly TaskStarter _groupCommTaskStarter;
+        private readonly IConfiguration _codecConfig;
 
         [Inject]
         public ScatterReduceDriver(
@@ -65,9 +67,9 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive
             _numEvaluators = numEvaluators;
             _groupCommDriver = groupCommDriver;
 
-            IConfiguration codecConfig = CodecConfiguration<int>.Conf
-                .Set(CodecConfiguration<int>.Codec, 
GenericType<IntCodec>.Class)
-                .Build();
+            _codecConfig = StreamingCodecConfiguration<int>.Conf
+               .Set(StreamingCodecConfiguration<int>.Codec, 
GenericType<IntStreamingCodec>.Class)
+               .Build();
 
             IConfiguration reduceFunctionConfig = 
ReduceFunctionConfiguration<int>.Conf
                 .Set(ReduceFunctionConfiguration<int>.ReduceFunction, 
GenericType<SumFunction>.Class)
@@ -82,13 +84,11 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive
                         GroupTestConstants.ScatterOperatorName,
                             GroupTestConstants.MasterTaskId,
                             TopologyTypes.Tree, 
-                            codecConfig,
                             dataConverterConfig)
                     .AddReduce<int>(
                         GroupTestConstants.ReduceOperatorName,
                         GroupTestConstants.MasterTaskId,
                         TopologyTypes.Tree, 
-                        codecConfig,
                         reduceFunctionConfig,
                         dataConverterConfig)
 
@@ -111,6 +111,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive
         {
             IConfiguration contextConf = 
_groupCommDriver.GetContextConfiguration();
             IConfiguration serviceConf = 
_groupCommDriver.GetServiceConfiguration();
+            serviceConf = Configurations.Merge(serviceConf, _codecConfig);
             allocatedEvaluator.SubmitContextAndService(contextConf, 
serviceConf);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
index 53b8cb5..f2bec7c 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
@@ -21,17 +21,17 @@ using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Globalization;
+using System.IO;
 using System.Linq;
 using System.Net;
 using System.Reactive;
-using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using Org.Apache.REEF.Common.Io;
 using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Examples.MachineLearning.KMeans;
-using Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs;
+using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs;
 using Org.Apache.REEF.Network.Examples.GroupCommunication;
-using Org.Apache.REEF.Network.Group.Codec;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
@@ -43,7 +43,7 @@ using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Network.Group.Topology;
 using Org.Apache.REEF.Network.Naming;
 using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Network.NetworkService.Codec;
+using Org.Apache.REEF.Network.StreamingCodec;
 using Org.Apache.REEF.Network.Tests.NamingService;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Formats;
@@ -53,7 +53,6 @@ using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Wake.Remote.Impl;
-using Constants = Org.Apache.REEF.Common.Constants;
 
 namespace Org.Apache.REEF.Network.Tests.GroupCommunication
 {
@@ -66,34 +65,41 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             using (var nameServer = NameServerTests.BuildNameServer())
             {
                 IPEndPoint endpoint = nameServer.LocalEndpoint;
-                BlockingCollection<GroupCommunicationMessage> messages1 = new 
BlockingCollection<GroupCommunicationMessage>();
-                BlockingCollection<GroupCommunicationMessage> messages2 = new 
BlockingCollection<GroupCommunicationMessage>();
 
-                var handler1 = 
Observer.Create<NsMessage<GroupCommunicationMessage>>(
-                    msg => messages1.Add(msg.Data.First()));
-                var handler2 = 
Observer.Create<NsMessage<GroupCommunicationMessage>>(
-                    msg => messages2.Add(msg.Data.First()));
+                BlockingCollection<GeneralGroupCommunicationMessage> messages1 
=
+                    new BlockingCollection<GeneralGroupCommunicationMessage>();
+                BlockingCollection<GeneralGroupCommunicationMessage> messages2 
=
+                    new BlockingCollection<GeneralGroupCommunicationMessage>();
 
-                var networkService1 = BuildNetworkService(endpoint, handler1);
-                var networkService2 = BuildNetworkService(endpoint, handler2);
+                var handler1 =
+                    
Observer.Create<WritableNsMessage<GeneralGroupCommunicationMessage>>(msg => 
messages1.Add(msg.Data.First()));
+                var handler2 =
+                    
Observer.Create<WritableNsMessage<GeneralGroupCommunicationMessage>>(msg => 
messages2.Add(msg.Data.First()));
 
+                var networkServiceInjector1 = 
BuildNetworkServiceInjector(endpoint, handler1);
+                var networkServiceInjector2 = 
BuildNetworkServiceInjector(endpoint, handler2);
+
+                var networkService1 = networkServiceInjector1.GetInstance<
+                  WritableNetworkService<GeneralGroupCommunicationMessage>>();
+                var networkService2 = networkServiceInjector2.GetInstance<
+                    
WritableNetworkService<GeneralGroupCommunicationMessage>>();
                 networkService1.Register(new StringIdentifier("id1"));
                 networkService2.Register(new StringIdentifier("id2"));
 
-                Sender sender1 = new Sender(networkService1, new 
StringIdentifierFactory());
-                Sender sender2 = new Sender(networkService2, new 
StringIdentifierFactory());
+                Sender sender1 = networkServiceInjector1.GetInstance<Sender>();
+                Sender sender2 = networkServiceInjector2.GetInstance<Sender>();
 
-                sender1.Send(CreateGcm("abc", "id1", "id2"));
-                sender1.Send(CreateGcm("def", "id1", "id2"));
+                sender1.Send(CreateGcmStringType("abc", "id1", "id2"));
+                sender1.Send(CreateGcmStringType("def", "id1", "id2"));
+                sender2.Send(CreateGcmStringType("ghi", "id2", "id1"));
 
-                sender2.Send(CreateGcm("ghi", "id2", "id1"));
+                string msg1 = (messages2.Take() as 
GroupCommunicationMessage<string>).Data[0];
+                string msg2 = (messages2.Take() as 
GroupCommunicationMessage<string>).Data[0];
 
-                string msg1 = 
Encoding.UTF8.GetString(messages2.Take().Data[0]);
-                string msg2 = 
Encoding.UTF8.GetString(messages2.Take().Data[0]);
                 Assert.AreEqual("abc", msg1);
                 Assert.AreEqual("def", msg2);
 
-                string msg3 = 
Encoding.UTF8.GetString(messages1.Take().Data[0]);
+                string msg3 = (messages1.Take() as 
GroupCommunicationMessage<string>).Data[0];
                 Assert.AreEqual("ghi", msg3);
             }
         }
@@ -115,18 +121,16 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
                     broadcastOperatorName,
                     masterTaskId,
                     TopologyTypes.Flat,
-                    GetDefaulCodecConfig(),
                     GetDefaulDataConverterConfig())
                 .AddReduce<int>(
                     reduceOperatorName,
                     masterTaskId,
                     TopologyTypes.Flat,
-                    GetDefaulCodecConfig(),
                     GetDefaulDataConverterConfig(),
                     GetDefaulReduceFuncConfig())
                 .Build();
 
-            var commGroups = CommGroupClients(groupName, numTasks, 
groupCommunicationDriver, commGroup);
+            var commGroups = CommGroupClients(groupName, numTasks, 
groupCommunicationDriver, commGroup, GetDefaultCodecConfig());
 
             //for master task
             IBroadcastSender<int> broadcastSender = 
commGroups[0].GetBroadcastSender<int>(broadcastOperatorName);
@@ -172,8 +176,8 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             const int numTasks = 3;
             const int fanOut = 2;
 
-            IConfiguration codecConfig = CodecConfiguration<int[]>.Conf
-                .Set(CodecConfiguration<int[]>.Codec, 
GenericType<IntArrayCodec>.Class)
+            IConfiguration codecConfig = 
StreamingCodecConfiguration<int[]>.Conf
+                .Set(StreamingCodecConfiguration<int[]>.Codec, 
GenericType<IntArrayStreamingCodec>.Class)
                 .Build();
 
             IConfiguration reduceFunctionConfig = 
ReduceFunctionConfiguration<int[]>.Conf
@@ -196,18 +200,16 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
                         broadcastOperatorName,
                         masterTaskId,
                         TopologyTypes.Flat,
-                        codecConfig,
                         dataConverterConfig)
                     .AddReduce<int[]>(
                         reduceOperatorName,
                         masterTaskId,
                         TopologyTypes.Flat,
-                        codecConfig,
                         dataConverterConfig,
                         reduceFunctionConfig)
                     .Build();
 
-            var commGroups = CommGroupClients(groupName, numTasks, 
groupCommunicationDriver, commGroup);
+            var commGroups = CommGroupClients(groupName, numTasks, 
groupCommunicationDriver, commGroup, codecConfig);
 
             //for master task
             IBroadcastSender<int[]> broadcastSender = 
commGroups[0].GetBroadcastSender<int[]>(broadcastOperatorName);
@@ -245,18 +247,16 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
                     scatterOperatorName,
                     masterTaskId,
                     TopologyTypes.Flat,
-                    GetDefaulCodecConfig(),
                     GetDefaulDataConverterConfig())
                 .AddReduce<int>(
                         reduceOperatorName,
                         masterTaskId,
                         TopologyTypes.Flat,
-                        GetDefaulCodecConfig(),
                         GetDefaulReduceFuncConfig(),
                         GetDefaulDataConverterConfig())
                 .Build();
 
-            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
+            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, 
GetDefaultCodecConfig());
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(scatterOperatorName);
             IReduceReceiver<int> sumReducer = 
commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
@@ -295,43 +295,6 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
         }
 
         [TestMethod]
-        public void TestCodecConfig()
-        {
-            string groupName = "group1";
-            string masterTaskId = "task0";
-            string driverId = "Driver Id";
-            string meansReduceOperatorName = "MeansReduce";
-            string centroidsBroadcastOperatorName = "CentroidsBroadcast";
-            string controlMessageBroadcastOperatorName = 
"ControlMessageBroadcast";
-
-
-            IGroupCommDriver groupCommDriver = 
GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, 2, 5);
-
-            IConfiguration conf1 = CodecConfiguration<Centroids>.Conf
-                .Set(CodecConfiguration<Centroids>.Codec, 
GenericType<CentroidsCodec>.Class)
-                .Build();
-
-            IConfiguration conf2 = CodecConfiguration<ControlMessage>.Conf
-                .Set(CodecConfiguration<ControlMessage>.Codec, 
GenericType<ControlMessageCodec>.Class)
-                .Build();
-
-            IConfiguration conf3 = CodecConfiguration<ProcessedResults>.Conf
-                .Set(CodecConfiguration<ProcessedResults>.Codec, 
GenericType<ProcessedResultsCodec>.Class)
-                .Build();
-
-            IConfiguration reduceFunctionConfig = 
ReduceFunctionConfiguration<ProcessedResults>.Conf
-                
.Set(ReduceFunctionConfiguration<ProcessedResults>.ReduceFunction, 
GenericType<KMeansMasterTask.AggregateMeans>.Class)
-                .Build();
-
-            IConfiguration merged = Configurations.Merge(conf3, 
reduceFunctionConfig);
-            var group = groupCommDriver.DefaultGroup
-                   .AddBroadcast<Centroids>(centroidsBroadcastOperatorName, 
masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), 
GetDefaulDataConverterConfig())
-                   
.AddBroadcast<ControlMessage>(controlMessageBroadcastOperatorName, 
masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), 
GetDefaulDataConverterConfig())
-                   .AddReduce<ProcessedResults>(meansReduceOperatorName, 
masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), 
GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig())
-                   .Build();
-        }
-
-        [TestMethod]
         public void TestBroadcastOperator()
         {
             string groupName = "group1";
@@ -348,7 +311,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
                 .AddBroadcast(operatorName, masterTaskId)
                 .Build();
 
-            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
+            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, 
GetDefaultCodecConfig());
 
             IBroadcastSender<int> sender = 
commGroups[0].GetBroadcastSender<int>(operatorName);
             IBroadcastReceiver<int> receiver1 = 
commGroups[1].GetBroadcastReceiver<int>(operatorName);
@@ -382,7 +345,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
                 .AddBroadcast(operatorName, masterTaskId)
                 .Build();
 
-            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
+            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, 
GetDefaultCodecConfig());
 
             IBroadcastSender<int> sender = 
commGroups[0].GetBroadcastSender<int>(operatorName);
             IBroadcastReceiver<int> receiver1 = 
commGroups[1].GetBroadcastReceiver<int>(operatorName);
@@ -416,7 +379,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
               .AddBroadcast(operatorName, masterTaskId)
               .Build();
 
-            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
+            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, 
GetDefaultCodecConfig());
 
             IBroadcastSender<int> sender = 
commGroups[0].GetBroadcastSender<int>(operatorName);
             IBroadcastReceiver<int> receiver1 = 
commGroups[1].GetBroadcastReceiver<int>(operatorName);
@@ -452,10 +415,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IGroupCommDriver groupCommDriver = 
GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, 
numTasks);
 
             var commGroup = groupCommDriver.DefaultGroup
-                .AddReduce<int>(operatorName, "task0", TopologyTypes.Flat, 
GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), 
GetDefaulReduceFuncConfig())
+                .AddReduce<int>(operatorName, "task0", TopologyTypes.Flat, 
GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig())
                 .Build();
 
-            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
+            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, 
GetDefaultCodecConfig());
 
             IReduceReceiver<int> receiver = 
commGroups[0].GetReduceReceiver<int>(operatorName);
             IReduceSender<int> sender1 = 
commGroups[1].GetReduceSender<int>(operatorName);
@@ -487,10 +450,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IGroupCommDriver groupCommDriver = 
GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, 
numTasks);
 
             var commGroup = groupCommDriver.DefaultGroup
-                .AddReduce<int>(operatorName, "task0", TopologyTypes.Flat, 
GetDefaulCodecConfig(),GetDefaulDataConverterConfig(), 
GetDefaulReduceFuncConfig())
+                .AddReduce<int>(operatorName, "task0", 
TopologyTypes.Flat,GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig())
                 .Build();
 
-            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
+            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, 
GetDefaultCodecConfig());
 
             IReduceReceiver<int> receiver = 
commGroups[0].GetReduceReceiver<int>(operatorName);
             IReduceSender<int> sender1 = 
commGroups[1].GetReduceSender<int>(operatorName);
@@ -534,7 +497,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
                 .AddScatter(operatorName, masterTaskId)
                 .Build();
 
-            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
+            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, 
GetDefaultCodecConfig());
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -573,7 +536,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
                 .AddScatter(operatorName, masterTaskId)
                 .Build();
 
-            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
+            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, 
GetDefaultCodecConfig());
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -609,10 +572,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IGroupCommDriver groupCommDriver = 
GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, 
numTasks);
 
             var commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), 
GetDefaulReduceFuncConfig())
+                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Flat, GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig())
                 .Build();
 
-            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
+            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, 
GetDefaultCodecConfig());
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -659,10 +622,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IGroupCommDriver groupCommDriver = 
GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, 
numTasks);
 
             var commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
+                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Flat, GetDefaulDataConverterConfig())
                 .Build();
 
-            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
+            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, 
GetDefaultCodecConfig());
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -706,10 +669,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IGroupCommDriver groupCommDriver = 
GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, 
numTasks);
 
             var commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
+                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Flat, GetDefaulDataConverterConfig())
                 .Build();
 
-            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
+            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, 
GetDefaultCodecConfig());
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
             IScatterReceiver<int> receiver2 = 
commGroups[2].GetScatterReceiver<int>(operatorName);
@@ -744,20 +707,27 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
         public void TestConfigurationBroadcastSpec()
         {
             FlatTopology<int> topology = new FlatTopology<int>("Operator", 
"Operator", "task1", "driverid",
-                new BroadcastOperatorSpec("Sender", GetDefaulCodecConfig(), 
GetDefaulDataConverterConfig()));
+                new BroadcastOperatorSpec("Sender", GetDefaultCodecConfig(), 
GetDefaulDataConverterConfig()));
 
             topology.AddTask("task1");
             var conf = topology.GetTaskConfiguration("task1");
 
-            ICodec<int> codec = 
TangFactory.GetTang().NewInjector(conf).GetInstance<ICodec<int>>();
-            Assert.AreEqual(3, codec.Decode(codec.Encode(3)));
+            IStreamingCodec<int> codec = 
TangFactory.GetTang().NewInjector(conf).GetInstance<IStreamingCodec<int>>();
+            
+            var stream = new MemoryStream();
+            IDataWriter writer = new StreamDataWriter(stream);
+            codec.Write(3, writer);
+            stream.Position = 0;
+            IDataReader reader = new StreamDataReader(stream);
+            int res = codec.Read(reader);
+            Assert.AreEqual(3, res);
         }
 
         [TestMethod]
         public void TestConfigurationReduceSpec()
         {
             FlatTopology<int> topology = new FlatTopology<int>("Operator", 
"Group", "task1", "driverid",
-                new ReduceOperatorSpec("task1", 
Configurations.Merge(GetDefaulCodecConfig(), GetDefaulDataConverterConfig(),  
GetDefaulReduceFuncConfig())));
+                new ReduceOperatorSpec("task1", 
Configurations.Merge(GetDefaultCodecConfig(), GetDefaulDataConverterConfig(),  
GetDefaulReduceFuncConfig())));
 
             topology.AddTask("task1");
             var conf2 = topology.GetTaskConfiguration("task1");
@@ -781,10 +751,41 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             return groupCommDriver;
         }
 
-        public static List<ICommunicationGroupClient> CommGroupClients(string 
groupName, int numTasks, IGroupCommDriver groupCommDriver, 
ICommunicationGroupDriver commGroupDriver)
+        [TestMethod]
+        public async Task TestCodecToStreamingCodecConfiguration()
+        {
+            var config = CodecToStreamingCodecConfiguration<int>.Conf
+               .Set(CodecToStreamingCodecConfiguration<int>.Codec, 
GenericType<IntCodec>.Class)
+               .Build();
+
+            IStreamingCodec<PipelineMessage<int>> streamingCodec =
+                
TangFactory.GetTang().NewInjector(config).GetInstance<IStreamingCodec<PipelineMessage<int>>>();
+
+            CancellationToken token = new CancellationToken();
+
+            int obj = 5;
+            PipelineMessage<int> message = new PipelineMessage<int>(obj, true);
+            var stream = new MemoryStream();
+            IDataWriter writer = new StreamDataWriter(stream);
+            streamingCodec.Write(message, writer);
+            PipelineMessage<int> message1 = new PipelineMessage<int>(obj + 1, 
false);
+            await streamingCodec.WriteAsync(message1, writer, token);
+
+            stream.Position = 0;
+            IDataReader reader = new StreamDataReader(stream);
+            var res1 = streamingCodec.Read(reader);
+            var res2 = await streamingCodec.ReadAsync(reader, token);
+            Assert.AreEqual(obj, res1.Data);
+            Assert.AreEqual(obj + 1, res2.Data);
+            Assert.AreEqual(true, res1.IsLast);
+            Assert.AreEqual(false, res2.IsLast);
+        }
+
+        public static List<ICommunicationGroupClient> CommGroupClients(string 
groupName, int numTasks, IGroupCommDriver groupCommDriver, 
ICommunicationGroupDriver commGroupDriver, IConfiguration userServiceConfig)
         {
             List<ICommunicationGroupClient> commGroups = new 
List<ICommunicationGroupClient>();
             IConfiguration serviceConfig = 
groupCommDriver.GetServiceConfiguration();
+            serviceConfig = Configurations.Merge(serviceConfig, 
userServiceConfig);
 
             List<IConfiguration> partialConfigs = new List<IConfiguration>();
             for (int i = 0; i < numTasks; i++)
@@ -820,24 +821,31 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             return commGroups;
         }
 
-        public static NetworkService<GroupCommunicationMessage> 
BuildNetworkService(
-            IPEndPoint nameServerEndpoint, 
IObserver<NsMessage<GroupCommunicationMessage>> handler)
+        public static IInjector BuildNetworkServiceInjector(
+            IPEndPoint nameServerEndpoint, 
IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>> handler)
         {
-            var injector = TangFactory.GetTang().NewInjector();
-            var remoteManagerFactory = 
injector.GetInstance<IRemoteManagerFactory>();
-            return new NetworkService<GroupCommunicationMessage>(
-                0, handler, new StringIdentifierFactory(),
-                new GroupCommunicationMessageCodec(),
-                new NameClient(nameServerEndpoint.Address.ToString(),
-                    nameServerEndpoint.Port),
-                    remoteManagerFactory);
-                    
+            var config = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindNamedParameter(typeof 
(NamingConfigurationOptions.NameServerAddress),
+                    nameServerEndpoint.Address.ToString())
+                .BindNamedParameter(typeof 
(NamingConfigurationOptions.NameServerPort),
+                    nameServerEndpoint.Port.ToString())
+                .BindNamedParameter(typeof 
(NetworkServiceOptions.NetworkServicePort),
+                    (0).ToString(CultureInfo.InvariantCulture))
+                .BindImplementation(GenericType<INameClient>.Class, 
GenericType<NameClient>.Class)
+                
.BindImplementation(GenericType<IStreamingCodec<string>>.Class, 
GenericType<StringStreamingCodec>.Class)
+                .Build();
+
+            var injector = TangFactory.GetTang().NewInjector(config);
+            injector.BindVolatileInstance(
+                
GenericType<IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>>.Class,
 handler);
+
+            return injector;
         }
 
-        private GroupCommunicationMessage CreateGcm(string message, string 
from, string to)
+        private GroupCommunicationMessage<string> CreateGcmStringType(string 
message, string from, string to)
         {
-            byte[] data = Encoding.UTF8.GetBytes(message);
-            return new GroupCommunicationMessage("g1", "op1", from, to, data, 
MessageType.Data);
+            var stringCodec = 
TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+            return new GroupCommunicationMessage<string>("g1", "op1", from, 
to, message, MessageType.Data, stringCodec);
         }
 
         private static void ScatterReceiveReduce(IScatterReceiver<int> 
receiver, IReduceSender<int> sumSender)
@@ -852,21 +860,21 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             return Enumerable.Range(1, n).Sum();
         }
 
-        private IConfiguration GetDefaulCodecConfig()
+        private static IConfiguration GetDefaultCodecConfig()
         {
-            return CodecConfiguration<int>.Conf
-                .Set(CodecConfiguration<int>.Codec, 
GenericType<IntCodec>.Class)
+            return StreamingCodecConfiguration<int>.Conf
+                .Set(StreamingCodecConfiguration<int>.Codec, 
GenericType<IntStreamingCodec>.Class)
                 .Build();
         }
 
-        private IConfiguration GetDefaulReduceFuncConfig()
+        private static IConfiguration GetDefaulReduceFuncConfig()
         {
             return ReduceFunctionConfiguration<int>.Conf
                 .Set(ReduceFunctionConfiguration<int>.ReduceFunction, 
GenericType<SumFunction>.Class)
                 .Build();
         }
 
-        private IConfiguration GetDefaulDataConverterConfig()
+        private static IConfiguration GetDefaulDataConverterConfig()
         {
             return PipelineDataConverterConfiguration<int>.Conf
                 .Set(PipelineDataConverterConfiguration<int>.DataConverter, 
GenericType<DefaultPipelineDataConverter<int>>.Class)
@@ -937,33 +945,6 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
         }
     }
 
-    class IntArrayCodec : ICodec<int[]>
-    {
-        [Inject]
-        private IntArrayCodec()
-        {
-        }
-
-        public byte[] Encode(int[] obj)
-        {
-            byte[] result = new byte[sizeof(Int32) * obj.Length];
-            Buffer.BlockCopy(obj, 0, result, 0, result.Length);
-            return result;
-        }
-
-        public int[] Decode(byte[] data)
-        {
-            if (data.Length % sizeof(Int32) != 0)
-            {
-                throw new Exception("error inside integer array decoder, byte 
array length not a multiple of interger size");
-            }
-
-            int[] result = new int[data.Length / sizeof(Int32)];
-            Buffer.BlockCopy(data, 0, result, 0, data.Length);
-            return result;
-        }
-    }
-
     class PipelineIntDataConverter : IPipelineDataConverter<int[]>
     {
         readonly int _chunkSize;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
index f8af722..88ae9b3 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
@@ -21,8 +21,10 @@ using System.Collections.Generic;
 using System.Globalization;
 using System.Linq;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
 using Org.Apache.REEF.Network.Group.Pipelining.Impl;
@@ -42,7 +44,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
         public void TestTreeTopology()
         {
             TreeTopology<int> topology = new TreeTopology<int>("Operator", 
"Operator", "task1", "driverid",
-                new BroadcastOperatorSpec("task1", GetDefaulCodecConfig(), 
GetDefaulDataConverterConfig()), 2);
+                new BroadcastOperatorSpec("task1", 
GetDefaultDataConverterConfig()), 2);
             for (int i = 1; i < 8; i++)
             {
                 string taskid = "task" + i;
@@ -68,10 +70,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = 
GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddReduce<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), 
GetDefaulReduceFuncConfig())
+                .AddReduce<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaultDataConverterConfig(), 
GetDefaultReduceFuncConfig())
                 .Build();
 
-            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup);
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup, GetDefaultCodecConfig());
 
             IReduceReceiver<int> receiver = 
commGroups[0].GetReduceReceiver<int>(operatorName);
             IReduceSender<int> sender1 = 
commGroups[1].GetReduceSender<int>(operatorName);
@@ -124,10 +126,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = 
GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddBroadcast<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), 
GetDefaulReduceFuncConfig())
+                .AddBroadcast<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaultDataConverterConfig(), 
GetDefaultReduceFuncConfig())
                 .Build();
 
-            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup);
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup, GetDefaultCodecConfig());
 
             IBroadcastSender<int> sender = 
commGroups[0].GetBroadcastSender<int>(operatorName);
             IBroadcastReceiver<int> receiver1 = 
commGroups[1].GetBroadcastReceiver<int>(operatorName);
@@ -204,18 +206,16 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
                     broadcastOperatorName,
                     masterTaskId,
                     TopologyTypes.Tree,
-                    GetDefaulCodecConfig(),
-                    GetDefaulDataConverterConfig())
+                    GetDefaultDataConverterConfig())
                 .AddReduce<int>(
                     reduceOperatorName,
                     masterTaskId,
                     TopologyTypes.Tree,
-                    GetDefaulCodecConfig(),
-                    GetDefaulDataConverterConfig(),
-                    GetDefaulReduceFuncConfig())
+                    GetDefaultDataConverterConfig(),
+                    GetDefaultReduceFuncConfig())
                 .Build();
 
-            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup);
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup, GetDefaultCodecConfig());
 
             //for master task
             IBroadcastSender<int> broadcastSender = 
commGroups[0].GetBroadcastSender<int>(broadcastOperatorName);
@@ -317,10 +317,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = 
GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
+                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaultDataConverterConfig())
                 .Build();
 
-            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup);
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup, GetDefaultCodecConfig());
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -362,10 +362,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = 
GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
+                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaultDataConverterConfig())
                 .Build();
 
-            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup);
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup, GetDefaultCodecConfig());
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -416,10 +416,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = 
GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
+                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaultDataConverterConfig())
                 .Build();
 
-            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup);
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup, GetDefaultCodecConfig());
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -467,10 +467,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = 
GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
+                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaultDataConverterConfig())
                 .Build();
 
-            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup);
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup, GetDefaultCodecConfig());
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -519,10 +519,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = 
GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
+                .AddScatter<int>(operatorName, masterTaskId, 
TopologyTypes.Tree, GetDefaultDataConverterConfig())
                 .Build();
 
-            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup);
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup, GetDefaultCodecConfig());
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -586,18 +586,16 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
                     scatterOperatorName,
                     masterTaskId,
                     TopologyTypes.Tree,
-                    GetDefaulCodecConfig(),
-                    GetDefaulDataConverterConfig())
+                    GetDefaultDataConverterConfig())
                 .AddReduce<int>(
                     reduceOperatorName,
                     masterTaskId,
                     TopologyTypes.Tree,
-                    GetDefaulCodecConfig(),
-                    GetDefaulDataConverterConfig(),
-                    GetDefaulReduceFuncConfig())
+                    GetDefaultDataConverterConfig(),
+                    GetDefaultReduceFuncConfig())
                 .Build();
 
-            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup);
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup, GetDefaultCodecConfig());
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(scatterOperatorName);
             IReduceReceiver<int> sumReducer = 
commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
@@ -646,21 +644,21 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             Assert.AreEqual(sum, 6325);
         }
 
-        private IConfiguration GetDefaulCodecConfig()
+        private IConfiguration GetDefaultCodecConfig()
         {
-            return CodecConfiguration<int>.Conf
-                .Set(CodecConfiguration<int>.Codec, 
GenericType<IntCodec>.Class)
+            return StreamingCodecConfiguration<int>.Conf
+                .Set(StreamingCodecConfiguration<int>.Codec, 
GenericType<IntStreamingCodec>.Class)
                 .Build();
         }
 
-        private IConfiguration GetDefaulReduceFuncConfig()
+        private IConfiguration GetDefaultReduceFuncConfig()
         {
             return ReduceFunctionConfiguration<int>.Conf
                 .Set(ReduceFunctionConfiguration<int>.ReduceFunction, 
GenericType<SumFunction>.Class)
                 .Build();
         }
 
-        private IConfiguration GetDefaulDataConverterConfig()
+        private IConfiguration GetDefaultDataConverterConfig()
         {
             return PipelineDataConverterConfiguration<int>.Conf
                 .Set(PipelineDataConverterConfiguration<int>.DataConverter, 
GenericType<DefaultPipelineDataConverter<int>>.Class)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs
 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs
index 6dea9f1..d4f0647 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs
@@ -50,12 +50,15 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IStreamingCodec<double[]> doubleArrCodec = 
injector.GetInstance<DoubleArrayStreamingCodec>();
             IStreamingCodec<float[]> floatArrCodec = 
injector.GetInstance<FloatArrayStreamingCodec>();
 
+            IStreamingCodec<string> stringCodec = 
injector.GetInstance<StringStreamingCodec>();
+
             CancellationToken token = new CancellationToken();
 
             int obj = 5;
             int[] intArr = {1, 2};
             double[] doubleArr = { 1, 2 };
             float[] floatArr = { 1, 2 };
+            string stringObj = "hello";
 
             var stream = new MemoryStream();
             IDataWriter writer = new StreamDataWriter(stream);
@@ -71,6 +74,8 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             await doubleArrCodec.WriteAsync(doubleArr, writer, token);
             floatArrCodec.Write(floatArr, writer);
             await floatArrCodec.WriteAsync(floatArr, writer, token);
+            stringCodec.Write(stringObj, writer);
+            await stringCodec.WriteAsync(stringObj, writer, token);
 
             stream.Position = 0;
             IDataReader reader = new StreamDataReader(stream);
@@ -86,13 +91,17 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             double[] resArr4 = await doubleArrCodec.ReadAsync(reader, token);
             float[] resArr5 = floatArrCodec.Read(reader);
             float[] resArr6 = await floatArrCodec.ReadAsync(reader, token);
-            
+            string resArr7 = stringCodec.Read(reader);
+            string resArr8 = await stringCodec.ReadAsync(reader, token);
+
             Assert.AreEqual(obj, res1);
             Assert.AreEqual(obj + 1, res2);
             Assert.AreEqual(obj + 2, res3);
             Assert.AreEqual(obj + 3, res4);
             Assert.AreEqual(obj + 4, res5);
             Assert.AreEqual(obj + 5, res6);
+            Assert.AreEqual(stringObj, resArr7);
+            Assert.AreEqual(stringObj, resArr8);
 
             for (int i = 0; i < intArr.Length; i++)
             {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Codec/GcmMessageProto.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Codec/GcmMessageProto.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Codec/GcmMessageProto.cs
deleted file mode 100644
index 755851a..0000000
--- a/lang/cs/Org.Apache.REEF.Network/Group/Codec/GcmMessageProto.cs
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using ProtoBuf;
-
-namespace Org.Apache.REEF.Network.Group.Codec
-{
-    [ProtoContract]
-    public class GcmMessageProto
-    {
-        [ProtoMember(1)]
-        public byte[][] Data { get; set; }
-
-        [ProtoMember(2)]
-        public string OperatorName { get; set; }
-
-        [ProtoMember(3)]
-        public string GroupName { get; set; }
-
-        [ProtoMember(4)]
-        public string Source { get; set; }
-
-        [ProtoMember(5)]
-        public string Destination { get; set; }
-
-        [ProtoMember(6)]
-        public MessageType MsgType { get; set; }
-
-        public static GcmMessageProto Create(GroupCommunicationMessage gcm)
-        {
-            return new GcmMessageProto()
-            {
-                Data = gcm.Data,
-                OperatorName = gcm.OperatorName,
-                GroupName = gcm.GroupName,
-                Source = gcm.Source,
-                Destination = gcm.Destination,
-                MsgType = gcm.MsgType,
-            };
-        }
-
-        public GroupCommunicationMessage ToGcm()
-        {
-            return new GroupCommunicationMessage(
-                GroupName,
-                OperatorName,
-                Source,
-                Destination,
-                Data,
-                MsgType);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Codec/GroupCommunicationMessageCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Codec/GroupCommunicationMessageCodec.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Codec/GroupCommunicationMessageCodec.cs
deleted file mode 100644
index 081804f..0000000
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Codec/GroupCommunicationMessageCodec.cs
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.IO;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-using ProtoBuf;
-
-namespace Org.Apache.REEF.Network.Group.Codec
-{
-    /// <summary>
-    /// Used to serialize GroupCommunicationMessages.
-    /// </summary>
-    public class GroupCommunicationMessageCodec : 
ICodec<GroupCommunicationMessage>
-    {
-        /// <summary>
-        /// Create a new GroupCommunicationMessageCodec.
-        /// </summary>
-        [Inject]
-        public GroupCommunicationMessageCodec()
-        {
-        }
-
-        /// <summary>
-        /// Serialize the GroupCommunicationObject into a byte array using 
Protobuf.
-        /// </summary>
-        /// <param name="obj">The object to serialize.</param>
-        /// <returns>The serialized GroupCommunicationMessage in byte array 
form</returns>
-        public byte[] Encode(GroupCommunicationMessage obj)
-        {
-            GcmMessageProto proto = GcmMessageProto.Create(obj);
-            using (var stream = new MemoryStream())
-            {
-                Serializer.Serialize(stream, proto);
-                return stream.ToArray();
-            }
-        }
-
-        /// <summary>
-        /// Deserialize the byte array into a GroupCommunicationMessage using 
Protobuf.
-        /// </summary>
-        /// <param name="data">The byte array to deserialize</param>
-        /// <returns>The deserialized GroupCommunicationMessage 
object.</returns>
-        public GroupCommunicationMessage Decode(byte[] data)
-        {
-            using (var stream = new MemoryStream(data))
-            {
-                GcmMessageProto proto = 
Serializer.Deserialize<GcmMessageProto>(stream);
-                return proto.ToGcm();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
new file mode 100644
index 0000000..a270272
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Network.StreamingCodec;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Config
+{
+    public sealed class CodecToStreamingCodecConfiguration<T> : 
ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// RequiredImpl for Codec. Client needs to set implementation for 
this paramter
+        /// </summary>
+        public static readonly RequiredImpl<ICodec<T>> Codec = new 
RequiredImpl<ICodec<T>>();
+
+        /// <summary>
+        /// Configuration Module for Codec
+        /// </summary>
+        public static ConfigurationModule Conf = new 
CodecToStreamingCodecConfiguration<T>()
+            .BindImplementation(GenericType<ICodec<T>>.Class, Codec)
+            .BindImplementation(GenericType<IStreamingCodec<T>>.Class, 
GenericType<CodecToStreamingCodec<T>>.Class)
+            
.BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, 
GenericType<StreamingPipelineMessageCodec<T>>.Class)
+            .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
index 3cc7270..240d244 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
@@ -17,14 +17,9 @@
  * under the License.
  */
 
-using System;
 using System.Collections.Generic;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Operators.Impl;
-using Org.Apache.REEF.Network.Group.Pipelining;
 using Org.Apache.REEF.Network.Group.Topology;
 using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Driver
 {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
index 07581ba..5ebb357 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
@@ -17,23 +17,20 @@
  * under the License.
  */
 
+using System;
 using System.Collections.Generic;
 using System.Reflection;
-using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
-using Org.Apache.REEF.Network.Group.Pipelining;
 using Org.Apache.REEF.Network.Group.Pipelining.Impl;
 using Org.Apache.REEF.Network.Group.Topology;
 using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
+
 
 namespace Org.Apache.REEF.Network.Group.Driver.Impl
 {
@@ -42,7 +39,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
     /// All operators in the same Communication Group run on the the 
     /// same set of tasks.
     /// </summary>
-    public class CommunicationGroupDriver : ICommunicationGroupDriver
+    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira 
REEF-295.]
+    public sealed class CommunicationGroupDriver : ICommunicationGroupDriver
     {
         private static readonly Logger LOGGER = Logger.GetLogger(typeof 
(CommunicationGroupDriver));
 
@@ -141,7 +139,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <returns>The same CommunicationGroupDriver with the added 
Broadcast operator info</returns>
         public ICommunicationGroupDriver AddBroadcast(string operatorName, 
string masterTaskId, TopologyTypes topologyType = TopologyTypes.Flat)
         {
-            return AddBroadcast<int>( operatorName, masterTaskId, 
topologyType, GetDefaulConfiguration());
+            return AddBroadcast<int>( operatorName, masterTaskId, 
topologyType, GetDefaultConfiguration());
         }
 
         /// <summary>
@@ -236,7 +234,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         public ICommunicationGroupDriver AddScatter(string operatorName, 
string senderId,
             TopologyTypes topologyType = TopologyTypes.Flat)
         {
-            return AddScatter<int>(operatorName, senderId, topologyType, 
GetDefaulConfiguration());
+            return AddScatter<int>(operatorName, senderId, topologyType, 
GetDefaultConfiguration());
         }
 
         /// <summary>
@@ -343,18 +341,14 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
             return (IConfiguration) info.Invoke(topology, new[] {(object) 
taskId});
         }
 
-        private IConfiguration[] GetDefaulConfiguration()
+        private IConfiguration[] GetDefaultConfiguration()
         {
             List<IConfiguration> list = new List<IConfiguration>(); 
-            IConfiguration codecConfig = CodecConfiguration<int>.Conf
-                .Set(CodecConfiguration<int>.Codec, 
GenericType<IntCodec>.Class)
-                .Build();
 
             IConfiguration dataConverterConfig = 
PipelineDataConverterConfiguration<int>.Conf
                 .Set(PipelineDataConverterConfiguration<int>.DataConverter, 
GenericType<DefaultPipelineDataConverter<int>>.Class)
                 .Build();
 
-            list.Add(codecConfig);
             list.Add(dataConverterConfig);
 
             return list.ToArray();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
new file mode 100644
index 0000000..e807a4e
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+    /// <summary>
+    /// Messages sent by MPI Operators. This is the abstract class inherited 
by 
+    /// WritableGroupCommunicationMessage but seen by Network Service
+    /// </summary>
+    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira 
REEF-295.
+    public abstract class GeneralGroupCommunicationMessage : IWritable
+    {        
+        /// <summary>
+        /// Empty constructor to allow instantiation by reflection
+        /// </summary>
+        protected GeneralGroupCommunicationMessage()
+        {
+        }
+
+        /// <summary>
+        /// Create new CommunicationGroupMessage.
+        /// </summary>
+        /// <param name="groupName">The name of the communication group</param>
+        /// <param name="operatorName">The name of the MPI operator</param>
+        /// <param name="source">The message source</param>
+        /// <param name="destination">The message destination</param>
+        /// <param name="messageType">The type of message to send</param>
+        protected GeneralGroupCommunicationMessage(
+            string groupName,
+            string operatorName,
+            string source,
+            string destination,
+            MessageType messageType)
+        {
+            GroupName = groupName;
+            OperatorName = operatorName;
+            Source = source;
+            Destination = destination;
+            MsgType = messageType;
+        }
+
+        /// <summary>
+        /// Returns the Communication Group name.
+        /// </summary>
+        public string GroupName { get; internal set; }
+
+        /// <summary>
+        /// Returns the MPI Operator name.
+        /// </summary>
+        public string OperatorName { get; internal set; }
+
+        /// <summary>
+        /// Returns the source of the message.
+        /// </summary>
+        public string Source { get; internal set; }
+
+        /// <summary>
+        /// Returns the destination of the message.
+        /// </summary>
+        public string Destination { get; internal set; }
+
+        /// <summary>
+        /// Returns the type of message being sent.
+        /// </summary>
+        public MessageType MsgType { get; internal set; }
+
+        /// <summary>
+        /// Read the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        public abstract void Read(IDataReader reader);
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="writer">The writer to which to write</param>
+        public abstract void Write(IDataWriter writer);
+
+        /// <summary>
+        /// Read the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        /// <param name="token">The cancellation token</param>
+        public abstract System.Threading.Tasks.Task ReadAsync(IDataReader 
reader, CancellationToken token);
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">The cancellation token</param>
+        public abstract System.Threading.Tasks.Task WriteAsync(IDataWriter 
writer, CancellationToken token);
+    }
+}
\ No newline at end of file

Reply via email to