Repository: incubator-reef
Updated Branches:
  refs/heads/master 35b099caf -> b65ff47fa


[REEF-261]: Introduce StreamingCodec

This change adds `IStreamingCodec` as well as implementations for
(arrays of) base types. Also, a wrapper is provided that turns any
`ICodec` into an (inefficient) `IStreamingCodec`.

JIRA:
  [REEF-261](https://issues.apache.org/jira/browse/REEF-261)

Pull Request:
  This closes #236


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/b65ff47f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/b65ff47f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/b65ff47f

Branch: refs/heads/master
Commit: b65ff47fa82f8d7ab9c5a37563f841f1d1699e40
Parents: 35b099c
Author: Dhruv <[email protected]>
Authored: Sat Jun 20 11:55:56 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Tue Jun 23 10:47:53 2015 -0700

----------------------------------------------------------------------
 .../GroupCommunication/StreamingCodecTests.cs   | 185 +++++++++++++++++++
 .../Org.Apache.REEF.Network.Tests.csproj        |   1 +
 .../Group/Config/StreamingCodecConfiguration.cs |  47 +++++
 .../Pipelining/StreamingPipelineMessageCodec.cs |  97 ++++++++++
 .../Org.Apache.REEF.Network.csproj              |  10 +
 .../StreamingCodec/CodecToStreamingCodec.cs     |  71 +++++++
 .../DoubleArrayStreamingCodec.cs                | 109 +++++++++++
 .../DoubleStreamingCodec.cs                     |  82 ++++++++
 .../FloatArrayStreamingCodec.cs                 | 109 +++++++++++
 .../FloatStreamingCodec.cs                      |  82 ++++++++
 .../IntArrayStreamingCodec.cs                   | 109 +++++++++++
 .../CommonStreamingCodecs/IntStreamingCodec.cs  |  82 ++++++++
 .../StreamingCodec/IStreamingCodec.cs           |  61 ++++++
 .../Remote/Impl/StreamDataReader.cs             |   8 +-
 .../Remote/Impl/StreamDataWriter.cs             |   8 +-
 15 files changed, 1055 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs
 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs
new file mode 100644
index 0000000..6dea9f1
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Network.StreamingCodec;
+using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Network.Tests.GroupCommunication
+{
+    /// <summary>
+    /// Defines streaming codec tests
+    /// </summary>
+    [TestClass]
+    public class StreamingCodecTests
+    {
+        [TestMethod]
+        public async Task TestCommonStreamingCodecs()
+        {
+            IInjector injector = TangFactory.GetTang().NewInjector();
+            IStreamingCodec<int> intCodec = 
injector.GetInstance<IntStreamingCodec>();
+            IStreamingCodec<double> doubleCodec = 
injector.GetInstance<DoubleStreamingCodec>();
+            IStreamingCodec<float> floatCodec = 
injector.GetInstance<FloatStreamingCodec>();
+
+            IStreamingCodec<int[]> intArrCodec = 
injector.GetInstance<IntArrayStreamingCodec>();
+            IStreamingCodec<double[]> doubleArrCodec = 
injector.GetInstance<DoubleArrayStreamingCodec>();
+            IStreamingCodec<float[]> floatArrCodec = 
injector.GetInstance<FloatArrayStreamingCodec>();
+
+            CancellationToken token = new CancellationToken();
+
+            int obj = 5;
+            int[] intArr = {1, 2};
+            double[] doubleArr = { 1, 2 };
+            float[] floatArr = { 1, 2 };
+
+            var stream = new MemoryStream();
+            IDataWriter writer = new StreamDataWriter(stream);
+            intCodec.Write(obj, writer);
+            await intCodec.WriteAsync(obj + 1, writer, token);
+            doubleCodec.Write(obj + 2, writer);
+            await doubleCodec.WriteAsync(obj + 3, writer, token);
+            floatCodec.Write(obj + 4, writer);
+            await floatCodec.WriteAsync(obj + 5, writer, token);
+            intArrCodec.Write(intArr, writer);
+            await intArrCodec.WriteAsync(intArr, writer, token);
+            doubleArrCodec.Write(doubleArr, writer);
+            await doubleArrCodec.WriteAsync(doubleArr, writer, token);
+            floatArrCodec.Write(floatArr, writer);
+            await floatArrCodec.WriteAsync(floatArr, writer, token);
+
+            stream.Position = 0;
+            IDataReader reader = new StreamDataReader(stream);
+            int res1 = intCodec.Read(reader);
+            int res2 = await intCodec.ReadAsync(reader, token);
+            double res3 = doubleCodec.Read(reader);
+            double res4 = await doubleCodec.ReadAsync(reader, token);
+            float res5 = floatCodec.Read(reader);
+            float res6 = await floatCodec.ReadAsync(reader, token);
+            int[] resArr1 = intArrCodec.Read(reader);
+            int[] resArr2 = await intArrCodec.ReadAsync(reader, token);
+            double[] resArr3 = doubleArrCodec.Read(reader);
+            double[] resArr4 = await doubleArrCodec.ReadAsync(reader, token);
+            float[] resArr5 = floatArrCodec.Read(reader);
+            float[] resArr6 = await floatArrCodec.ReadAsync(reader, token);
+            
+            Assert.AreEqual(obj, res1);
+            Assert.AreEqual(obj + 1, res2);
+            Assert.AreEqual(obj + 2, res3);
+            Assert.AreEqual(obj + 3, res4);
+            Assert.AreEqual(obj + 4, res5);
+            Assert.AreEqual(obj + 5, res6);
+
+            for (int i = 0; i < intArr.Length; i++)
+            {
+                Assert.AreEqual(intArr[i], resArr1[i]);
+                Assert.AreEqual(intArr[i], resArr2[i]);
+            }
+
+            for (int i = 0; i < doubleArr.Length; i++)
+            {
+                Assert.AreEqual(doubleArr[i], resArr3[i]);
+                Assert.AreEqual(doubleArr[i], resArr4[i]);
+            }
+
+            for (int i = 0; i < floatArr.Length; i++)
+            {
+                Assert.AreEqual(floatArr[i], resArr5[i]);
+                Assert.AreEqual(floatArr[i], resArr6[i]);
+            }
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentNullException))]
+        public void TestIntArrayStreamingCodecsNullException()
+        {
+            IInjector injector = TangFactory.GetTang().NewInjector();
+            IStreamingCodec<int[]> intArrCodec = 
injector.GetInstance<IntArrayStreamingCodec>();
+            var stream = new MemoryStream();
+            IDataWriter writer = new StreamDataWriter(stream);
+            intArrCodec.Write(null, writer);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentNullException))]
+        public void TestFloatArrayStreamingCodecsNullException()
+        {
+            IInjector injector = TangFactory.GetTang().NewInjector();
+            IStreamingCodec<float[]> floatArrCodec = 
injector.GetInstance<FloatArrayStreamingCodec>();
+            var stream = new MemoryStream();
+            IDataWriter writer = new StreamDataWriter(stream);
+            floatArrCodec.Write(null, writer);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentNullException))]
+        public void TestDoubleArrayStreamingCodecsNullException()
+        {
+            IInjector injector = TangFactory.GetTang().NewInjector();
+            IStreamingCodec<double[]> doubleArrCodec = 
injector.GetInstance<DoubleArrayStreamingCodec>();
+            var stream = new MemoryStream();
+            IDataWriter writer = new StreamDataWriter(stream);
+            doubleArrCodec.Write(null, writer);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentNullException))]
+        public void TestNullStreamException()
+        {
+            IDataWriter writer = new StreamDataWriter(null);
+            writer.WriteFloat(2.0f);
+        }
+
+        [TestMethod]
+        public async Task TestCodecToStreamingCodec()
+        {
+            var config = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindImplementation(GenericType<ICodec<int>>.Class, 
GenericType<IntCodec>.Class)
+                .BindImplementation(GenericType<IStreamingCodec<int>>.Class,
+                    GenericType<CodecToStreamingCodec<int>>.Class)
+                .Build();
+
+            IStreamingCodec<int> streamingCodec =
+                
TangFactory.GetTang().NewInjector(config).GetInstance<IStreamingCodec<int>>();
+           
+            CancellationToken token = new CancellationToken();
+
+            int obj = 5;
+            var stream = new MemoryStream();
+            IDataWriter writer = new StreamDataWriter(stream);
+            streamingCodec.Write(obj, writer);
+            await streamingCodec.WriteAsync(obj + 1, writer, token);
+
+            stream.Position = 0;
+            IDataReader reader = new StreamDataReader(stream);
+            int res1 = streamingCodec.Read(reader);
+            int res2 = await streamingCodec.ReadAsync(reader, token);
+            Assert.AreEqual(obj, res1);
+            Assert.AreEqual(obj + 1, res2);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
index 6cd5c05..e0568aa 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
@@ -53,6 +53,7 @@ under the License.
     <Compile Include="GroupCommunication\GroupCommuDriverTests.cs" />
     <Compile Include="GroupCommunication\GroupCommunicationTests.cs" />
     <Compile 
Include="GroupCommunication\GroupCommunicationTreeTopologyTests.cs" />
+    <Compile Include="GroupCommunication\StreamingCodecTests.cs" />
     <Compile Include="NamingService\NameServerTests.cs" />
     <Compile Include="NetworkService\WritableNetworkServiceTests.cs" />
     <Compile Include="NetworkService\NetworkServiceTests.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
new file mode 100644
index 0000000..f655b91
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Network.StreamingCodec;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Network.Group.Config
+{
+    /// <summary>
+    /// Defines configuration for streaming codecs and pipeline message 
+    /// streaming codecs by taking streaming codec as input.
+    /// </summary>
+    /// <typeparam name="T">Generic type of message</typeparam>
+    public sealed class StreamingCodecConfiguration<T> : 
ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// RequiredImpl for Codec. Client needs to set implementation for 
this parameter
+        /// </summary>
+        public static readonly RequiredImpl<IStreamingCodec<T>> Codec = new 
RequiredImpl<IStreamingCodec<T>>();
+
+        /// <summary>
+        /// Configuration Module for Codec
+        /// </summary>
+        public static ConfigurationModule Conf = new 
StreamingCodecConfiguration<T>()
+            .BindImplementation(GenericType<IStreamingCodec<T>>.Class, Codec)
+            
.BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, 
GenericType<StreamingPipelineMessageCodec<T>>.Class)
+            .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs
new file mode 100644
index 0000000..aff8558
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.Group.Pipelining
+{
+    /// <summary>
+    /// The streaming codec for PipelineMessage
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public sealed class StreamingPipelineMessageCodec<T> : 
IStreamingCodec<PipelineMessage<T>>
+    {
+        /// <summary>
+        /// Creates new StreamingPipelineMessageCodec
+        /// </summary>
+        /// <param name="baseCodec">The codec for actual message in 
PipelineMessage</param>
+        [Inject]
+        private StreamingPipelineMessageCodec(IStreamingCodec<T> baseCodec)
+        {
+            BaseCodec = baseCodec;
+        }
+                
+        /// <summary>
+        /// Codec for actual message T
+        /// </summary>
+        public IStreamingCodec<T> BaseCodec { get; private set; }
+
+        /// <summary>
+        /// Reads the message via the base codec, then the IsLast flag, from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        ///<returns>The Pipeline Message read from the reader</returns>
+        public PipelineMessage<T> Read(IDataReader reader)
+        {
+            var message = BaseCodec.Read(reader);
+            var isLast = reader.ReadBoolean();
+            return new PipelineMessage<T>(message, isLast);
+        }
+
+        /// <summary>
+        /// Writes the message data via the base codec, then the IsLast flag, to the writer.
+        /// </summary>
+        /// <param name="obj">The object to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(PipelineMessage<T> obj, IDataWriter writer)
+        {
+            BaseCodec.Write(obj.Data, writer);
+            writer.WriteBoolean(obj.IsLast);
+        }
+
+        ///  <summary>
+        ///  Asynchronously reads the message via the base codec, then the IsLast flag, from the reader.
+        ///  </summary>
+        ///  <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The Pipeline Message  read from the reader</returns>
+        public async Task<PipelineMessage<T>> ReadAsync(IDataReader reader, 
CancellationToken token)
+        {
+            var message = await BaseCodec.ReadAsync(reader, token);
+            var isLast = await reader.ReadBooleanAsync(token);
+            return new PipelineMessage<T>(message, isLast);
+        }
+
+        /// <summary>
+        /// Asynchronously writes the message data via the base codec, then the IsLast flag, to the writer.
+        /// </summary>
+        /// <param name="obj">The object to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        public async System.Threading.Tasks.Task WriteAsync(PipelineMessage<T> 
obj, IDataWriter writer, CancellationToken token)
+        {
+            await BaseCodec.WriteAsync(obj.Data, writer, token);
+            await writer.WriteBooleanAsync(obj.IsLast, token);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj 
b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index b8989ee..6a8fa0b 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -50,12 +50,19 @@ under the License.
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
+    <Compile 
Include="StreamingCodec\CommonStreamingCodecs\FloatArrayStreamingCodec.cs" />
+    <Compile 
Include="StreamingCodec\CommonStreamingCodecs\DoubleArrayStreamingCodec.cs" />
+    <Compile 
Include="StreamingCodec\CommonStreamingCodecs\FloatStreamingCodec.cs" />
+    <Compile 
Include="StreamingCodec\CommonStreamingCodecs\IntArrayStreamingCodec.cs" />
+    <Compile 
Include="StreamingCodec\CommonStreamingCodecs\DoubleStreamingCodec.cs" />
+    <Compile 
Include="StreamingCodec\CommonStreamingCodecs\IntStreamingCodec.cs" />
     <Compile Include="Group\Codec\GcmMessageProto.cs" />
     <Compile Include="Group\Codec\GroupCommunicationMessageCodec.cs" />
     <Compile Include="Group\Config\CodecConfiguration.cs" />
     <Compile Include="Group\Config\GroupCommConfigurationOptions.cs" />
     <Compile Include="Group\Config\PipelineDataConverterConfiguration.cs" />
     <Compile Include="Group\Config\ReduceFunctionConfiguration.cs" />
+    <Compile Include="Group\Config\StreamingCodecConfiguration.cs" />
     <Compile Include="Group\Driver\ICommunicationGroupDriver.cs" />
     <Compile Include="Group\Driver\IGroupCommDriver.cs" />
     <Compile Include="Group\Driver\Impl\CommunicationGroupDriver.cs" />
@@ -79,6 +86,7 @@ under the License.
     <Compile Include="Group\Operators\Impl\ScatterSender.cs" />
     <Compile Include="Group\Operators\Impl\Sender.cs" />
     <Compile Include="Group\Operators\IOperatorSpec.cs" />
+    <Compile Include="Group\Pipelining\StreamingPipelineMessageCodec.cs" />
     <Compile Include="Group\Task\IOperatorTopology.cs" />
     <Compile Include="Group\Operators\IReduceFunction.cs" />
     <Compile Include="Group\Operators\IReduceReceiver.cs" />
@@ -147,6 +155,8 @@ under the License.
     <Compile Include="NetworkService\WritableNsConnection.cs" />
     <Compile Include="NetworkService\WritableNsMessage.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="StreamingCodec\CodecToStreamingCodec.cs" />
+    <Compile Include="StreamingCodec\IStreamingCodec.cs" />
     <Compile Include="Utilities\BlockingCollectionExtensions.cs" />
     <Compile Include="Utilities\Utils.cs" />
   </ItemGroup>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs
new file mode 100644
index 0000000..1964bb1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.StreamingCodec
+{
+    /// <summary>
+    /// Converts codec to streaming codec
+    /// </summary>
+    /// <typeparam name="T">Message type</typeparam>
+    public sealed class CodecToStreamingCodec<T> : IStreamingCodec<T>
+    {
+        private readonly ICodec<T> _codec;
+            
+        [Inject]
+        private CodecToStreamingCodec(ICodec<T> codec)
+        {
+            _codec = codec;
+        }
+
+        public T Read(IDataReader reader)
+        {
+            int length = reader.ReadInt32();
+            byte[] byteArr = new byte[length];
+            reader.Read(ref byteArr, 0, length);
+            return _codec.Decode(byteArr);
+        }
+
+        public void Write(T obj, IDataWriter writer)
+        {
+            var byteArr = _codec.Encode(obj);
+            writer.WriteInt32(byteArr.Length);
+            writer.Write(byteArr, 0, byteArr.Length);
+        }
+
+        public async Task<T> ReadAsync(IDataReader reader, CancellationToken 
token)
+        {
+            int length = await reader.ReadInt32Async(token);
+            byte[] byteArr = new byte[length];
+            await reader.ReadAsync(byteArr, 0, length, token);
+            return _codec.Decode(byteArr);
+        }
+
+        public async Task WriteAsync(T obj, IDataWriter writer, 
CancellationToken token)
+        {
+            var byteArr = _codec.Encode(obj);
+            await writer.WriteInt32Async(byteArr.Length, token);
+            await writer.WriteAsync(byteArr, 0, byteArr.Length, token);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs
 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs
new file mode 100644
index 0000000..5caaaee
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec for double array
+    /// </summary>
+    public sealed class DoubleArrayStreamingCodec : IStreamingCodec<double[]>
+    {
+        /// <summary>
+        /// Injectable constructor
+        /// </summary>
+        [Inject]
+        private DoubleArrayStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads a length-prefixed double array from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        ///<returns>The double array read from the reader</returns>
+        public double[] Read(IDataReader reader)
+        {
+            int length = reader.ReadInt32();
+            byte[] buffer = new byte[sizeof(double)*length];
+            reader.Read(ref buffer, 0, buffer.Length);
+            double[] doubleArr = new double[length];
+            Buffer.BlockCopy(buffer, 0, doubleArr, 0, buffer.Length);
+            return doubleArr;
+        }
+
+        /// <summary>
+        /// Writes the double array to the writer.
+        /// </summary>
+        /// <param name="obj">The double array to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(double[] obj, IDataWriter writer)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj", "double array is null");
+            }
+
+            writer.WriteInt32(obj.Length);
+            byte[] buffer = new byte[sizeof(double) * obj.Length];
+            Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length);
+            writer.Write(buffer, 0, buffer.Length);
+        }
+
+        ///  <summary>
+        ///  Asynchronously reads a length-prefixed double array from the reader.
+        ///  </summary>
+        ///  <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The double array read from the reader</returns>
+        public async Task<double[]> ReadAsync(IDataReader reader, 
CancellationToken token)
+        {
+            int length = await reader.ReadInt32Async(token);
+            byte[] buffer = new byte[sizeof(double) * length];
+            await reader.ReadAsync(buffer, 0, buffer.Length, token);
+            double[] doubleArr = new double[length];
+            Buffer.BlockCopy(buffer, 0, doubleArr, 0, sizeof(double) * length);
+            return doubleArr;
+        }
+
+        /// <summary>
+        /// Writes the double array to the writer.
+        /// </summary>
+        /// <param name="obj">The double array to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        public async Task WriteAsync(double[] obj, IDataWriter writer, 
CancellationToken token)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj", "double array is null");
+            }
+
+            await writer.WriteInt32Async(obj.Length, token);
+            byte[] buffer = new byte[sizeof(double) * obj.Length];
+            Buffer.BlockCopy(obj, 0, buffer, 0, sizeof(double) * obj.Length);
+            await writer.WriteAsync(buffer, 0, buffer.Length, token);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs
 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs
new file mode 100644
index 0000000..5f19f00
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec that transfers a single double through a data reader or writer
+    /// </summary>
+    public sealed class DoubleStreamingCodec : IStreamingCodec<double>
+    {
+        /// <summary>
+        /// Private constructor, marked for injection
+        /// </summary>
+        [Inject]
+        private DoubleStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads one double from the reader.
+        /// </summary>
+        /// <param name="reader">The reader that supplies the value</param>
+        /// <returns>The double obtained from the reader</returns>
+        public double Read(IDataReader reader)
+        {
+            double value = reader.ReadDouble();
+            return value;
+        }
+
+        /// <summary>
+        /// Asynchronously reads one double from the reader.
+        /// </summary>
+        /// <param name="reader">The reader that supplies the value</param>
+        /// <param name="token">Cancellation token handed to the reader</param>
+        /// <returns>The double obtained from the reader</returns>
+        public async Task<double> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            double value = await reader.ReadDoubleAsync(token);
+            return value;
+        }
+
+        /// <summary>
+        /// Writes one double to the writer.
+        /// </summary>
+        /// <param name="obj">The double to be written</param>
+        /// <param name="writer">The writer that receives the value</param>
+        public void Write(double obj, IDataWriter writer)
+        {
+            writer.WriteDouble(obj);
+        }
+
+        /// <summary>
+        /// Asynchronously writes one double to the writer.
+        /// </summary>
+        /// <param name="obj">The double to be written</param>
+        /// <param name="writer">The writer that receives the value</param>
+        /// <param name="token">Cancellation token handed to the writer</param>
+        public async Task WriteAsync(double obj, IDataWriter writer, CancellationToken token)
+        {
+            await writer.WriteDoubleAsync(obj, token);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs
 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs
new file mode 100644
index 0000000..8d68749
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec for float arrays. An array is encoded as a 32-bit element
+    /// count followed by the raw bytes of its elements, as copied by Buffer.BlockCopy.
+    /// </summary>
+    public sealed class FloatArrayStreamingCodec : IStreamingCodec<float[]>
+    {
+        /// <summary>
+        /// Injectable constructor
+        /// </summary>
+        [Inject]
+        private FloatArrayStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads the element count and then the float array from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The float array read from the reader</returns>
+        /// <exception cref="OverflowException">The element count is negative or its byte size does not fit in an int</exception>
+        public float[] Read(IDataReader reader)
+        {
+            int length = reader.ReadInt32();
+            byte[] buffer = new byte[GetByteCount(length)];
+
+            // NOTE(review): the result of Read is not inspected; confirm that
+            // IDataReader fills the whole buffer or signals a short read itself.
+            reader.Read(ref buffer, 0, buffer.Length);
+            return FromBytes(buffer, length);
+        }
+
+        /// <summary>
+        /// Writes the element count and then the float array to the writer.
+        /// </summary>
+        /// <param name="obj">The float array to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <exception cref="ArgumentNullException">The array is null</exception>
+        /// <exception cref="OverflowException">The byte size of the array does not fit in an int</exception>
+        public void Write(float[] obj, IDataWriter writer)
+        {
+            // Encode first so that a rejected array leaves nothing in the stream
+            byte[] buffer = ToBytes(obj);
+            writer.WriteInt32(obj.Length);
+            writer.Write(buffer, 0, buffer.Length);
+        }
+
+        /// <summary>
+        /// Asynchronously reads the element count and then the float array from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The float array read from the reader</returns>
+        /// <exception cref="OverflowException">The element count is negative or its byte size does not fit in an int</exception>
+        public async Task<float[]> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            int length = await reader.ReadInt32Async(token);
+            byte[] buffer = new byte[GetByteCount(length)];
+
+            // NOTE(review): the result of ReadAsync is not inspected; confirm that
+            // IDataReader fills the whole buffer or signals a short read itself.
+            await reader.ReadAsync(buffer, 0, buffer.Length, token);
+            return FromBytes(buffer, length);
+        }
+
+        /// <summary>
+        /// Asynchronously writes the element count and then the float array to the writer.
+        /// </summary>
+        /// <param name="obj">The float array to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        /// <exception cref="ArgumentNullException">The array is null</exception>
+        /// <exception cref="OverflowException">The byte size of the array does not fit in an int</exception>
+        public async Task WriteAsync(float[] obj, IDataWriter writer, CancellationToken token)
+        {
+            // Encode first so that a rejected array leaves nothing in the stream
+            byte[] buffer = ToBytes(obj);
+            await writer.WriteInt32Async(obj.Length, token);
+            await writer.WriteAsync(buffer, 0, buffer.Length, token);
+        }
+
+        /// <summary>
+        /// Computes the payload size in bytes for the given element count.
+        /// checked() is used because, without overflow checking, the product wraps
+        /// silently for large counts and produces a buffer that is too small.
+        /// </summary>
+        private static int GetByteCount(int length)
+        {
+            return checked(sizeof(float) * length);
+        }
+
+        /// <summary>
+        /// Copies the raw bytes of the float array into a new byte array.
+        /// </summary>
+        private static byte[] ToBytes(float[] obj)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj", "float array is null");
+            }
+
+            byte[] buffer = new byte[GetByteCount(obj.Length)];
+            Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length);
+            return buffer;
+        }
+
+        /// <summary>
+        /// Reinterprets the leading bytes of the buffer as a float array of the given length.
+        /// </summary>
+        private static float[] FromBytes(byte[] buffer, int length)
+        {
+            float[] floatArr = new float[length];
+            Buffer.BlockCopy(buffer, 0, floatArr, 0, GetByteCount(length));
+            return floatArr;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs
 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs
new file mode 100644
index 0000000..22ed947
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec that transfers a single float through a data reader or writer
+    /// </summary>
+    public sealed class FloatStreamingCodec : IStreamingCodec<float>
+    {
+        /// <summary>
+        /// Private constructor, marked for injection
+        /// </summary>
+        [Inject]
+        private FloatStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads one float from the reader.
+        /// </summary>
+        /// <param name="reader">The reader that supplies the value</param>
+        /// <returns>The float obtained from the reader</returns>
+        public float Read(IDataReader reader)
+        {
+            float value = reader.ReadFloat();
+            return value;
+        }
+
+        /// <summary>
+        /// Asynchronously reads one float from the reader.
+        /// </summary>
+        /// <param name="reader">The reader that supplies the value</param>
+        /// <param name="token">Cancellation token handed to the reader</param>
+        /// <returns>The float obtained from the reader</returns>
+        public async Task<float> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            float value = await reader.ReadFloatAsync(token);
+            return value;
+        }
+
+        /// <summary>
+        /// Writes one float to the writer.
+        /// </summary>
+        /// <param name="obj">The float to be written</param>
+        /// <param name="writer">The writer that receives the value</param>
+        public void Write(float obj, IDataWriter writer)
+        {
+            writer.WriteFloat(obj);
+        }
+
+        /// <summary>
+        /// Asynchronously writes one float to the writer.
+        /// </summary>
+        /// <param name="obj">The float to be written</param>
+        /// <param name="writer">The writer that receives the value</param>
+        /// <param name="token">Cancellation token handed to the writer</param>
+        public async Task WriteAsync(float obj, IDataWriter writer, CancellationToken token)
+        {
+            await writer.WriteFloatAsync(obj, token);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs
 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs
new file mode 100644
index 0000000..090d99f
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec for integer arrays. An array is encoded as a 32-bit element
+    /// count followed by the raw bytes of its elements, as copied by Buffer.BlockCopy.
+    /// </summary>
+    public sealed class IntArrayStreamingCodec : IStreamingCodec<int[]>
+    {
+        /// <summary>
+        /// Injectable constructor
+        /// </summary>
+        [Inject]
+        private IntArrayStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads the element count and then the integer array from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The integer array read from the reader</returns>
+        /// <exception cref="OverflowException">The element count is negative or its byte size does not fit in an int</exception>
+        public int[] Read(IDataReader reader)
+        {
+            int length = reader.ReadInt32();
+            byte[] buffer = new byte[GetByteCount(length)];
+
+            // NOTE(review): the result of Read is not inspected; confirm that
+            // IDataReader fills the whole buffer or signals a short read itself.
+            reader.Read(ref buffer, 0, buffer.Length);
+            return FromBytes(buffer, length);
+        }
+
+        /// <summary>
+        /// Writes the element count and then the integer array to the writer.
+        /// </summary>
+        /// <param name="obj">The integer array to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <exception cref="ArgumentNullException">The array is null</exception>
+        /// <exception cref="OverflowException">The byte size of the array does not fit in an int</exception>
+        public void Write(int[] obj, IDataWriter writer)
+        {
+            // Encode first so that a rejected array leaves nothing in the stream
+            byte[] buffer = ToBytes(obj);
+            writer.WriteInt32(obj.Length);
+            writer.Write(buffer, 0, buffer.Length);
+        }
+
+        /// <summary>
+        /// Asynchronously reads the element count and then the integer array from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The integer array read from the reader</returns>
+        /// <exception cref="OverflowException">The element count is negative or its byte size does not fit in an int</exception>
+        public async Task<int[]> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            int length = await reader.ReadInt32Async(token);
+            byte[] buffer = new byte[GetByteCount(length)];
+
+            // NOTE(review): the result of ReadAsync is not inspected; confirm that
+            // IDataReader fills the whole buffer or signals a short read itself.
+            await reader.ReadAsync(buffer, 0, buffer.Length, token);
+            return FromBytes(buffer, length);
+        }
+
+        /// <summary>
+        /// Asynchronously writes the element count and then the integer array to the writer.
+        /// </summary>
+        /// <param name="obj">The integer array to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        /// <exception cref="ArgumentNullException">The array is null</exception>
+        /// <exception cref="OverflowException">The byte size of the array does not fit in an int</exception>
+        public async Task WriteAsync(int[] obj, IDataWriter writer, CancellationToken token)
+        {
+            // Encode first so that a rejected array leaves nothing in the stream
+            byte[] buffer = ToBytes(obj);
+            await writer.WriteInt32Async(obj.Length, token);
+            await writer.WriteAsync(buffer, 0, buffer.Length, token);
+        }
+
+        /// <summary>
+        /// Computes the payload size in bytes for the given element count.
+        /// checked() is used because, without overflow checking, the product wraps
+        /// silently for large counts and produces a buffer that is too small.
+        /// </summary>
+        private static int GetByteCount(int length)
+        {
+            return checked(sizeof(int) * length);
+        }
+
+        /// <summary>
+        /// Copies the raw bytes of the integer array into a new byte array.
+        /// </summary>
+        private static byte[] ToBytes(int[] obj)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj", "integer array is null");
+            }
+
+            byte[] buffer = new byte[GetByteCount(obj.Length)];
+            Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length);
+            return buffer;
+        }
+
+        /// <summary>
+        /// Reinterprets the leading bytes of the buffer as an integer array of the given length.
+        /// </summary>
+        private static int[] FromBytes(byte[] buffer, int length)
+        {
+            int[] intArr = new int[length];
+            Buffer.BlockCopy(buffer, 0, intArr, 0, GetByteCount(length));
+            return intArr;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs
 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs
new file mode 100644
index 0000000..22e5490
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec that transfers a single 32-bit integer through a data reader or writer
+    /// </summary>
+    public sealed class IntStreamingCodec : IStreamingCodec<int>
+    {
+        /// <summary>
+        /// Private constructor, marked for injection
+        /// </summary>
+        [Inject]
+        private IntStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads one integer from the reader.
+        /// </summary>
+        /// <param name="reader">The reader that supplies the value</param>
+        /// <returns>The integer obtained from the reader</returns>
+        public int Read(IDataReader reader)
+        {
+            int value = reader.ReadInt32();
+            return value;
+        }
+
+        /// <summary>
+        /// Asynchronously reads one integer from the reader.
+        /// </summary>
+        /// <param name="reader">The reader that supplies the value</param>
+        /// <param name="token">Cancellation token handed to the reader</param>
+        /// <returns>The integer obtained from the reader</returns>
+        public async Task<int> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            int value = await reader.ReadInt32Async(token);
+            return value;
+        }
+
+        /// <summary>
+        /// Writes one integer to the writer.
+        /// </summary>
+        /// <param name="obj">The integer to be written</param>
+        /// <param name="writer">The writer that receives the value</param>
+        public void Write(int obj, IDataWriter writer)
+        {
+            writer.WriteInt32(obj);
+        }
+
+        /// <summary>
+        /// Asynchronously writes one integer to the writer.
+        /// </summary>
+        /// <param name="obj">The integer to be written</param>
+        /// <param name="writer">The writer that receives the value</param>
+        /// <param name="token">Cancellation token handed to the writer</param>
+        public async Task WriteAsync(int obj, IDataWriter writer, CancellationToken token)
+        {
+            await writer.WriteInt32Async(obj, token);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs
new file mode 100644
index 0000000..440b1c0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.StreamingCodec
+{
+    /// <summary>
+    /// Codec interface that external users should implement to read objects of
+    /// type T directly from, and write them directly to, the stream.
+    /// Each operation is offered in a synchronous and an asynchronous form.
+    /// </summary>
+    /// <typeparam name="T">The type of object the codec reads and writes</typeparam>
+    public interface IStreamingCodec<T>
+    {
+        /// <summary>
+        /// Instantiate the class from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The instance of type T read from the reader</returns>
+        T Read(IDataReader reader);
+
+        /// <summary>
+        /// Writes the class fields to the writer.
+        /// </summary>
+        /// <param name="obj">The object of type T to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        void Write(T obj, IDataWriter writer);
+
+        /// <summary>
+        /// Asynchronously instantiate the class from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>A task whose result is the instance of type T read from the reader</returns>
+        Task<T> ReadAsync(IDataReader reader, CancellationToken token);
+
+        /// <summary>
+        /// Asynchronously writes the class fields to the writer.
+        /// </summary>
+        /// <param name="obj">The object of type T to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>A task representing the write operation</returns>
+        Task WriteAsync(T obj, IDataWriter writer, CancellationToken token);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs
index 67098d0..2e88573 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs
@@ -18,10 +18,7 @@
  */
 
 using System;
-using System.Collections.Generic;
 using System.IO;
-using System.Linq;
-using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 using Org.Apache.REEF.Utilities.Diagnostics;
@@ -47,6 +44,11 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <param name="stream">Stream from which to read</param>
         public StreamDataReader(Stream stream)
         {
+            // Reject a null stream here, at construction, with an
+            // ArgumentNullException naming the "stream" parameter
+            if (stream == null)
+            {
+                throw new ArgumentNullException("stream", "input stream cannot 
be null");
+            }
+
             _stream = stream;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs
index b702e1c..976253b 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs
@@ -18,10 +18,7 @@
  */
 
 using System;
-using System.Collections.Generic;
 using System.IO;
-using System.Linq;
-using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -40,6 +37,11 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <param name="stream">Stream to which to write</param>
         public StreamDataWriter(Stream stream)
         {
+            // Reject a null stream here, at construction, with an
+            // ArgumentNullException naming the "stream" parameter
+            if (stream == null)
+            {
+                throw new ArgumentNullException("stream", "input stream cannot 
be null");
+            }
+
             _stream = stream;
         }
 

Reply via email to