Repository: incubator-reef
Updated Branches:
  refs/heads/master ec9b497d4 -> ba2653d6b


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
new file mode 100644
index 0000000..6eb0190
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Manages incoming and outgoing messages between remote hosts.
+    /// </summary>
+    /// <typeparam name="T">Message type T.</typeparam>
+    internal sealed class StreamingRemoteManager<T> : IRemoteManager<T>
+    {
+        private readonly ObserverContainer<T> _observerContainer;
+        private readonly StreamingTransportServer<IRemoteEvent<T>> _server;
+        private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
+        private readonly IStreamingCodec<IRemoteEvent<T>> _remoteEventCodec;
+
+        /// <summary>
+        /// Creates a StreamingRemoteManager that listens on the given local address,
+        /// using a port taken from the supplied port provider.
+        /// </summary>
+        /// <param name="localAddress">The address to listen on</param>
+        /// <param name="tcpPortProvider">Tcp port provider handed to the transport server</param>
+        /// <param name="streamingCodec">Streaming codec for the message payload of type T</param>
+        /// <exception cref="ArgumentNullException">If localAddress is null</exception>
+        [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
+        public StreamingRemoteManager(IPAddress localAddress, ITcpPortProvider tcpPortProvider, IStreamingCodec<T> streamingCodec)
+        {
+            if (localAddress == null)
+            {
+                throw new ArgumentNullException("localAddress");
+            }
+
+            _remoteEventCodec = new RemoteEventStreamingCodec<T>(streamingCodec);
+            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+            _observerContainer = new ObserverContainer<T>();
+
+            // Start listening right away so that the endpoint read below is the one the server runs on.
+            _server = new StreamingTransportServer<IRemoteEvent<T>>(
+                localAddress, _observerContainer, tcpPortProvider, _remoteEventCodec);
+            _server.Run();
+
+            IPEndPoint serverEndpoint = _server.LocalEndpoint;
+            LocalEndpoint = serverEndpoint;
+            Identifier = new SocketRemoteIdentifier(serverEndpoint);
+        }
+
+        /// <summary>
+        /// Gets the RemoteIdentifier of this StreamingRemoteManager. It is built
+        /// from LocalEndpoint in the constructor.
+        /// </summary>
+        public IRemoteIdentifier Identifier { get; private set; }
+
+        /// <summary>
+        /// Gets the local IPEndPoint of this StreamingRemoteManager, as reported
+        /// by its transport server after it was started in the constructor.
+        /// </summary>
+        public IPEndPoint LocalEndpoint { get; private set; }
+
+        /// <summary>
+        /// Returns an IObserver used to send messages to the remote host
+        /// identified by the given RemoteEventEndPoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote host; its Id must be a SocketRemoteIdentifier</param>
+        /// <returns>An IObserver used to send messages to the remote host</returns>
+        /// <exception cref="ArgumentNullException">If remoteEndpoint is null</exception>
+        /// <exception cref="ArgumentException">If the endpoint's Id is not a SocketRemoteIdentifier</exception>
+        public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            var socketId = remoteEndpoint.Id as SocketRemoteIdentifier;
+            if (socketId != null)
+            {
+                // Delegate to the IPEndPoint overload, which owns the client cache.
+                return GetRemoteObserver(socketId.Addr);
+            }
+
+            throw new ArgumentException("ID not supported");
+        }
+
+        /// <summary>
+        /// Returns an IObserver used to send messages to the remote host at
+        /// the specified IPEndPoint. The observer is created on the first request
+        /// for an endpoint and the same instance is returned on later requests.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <returns>An IObserver used to send messages to the remote host</returns>
+        /// <exception cref="ArgumentNullException">If remoteEndpoint is null</exception>
+        public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            // NOTE(review): the cache is a plain Dictionary used without any lock here;
+            // confirm that callers do not invoke this concurrently.
+            ProxyObserver cached;
+            if (_cachedClients.TryGetValue(remoteEndpoint, out cached))
+            {
+                return cached;
+            }
+
+            // First request for this endpoint: create a client and remember its observer.
+            var created = new ProxyObserver(
+                new StreamingTransportClient<IRemoteEvent<T>>(remoteEndpoint, _observerContainer, _remoteEventCodec));
+            _cachedClients[remoteEndpoint] = created;
+            return created;
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// identified by the given RemoteEventEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote host; its Id must be a SocketRemoteIdentifier</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        /// <exception cref="ArgumentNullException">If remoteEndpoint or observer is null</exception>
+        /// <exception cref="ArgumentException">If the endpoint's Id is not a SocketRemoteIdentifier</exception>
+        public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            var socketId = remoteEndpoint.Id as SocketRemoteIdentifier;
+            if (socketId != null)
+            {
+                return RegisterObserver(socketId.Addr, observer);
+            }
+
+            throw new ArgumentException("ID not supported");
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        /// <exception cref="ArgumentNullException">If remoteEndpoint or observer is null</exception>
+        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            // The observer container keeps the endpoint-to-observer registration.
+            IDisposable registration = _observerContainer.RegisterObserver(remoteEndpoint, observer);
+            return registration;
+        }
+
+        /// <summary>
+        /// Registers an IObserver of IRemoteMessage used to handle incoming messages.
+        /// Unlike the other overloads, no remote endpoint is specified.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        /// <exception cref="ArgumentNullException">If observer is null</exception>
+        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+        {
+            if (observer != null)
+            {
+                return _observerContainer.RegisterObserver(observer);
+            }
+
+            throw new ArgumentNullException("observer");
+        }
+        
+        /// <summary>
+        /// Releases all resources held by this StreamingRemoteManager: every cached
+        /// client observer first, then the transport server.
+        /// </summary>
+        public void Dispose()
+        {
+            foreach (KeyValuePair<IPEndPoint, ProxyObserver> entry in _cachedClients)
+            {
+                entry.Value.Dispose();
+            }
+
+            if (_server == null)
+            {
+                return;
+            }
+
+            _server.Dispose();
+        }
+
+        /// <summary>
+        /// Observer that forwards every message it is given to a remote host
+        /// through a StreamingTransportClient.
+        /// </summary>
+        private class ProxyObserver : IObserver<T>, IDisposable
+        {
+            private readonly StreamingTransportClient<IRemoteEvent<T>> _client;
+
+            /// <summary>
+            /// Create new ProxyObserver
+            /// </summary>
+            /// <param name="client">The StreamingTransportClient used to send
+            /// messages to the remote host</param>
+            public ProxyObserver(StreamingTransportClient<IRemoteEvent<T>> client)
+            {
+                _client = client;
+            }
+
+            /// <summary>
+            /// Wraps the message in a RemoteEvent carrying the link's local and
+            /// remote endpoints and sends it to the remote host.
+            /// </summary>
+            /// <param name="message">The message to send</param>
+            public void OnNext(T message)
+            {
+                ILink<IRemoteEvent<T>> link = _client.Link;
+                _client.Send(new RemoteEvent<T>(link.LocalEndpoint, link.RemoteEndpoint, message));
+            }
+
+            /// <summary>
+            /// Close the underlying StreamingTransportClient
+            /// </summary>
+            public void Dispose()
+            {
+                _client.Dispose();
+            }
+
+            /// <summary>
+            /// Throws the given error to the caller.
+            /// </summary>
+            /// <param name="error">The exception to throw</param>
+            public void OnError(Exception error)
+            {
+                throw error;
+            }
+
+            /// <summary>
+            /// Not supported; always throws NotImplementedException.
+            /// </summary>
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
new file mode 100644
index 0000000..90e3aca
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Net;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// StreamingRemoteManagerFactory for StreamingRemoteManager.
+    /// Instances are created through Tang injection (the constructor is private).
+    /// </summary>
+    public sealed class StreamingRemoteManagerFactory
+    {
+        private readonly ITcpPortProvider _tcpPortProvider;
+        private readonly IInjector _injector;
+
+        [Inject]
+        private StreamingRemoteManagerFactory(ITcpPortProvider tcpPortProvider, IInjector injector)
+        {
+            _tcpPortProvider = tcpPortProvider;
+            _injector = injector;
+        }
+
+        /// <summary>
+        /// Creates a StreamingRemoteManager listening on the given local address.
+        /// </summary>
+        /// <typeparam name="T">Message type; must implement IWritable</typeparam>
+        /// <param name="localAddress">The address to listen on</param>
+        /// <param name="port">Currently unused; the port comes from the injected ITcpPortProvider</param>
+        /// <returns>A new remote manager for messages of type T</returns>
+        //ToDo: The port argument will be removed once changes are made in WritableNetworkService [REEF-447]
+        public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
+        {
+#pragma warning disable 618
+// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
+            var codec = _injector.GetInstance<TemporaryWritableToStreamingCodec<T>>();
+            return new StreamingRemoteManager<T>(localAddress, _tcpPortProvider, codec);
+            // Re-enable the obsolete-member warning so the suppression does not leak past this method.
+#pragma warning restore 618
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
new file mode 100644
index 0000000..111be5d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Establish connections to TransportServer for remote message passing
+    /// </summary>
+    /// <typeparam name="T">Generic Type of message.</typeparam>
+    internal sealed class StreamingTransportClient<T> : IDisposable
+    {
+        private readonly ILink<T> _link;
+        private readonly IObserver<TransportEvent<T>> _observer;
+        private readonly CancellationTokenSource _cancellationSource;
+        private bool _disposed;
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(StreamingTransportClient<T>));
+
+        /// <summary>
+        /// Construct a StreamingTransportClient.
+        /// Used to send messages to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+        /// <param name="streamingCodec">Streaming codec handed to the underlying StreamingLink</param>
+        internal StreamingTransportClient(IPEndPoint remoteEndpoint, IStreamingCodec<T> streamingCodec)
+        {
+            Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", Logger);
+
+            _disposed = false;
+            _cancellationSource = new CancellationTokenSource();
+            _link = new StreamingLink<T>(remoteEndpoint, streamingCodec);
+        }
+
+        /// <summary>
+        /// Construct a StreamingTransportClient.
+        /// Used to send messages to the specified remote endpoint. In addition, a
+        /// background task is started that reads responses from the link and passes
+        /// them to the given observer.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+        /// <param name="observer">Callback used when receiving responses from remote host</param>
+        /// <param name="streamingCodec">Streaming codec</param>
+        internal StreamingTransportClient(IPEndPoint remoteEndpoint,
+            IObserver<TransportEvent<T>> observer,
+            IStreamingCodec<T> streamingCodec)
+            : this(remoteEndpoint, streamingCodec)
+        {
+            _observer = observer;
+
+            // Fire and forget: the returned task is not kept.
+            Task.Run(new Func<Task>(ResponseLoop));
+        }
+
+        /// <summary>
+        /// Gets the underlying transport link that was created in the constructor
+        /// and is used for both sending and reading messages.
+        /// </summary>
+        public ILink<T> Link
+        {
+            get { return _link; }
+        }
+
+        /// <summary>
+        /// Send the remote message by writing it to the underlying link.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <exception cref="ArgumentNullException">If message is null</exception>
+        public void Send(T message)
+        {
+            if (message != null)
+            {
+                _link.Write(message);
+                return;
+            }
+
+            throw new ArgumentNullException("message");
+        }
+
+        /// <summary>
+        /// Close all opened connections. Requests cancellation of the response
+        /// loop and then disposes the underlying link. Calling it again after a
+        /// successful dispose does nothing.
+        /// </summary>
+        public void Dispose()
+        {
+            if (_disposed)
+            {
+                return;
+            }
+
+            // Nothing else ever cancels this source; without the call the
+            // IsCancellationRequested check in ResponseLoop can never become true.
+            _cancellationSource.Cancel();
+
+            _link.Dispose();
+            _disposed = true;
+        }
+
+        /// <summary>
+        /// Continually read responses from the remote host and pass each one to the
+        /// observer as a TransportEvent. The loop ends when cancellation has been
+        /// requested or when the link returns a null message.
+        /// </summary>
+        private async Task ResponseLoop()
+        {
+            CancellationToken token = _cancellationSource.Token;
+            while (!token.IsCancellationRequested)
+            {
+                T received = await _link.ReadAsync(token);
+                if (received == null)
+                {
+                    return;
+                }
+
+                _observer.OnNext(new TransportEvent<T>(received, _link));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
new file mode 100644
index 0000000..3f05c17
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Server to handle incoming remote messages.
+    /// </summary>
+    /// <typeparam name="T">Generic Type of message.</typeparam>
+    internal sealed class StreamingTransportServer<T> : IDisposable
+    {
+        // Logger named after this class, so that its messages are not attributed to TransportServer.
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(StreamingTransportServer<T>));
+
+        private TcpListener _listener;
+        private readonly CancellationTokenSource _cancellationSource;
+        private readonly IObserver<TransportEvent<T>> _remoteObserver;
+        private readonly ITcpPortProvider _tcpPortProvider;
+        private readonly IStreamingCodec<T> _streamingCodec;
+        private bool _disposed;
+        private Task _serverTask;
+
+        /// <summary>
+        /// Constructs a StreamingTransportServer to listen for remote events.
+        /// Listens on the specified local address. When it receives a remote
+        /// event, it will invoke the specified remote handler.
+        /// </summary>
+        /// <param name="address">Endpoint address to listen on</param>
+        /// <param name="remoteHandler">The handler to invoke when receiving incoming
+        /// remote messages</param>
+        /// <param name="tcpPortProvider">Supplies the candidate ports that are tried when the server is run</param>
+        /// <param name="streamingCodec">Streaming codec</param>
+        internal StreamingTransportServer(
+            IPAddress address,
+            IObserver<TransportEvent<T>> remoteHandler,
+            ITcpPortProvider tcpPortProvider,
+            IStreamingCodec<T> streamingCodec)
+        {
+            // Placeholder listener on port 0: FindAPortAndStartListener reads the address back
+            // through LocalEndpoint and replaces this instance before starting a listener.
+            _listener = new TcpListener(address, 0);
+            _remoteObserver = remoteHandler;
+            _tcpPortProvider = tcpPortProvider;
+            _cancellationSource = new CancellationTokenSource();
+            _streamingCodec = streamingCodec;
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Returns the endpoint reported by the current listener, cast to
+        /// IPEndPoint (null if the listener's endpoint is not an IPEndPoint).
+        /// </summary>
+        public IPEndPoint LocalEndpoint
+        {
+            get { return _listener.LocalEndpoint as IPEndPoint; }
+        }
+
+        /// <summary>
+        /// Starts listening for incoming remote messages: binds the listener to a
+        /// port and then launches the accept loop as a background task.
+        /// </summary>
+        public void Run()
+        {
+            FindAPortAndStartListener();
+
+            // The task is kept so that Dispose can wait for the accept loop to end.
+            _serverTask = Task.Run(new Func<Task>(StartServer));
+        }
+
+        /// <summary>
+        /// Tries the ports offered by the port provider one after another until a
+        /// listener can be started on one of them. If none works, the last
+        /// SocketException (or an AddressAlreadyInUse one if no port was offered)
+        /// is passed to Exceptions.Throw.
+        /// </summary>
+        private void FindAPortAndStartListener()
+        {
+            var lastFailure = new SocketException((int)SocketError.AddressAlreadyInUse);
+            bool started = false;
+            var ports = _tcpPortProvider.GetEnumerator();
+
+            while (!started && ports.MoveNext())
+            {
+                // The address is taken from the current listener, the port from the provider.
+                _listener = new TcpListener(LocalEndpoint.Address, ports.Current);
+                try
+                {
+                    _listener.Start();
+                    started = true;
+                }
+                catch (SocketException e)
+                {
+                    lastFailure = e;
+                }
+            }
+
+            if (!started)
+            {
+                Exceptions.Throw(lastFailure, "Could not find a port to listen on", LOGGER);
+            }
+
+            string listeningMessage = String.Format("Listening on {0}", _listener.LocalEndpoint.ToString());
+            LOGGER.Log(Level.Info, listeningMessage);
+        }
+
+
+        /// <summary>
+        /// Close the TransportServer and all open connections. Requests
+        /// cancellation, stops the listener and then waits a bounded time for the
+        /// accept loop to finish. Calling it again after a successful dispose does
+        /// nothing.
+        /// </summary>
+        public void Dispose()
+        {
+            if (_disposed)
+            {
+                return;
+            }
+
+            _cancellationSource.Cancel();
+
+            try
+            {
+                _listener.Stop();
+            }
+            catch (SocketException)
+            {
+                LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
+            }
+
+            if (_serverTask != null)
+            {
+                // Give the TransportServer Task 500ms to shut down, ignore any timeout errors.
+                // This is the only wait: an unbounded Wait() before it would defeat the timeout
+                // and let a fault of the server task escape from Dispose.
+                try
+                {
+                    using (var serverDisposeTimeout = new CancellationTokenSource(500))
+                    {
+                        _serverTask.Wait(serverDisposeTimeout.Token);
+                    }
+                }
+                catch (Exception e)
+                {
+                    Console.Error.WriteLine(e);
+                }
+                finally
+                {
+                    // Task.Dispose throws InvalidOperationException unless the task has finished,
+                    // which is not guaranteed after a timed-out wait.
+                    if (_serverTask.IsCompleted)
+                    {
+                        _serverTask.Dispose();
+                    }
+                }
+            }
+
+            _disposed = true;
+        }
+
+        /// <summary>
+        /// Accept loop of the server; run as a background task by Run. Each
+        /// accepted client is handed to ProcessClient without waiting for it to finish.
+        /// InvalidOperationException and OperationCanceledException end the loop
+        /// and are logged as a normal close.
+        /// </summary>
+        /// <returns>An asynchronous Task for the running server.</returns>
+        private async Task StartServer()
+        {
+            const string closedMessage = "TransportServer has been closed.";
+
+            try
+            {
+                while (!_cancellationSource.IsCancellationRequested)
+                {
+                    TcpClient accepted = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
+                    ProcessClient(accepted).Forget();
+                }
+            }
+            catch (InvalidOperationException)
+            {
+                LOGGER.Log(Level.Info, closedMessage);
+            }
+            catch (OperationCanceledException)
+            {
+                LOGGER.Log(Level.Info, closedMessage);
+            }
+        }
+
+        /// <summary>
+        /// Receives events from the connected TcpClient and invokes the remote
+        /// handler on each of them. Reading stops when cancellation has been
+        /// requested or when the link returns a null message; the link is disposed
+        /// on exit.
+        /// </summary>
+        /// <param name="client">The connected client</param>
+        private async Task ProcessClient(TcpClient client)
+        {
+            CancellationToken token = _cancellationSource.Token;
+            using (ILink<T> link = new StreamingLink<T>(client, _streamingCodec))
+            {
+                while (true)
+                {
+                    if (token.IsCancellationRequested)
+                    {
+                        break;
+                    }
+
+                    T incoming = await link.ReadAsync(token);
+                    if (incoming == null)
+                    {
+                        break;
+                    }
+
+                    _remoteObserver.OnNext(new TransportEvent<T>(incoming, link));
+                }
+
+                // NOTE(review): this is logged at Error level although it is also reached
+                // when the read loop ends without an exception; confirm the level is intended.
+                LOGGER.Log(Level.Error,
+                    "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
new file mode 100644
index 0000000..1af9b86
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    // TODO: This class will be removed shortly and is used to reduce the size of pull request.
+    /// <summary>
+    /// Streaming codec for IWritable messages. On the wire a message is the
+    /// assembly-qualified name of its runtime type followed by whatever the
+    /// message's own Write/WriteAsync emits.
+    /// </summary>
+    /// <typeparam name="T">Message type; must implement IWritable</typeparam>
+    internal sealed class TemporaryWritableToStreamingCodec<T> : IStreamingCodec<T> where T : IWritable
+    {
+        private readonly IInjector _injector;
+
+        [Inject]
+        public TemporaryWritableToStreamingCodec(IInjector injector)
+        {
+            _injector = injector;
+        }
+
+        /// <summary>
+        /// Reads the type name, creates an instance of that type and lets it
+        /// read its own content from the reader.
+        /// </summary>
+        /// <param name="reader">The reader to read from</param>
+        /// <returns>The message that was read</returns>
+        public T Read(IDataReader reader)
+        {
+            T message = CreateInstance(reader.ReadString());
+            message.Read(reader);
+            return message;
+        }
+
+        /// <summary>
+        /// Writes the assembly-qualified type name of the message followed by
+        /// the message's own content.
+        /// </summary>
+        /// <param name="obj">The message to write</param>
+        /// <param name="writer">The writer to write to</param>
+        public void Write(T obj, IDataWriter writer)
+        {
+            string typeName = obj.GetType().AssemblyQualifiedName;
+            writer.WriteString(typeName);
+            obj.Write(writer);
+        }
+
+        /// <summary>
+        /// Asynchronous counterpart of Read.
+        /// </summary>
+        /// <param name="reader">The reader to read from</param>
+        /// <param name="token">Cancellation token passed to the read operations</param>
+        /// <returns>The message that was read</returns>
+        public async Task<T> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            string typeName = await reader.ReadStringAsync(token);
+            T message = CreateInstance(typeName);
+            await message.ReadAsync(reader, token);
+            return message;
+        }
+
+        /// <summary>
+        /// Asynchronous counterpart of Write.
+        /// </summary>
+        /// <param name="obj">The message to write</param>
+        /// <param name="writer">The writer to write to</param>
+        /// <param name="token">Cancellation token passed to the write operations</param>
+        public async Task WriteAsync(T obj, IDataWriter writer, CancellationToken token)
+        {
+            string typeName = obj.GetType().AssemblyQualifiedName;
+            await writer.WriteStringAsync(typeName, token);
+            await obj.WriteAsync(writer, token);
+        }
+
+        /// <summary>
+        /// Creates the message instance for a type name read from the stream,
+        /// using a forked injector.
+        /// </summary>
+        /// <param name="typeName">The type name as read from the stream</param>
+        /// <returns>The new instance cast to T</returns>
+        private T CreateInstance(string typeName)
+        {
+            // NOTE(review): the type name comes straight from the stream and decides what gets
+            // instantiated; confirm that only trusted peers can connect.
+            return (T) _injector.ForkInjector().GetInstance(typeName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
deleted file mode 100644
index 9859338..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
+++ /dev/null
@@ -1,295 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Net;
-using System.Net.Sockets;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Represents an open connection between remote hosts. This class is not 
thread safe
-    /// </summary>
-    /// <typeparam name="T">Generic Type of message. It is constrained to have 
implemented IWritable and IType interface</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
-    public class WritableLink<T> : ILink<T> where T : IWritable
-    {
-        private static readonly Logger Logger = Logger.GetLogger(typeof 
(WritableLink<T>));
-
-        private readonly IPEndPoint _localEndpoint;
-        private bool _disposed;
-        private readonly NetworkStream _stream;
-        private readonly IInjector _injector;
-       
-
-        /// <summary>
-        /// Stream reader to be passed to IWritable
-        /// </summary>
-        private readonly StreamDataReader _reader;
-
-        /// <summary>
-        /// Stream writer from which to read from IWritable
-        /// </summary>
-        private readonly StreamDataWriter _writer;
-
-        /// <summary>
-        /// Constructs a Link object.
-        /// Connects to the specified remote endpoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The remote endpoint to connect 
to</param>
-        /// <param name="injector">The injector to pass arguments to incoming 
messages</param>
-        public WritableLink(IPEndPoint remoteEndpoint, IInjector injector)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-
-            Client = new TcpClient();
-            Client.Connect(remoteEndpoint);
-
-            _stream = Client.GetStream();
-            _localEndpoint = GetLocalEndpoint();
-            _disposed = false;
-            _reader = new StreamDataReader(_stream);
-            _writer = new StreamDataWriter(_stream);
-            _injector = injector;
-        }
-
-        /// <summary>
-        /// Constructs a Link object.
-        /// Uses the already connected TcpClient.
-        /// </summary>
-        /// <param name="client">The already connected client</param>
-        /// <param name="injector">The injector to pass arguments to incoming 
messages</param>
-        public WritableLink(TcpClient client, IInjector injector)
-        {
-            if (client == null)
-            {
-                throw new ArgumentNullException("client");
-            }
-
-            Client = client;
-            _stream = Client.GetStream();
-            _localEndpoint = GetLocalEndpoint();
-            _disposed = false;
-            _reader = new StreamDataReader(_stream);
-            _writer = new StreamDataWriter(_stream);
-            _injector = injector;
-        }
-
-        /// <summary>
-        /// Returns the local socket address
-        /// </summary>
-        public IPEndPoint LocalEndpoint
-        {
-            get { return _localEndpoint; }
-        }
-
-        /// <summary>
-        /// Returns the remote socket address
-        /// </summary>
-        public IPEndPoint RemoteEndpoint
-        {
-            get { return (IPEndPoint) Client.Client.RemoteEndPoint; }
-        }
-
-        /// <summary>
-        /// Gets the underlying TcpClient
-        /// </summary>
-        public TcpClient Client { get; private set; }
-
-        /// <summary>
-        /// Writes the message to the remote host
-        /// </summary>
-        /// <param name="value">The data to write</param>
-        public void Write(T value)
-        {
-            if (value == null)
-            {
-                throw new ArgumentNullException("value");
-            }
-            if (_disposed)
-            {
-                Exceptions.Throw(new IllegalStateException("Link has been 
closed."), Logger);
-            }
-
-            _writer.WriteString(value.GetType().AssemblyQualifiedName);
-            value.Write(_writer);
-        }
-
-        /// <summary>
-        /// Writes the value to this link asynchronously
-        /// </summary>
-        /// <param name="value">The data to write</param>
-        /// <param name="token">The cancellation token</param>
-        public async Task WriteAsync(T value, CancellationToken token)
-        {
-            if (_disposed)
-            {
-                Exceptions.Throw(new IllegalStateException("Link has been 
closed."), Logger);
-            }
-
-            await 
_writer.WriteStringAsync(value.GetType().AssemblyQualifiedName, token);
-            await value.WriteAsync(_writer, token);
-        }
-
-        /// <summary>
-        /// Reads the value from the link synchronously
-        /// </summary>
-        public T Read()
-        {
-            if (_disposed)
-            {
-                Exceptions.Throw(new IllegalStateException("Link has been 
disposed."), Logger);
-            }
-
-            string dataType = _reader.ReadString();
-
-            if (dataType == null)
-            {
-                return default(T);
-            }
-
-            try
-            {
-                T value = (T) _injector.ForkInjector().GetInstance(dataType);
-                value.Read(_reader);
-                return value;
-            }
-            catch (InjectionException)
-            {
-                return default(T);
-            }
-        }
-
-        /// <summary>
-        /// Reads the value from the link asynchronously
-        /// </summary>
-        /// <param name="token">The cancellation token</param>
-        public async Task<T> ReadAsync(CancellationToken token)
-        {
-            if (_disposed)
-            {
-                Exceptions.Throw(new IllegalStateException("Link has been 
disposed."), Logger);
-            }
-
-            string dataType = "";
-
-            dataType = await _reader.ReadStringAsync(token);
-
-            if (dataType == null)
-            {
-                return default(T);
-            }
-
-            try
-            {
-                T value = (T) _injector.ForkInjector().GetInstance(dataType);
-                await value.ReadAsync(_reader, token);
-                return value;
-            }
-            catch (InjectionException)
-            {
-                return default(T);
-            }
-        }
-
-        /// <summary>
-        /// Close the client connection
-        /// </summary>
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        /// <summary>
-        /// Subclasses of Links should overwrite this to handle disposing
-        /// of the link
-        /// </summary>
-        /// <param name="disposing">To dispose or not</param>
-        public virtual void Dispose(bool disposing)
-        {
-            if (_disposed)
-            {
-                return;
-            }
-
-            if (disposing)
-            {
-                try
-                {
-                    Client.GetStream().Close();
-                }
-                catch (InvalidOperationException)
-                {
-                    Logger.Log(Level.Warning, "failed to close stream on a 
non-connected socket.");
-                }
-
-                Client.Close();
-            }
-            _disposed = true;
-        }
-
-        /// <summary>
-        /// Overrides Equals. Two Link objects are equal if they are connected
-        /// to the same remote endpoint.
-        /// </summary>
-        /// <param name="obj">The object to compare</param>
-        /// <returns>True if the object is equal to this Link, otherwise 
false</returns>
-        public override bool Equals(object obj)
-        {
-            Link<T> other = obj as Link<T>;
-            if (other == null)
-            {
-                return false;
-            }
-
-            return other.RemoteEndpoint.Equals(RemoteEndpoint);
-        }
-
-        /// <summary>
-        /// Gets the hash code for the Link object.
-        /// </summary>
-        /// <returns>The object's hash code</returns>
-        public override int GetHashCode()
-        {
-            return RemoteEndpoint.GetHashCode();
-        }
-
-        /// <summary>
-        /// Discovers the IPEndpoint for the current machine.
-        /// </summary>
-        /// <returns>The local IPEndpoint</returns>
-        private IPEndPoint GetLocalEndpoint()
-        {
-            IPAddress address = NetworkUtils.LocalIPAddress;
-            int port = ((IPEndPoint) Client.Client.LocalEndPoint).Port;
-            return new IPEndPoint(address, port);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
deleted file mode 100644
index 9790e3e..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Net;
-using Org.Apache.REEF.Wake.Util;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Stores registered IObservers for DefaultRemoteManager.
-    /// Can register and look up IObservers by remote IPEndPoint.
-    /// </summary>
-    /// <typeparam name="T">Message type T. It is assumed to be 
IWritable</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
-    internal class WritableObserverContainer<T> : 
IObserver<TransportEvent<IWritableRemoteEvent<T>>> where T : IWritable
-    {
-        private static readonly Logger Logger = 
Logger.GetLogger(typeof(WritableObserverContainer<>));
-        private readonly ConcurrentDictionary<IPEndPoint, IObserver<T>> 
_endpointMap;
-        private readonly ConcurrentDictionary<Type, 
IObserver<IRemoteMessage<T>>> _typeMap;
-        private IObserver<T> _universalObserver;
-
-        /// <summary>
-        /// Constructs a new ObserverContainer used to manage remote 
IObservers.
-        /// </summary>
-        public WritableObserverContainer()
-        {
-            _endpointMap = new ConcurrentDictionary<IPEndPoint, 
IObserver<T>>(new IPEndPointComparer());
-            _typeMap = new ConcurrentDictionary<Type, 
IObserver<IRemoteMessage<T>>>();
-        }
-
-        /// <summary>
-        /// Registers an IObserver used to handle incoming messages from the 
remote host
-        /// at the specified IPEndPoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The IPEndPoint of the remote 
host</param>
-        /// <param name="observer">The IObserver to handle incoming 
messages</param>
-        /// <returns>An IDisposable used to unregister the observer 
with</returns>
-        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, 
IObserver<T> observer) 
-        {
-            if (remoteEndpoint.Address.Equals(IPAddress.Any))
-            {
-                _universalObserver = observer;
-                return Disposable.Create(() => { _universalObserver = null; });
-            }
-
-            _endpointMap[remoteEndpoint] = observer;
-            return Disposable.Create(() => 
_endpointMap.TryRemove(remoteEndpoint, out observer));
-        }
-
-        /// <summary>
-        /// Registers an IObserver to handle incoming messages from a remote 
host
-        /// </summary>
-        /// <param name="observer">The IObserver to handle incoming 
messages</param>
-        /// <returns>An IDisposable used to unregister the observer 
with</returns>
-        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> 
observer)
-        {
-            _typeMap[typeof(T)] = observer;
-            return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out 
observer));
-        }
-
-        /// <summary>
-        /// Look up the IObserver for the registered IPEndPoint or event type 
-        /// and execute the IObserver.
-        /// </summary>
-        /// <param name="transportEvent">The incoming remote event</param>
-        public void OnNext(TransportEvent<IWritableRemoteEvent<T>> 
transportEvent)
-        {
-            IWritableRemoteEvent<T> remoteEvent = transportEvent.Data;
-            remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint;
-            remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint;
-            T value = remoteEvent.Value;
-            bool handled = false;
-
-            IObserver<T> observer1;
-            IObserver<IRemoteMessage<T>> observer2;
-            if (_universalObserver != null)
-            {
-                _universalObserver.OnNext(value);
-                handled = true;
-            }
-            if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out 
observer1))
-            {
-                // IObserver was registered by IPEndpoint
-                observer1.OnNext(value);
-                handled = true;
-            } 
-            else if (_typeMap.TryGetValue(value.GetType(), out observer2))
-            {
-                // IObserver was registered by event type
-                IRemoteIdentifier id = new 
SocketRemoteIdentifier(remoteEvent.RemoteEndPoint);
-                IRemoteMessage<T> remoteMessage = new 
DefaultRemoteMessage<T>(id, value);
-                observer2.OnNext(remoteMessage);
-                handled = true;
-            }
-
-            if (!handled)
-            {
-                throw new WakeRuntimeException("Unrecognized Wake RemoteEvent 
message");
-            }
-        }
-
-        public void OnError(Exception error)
-        {
-            throw error;
-        }
-
-        public void OnCompleted()
-        {
-            Logger.Log(Level.Info, "Exiting the Writable Observer Container");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
deleted file mode 100644
index b3664d0..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Net;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Interface;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Writable remote event class
-    /// </summary>
-    /// <typeparam name="T">Type of remote event message. It is assumed that T 
implements IWritable</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
-    internal sealed class WritableRemoteEvent<T> : IWritableRemoteEvent<T> 
where T : IWritable
-    {
-        private readonly IInjector _injector;
-
-        /// <summary>
-        /// Creates the Remote Event
-        /// </summary>
-        /// <param name="localEndpoint">Local Address</param>
-        /// <param name="remoteEndpoint">Remote Address</param>
-        /// <param name="value">Actual message</param>
-        public WritableRemoteEvent(IPEndPoint localEndpoint, IPEndPoint 
remoteEndpoint, T value)
-        {
-            LocalEndPoint = localEndpoint;
-            RemoteEndPoint = remoteEndpoint;
-            Value = value;
-        }
-
-        /// <summary>
-        /// Creates empty Remote Event
-        /// </summary>
-        [Inject]
-        public WritableRemoteEvent(IInjector injector)
-        {
-            _injector = injector;
-        }
-
-        /// <summary>
-        /// Local Address
-        /// </summary>
-        public IPEndPoint LocalEndPoint { get; set; }
-
-        /// <summary>
-        /// Remote Address
-        /// </summary>
-        public IPEndPoint RemoteEndPoint { get; set; }
-
-        /// <summary>
-        /// The actual message
-        /// </summary>
-        public T Value { get; set; }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        public void Read(IDataReader reader)
-        {
-            Value = (T)_injector.ForkInjector().GetInstance(typeof(T));
-            Value.Read(reader);         
-        }
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        public void Write(IDataWriter writer)
-        {
-            Value.Write(writer);           
-        }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        /// <param name="token">The cancellation token</param>
-        public async Task ReadAsync(IDataReader reader, CancellationToken 
token)
-        {
-            Value = (T)_injector.ForkInjector().GetInstance(typeof(T));
-            await Value.ReadAsync(reader, token);      
-        }
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        /// <param name="token">The cancellation token</param>
-        public async Task WriteAsync(IDataWriter writer, CancellationToken 
token)
-        {
-            await Value.WriteAsync(writer, token);    
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
deleted file mode 100644
index 73d8bb6..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Net;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Manages incoming and outgoing messages between remote hosts.
-    /// </summary>
-    /// <typeparam name="T">Message type T. It is assumed to be 
IWritable</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
-    public sealed class WritableRemoteManager<T> : IRemoteManager<T> where T : 
IWritable
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof 
(WritableRemoteManager<T>));
-
-        private readonly WritableObserverContainer<T> _observerContainer;
-        private readonly WritableTransportServer<IWritableRemoteEvent<T>> 
_server;
-        private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
-        private readonly IInjector _injector;
-
-        /// <summary>
-        /// Constructs a DefaultRemoteManager listening on the specified 
address and
-        /// a specific port.
-        /// </summary>
-        /// <param name="localAddress">The address to listen on</param>
-        /// <param name="port">The port to listen on</param>
-        /// <param name="tcpPortProvider">Tcp port provider</param>
-        /// <param name="injector">The injector to pass arguments to incoming 
messages</param>
-        [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
-        public WritableRemoteManager(IPAddress localAddress, int port, 
ITcpPortProvider tcpPortProvider, IInjector injector)
-        {
-            if (localAddress == null)
-            {
-                throw new ArgumentNullException("localAddress");
-            }
-            if (port < 0)
-            {
-                throw new ArgumentException("Listening port must be greater 
than or equal to zero");
-            }
-
-            _observerContainer = new WritableObserverContainer<T>();
-            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
-            _injector = injector;
-
-            IPEndPoint localEndpoint = new IPEndPoint(localAddress, port);
-
-            // Begin to listen for incoming messages
-            _server = new 
WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, 
_observerContainer, tcpPortProvider, injector);
-            _server.Run();
-
-            LocalEndpoint = _server.LocalEndpoint;  
-            Identifier = new SocketRemoteIdentifier(LocalEndpoint);
-        }
-
-        /// <summary>
-        /// Constructs a DefaultRemoteManager. Does not listen for incoming 
messages.
-        /// </summary>
-        /// <param name="injector">The injector to pass arguments to incoming 
messages</param>
-        [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
-        public WritableRemoteManager(IInjector injector)
-        {
-            using 
(LOGGER.LogFunction("WritableRemoteManager::WritableRemoteManager"))
-            {
-                _observerContainer = new WritableObserverContainer<T>();
-                _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
-                _injector = injector;
-
-                LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
-                Identifier = new SocketRemoteIdentifier(LocalEndpoint);
-            }
-        }
-
-        /// <summary>
-        /// Gets the RemoteIdentifier for the DefaultRemoteManager
-        /// </summary>
-        public IRemoteIdentifier Identifier { get; private set; }
-
-        /// <summary>
-        /// Gets the local IPEndPoint for the DefaultRemoteManager
-        /// </summary>
-        public IPEndPoint LocalEndpoint { get; private set; }
-
-        /// <summary>
-        /// Returns an IObserver used to send messages to the remote host at
-        /// the specified IPEndpoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The IPEndpoint of the remote 
host</param>
-        /// <returns>An IObserver used to send messages to the remote 
host</returns>
-        public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> 
remoteEndpoint)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-
-            SocketRemoteIdentifier id = remoteEndpoint.Id as 
SocketRemoteIdentifier;
-            if (id == null)
-            {
-                throw new ArgumentException("ID not supported");
-            }
-
-            return GetRemoteObserver(id.Addr);
-        }
-
-        /// <summary>
-        /// Returns an IObserver used to send messages to the remote host at
-        /// the specified IPEndpoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The IPEndpoint of the remote 
host</param>
-        /// <returns>An IObserver used to send messages to the remote 
host</returns>
-        public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-
-            ProxyObserver remoteObserver;
-            if (!_cachedClients.TryGetValue(remoteEndpoint, out 
remoteObserver))
-            {
-                WritableTransportClient<IWritableRemoteEvent<T>> client =
-                    new 
WritableTransportClient<IWritableRemoteEvent<T>>(remoteEndpoint, 
_observerContainer, _injector);
-
-                remoteObserver = new ProxyObserver(client);
-                _cachedClients[remoteEndpoint] = remoteObserver;
-            }
-
-            return remoteObserver;
-        }
-
-        /// <summary>
-        /// Registers an IObserver used to handle incoming messages from the 
remote host
-        /// at the specified IPEndPoint.
-        /// The IDisposable that is returned can be used to unregister the 
IObserver.
-        /// </summary>
-        /// <param name="remoteEndpoint">The IPEndPoint of the remote 
host</param>
-        /// <param name="observer">The IObserver to handle incoming 
messages</param>
-        /// <returns>An IDisposable used to unregister the observer 
with</returns>
-        public IDisposable RegisterObserver(RemoteEventEndPoint<T> 
remoteEndpoint, IObserver<T> observer)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-
-            if (observer == null)
-            {
-                throw new ArgumentNullException("observer");
-            }
-
-            SocketRemoteIdentifier id = remoteEndpoint.Id as 
SocketRemoteIdentifier;
-            if (id == null)
-            {
-                throw new ArgumentException("ID not supported");
-            }
-
-            return RegisterObserver(id.Addr, observer);
-        }
-
-        /// <summary>
-        /// Registers an IObserver used to handle incoming messages from the 
remote host
-        /// at the specified IPEndPoint.
-        /// The IDisposable that is returned can be used to unregister the 
IObserver.
-        /// </summary>
-        /// <param name="remoteEndpoint">The IPEndPoint of the remote 
host</param>
-        /// <param name="observer">The IObserver to handle incoming 
messages</param>
-        /// <returns>An IDisposable used to unregister the observer 
with</returns>
-        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, 
IObserver<T> observer)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-            if (observer == null)
-            {
-                throw new ArgumentNullException("observer");
-            }
-
-            return _observerContainer.RegisterObserver(remoteEndpoint, 
observer);
-        }
-
-        /// <summary>
-        /// Registers an IObserver used to handle incoming messages from the 
remote host
-        /// at the specified IPEndPoint.
-        /// The IDisposable that is returned can be used to unregister the 
IObserver.
-        /// </summary>
-        /// <param name="observer">The IObserver to handle incoming 
messages</param>
-        /// <returns>An IDisposable used to unregister the observer 
with</returns>
-        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> 
observer)
-        {
-            if (observer == null)
-            {
-                throw new ArgumentNullException("observer");
-            }
-
-            return _observerContainer.RegisterObserver(observer);
-        }
-        
-        /// <summary>
-        /// Release all resources for the DefaultRemoteManager.
-        /// </summary>
-        public void Dispose()
-        {
-            foreach (ProxyObserver cachedClient in _cachedClients.Values)
-            {
-                cachedClient.Dispose();
-            }
-
-            if (_server != null)
-            {
-                _server.Dispose();
-            }
-        }
-
-        /// <summary>
-        /// Observer to send messages to connected remote host
-        /// </summary>
-        private class ProxyObserver : IObserver<T>, IDisposable
-        {
-            private readonly WritableTransportClient<IWritableRemoteEvent<T>> 
_client;
-
-            /// <summary>
-            /// Create new ProxyObserver
-            /// </summary>
-            /// <param name="client">The connected WritableTransport client 
used to send
-            /// messages to remote host</param>
-            public 
ProxyObserver(WritableTransportClient<IWritableRemoteEvent<T>> client)
-            {
-                _client = client;
-            }
-
-            /// <summary>
-            /// Send the message to the remote host
-            /// </summary>
-            /// <param name="message">The message to send</param>
-            public void OnNext(T message)
-            {
-                IWritableRemoteEvent<T> remoteEvent = new 
WritableRemoteEvent<T>(_client.Link.LocalEndpoint,
-                    _client.Link.RemoteEndpoint,
-                    message);
-
-                _client.Send(remoteEvent);
-            }
-
-            /// <summary>
-            /// Close underlying WritableTransport client
-            /// </summary>
-            public void Dispose()
-            {
-                _client.Dispose();
-            }
-
-            public void OnError(Exception error)
-            {
-                throw error;
-            }
-
-            public void OnCompleted()
-            {
-                throw new NotImplementedException();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
deleted file mode 100644
index 52fef8d..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Net;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Interface;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Factory for WritableRemoteManager instances. Each created manager is handed
-    /// the injector (and, for the endpoint overload, the TCP port provider) that
-    /// this factory received at construction time.
-    /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public sealed class WritableRemoteManagerFactory
-    {
-        private readonly ITcpPortProvider _tcpPortProvider;
-        private readonly IInjector _injector;
-
-        /// <summary>
-        /// Stores the collaborators passed on to every created remote manager.
-        /// The constructor is private and marked [Inject].
-        /// </summary>
-        /// <param name="tcpPortProvider">Port provider forwarded to the remote manager</param>
-        /// <param name="injector">Injector forwarded to the remote manager</param>
-        [Inject]
-        private WritableRemoteManagerFactory(ITcpPortProvider tcpPortProvider, IInjector injector)
-        {
-            _tcpPortProvider = tcpPortProvider;
-            _injector = injector;
-        }
-
-        /// <summary>
-        /// Creates a WritableRemoteManager for the given local address and port.
-        /// </summary>
-        /// <typeparam name="T">Message type; must implement IWritable</typeparam>
-        /// <param name="localAddress">Local address passed to the remote manager</param>
-        /// <param name="port">Port passed to the remote manager</param>
-        /// <returns>The newly created remote manager</returns>
-        public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
-        {
-#pragma warning disable 618
-            // This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
-            return new WritableRemoteManager<T>(localAddress, port, _tcpPortProvider, _injector);
-#pragma warning restore 618
-        }
-
-        /// <summary>
-        /// Creates a WritableRemoteManager that is given only the injector.
-        /// </summary>
-        /// <typeparam name="T">Message type; must implement IWritable</typeparam>
-        /// <returns>The newly created remote manager</returns>
-        public IRemoteManager<T> GetInstance<T>() where T : IWritable
-        {
-#pragma warning disable 618
-            // This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
-            return new WritableRemoteManager<T>(_injector);
-#pragma warning restore 618
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
deleted file mode 100644
index b245f0f..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Net;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Sends messages to a remote endpoint over a WritableLink and, when an
-    /// observer is supplied, forwards messages read back from that link to it.
-    /// </summary>
-    /// <typeparam name="T">Generic type of message; must implement IWritable</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableTransportClient<T> : IDisposable where T : IWritable
-    {
-        private readonly ILink<T> _link;
-        private readonly IObserver<TransportEvent<T>> _observer;
-        private readonly CancellationTokenSource _cancellationSource;
-        private bool _disposed;
-        private static readonly Logger Logger = Logger.GetLogger(typeof(WritableTransportClient<T>));
-
-        /// <summary>
-        /// Construct a TransportClient.
-        /// Used to send messages to the specified remote endpoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
-        /// <param name="injector">The injector to pass arguments to incoming messages</param>
-        public WritableTransportClient(IPEndPoint remoteEndpoint, IInjector injector)
-        {
-            Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", Logger);
-
-            _link = new WritableLink<T>(remoteEndpoint, injector);
-            _cancellationSource = new CancellationTokenSource();
-            _disposed = false;
-        }
-
-        /// <summary>
-        /// Construct a TransportClient that also listens for responses.
-        /// After the link is created, a background task is started that reads
-        /// messages from the link and hands them to the observer.
-        /// </summary>
-        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
-        /// <param name="observer">Callback used when receiving responses from remote host</param>
-        /// <param name="injector">The injector to pass arguments to incoming messages</param>
-        public WritableTransportClient(IPEndPoint remoteEndpoint,
-            IObserver<TransportEvent<T>> observer,
-            IInjector injector)
-            : this(remoteEndpoint, injector)
-        {
-            _observer = observer;
-            Task.Run(() => ResponseLoop());
-        }
-
-        /// <summary>
-        /// Gets the underlying transport link.
-        /// </summary>
-        public ILink<T> Link
-        {
-            get { return _link; }
-        }
-
-        /// <summary>
-        /// Send the remote message.
-        /// </summary>
-        /// <param name="message">The message to send</param>
-        /// <exception cref="ArgumentNullException">If message is null</exception>
-        public void Send(T message)
-        {
-            if (message == null)
-            {
-                throw new ArgumentNullException("message");
-            }
-
-            _link.Write(message);
-        }
-
-        /// <summary>
-        /// Stops the response loop and disposes the underlying link.
-        /// </summary>
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        /// <summary>
-        /// Releases the client's resources the first time it is called with
-        /// disposing set to true; every other call does nothing.
-        /// </summary>
-        /// <param name="disposing">True when called from Dispose()</param>
-        protected void Dispose(bool disposing)
-        {
-            if (!_disposed && disposing)
-            {
-                // Signal cancellation before tearing down the link so the response
-                // loop stops requesting further reads from a disposed link.
-                _cancellationSource.Cancel();
-                _link.Dispose();
-                _disposed = true;
-            }
-        }
-
-        /// <summary>
-        /// Continually read responses from remote host and pass each one to the
-        /// observer. The loop ends when cancellation has been requested or when
-        /// a read yields null.
-        /// </summary>
-        private async Task ResponseLoop()
-        {
-            CancellationToken token = _cancellationSource.Token;
-            while (!token.IsCancellationRequested)
-            {
-                T message = await _link.ReadAsync(token);
-                if (message == null)
-                {
-                    break;
-                }
-
-                TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
-                _observer.OnNext(transportEvent);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
deleted file mode 100644
index 6b5961f..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Net;
-using System.Net.Sockets;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Server to handle incoming remote messages.
-    /// </summary>
-    /// <typeparam name="T">Generic type of message; must implement IWritable</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableTransportServer<T> : IDisposable where T : IWritable
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(WritableTransportServer<T>));
-
-        private TcpListener _listener;
-        private readonly CancellationTokenSource _cancellationSource;
-        private readonly IObserver<TransportEvent<T>> _remoteObserver;
-        private readonly ITcpPortProvider _tcpPortProvider;
-        private readonly IInjector _injector;
-        private bool _disposed;
-        private Task _serverTask;
-
-        /// <summary>
-        /// Constructs a TransportServer to listen for remote events.
-        /// Listens on NetworkUtils.LocalIPAddress at the given port. When it receives
-        /// a remote event, it will invoke the specified remote handler.
-        /// </summary>
-        /// <param name="port">Port to listen on</param>
-        /// <param name="remoteHandler">The handler to invoke when receiving incoming
-        /// remote messages</param>
-        /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param>
-        /// <param name="injector">The injector to pass arguments to incoming messages</param>
-        public WritableTransportServer(
-            int port,
-            IObserver<TransportEvent<T>> remoteHandler,
-            ITcpPortProvider tcpPortProvider,
-            IInjector injector)
-            : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, tcpPortProvider, injector)
-        {
-        }
-
-        /// <summary>
-        /// Constructs a TransportServer to listen for remote events.
-        /// Listens on the specified local endpoint. When it receives a remote
-        /// event, it will invoke the specified remote handler.
-        /// </summary>
-        /// <param name="localEndpoint">Endpoint to listen on</param>
-        /// <param name="remoteHandler">The handler to invoke when receiving incoming
-        /// remote messages</param>
-        /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param>
-        /// <param name="injector">The injector to pass arguments to incoming messages</param>
-        public WritableTransportServer(
-            IPEndPoint localEndpoint,
-            IObserver<TransportEvent<T>> remoteHandler,
-            ITcpPortProvider tcpPortProvider,
-            IInjector injector)
-        {
-            _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port);
-            _remoteObserver = remoteHandler;
-            _tcpPortProvider = tcpPortProvider;
-            _cancellationSource = new CancellationTokenSource();
-            _injector = injector;
-            _disposed = false;
-        }
-
-        /// <summary>
-        /// Returns the listening endpoint for the TransportServer
-        /// </summary>
-        public IPEndPoint LocalEndpoint
-        {
-            get { return _listener.LocalEndpoint as IPEndPoint; }
-        }
-
-        /// <summary>
-        /// Starts listening for incoming remote messages. When the configured port
-        /// is 0, a port is taken from the TCP port provider; otherwise the listener
-        /// is started on the configured endpoint. The accept loop then runs in a
-        /// background task.
-        /// </summary>
-        public void Run()
-        {
-            if (LocalEndpoint.Port == 0)
-            {
-                FindAPortAndStartListener();
-            }
-            else
-            {
-                _listener.Start();
-            }
-
-            _serverTask = Task.Run(() => StartServer());
-        }
-
-        /// <summary>
-        /// Tries the ports offered by the TCP port provider one by one until the
-        /// listener starts successfully. If every port fails (or none is offered),
-        /// the last SocketException is thrown via Exceptions.Throw.
-        /// </summary>
-        private void FindAPortAndStartListener()
-        {
-            var foundAPort = false;
-            // Reported if the provider yields no port at all; otherwise replaced
-            // by the most recent failure.
-            var exception = new SocketException((int)SocketError.AddressAlreadyInUse);
-            for (var enumerator = _tcpPortProvider.GetEnumerator();
-                !foundAPort && enumerator.MoveNext();
-                )
-            {
-                _listener = new TcpListener(LocalEndpoint.Address, enumerator.Current);
-                try
-                {
-                    _listener.Start();
-                    foundAPort = true;
-                }
-                catch (SocketException e)
-                {
-                    exception = e;
-                }
-            }
-            if (!foundAPort)
-            {
-                Exceptions.Throw(exception, "Could not find a port to listen on", LOGGER);
-            }
-            LOGGER.Log(Level.Info,
-                String.Format("Listening on {0}", _listener.LocalEndpoint.ToString()));
-        }
-
-        /// <summary>
-        /// Close the TransportServer and all open connections
-        /// </summary>
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        /// <summary>
-        /// Cancels the server, stops the listener and waits a bounded time for the
-        /// accept loop to finish. Only acts the first time it is called with
-        /// disposing set to true; the server is marked disposed in every case.
-        /// </summary>
-        /// <param name="disposing">True when called from Dispose()</param>
-        public void Dispose(bool disposing)
-        {
-            if (!_disposed && disposing)
-            {
-                _cancellationSource.Cancel();
-
-                try
-                {
-                    _listener.Stop();
-                }
-                catch (SocketException)
-                {
-                    LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
-                }
-
-                if (_serverTask != null)
-                {
-                    // Give the TransportServer Task 500ms to shut down; a timeout or a
-                    // server fault is written to stderr and otherwise ignored. No
-                    // unbounded wait precedes this one, so Dispose cannot hang on it.
-                    try
-                    {
-                        using (CancellationTokenSource serverDisposeTimeout = new CancellationTokenSource(500))
-                        {
-                            _serverTask.Wait(serverDisposeTimeout.Token);
-                        }
-                    }
-                    catch (Exception e)
-                    {
-                        Console.Error.WriteLine(e);
-                    }
-                    finally
-                    {
-                        // Task.Dispose throws unless the task has reached a final state,
-                        // which is not guaranteed after a timed-out wait.
-                        if (_serverTask.IsCompleted)
-                        {
-                            _serverTask.Dispose();
-                        }
-                    }
-                }
-            }
-
-            _disposed = true;
-        }
-
-        /// <summary>
-        /// Helper method to start TransportServer.  This will
-        /// be run in an asynchronous Task. Accepts clients until cancellation is
-        /// requested; InvalidOperationException and OperationCanceledException are
-        /// logged as a normal shutdown.
-        /// </summary>
-        /// <returns>An asynchronous Task for the running server.</returns>
-        private async Task StartServer()
-        {
-            try
-            {
-                while (!_cancellationSource.Token.IsCancellationRequested)
-                {
-                    TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
-                    // Each client is served on its own task; the accept loop does not wait for it
-                    ProcessClient(client).Forget();
-                }
-            }
-            catch (InvalidOperationException)
-            {
-                LOGGER.Log(Level.Info, "TransportServer has been closed.");
-            }
-            catch (OperationCanceledException)
-            {
-                LOGGER.Log(Level.Info, "TransportServer has been closed.");
-            }
-        }
-
-        /// <summary>
-        /// Receives events from the connected TcpClient and invokes the handler on
-        /// each event.
-        /// </summary>
-        /// <param name="client">The connected client</param>
-        private async Task ProcessClient(TcpClient client)
-        {
-            // Keep reading messages from the client until a read yields null
-            // (presumably a disconnect - confirm against WritableLink) or the
-            // server is cancelled
-            CancellationToken token = _cancellationSource.Token;
-            using (ILink<T> link = new WritableLink<T>(client, _injector))
-            {
-                while (!token.IsCancellationRequested)
-                {
-                    T message = await link.ReadAsync(token);
-
-                    if (message == null)
-                    {
-                        break;
-                    }
-
-                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
-                    _remoteObserver.OnNext(transportEvent);
-                }
-                LOGGER.Log(Level.Error,
-                    "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested);
-            }
-        }
-    }
-}
\ No newline at end of file

Reply via email to