Repository: incubator-reef
Updated Branches:
  refs/heads/master 24f95a119 -> 41cbbcbbe


[REEF-259]: Improve the memory efficiency of NetworkService

This change introduces `WritableNetworkService` which constraints the
`NsMessage` and messages passed to it to be `IWritable` classes.

JIRA:
  [REEF-259](https://issues.apache.org/jira/browse/REEF-259)

Pull Request:
  This closes #178


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/41cbbcbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/41cbbcbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/41cbbcbb

Branch: refs/heads/master
Commit: 41cbbcbbe7a9362bac41953a7d0eabe7bf14ee75
Parents: 24f95a1
Author: dkm2110 <[email protected]>
Authored: Wed May 6 14:10:57 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri May 15 16:32:11 2015 -0700

----------------------------------------------------------------------
 .../WritableNetworkServiceTests.cs              | 262 +++++++++++++++++++
 .../NetworkService/WritableString.cs            |  94 +++++++
 .../Org.Apache.REEF.Network.Tests.csproj        |   2 +
 .../NetworkService/NetworkService.cs            |   2 +-
 .../NetworkService/WritableNetworkService.cs    | 161 ++++++++++++
 .../NetworkService/WritableNsConnection.cs      | 138 ++++++++++
 .../NetworkService/WritableNsMessage.cs         | 181 +++++++++++++
 .../Org.Apache.REEF.Network.csproj              |   3 +
 8 files changed, 842 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
 
b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
new file mode 100644
index 0000000..07464ff
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Globalization;
+using System.Linq;
+using System.Net;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Network.Tests.NamingService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Network.Tests.NetworkService
+{
+    /// <summary>
+    /// Tests for Writable Network Service
+    /// </summary>
+    [TestClass]
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
+    public class WritableNetworkServiceTests
+    {
+        /// <summary>
+        /// Tests one way communication between two network services
+        /// </summary>
+        [TestMethod]
+        public void TestWritableNetworkServiceOneWayCommunication()
+        {
+            int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 
7000);
+            int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 
8000);
+
+            BlockingCollection<WritableString> queue;
+
+            using (var nameServer = NameServerTests.BuildNameServer())
+            {
+                IPEndPoint endpoint = nameServer.LocalEndpoint;
+                int nameServerPort = endpoint.Port;
+                string nameServerAddr = endpoint.Address.ToString();
+
+                var handlerConf1 =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder()
+                        
.BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
+                            GenericType<NetworkMessageHandler>.Class)
+                        .Build();
+
+                var handlerConf2 =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder()
+                        
.BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
+                            GenericType<MessageHandler>.Class)
+                        .Build();
+
+                var networkServiceInjection1 = 
BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
+                    handlerConf1);
+
+                var networkServiceInjection2 = 
BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
+                   handlerConf2);
+
+                using (INetworkService<WritableString> networkService1 = 
networkServiceInjection1.GetInstance<WritableNetworkService<WritableString>>())
+                using (INetworkService<WritableString> networkService2 = 
networkServiceInjection2.GetInstance<WritableNetworkService<WritableString>>())
+                {
+                    queue = 
networkServiceInjection2.GetInstance<MessageHandler>().Queue;
+                    IIdentifier id1 = new StringIdentifier("service1");
+                    IIdentifier id2 = new StringIdentifier("service2");
+                    networkService1.Register(id1);
+                    networkService2.Register(id2);
+
+                    using (IConnection<WritableString> connection = 
networkService1.NewConnection(id2))
+                    {
+                        connection.Open();
+                        connection.Write(new WritableString("abc"));
+                        connection.Write(new WritableString("def"));
+                        connection.Write(new WritableString("ghi"));
+
+                        Assert.AreEqual("abc", queue.Take().Data);
+                        Assert.AreEqual("def", queue.Take().Data);
+                        Assert.AreEqual("ghi", queue.Take().Data);
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tests two way communication between two network services
+        /// </summary>
+        [TestMethod]
+        public void TestWritableNetworkServiceTwoWayCommunication()
+        {
+            int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 
7000);
+            int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 
8000);
+
+            BlockingCollection<WritableString> queue1;
+            BlockingCollection<WritableString> queue2;
+
+            using (var nameServer = NameServerTests.BuildNameServer())
+            {
+                IPEndPoint endpoint = nameServer.LocalEndpoint;
+                int nameServerPort = endpoint.Port;
+                string nameServerAddr = endpoint.Address.ToString();
+
+                var handlerConf =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder()
+                        
.BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
+                            GenericType<MessageHandler>.Class)
+                        .Build();
+
+                var networkServiceInjection1 = 
BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
+                    handlerConf);
+
+                var networkServiceInjection2 = 
BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
+                   handlerConf);
+
+                using (INetworkService<WritableString> networkService1 = 
networkServiceInjection1.GetInstance<WritableNetworkService<WritableString>>())
+                using (INetworkService<WritableString> networkService2 = 
networkServiceInjection2.GetInstance<WritableNetworkService<WritableString>>())
+                {
+                    queue1 = 
networkServiceInjection1.GetInstance<MessageHandler>().Queue;
+                    queue2 = 
networkServiceInjection2.GetInstance<MessageHandler>().Queue;
+
+                    IIdentifier id1 = new StringIdentifier("service1");
+                    IIdentifier id2 = new StringIdentifier("service2");
+                    networkService1.Register(id1);
+                    networkService2.Register(id2);
+
+                    using (IConnection<WritableString> connection1 = 
networkService1.NewConnection(id2))
+                    using (IConnection<WritableString> connection2 = 
networkService2.NewConnection(id1))
+                    {
+                        connection1.Open();
+                        connection1.Write(new WritableString("abc"));
+                        connection1.Write(new WritableString("def"));
+                        connection1.Write(new WritableString("ghi"));
+
+                        connection2.Open();
+                        connection2.Write(new WritableString("jkl"));
+                        connection2.Write(new WritableString("nop"));
+
+                        Assert.AreEqual("abc", queue2.Take().Data);
+                        Assert.AreEqual("def", queue2.Take().Data);
+                        Assert.AreEqual("ghi", queue2.Take().Data);
+
+                        Assert.AreEqual("jkl", queue1.Take().Data);
+                        Assert.AreEqual("nop", queue1.Take().Data);
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Creates an instance of network service.
+        /// </summary>
+        /// <param name="networkServicePort">The port that the NetworkService 
will listen on</param>
+        /// <param name="nameServicePort">The port of the NameServer</param>
+        /// <param name="nameServiceAddr">The ip address of the 
NameServer</param>
+        /// <param name="factory">Identifier factory for WritableString</param>
+        /// <param name="handler">The observer to handle incoming 
messages</param>
+        /// <returns></returns>
+        private IInjector BuildNetworkService(
+            int networkServicePort,
+            int nameServicePort,
+            string nameServiceAddr,
+            IConfiguration handlerConf)
+        {
+            var networkServiceConf = 
TangFactory.GetTang().NewConfigurationBuilder(handlerConf)
+                .BindNamedParameter<NetworkServiceOptions.NetworkServicePort, 
int>(
+                    
GenericType<NetworkServiceOptions.NetworkServicePort>.Class,
+                    networkServicePort.ToString(CultureInfo.CurrentCulture))
+                .BindNamedParameter<NamingConfigurationOptions.NameServerPort, 
int>(
+                    
GenericType<NamingConfigurationOptions.NameServerPort>.Class,
+                    nameServicePort.ToString(CultureInfo.CurrentCulture))
+                
.BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
+                    
GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
+                    nameServiceAddr)
+                .BindImplementation(GenericType<INameClient>.Class, 
GenericType<NameClient>.Class)
+                .Build();
+
+            return TangFactory.GetTang().NewInjector(networkServiceConf);
+        }
+
+        /// <summary>
+        /// The observer to handle incoming messages for WritableString
+        /// </summary>
+        private class MessageHandler : 
IObserver<WritableNsMessage<WritableString>>
+        {
+            private readonly BlockingCollection<WritableString> _queue;
+
+            public BlockingCollection<WritableString> Queue
+            {
+                get { return _queue; }
+            } 
+
+            [Inject]
+            private MessageHandler()
+            {
+                _queue = new BlockingCollection<WritableString>();
+            }
+
+            public void OnNext(WritableNsMessage<WritableString> value)
+            {
+                _queue.Add(value.Data.First());
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        /// <summary>
+        /// The network handler to handle incoming Writable NsMessages
+        /// </summary>
+        private class NetworkMessageHandler : 
IObserver<WritableNsMessage<WritableString>>
+        {
+            [Inject]
+            public NetworkMessageHandler()
+            {
+            }
+
+            public void OnNext(WritableNsMessage<WritableString> value)
+            {
+            }
+
+            public void OnError(Exception error)
+            {
+            }
+
+            public void OnCompleted()
+            {
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs 
b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
new file mode 100644
index 0000000..400aa52
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Network.Tests.NetworkService
+{
+    /// <summary>
+    /// Writable wrapper around the string class
+    /// </summary>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
+    public class WritableString : IWritable
+    {
+        /// <summary>
+        /// Returns the actual string data
+        /// </summary>
+        public string Data { get; set; }
+        
+        /// <summary>
+        /// Empty constructor for instantiation with reflection
+        /// </summary>
+        [Inject]
+        public WritableString()
+        {
+        }
+
+        /// <summary>
+        /// Constructor
+        /// </summary>
+        /// <param name="data">The string data</param>
+        public WritableString(string data)
+        {
+            Data = data;
+        }
+
+        /// <summary>
+        /// Reads the string
+        /// </summary>
+        /// <param name="reader">reader to read from</param>
+        public void Read(IDataReader reader)
+        {
+            Data = reader.ReadString();
+        }
+
+        /// <summary>
+        /// Writes the string
+        /// </summary>
+        /// <param name="writer">Writer to write</param>
+        public void Write(IDataWriter writer)
+        {
+            writer.WriteString(Data);
+        }
+
+        /// <summary>
+        /// Reads the string
+        /// </summary>
+        /// <param name="reader">reader to read from</param>
+        /// <param name="token">the cancellation token</param>
+        public async Task ReadAsync(IDataReader reader, CancellationToken 
token)
+        {
+            Data = await reader.ReadStringAsync(token);
+        }
+
+        /// <summary>
+        /// Writes the string
+        /// </summary>
+        /// <param name="writer">Writer to write</param>
+        /// <param name="token">the cancellation token</param>
+        public async Task WriteAsync(IDataWriter writer, CancellationToken 
token)
+        {
+            await writer.WriteStringAsync(Data, token);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
index 26dd98a..cb2019b 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
@@ -53,7 +53,9 @@ under the License.
     <Compile Include="GroupCommunication\GroupCommunicationTests.cs" />
     <Compile 
Include="GroupCommunication\GroupCommunicationTreeTopologyTests.cs" />
     <Compile Include="NamingService\NameServerTests.cs" />
+    <Compile Include="NetworkService\WritableNetworkServiceTests.cs" />
     <Compile Include="NetworkService\NetworkServiceTests.cs" />
+    <Compile Include="NetworkService\WritableString.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
   </ItemGroup>
   <ItemGroup>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs 
b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
index 1b796a3..90f1e0f 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
@@ -49,7 +49,7 @@ namespace Org.Apache.REEF.Network.NetworkService
         private readonly Dictionary<IIdentifier, IConnection<T>> 
_connectionMap;  
 
         /// <summary>
-        /// Create a new NetworkFactory.
+        /// Create a new NetworkService.
         /// </summary>
         /// <param name="nsPort">The port that the NetworkService will listen 
on</param>
         /// <param name="nameServerAddr">The address of the NameServer</param>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs 
b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
new file mode 100644
index 0000000..834ced4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Network.NetworkService.Codec;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Impl;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+    /// <summary>
+    /// Writable Network service used for Reef Task communication.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
+    public class WritableNetworkService<T> : INetworkService<T> where T : 
IWritable
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(NetworkService<>));
+
+        private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager;
+        private readonly IObserver<WritableNsMessage<T>> _messageHandler;
+        private IIdentifier _localIdentifier;
+        private IDisposable _messageHandlerDisposable;
+        private readonly Dictionary<IIdentifier, IConnection<T>> 
_connectionMap;
+        private readonly INameClient _nameClient;
+
+        /// <summary>
+        /// Create a new Writable NetworkService.
+        /// </summary>
+        /// <param name="nsPort">The port that the NetworkService will listen 
on</param>
+        /// <param name="messageHandler">The observer to handle incoming 
messages</param>
+        /// <param name="idFactory">The factory used to create 
IIdentifiers</param>
+        /// <param name="nameClient">The name client used to register 
Ids</param>
+        /// <param name="remoteManagerFactory">Writable RemoteManagerFactory 
to create a 
+        /// Writable RemoteManager</param>
+        [Inject]
+        private WritableNetworkService(
+            [Parameter(typeof (NetworkServiceOptions.NetworkServicePort))] int 
nsPort,
+            IObserver<WritableNsMessage<T>> messageHandler,
+            IIdentifierFactory idFactory,
+            INameClient nameClient,
+            WritableRemoteManagerFactory remoteManagerFactory)
+        {
+ 
+            IPAddress localAddress = NetworkUtils.LocalIPAddress;
+            _remoteManager = 
remoteManagerFactory.GetInstance<WritableNsMessage<T>>(localAddress, nsPort);
+            _messageHandler = messageHandler;
+
+            _nameClient = nameClient;
+            _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
+
+            Logger.Log(Level.Info, "Started network service");
+        }
+
+        /// <summary>
+        /// Name client for registering ids
+        /// </summary>
+        public INameClient NamingClient
+        {
+            get { return _nameClient; }
+        }
+
+        /// <summary>
+        /// Open a new connection to the remote host registered to
+        /// the name service with the given identifier
+        /// </summary>
+        /// <param name="destinationId">The identifier of the remote 
host</param>
+        /// <returns>The IConnection used for communication</returns>
+        public IConnection<T> NewConnection(IIdentifier destinationId)
+        {
+            if (_localIdentifier == null)
+            {
+                throw new IllegalStateException("Cannot open connection 
without first registering an ID");
+            }
+
+            IConnection<T> connection;
+            if (_connectionMap.TryGetValue(destinationId, out connection))
+            {
+                return connection;
+            }
+            else
+            {
+                connection = new WritableNsConnection<T>(_localIdentifier, 
destinationId,
+                    NamingClient, _remoteManager, _connectionMap);
+
+                _connectionMap[destinationId] = connection;
+                return connection;
+            }
+        }
+
+        /// <summary>
+        /// Register the identifier for the NetworkService with the 
NameService.
+        /// </summary>
+        /// <param name="id">The identifier to register</param>
+        public void Register(IIdentifier id)
+        {
+            Logger.Log(Level.Info, "Registering id {0} with network service.", 
id);
+
+            _localIdentifier = id;
+            NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
+
+            // Create and register incoming message handler
+            var anyEndpoint = new 
IPEndPoint(_remoteManager.LocalEndpoint.Address, 0);
+            _messageHandlerDisposable = 
_remoteManager.RegisterObserver(anyEndpoint, _messageHandler);
+
+            Logger.Log(Level.Info, "End of Registering id {0} with network 
service.", id);
+        }
+
+        /// <summary>
+        /// Unregister the identifier for the NetworkService with the 
NameService.
+        /// </summary>
+        public void Unregister()
+        {
+            if (_localIdentifier == null)
+            {
+                throw new IllegalStateException("Cannot unregister a non 
existant identifier");
+            }
+
+            NamingClient.Unregister(_localIdentifier.ToString());
+            _localIdentifier = null;
+            _messageHandlerDisposable.Dispose();
+        }
+
+        /// <summary>
+        /// Dispose of the NetworkService's resources
+        /// </summary>
+        public void Dispose()
+        {
+            NamingClient.Dispose();
+            _remoteManager.Dispose();
+
+            Logger.Log(Level.Info, "Disposed of network service");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs 
b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
new file mode 100644
index 0000000..c20238c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using System.Runtime.Remoting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+    /// <summary>
+    /// Represents a connection between two hosts using the Writable 
NetworkService.
+    /// </summary>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
+    public class WritableNsConnection<T> : IConnection<T> where T : IWritable
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof 
(WritableNsConnection<T>));
+
+        private readonly IIdentifier _sourceId;
+        private readonly IIdentifier _destId;
+        private readonly INameClient _nameClient;
+        private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager;
+        private readonly Dictionary<IIdentifier, IConnection<T>> 
_connectionMap;
+        private IObserver<WritableNsMessage<T>> _remoteSender;
+
+        /// <summary>
+        /// Creates a new NsConnection between two hosts.
+        /// </summary>
+        /// <param name="sourceId">The identifier of the sender</param>
+        /// <param name="destId">The identifier of the receiver</param>
+        /// <param name="nameClient">The NameClient used for naming 
lookup</param>
+        /// <param name="remoteManager">The remote manager used for network 
communication</param>
+        /// <param name="connectionMap">A cache of opened connections.  Will 
remove itself from
+        /// the cache when the NsConnection is disposed.</param>
+        public WritableNsConnection(
+            IIdentifier sourceId,
+            IIdentifier destId,
+            INameClient nameClient,
+            IRemoteManager<WritableNsMessage<T>> remoteManager,
+            Dictionary<IIdentifier, IConnection<T>> connectionMap)
+        {
+            _sourceId = sourceId;
+            _destId = destId;
+            _nameClient = nameClient;
+            _remoteManager = remoteManager;
+            _connectionMap = connectionMap;
+        }
+
+        /// <summary>
+        /// Opens the connection to the remote host.
+        /// </summary>
+        public void Open()
+        {
+            string destStr = _destId.ToString();
+            Logger.Log(Level.Verbose, "Network service opening connection to 
{0}...", destStr);
+
+            IPEndPoint destAddr = _nameClient.Lookup(destStr);
+            if (null == destAddr)
+            {
+                throw new RemotingException("Destination Address identifier 
cannot be found");
+            }
+
+            try
+            {
+                _remoteSender = _remoteManager.GetRemoteObserver(destAddr);
+                Logger.Log(Level.Verbose, "Network service completed 
connection to {0}.", destStr);
+            }
+            catch (SocketException)
+            {
+                Logger.Log(Level.Error, "Network Service cannot open 
connection to " + destAddr);
+                throw;
+            }
+            catch (ObjectDisposedException)
+            {
+                Logger.Log(Level.Error, "Network Service cannot open 
connection to " + destAddr);
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Writes the object to the remote host.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        public void Write(T message)
+        {
+            if (_remoteSender == null)
+            {
+                throw new IllegalStateException("NsConnection has not been 
opened yet.");
+            }
+
+            try
+            {
+                _remoteSender.OnNext(new WritableNsMessage<T>(_sourceId, 
_destId, message));
+            }
+            catch (IOException)
+            {
+                Logger.Log(Level.Error, "Network Service cannot write message 
to {0}", _destId);
+                throw;
+            }
+            catch (ObjectDisposedException)
+            {
+                Logger.Log(Level.Error, "Network Service cannot write message 
to {0}", _destId);
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Closes the connection
+        /// </summary>
+        public void Dispose()
+        {
+            _connectionMap.Remove(_destId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs 
b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
new file mode 100644
index 0000000..28cd5f9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Runtime.Serialization;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Hadoop.Avro;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+    /// <summary>
+    /// Writable Message sent between NetworkServices.</summary>
+    /// <typeparam name="T">The type of data being sent. It is assumed to be 
Writable</typeparam>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
+    public class WritableNsMessage<T> : IWritable where T : IWritable
+    {
+        private IIdentifierFactory _factory;
+        private IInjector _injection;
+        /// <summary>
+        /// Constructor to allow instantiation by reflection
+        /// </summary>
+        [Inject]
+        public WritableNsMessage(IIdentifierFactory factory, IInjector 
injection)
+        {
+            _factory = factory;
+            _injection = injection;
+        }
+        
+        /// <summary>
+        /// Create a new Writable NsMessage with no data.
+        /// </summary>
+        /// <param name="sourceId">The identifier of the sender</param>
+        /// <param name="destId">The identifier of the receiver</param>
+        public WritableNsMessage(IIdentifier sourceId, IIdentifier destId)
+        {
+            SourceId = sourceId;
+            DestId = destId;
+            Data = new List<T>();
+        }
+
+        /// <summary>
+        /// Create a new Writable NsMessage with data.
+        /// </summary>
+        /// <param name="sourceId">The identifier of the sender</param>
+        /// <param name="destId">The identifier of the receiver</param>
+        /// <param name="message">The message to send</param>
+        public WritableNsMessage(IIdentifier sourceId, IIdentifier destId, T 
message)
+        {
+            SourceId = sourceId;
+            DestId = destId;
+            Data = new List<T> {message};
+        }
+
+        /// <summary>
+        /// The identifier of the sender of the message.
+        /// </summary>
+        internal IIdentifier SourceId { get; private set; }
+
+        /// <summary>
+        /// The identifier of the receiver of the message.
+        /// </summary>
+        internal IIdentifier DestId { get; private set; }
+
+        /// <summary>
+        /// A list of data being sent in the message.
+        /// </summary>
+        public IList<T> Data { get; set; }
+
+        /// <summary>
+        /// Read the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        public void Read(IDataReader reader)
+        {
+            SourceId = _factory.Create(reader.ReadString());
+            DestId = _factory.Create(reader.ReadString());
+            int messageCount = reader.ReadInt32();
+
+            Data = new List<T>();
+
+            for (int index = 0; index < messageCount; index++)
+            {
+                var dataPoint = 
(T)_injection.ForkInjector().GetInstance(typeof(T));
+
+                if (null == dataPoint)
+                {
+                    throw new Exception("T type instance cannot be created 
from the stream data in Network Service Message");
+                }
+
+                dataPoint.Read(reader);
+                Data.Add(dataPoint);
+            }
+        }
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(IDataWriter writer)
+        {
+            writer.WriteString(SourceId.ToString());
+            writer.WriteString(DestId.ToString());
+            writer.WriteInt32(Data.Count);
+
+            foreach (var data in Data)
+            {
+                data.Write(writer);
+            }
+        }
+
+        /// <summary>
+        /// Read the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        /// <param name="token">The cancellation token</param>
+        public async Task ReadAsync(IDataReader reader, CancellationToken 
token)
+        {
+            SourceId = _factory.Create(await reader.ReadStringAsync(token));
+            DestId = _factory.Create(await reader.ReadStringAsync(token));
+            int messageCount = await reader.ReadInt32Async(token);
+
+            Data = new List<T>();
+
+            for (int index = 0; index < messageCount; index++)
+            {
+                var dataPoint = Activator.CreateInstance<T>();
+
+                if (null == dataPoint)
+                {
+                    throw new Exception("T type instance cannot be created 
from the stream data in Network Service Message");
+                }
+
+                await dataPoint.ReadAsync(reader, token);
+                Data.Add(dataPoint);
+            }
+        }
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">The cancellation token</param>
+        public async Task WriteAsync(IDataWriter writer, CancellationToken 
token)
+        {
+            await writer.WriteStringAsync(SourceId.ToString(), token);
+            await writer.WriteStringAsync(DestId.ToString(), token);
+            await writer.WriteInt32Async(Data.Count, token);
+
+            foreach (var data in Data)
+            {
+                data.Write(writer);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41cbbcbb/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj 
b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 9a47a69..96f6ee2 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -142,6 +142,9 @@ under the License.
     <Compile Include="NetworkService\NetworkServiceOptions.cs" />
     <Compile Include="NetworkService\NsConnection.cs" />
     <Compile Include="NetworkService\NsMessage.cs" />
+    <Compile Include="NetworkService\WritableNetworkService.cs" />
+    <Compile Include="NetworkService\WritableNsConnection.cs" />
+    <Compile Include="NetworkService\WritableNsMessage.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="Utilities\BlockingCollectionExtensions.cs" />
     <Compile Include="Utilities\Utils.cs" />


Reply via email to