Repository: incubator-reef Updated Branches: refs/heads/master 24f95a119 -> 41cbbcbbe
[REEF-259]: Improve the memory efficiency of NetworkService This change introduces `WritableNetworkService` which constrains the `NsMessage` and messages passed to it to be `IWritable` classes. JIRA: [REEF-259](https://issues.apache.org/jira/browse/REEF-259) Pull Request: This closes #178 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/41cbbcbb Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/41cbbcbb Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/41cbbcbb Branch: refs/heads/master Commit: 41cbbcbbe7a9362bac41953a7d0eabe7bf14ee75 Parents: 24f95a1 Author: dkm2110 <[email protected]> Authored: Wed May 6 14:10:57 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Fri May 15 16:32:11 2015 -0700 ---------------------------------------------------------------------- .../WritableNetworkServiceTests.cs | 262 +++++++++++++++++++ .../NetworkService/WritableString.cs | 94 +++++++ .../Org.Apache.REEF.Network.Tests.csproj | 2 + .../NetworkService/NetworkService.cs | 2 +- .../NetworkService/WritableNetworkService.cs | 161 ++++++++++++ .../NetworkService/WritableNsConnection.cs | 138 ++++++++++ .../NetworkService/WritableNsMessage.cs | 181 +++++++++++++ .../Org.Apache.REEF.Network.csproj | 3 + 8 files changed, 842 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs new file mode 100644 index 0000000..07464ff --- /dev/null +++ 
b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Globalization;
+using System.Linq;
+using System.Net;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Network.Tests.NamingService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Network.Tests.NetworkService
+{
+    /// <summary>
+    /// Tests for Writable Network Service
+    /// </summary>
+    [TestClass]
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    public class WritableNetworkServiceTests
+    {
+        /// <summary>
+        /// Tests one way communication between two network services.
+        /// Service 1 is bound to a no-op handler and only sends; service 2 is bound
+        /// to the queueing <see cref="MessageHandler"/> and the test asserts that the
+        /// three strings arrive there in the order they were written.
+        /// </summary>
+        [TestMethod]
+        public void TestWritableNetworkServiceOneWayCommunication()
+        {
+            int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000);
+            int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000);
+
+            BlockingCollection<WritableString> queue;
+
+            using (var nameServer = NameServerTests.BuildNameServer())
+            {
+                IPEndPoint endpoint = nameServer.LocalEndpoint;
+                int nameServerPort = endpoint.Port;
+                string nameServerAddr = endpoint.Address.ToString();
+
+                // The sender ignores whatever it receives.
+                var handlerConf1 =
+                    TangFactory.GetTang()
+                    .NewConfigurationBuilder()
+                    .BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
+                        GenericType<NetworkMessageHandler>.Class)
+                    .Build();
+
+                // The receiver records incoming messages in a queue the test can read.
+                var handlerConf2 =
+                    TangFactory.GetTang()
+                    .NewConfigurationBuilder()
+                    .BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
+                        GenericType<MessageHandler>.Class)
+                    .Build();
+
+                var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
+                    handlerConf1);
+
+                var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
+                    handlerConf2);
+
+                using (INetworkService<WritableString> networkService1 = networkServiceInjection1.GetInstance<WritableNetworkService<WritableString>>())
+                using (INetworkService<WritableString> networkService2 = networkServiceInjection2.GetInstance<WritableNetworkService<WritableString>>())
+                {
+                    queue = networkServiceInjection2.GetInstance<MessageHandler>().Queue;
+                    IIdentifier id1 = new StringIdentifier("service1");
+                    IIdentifier id2 = new StringIdentifier("service2");
+                    networkService1.Register(id1);
+                    networkService2.Register(id2);
+
+                    using (IConnection<WritableString> connection = networkService1.NewConnection(id2))
+                    {
+                        connection.Open();
+                        connection.Write(new WritableString("abc"));
+                        connection.Write(new WritableString("def"));
+                        connection.Write(new WritableString("ghi"));
+
+                        Assert.AreEqual("abc", queue.Take().Data);
+                        Assert.AreEqual("def", queue.Take().Data);
+                        Assert.AreEqual("ghi", queue.Take().Data);
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tests two way communication between two network services.
+        /// Both services are bound to the queueing <see cref="MessageHandler"/>; each
+        /// side writes to the other and the test asserts the per-receiver ordering.
+        /// </summary>
+        [TestMethod]
+        public void TestWritableNetworkServiceTwoWayCommunication()
+        {
+            int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000);
+            int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000);
+
+            BlockingCollection<WritableString> queue1;
+            BlockingCollection<WritableString> queue2;
+
+            using (var nameServer = NameServerTests.BuildNameServer())
+            {
+                IPEndPoint endpoint = nameServer.LocalEndpoint;
+                int nameServerPort = endpoint.Port;
+                string nameServerAddr = endpoint.Address.ToString();
+
+                var handlerConf =
+                    TangFactory.GetTang()
+                    .NewConfigurationBuilder()
+                    .BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
+                        GenericType<MessageHandler>.Class)
+                    .Build();
+
+                var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
+                    handlerConf);
+
+                var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
+                    handlerConf);
+
+                using (INetworkService<WritableString> networkService1 = networkServiceInjection1.GetInstance<WritableNetworkService<WritableString>>())
+                using (INetworkService<WritableString> networkService2 = networkServiceInjection2.GetInstance<WritableNetworkService<WritableString>>())
+                {
+                    queue1 = networkServiceInjection1.GetInstance<MessageHandler>().Queue;
+                    queue2 = networkServiceInjection2.GetInstance<MessageHandler>().Queue;
+
+                    IIdentifier id1 = new StringIdentifier("service1");
+                    IIdentifier id2 = new StringIdentifier("service2");
+                    networkService1.Register(id1);
+                    networkService2.Register(id2);
+
+                    using (IConnection<WritableString> connection1 = networkService1.NewConnection(id2))
+                    using (IConnection<WritableString> connection2 = networkService2.NewConnection(id1))
+                    {
+                        connection1.Open();
+                        connection1.Write(new WritableString("abc"));
+                        connection1.Write(new WritableString("def"));
+                        connection1.Write(new WritableString("ghi"));
+
+                        connection2.Open();
+                        connection2.Write(new WritableString("jkl"));
+                        connection2.Write(new WritableString("nop"));
+
+                        Assert.AreEqual("abc", queue2.Take().Data);
+                        Assert.AreEqual("def", queue2.Take().Data);
+                        Assert.AreEqual("ghi", queue2.Take().Data);
+
+                        Assert.AreEqual("jkl", queue1.Take().Data);
+                        Assert.AreEqual("nop", queue1.Take().Data);
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Builds the injector from which a network service can be instantiated.
+        /// </summary>
+        /// <param name="networkServicePort">The port that the NetworkService will listen on</param>
+        /// <param name="nameServicePort">The port of the NameServer</param>
+        /// <param name="nameServiceAddr">The ip address of the NameServer</param>
+        /// <param name="handlerConf">Configuration that binds the observer handling incoming messages</param>
+        /// <returns>An injector created from the handler configuration plus the port,
+        /// name server and name client bindings</returns>
+        private IInjector BuildNetworkService(
+            int networkServicePort,
+            int nameServicePort,
+            string nameServiceAddr,
+            IConfiguration handlerConf)
+        {
+            // Port numbers are machine-readable configuration values, not UI text,
+            // so they are formatted with the invariant culture.
+            var networkServiceConf = TangFactory.GetTang().NewConfigurationBuilder(handlerConf)
+                .BindNamedParameter<NetworkServiceOptions.NetworkServicePort, int>(
+                    GenericType<NetworkServiceOptions.NetworkServicePort>.Class,
+                    networkServicePort.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
+                    GenericType<NamingConfigurationOptions.NameServerPort>.Class,
+                    nameServicePort.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
+                    GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
+                    nameServiceAddr)
+                .BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class)
+                .Build();
+
+            return TangFactory.GetTang().NewInjector(networkServiceConf);
+        }
+
+        /// <summary>
+        /// The observer to handle incoming messages for WritableString.
+        /// Stores the first payload of every received message in <see cref="Queue"/>.
+        /// </summary>
+        private class MessageHandler : IObserver<WritableNsMessage<WritableString>>
+        {
+            private readonly BlockingCollection<WritableString> _queue;
+
+            /// <summary>
+            /// The received payloads, in arrival order.
+            /// </summary>
+            public BlockingCollection<WritableString> Queue
+            {
+                get { return _queue; }
+            }
+
+            [Inject]
+            private MessageHandler()
+            {
+                _queue = new BlockingCollection<WritableString>();
+            }
+
+            public void OnNext(WritableNsMessage<WritableString> value)
+            {
+                _queue.Add(value.Data.First());
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        /// <summary>
+        /// The network handler to handle incoming Writable NsMessages.
+        /// Every callback is a no-op.
+        /// </summary>
+        private class NetworkMessageHandler : IObserver<WritableNsMessage<WritableString>>
+        {
+            [Inject]
+            public NetworkMessageHandler()
+            {
+            }
+
+            public void OnNext(WritableNsMessage<WritableString> value)
+            {
+            }
+
+            public void OnError(Exception error)
+            {
+            }
+
+            public void OnCompleted()
+            {
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
new file mode 100644
index 0000000..400aa52
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Network.Tests.NetworkService
+{
+    /// <summary>
+    /// An <c>IWritable</c> that carries a single string value.
+    /// </summary>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    public class WritableString : IWritable
+    {
+        private string _data;
+
+        /// <summary>
+        /// Parameterless constructor so the instance can be created first and
+        /// populated afterwards through Read or ReadAsync.
+        /// </summary>
+        [Inject]
+        public WritableString()
+        {
+        }
+
+        /// <summary>
+        /// Wraps the given string.
+        /// </summary>
+        /// <param name="data">The string to carry</param>
+        public WritableString(string data)
+        {
+            _data = data;
+        }
+
+        /// <summary>
+        /// The wrapped string.
+        /// </summary>
+        public string Data
+        {
+            get { return _data; }
+            set { _data = value; }
+        }
+
+        /// <summary>
+        /// Replaces the wrapped string with one read from the reader.
+        /// </summary>
+        /// <param name="reader">The source to read the string from</param>
+        public void Read(IDataReader reader)
+        {
+            string incoming = reader.ReadString();
+            _data = incoming;
+        }
+
+        /// <summary>
+        /// Sends the wrapped string to the writer.
+        /// </summary>
+        /// <param name="writer">The destination to write the string to</param>
+        public void Write(IDataWriter writer)
+        {
+            writer.WriteString(_data);
+        }
+
+        /// <summary>
+        /// Asynchronous counterpart of Read.
+        /// </summary>
+        /// <param name="reader">The source to read the string from</param>
+        /// <param name="token">Token passed through to the reader</param>
+        public async Task ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            string incoming = await reader.ReadStringAsync(token);
+            _data = incoming;
+        }
+
+        /// <summary>
+        /// Asynchronous counterpart of Write.
+        /// </summary>
+        /// <param name="writer">The destination to write the string to</param>
+        /// <param name="token">Token passed through to the writer</param>
+        public async Task WriteAsync(IDataWriter writer, CancellationToken token)
+        {
+            await writer.WriteStringAsync(_data, token);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
index 26dd98a..cb2019b 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
+++ 
b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj @@ -53,7 +53,9 @@ under the License. <Compile Include="GroupCommunication\GroupCommunicationTests.cs" /> <Compile Include="GroupCommunication\GroupCommunicationTreeTopologyTests.cs" /> <Compile Include="NamingService\NameServerTests.cs" /> + <Compile Include="NetworkService\WritableNetworkServiceTests.cs" /> <Compile Include="NetworkService\NetworkServiceTests.cs" /> + <Compile Include="NetworkService\WritableString.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> </ItemGroup> <ItemGroup> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs index 1b796a3..90f1e0f 100644 --- a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs @@ -49,7 +49,7 @@ namespace Org.Apache.REEF.Network.NetworkService private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap; /// <summary> - /// Create a new NetworkFactory. + /// Create a new NetworkService. 
/// </summary> /// <param name="nsPort">The port that the NetworkService will listen on</param> /// <param name="nameServerAddr">The address of the NameServer</param> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs new file mode 100644 index 0000000..834ced4 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Network.NetworkService.Codec;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Impl;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+    /// <summary>
+    /// Writable Network service used for Reef Task communication.
+    /// Messages are exchanged as <see cref="WritableNsMessage{T}"/> instances over a
+    /// remote manager, and identifiers are resolved through the name client.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    public class WritableNetworkService<T> : INetworkService<T> where T : IWritable
+    {
+        // Log under this class, not under the non-writable NetworkService.
+        private static readonly Logger Logger = Logger.GetLogger(typeof(WritableNetworkService<>));
+
+        private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager;
+        private readonly IObserver<WritableNsMessage<T>> _messageHandler;
+        private IIdentifier _localIdentifier;
+        private IDisposable _messageHandlerDisposable;
+        private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
+        private readonly INameClient _nameClient;
+
+        /// <summary>
+        /// Create a new Writable NetworkService.
+        /// </summary>
+        /// <param name="nsPort">The port that the NetworkService will listen on</param>
+        /// <param name="messageHandler">The observer to handle incoming messages</param>
+        /// <param name="idFactory">The factory used to create IIdentifiers. Not used by
+        /// this constructor; kept in the injected signature.</param>
+        /// <param name="nameClient">The name client used to register Ids</param>
+        /// <param name="remoteManagerFactory">Writable RemoteManagerFactory to create a
+        /// Writable RemoteManager</param>
+        [Inject]
+        private WritableNetworkService(
+            [Parameter(typeof (NetworkServiceOptions.NetworkServicePort))] int nsPort,
+            IObserver<WritableNsMessage<T>> messageHandler,
+            IIdentifierFactory idFactory,
+            INameClient nameClient,
+            WritableRemoteManagerFactory remoteManagerFactory)
+        {
+            IPAddress localAddress = NetworkUtils.LocalIPAddress;
+            _remoteManager = remoteManagerFactory.GetInstance<WritableNsMessage<T>>(localAddress, nsPort);
+            _messageHandler = messageHandler;
+
+            _nameClient = nameClient;
+            _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
+
+            Logger.Log(Level.Info, "Started network service");
+        }
+
+        /// <summary>
+        /// Name client for registering ids
+        /// </summary>
+        public INameClient NamingClient
+        {
+            get { return _nameClient; }
+        }
+
+        /// <summary>
+        /// Open a new connection to the remote host registered to
+        /// the name service with the given identifier. A connection that is
+        /// already cached for that identifier is returned instead of a new one.
+        /// </summary>
+        /// <param name="destinationId">The identifier of the remote host</param>
+        /// <returns>The IConnection used for communication</returns>
+        /// <exception cref="IllegalStateException">If no local identifier has been registered</exception>
+        public IConnection<T> NewConnection(IIdentifier destinationId)
+        {
+            if (_localIdentifier == null)
+            {
+                throw new IllegalStateException("Cannot open connection without first registering an ID");
+            }
+
+            IConnection<T> connection;
+            if (_connectionMap.TryGetValue(destinationId, out connection))
+            {
+                return connection;
+            }
+
+            // The connection receives the cache so that it can remove itself on Dispose.
+            connection = new WritableNsConnection<T>(_localIdentifier, destinationId,
+                NamingClient, _remoteManager, _connectionMap);
+
+            _connectionMap[destinationId] = connection;
+            return connection;
+        }
+
+        /// <summary>
+        /// Register the identifier for the NetworkService with the NameService
+        /// and start delivering incoming messages to the message handler.
+        /// </summary>
+        /// <param name="id">The identifier to register</param>
+        public void Register(IIdentifier id)
+        {
+            Logger.Log(Level.Info, "Registering id {0} with network service.", id);
+
+            _localIdentifier = id;
+            NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
+
+            // Create and register incoming message handler
+            var anyEndpoint = new IPEndPoint(_remoteManager.LocalEndpoint.Address, 0);
+            _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler);
+
+            Logger.Log(Level.Info, "End of Registering id {0} with network service.", id);
+        }
+
+        /// <summary>
+        /// Unregister the identifier for the NetworkService with the NameService
+        /// and release the incoming message handler registration.
+        /// </summary>
+        /// <exception cref="IllegalStateException">If no identifier is currently registered</exception>
+        public void Unregister()
+        {
+            if (_localIdentifier == null)
+            {
+                throw new IllegalStateException("Cannot unregister a non existant identifier");
+            }
+
+            NamingClient.Unregister(_localIdentifier.ToString());
+            _localIdentifier = null;
+
+            // The handler registration is missing when Register failed after it had
+            // already stored the identifier; guard instead of dereferencing null.
+            if (_messageHandlerDisposable != null)
+            {
+                _messageHandlerDisposable.Dispose();
+                _messageHandlerDisposable = null;
+            }
+        }
+
+        /// <summary>
+        /// Dispose of the NetworkService's resources
+        /// </summary>
+        public void Dispose()
+        {
+            NamingClient.Dispose();
+            _remoteManager.Dispose();
+
+            Logger.Log(Level.Info, "Disposed of network service");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
new file mode 100644
index 0000000..c20238c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more 
contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using System.Runtime.Remoting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+    /// <summary>
+    /// A connection from one registered identifier to another, used by the
+    /// Writable NetworkService to send <see cref="WritableNsMessage{T}"/> instances.
+    /// </summary>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    public class WritableNsConnection<T> : IConnection<T> where T : IWritable
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof (WritableNsConnection<T>));
+
+        private readonly IIdentifier _sourceId;
+        private readonly IIdentifier _destId;
+        private readonly INameClient _nameClient;
+        private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager;
+        private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
+        private IObserver<WritableNsMessage<T>> _remoteSender;
+
+        /// <summary>
+        /// Stores the collaborators; no network activity happens until Open is called.
+        /// </summary>
+        /// <param name="sourceId">Identifier of the sending side</param>
+        /// <param name="destId">Identifier of the receiving side</param>
+        /// <param name="nameClient">Name client used to look up the destination</param>
+        /// <param name="remoteManager">Remote manager that provides the remote observer</param>
+        /// <param name="connectionMap">Connection cache; Dispose removes this
+        /// connection's destination entry from it.</param>
+        public WritableNsConnection(
+            IIdentifier sourceId,
+            IIdentifier destId,
+            INameClient nameClient,
+            IRemoteManager<WritableNsMessage<T>> remoteManager,
+            Dictionary<IIdentifier, IConnection<T>> connectionMap)
+        {
+            _connectionMap = connectionMap;
+            _remoteManager = remoteManager;
+            _nameClient = nameClient;
+            _destId = destId;
+            _sourceId = sourceId;
+        }
+
+        /// <summary>
+        /// Looks up the destination through the name client and obtains the
+        /// remote observer used by Write.
+        /// </summary>
+        public void Open()
+        {
+            string destinationName = _destId.ToString();
+            Logger.Log(Level.Verbose, "Network service opening connection to {0}...", destinationName);
+
+            IPEndPoint destinationEndpoint = _nameClient.Lookup(destinationName);
+            if (destinationEndpoint == null)
+            {
+                throw new RemotingException("Destination Address identifier cannot be found");
+            }
+
+            try
+            {
+                _remoteSender = _remoteManager.GetRemoteObserver(destinationEndpoint);
+                Logger.Log(Level.Verbose, "Network service completed connection to {0}.", destinationName);
+            }
+            catch (SocketException)
+            {
+                LogOpenFailure(destinationEndpoint);
+                throw;
+            }
+            catch (ObjectDisposedException)
+            {
+                LogOpenFailure(destinationEndpoint);
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Wraps the message in a WritableNsMessage and hands it to the remote observer.
+        /// </summary>
+        /// <param name="message">The payload to send</param>
+        public void Write(T message)
+        {
+            if (_remoteSender == null)
+            {
+                throw new IllegalStateException("NsConnection has not been opened yet.");
+            }
+
+            try
+            {
+                var envelope = new WritableNsMessage<T>(_sourceId, _destId, message);
+                _remoteSender.OnNext(envelope);
+            }
+            catch (IOException)
+            {
+                LogWriteFailure();
+                throw;
+            }
+            catch (ObjectDisposedException)
+            {
+                LogWriteFailure();
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Removes this connection's destination from the connection cache.
+        /// </summary>
+        public void Dispose()
+        {
+            _connectionMap.Remove(_destId);
+        }
+
+        /// <summary>
+        /// Logs, at error level, that the connection to the endpoint could not be opened.
+        /// </summary>
+        private static void LogOpenFailure(IPEndPoint endpoint)
+        {
+            Logger.Log(Level.Error, "Network Service cannot open connection to " + endpoint);
+        }
+
+        /// <summary>
+        /// Logs, at error level, that a message could not be written to the destination.
+        /// </summary>
+        private void LogWriteFailure()
+        {
+            Logger.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
new file mode 100644
index 0000000..28cd5f9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Runtime.Serialization;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Hadoop.Avro;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+    /// <summary>
+    /// Writable Message sent between NetworkServices.
+    /// On the wire it is the source id string, the destination id string, the
+    /// number of payloads, and then each payload written in order.
+    /// </summary>
+    /// <typeparam name="T">The type of data being sent. It is assumed to be Writable</typeparam>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    public class WritableNsMessage<T> : IWritable where T : IWritable
+    {
+        // Only set by the injected constructor; Read and ReadAsync depend on both.
+        private readonly IIdentifierFactory _factory;
+        private readonly IInjector _injection;
+
+        /// <summary>
+        /// Constructor to allow instantiation by reflection. The message is empty
+        /// until Read or ReadAsync populates it.
+        /// </summary>
+        /// <param name="factory">Factory used to rebuild the identifiers while reading</param>
+        /// <param name="injection">Injector used to create payload instances while reading</param>
+        [Inject]
+        public WritableNsMessage(IIdentifierFactory factory, IInjector injection)
+        {
+            _factory = factory;
+            _injection = injection;
+        }
+
+        /// <summary>
+        /// Create a new Writable NsMessage with no data.
+        /// </summary>
+        /// <param name="sourceId">The identifier of the sender</param>
+        /// <param name="destId">The identifier of the receiver</param>
+        public WritableNsMessage(IIdentifier sourceId, IIdentifier destId)
+        {
+            SourceId = sourceId;
+            DestId = destId;
+            Data = new List<T>();
+        }
+
+        /// <summary>
+        /// Create a new Writable NsMessage with data.
+        /// </summary>
+        /// <param name="sourceId">The identifier of the sender</param>
+        /// <param name="destId">The identifier of the receiver</param>
+        /// <param name="message">The message to send</param>
+        public WritableNsMessage(IIdentifier sourceId, IIdentifier destId, T message)
+        {
+            SourceId = sourceId;
+            DestId = destId;
+            Data = new List<T> {message};
+        }
+
+        /// <summary>
+        /// The identifier of the sender of the message.
+        /// </summary>
+        internal IIdentifier SourceId { get; private set; }
+
+        /// <summary>
+        /// The identifier of the receiver of the message.
+        /// </summary>
+        internal IIdentifier DestId { get; private set; }
+
+        /// <summary>
+        /// A list of data being sent in the message.
+        /// </summary>
+        public IList<T> Data { get; set; }
+
+        /// <summary>
+        /// Read the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        public void Read(IDataReader reader)
+        {
+            SourceId = _factory.Create(reader.ReadString());
+            DestId = _factory.Create(reader.ReadString());
+            int messageCount = reader.ReadInt32();
+
+            Data = new List<T>();
+
+            for (int index = 0; index < messageCount; index++)
+            {
+                T dataPoint = CreateDataInstance();
+                dataPoint.Read(reader);
+                Data.Add(dataPoint);
+            }
+        }
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(IDataWriter writer)
+        {
+            writer.WriteString(SourceId.ToString());
+            writer.WriteString(DestId.ToString());
+            writer.WriteInt32(Data.Count);
+
+            foreach (var data in Data)
+            {
+                data.Write(writer);
+            }
+        }
+
+        /// <summary>
+        /// Read the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        /// <param name="token">The cancellation token</param>
+        public async Task ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            SourceId = _factory.Create(await reader.ReadStringAsync(token));
+            DestId = _factory.Create(await reader.ReadStringAsync(token));
+            int messageCount = await reader.ReadInt32Async(token);
+
+            Data = new List<T>();
+
+            for (int index = 0; index < messageCount; index++)
+            {
+                // Same creation path as the synchronous Read, so payloads are built
+                // through the injector here as well.
+                T dataPoint = CreateDataInstance();
+                await dataPoint.ReadAsync(reader, token);
+                Data.Add(dataPoint);
+            }
+        }
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">The cancellation token</param>
+        public async Task WriteAsync(IDataWriter writer, CancellationToken token)
+        {
+            await writer.WriteStringAsync(SourceId.ToString(), token);
+            await writer.WriteStringAsync(DestId.ToString(), token);
+            await writer.WriteInt32Async(Data.Count, token);
+
+            foreach (var data in Data)
+            {
+                // Await the asynchronous write so the payload is not written with a
+                // blocking call and the token reaches it.
+                await data.WriteAsync(writer, token);
+            }
+        }
+
+        /// <summary>
+        /// Creates an empty payload instance from a forked injector.
+        /// </summary>
+        /// <returns>A new instance of T, ready to be populated from a reader</returns>
+        private T CreateDataInstance()
+        {
+            var dataPoint = (T)_injection.ForkInjector().GetInstance(typeof(T));
+
+            if (null == dataPoint)
+            {
+                throw new Exception("T type instance cannot be created from the stream data in Network Service Message");
+            }
+
+            return dataPoint;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 9a47a69..96f6ee2 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -142,6 +142,9 @@ under the License. 
<Compile Include="NetworkService\NetworkServiceOptions.cs" /> <Compile Include="NetworkService\NsConnection.cs" /> <Compile Include="NetworkService\NsMessage.cs" /> + <Compile Include="NetworkService\WritableNetworkService.cs" /> + <Compile Include="NetworkService\WritableNsConnection.cs" /> + <Compile Include="NetworkService\WritableNsMessage.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Utilities\BlockingCollectionExtensions.cs" /> <Compile Include="Utilities\Utils.cs" />
