Repository: incubator-reef
Updated Branches:
  refs/heads/master 9e3e09051 -> 33b6edf1d


[REEF-257] Improve memory efficiency in Wake.NET

This adds the IWritable interface as well as supporting classes to allow
for late serialization of messages. This improves upon the current
scheme, in which all messages are `byte[]` and need to be copied several
times.

JIRA:
  [REEF-257](https://issues.apache.org/jira/browse/REEF-257)

Pull Request:
  This closes #149


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/33b6edf1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/33b6edf1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/33b6edf1

Branch: refs/heads/master
Commit: 33b6edf1d5c182129df029aeba7f7badbe4f4a28
Parents: 9e3e090
Author: Dhruv <[email protected]>
Authored: Fri Apr 17 10:45:05 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri May 1 10:58:15 2015 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Wake.Tests.csproj           |   2 +
 .../WritableString.cs                           |  93 +++++
 .../WritableTransportTest.cs                    | 166 +++++++++
 .../Org.Apache.REEF.Wake.csproj                 |   9 +
 .../Org.Apache.REEF.Wake/Remote/IDataReader.cs  | 147 ++++++++
 .../Org.Apache.REEF.Wake/Remote/IDataWriter.cs  | 150 ++++++++
 .../cs/Org.Apache.REEF.Wake/Remote/IWritable.cs |  61 ++++
 .../Remote/Impl/StreamDataReader.cs             | 353 +++++++++++++++++++
 .../Remote/Impl/StreamDataWriter.cs             | 218 ++++++++++++
 .../Remote/Impl/WritableLink.cs                 | 293 +++++++++++++++
 .../Remote/Impl/WritableTransportClient.cs      | 128 +++++++
 .../Remote/Impl/WritableTransportServer.cs      | 198 +++++++++++
 .../cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs | 104 ++++++
 13 files changed, 1922 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index 911bd00..d709933 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -49,7 +49,9 @@ under the License.
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="PubSubSubjectTest.cs" />
     <Compile Include="RemoteManagerTest.cs" />
+    <Compile Include="WritableTransportTest.cs" />
     <Compile Include="TransportTest.cs" />
+    <Compile Include="WritableString.cs" />
   </ItemGroup>
   <ItemGroup>
     <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
new file mode 100644
index 0000000..8a6d041
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    /// <summary>
+    /// IWritable wrapper around a single string value
+    /// </summary>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
+    public class WritableString : IWritable
+    {
+        /// <summary>
+        /// Gets or sets the wrapped string data
+        /// </summary>
+        public string Data { get; set; }
+        
+        /// <summary>
+        /// Empty constructor for instantiation with reflection
+        /// </summary>
+        public WritableString()
+        {
+        }
+
+        /// <summary>
+        /// Constructs a WritableString that wraps the given string
+        /// </summary>
+        /// <param name="data">The string data</param>
+        public WritableString(string data)
+        {
+            Data = data;
+        }
+
+        /// <summary>
+        /// Reads a string from the reader and stores it in Data
+        /// </summary>
+        /// <param name="reader">reader to read from</param>
+        public void Read(IDataReader reader)
+        {
+            Data = reader.ReadString();
+        }
+
+        /// <summary>
+        /// Writes the string held in Data to the writer
+        /// </summary>
+        /// <param name="writer">writer to write to</param>
+        public void Write(IDataWriter writer)
+        {
+            writer.WriteString(Data);
+        }
+
+        /// <summary>
+        /// Asynchronously reads a string from the reader and stores it in Data
+        /// </summary>
+        /// <param name="reader">reader to read from</param>
+        /// <param name="token">the cancellation token</param>
+        public async Task ReadAsync(IDataReader reader, CancellationToken 
token)
+        {
+            Data = await reader.ReadStringAsync(token);
+        }
+
+        /// <summary>
+        /// Asynchronously writes the string held in Data to the writer
+        /// </summary>
+        /// <param name="writer">writer to write to</param>
+        /// <param name="token">the cancellation token</param>
+        public async Task WriteAsync(IDataWriter writer, CancellationToken 
token)
+        {
+            await writer.WriteStringAsync(Data, token);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
new file mode 100644
index 0000000..03de24e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.IO;
+using System.Net;
+using System.Reactive;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    /// <summary>
+    /// Tests the WritableTransportServer, WritableTransportClient and 
WritableLink.
+    /// Basically the Wake transport layer.
+    /// </summary>
+    [TestClass]
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
+    public class WritableTransportTest
+    {
+        /// <summary>
+        /// Tests whether WritableTransportServer receives, in order, the
+        /// string messages sent by a single WritableTransportClient
+        /// </summary>
+        [TestMethod]
+        public void TestWritableTransportServer()
+        {
+            int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+
+            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+            var remoteHandler = 
Observer.Create<TransportEvent<WritableString>>(tEvent => 
queue.Add(tEvent.Data));
+
+            using (var server = new 
WritableTransportServer<WritableString>(endpoint, remoteHandler))
+            {
+                server.Run();
+
+                IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+                using (var client = new 
WritableTransportClient<WritableString>(remoteEndpoint))
+                {
+                    client.Send(new WritableString("Hello"));
+                    client.Send(new WritableString(", "));
+                    client.Send(new WritableString("World!"));
+
+                    events.Add(queue.Take().Data);
+                    events.Add(queue.Take().Data);
+                    events.Add(queue.Take().Data);
+                } 
+            }
+
+            Assert.AreEqual(3, events.Count);
+            Assert.AreEqual("Hello", events[0]);
+            Assert.AreEqual(", ", events[1]);
+            Assert.AreEqual("World!", events[2]);
+        }
+
+       
+        /// <summary>
+        /// Checks whether WritableTransportClient is able to receive messages 
echoed back by the server
+        /// </summary>
+        [TestMethod]
+        public void TestWritableTransportSenderStage()
+        {
+            int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+
+            List<string> events = new List<string>();
+            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
+
+            // Server echoes the message back to the client
+            var remoteHandler = 
Observer.Create<TransportEvent<WritableString>>(tEvent => 
tEvent.Link.Write(tEvent.Data));
+
+            using (WritableTransportServer<WritableString> server = new 
WritableTransportServer<WritableString>(endpoint, remoteHandler))
+            {
+                server.Run();
+
+                var clientHandler = 
Observer.Create<TransportEvent<WritableString>>(tEvent => 
queue.Add(tEvent.Data));
+                IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+                using (var client = new 
WritableTransportClient<WritableString>(remoteEndpoint, clientHandler))
+                {
+                    client.Send(new WritableString("Hello"));
+                    client.Send(new WritableString(", "));
+                    client.Send(new WritableString(" World"));
+
+                    events.Add(queue.Take().Data);
+                    events.Add(queue.Take().Data);
+                    events.Add(queue.Take().Data);
+                } 
+            }
+
+            Assert.AreEqual(3, events.Count);
+            Assert.AreEqual("Hello", events[0]);
+            Assert.AreEqual(", ", events[1]);
+            Assert.AreEqual(" World", events[2]);
+        }
+
+        /// <summary>
+        /// Checks whether WritableTransportClient and WritableTransportServer 
work 
+        /// correctly when messages are sent concurrently 
from separate 
+        /// tasks; only the total number of received messages is verified
+        /// </summary>
+        [TestMethod]
+        public void TestWritableRaceCondition()
+        {
+            int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+
+            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+            int numEventsExpected = 150;
+
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+            var remoteHandler = 
Observer.Create<TransportEvent<WritableString>>(tEvent => 
queue.Add(tEvent.Data));
+
+            using (var server = new 
WritableTransportServer<WritableString>(endpoint, remoteHandler))
+            {
+                server.Run();
+
+                for (int i = 0; i < numEventsExpected / 3; i++)
+                {
+                    Task.Run(() =>
+                    {
+                        IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+                        using (var client = new 
WritableTransportClient<WritableString>(remoteEndpoint))
+                        {
+                            client.Send(new WritableString("Hello"));
+                            client.Send(new WritableString(", "));
+                            client.Send(new WritableString("World!"));
+                        }
+                    });
+                }
+
+                for (int i = 0; i < numEventsExpected; i++)
+                {
+                    events.Add(queue.Take().Data);
+                }
+            }
+
+            Assert.AreEqual(numEventsExpected, events.Count);
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj 
b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index f9d1617..cc1ec2e 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -60,6 +60,14 @@ under the License.
     <Compile Include="IObserverFactory.cs" />
     <Compile Include="IStage.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="Remote\IDataReader.cs" />
+    <Compile Include="Remote\IDataWriter.cs" />
+    <Compile Include="Remote\Impl\StreamDataReader.cs" />
+    <Compile Include="Remote\Impl\StreamDataWriter.cs" />
+    <Compile Include="Remote\Impl\WritableLink.cs" />
+    <Compile Include="Remote\Impl\WritableTransportClient.cs" />
+    <Compile Include="Remote\Impl\WritableTransportServer.cs" />
+    <Compile Include="Remote\IWritable.cs" />
     <Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" />
     <Compile Include="Remote\ICodec.cs" />
     <Compile Include="Remote\ICodecFactory.cs" />
@@ -99,6 +107,7 @@ under the License.
     <Compile Include="Remote\Proto\WakeRemoteProtos.cs" />
     <Compile Include="Remote\RemoteConfiguration.cs" />
     <Compile Include="Remote\RemoteRuntimeException.cs" />
+    <Compile Include="Remote\TypeCache.cs" />
     <Compile Include="RX\AbstractObserver.cs" />
     <Compile Include="RX\AbstractRxStage.cs" />
     <Compile Include="RX\Impl\PubSubSubject.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/IDataReader.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IDataReader.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/IDataReader.cs
new file mode 100644
index 0000000..8c78a89
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IDataReader.cs
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Interface for reading standard primitives, strings and raw bytes,
+    /// either synchronously or asynchronously
+    /// </summary>
+    public interface IDataReader
+    {
+        /// <summary>
+        /// Reads a double
+        /// </summary>
+        /// <returns>the double that was read</returns>
+        double ReadDouble();
+
+        /// <summary>
+        /// Reads a float
+        /// </summary>
+        /// <returns>the float that was read</returns>
+        float ReadFloat();
+
+        /// <summary>
+        /// Reads a long
+        /// </summary>
+        /// <returns>the long that was read</returns>
+        long ReadLong();
+
+        /// <summary>
+        /// Reads a bool
+        /// </summary>
+        /// <returns>the bool that was read</returns>
+        bool ReadBoolean();
+
+        /// <summary>
+        /// Reads an integer
+        /// </summary>
+        /// <returns>the integer that was read</returns>
+        int ReadInt32();
+
+        /// <summary>
+        /// Reads a short
+        /// </summary>
+        /// <returns>the short that was read</returns>
+        short ReadInt16();
+
+        /// <summary>
+        /// Reads a string
+        /// </summary>
+        /// <returns>the string that was read</returns>
+        string ReadString();
+
+        /// <summary>
+        /// Reads data into the buffer
+        /// </summary>
+        /// <param name="buffer">byte array into which the data is read</param>
+        /// <param name="index">starting index in the array</param>
+        /// <param name="bytesToRead">number of bytes to read</param>
+        /// <returns>the number of 
bytes read 
+        /// if success, otherwise -1</returns>
+        int Read(ref byte[] buffer, int index, int bytesToRead);
+
+        /// <summary>
+        /// Reads a double asynchronously
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task that reads the double</returns>
+        Task<double> ReadDoubleAsync(CancellationToken token);
+
+        /// <summary>
+        /// Reads a float asynchronously
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task that reads the float</returns>
+        Task<float> ReadFloatAsync(CancellationToken token);
+
+        /// <summary>
+        /// Reads a long asynchronously
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task that reads the long</returns>
+        Task<long> ReadLongAsync(CancellationToken token);
+
+        /// <summary>
+        /// Reads a bool asynchronously
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task that reads the bool</returns>
+        Task<bool> ReadBooleanAsync(CancellationToken token);
+
+        /// <summary>
+        /// Reads an integer asynchronously
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task that reads the integer</returns>
+        Task<int> ReadInt32Async(CancellationToken token);
+
+        /// <summary>
+        /// Reads a short asynchronously
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task that reads the short</returns>
+        Task<short> ReadInt16Async(CancellationToken token);
+
+        /// <summary>
+        /// Reads a string asynchronously
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task that reads the string</returns>
+        Task<string> ReadStringAsync(CancellationToken token);
+
+
+        /// <summary>
+        /// Reads data into the buffer asynchronously
+        /// </summary>
+        /// <param name="buffer">byte array into which the data is read</param>
+        /// <param name="index">starting index in the array</param>
+        /// <param name="bytesToRead">number of bytes to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task that reads bytes and yields the number of 
bytes read 
+        /// if success, otherwise -1</returns>
+        Task<int> ReadAsync(byte[] buffer, int index, int bytesToRead, 
CancellationToken token);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/IDataWriter.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IDataWriter.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/IDataWriter.cs
new file mode 100644
index 0000000..13872ff
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IDataWriter.cs
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Interface for writing standard primitives, strings and raw bytes,
+    /// either synchronously or asynchronously
+    /// </summary>
+    public interface IDataWriter
+    {
+        /// <summary>
+        /// Writes a double
+        /// </summary>
+        /// <param name="obj">double to be written</param>
+        void WriteDouble(double obj);
+
+        /// <summary>
+        /// Writes a float
+        /// </summary>
+        /// <param name="obj">float to be written</param>
+        void WriteFloat(float obj);
+
+        /// <summary>
+        /// Writes a long
+        /// </summary>
+        /// <param name="obj">long to be written</param>
+        void WriteLong(long obj);
+
+        /// <summary>
+        /// Writes a boolean
+        /// </summary>
+        /// <param name="obj">bool to be written</param>
+        void WriteBoolean(bool obj);
+
+        /// <summary>
+        /// Writes an integer
+        /// </summary>
+        /// <param name="obj">int to be written</param>
+        void WriteInt32(int obj);
+
+        /// <summary>
+        /// Writes a short
+        /// </summary>
+        /// <param name="obj">short to be written</param>
+        void WriteInt16(short obj);
+
+        /// <summary>
+        /// Writes a string
+        /// </summary>
+        /// <param name="obj">string to be written</param>
+        void WriteString(string obj);
+
+        /// <summary>
+        /// Writes bytes taken from the byte array
+        /// </summary>
+        /// <param name="buffer">byte array from which to read</param>
+        /// <param name="index">starting index in the array</param>
+        /// <param name="count">number of bytes to write</param>
+        void Write(byte[] buffer, int index, int count);
+
+        /// <summary>
+        /// Writes a double asynchronously
+        /// </summary>
+        /// <param name="obj">double to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task representing the write</returns>
+        Task WriteDoubleAsync(double obj, CancellationToken token);
+
+        /// <summary>
+        /// Writes a float asynchronously
+        /// </summary>
+        /// <param name="obj">float to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task representing the write</returns>
+        Task WriteFloatAsync(float obj, CancellationToken token);
+
+        /// <summary>
+        /// Writes a long asynchronously
+        /// </summary>
+        /// <param name="obj">long to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task representing the write</returns>
+        Task WriteLongAsync(long obj, CancellationToken token);
+
+        /// <summary>
+        /// Writes a bool asynchronously
+        /// </summary>
+        /// <param name="obj">bool to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task representing the write</returns>
+        Task WriteBooleanAsync(bool obj, CancellationToken token);
+
+        /// <summary>
+        /// Writes an integer asynchronously
+        /// </summary>
+        /// <param name="obj">integer to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task representing the write</returns>
+        Task WriteInt32Async(int obj, CancellationToken token);
+
+        /// <summary>
+        /// Writes a short asynchronously
+        /// </summary>
+        /// <param name="obj">short to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task representing the write</returns>
+        Task WriteInt16Async(short obj, CancellationToken token);
+
+        /// <summary>
+        /// Writes a string asynchronously
+        /// </summary>
+        /// <param name="obj">string to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task representing the write</returns>
+        Task WriteStringAsync(string obj, CancellationToken token);
+
+        /// <summary>
+        /// Writes bytes taken from the byte array asynchronously
+        /// </summary>
+        /// <param name="buffer">byte array from which to read</param>
+        /// <param name="index">starting index in the array</param>
+        /// <param name="count">number of bytes to write</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task representing the write</returns>
+        Task WriteAsync(byte[] buffer, int index, int count, CancellationToken 
token);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
new file mode 100644
index 0000000..644cf82
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Interface that classes should implement if they need to be readable from 
and writable 
+    /// to a stream. It is assumed that the classes implementing this 
interface will have a 
+    /// default empty constructor
+    /// </summary>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
+    public interface IWritable
+    {
+        /// <summary>
+        /// Reads the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        void Read(IDataReader reader);
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="writer">The writer to which to write</param>
+        void Write(IDataWriter writer);
+
+        /// <summary>
+        /// Reads the class fields asynchronously.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        /// <param name="token">The cancellation token</param>
+        Task ReadAsync(IDataReader reader, CancellationToken token);
+
+        /// <summary>
+        /// Writes the class fields asynchronously.
+        /// </summary>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">The cancellation token</param>
+        Task WriteAsync(IDataWriter writer, CancellationToken token);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs
new file mode 100644
index 0000000..67098d0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Reads primitive values, length-prefixed strings and raw bytes from an
+    /// underlying stream, both synchronously and asynchronously.
+    /// </summary>
+    public class StreamDataReader : IDataReader
+    {
+        private static readonly Logger Logger =
+            Logger.GetLogger(typeof(StreamDataReader));
+
+        /// <summary>
+        /// Stream from which to read
+        /// </summary>
+        private readonly Stream _stream;
+
+        /// <summary>
+        /// Constructs the StreamDataReader
+        /// </summary>
+        /// <param name="stream">Stream from which to read</param>
+        public StreamDataReader(Stream stream)
+        {
+            _stream = stream;
+        }
+
+        /// <summary>
+        /// Reads double
+        /// </summary>
+        /// <returns>read double</returns>
+        public double ReadDouble()
+        {
+            return BitConverter.ToDouble(ReadFixed(sizeof(double)), 0);
+        }
+
+        /// <summary>
+        /// Reads float
+        /// </summary>
+        /// <returns>read float</returns>
+        public float ReadFloat()
+        {
+            return BitConverter.ToSingle(ReadFixed(sizeof(float)), 0);
+        }
+
+        /// <summary>
+        /// Reads long
+        /// </summary>
+        /// <returns>read long</returns>
+        public long ReadLong()
+        {
+            return BitConverter.ToInt64(ReadFixed(sizeof(long)), 0);
+        }
+
+        /// <summary>
+        /// Reads bool
+        /// </summary>
+        /// <returns>read bool</returns>
+        public bool ReadBoolean()
+        {
+            return BitConverter.ToBoolean(ReadFixed(sizeof(bool)), 0);
+        }
+
+        /// <summary>
+        /// Reads integer
+        /// </summary>
+        /// <returns>read integer</returns>
+        public int ReadInt32()
+        {
+            return BitConverter.ToInt32(ReadFixed(sizeof(int)), 0);
+        }
+
+        /// <summary>
+        /// Reads short
+        /// </summary>
+        /// <returns>read short</returns>
+        public short ReadInt16()
+        {
+            return BitConverter.ToInt16(ReadFixed(sizeof(Int16)), 0);
+        }
+
+        /// <summary>
+        /// Reads a string stored as a 32-bit byte count followed by that
+        /// many bytes of character data (two bytes per char).
+        /// </summary>
+        /// <returns>read string, or null if the stream ended before the
+        /// announced number of bytes could be read</returns>
+        public string ReadString()
+        {
+            int length = ReadInt32();
+
+            byte[] stringByte = new byte[length];
+            if (Read(ref stringByte, 0, length) == -1)
+            {
+                return null;
+            }
+
+            return BytesToString(stringByte);
+        }
+
+        /// <summary>
+        /// Reads data in to the buffer, looping until the requested number
+        /// of bytes has arrived.
+        /// </summary>
+        /// <param name="buffer">byte array to which to write. If it is null
+        /// or cannot hold bytesToRead bytes starting at index, it is
+        /// replaced by a large enough array (existing content is kept)</param>
+        /// <param name="index">starting index in the array</param>
+        /// <param name="bytesToRead">number of bytes to read</param>
+        /// <returns>number of bytes read if success, otherwise -1</returns>
+        public int Read(ref byte[] buffer, int index, int bytesToRead)
+        {
+            // The capacity check has to account for the start index;
+            // sizing by bytesToRead alone makes the stream read overrun the
+            // array whenever index is non-zero.
+            int required = index + bytesToRead;
+            if (buffer == null || buffer.Length < required)
+            {
+                Array.Resize(ref buffer, required);
+            }
+
+            int totalBytesRead = 0;
+            while (totalBytesRead < bytesToRead)
+            {
+                int bytesRead = _stream.Read(
+                    buffer, index + totalBytesRead, bytesToRead - totalBytesRead);
+                if (bytesRead == 0)
+                {
+                    // Read timed out or connection was closed
+                    return -1;
+                }
+
+                totalBytesRead += bytesRead;
+            }
+
+            return totalBytesRead;
+        }
+
+        /// <summary>
+        /// Reads double asynchronously
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task handler that reads double</returns>
+        public async Task<double> ReadDoubleAsync(CancellationToken token)
+        {
+            byte[] doubleBytes = await ReadFixedAsync(sizeof(double), token);
+            return BitConverter.ToDouble(doubleBytes, 0);
+        }
+
+        /// <summary>
+        /// Reads float asynchronously
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task handler that reads float</returns>
+        public async Task<float> ReadFloatAsync(CancellationToken token)
+        {
+            byte[] floatBytes = await ReadFixedAsync(sizeof(float), token);
+            return BitConverter.ToSingle(floatBytes, 0);
+        }
+
+        /// <summary>
+        /// Reads long asynchronously
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task handler that reads long</returns>
+        public async Task<long> ReadLongAsync(CancellationToken token)
+        {
+            byte[] longBytes = await ReadFixedAsync(sizeof(long), token);
+            return BitConverter.ToInt64(longBytes, 0);
+        }
+
+        /// <summary>
+        /// Reads bool asynchronously
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task handler that reads bool</returns>
+        public async Task<bool> ReadBooleanAsync(CancellationToken token)
+        {
+            byte[] boolBytes = await ReadFixedAsync(sizeof(bool), token);
+            return BitConverter.ToBoolean(boolBytes, 0);
+        }
+
+        /// <summary>
+        /// Reads integer asynchronously
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task handler that reads integer</returns>
+        public async Task<int> ReadInt32Async(CancellationToken token)
+        {
+            byte[] intBytes = await ReadFixedAsync(sizeof(int), token);
+            return BitConverter.ToInt32(intBytes, 0);
+        }
+
+        /// <summary>
+        /// Reads short asynchronously
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task handler that reads short</returns>
+        public async Task<short> ReadInt16Async(CancellationToken token)
+        {
+            byte[] shortBytes = await ReadFixedAsync(sizeof(Int16), token);
+            return BitConverter.ToInt16(shortBytes, 0);
+        }
+
+        /// <summary>
+        /// Reads string asynchronously. The layout is the same as for
+        /// ReadString: a 32-bit byte count followed by the character data.
+        /// </summary>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task handler that reads string; the result is null if
+        /// the stream ended before the announced number of bytes could be
+        /// read</returns>
+        public async Task<string> ReadStringAsync(CancellationToken token)
+        {
+            // The length prefix is awaited as well, so the caller's thread is
+            // not blocked and the cancellation token covers the whole read.
+            int length = await ReadInt32Async(token);
+
+            byte[] stringByte = new byte[length];
+            int readBytes = await ReadAsync(stringByte, 0, length, token);
+
+            if (readBytes == -1)
+            {
+                return null;
+            }
+
+            return BytesToString(stringByte);
+        }
+
+        /// <summary>
+        /// Reads data in to the buffer asynchronously, looping until the
+        /// requested number of bytes has arrived.
+        /// </summary>
+        /// <param name="buffer">byte array to which to write</param>
+        /// <param name="index">starting index in the array</param>
+        /// <param name="bytesToRead">number of bytes to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task handler that reads bytes and returns that number of
+        /// bytes read if success, otherwise -1</returns>
+        public async Task<int> ReadAsync(
+            byte[] buffer, int index, int bytesToRead, CancellationToken token)
+        {
+            int totalBytesRead = 0;
+            while (totalBytesRead < bytesToRead)
+            {
+                int bytesRead = await _stream.ReadAsync(
+                    buffer, index + totalBytesRead, bytesToRead - totalBytesRead, token);
+                if (bytesRead == 0)
+                {
+                    // Read timed out or connection was closed
+                    return -1;
+                }
+
+                totalBytesRead += bytesRead;
+            }
+
+            // Report what was actually read, as the synchronous Read does, so
+            // a non-positive request can never be mistaken for the -1 marker.
+            return totalBytesRead;
+        }
+
+        /// <summary>
+        /// Reads exactly size bytes into a fresh array; a short read is
+        /// reported through Exceptions.Throw with the "No bytes read" error.
+        /// </summary>
+        /// <param name="size">number of bytes to read</param>
+        /// <returns>the bytes read</returns>
+        private byte[] ReadFixed(int size)
+        {
+            byte[] bytes = new byte[size];
+            if (Read(ref bytes, 0, size) == -1)
+            {
+                Exceptions.Throw(new Exception("No bytes read"), Logger);
+            }
+            return bytes;
+        }
+
+        /// <summary>
+        /// Asynchronous counterpart of ReadFixed.
+        /// </summary>
+        /// <param name="size">number of bytes to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Task handler whose result is the bytes read</returns>
+        private async Task<byte[]> ReadFixedAsync(int size, CancellationToken token)
+        {
+            byte[] bytes = new byte[size];
+            if (await ReadAsync(bytes, 0, size, token) == -1)
+            {
+                Exceptions.Throw(new Exception("No bytes read"), Logger);
+            }
+            return bytes;
+        }
+
+        /// <summary>
+        /// Copies raw bytes into a char array (two bytes per char) and
+        /// builds a string from it.
+        /// </summary>
+        /// <param name="stringByte">raw character data</param>
+        /// <returns>the decoded string</returns>
+        private static string BytesToString(byte[] stringByte)
+        {
+            char[] stringChar = new char[stringByte.Length / sizeof(char)];
+            Buffer.BlockCopy(stringByte, 0, stringChar, 0, stringByte.Length);
+            return new string(stringChar);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs
new file mode 100644
index 0000000..b702e1c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Writes primitive values, length-prefixed strings and raw bytes to an
+    /// underlying stream, both synchronously and asynchronously.
+    /// </summary>
+    public class StreamDataWriter : IDataWriter
+    {
+        /// <summary>
+        /// Destination stream for everything this writer emits
+        /// </summary>
+        private readonly Stream _stream;
+
+        /// <summary>
+        /// Constructs the StreamDataWriter
+        /// </summary>
+        /// <param name="stream">Stream to which to write</param>
+        public StreamDataWriter(Stream stream)
+        {
+            _stream = stream;
+        }
+
+        /// <summary>
+        /// Writes the BitConverter byte form of a double
+        /// </summary>
+        /// <param name="obj">double to be written</param>
+        public void WriteDouble(double obj)
+        {
+            byte[] encoded = BitConverter.GetBytes(obj);
+            _stream.Write(encoded, 0, encoded.Length);
+        }
+
+        /// <summary>
+        /// Writes the BitConverter byte form of a float
+        /// </summary>
+        /// <param name="obj">float to be written</param>
+        public void WriteFloat(float obj)
+        {
+            byte[] encoded = BitConverter.GetBytes(obj);
+            _stream.Write(encoded, 0, encoded.Length);
+        }
+
+        /// <summary>
+        /// Writes the BitConverter byte form of a long
+        /// </summary>
+        /// <param name="obj">long to be written</param>
+        public void WriteLong(long obj)
+        {
+            byte[] encoded = BitConverter.GetBytes(obj);
+            _stream.Write(encoded, 0, encoded.Length);
+        }
+
+        /// <summary>
+        /// Writes the BitConverter byte form of a bool
+        /// </summary>
+        /// <param name="obj">bool to be written</param>
+        public void WriteBoolean(bool obj)
+        {
+            byte[] encoded = BitConverter.GetBytes(obj);
+            _stream.Write(encoded, 0, encoded.Length);
+        }
+
+        /// <summary>
+        /// Writes the BitConverter byte form of an int
+        /// </summary>
+        /// <param name="obj">int to be written</param>
+        public void WriteInt32(int obj)
+        {
+            byte[] encoded = BitConverter.GetBytes(obj);
+            _stream.Write(encoded, 0, encoded.Length);
+        }
+
+        /// <summary>
+        /// Writes the BitConverter byte form of a short
+        /// </summary>
+        /// <param name="obj">short to be written</param>
+        public void WriteInt16(short obj)
+        {
+            byte[] encoded = BitConverter.GetBytes(obj);
+            _stream.Write(encoded, 0, encoded.Length);
+        }
+
+        /// <summary>
+        /// Writes a string as a 32-bit byte count followed by the raw
+        /// character data (two bytes per char)
+        /// </summary>
+        /// <param name="obj">string to be written</param>
+        public void WriteString(string obj)
+        {
+            char[] characters = obj.ToCharArray();
+            int byteCount = characters.Length * sizeof(char);
+            byte[] payload = new byte[byteCount];
+            Buffer.BlockCopy(characters, 0, payload, 0, byteCount);
+
+            // Length prefix first, then the character data.
+            WriteInt32(byteCount);
+            _stream.Write(payload, 0, byteCount);
+        }
+
+        /// <summary>
+        /// Writes a section of a byte array to the stream
+        /// </summary>
+        /// <param name="buffer">byte array from which to read</param>
+        /// <param name="index">starting index in the array</param>
+        /// <param name="count">number of bytes to write</param>
+        public void Write(byte[] buffer, int index, int count)
+        {
+            _stream.Write(buffer, index, count);
+        }
+
+        /// <summary>
+        /// Asynchronous counterpart of WriteDouble
+        /// </summary>
+        /// <param name="obj">double to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>the handler to the task</returns>
+        public async Task WriteDoubleAsync(double obj, CancellationToken token)
+        {
+            byte[] encoded = BitConverter.GetBytes(obj);
+            await _stream.WriteAsync(encoded, 0, encoded.Length, token);
+        }
+
+        /// <summary>
+        /// Asynchronous counterpart of WriteFloat
+        /// </summary>
+        /// <param name="obj">float to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>the handler to the task</returns>
+        public async Task WriteFloatAsync(float obj, CancellationToken token)
+        {
+            byte[] encoded = BitConverter.GetBytes(obj);
+            await _stream.WriteAsync(encoded, 0, encoded.Length, token);
+        }
+
+        /// <summary>
+        /// Asynchronous counterpart of WriteLong
+        /// </summary>
+        /// <param name="obj">long to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>the handler to the task</returns>
+        public async Task WriteLongAsync(long obj, CancellationToken token)
+        {
+            byte[] encoded = BitConverter.GetBytes(obj);
+            await _stream.WriteAsync(encoded, 0, encoded.Length, token);
+        }
+
+        /// <summary>
+        /// Asynchronous counterpart of WriteBoolean
+        /// </summary>
+        /// <param name="obj">bool to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>the handler to the task</returns>
+        public async Task WriteBooleanAsync(bool obj, CancellationToken token)
+        {
+            byte[] encoded = BitConverter.GetBytes(obj);
+            await _stream.WriteAsync(encoded, 0, encoded.Length, token);
+        }
+
+        /// <summary>
+        /// Asynchronous counterpart of WriteInt32
+        /// </summary>
+        /// <param name="obj">integer to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>the handler to the task</returns>
+        public async Task WriteInt32Async(int obj, CancellationToken token)
+        {
+            byte[] encoded = BitConverter.GetBytes(obj);
+            await _stream.WriteAsync(encoded, 0, encoded.Length, token);
+        }
+
+        /// <summary>
+        /// Asynchronous counterpart of WriteInt16
+        /// </summary>
+        /// <param name="obj">short to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>the handler to the task</returns>
+        public async Task WriteInt16Async(short obj, CancellationToken token)
+        {
+            byte[] encoded = BitConverter.GetBytes(obj);
+            await _stream.WriteAsync(encoded, 0, encoded.Length, token);
+        }
+
+        /// <summary>
+        /// Asynchronous counterpart of WriteString: a 32-bit byte count
+        /// followed by the raw character data
+        /// </summary>
+        /// <param name="obj">string to be written</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>the handler to the task</returns>
+        public async Task WriteStringAsync(string obj, CancellationToken token)
+        {
+            char[] characters = obj.ToCharArray();
+            int byteCount = characters.Length * sizeof(char);
+            byte[] payload = new byte[byteCount];
+            Buffer.BlockCopy(characters, 0, payload, 0, byteCount);
+
+            // Length prefix first, then the character data.
+            await WriteInt32Async(byteCount, token);
+            await _stream.WriteAsync(payload, 0, byteCount, token);
+        }
+
+        /// <summary>
+        /// Writes a section of a byte array to the stream asynchronously
+        /// </summary>
+        /// <param name="buffer">byte array from which to read</param>
+        /// <param name="index">starting index in the array</param>
+        /// <param name="count">number of bytes to write</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>the handler to the task</returns>
+        public async Task WriteAsync(
+            byte[] buffer, int index, int count, CancellationToken token)
+        {
+            await _stream.WriteAsync(buffer, index, count, token);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
new file mode 100644
index 0000000..f867240
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Represents an open connection between remote hosts. This class is not
+    /// thread safe
+    /// </summary>
+    /// <typeparam name="T">Generic Type of message. It is constrained to
+    /// have implemented the IWritable interface</typeparam>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    public class WritableLink<T> : ILink<T> where T : IWritable
+    {
+        private static readonly Logger Logger =
+            Logger.GetLogger(typeof(WritableLink<T>));
+
+        private readonly IPEndPoint _localEndpoint;
+        private bool _disposed;
+        private readonly NetworkStream _stream;
+
+        /// <summary>
+        /// Cache structure to store the constructor functions for various
+        /// types.
+        /// </summary>
+        private readonly TypeCache<T> _cache;
+
+        /// <summary>
+        /// Stream reader to be passed to IWritable
+        /// </summary>
+        private readonly StreamDataReader _reader;
+
+        /// <summary>
+        /// Stream writer to be passed to IWritable
+        /// </summary>
+        private readonly StreamDataWriter _writer;
+
+        /// <summary>
+        /// Constructs a Link object.
+        /// Connects to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The remote endpoint to connect
+        /// to</param>
+        public WritableLink(IPEndPoint remoteEndpoint)
+            : this(ConnectTo(remoteEndpoint))
+        {
+        }
+
+        /// <summary>
+        /// Constructs a Link object.
+        /// Uses the already connected TcpClient.
+        /// </summary>
+        /// <param name="client">The already connected client</param>
+        public WritableLink(TcpClient client)
+        {
+            if (client == null)
+            {
+                throw new ArgumentNullException("client");
+            }
+
+            Client = client;
+            _stream = Client.GetStream();
+            _localEndpoint = GetLocalEndpoint();
+            _disposed = false;
+            _cache = new TypeCache<T>();
+            _reader = new StreamDataReader(_stream);
+            _writer = new StreamDataWriter(_stream);
+        }
+
+        /// <summary>
+        /// Returns the local socket address
+        /// </summary>
+        public IPEndPoint LocalEndpoint
+        {
+            get { return _localEndpoint; }
+        }
+
+        /// <summary>
+        /// Returns the remote socket address
+        /// </summary>
+        public IPEndPoint RemoteEndpoint
+        {
+            get { return (IPEndPoint) Client.Client.RemoteEndPoint; }
+        }
+
+        /// <summary>
+        /// Gets the underlying TcpClient
+        /// </summary>
+        public TcpClient Client { get; private set; }
+
+        /// <summary>
+        /// Writes the message to the remote host. The assembly qualified
+        /// type name of the value is written first, followed by whatever the
+        /// value itself writes.
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        public void Write(T value)
+        {
+            if (value == null)
+            {
+                throw new ArgumentNullException("value");
+            }
+            if (_disposed)
+            {
+                Exceptions.Throw(
+                    new IllegalStateException("Link has been closed."), Logger);
+            }
+
+            _writer.WriteString(value.GetType().AssemblyQualifiedName);
+            value.Write(_writer);
+        }
+
+        /// <summary>
+        /// Writes the value to this link asynchronously, using the same
+        /// layout as Write.
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        /// <param name="token">The cancellation token</param>
+        public async Task WriteAsync(T value, CancellationToken token)
+        {
+            // Same argument check as Write, so a null value is reported as an
+            // ArgumentNullException rather than a NullReferenceException.
+            if (value == null)
+            {
+                throw new ArgumentNullException("value");
+            }
+            if (_disposed)
+            {
+                Exceptions.Throw(
+                    new IllegalStateException("Link has been closed."), Logger);
+            }
+
+            await _writer.WriteStringAsync(
+                value.GetType().AssemblyQualifiedName, token);
+            await value.WriteAsync(_writer, token);
+        }
+
+        /// <summary>
+        /// Reads the value from the link synchronously
+        /// </summary>
+        /// <returns>The value read, or default(T) when no type name could be
+        /// read or the type cache returned no instance for it</returns>
+        public T Read()
+        {
+            if (_disposed)
+            {
+                Exceptions.Throw(
+                    new IllegalStateException("Link has been disposed."), Logger);
+            }
+
+            string dataType = _reader.ReadString();
+            if (dataType == null)
+            {
+                return default(T);
+            }
+
+            T value = _cache.GetInstance(dataType);
+            if (value == null)
+            {
+                return default(T);
+            }
+
+            value.Read(_reader);
+            return value;
+        }
+
+        /// <summary>
+        /// Reads the value from the link asynchronously
+        /// </summary>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>Task whose result is the value read, or default(T) when
+        /// no type name could be read or the type cache returned no instance
+        /// for it</returns>
+        public async Task<T> ReadAsync(CancellationToken token)
+        {
+            if (_disposed)
+            {
+                Exceptions.Throw(
+                    new IllegalStateException("Link has been disposed."), Logger);
+            }
+
+            string dataType = await _reader.ReadStringAsync(token);
+            if (dataType == null)
+            {
+                return default(T);
+            }
+
+            T value = _cache.GetInstance(dataType);
+            if (value == null)
+            {
+                return default(T);
+            }
+
+            await value.ReadAsync(_reader, token);
+            return value;
+        }
+
+        /// <summary>
+        /// Close the client connection
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Subclasses of Links should overwrite this to handle disposing
+        /// of the link
+        /// </summary>
+        /// <param name="disposing">To dispose or not</param>
+        public virtual void Dispose(bool disposing)
+        {
+            if (_disposed)
+            {
+                return;
+            }
+
+            if (disposing)
+            {
+                try
+                {
+                    Client.GetStream().Close();
+                }
+                catch (InvalidOperationException)
+                {
+                    Logger.Log(Level.Warning,
+                        "failed to close stream on a non-connected socket.");
+                }
+
+                Client.Close();
+            }
+            _disposed = true;
+        }
+
+        /// <summary>
+        /// Overrides Equals. Two WritableLink objects are equal if they are
+        /// connected to the same remote endpoint.
+        /// </summary>
+        /// <param name="obj">The object to compare</param>
+        /// <returns>True if the object is equal to this Link, otherwise
+        /// false</returns>
+        public override bool Equals(object obj)
+        {
+            // Compare against this class, not Link<T>: with the Link<T> cast
+            // two WritableLinks could never be equal, which contradicted the
+            // endpoint-based GetHashCode below.
+            WritableLink<T> other = obj as WritableLink<T>;
+            if (other == null)
+            {
+                return false;
+            }
+
+            return other.RemoteEndpoint.Equals(RemoteEndpoint);
+        }
+
+        /// <summary>
+        /// Gets the hash code for the Link object.
+        /// </summary>
+        /// <returns>The hash code of the remote endpoint</returns>
+        public override int GetHashCode()
+        {
+            return RemoteEndpoint.GetHashCode();
+        }
+
+        /// <summary>
+        /// Creates a TcpClient connected to the given endpoint. The client
+        /// is closed again if the connection attempt fails, so a failed
+        /// constructor call does not leak the socket.
+        /// </summary>
+        /// <param name="remoteEndpoint">The remote endpoint to connect
+        /// to</param>
+        /// <returns>The connected client</returns>
+        private static TcpClient ConnectTo(IPEndPoint remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            TcpClient client = new TcpClient();
+            try
+            {
+                client.Connect(remoteEndpoint);
+            }
+            catch (Exception)
+            {
+                client.Close();
+                throw;
+            }
+
+            return client;
+        }
+
+        /// <summary>
+        /// Builds the local endpoint from NetworkUtils.LocalIPAddress and
+        /// the local port of the client socket.
+        /// </summary>
+        /// <returns>The local IPEndpoint</returns>
+        private IPEndPoint GetLocalEndpoint()
+        {
+            IPAddress address = NetworkUtils.LocalIPAddress;
+            int port = ((IPEndPoint) Client.Client.LocalEndPoint).Port;
+            return new IPEndPoint(address, port);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
new file mode 100644
index 0000000..5e42fb7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Establish connections to TransportServer for remote message passing.
+    /// Messages are written through a WritableLink; when an observer is
+    /// supplied, responses read from the link are forwarded to it.
+    /// </summary>
+    /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable</typeparam>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    public class WritableTransportClient<T> : IDisposable where T : IWritable
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(WritableTransportClient<T>));
+
+        private readonly ILink<T> _link;
+        private readonly IObserver<TransportEvent<T>> _observer;
+        private readonly CancellationTokenSource _cancellationSource;
+        private bool _disposed;
+
+        /// <summary>
+        /// Construct a TransportClient.
+        /// Used to send messages to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+        public WritableTransportClient(IPEndPoint remoteEndpoint)
+        {
+            Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", Logger);
+
+            _link = new WritableLink<T>(remoteEndpoint);
+            _cancellationSource = new CancellationTokenSource();
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Construct a TransportClient.
+        /// Used to send messages to the specified remote endpoint and to
+        /// receive responses, which are read on a background task.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+        /// <param name="observer">Callback used when receiving responses from remote host</param>
+        public WritableTransportClient(IPEndPoint remoteEndpoint,
+            IObserver<TransportEvent<T>> observer)
+            : this(remoteEndpoint)
+        {
+            _observer = observer;
+            Task.Run(() => ResponseLoop());
+        }
+
+        /// <summary>
+        /// Gets the underlying transport link.
+        /// </summary>
+        public ILink<T> Link
+        {
+            get { return _link; }
+        }
+
+        /// <summary>
+        /// Send the remote message.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <exception cref="ArgumentNullException">If message is null</exception>
+        public void Send(T message)
+        {
+            if (message == null)
+            {
+                throw new ArgumentNullException("message");
+            }
+
+            _link.Write(message);
+        }
+
+        /// <summary>
+        /// Close all opened connections
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Stops the response loop and disposes the underlying link.
+        /// Does nothing when already disposed or when disposing is false.
+        /// </summary>
+        /// <param name="disposing">True when called from Dispose()</param>
+        protected void Dispose(bool disposing)
+        {
+            if (!_disposed && disposing)
+            {
+                try
+                {
+                    // Signal cancellation before tearing down the link so the
+                    // response loop observes it instead of running until the
+                    // socket fails underneath it.
+                    _cancellationSource.Cancel();
+                }
+                finally
+                {
+                    _link.Dispose();
+                    _disposed = true;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Continually read responses from remote host and hand them to the
+        /// observer until cancellation is requested or a null message is read.
+        /// </summary>
+        private async Task ResponseLoop()
+        {
+            try
+            {
+                while (!_cancellationSource.IsCancellationRequested)
+                {
+                    T message = await _link.ReadAsync(_cancellationSource.Token);
+                    if (message == null)
+                    {
+                        break;
+                    }
+
+                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
+                    _observer.OnNext(transportEvent);
+                }
+            }
+            catch (OperationCanceledException)
+            {
+                // Cancellation is only requested from Dispose; treat it as a normal shutdown.
+                Logger.Log(Level.Info, "WritableTransportClient has been closed.");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
new file mode 100644
index 0000000..05a520d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Server to handle incoming remote messages. Accepts TCP clients on a
+    /// local endpoint and forwards every message read from a client link to
+    /// the supplied observer.
+    /// </summary>
+    /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable</typeparam>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    public class WritableTransportServer<T> : IDisposable where T : IWritable
+    {
+        /// <summary>
+        /// Upper bound, in milliseconds, that Dispose waits for the accept loop to finish.
+        /// </summary>
+        private const int ServerShutdownTimeoutMs = 500;
+
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(WritableTransportServer<T>));
+
+        private readonly TcpListener _listener;
+        private readonly CancellationTokenSource _cancellationSource;
+        private readonly IObserver<TransportEvent<T>> _remoteObserver;
+        private bool _disposed;
+        private Task _serverTask;
+
+        /// <summary>
+        /// Constructs a TransportServer to listen for remote events.
+        /// Listens on the given port of the local IP address. When it receives a
+        /// remote event, it will invoke the specified remote handler.
+        /// </summary>
+        /// <param name="port">Port to listen on</param>
+        /// <param name="remoteHandler">The handler to invoke when receiving incoming
+        /// remote messages</param>
+        public WritableTransportServer(int port, IObserver<TransportEvent<T>> remoteHandler)
+            : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler)
+        {
+        }
+
+        /// <summary>
+        /// Constructs a TransportServer to listen for remote events.
+        /// Listens on the specified local endpoint. When it receives a remote
+        /// event, it will invoke the specified remote handler.
+        /// </summary>
+        /// <param name="localEndpoint">Endpoint to listen on</param>
+        /// <param name="remoteHandler">The handler to invoke when receiving incoming
+        /// remote messages</param>
+        /// <exception cref="ArgumentNullException">If localEndpoint is null</exception>
+        public WritableTransportServer(IPEndPoint localEndpoint,
+            IObserver<TransportEvent<T>> remoteHandler)
+        {
+            if (localEndpoint == null)
+            {
+                throw new ArgumentNullException("localEndpoint");
+            }
+
+            _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port);
+            _remoteObserver = remoteHandler;
+            _cancellationSource = new CancellationTokenSource();
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Returns the listening endpoint for the TransportServer
+        /// </summary>
+        public IPEndPoint LocalEndpoint
+        {
+            get { return _listener.LocalEndpoint as IPEndPoint; }
+        }
+
+        /// <summary>
+        /// Starts listening for incoming remote messages.
+        /// </summary>
+        public void Run()
+        {
+            _listener.Start();
+            _serverTask = Task.Run(() => StartServer());
+        }
+
+        /// <summary>
+        /// Close the TransportServer and all open connections
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Cancels the server, stops the listener and waits a bounded time for
+        /// the accept loop to finish. Does nothing when already disposed or
+        /// when disposing is false.
+        /// </summary>
+        /// <param name="disposing">True when called from Dispose()</param>
+        public void Dispose(bool disposing)
+        {
+            if (_disposed || !disposing)
+            {
+                return;
+            }
+
+            _disposed = true;
+            _cancellationSource.Cancel();
+
+            try
+            {
+                _listener.Stop();
+            }
+            catch (SocketException)
+            {
+                LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
+            }
+
+            if (_serverTask == null)
+            {
+                return;
+            }
+
+            // Give the TransportServer Task a bounded time to shut down and
+            // ignore any timeout errors. An unbounded Wait() here would make
+            // the timeout pointless and could surface task faults to the caller.
+            try
+            {
+                using (CancellationTokenSource serverDisposeTimeout =
+                    new CancellationTokenSource(ServerShutdownTimeoutMs))
+                {
+                    _serverTask.Wait(serverDisposeTimeout.Token);
+                }
+            }
+            catch (Exception e)
+            {
+                Console.Error.WriteLine(e);
+            }
+            finally
+            {
+                // Task.Dispose throws unless the task has reached a completion
+                // state, which is not guaranteed once the wait above timed out.
+                if (_serverTask.IsCompleted)
+                {
+                    _serverTask.Dispose();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Helper method to start TransportServer.  This will
+        /// be run in an asynchronous Task. Accepts clients until cancellation
+        /// is requested or the listener is closed.
+        /// </summary>
+        /// <returns>An asynchronous Task for the running server.</returns>
+        private async Task StartServer()
+        {
+            try
+            {
+                while (!_cancellationSource.Token.IsCancellationRequested)
+                {
+                    TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
+                    ProcessClient(client).Forget();
+                }
+            }
+            catch (InvalidOperationException)
+            {
+                LOGGER.Log(Level.Info, "TransportServer has been closed.");
+            }
+            catch (OperationCanceledException)
+            {
+                LOGGER.Log(Level.Info, "TransportServer has been closed.");
+            }
+        }
+
+        /// <summary>
+        /// Receives events from the connected TcpClient and invokes the handler
+        /// on each event.
+        /// </summary>
+        /// <param name="client">The connected client</param>
+        private async Task ProcessClient(TcpClient client)
+        {
+            // Keep reading messages from client until they disconnect or timeout
+            CancellationToken token = _cancellationSource.Token;
+            using (ILink<T> link = new WritableLink<T>(client))
+            {
+                while (!token.IsCancellationRequested)
+                {
+                    T message = await link.ReadAsync(token);
+
+                    // A null message ends the loop for this client.
+                    if (message == null)
+                    {
+                        break;
+                    }
+
+                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
+
+                    _remoteObserver.OnNext(transportEvent);
+                }
+                LOGGER.Log(Level.Error,
+                    "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs
new file mode 100644
index 0000000..9bbe549
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Linq.Expressions;
+using System.Reflection;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Cache used to store the constructor functions to instantiate various Types.
+    /// It is assumed that all types are inherited from the base type T
+    /// </summary>
+    /// <typeparam name="T">Base type that every created instance is converted to</typeparam>
+    public class TypeCache<T>
+    {
+        private const BindingFlags ConstructorFlags =
+            BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance;
+
+        /// <summary>
+        /// Cache that stores the constructors for already used types, keyed by
+        /// the type name string passed to GetInstance.
+        /// NOTE(review): a plain Dictionary is used, so concurrent callers of
+        /// GetInstance would need external synchronization - confirm usage.
+        /// </summary>
+        private readonly Dictionary<string, Func<T>> _typeConstructorMapping =
+            new Dictionary<string, Func<T>>();
+
+        /// <summary>
+        /// Creates a new instance of the type identified by typeString using its
+        /// parameterless constructor. The compiled constructor is cached so later
+        /// calls for the same string skip the type lookup and compilation.
+        /// </summary>
+        /// <param name="typeString">Type name in a form understood by Type.GetType</param>
+        /// <returns>A new instance of the requested type, converted to T</returns>
+        /// <exception cref="KeyNotFoundException">If typeString cannot be resolved to a type</exception>
+        /// <exception cref="MissingMethodException">If the resolved reference type has no
+        /// parameterless constructor, or is abstract or an interface</exception>
+        public T GetInstance(string typeString)
+        {
+            Func<T> activator;
+            if (!_typeConstructorMapping.TryGetValue(typeString, out activator))
+            {
+                Type type = Type.GetType(typeString);
+                if (type == null)
+                {
+                    // Same exception type the dictionary indexer used to raise for an
+                    // unresolved type, but now stating which type name failed.
+                    throw new KeyNotFoundException("Could not resolve type: " + typeString);
+                }
+
+                activator = GetActivator(type);
+                _typeConstructorMapping[typeString] = activator;
+            }
+
+            return activator();
+        }
+
+        /// <summary>
+        /// Returns the constructor for type T given actual type. Type can be
+        /// that of inherited class.
+        /// </summary>
+        /// <param name="actualType">The actual type for which we want to create the constructor.</param>
+        /// <returns>The constructor function</returns>
+        private Func<T> GetActivator(Type actualType)
+        {
+            ConstructorInfo constructor;
+            if (actualType.IsValueType)
+            {
+                // For struct types, there is an implicit default constructor.
+                constructor = null;
+            }
+            else if (!TryGetDefaultConstructor(actualType, out constructor))
+            {
+                // MissingMethodException derives from Exception, so callers that
+                // caught the previous bare Exception keep working.
+                throw new MissingMethodException(
+                    "could not get default constructor for type " + actualType.FullName);
+            }
+
+            NewExpression nex = constructor == null
+                ? Expression.New(actualType)
+                : Expression.New(constructor);
+            var body = Expression.Convert(nex, typeof (T));
+            Expression<Func<T>> lambda = Expression.Lambda<Func<T>>(body);
+
+            return lambda.Compile();
+        }
+
+        /// <summary>
+        /// Looks up the public or non-public parameterless instance constructor of a type.
+        /// </summary>
+        /// <param name="type">The type for which constructor needs to be created</param>
+        /// <param name="constructor">The parameterless constructor, or null if there is none</param>
+        /// <returns>True if a parameterless constructor was found, otherwise false</returns>
+        private bool TryGetDefaultConstructor(Type type, out ConstructorInfo constructor)
+        {
+            // Abstract classes and interfaces cannot be instantiated at all.
+            if (type.IsAbstract || type.IsInterface)
+            {
+                constructor = null;
+                return false;
+            }
+
+            constructor = type.GetConstructor(ConstructorFlags, null, Type.EmptyTypes, null);
+            return null != constructor;
+        }
+    }
+}

Reply via email to