Repository: incubator-reef Updated Branches: refs/heads/master ec9b497d4 -> ba2653d6b
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs new file mode 100644 index 0000000..6eb0190 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Net; +using Org.Apache.REEF.Wake.StreamingCodec; + +namespace Org.Apache.REEF.Wake.Remote.Impl +{ + /// <summary> + /// Manages incoming and outgoing messages between remote hosts. 
+    /// </summary>
+    /// <typeparam name="T">Message type T.</typeparam>
+    internal sealed class StreamingRemoteManager<T> : IRemoteManager<T>
+    {
+        // Handed to the transport server and to every outgoing client as their observer.
+        private readonly ObserverContainer<T> _observerContainer;
+        private readonly StreamingTransportServer<IRemoteEvent<T>> _server;
+        // One ProxyObserver per remote endpoint, created on first use by GetRemoteObserver.
+        private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
+        private readonly IStreamingCodec<IRemoteEvent<T>> _remoteEventCodec;
+
+        /// <summary>
+        /// Constructs a StreamingRemoteManager: creates a StreamingTransportServer for the
+        /// specified address, starts it, and records the server's local endpoint and identifier.
+        /// </summary>
+        /// <param name="localAddress">The address to listen on</param>
+        /// <param name="tcpPortProvider">Tcp port provider, passed on to the transport server</param>
+        /// <param name="streamingCodec">Streaming codec for messages of type T; wrapped in a RemoteEventStreamingCodec</param>
+        /// <exception cref="ArgumentNullException">If localAddress is null</exception>
+        [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
+        public StreamingRemoteManager(IPAddress localAddress, ITcpPortProvider tcpPortProvider, IStreamingCodec<T> streamingCodec)
+        {
+            if (localAddress == null)
+            {
+                throw new ArgumentNullException("localAddress");
+            }
+
+            _observerContainer = new ObserverContainer<T>();
+            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+            _remoteEventCodec = new RemoteEventStreamingCodec<T>(streamingCodec);
+
+            // Begin to listen for incoming messages
+            _server = new StreamingTransportServer<IRemoteEvent<T>>(localAddress, _observerContainer, tcpPortProvider, _remoteEventCodec);
+            _server.Run();
+
+            LocalEndpoint = _server.LocalEndpoint;
+            Identifier = new SocketRemoteIdentifier(LocalEndpoint);
+        }
+
+        /// <summary>
+        /// Gets the RemoteIdentifier for the StreamingRemoteManager
+        /// </summary>
+        public IRemoteIdentifier Identifier { get; private set; }
+
+        /// <summary>
+        /// Gets the local IPEndPoint for the StreamingRemoteManager
+        /// </summary>
+        public IPEndPoint LocalEndpoint { get; private set; }
+
+        /// <summary>
+        /// Returns an IObserver used to send messages to the remote host at
+        /// the specified IPEndpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The RemoteEventEndPoint of the remote host; its Id must be a SocketRemoteIdentifier</param>
+        /// <returns>An IObserver used to send messages to the remote host</returns>
+        /// <exception cref="ArgumentNullException">If remoteEndpoint is null</exception>
+        /// <exception cref="ArgumentException">If the endpoint's Id is not a SocketRemoteIdentifier</exception>
+        public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+            if (id == null)
+            {
+                throw new ArgumentException("ID not supported");
+            }
+
+            return GetRemoteObserver(id.Addr);
+        }
+
+        /// <summary>
+        /// Returns an IObserver used to send messages to the remote host at
+        /// the specified IPEndpoint. The observer is created on the first request for
+        /// an endpoint and the same instance is returned on later requests.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+        /// <returns>An IObserver used to send messages to the remote host</returns>
+        /// <exception cref="ArgumentNullException">If remoteEndpoint is null</exception>
+        public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            ProxyObserver remoteObserver;
+            if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver))
+            {
+                StreamingTransportClient<IRemoteEvent<T>> client =
+                    new StreamingTransportClient<IRemoteEvent<T>>(remoteEndpoint, _observerContainer, _remoteEventCodec);
+
+                remoteObserver = new ProxyObserver(client);
+                _cachedClients[remoteEndpoint] = remoteObserver;
+            }
+
+            return remoteObserver;
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="remoteEndpoint">The RemoteEventEndPoint of the remote host; its Id must be a SocketRemoteIdentifier</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        /// <exception cref="ArgumentNullException">If remoteEndpoint or observer is null</exception>
+        /// <exception cref="ArgumentException">If the endpoint's Id is not a SocketRemoteIdentifier</exception>
+        public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+            if (id == null)
+            {
+                throw new ArgumentException("ID not supported");
+            }
+
+            return RegisterObserver(id.Addr, observer);
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        /// <exception cref="ArgumentNullException">If remoteEndpoint or observer is null</exception>
+        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            return _observerContainer.RegisterObserver(remoteEndpoint, observer);
+        }
+
+        /// <summary>
+        /// Registers an IObserver of IRemoteMessage with the observer container.
+        /// This overload takes no endpoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        /// <exception cref="ArgumentNullException">If observer is null</exception>
+        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+        {
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            return _observerContainer.RegisterObserver(observer);
+        }
+
+        /// <summary>
+        /// Release all resources for the StreamingRemoteManager: disposes every cached
+        /// client observer, then the transport server.
+        /// </summary>
+        public void Dispose()
+        {
+            foreach (ProxyObserver cachedClient in _cachedClients.Values)
+            {
+                cachedClient.Dispose();
+            }
+
+            if (_server != null)
+            {
+                _server.Dispose();
+            }
+        }
+
+        /// <summary>
+        /// Observer to send messages to connected remote host
+        /// </summary>
+        private class ProxyObserver : IObserver<T>, IDisposable
+        {
+            private readonly StreamingTransportClient<IRemoteEvent<T>> _client;
+
+            /// <summary>
+            /// Create new ProxyObserver
+            /// </summary>
+            /// <param name="client">The StreamingTransportClient used to send
+            /// messages to remote host</param>
+            public ProxyObserver(StreamingTransportClient<IRemoteEvent<T>> client)
+            {
+                _client = client;
+            }
+
+            /// <summary>
+            /// Send the message to the remote host, wrapped in a RemoteEvent that carries
+            /// the local and remote endpoints of the client's link.
+            /// </summary>
+            /// <param name="message">The message to send</param>
+            public void OnNext(T message)
+            {
+                IRemoteEvent<T> remoteEvent = new RemoteEvent<T>(_client.Link.LocalEndpoint,
+                    _client.Link.RemoteEndpoint,
+                    message);
+
+                _client.Send(remoteEvent);
+            }
+
+            /// <summary>
+            /// Close underlying StreamingTransportClient
+            /// </summary>
+            public void Dispose()
+            {
+                _client.Dispose();
+            }
+
+            // Rethrows the supplied exception to the caller.
+            public void OnError(Exception error)
+            {
+                throw error;
+            }
+
+            // Completion is not supported: always throws NotImplementedException.
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs new file mode 100644 index 0000000..90e3aca --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Net; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; + +namespace Org.Apache.REEF.Wake.Remote.Impl +{ + /// <summary> + /// StreamingRemoteManagerFactory for StreamingRemoteManager. 
+    /// </summary>
+    public sealed class StreamingRemoteManagerFactory
+    {
+        private readonly ITcpPortProvider _tcpPortProvider;
+        private readonly IInjector _injector;
+
+        [Inject]
+        private StreamingRemoteManagerFactory(ITcpPortProvider tcpPortProvider, IInjector injector)
+        {
+            _tcpPortProvider = tcpPortProvider;
+            _injector = injector;
+        }
+
+        /// <summary>
+        /// Creates a StreamingRemoteManager for messages of type T, using the injected
+        /// port provider and a TemporaryWritableToStreamingCodec obtained from the injector.
+        /// </summary>
+        /// <typeparam name="T">Message type; must implement IWritable</typeparam>
+        /// <param name="localAddress">The address the remote manager listens on</param>
+        /// <param name="port">Currently unused; see the ToDo below</param>
+        /// <returns>The newly constructed remote manager</returns>
+        //ToDo: The port argument will be removed once changes are made in WritableNetworkService [REEF-447]
+        public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
+        {
+#pragma warning disable 618
+// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
+            var codec = _injector.GetInstance<TemporaryWritableToStreamingCodec<T>>();
+            return new StreamingRemoteManager<T>(localAddress, _tcpPortProvider, codec);
+#pragma warning restore 618
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
new file mode 100644
index 0000000..111be5d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Establish connections to TransportServer for remote message passing
+    /// </summary>
+    /// <typeparam name="T">Generic Type of message.</typeparam>
+    internal sealed class StreamingTransportClient<T> : IDisposable
+    {
+        private readonly ILink<T> _link;
+        // Only set by the constructor overload that takes an observer.
+        private readonly IObserver<TransportEvent<T>> _observer;
+        // Signals ResponseLoop to stop; cancelled in Dispose.
+        private readonly CancellationTokenSource _cancellationSource;
+        private bool _disposed;
+        private static readonly Logger Logger = Logger.GetLogger(typeof(StreamingTransportClient<T>));
+
+        /// <summary>
+        /// Construct a TransportClient.
+        /// Used to send messages to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+        /// <param name="streamingCodec">Streaming codec</param>
+        internal StreamingTransportClient(IPEndPoint remoteEndpoint, IStreamingCodec<T> streamingCodec)
+        {
+            Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", Logger);
+
+            _link = new StreamingLink<T>(remoteEndpoint, streamingCodec);
+            _cancellationSource = new CancellationTokenSource();
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Construct a TransportClient.
+        /// Used to send messages to the specified remote endpoint.
+        /// Also starts a background task that reads responses from the link and
+        /// forwards them to the observer.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+        /// <param name="observer">Callback used when receiving responses from remote host</param>
+        /// <param name="streamingCodec">Streaming codec</param>
+        internal StreamingTransportClient(IPEndPoint remoteEndpoint,
+            IObserver<TransportEvent<T>> observer,
+            IStreamingCodec<T> streamingCodec)
+            : this(remoteEndpoint, streamingCodec)
+        {
+            _observer = observer;
+            Task.Run(() => ResponseLoop());
+        }
+
+        /// <summary>
+        /// Gets the underlying transport link.
+        /// </summary>
+        public ILink<T> Link
+        {
+            get { return _link; }
+        }
+
+        /// <summary>
+        /// Send the remote message.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <exception cref="ArgumentNullException">If message is null</exception>
+        public void Send(T message)
+        {
+            if (message == null)
+            {
+                throw new ArgumentNullException("message");
+            }
+
+            _link.Write(message);
+        }
+
+        /// <summary>
+        /// Stops the response loop and closes the underlying link.
+        /// Calling Dispose more than once has no further effect.
+        /// </summary>
+        public void Dispose()
+        {
+            if (!_disposed)
+            {
+                // Ask ResponseLoop to stop before the link goes away, so the loop does not
+                // depend on a read against a disposed link failing in order to terminate.
+                _cancellationSource.Cancel();
+
+                _link.Dispose();
+                _disposed = true;
+            }
+        }
+
+        /// <summary>
+        /// Continually read responses from remote host and pass each one to the observer.
+        /// Ends when cancellation is requested or the link returns a null message.
+        /// </summary>
+        private async Task ResponseLoop()
+        {
+            while (!_cancellationSource.IsCancellationRequested)
+            {
+                T message = await _link.ReadAsync(_cancellationSource.Token);
+                if (message == null)
+                {
+                    break;
+                }
+
+                TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
+                _observer.OnNext(transportEvent);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
new file mode 100644
index 0000000..3f05c17
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Server to handle incoming remote messages.
+    /// </summary>
+    /// <typeparam name="T">Generic Type of message.</typeparam>
+    internal sealed class StreamingTransportServer<T> : IDisposable
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof (StreamingTransportServer<T>));
+
+        // Not readonly: replaced in FindAPortAndStartListener for each port that is tried.
+        private TcpListener _listener;
+        private readonly CancellationTokenSource _cancellationSource;
+        private readonly IObserver<TransportEvent<T>> _remoteObserver;
+        private readonly ITcpPortProvider _tcpPortProvider;
+        private readonly IStreamingCodec<T> _streamingCodec;
+        private bool _disposed;
+        // Null until Run() is called.
+        private Task _serverTask;
+
+        /// <summary>
+        /// Constructs a TransportServer to listen for remote events.
+        /// Listens on the specified local address. When it receives a remote
+        /// event, it will invoke the specified remote handler.
+        /// </summary>
+        /// <param name="address">Endpoint address to listen on</param>
+        /// <param name="remoteHandler">The handler to invoke when receiving incoming
+        /// remote messages</param>
+        /// <param name="tcpPortProvider">Supplies the port numbers tried when the listener is started</param>
+        /// <param name="streamingCodec">Streaming codec</param>
+        internal StreamingTransportServer(
+            IPAddress address,
+            IObserver<TransportEvent<T>> remoteHandler,
+            ITcpPortProvider tcpPortProvider,
+            IStreamingCodec<T> streamingCodec)
+        {
+            // This listener is never started; it only carries the address until
+            // FindAPortAndStartListener replaces it with one bound to a provided port.
+            _listener = new TcpListener(address, 0);
+            _remoteObserver = remoteHandler;
+            _tcpPortProvider = tcpPortProvider;
+            _cancellationSource = new CancellationTokenSource();
+            _streamingCodec = streamingCodec;
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Returns the listening endpoint for the TransportServer
+        /// </summary>
+        public IPEndPoint LocalEndpoint
+        {
+            get { return _listener.LocalEndpoint as IPEndPoint; }
+        }
+
+        /// <summary>
+        /// Starts listening for incoming remote messages.
+        /// Binds the listener synchronously, then runs the accept loop in a background task.
+        /// </summary>
+        public void Run()
+        {
+            FindAPortAndStartListener();
+            _serverTask = Task.Run(() => StartServer());
+        }
+
+        /// <summary>
+        /// Tries the ports supplied by the port provider in order until the listener starts.
+        /// If none works, throws the last SocketException seen (AddressAlreadyInUse when
+        /// the provider yielded no port at all).
+        /// </summary>
+        private void FindAPortAndStartListener()
+        {
+            var foundAPort = false;
+            var exception = new SocketException((int)SocketError.AddressAlreadyInUse);
+            for (var enumerator = _tcpPortProvider.GetEnumerator();
+                !foundAPort && enumerator.MoveNext();
+                )
+            {
+                _listener = new TcpListener(LocalEndpoint.Address, enumerator.Current);
+                try
+                {
+                    _listener.Start();
+                    foundAPort = true;
+                }
+                catch (SocketException e)
+                {
+                    exception = e;
+                }
+            }
+            if (!foundAPort)
+            {
+                Exceptions.Throw(exception, "Could not find a port to listen on", LOGGER);
+            }
+            LOGGER.Log(Level.Info,
+                String.Format("Listening on {0}", _listener.LocalEndpoint.ToString()));
+        }
+
+
+        /// <summary>
+        /// Close the TransportServer and all open connections.
+        /// Waits at most 500ms for the accept loop to finish and does not throw if it
+        /// fails or times out.
+        /// </summary>
+        public void Dispose()
+        {
+            if (!_disposed)
+            {
+                _cancellationSource.Cancel();
+
+                try
+                {
+                    _listener.Stop();
+                }
+                catch (SocketException)
+                {
+                    LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
+                }
+
+                if (_serverTask != null)
+                {
+                    // Give the TransportServer Task 500ms to shut down, ignore any timeout errors.
+                    // There is deliberately no unbounded wait before this: it would defeat the
+                    // timeout and let a faulted server task throw out of Dispose.
+                    try
+                    {
+                        if (!_serverTask.Wait(500))
+                        {
+                            LOGGER.Log(Level.Warning, "TransportServer task did not shut down within 500ms.");
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        Console.Error.WriteLine(e);
+                    }
+                    finally
+                    {
+                        // Task.Dispose throws if the task has not reached a completion state.
+                        if (_serverTask.IsCompleted)
+                        {
+                            _serverTask.Dispose();
+                        }
+                    }
+                }
+            }
+
+            _disposed = true;
+        }
+
+        /// <summary>
+        /// Helper method to start TransportServer. This will
+        /// be run in an asynchronous Task.
+        /// Accepts clients until cancellation is requested; each accepted client is
+        /// handed to ProcessClient without awaiting it.
+        /// </summary>
+        /// <returns>An asynchronous Task for the running server.</returns>
+        private async Task StartServer()
+        {
+            try
+            {
+                while (!_cancellationSource.Token.IsCancellationRequested)
+                {
+                    TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
+                    ProcessClient(client).Forget();
+                }
+            }
+            catch (InvalidOperationException)
+            {
+                LOGGER.Log(Level.Info, "TransportServer has been closed.");
+            }
+            catch (OperationCanceledException)
+            {
+                LOGGER.Log(Level.Info, "TransportServer has been closed.");
+            }
+        }
+
+        /// <summary>
+        /// Receives events from connected TcpClient and invokes handler on each event.
+        /// </summary>
+        /// <param name="client">The connected client</param>
+        private async Task ProcessClient(TcpClient client)
+        {
+            // Keep reading messages from client until they disconnect or timeout
+            CancellationToken token = _cancellationSource.Token;
+            using (ILink<T> link = new StreamingLink<T>(client, _streamingCodec))
+            {
+                while (!token.IsCancellationRequested)
+                {
+                    T message = await link.ReadAsync(token);
+
+                    // A null message ends the read loop for this client.
+                    if (message == null)
+                    {
+                        break;
+                    }
+
+                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
+                    _remoteObserver.OnNext(transportEvent);
+                }
+                LOGGER.Log(Level.Error,
+                    "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested);
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
new file mode 100644
index 0000000..1af9b86
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    // TODO: This class will be removed shortly and is used to reduce the size of pull request.
+    /// <summary>
+    /// Streaming codec for IWritable messages. On the wire, each message is preceded by
+    /// the assembly-qualified name of its runtime type; on read, that name is used to
+    /// obtain an instance from a forked injector, which then reads its own content.
+    /// </summary>
+    /// <typeparam name="T">Message type; must implement IWritable</typeparam>
+    internal sealed class TemporaryWritableToStreamingCodec<T> : IStreamingCodec<T> where T:IWritable
+    {
+        private readonly IInjector _injector;
+
+        [Inject]
+        public TemporaryWritableToStreamingCodec(IInjector injector)
+        {
+            _injector = injector;
+        }
+
+        /// <summary>
+        /// Reads one message synchronously.
+        /// </summary>
+        /// <param name="reader">The reader to read from</param>
+        /// <returns>The message read, or default(T) when no type name could be read</returns>
+        public T Read(IDataReader reader)
+        {
+            string type = reader.ReadString();
+            if (type == null)
+            {
+                // NOTE(review): presumably a null type name means the stream has ended - confirm
+                // against the IDataReader implementation. The read loops treat a null message as
+                // "stop reading", so report that rather than asking the injector for a null type.
+                return default(T);
+            }
+
+            var value = (T) _injector.ForkInjector().GetInstance(type);
+            value.Read(reader);
+            return value;
+        }
+
+        /// <summary>
+        /// Writes the runtime type name of the message followed by its content.
+        /// </summary>
+        /// <param name="obj">The message to write</param>
+        /// <param name="writer">The writer to write to</param>
+        public void Write(T obj, IDataWriter writer)
+        {
+            writer.WriteString(obj.GetType().AssemblyQualifiedName);
+            obj.Write(writer);
+        }
+
+        /// <summary>
+        /// Reads one message asynchronously.
+        /// </summary>
+        /// <param name="reader">The reader to read from</param>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The message read, or default(T) when no type name could be read</returns>
+        public async Task<T> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            string type = await reader.ReadStringAsync(token);
+            if (type == null)
+            {
+                // Same handling as in Read: no type name, no message.
+                return default(T);
+            }
+
+            var value = (T) _injector.ForkInjector().GetInstance(type);
+            await value.ReadAsync(reader, token);
+            return value;
+        }
+
+        /// <summary>
+        /// Asynchronously writes the runtime type name of the message followed by its content.
+        /// </summary>
+        /// <param name="obj">The message to write</param>
+        /// <param name="writer">The writer to write to</param>
+        /// <param name="token">The cancellation token</param>
+        public async Task WriteAsync(T obj, IDataWriter writer, CancellationToken token)
+        {
+            await writer.WriteStringAsync(obj.GetType().AssemblyQualifiedName, token);
+            await obj.WriteAsync(writer, token);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
deleted file mode 100644
index 9859338..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
+++ /dev/null
@@ -1,295 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Net; -using System.Net.Sockets; -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Exceptions; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Utilities.Diagnostics; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Util; - -namespace Org.Apache.REEF.Wake.Remote.Impl -{ - /// <summary> - /// Represents an open connection between remote hosts. This class is not thread safe - /// </summary> - /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] - public class WritableLink<T> : ILink<T> where T : IWritable - { - private static readonly Logger Logger = Logger.GetLogger(typeof (WritableLink<T>)); - - private readonly IPEndPoint _localEndpoint; - private bool _disposed; - private readonly NetworkStream _stream; - private readonly IInjector _injector; - - - /// <summary> - /// Stream reader to be passed to IWritable - /// </summary> - private readonly StreamDataReader _reader; - - /// <summary> - /// Stream writer from which to read from IWritable - /// </summary> - private readonly StreamDataWriter _writer; - - /// <summary> - /// Constructs a Link object. - /// Connects to the specified remote endpoint. 
- /// </summary> - /// <param name="remoteEndpoint">The remote endpoint to connect to</param> - /// <param name="injector">The injector to pass arguments to incoming messages</param> - public WritableLink(IPEndPoint remoteEndpoint, IInjector injector) - { - if (remoteEndpoint == null) - { - throw new ArgumentNullException("remoteEndpoint"); - } - - Client = new TcpClient(); - Client.Connect(remoteEndpoint); - - _stream = Client.GetStream(); - _localEndpoint = GetLocalEndpoint(); - _disposed = false; - _reader = new StreamDataReader(_stream); - _writer = new StreamDataWriter(_stream); - _injector = injector; - } - - /// <summary> - /// Constructs a Link object. - /// Uses the already connected TcpClient. - /// </summary> - /// <param name="client">The already connected client</param> - /// <param name="injector">The injector to pass arguments to incoming messages</param> - public WritableLink(TcpClient client, IInjector injector) - { - if (client == null) - { - throw new ArgumentNullException("client"); - } - - Client = client; - _stream = Client.GetStream(); - _localEndpoint = GetLocalEndpoint(); - _disposed = false; - _reader = new StreamDataReader(_stream); - _writer = new StreamDataWriter(_stream); - _injector = injector; - } - - /// <summary> - /// Returns the local socket address - /// </summary> - public IPEndPoint LocalEndpoint - { - get { return _localEndpoint; } - } - - /// <summary> - /// Returns the remote socket address - /// </summary> - public IPEndPoint RemoteEndpoint - { - get { return (IPEndPoint) Client.Client.RemoteEndPoint; } - } - - /// <summary> - /// Gets the underlying TcpClient - /// </summary> - public TcpClient Client { get; private set; } - - /// <summary> - /// Writes the message to the remote host - /// </summary> - /// <param name="value">The data to write</param> - public void Write(T value) - { - if (value == null) - { - throw new ArgumentNullException("value"); - } - if (_disposed) - { - Exceptions.Throw(new 
IllegalStateException("Link has been closed."), Logger); - } - - _writer.WriteString(value.GetType().AssemblyQualifiedName); - value.Write(_writer); - } - - /// <summary> - /// Writes the value to this link asynchronously - /// </summary> - /// <param name="value">The data to write</param> - /// <param name="token">The cancellation token</param> - public async Task WriteAsync(T value, CancellationToken token) - { - if (_disposed) - { - Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger); - } - - await _writer.WriteStringAsync(value.GetType().AssemblyQualifiedName, token); - await value.WriteAsync(_writer, token); - } - - /// <summary> - /// Reads the value from the link synchronously - /// </summary> - public T Read() - { - if (_disposed) - { - Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger); - } - - string dataType = _reader.ReadString(); - - if (dataType == null) - { - return default(T); - } - - try - { - T value = (T) _injector.ForkInjector().GetInstance(dataType); - value.Read(_reader); - return value; - } - catch (InjectionException) - { - return default(T); - } - } - - /// <summary> - /// Reads the value from the link asynchronously - /// </summary> - /// <param name="token">The cancellation token</param> - public async Task<T> ReadAsync(CancellationToken token) - { - if (_disposed) - { - Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger); - } - - string dataType = ""; - - dataType = await _reader.ReadStringAsync(token); - - if (dataType == null) - { - return default(T); - } - - try - { - T value = (T) _injector.ForkInjector().GetInstance(dataType); - await value.ReadAsync(_reader, token); - return value; - } - catch (InjectionException) - { - return default(T); - } - } - - /// <summary> - /// Close the client connection - /// </summary> - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - /// <summary> - /// Subclasses of Links should overwrite 
this to handle disposing - /// of the link - /// </summary> - /// <param name="disposing">To dispose or not</param> - public virtual void Dispose(bool disposing) - { - if (_disposed) - { - return; - } - - if (disposing) - { - try - { - Client.GetStream().Close(); - } - catch (InvalidOperationException) - { - Logger.Log(Level.Warning, "failed to close stream on a non-connected socket."); - } - - Client.Close(); - } - _disposed = true; - } - - /// <summary> - /// Overrides Equals. Two Link objects are equal if they are connected - /// to the same remote endpoint. - /// </summary> - /// <param name="obj">The object to compare</param> - /// <returns>True if the object is equal to this Link, otherwise false</returns> - public override bool Equals(object obj) - { - Link<T> other = obj as Link<T>; - if (other == null) - { - return false; - } - - return other.RemoteEndpoint.Equals(RemoteEndpoint); - } - - /// <summary> - /// Gets the hash code for the Link object. - /// </summary> - /// <returns>The object's hash code</returns> - public override int GetHashCode() - { - return RemoteEndpoint.GetHashCode(); - } - - /// <summary> - /// Discovers the IPEndpoint for the current machine. 
- /// </summary> - /// <returns>The local IPEndpoint</returns> - private IPEndPoint GetLocalEndpoint() - { - IPAddress address = NetworkUtils.LocalIPAddress; - int port = ((IPEndPoint) Client.Client.LocalEndPoint).Port; - return new IPEndPoint(address, port); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs deleted file mode 100644 index 9790e3e..0000000 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Concurrent; -using System.Net; -using Org.Apache.REEF.Wake.Util; -using Org.Apache.REEF.Utilities.Diagnostics; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Wake.Remote.Impl -{ - /// <summary> - /// Stores registered IObservers for DefaultRemoteManager. 
- /// Can register and look up IObservers by remote IPEndPoint. - /// </summary> - /// <typeparam name="T">Message type T. It is assumed to be IWritable</typeparam> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] - internal class WritableObserverContainer<T> : IObserver<TransportEvent<IWritableRemoteEvent<T>>> where T : IWritable - { - private static readonly Logger Logger = Logger.GetLogger(typeof(WritableObserverContainer<>)); - private readonly ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap; - private readonly ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap; - private IObserver<T> _universalObserver; - - /// <summary> - /// Constructs a new ObserverContainer used to manage remote IObservers. - /// </summary> - public WritableObserverContainer() - { - _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer()); - _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>(); - } - - /// <summary> - /// Registers an IObserver used to handle incoming messages from the remote host - /// at the specified IPEndPoint. 
- /// </summary> - /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param> - /// <param name="observer">The IObserver to handle incoming messages</param> - /// <returns>An IDisposable used to unregister the observer with</returns> - public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer) - { - if (remoteEndpoint.Address.Equals(IPAddress.Any)) - { - _universalObserver = observer; - return Disposable.Create(() => { _universalObserver = null; }); - } - - _endpointMap[remoteEndpoint] = observer; - return Disposable.Create(() => _endpointMap.TryRemove(remoteEndpoint, out observer)); - } - - /// <summary> - /// Registers an IObserver to handle incoming messages from a remote host - /// </summary> - /// <param name="observer">The IObserver to handle incoming messages</param> - /// <returns>An IDisposable used to unregister the observer with</returns> - public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer) - { - _typeMap[typeof(T)] = observer; - return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out observer)); - } - - /// <summary> - /// Look up the IObserver for the registered IPEndPoint or event type - /// and execute the IObserver. 
- /// </summary> - /// <param name="transportEvent">The incoming remote event</param> - public void OnNext(TransportEvent<IWritableRemoteEvent<T>> transportEvent) - { - IWritableRemoteEvent<T> remoteEvent = transportEvent.Data; - remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint; - remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint; - T value = remoteEvent.Value; - bool handled = false; - - IObserver<T> observer1; - IObserver<IRemoteMessage<T>> observer2; - if (_universalObserver != null) - { - _universalObserver.OnNext(value); - handled = true; - } - if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer1)) - { - // IObserver was registered by IPEndpoint - observer1.OnNext(value); - handled = true; - } - else if (_typeMap.TryGetValue(value.GetType(), out observer2)) - { - // IObserver was registered by event type - IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint); - IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value); - observer2.OnNext(remoteMessage); - handled = true; - } - - if (!handled) - { - throw new WakeRuntimeException("Unrecognized Wake RemoteEvent message"); - } - } - - public void OnError(Exception error) - { - throw error; - } - - public void OnCompleted() - { - Logger.Log(Level.Info, "Exiting the Writable Observer Container"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs deleted file mode 100644 index b3664d0..0000000 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Net; -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Interface; - -namespace Org.Apache.REEF.Wake.Remote.Impl -{ - /// <summary> - /// Writable remote event class - /// </summary> - /// <typeparam name="T">Type of remote event message. It is assumed that T implements IWritable</typeparam> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. 
Please see Jira REEF-295 ", false)] - internal sealed class WritableRemoteEvent<T> : IWritableRemoteEvent<T> where T : IWritable - { - private readonly IInjector _injector; - - /// <summary> - /// Creates the Remote Event - /// </summary> - /// <param name="localEndpoint">Local Address</param> - /// <param name="remoteEndpoint">Remote Address</param> - /// <param name="value">Actual message</param> - public WritableRemoteEvent(IPEndPoint localEndpoint, IPEndPoint remoteEndpoint, T value) - { - LocalEndPoint = localEndpoint; - RemoteEndPoint = remoteEndpoint; - Value = value; - } - - /// <summary> - /// Creates empty Remote Event - /// </summary> - [Inject] - public WritableRemoteEvent(IInjector injector) - { - _injector = injector; - } - - /// <summary> - /// Local Address - /// </summary> - public IPEndPoint LocalEndPoint { get; set; } - - /// <summary> - /// Remote Address - /// </summary> - public IPEndPoint RemoteEndPoint { get; set; } - - /// <summary> - /// The actual message - /// </summary> - public T Value { get; set; } - - /// <summary> - /// Read the class fields. - /// </summary> - /// <param name="reader">The reader from which to read </param> - public void Read(IDataReader reader) - { - Value = (T)_injector.ForkInjector().GetInstance(typeof(T)); - Value.Read(reader); - } - - /// <summary> - /// Writes the class fields. - /// </summary> - /// <param name="writer">The writer to which to write</param> - public void Write(IDataWriter writer) - { - Value.Write(writer); - } - - /// <summary> - /// Read the class fields. - /// </summary> - /// <param name="reader">The reader from which to read </param> - /// <param name="token">The cancellation token</param> - public async Task ReadAsync(IDataReader reader, CancellationToken token) - { - Value = (T)_injector.ForkInjector().GetInstance(typeof(T)); - await Value.ReadAsync(reader, token); - } - - /// <summary> - /// Writes the class fields. 
- /// </summary> - /// <param name="writer">The writer to which to write</param> - /// <param name="token">The cancellation token</param> - public async Task WriteAsync(IDataWriter writer, CancellationToken token) - { - await Value.WriteAsync(writer, token); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs deleted file mode 100644 index 73d8bb6..0000000 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.Net; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Util; - -namespace Org.Apache.REEF.Wake.Remote.Impl -{ - /// <summary> - /// Manages incoming and outgoing messages between remote hosts. - /// </summary> - /// <typeparam name="T">Message type T. 
It is assumed to be IWritable</typeparam> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] - public sealed class WritableRemoteManager<T> : IRemoteManager<T> where T : IWritable - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof (WritableRemoteManager<T>)); - - private readonly WritableObserverContainer<T> _observerContainer; - private readonly WritableTransportServer<IWritableRemoteEvent<T>> _server; - private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients; - private readonly IInjector _injector; - - /// <summary> - /// Constructs a DefaultRemoteManager listening on the specified address and - /// a specific port. - /// </summary> - /// <param name="localAddress">The address to listen on</param> - /// <param name="port">The port to listen on</param> - /// <param name="tcpPortProvider">Tcp port provider</param> - /// <param name="injector">The injector to pass arguments to incoming messages</param> - [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)] - public WritableRemoteManager(IPAddress localAddress, int port, ITcpPortProvider tcpPortProvider, IInjector injector) - { - if (localAddress == null) - { - throw new ArgumentNullException("localAddress"); - } - if (port < 0) - { - throw new ArgumentException("Listening port must be greater than or equal to zero"); - } - - _observerContainer = new WritableObserverContainer<T>(); - _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>(); - _injector = injector; - - IPEndPoint localEndpoint = new IPEndPoint(localAddress, port); - - // Begin to listen for incoming messages - _server = new WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, _observerContainer, tcpPortProvider, injector); - _server.Run(); - - LocalEndpoint = _server.LocalEndpoint; - Identifier = new SocketRemoteIdentifier(LocalEndpoint); - } - - /// <summary> - /// Constructs a DefaultRemoteManager. Does not listen for incoming messages. 
- /// </summary> - /// <param name="injector">The injector to pass arguments to incoming messages</param> - [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)] - public WritableRemoteManager(IInjector injector) - { - using (LOGGER.LogFunction("WritableRemoteManager::WritableRemoteManager")) - { - _observerContainer = new WritableObserverContainer<T>(); - _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>(); - _injector = injector; - - LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0); - Identifier = new SocketRemoteIdentifier(LocalEndpoint); - } - } - - /// <summary> - /// Gets the RemoteIdentifier for the DefaultRemoteManager - /// </summary> - public IRemoteIdentifier Identifier { get; private set; } - - /// <summary> - /// Gets the local IPEndPoint for the DefaultRemoteManager - /// </summary> - public IPEndPoint LocalEndpoint { get; private set; } - - /// <summary> - /// Returns an IObserver used to send messages to the remote host at - /// the specified IPEndpoint. - /// </summary> - /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param> - /// <returns>An IObserver used to send messages to the remote host</returns> - public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint) - { - if (remoteEndpoint == null) - { - throw new ArgumentNullException("remoteEndpoint"); - } - - SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier; - if (id == null) - { - throw new ArgumentException("ID not supported"); - } - - return GetRemoteObserver(id.Addr); - } - - /// <summary> - /// Returns an IObserver used to send messages to the remote host at - /// the specified IPEndpoint. 
- /// </summary> - /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param> - /// <returns>An IObserver used to send messages to the remote host</returns> - public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint) - { - if (remoteEndpoint == null) - { - throw new ArgumentNullException("remoteEndpoint"); - } - - ProxyObserver remoteObserver; - if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver)) - { - WritableTransportClient<IWritableRemoteEvent<T>> client = - new WritableTransportClient<IWritableRemoteEvent<T>>(remoteEndpoint, _observerContainer, _injector); - - remoteObserver = new ProxyObserver(client); - _cachedClients[remoteEndpoint] = remoteObserver; - } - - return remoteObserver; - } - - /// <summary> - /// Registers an IObserver used to handle incoming messages from the remote host - /// at the specified IPEndPoint. - /// The IDisposable that is returned can be used to unregister the IObserver. - /// </summary> - /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param> - /// <param name="observer">The IObserver to handle incoming messages</param> - /// <returns>An IDisposable used to unregister the observer with</returns> - public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer) - { - if (remoteEndpoint == null) - { - throw new ArgumentNullException("remoteEndpoint"); - } - - if (observer == null) - { - throw new ArgumentNullException("observer"); - } - - SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier; - if (id == null) - { - throw new ArgumentException("ID not supported"); - } - - return RegisterObserver(id.Addr, observer); - } - - /// <summary> - /// Registers an IObserver used to handle incoming messages from the remote host - /// at the specified IPEndPoint. - /// The IDisposable that is returned can be used to unregister the IObserver. 
- /// </summary> - /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param> - /// <param name="observer">The IObserver to handle incoming messages</param> - /// <returns>An IDisposable used to unregister the observer with</returns> - public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer) - { - if (remoteEndpoint == null) - { - throw new ArgumentNullException("remoteEndpoint"); - } - if (observer == null) - { - throw new ArgumentNullException("observer"); - } - - return _observerContainer.RegisterObserver(remoteEndpoint, observer); - } - - /// <summary> - /// Registers an IObserver used to handle incoming messages from the remote host - /// at the specified IPEndPoint. - /// The IDisposable that is returned can be used to unregister the IObserver. - /// </summary> - /// <param name="observer">The IObserver to handle incoming messages</param> - /// <returns>An IDisposable used to unregister the observer with</returns> - public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer) - { - if (observer == null) - { - throw new ArgumentNullException("observer"); - } - - return _observerContainer.RegisterObserver(observer); - } - - /// <summary> - /// Release all resources for the DefaultRemoteManager. 
- /// </summary> - public void Dispose() - { - foreach (ProxyObserver cachedClient in _cachedClients.Values) - { - cachedClient.Dispose(); - } - - if (_server != null) - { - _server.Dispose(); - } - } - - /// <summary> - /// Observer to send messages to connected remote host - /// </summary> - private class ProxyObserver : IObserver<T>, IDisposable - { - private readonly WritableTransportClient<IWritableRemoteEvent<T>> _client; - - /// <summary> - /// Create new ProxyObserver - /// </summary> - /// <param name="client">The connected WritableTransport client used to send - /// messages to remote host</param> - public ProxyObserver(WritableTransportClient<IWritableRemoteEvent<T>> client) - { - _client = client; - } - - /// <summary> - /// Send the message to the remote host - /// </summary> - /// <param name="message">The message to send</param> - public void OnNext(T message) - { - IWritableRemoteEvent<T> remoteEvent = new WritableRemoteEvent<T>(_client.Link.LocalEndpoint, - _client.Link.RemoteEndpoint, - message); - - _client.Send(remoteEvent); - } - - /// <summary> - /// Close underlying WritableTransport client - /// </summary> - public void Dispose() - { - _client.Dispose(); - } - - public void OnError(Exception error) - { - throw error; - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs deleted file mode 100644 index 52fef8d..0000000 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Net; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Interface; - -namespace Org.Apache.REEF.Wake.Remote.Impl -{ - /// <summary> - /// WritableRemoteManagerFactory for WritableRemoteManager. - /// </summary> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] - public sealed class WritableRemoteManagerFactory - { - private readonly ITcpPortProvider _tcpPortProvider; - private readonly IInjector _injector; - - [Inject] - private WritableRemoteManagerFactory(ITcpPortProvider tcpPortProvider, IInjector injector) - { - _tcpPortProvider = tcpPortProvider; - _injector = injector; - } - - public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable - { -#pragma warning disable 618 -// This is the one place allowed to call this constructor. Hence, disabling the warning is OK. - return new WritableRemoteManager<T>(localAddress, port, _tcpPortProvider, _injector); -#pragma warning disable 618 - } - - public IRemoteManager<T> GetInstance<T>() where T : IWritable - { -#pragma warning disable 618 -// This is the one place allowed to call this constructor. Hence, disabling the warning is OK. 
- return new WritableRemoteManager<T>(_injector); -#pragma warning disable 618 - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs deleted file mode 100644 index b245f0f..0000000 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Net; -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Utilities.Diagnostics; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Wake.Remote.Impl -{ - /// <summary> - /// Establish connections to TransportServer for remote message passing - /// </summary> - /// <typeparam name="T">Generic Type of message. 
It is constrained to have implemented IWritable and IType interface</typeparam> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] - public class WritableTransportClient<T> : IDisposable where T : IWritable - { - private readonly ILink<T> _link; - private readonly IObserver<TransportEvent<T>> _observer; - private readonly CancellationTokenSource _cancellationSource; - private bool _disposed; - private static readonly Logger Logger = Logger.GetLogger(typeof(WritableTransportClient<T>)); - - /// <summary> - /// Construct a TransportClient. - /// Used to send messages to the specified remote endpoint. - /// </summary> - /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param> - /// <param name="injector">The injector to pass arguments to incoming messages</param> - public WritableTransportClient(IPEndPoint remoteEndpoint, IInjector injector) - { - Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", Logger); - - _link = new WritableLink<T>(remoteEndpoint, injector); - _cancellationSource = new CancellationTokenSource(); - _disposed = false; - } - - /// <summary> - /// Construct a TransportClient. - /// Used to send messages to the specified remote endpoint. - /// </summary> - /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param> - /// <param name="observer">Callback used when receiving responses from remote host</param> - /// <param name="injector">The injector to pass arguments to incoming messages</param> - public WritableTransportClient(IPEndPoint remoteEndpoint, - IObserver<TransportEvent<T>> observer, - IInjector injector) - : this(remoteEndpoint, injector) - { - _observer = observer; - Task.Run(() => ResponseLoop()); - } - - /// <summary> - /// Gets the underlying transport link. - /// </summary> - public ILink<T> Link - { - get { return _link; } - } - - /// <summary> - /// Send the remote message. 
- /// </summary> - /// <param name="message">The message to send</param> - public void Send(T message) - { - if (message == null) - { - throw new ArgumentNullException("message"); - } - - _link.Write(message); - } - - /// <summary> - /// Close all opened connections - /// </summary> - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected void Dispose(bool disposing) - { - if (!_disposed && disposing) - { - _link.Dispose(); - _disposed = true; - } - } - - /// <summary> - /// Continually read responses from remote host - /// </summary> - private async Task ResponseLoop() - { - while (!_cancellationSource.IsCancellationRequested) - { - T message = await _link.ReadAsync(_cancellationSource.Token); - if (message == null) - { - break; - } - - TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link); - _observer.OnNext(transportEvent); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs deleted file mode 100644 index 6b5961f..0000000 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs +++ /dev/null @@ -1,244 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.IO; -using System.Net; -using System.Net.Sockets; -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Utilities.Diagnostics; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Util; - -namespace Org.Apache.REEF.Wake.Remote.Impl -{ - /// <summary> - /// Server to handle incoming remote messages. - /// </summary> - /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] - public class WritableTransportServer<T> : IDisposable where T : IWritable - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof (TransportServer<>)); - - private TcpListener _listener; - private readonly CancellationTokenSource _cancellationSource; - private readonly IObserver<TransportEvent<T>> _remoteObserver; - private readonly ITcpPortProvider _tcpPortProvider; - private readonly IInjector _injector; - private bool _disposed; - private Task _serverTask; - /// <summary> - /// Constructs a TransportServer to listen for remote events. - /// Listens on the specified remote endpoint. When it recieves a remote - /// event, it will envoke the specified remote handler. 
- /// </summary> - /// <param name="port">Port to listen on</param> - /// <param name="remoteHandler">The handler to invoke when receiving incoming - /// remote messages</param> - /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param> - /// <param name="injector">The injector to pass arguments to incoming messages</param> - public WritableTransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ITcpPortProvider tcpPortProvider, IInjector injector) - : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, tcpPortProvider, injector) - { - } - - /// <summary> - /// Constructs a TransportServer to listen for remote events. - /// Listens on the specified remote endpoint. When it recieves a remote - /// event, it will envoke the specified remote handler. - /// </summary> - /// <param name="localEndpoint">Endpoint to listen on</param> - /// <param name="remoteHandler">The handler to invoke when receiving incoming - /// remote messages</param> - /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param> - /// <param name="injector">The injector to pass arguments to incoming messages</param> - public WritableTransportServer( - IPEndPoint localEndpoint, - IObserver<TransportEvent<T>> remoteHandler, - ITcpPortProvider tcpPortProvider, - IInjector injector) - { - _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port); - _remoteObserver = remoteHandler; - _tcpPortProvider = tcpPortProvider; - _cancellationSource = new CancellationTokenSource(); - _cancellationSource.Token.ThrowIfCancellationRequested(); - _injector = injector; - _disposed = false; - } - - /// <summary> - /// Returns the listening endpoint for the TransportServer - /// </summary> - public IPEndPoint LocalEndpoint - { - get { return _listener.LocalEndpoint as IPEndPoint; } - } - - /// <summary> - /// Starts listening for incoming remote messages. 
- /// </summary> - public void Run() - { - if (LocalEndpoint.Port == 0) - { - FindAPortAndStartListener(); - } - else - { - _listener.Start(); - } - - _serverTask = Task.Run(() => StartServer()); - } - - private void FindAPortAndStartListener() - { - var foundAPort = false; - var exception = new SocketException((int)SocketError.AddressAlreadyInUse); - for (var enumerator = _tcpPortProvider.GetEnumerator(); - !foundAPort && enumerator.MoveNext(); - ) - { - _listener = new TcpListener(LocalEndpoint.Address, enumerator.Current); - try - { - _listener.Start(); - foundAPort = true; - } - catch (SocketException e) - { - exception = e; - } - } - if (!foundAPort) - { - Exceptions.Throw(exception, "Could not find a port to listen on", LOGGER); - } - LOGGER.Log(Level.Info, - String.Format("Listening on {0}", _listener.LocalEndpoint.ToString())); - } - - - /// <summary> - /// Close the TransportServer and all open connections - /// </summary> - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - public void Dispose(bool disposing) - { - if (!_disposed && disposing) - { - _cancellationSource.Cancel(); - - try - { - _listener.Stop(); - } - catch (SocketException) - { - LOGGER.Log(Level.Info, "Disposing of transport server before listener is created."); - } - - if (_serverTask != null) - { - _serverTask.Wait(); - - // Give the TransportServer Task 500ms to shut down, ignore any timeout errors - try - { - CancellationTokenSource serverDisposeTimeout = new CancellationTokenSource(500); - _serverTask.Wait(serverDisposeTimeout.Token); - } - catch (Exception e) - { - Console.Error.WriteLine(e); - } - finally - { - _serverTask.Dispose(); - } - } - } - - _disposed = true; - } - - /// <summary> - /// Helper method to start TransportServer. This will - /// be run in an asynchronous Task. 
- /// </summary> - /// <returns>An asynchronous Task for the running server.</returns> - private async Task StartServer() - { - try - { - while (!_cancellationSource.Token.IsCancellationRequested) - { - TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false); - ProcessClient(client).Forget(); - } - } - catch (InvalidOperationException) - { - LOGGER.Log(Level.Info, "TransportServer has been closed."); - } - catch (OperationCanceledException) - { - LOGGER.Log(Level.Info, "TransportServer has been closed."); - } - } - - /// <summary> - /// Recieves event from connected TcpClient and invokes handler on the event. - /// </summary> - /// <param name="client">The connected client</param> - private async Task ProcessClient(TcpClient client) - { - - // Keep reading messages from client until they disconnect or timeout - CancellationToken token = _cancellationSource.Token; - using (ILink<T> link = new WritableLink<T>(client, _injector)) - { - while (!token.IsCancellationRequested) - { - T message = await link.ReadAsync(token); - - if (message == null) - { - break; - } - - TransportEvent<T> transportEvent = new TransportEvent<T>(message, link); - _remoteObserver.OnNext(transportEvent); - } - LOGGER.Log(Level.Error, - "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested); - } - } - } -} \ No newline at end of file
