Repository: incubator-reef
Updated Branches:
  refs/heads/master b3c1517bf -> 1d2003346


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
index 3ccf31f..ade5834 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
@@ -25,19 +25,16 @@ using System.Threading;
 using Org.Apache.REEF.Common.Io;
 using Org.Apache.REEF.Common.Services;
 using Org.Apache.REEF.Driver.Context;
-using Org.Apache.REEF.Network.Group.Codec;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Task.Impl;
 using Org.Apache.REEF.Network.Naming;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Driver.Impl
 {
@@ -45,7 +42,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
     /// Used to create Communication Groups for Group Communication Operators 
on the Reef driver.
     /// Also manages configuration for Group Communication tasks/services.
     /// </summary>
-    public class GroupCommDriver : IGroupCommDriver
+    // TODO: Need to remove IWritable and use IStreamingCodec. Please see Jira 
REEF-295.
+    public sealed class GroupCommDriver : IGroupCommDriver
     {
         private const string MasterTaskContextName = "MasterTaskContext";
         private const string SlaveTaskContextName = "SlaveTaskContext";
@@ -53,17 +51,16 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         private static Logger LOGGER = 
Logger.GetLogger(typeof(GroupCommDriver));
 
         private readonly string _driverId;
-        private readonly string _nameServerAddr;           
+        private readonly string _nameServerAddr;
         private readonly int _nameServerPort;
         private int _contextIds;
-        private int _fanOut;
-        private string _groupName;
+        private readonly int _fanOut;
+        private readonly string _groupName;
 
-        private readonly Dictionary<string, ICommunicationGroupDriver> 
_commGroups; 
+        private readonly Dictionary<string, ICommunicationGroupDriver> 
_commGroups;
         private readonly AvroConfigurationSerializer _configSerializer;
-        private readonly INameServer _nameServer;
 
-        
+
         /// <summary>
         /// Create a new GroupCommunicationDriver object.
         /// </summary>
@@ -75,7 +72,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <param name="configSerializer">Used to serialize task 
configuration</param>
         /// <param name="nameServer">Used to map names to ip adresses</param>
         [Inject]
-        public GroupCommDriver(
+        private GroupCommDriver(
             [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string 
driverId,
             [Parameter(typeof(GroupCommConfigurationOptions.MasterTaskId))] 
string masterTaskId,
             [Parameter(typeof(GroupCommConfigurationOptions.FanOut))] int 
fanOut,
@@ -92,9 +89,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
 
             _configSerializer = configSerializer;
             _commGroups = new Dictionary<string, ICommunicationGroupDriver>();
-            _nameServer = nameServer;
 
-            IPEndPoint localEndpoint = _nameServer.LocalEndpoint;
+            IPEndPoint localEndpoint = nameServer.LocalEndpoint;
             _nameServerAddr = localEndpoint.Address.ToString();
             _nameServerPort = localEndpoint.Port;
 
@@ -123,11 +119,13 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
             {
                 throw new ArgumentNullException("groupName");
             }
-            else if (numTasks < 1)
+
+            if (numTasks < 1)
             {
-                throw new ArgumentException("NumTasks must be greater than 
0");                
+                throw new ArgumentException("NumTasks must be greater than 0");
             }
-            else if (_commGroups.ContainsKey(groupName))
+            
+            if (_commGroups.ContainsKey(groupName))
             {
                 throw new ArgumentException("Group Name already registered 
with GroupCommunicationDriver");
             }
@@ -144,8 +142,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         public IConfiguration GetContextConfiguration()
         {
             int contextNum = Interlocked.Increment(ref _contextIds);
-            string id = (contextNum == 0) 
-                ? MasterTaskContextName 
+            string id = (contextNum == 0)
+                ? MasterTaskContextName
                 : GetSlaveTaskContextName(contextNum);
 
             return ContextConfiguration.ConfigurationModule
@@ -160,16 +158,13 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         public IConfiguration GetServiceConfiguration()
         {
             IConfiguration serviceConfig = 
ServiceConfiguration.ConfigurationModule
-                .Set(ServiceConfiguration.Services, 
GenericType<NetworkService<GroupCommunicationMessage>>.Class)
+                .Set(ServiceConfiguration.Services, 
GenericType<WritableNetworkService<GeneralGroupCommunicationMessage>>.Class)
                 .Build();
 
             return TangFactory.GetTang().NewConfigurationBuilder(serviceConfig)
                 .BindImplementation(
-                    
GenericType<IObserver<NsMessage<GroupCommunicationMessage>>>.Class,
+                    
GenericType<IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>>.Class,
                     GenericType<GroupCommNetworkObserver>.Class)
-                .BindImplementation(
-                    GenericType<ICodec<GroupCommunicationMessage>>.Class,
-                    GenericType<GroupCommunicationMessageCodec>.Class)
                 
.BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
                     
GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
                     _nameServerAddr)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
index dd67f8c..33f9c92 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
@@ -17,91 +17,191 @@
  * under the License.
  */
 
+using System;
+using System.Threading;
+using Org.Apache.REEF.Network.StreamingCodec;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Tang.Annotations;
+
 namespace Org.Apache.REEF.Network.Group.Driver.Impl
 {
     /// <summary>
-    /// Messages sent by Group Communication Operators
+    /// Messages sent by MPI Operators. This is the Writable version of 
GroupCommunicationMessage
+    ///  class and will eventually replace it once everybody agrees with the 
design
     /// </summary>
-    public class GroupCommunicationMessage
+    // TODO: Need to remove IWritable and use IStreamingCodec. Please see Jira 
REEF-295.
+    public sealed class GroupCommunicationMessage<T> : 
GeneralGroupCommunicationMessage
     {
+        private readonly IStreamingCodec<T> _codec;
+
+        /// <summary>
+        /// Injectable constructor that only stores the streaming codec; the message fields are expected to be populated afterwards via Read/ReadAsync.
+        /// </summary>
+        [Inject]
+        private GroupCommunicationMessage(IStreamingCodec<T> codec)
+        {
+            _codec = codec;
+        }
+
         /// <summary>
         /// Create new CommunicationGroupMessage.
         /// </summary>
         /// <param name="groupName">The name of the communication group</param>
-        /// <param name="operatorName">The name of the Group Communication 
operator</param>
+        /// <param name="operatorName">The name of the MPI operator</param>
         /// <param name="source">The message source</param>
         /// <param name="destination">The message destination</param>
-        /// <param name="data">The actual byte array of data</param>
+        /// <param name="message">The actual Writable message</param>
         /// <param name="messageType">The type of message to send</param>
+        /// <param name="codec">Streaming Codec</param>
         public GroupCommunicationMessage(
             string groupName,
             string operatorName,
             string source,
             string destination,
-            byte[] data,
-            MessageType messageType)
+            T message,
+            MessageType messageType,
+            IStreamingCodec<T> codec)
+            : base(groupName, operatorName, source, destination, messageType)
         {
-            GroupName = groupName;
-            OperatorName = operatorName;
-            Source = source;
-            Destination = destination;
-            Data = new[] { data };
-            MsgType = messageType;
+            _codec = codec;
+            Data = new T[] { message };
         }
 
         /// <summary>
         /// Create new CommunicationGroupMessage.
         /// </summary>
         /// <param name="groupName">The name of the communication group</param>
-        /// <param name="operatorName">The name of the Group Communication 
operator</param>
+        /// <param name="operatorName">The name of the MPI operator</param>
         /// <param name="source">The message source</param>
         /// <param name="destination">The message destination</param>
-        /// <param name="data">The actual byte array of data</param>
+        /// <param name="message">The actual Writable message array</param>
         /// <param name="messageType">The type of message to send</param>
+        /// <param name="codec">Streaming Codec</param>
         public GroupCommunicationMessage(
             string groupName,
             string operatorName,
             string source,
             string destination,
-            byte[][] data,
-            MessageType messageType)
+            T[] message,
+            MessageType messageType,
+            IStreamingCodec<T> codec)
+            : base(groupName, operatorName, source, destination, messageType)
         {
-            GroupName = groupName;
-            OperatorName = operatorName;
-            Source = source;
-            Destination = destination;
-            Data = data;
-            MsgType = messageType;
+            _codec = codec;
+            Data = message;
         }
 
         /// <summary>
-        /// Returns the Communication Group name.
+        /// Returns the array of messages.
         /// </summary>
-        public string GroupName { get; private set; }
+        public T[] Data
+        {
+            get;
+            set;
+        }
 
         /// <summary>
-        /// Returns the Group Communication Operator name.
+        /// Read the class fields.
         /// </summary>
-        public string OperatorName { get; private set; }
+        /// <param name="reader">The reader from which to read </param>
+        public override void Read(IDataReader reader)
+        {
+            GroupName = reader.ReadString();
+            OperatorName = reader.ReadString();
+            Source = reader.ReadString();
+            Destination = reader.ReadString();
 
-        /// <summary>
-        /// Returns the source of the message.
-        /// </summary>
-        public string Source { get; private set; }
+            int dataCount = reader.ReadInt32();
+
+            if (dataCount == 0)
+            {
+                throw new Exception("Data Count in Group COmmunication Message 
cannot be zero");
+            }
+
+            MsgType = (MessageType)Enum.Parse(typeof(MessageType), 
reader.ReadString());
+            Data = new T[dataCount];
+
+            for (int index = 0; index < dataCount; index++)
+            {
+                Data[index] = _codec.Read(reader);
+
+                if (Data[index] == null)
+                {
+                    throw new Exception("message instance cannot be created 
from the IDataReader in Group Communication Message");
+                }
+            }
+        }
 
         /// <summary>
-        /// Returns the destination of the message.
+        /// Writes the class fields.
         /// </summary>
-        public string Destination { get; private set; }
+        /// <param name="writer">The writer to which to write</param>
+        public override void Write(IDataWriter writer)
+        {
+            writer.WriteString(GroupName);
+            writer.WriteString(OperatorName);
+            writer.WriteString(Source);
+            writer.WriteString(Destination);
+            writer.WriteInt32(Data.Length);
+            writer.WriteString(MsgType.ToString());
+
+            foreach (var data in Data)
+            {
+                _codec.Write(data, writer);
+            }
+        }
 
         /// <summary>
-        /// Returns the message data.
+        /// Read the class fields.
         /// </summary>
-        public byte[][] Data { get; private set; }
+        /// <param name="reader">The reader from which to read </param>
+        /// <param name="token">The cancellation token</param>
+        public override async System.Threading.Tasks.Task 
ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            GroupName = await reader.ReadStringAsync(token);
+            OperatorName = await reader.ReadStringAsync(token);
+            Source = await reader.ReadStringAsync(token);
+            Destination = await reader.ReadStringAsync(token);
+
+            int dataCount = await reader.ReadInt32Async(token);
+
+            if (dataCount == 0)
+            {
+                throw new Exception("Data Count in Group COmmunication Message 
cannot be zero");
+            }
+
+            MsgType = (MessageType)Enum.Parse(typeof(MessageType), await 
reader.ReadStringAsync(token));
+            Data = new T[dataCount];
+
+            for (int index = 0; index < dataCount; index++)
+            {
+                Data[index] = await _codec.ReadAsync(reader, token);
+
+                if (Data[index] == null)
+                {
+                    throw new Exception("message instance cannot be created 
from the IDataReader in Group Communication Message");
+                }
+            }
+        }
 
         /// <summary>
-        /// Returns the type of message being sent.
+        /// Writes the class fields.
         /// </summary>
-        public MessageType MsgType { get; private set; }
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">The cancellation token</param>
+        public override async System.Threading.Tasks.Task 
WriteAsync(IDataWriter writer, CancellationToken token)
+        {
+            await writer.WriteStringAsync(GroupName, token);
+            await writer.WriteStringAsync(OperatorName, token);
+            await writer.WriteStringAsync(Source, token);
+            await writer.WriteStringAsync(Destination, token);
+            await writer.WriteInt32Async(Data.Length, token);
+            await writer.WriteStringAsync(MsgType.ToString(), token);
+
+            foreach (var data in Data)
+            {
+                await _codec.WriteAsync(data, writer, token);
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
index b53929e..e7fcb52 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
@@ -33,7 +33,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// Group Communication Operator used to receive broadcast messages in 
pipelined fashion.
     /// </summary>
     /// <typeparam name="T">The type of message being sent.</typeparam>
-    public class BroadcastReceiver<T> : IBroadcastReceiver<T>
+    public sealed class BroadcastReceiver<T> : IBroadcastReceiver<T>
     {
         private const int PipelineVersion = 2;
         private readonly IOperatorTopology<PipelineMessage<T>> _topology;
@@ -64,7 +64,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             PipelineDataConverter = dataConverter;
             _topology = topology;
 
-            var msgHandler = 
Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message));
+            var msgHandler = 
Observer.Create<GeneralGroupCommunicationMessage>(message => 
topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
 
             if (initialize)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
index 4af01c7..5457a70 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
@@ -33,7 +33,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// Group Communication Operator used to send messages to child Tasks in 
pipelined fashion.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public class BroadcastSender<T> : IBroadcastSender<T>
+    public sealed class BroadcastSender<T> : IBroadcastSender<T>
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(BroadcastSender<T>));
         private const int PipelineVersion = 2;
@@ -66,7 +66,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             Version = PipelineVersion;
             PipelineDataConverter = dataConverter;
 
-            var msgHandler = 
Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message));
+            var msgHandler = 
Observer.Create<GeneralGroupCommunicationMessage>(message => 
topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
 
             if (initialize)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
index bea9c83..187bd58 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
@@ -33,7 +33,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// Group Communication operator used to receive and reduce messages in 
pipelined fashion.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public class ReduceReceiver<T> : IReduceReceiver<T>
+    public sealed class ReduceReceiver<T> : IReduceReceiver<T>
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(ReduceReceiver<T>));
         private const int PipelineVersion = 2;
@@ -71,7 +71,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             _pipelinedReduceFunc = new 
PipelinedReduceFunction<T>(ReduceFunction);
             _topology = topology;
 
-            var msgHandler = 
Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message));
+            var msgHandler = 
Observer.Create<GeneralGroupCommunicationMessage>(message => 
topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
 
             if (initialize)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
index 5f289da..0c13d3c 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -34,7 +34,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// Group Communication Operator used to send messages to be reduced by 
the ReduceReceiver in pipelined fashion.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public class ReduceSender<T> : IReduceSender<T>
+    public sealed class ReduceSender<T> : IReduceSender<T>
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(ReduceSender<T>));
         private const int PipelineVersion = 2;
@@ -72,7 +72,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             _pipelinedReduceFunc = new 
PipelinedReduceFunction<T>(ReduceFunction);
             _topology = topology;
 
-            var msgHandler = 
Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message));
+            var msgHandler = 
Observer.Create<GeneralGroupCommunicationMessage>(message => 
topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
 
             PipelineDataConverter = dataConverter;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
index a9f4b10..a21656d 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
@@ -33,7 +33,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// from the IScatterSender.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public class ScatterReceiver<T> : IScatterReceiver<T>
+    public sealed class ScatterReceiver<T> : IScatterReceiver<T>
     {
         private const int DefaultVersion = 1;
         private readonly IOperatorTopology<T> _topology;
@@ -60,7 +60,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             Version = DefaultVersion;
             _topology = topology;
 
-            var msgHandler = 
Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message));
+            var msgHandler = 
Observer.Create<GeneralGroupCommunicationMessage>(message => 
topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
 
             if (initialize)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
index 80bc84e..42e7f6b 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
@@ -33,7 +33,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// of the IScatterReceivers.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public class ScatterSender<T> : IScatterSender<T>
+    public sealed class ScatterSender<T> : IScatterSender<T>
     {
         private const int DefaultVersion = 1;
         private readonly IOperatorTopology<T> _topology;
@@ -60,7 +60,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             Version = DefaultVersion;
             _topology = topology;
 
-            var msgHandler = 
Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message));
+            var msgHandler = 
Observer.Create<GeneralGroupCommunicationMessage>(message => 
topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
 
             if (initialize)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
index 7b37c07..a78b13c 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
@@ -27,10 +27,12 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
 {
     /// <summary>
     /// Group Communication operator used to do point-to-point communication 
between named Tasks.
+    /// It uses Writable classes
     /// </summary>
-    public class Sender
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
+    public sealed class Sender
     {
-        private readonly INetworkService<GroupCommunicationMessage> 
_networkService;
+        private readonly INetworkService<GeneralGroupCommunicationMessage> 
_networkService;
         private readonly IIdentifierFactory _idFactory;
 
         /// <summary>
@@ -39,8 +41,8 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="networkService">The network services used to send 
messages.</param>
         /// <param name="idFactory">Used to create IIdentifier for 
GroupCommunicationMessages.</param>
         [Inject]
-        public Sender(
-            NetworkService<GroupCommunicationMessage> networkService, 
+        private Sender(
+            WritableNetworkService<GeneralGroupCommunicationMessage> 
networkService,
             IIdentifierFactory idFactory)
         {
             _networkService = networkService;
@@ -52,7 +54,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// included in the message.
         /// </summary>
         /// <param name="message">The message to send.</param>
-        public void Send(GroupCommunicationMessage message)
+        public void Send(GeneralGroupCommunicationMessage message)
         {
             if (message == null)
             {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
index e4f5b40..96c36e1 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
@@ -26,12 +26,14 @@ namespace Org.Apache.REEF.Network.Group.Task
 {
     /// <summary>
     /// Handles incoming messages sent to this Communication Group.
+    /// Writable Version
     /// </summary>
     [DefaultImplementation(typeof(CommunicationGroupNetworkObserver))]
-    public interface ICommunicationGroupNetworkObserver : 
IObserver<GroupCommunicationMessage>
+    // TODO: Need to remove IWritable and use IStreamingCodec. Please see Jira 
REEF-295.
+    public interface ICommunicationGroupNetworkObserver : 
IObserver<GeneralGroupCommunicationMessage>
     {
         /// <summary>
-        /// Registers the handler with the CommunicationGroupNetworkObserver.
+        /// Registers the handler with the 
CommunicationGroupNetworkObserver.
         /// Messages that are to be sent to the operator specified by 
operatorName
         /// are handled by the given observer.
         /// </summary>
@@ -39,6 +41,6 @@ namespace Org.Apache.REEF.Network.Group.Task
         /// will be invoked</param>
         /// <param name="observer">The handler to invoke when messages are sent
         /// to the operator specified by operatorName</param>
-        void Register(string operatorName, 
IObserver<GroupCommunicationMessage> observer);
+        void Register(string operatorName, 
IObserver<GeneralGroupCommunicationMessage> observer);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
index 8416cca..de19754 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
@@ -27,9 +27,11 @@ namespace Org.Apache.REEF.Network.Group.Task
 {
     /// <summary>
     /// Handles all incoming messages for this Task.
+    /// Writable Version
     /// </summary>
     [DefaultImplementation(typeof(GroupCommNetworkObserver))]
-    public interface IGroupCommNetworkObserver : 
IObserver<NsMessage<GroupCommunicationMessage>>
+    // TODO: Need to remove IWritable and use IStreamingCodec. Please see Jira 
REEF-295.
+    public interface IGroupCommNetworkObserver : 
IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>
     {
         /// <summary>
         /// Registers the network handler for the given CommunicationGroup.
@@ -39,6 +41,6 @@ namespace Org.Apache.REEF.Network.Group.Task
         /// <param name="groupName">The group name for the network 
handler</param>
         /// <param name="commGroupHandler">The network handler to invoke when
         /// messages are sent to the given group.</param>
-        void Register(string groupName, IObserver<GroupCommunicationMessage> 
commGroupHandler);
+        void Register(string groupName, 
IObserver<GeneralGroupCommunicationMessage> commGroupHandler);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
index 30dddcf..5be1457 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
@@ -19,8 +19,6 @@
 
 using System;
 using System.Collections.Generic;
-using System.Threading;
-using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities.Diagnostics;
@@ -30,19 +28,21 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 {
     /// <summary>
     /// Handles incoming messages sent to this Communication Group.
+    /// Writable version
     /// </summary>
-    public class CommunicationGroupNetworkObserver : 
ICommunicationGroupNetworkObserver
+    // TODO: Need to remove IWritable and use IStreamingCodec. Please see Jira 
REEF-295.
+    public sealed class CommunicationGroupNetworkObserver : 
ICommunicationGroupNetworkObserver
     {
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(CommunicationGroupNetworkObserver));
-        private readonly Dictionary<string, 
IObserver<GroupCommunicationMessage>> _handlers;
+        private readonly Dictionary<string, 
IObserver<GeneralGroupCommunicationMessage>> _handlers;
 
         /// <summary>
         /// Creates a new CommunicationGroupNetworkObserver.
         /// </summary>
         [Inject]
-        public CommunicationGroupNetworkObserver()
+        private CommunicationGroupNetworkObserver()
         {
-            _handlers = new Dictionary<string, 
IObserver<GroupCommunicationMessage>>();
+            _handlers = new Dictionary<string, 
IObserver<GeneralGroupCommunicationMessage>>();
         }
 
         /// <summary>
@@ -52,9 +52,9 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// </summary>
         /// <param name="operatorName">The name of the operator whose handler
         /// will be invoked</param>
-        /// <param name="observer">The handler to invoke when messages are sent
+        /// <param name="observer">The writable handler to invoke when 
messages are sent
         /// to the operator specified by operatorName</param>
-        public void Register(string operatorName, 
IObserver<GroupCommunicationMessage> observer)
+        public void Register(string operatorName, 
IObserver<GeneralGroupCommunicationMessage> observer)
         {
             if (string.IsNullOrEmpty(operatorName))
             {
@@ -69,16 +69,15 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         }
 
         /// <summary>
-        /// Handles the incoming GroupCommunicationMessage sent to this 
Communication Group.
+        /// Handles the incoming GeneralGroupCommunicationMessage sent to this 
Communication Group.
         /// Looks for the operator that the message is being sent to and 
invoke its handler.
         /// </summary>
         /// <param name="message">The incoming message</param>
-        public void OnNext(GroupCommunicationMessage message)
+        public void OnNext(GeneralGroupCommunicationMessage message)
         {
             string operatorName = message.OperatorName;
 
-            IObserver<GroupCommunicationMessage> handler = 
GetOperatorHandler(operatorName);
-
+            IObserver<GeneralGroupCommunicationMessage> handler = 
GetOperatorHandler(operatorName);
             if (handler == null)
             {
                 Exceptions.Throw(new ArgumentException("No handler registered 
with the operator name: " + operatorName), LOGGER);
@@ -94,14 +93,15 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// </summary>
         /// <param name="operatorName"></param>
         /// <returns></returns>
-        private IObserver<GroupCommunicationMessage> GetOperatorHandler(string 
operatorName)
+        private IObserver<GeneralGroupCommunicationMessage> 
GetOperatorHandler(string operatorName)
         {
-            IObserver<GroupCommunicationMessage> handler;
+            IObserver<GeneralGroupCommunicationMessage> handler;
             if (!_handlers.TryGetValue(operatorName, out handler))
             {
                 Exceptions.Throw(new ApplicationException("No handler 
registered yet with the operator name: " + operatorName), LOGGER);
             }
             return handler;
+
         }
 
         public void OnError(Exception error)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
index 4cf0e06..8a8e696 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
@@ -31,28 +31,30 @@ using Org.Apache.REEF.Wake.Remote.Impl;
 namespace Org.Apache.REEF.Network.Group.Task.Impl
 {
     /// <summary>
-    /// Container of ommunicationGroupClients
+    /// Used by Tasks to fetch CommunicationGroupClients.
+    /// Writable version
     /// </summary>
-    public class GroupCommClient : IGroupCommClient
+    // TODO: Need to remove IWritable and use IStreamingCodec. Please see Jira 
REEF-295.
+    public sealed class GroupCommClient : IGroupCommClient
     {
         private readonly Dictionary<string, ICommunicationGroupClient> 
_commGroups;
 
-        private readonly INetworkService<GroupCommunicationMessage> 
_networkService;
+        private readonly INetworkService<GeneralGroupCommunicationMessage> 
_networkService;
 
         /// <summary>
-        /// Creates a new GroupCommClient and registers the task ID with the 
Name Server.
-        /// Currently the GroupCommClient is injected in task constructor. 
When work with REEF-289, we should put the injection at a proepr palce. 
+        /// Creates a new GroupCommClient and registers the task ID 
with the Name Server.
         /// </summary>
         /// <param name="groupConfigs">The set of serialized Group 
Communication configurations</param>
-        /// <param name="taskId">The identifier for this taskfor this 
task</param>
-        /// <param name="networkService">The network service used to send 
messages</param>
+        /// <param name="taskId">The identifier for this task</param>
+        /// <param name="networkService">The writable network service used to 
send messages</param>
         /// <param name="configSerializer">Used to deserialize Group 
Communication configuration</param>
         /// <param name="injector">injector forked from the injector that 
creates this instance</param>
         [Inject]
-        private GroupCommClient(
+        [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please 
see Jira REEF-295 ", false)]
+        public GroupCommClient(
             
[Parameter(typeof(GroupCommConfigurationOptions.SerializedGroupConfigs))] 
ISet<string> groupConfigs,
             [Parameter(typeof(TaskConfigurationOptions.Identifier))] string 
taskId,
-            NetworkService<GroupCommunicationMessage> networkService,
+            WritableNetworkService<GeneralGroupCommunicationMessage> 
networkService,
             AvroConfigurationSerializer configSerializer,
             IInjector injector)
         {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
index 5604885..9d35ff1 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
@@ -29,29 +29,31 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 {
     /// <summary>
     /// Handles all incoming messages for this Task.
+    /// Writable version
     /// </summary>
-    public class GroupCommNetworkObserver : IGroupCommNetworkObserver
+    // TODO: Need to remove IWritable and use IStreamingCodec. Please see Jira 
REEF-295.
+    public sealed class GroupCommNetworkObserver : IGroupCommNetworkObserver
     {
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(GroupCommNetworkObserver));
 
-        private readonly Dictionary<string, 
IObserver<GroupCommunicationMessage>> _commGroupHandlers;
-            
+        private readonly Dictionary<string, 
IObserver<GeneralGroupCommunicationMessage>> _commGroupHandlers;
+
         /// <summary>
         /// Creates a new GroupCommNetworkObserver.
         /// </summary>
         [Inject]
         public GroupCommNetworkObserver()
         {
-            _commGroupHandlers = new Dictionary<string, 
IObserver<GroupCommunicationMessage>>();
+            _commGroupHandlers = new Dictionary<string, 
IObserver<GeneralGroupCommunicationMessage>>();
         }
 
         /// <summary>
-        /// Handles the incoming NsMessage for this Task.
-        /// Delegates the GroupCommunicationMessage to the correct 
-        /// CommunicationGroupNetworkObserver.
+        /// Handles the incoming WritableNsMessage for this Task.
+        /// Delegates the GeneralGroupCommunicationMessage to the correct 
+        /// CommunicationGroupNetworkObserver.
         /// </summary>
         /// <param name="nsMessage"></param>
-        public void OnNext(NsMessage<GroupCommunicationMessage> nsMessage)
+        public void OnNext(WritableNsMessage<GeneralGroupCommunicationMessage> 
nsMessage)
         {
             if (nsMessage == null)
             {
@@ -60,7 +62,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 
             try
             {
-                GroupCommunicationMessage gcm = nsMessage.Data.First();
+                GeneralGroupCommunicationMessage gcm = nsMessage.Data.First();
                 _commGroupHandlers[gcm.GroupName].OnNext(gcm);
             }
             catch (InvalidOperationException)
@@ -83,7 +85,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="groupName">The group name for the network 
handler</param>
         /// <param name="commGroupHandler">The network handler to invoke when
         /// messages are sent to the given group.</param>
-        public void Register(string groupName, 
IObserver<GroupCommunicationMessage> commGroupHandler)
+        public void Register(string groupName, 
IObserver<GeneralGroupCommunicationMessage> commGroupHandler)
         {
             if (string.IsNullOrEmpty(groupName))
             {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
index 013f76b..aa5de1e 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
@@ -17,17 +17,22 @@
  * under the License.
  */
 
+using System;
 using System.Collections.Concurrent;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 
 namespace Org.Apache.REEF.Network.Group.Task.Impl
 {
+
     /// <summary>
     /// Stores all incoming messages sent by a particular Task.
+    /// Writable version
     /// </summary>
-    internal class NodeStruct
+    /// <typeparam name="T"> Generic type of message</typeparam>
+    // TODO: Need to remove IWritable and use IStreamingCodec. Please see Jira 
REEF-295.
+    internal sealed class NodeStruct<T>
     {
-        private readonly BlockingCollection<GroupCommunicationMessage> 
_messageQueue;
+        private readonly BlockingCollection<GroupCommunicationMessage<T>> 
_messageQueue;
 
         /// <summary>
         /// Creates a new NodeStruct.
@@ -36,7 +41,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         public NodeStruct(string id)
         {
             Identifier = id;
-            _messageQueue = new 
BlockingCollection<GroupCommunicationMessage>();
+            _messageQueue = new 
BlockingCollection<GroupCommunicationMessage<T>>();
         }
 
         /// <summary>
@@ -49,7 +54,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Gets the first message in the message queue.
         /// </summary>
         /// <returns>The first available message.</returns>
-        public byte[][] GetData()
+        public T[] GetData()
         {
             return _messageQueue.Take().Data;
         }
@@ -58,7 +63,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Adds an incoming message to the message queue.
         /// </summary>
         /// <param name="gcm">The incoming message</param>
-        public void AddData(GroupCommunicationMessage gcm)
+        public void AddData(GroupCommunicationMessage<T> gcm)
         {
             _messageQueue.Add(gcm);
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index dd11402..67e1da9 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -18,7 +18,6 @@
  */
 
 using System;
-using System.Collections;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
@@ -30,20 +29,22 @@ using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
 using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Network.StreamingCodec;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Task.Impl
 {
     /// <summary>
     /// Contains the Operator's topology graph.
+    /// Writable version
     /// Used to send or receive messages to/from operators in the same
     /// Communication Group.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public class OperatorTopology<T> : IOperatorTopology<T>, 
IObserver<GroupCommunicationMessage>
+    // TODO: Need to remove IWritable and use IStreamingCodec. Please see Jira 
REEF-295.
+    public sealed class OperatorTopology<T> : IOperatorTopology<T>, 
IObserver<GeneralGroupCommunicationMessage>
     {
         private const int DefaultTimeout = 50000;
         private const int RetryCount = 10;
@@ -57,14 +58,14 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         private readonly int _timeout;
         private readonly int _retryCount;
 
-        private readonly NodeStruct _parent;
-        private readonly List<NodeStruct> _children;
-        private readonly Dictionary<string, NodeStruct> _idToNodeMap;
-        private readonly ICodec<T> _codec;
+        private readonly NodeStruct<T> _parent;
+        private readonly List<NodeStruct<T>> _children;
+        private readonly Dictionary<string, NodeStruct<T>> _idToNodeMap;
         private readonly INameClient _nameClient;
         private readonly Sender _sender;
-        private readonly BlockingCollection<NodeStruct> _nodesWithData;
+        private readonly BlockingCollection<NodeStruct<T>> _nodesWithData;
         private readonly Object _thisLock = new Object();
+        private readonly IStreamingCodec<T> _codec;
 
         /// <summary>
         /// Creates a new OperatorTopology object.
@@ -73,37 +74,39 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="groupName">The name of the operator's Communication 
Group</param>
         /// <param name="taskId">The operator's Task identifier</param>
         /// <param name="driverId">The identifer for the driver</param>
+        /// <param name="timeout">Timeout value for cancellation token</param>
+        /// <param name="retryCount">Number of times to retry 
registration</param>
         /// <param name="rootId">The identifier for the root Task in the 
topology graph</param>
         /// <param name="childIds">The set of child Task identifiers in the 
topology graph</param>
         /// <param name="networkService">The network service</param>
-        /// <param name="codec">The codec used to serialize and deserialize 
messages</param>
         /// <param name="sender">The Sender used to do point to point 
communication</param>
+        /// <param name="codec">Streaming codec to encode objects</param>
         [Inject]
-        public OperatorTopology(
+        private OperatorTopology(
             [Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] 
string operatorName,
             
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] 
string groupName,
             [Parameter(typeof(TaskConfigurationOptions.Identifier))] string 
taskId,
             [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string 
driverId,
-            [Parameter(typeof(GroupCommConfigurationOptions.Timeout))] int 
timrout,
+            [Parameter(typeof(GroupCommConfigurationOptions.Timeout))] int 
timeout,
             [Parameter(typeof(GroupCommConfigurationOptions.RetryCount))] int 
retryCount,
             
[Parameter(typeof(GroupCommConfigurationOptions.TopologyRootTaskId))] string 
rootId,
             
[Parameter(typeof(GroupCommConfigurationOptions.TopologyChildTaskIds))] 
ISet<string> childIds,
-            NetworkService<GroupCommunicationMessage> networkService,
-            ICodec<T> codec,
-            Sender sender)
+            WritableNetworkService<GeneralGroupCommunicationMessage> 
networkService,
+            Sender sender,
+            IStreamingCodec<T> codec)
         {
             _operatorName = operatorName;
             _groupName = groupName;
             _selfId = taskId;
             _driverId = driverId;
-            _timeout = timrout;
+            _timeout = timeout;
             _retryCount = retryCount;
-            _codec = codec;
             _nameClient = networkService.NamingClient;
             _sender = sender;
-            _nodesWithData = new BlockingCollection<NodeStruct>();
-            _children = new List<NodeStruct>();
-            _idToNodeMap = new Dictionary<string, NodeStruct>();
+            _nodesWithData = new BlockingCollection<NodeStruct<T>>();
+            _children = new List<NodeStruct<T>>();
+            _idToNodeMap = new Dictionary<string, NodeStruct<T>>();
+            _codec = codec;
 
             if (_selfId.Equals(rootId))
             {
@@ -111,12 +114,12 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             }
             else
             {
-                _parent = new NodeStruct(rootId);
+                _parent = new NodeStruct<T>(rootId);
                 _idToNodeMap[rootId] = _parent;
             }
             foreach (var childId in childIds)
             {
-                var node = new NodeStruct(childId);
+                var node = new NodeStruct<T>(childId);
                 _children.Add(node);
                 _idToNodeMap[childId] = node;
             }
@@ -151,7 +154,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Updates the sending node's message queue.
         /// </summary>
         /// <param name="gcm">The incoming message</param>
-        public void OnNext(GroupCommunicationMessage gcm)
+        public void OnNext(GeneralGroupCommunicationMessage gcm)
         {
             if (gcm == null)
             {
@@ -171,7 +174,14 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             lock (_thisLock)
             {
                 _nodesWithData.Add(sourceNode);
-                sourceNode.AddData(gcm);
+                var message = gcm as GroupCommunicationMessage<T>;
+
+                if (message == null)
+                {
+                    throw new NullReferenceException("message passed not of 
type GroupCommunicationMessage");
+                }
+
+                sourceNode.AddData(message);
             }
         }
 
@@ -204,7 +214,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 
             foreach (var child in _children)
             {
-                SendToNode(message, MessageType.Data, child); 
+                SendToNode(message, MessageType.Data, child);
             }
         }
 
@@ -218,14 +228,14 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         {
             if (messages == null)
             {
-                throw new ArgumentNullException("messages"); 
+                throw new ArgumentNullException("messages");
             }
             if (_children.Count <= 0)
             {
                 return;
             }
 
-            var count = (int) Math.Ceiling(((double) messages.Count) / 
_children.Count);
+            var count = (int)Math.Ceiling(((double)messages.Count) / 
_children.Count);
             ScatterHelper(messages, _children, count);
         }
 
@@ -268,10 +278,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 throw new ArgumentException("order cannot be null and must 
have the same number of elements as child tasks");
             }
 
-            List<NodeStruct> nodes = new List<NodeStruct>(); 
+            List<NodeStruct<T>> nodes = new List<NodeStruct<T>>();
             foreach (string taskId in order)
             {
-                NodeStruct node = FindNode(taskId);
+                NodeStruct<T> node = FindNode(taskId);
                 if (node == null)
                 {
                     throw new IllegalStateException("Received message from 
invalid task id: " + taskId);
@@ -280,7 +290,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 nodes.Add(node);
             }
 
-            int count = (int) Math.Ceiling(((double) messages.Count) / 
_children.Count);
+            int count = (int)Math.Ceiling(((double)messages.Count) / 
_children.Count);
             ScatterHelper(messages, nodes, count);
         }
 
@@ -290,28 +300,24 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <returns>The parent Task's message</returns>
         public T ReceiveFromParent()
         {
-            byte[][] data = ReceiveFromNode(_parent);
+            T[] data = ReceiveFromNode(_parent);
             if (data == null || data.Length != 1)
             {
                 throw new InvalidOperationException("Cannot receive data from 
parent node");
             }
 
-            return _codec.Decode(data[0]);
+            return data[0];
         }
 
-        /// <summary>
-        /// Receive a list of incoming messages from the parent Task.
-        /// </summary>
-        /// <returns>The parent Task's list of messages</returns>
         public IList<T> ReceiveListFromParent()
         {
-            byte[][] data = ReceiveFromNode(_parent);
+            T[] data = ReceiveFromNode(_parent);
             if (data == null || data.Length == 0)
             {
                 throw new InvalidOperationException("Cannot receive data from 
parent node");
             }
 
-            return data.Select(b => _codec.Decode(b)).ToList();
+            return data.ToList();
         }
 
         /// <summary>
@@ -336,13 +342,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 
                 foreach (var child in childrenWithData)
                 {
-                    byte[][] data = ReceiveFromNode(child);
+                    T[] data = ReceiveFromNode(child);
                     if (data == null || data.Length != 1)
                     {
                         throw new InvalidOperationException("Received invalid 
data from child with id: " + child.Identifier);
                     }
 
-                    receivedData.Add(_codec.Decode(data[0]));
+                    receivedData.Add(data[0]);
                     childrenToReceiveFrom.Remove(child.Identifier);
                 }
             }
@@ -368,10 +374,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// </summary>
         ///<param name="nodeSetIdentifier">Candidate set of nodes from which 
data is to be received</param>
         /// <returns>A Vector of NodeStruct with incoming data.</returns>
-        private IEnumerable<NodeStruct> GetNodeWithData(IEnumerable<string> 
nodeSetIdentifier)
+        private IEnumerable<NodeStruct<T>> GetNodeWithData(IEnumerable<string> 
nodeSetIdentifier)
         {
             CancellationTokenSource timeoutSource = new 
CancellationTokenSource(_timeout);
-            List<NodeStruct> nodesSubsetWithData = new List<NodeStruct>();
+            List<NodeStruct<T>> nodesSubsetWithData = new 
List<NodeStruct<T>>();
 
             try
             {
@@ -408,7 +414,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                     potentialNode = _nodesWithData.Take();
                 }
 
-                return new NodeStruct[] { potentialNode };
+                return new NodeStruct<T>[] { potentialNode };
 
             }
             catch (OperationCanceledException)
@@ -434,10 +440,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="message">The message to send</param>
         /// <param name="msgType">The message type</param>
         /// <param name="node">The NodeStruct representing the Task to send 
to</param>
-        private void SendToNode(T message, MessageType msgType, NodeStruct 
node)
+        private void SendToNode(T message, MessageType msgType, NodeStruct<T> 
node)
         {
-            GroupCommunicationMessage gcm = new 
GroupCommunicationMessage(_groupName, _operatorName,
-                _selfId, node.Identifier, _codec.Encode(message), msgType);
+            GeneralGroupCommunicationMessage gcm = new 
GroupCommunicationMessage<T>(_groupName, _operatorName,
+                _selfId, node.Identifier, message, msgType, _codec);
 
             _sender.Send(gcm);
         }
@@ -448,16 +454,40 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="messages">The list of messages to send</param>
         /// <param name="msgType">The message type</param>
         /// <param name="node">The NodeStruct representing the Task to send 
to</param>
-        private void SendToNode(IList<T> messages, MessageType msgType, 
NodeStruct node)
+        private void SendToNode(IList<T> messages, MessageType msgType, 
NodeStruct<T> node)
         {
-            byte[][] encodedMessages = messages.Select(message => 
_codec.Encode(message)).ToArray();
-            GroupCommunicationMessage gcm = new 
GroupCommunicationMessage(_groupName, _operatorName,
-                _selfId, node.Identifier, encodedMessages, msgType);
+            T[] encodedMessages = messages.ToArray();
+
+            GroupCommunicationMessage<T> gcm = new 
GroupCommunicationMessage<T>(_groupName, _operatorName,
+                _selfId, node.Identifier, encodedMessages, msgType, _codec);
 
             _sender.Send(gcm);
         }
 
-        private void ScatterHelper(IList<T> messages, List<NodeStruct> order, 
int count)
+        /// <summary>
+        /// Receive a message from the Task represented by the given 
NodeStruct.
+        /// Takes the next message from that node's message queue and returns its data.
+        /// </summary>
+        /// <param name="node">The node to receive from</param>
+        /// <returns>The array of messages of type T received from the node</returns>
+        private T[] ReceiveFromNode(NodeStruct<T> node)
+        {
+            var data = node.GetData();
+            return data;
+        }
+
+        /// <summary>
+        /// Find the NodeStruct with the given Task identifier.
+        /// </summary>
+        /// <param name="identifier">The identifier of the Task</param>
+        /// <returns>The NodeStruct</returns>
+        private NodeStruct<T> FindNode(string identifier)
+        {
+            NodeStruct<T> node;
+            return _idToNodeMap.TryGetValue(identifier, out node) ? node : 
null;
+        }
+
+        private void ScatterHelper(IList<T> messages, List<NodeStruct<T>> 
order, int count)
         {
             if (count <= 0)
             {
@@ -465,7 +495,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             }
 
             int i = 0;
-            foreach (NodeStruct nodeStruct in order)
+            foreach (var nodeStruct in order)
             {
                 // The last sublist might be smaller than count if the number 
of
                 // child tasks is not evenly divisible by count
@@ -484,29 +514,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         }
 
         /// <summary>
-        /// Receive a message from the Task represented by the given 
NodeStruct.
-        /// Removes the NodeStruct from the nodesWithData queue if requested.
-        /// </summary>
-        /// <param name="node">The node to receive from</param>
-        /// <returns>The byte array message from the node</returns>
-        private byte[][] ReceiveFromNode(NodeStruct node)
-        {
-            byte[][] data = node.GetData();
-            return data;
-        }
-
-        /// <summary>
-        /// Find the NodeStruct with the given Task identifier.
-        /// </summary>
-        /// <param name="identifier">The identifier of the Task</param>
-        /// <returns>The NodeStruct</returns>
-        private NodeStruct FindNode(string identifier)
-        {
-            NodeStruct node;
-            return _idToNodeMap.TryGetValue(identifier, out node) ? node : 
null;
-        }
-
-        /// <summary>
         /// Checks if the identifier is registered with the Name Server.
         /// Throws exception if the operation fails more than the retry count.
         /// </summary>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
index c36f1ca..b182a39 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
@@ -25,8 +25,6 @@ using Org.Apache.REEF.Network.Group.Operators.Impl;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Network.Group.Pipelining;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 
 namespace Org.Apache.REEF.Network.Group.Topology

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
index 0c7f164..0bd46a2 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
@@ -19,7 +19,6 @@
 
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Topology
 {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
index d6c6bc6..3e76b64 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
@@ -25,18 +25,12 @@ using Org.Apache.REEF.Network.Group.Operators.Impl;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Network.Group.Pipelining;
-using Org.Apache.REEF.Network.Group.Task.Impl;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
-using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Network.Group.Topology
 {
     public class TreeTopology<T> : ITopology<T> 
     {
-        private readonly Logger LOGGER = 
Logger.GetLogger(typeof(TreeTopology<T>));
-
         private readonly string _groupName;
         private readonly string _operatorName;
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs 
b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
index 28cd5f9..a9299bb 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
@@ -101,12 +101,13 @@ namespace Org.Apache.REEF.Network.NetworkService
             SourceId = _factory.Create(reader.ReadString());
             DestId = _factory.Create(reader.ReadString());
             int messageCount = reader.ReadInt32();
+            string dataType = reader.ReadString();
 
             Data = new List<T>();
 
             for (int index = 0; index < messageCount; index++)
             {
-                var dataPoint = 
(T)_injection.ForkInjector().GetInstance(typeof(T));
+                var dataPoint = 
(T)_injection.ForkInjector().GetInstance(Type.GetType(dataType));
 
                 if (null == dataPoint)
                 {
@@ -127,6 +128,7 @@ namespace Org.Apache.REEF.Network.NetworkService
             writer.WriteString(SourceId.ToString());
             writer.WriteString(DestId.ToString());
             writer.WriteInt32(Data.Count);
+            writer.WriteString(Data[0].GetType().AssemblyQualifiedName);
 
             foreach (var data in Data)
             {
@@ -144,12 +146,13 @@ namespace Org.Apache.REEF.Network.NetworkService
             SourceId = _factory.Create(await reader.ReadStringAsync(token));
             DestId = _factory.Create(await reader.ReadStringAsync(token));
             int messageCount = await reader.ReadInt32Async(token);
+            string dataType = await reader.ReadStringAsync(token);
 
             Data = new List<T>();
 
             for (int index = 0; index < messageCount; index++)
             {
-                var dataPoint = Activator.CreateInstance<T>();
+                var dataPoint = (T) 
_injection.ForkInjector().GetInstance(Type.GetType(dataType));
 
                 if (null == dataPoint)
                 {
@@ -171,6 +174,7 @@ namespace Org.Apache.REEF.Network.NetworkService
             await writer.WriteStringAsync(SourceId.ToString(), token);
             await writer.WriteStringAsync(DestId.ToString(), token);
             await writer.WriteInt32Async(Data.Count, token);
+            await 
writer.WriteStringAsync(Data[0].GetType().AssemblyQualifiedName, token);
 
             foreach (var data in Data)
             {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj 
b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 6a8fa0b..390bbb4 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -50,14 +50,15 @@ under the License.
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="Group\Config\CodecToStreamingCodecConfiguration.cs" />
+    <Compile Include="Group\Driver\Impl\GeneralGroupCommunicationMessage.cs" />
     <Compile 
Include="StreamingCodec\CommonStreamingCodecs\FloatArrayStreamingCodec.cs" />
     <Compile 
Include="StreamingCodec\CommonStreamingCodecs\DoubleArrayStreamingCodec.cs" />
     <Compile 
Include="StreamingCodec\CommonStreamingCodecs\FloatStreamingCodec.cs" />
     <Compile 
Include="StreamingCodec\CommonStreamingCodecs\IntArrayStreamingCodec.cs" />
     <Compile 
Include="StreamingCodec\CommonStreamingCodecs\DoubleStreamingCodec.cs" />
+    <Compile 
Include="StreamingCodec\CommonStreamingCodecs\StringStreamingCodec.cs" />
     <Compile 
Include="StreamingCodec\CommonStreamingCodecs\IntStreamingCodec.cs" />
-    <Compile Include="Group\Codec\GcmMessageProto.cs" />
-    <Compile Include="Group\Codec\GroupCommunicationMessageCodec.cs" />
     <Compile Include="Group\Config\CodecConfiguration.cs" />
     <Compile Include="Group\Config\GroupCommConfigurationOptions.cs" />
     <Compile Include="Group\Config\PipelineDataConverterConfiguration.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs
 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs
new file mode 100644
index 0000000..63036f5
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec that reads and writes string values via IDataReader/IDataWriter.
+    /// </summary>
+    public sealed class StringStreamingCodec : IStreamingCodec<string>
+    {
+        /// <summary>
+        /// Private constructor marked [Inject]; presumably instantiated through Tang injection.
+        /// </summary>
+        [Inject]
+        private StringStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads a string from the reader by delegating to its ReadString method.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The string read from the reader</returns>
+        public string Read(IDataReader reader)
+        {
+            return reader.ReadString();
+        }
+
+        /// <summary>
+        /// Writes the string to the writer by delegating to its WriteString method.
+        /// </summary>
+        /// <param name="obj">The string to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(string obj, IDataWriter writer)
+        {
+            writer.WriteString(obj);
+        }
+
+        /// <summary>
+        /// Asynchronously reads a string from the reader via ReadStringAsync.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token passed through to the reader</param>
+        /// <returns>The string read from the reader</returns>
+        public async Task<string> ReadAsync(IDataReader reader, 
CancellationToken token)
+        {
+            return await reader.ReadStringAsync(token);
+        }
+
+        /// <summary>
+        /// Asynchronously writes the string to the writer via WriteStringAsync.
+        /// </summary>
+        /// <param name="obj">The string to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token passed through to the writer</param>
+        public async Task WriteAsync(string obj, IDataWriter writer, 
CancellationToken token)
+        {
+            await writer.WriteStringAsync(obj, token);
+        }
+    }
+}

Reply via email to