Repository: incubator-reef Updated Branches: refs/heads/master 9e3e09051 -> 33b6edf1d
[REEF-257] Improve memory efficiency in Wake.NET This adds the IWritable interface as well as supporting classes to allow for a late serialization of messages. This improves upon the current scheme where all messages are `byte[]` and need to be copied several times. JIRA: [REEF-257](https://issues.apache.org/jira/browse/REEF-257) Pull Request: This closes #149 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/33b6edf1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/33b6edf1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/33b6edf1 Branch: refs/heads/master Commit: 33b6edf1d5c182129df029aeba7f7badbe4f4a28 Parents: 9e3e090 Author: Dhruv <[email protected]> Authored: Fri Apr 17 10:45:05 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Fri May 1 10:58:15 2015 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Wake.Tests.csproj | 2 + .../WritableString.cs | 93 +++++ .../WritableTransportTest.cs | 166 +++++++++ .../Org.Apache.REEF.Wake.csproj | 9 + .../Org.Apache.REEF.Wake/Remote/IDataReader.cs | 147 ++++++++ .../Org.Apache.REEF.Wake/Remote/IDataWriter.cs | 150 ++++++++ .../cs/Org.Apache.REEF.Wake/Remote/IWritable.cs | 61 ++++ .../Remote/Impl/StreamDataReader.cs | 353 +++++++++++++++++++ .../Remote/Impl/StreamDataWriter.cs | 218 ++++++++++++ .../Remote/Impl/WritableLink.cs | 293 +++++++++++++++ .../Remote/Impl/WritableTransportClient.cs | 128 +++++++ .../Remote/Impl/WritableTransportServer.cs | 198 +++++++++++ .../cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs | 104 ++++++ 13 files changed, 1922 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj index 911bd00..d709933 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj @@ -49,7 +49,9 @@ under the License. <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="PubSubSubjectTest.cs" /> <Compile Include="RemoteManagerTest.cs" /> + <Compile Include="WritableTransportTest.cs" /> <Compile Include="TransportTest.cs" /> + <Compile Include="WritableString.cs" /> </ItemGroup> <ItemGroup> <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj"> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs new file mode 100644 index 0000000..8a6d041 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.REEF.Wake.Remote;

namespace Org.Apache.REEF.Wake.Tests
{
    /// <summary>
    /// An IWritable whose entire payload is a single string. The payload is
    /// transferred with the string operations of IDataReader and IDataWriter.
    /// </summary>
    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
    public class WritableString : IWritable
    {
        /// <summary>
        /// Parameterless constructor, kept so the type can be instantiated
        /// through reflection. Data stays unset until a read fills it in.
        /// </summary>
        public WritableString()
        {
        }

        /// <summary>
        /// Creates an instance that already carries a payload.
        /// </summary>
        /// <param name="data">The string to wrap</param>
        public WritableString(string data)
        {
            this.Data = data;
        }

        /// <summary>
        /// The wrapped string.
        /// </summary>
        public string Data { get; set; }

        /// <summary>
        /// Sends the wrapped string to the writer.
        /// </summary>
        /// <param name="writer">Destination of the string</param>
        public void Write(IDataWriter writer)
        {
            string payload = this.Data;
            writer.WriteString(payload);
        }

        /// <summary>
        /// Sends the wrapped string to the writer without blocking the caller.
        /// </summary>
        /// <param name="writer">Destination of the string</param>
        /// <param name="token">Cancellation token handed on to the writer</param>
        public async Task WriteAsync(IDataWriter writer, CancellationToken token)
        {
            Task pendingWrite = writer.WriteStringAsync(this.Data, token);
            await pendingWrite;
        }

        /// <summary>
        /// Replaces the wrapped string with the next string from the reader.
        /// </summary>
        /// <param name="reader">Source of the string</param>
        public void Read(IDataReader reader)
        {
            string received = reader.ReadString();
            this.Data = received;
        }

        /// <summary>
        /// Replaces the wrapped string with the next string from the reader
        /// without blocking the caller.
        /// </summary>
        /// <param name="reader">Source of the string</param>
        /// <param name="token">Cancellation token handed on to the reader</param>
        public async Task ReadAsync(IDataReader reader, CancellationToken token)
        {
            string received = await reader.ReadStringAsync(token);
            this.Data = received;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs new file mode 100644 index 0000000..03de24e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs @@ -0,0 +1,166 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Reactive;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
using Org.Apache.REEF.Wake.Util;

namespace Org.Apache.REEF.Wake.Tests
{
    /// <summary>
    /// Tests the WritableTransportServer, WritableTransportClient and WritableLink,
    /// i.e. the Wake transport layer, by exchanging WritableString messages
    /// between a server bound to IPAddress.Any and clients connecting to 127.0.0.1.
    /// </summary>
    [TestClass]
    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
    public class WritableTransportTest
    {
        /// <summary>
        /// Upper bound on any single wait in these tests. A lost message or a
        /// stuck client then fails the test instead of hanging the test run.
        /// </summary>
        private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(30);

        /// <summary>
        /// Tests whether WritableTransportServer receives
        /// string messages from WritableTransportClient
        /// </summary>
        [TestMethod]
        public void TestWritableTransportServer()
        {
            int port = NetworkUtils.GenerateRandomPort(6000, 7000);

            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
            List<string> events = new List<string>();

            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
            var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data));

            using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler))
            {
                server.Run();

                IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
                using (var client = new WritableTransportClient<WritableString>(remoteEndpoint))
                {
                    client.Send(new WritableString("Hello"));
                    client.Send(new WritableString(", "));
                    client.Send(new WritableString("World!"));

                    events.Add(TakeData(queue));
                    events.Add(TakeData(queue));
                    events.Add(TakeData(queue));
                }
            }

            // MSTest takes the expected value first; keeping that order makes
            // failure messages report expected/actual the right way round.
            Assert.AreEqual(3, events.Count);
            Assert.AreEqual("Hello", events[0]);
            Assert.AreEqual(", ", events[1]);
            Assert.AreEqual("World!", events[2]);
        }

        /// <summary>
        /// Checks whether WritableTransportClient is able to receive messages from remote host
        /// </summary>
        [TestMethod]
        public void TestWritableTransportSenderStage()
        {
            int port = NetworkUtils.GenerateRandomPort(6000, 7000);
            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);

            List<string> events = new List<string>();
            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();

            // Server echoes the message back to the client
            var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => tEvent.Link.Write(tEvent.Data));

            using (WritableTransportServer<WritableString> server = new WritableTransportServer<WritableString>(endpoint, remoteHandler))
            {
                server.Run();

                var clientHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data));
                IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
                using (var client = new WritableTransportClient<WritableString>(remoteEndpoint, clientHandler))
                {
                    client.Send(new WritableString("Hello"));
                    client.Send(new WritableString(", "));
                    client.Send(new WritableString(" World"));

                    events.Add(TakeData(queue));
                    events.Add(TakeData(queue));
                    events.Add(TakeData(queue));
                }
            }

            Assert.AreEqual(3, events.Count);
            Assert.AreEqual("Hello", events[0]);
            Assert.AreEqual(", ", events[1]);
            Assert.AreEqual(" World", events[2]);
        }

        /// <summary>
        /// Checks whether WritableTransportClient and WritableTransportServer works
        /// in asynchronous condition while sending messages asynchronously from different
        /// threads
        /// </summary>
        [TestMethod]
        public void TestWritableRaceCondition()
        {
            int port = NetworkUtils.GenerateRandomPort(6000, 7000);

            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
            List<string> events = new List<string>();
            int numEventsExpected = 150;

            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
            var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data));

            using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler))
            {
                server.Run();

                // Every client sends three messages, hence a third as many clients as events.
                Task[] clients = new Task[numEventsExpected / 3];
                for (int i = 0; i < clients.Length; i++)
                {
                    clients[i] = Task.Run(() =>
                    {
                        IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
                        using (var client = new WritableTransportClient<WritableString>(remoteEndpoint))
                        {
                            client.Send(new WritableString("Hello"));
                            client.Send(new WritableString(", "));
                            client.Send(new WritableString("World!"));
                        }
                    });
                }

                // Keep the tasks and wait on them: WaitAll rethrows anything a client
                // thread threw, which a discarded Task.Run result would have hidden.
                // The server handler only adds to an unbounded collection, so waiting
                // here before draining the queue cannot block the senders.
                bool allClientsFinished = Task.WaitAll(clients, WaitTimeout);
                Assert.IsTrue(allClientsFinished, "Timed out waiting for the client tasks to finish");

                for (int i = 0; i < numEventsExpected; i++)
                {
                    events.Add(TakeData(queue));
                }
            }

            Assert.AreEqual(numEventsExpected, events.Count);
        }

        /// <summary>
        /// Removes the next message from the queue and returns its string payload,
        /// failing the current test if nothing arrives within WaitTimeout.
        /// </summary>
        /// <param name="queue">Queue filled by the transport handler under test</param>
        /// <returns>The Data of the received message</returns>
        private static string TakeData(BlockingCollection<WritableString> queue)
        {
            WritableString message;
            bool received = queue.TryTake(out message, WaitTimeout);
            Assert.IsTrue(received, "Timed out waiting for a message from the transport");
            return message.Data;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj index f9d1617..cc1ec2e 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj +++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj @@ -60,6 +60,14 @@ under the License. <Compile Include="IObserverFactory.cs" /> <Compile Include="IStage.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="Remote\IDataReader.cs" /> + <Compile Include="Remote\IDataWriter.cs" /> + <Compile Include="Remote\Impl\StreamDataReader.cs" /> + <Compile Include="Remote\Impl\StreamDataWriter.cs" /> + <Compile Include="Remote\Impl\WritableLink.cs" /> + <Compile Include="Remote\Impl\WritableTransportClient.cs" /> + <Compile Include="Remote\Impl\WritableTransportServer.cs" /> + <Compile Include="Remote\IWritable.cs" /> <Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" /> <Compile Include="Remote\ICodec.cs" /> <Compile Include="Remote\ICodecFactory.cs" /> @@ -99,6 +107,7 @@ under the License. 
<Compile Include="Remote\Proto\WakeRemoteProtos.cs" /> <Compile Include="Remote\RemoteConfiguration.cs" /> <Compile Include="Remote\RemoteRuntimeException.cs" /> + <Compile Include="Remote\TypeCache.cs" /> <Compile Include="RX\AbstractObserver.cs" /> <Compile Include="RX\AbstractRxStage.cs" /> <Compile Include="RX\Impl\PubSubSubject.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/IDataReader.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IDataReader.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IDataReader.cs new file mode 100644 index 0000000..8c78a89 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IDataReader.cs @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Org.Apache.REEF.Wake.Remote +{ + /// <summary> + /// Interface for reading standard primitives + /// </summary> + public interface IDataReader + { + /// <summary> + /// Reads double + /// </summary> + /// <returns>read double</returns> + double ReadDouble(); + + /// <summary> + /// Reads float + /// </summary> + /// <returns>read float</returns> + float ReadFloat(); + + /// <summary> + /// Reads long + /// </summary> + /// <returns>read long</returns> + long ReadLong(); + + /// <summary> + /// Reads bool + /// </summary> + /// <returns>read bool</returns> + bool ReadBoolean(); + + /// <summary> + /// Reads integer + /// </summary> + /// <returns>read integer</returns> + int ReadInt32(); + + /// <summary> + /// Reads short + /// </summary> + /// <returns>read short</returns> + short ReadInt16(); + + /// <summary> + /// Reads string + /// </summary> + /// <returns>read string</returns> + string ReadString(); + + /// <summary> + /// Reads data in to the buffer + /// </summary> + /// <param name="buffer">byte array to which to write</param> + /// <param name="index">starting index in the array</param> + /// <param name="bytesToRead">number of bytes to write</param> + /// <returns>Task handler that reads bytes and returns that number of bytes read + /// if success, otherwise -1</returns> + int Read(ref byte[] buffer, int index, int bytesToRead); + + /// <summary> + /// Reads double asynchronously + /// </summary> + /// <param name="token">Cancellation token</param> + /// <returns>Task handler that reads double</returns> + Task<double> ReadDoubleAsync(CancellationToken token); + + /// <summary> + /// Reads float asynchronously + /// </summary> + /// <param name="token">Cancellation token</param> + /// <returns>Task handler that reads float</returns> + Task<float> ReadFloatAsync(CancellationToken token); + + 
/// <summary> + /// Reads long asynchronously + /// </summary> + /// <param name="token">Cancellation token</param> + /// <returns>Task handler that reads long</returns> + Task<long> ReadLongAsync(CancellationToken token); + + /// <summary> + /// Reads bool asynchronously + /// </summary> + /// <param name="token">Cancellation token</param> + /// <returns>Task handler that reads bool</returns> + Task<bool> ReadBooleanAsync(CancellationToken token); + + /// <summary> + /// Reads integer asynchronously + /// </summary> + /// <param name="token">Cancellation token</param> + /// <returns>Task handler that reads integer</returns> + Task<int> ReadInt32Async(CancellationToken token); + + /// <summary> + /// Reads short asynchronously + /// </summary> + /// <param name="token">Cancellation token</param> + /// <returns>Task handler that reads short</returns> + Task<short> ReadInt16Async(CancellationToken token); + + /// <summary> + /// Reads string asynchronously + /// </summary> + /// <param name="token">Cancellation token</param> + /// <returns>Task handler that reads string</returns> + Task<string> ReadStringAsync(CancellationToken token); + + + /// <summary> + /// Reads data in to the buffer asynchronously + /// </summary> + /// <param name="buffer">byte array to which to write</param> + /// <param name="index">starting index in the array</param> + /// <param name="bytesToRead">number of bytes to write</param> + /// <param name="token">Cancellation token</param> + /// <returns>Task handler that reads bytes and returns that number of bytes read + /// if success, otherwise -1</returns> + Task<int> ReadAsync(byte[] buffer, int index, int bytesToRead, CancellationToken token); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/IDataWriter.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IDataWriter.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/IDataWriter.cs new file mode 100644 index 0000000..13872ff --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IDataWriter.cs @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Org.Apache.REEF.Wake.Remote +{ + /// <summary> + /// Interface for writing standard primitives + /// </summary> + public interface IDataWriter + { + /// <summary> + /// write double + /// </summary> + /// <param name="obj">double to be written</param> + void WriteDouble(double obj); + + /// <summary> + /// write float + /// </summary> + /// <param name="obj">float to be written</param> + void WriteFloat(float obj); + + /// <summary> + /// write long + /// </summary> + /// <param name="obj">long to be written</param> + void WriteLong(long obj); + + /// <summary> + /// write boolean + /// </summary> + /// <param name="obj">bool to be written</param> + void WriteBoolean(bool obj); + + /// <summary> + /// write integer + /// </summary> + /// <param name="obj">int to be written</param> + void WriteInt32(int obj); + + /// <summary> + /// write short + /// </summary> + /// <param name="obj">short to be written</param> + void WriteInt16(short obj); + + /// <summary> + /// write string + /// </summary> + /// <param name="obj">string to be written</param> + void WriteString(string obj); + + /// <summary> + /// write bytes to the byte array + /// </summary> + /// <param name="buffer">byte array from which to read</param> + /// <param name="index">starting index in the array</param> + /// <param name="count">number of bytes to write</param> + void Write(byte[] buffer, int index, int count); + + /// <summary> + /// write double asynchronously + /// </summary> + /// <param name="obj">double to be written</param> + /// <param name="token">Cancellation token</param> + /// <returns>the handler to the task</returns> + Task WriteDoubleAsync(double obj, CancellationToken token); + + /// <summary> + /// write float asynchronously + /// </summary> + /// <param name="obj">float to be written</param> + /// <param 
name="token">Cancellation token</param> + /// <returns>the handler to the task</returns> + Task WriteFloatAsync(float obj, CancellationToken token); + + /// <summary> + /// write long asynchronously + /// </summary> + /// <param name="obj">long to be written</param> + /// <param name="token">Cancellation token</param> + /// <returns>the handler to the task</returns> + Task WriteLongAsync(long obj, CancellationToken token); + + /// <summary> + /// write bool asynchronously + /// </summary> + /// <param name="obj">bool to be written</param> + /// <param name="token">Cancellation token</param> + /// <returns>the handler to the task</returns> + Task WriteBooleanAsync(bool obj, CancellationToken token); + + /// <summary> + /// write integer asynchronously + /// </summary> + /// <param name="obj">integer to be written</param> + /// <param name="token">Cancellation token</param> + /// <returns>the handler to the task</returns> + Task WriteInt32Async(int obj, CancellationToken token); + + /// <summary> + /// write short asynchronously + /// </summary> + /// <param name="obj">short to be written</param> + /// <param name="token">Cancellation token</param> + /// <returns>the handler to the task</returns> + Task WriteInt16Async(short obj, CancellationToken token); + + /// <summary> + /// write string asynchronously + /// </summary> + /// <param name="obj">string to be written</param> + /// <param name="token">Cancellation token</param> + /// <returns>the handler to the task</returns> + Task WriteStringAsync(string obj, CancellationToken token); + + /// <summary> + /// write bytes to the byte array asynchronously + /// </summary> + /// <param name="buffer">byte array from which to read</param> + /// <param name="index">starting index in the array</param> + /// <param name="count">number of bytes to write</param> + /// <param name="token">Cancellation token</param> + /// <returns>the handler to the task</returns> + Task WriteAsync(byte[] buffer, int index, int count, 
CancellationToken token); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs new file mode 100644 index 0000000..644cf82 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Org.Apache.REEF.Wake.Remote +{ + /// <summary> + /// Interface that classes should implement if they need to be readable to and writable + /// from the stream. It is assumed that the classes inheriting this interface will have a + /// default empty constructor + /// </summary> + [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] + public interface IWritable + { + /// <summary> + /// Read the class fields. 
+ /// </summary> + /// <param name="reader">The reader from which to read </param> + void Read(IDataReader reader); + + /// <summary> + /// Writes the class fields. + /// </summary> + /// <param name="writer">The writer to which to write</param> + void Write(IDataWriter writer); + + /// <summary> + /// Read the class fields. + /// </summary> + /// <param name="reader">The reader from which to read </param> + /// <param name="token">The cancellation token</param> + Task ReadAsync(IDataReader reader, CancellationToken token); + + /// <summary> + /// Writes the class fields. + /// </summary> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">The cancellation token</param> + Task WriteAsync(IDataWriter writer, CancellationToken token); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs new file mode 100644 index 0000000..67098d0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Wake.Remote.Impl +{ + /// <summary> + /// Class with functions to Read from stream + /// </summary> + public class StreamDataReader : IDataReader + { + Logger Logger = Logger.GetLogger(typeof(StreamDataReader)); + + /// <summary> + /// Stream from which to read + /// </summary> + private readonly Stream _stream; + + /// <summary> + /// Constructs the StreamDataReader + /// </summary> + /// <param name="stream">Stream from which to read</param> + public StreamDataReader(Stream stream) + { + _stream = stream; + } + + /// <summary> + /// Reads double + /// </summary> + /// <returns>read double</returns> + public double ReadDouble() + { + byte[] doubleBytes = new byte[sizeof(double)]; + int readBytes = Read(ref doubleBytes, 0, sizeof(double)); + + if (readBytes == -1) + { + Exceptions.Throw(new Exception("No bytes read"), Logger); + } + return BitConverter.ToDouble(doubleBytes, 0); + } + + /// <summary> + /// Reads float + /// </summary> + /// <returns>read float</returns> + public float ReadFloat() + { + byte[] floatBytes = new byte[sizeof(float)]; + int readBytes = Read(ref floatBytes, 0, sizeof(float)); + + if (readBytes == -1) + { + Exceptions.Throw(new Exception("No bytes read"), Logger); + } + return BitConverter.ToSingle(floatBytes, 0); + } + + /// <summary> + /// Reads long + /// </summary> + /// <returns>read long</returns> + public long ReadLong() + { + byte[] longBytes = new byte[sizeof(long)]; + int readBytes = Read(ref longBytes, 0, sizeof (long)); + + if (readBytes == -1) + { + Exceptions.Throw(new Exception("No bytes read"), Logger); + } + return 
BitConverter.ToInt64(longBytes, 0);
        }

        /// <summary>
        /// Reads a bool from the stream.
        /// </summary>
        /// <returns>The bool that was read</returns>
        public bool ReadBoolean()
        {
            return BitConverter.ToBoolean(ReadExactBytes(sizeof(bool)), 0);
        }

        /// <summary>
        /// Reads a 32-bit integer from the stream.
        /// </summary>
        /// <returns>The integer that was read</returns>
        public int ReadInt32()
        {
            return BitConverter.ToInt32(ReadExactBytes(sizeof(int)), 0);
        }

        /// <summary>
        /// Reads a 16-bit integer from the stream.
        /// </summary>
        /// <returns>The short that was read</returns>
        public short ReadInt16()
        {
            return BitConverter.ToInt16(ReadExactBytes(sizeof(Int16)), 0);
        }

        /// <summary>
        /// Reads a string from the stream. The string is expected to be encoded as a
        /// 32-bit byte count followed by that many bytes of raw UTF-16 code units.
        /// </summary>
        /// <returns>The string that was read, or null if the stream ended before
        /// the whole payload arrived</returns>
        public string ReadString()
        {
            int length = ReadInt32();
            CheckStringLength(length);

            byte[] stringByte = new byte[length];
            if (Read(ref stringByte, 0, stringByte.Length) == -1)
            {
                return null;
            }

            return BytesToString(stringByte);
        }

        /// <summary>
        /// Reads exactly <paramref name="bytesToRead"/> bytes into the buffer, blocking
        /// until all of them have arrived.
        /// </summary>
        /// <param name="buffer">Byte array to fill. If it is null or too small to hold
        /// <paramref name="index"/> + <paramref name="bytesToRead"/> bytes it is replaced
        /// by a sufficiently large array (existing bytes before <paramref name="index"/>
        /// are preserved)</param>
        /// <param name="index">Starting index in the array</param>
        /// <param name="bytesToRead">Number of bytes to read</param>
        /// <returns>The number of bytes read on success, otherwise -1</returns>
        public int Read(ref byte[] buffer, int index, int bytesToRead)
        {
            // The capacity check must include the offset; checking bytesToRead alone
            // makes every call with index > 0 on a tight buffer fail inside Stream.Read.
            int required = index + bytesToRead;
            if (buffer == null)
            {
                buffer = new byte[required];
            }
            else if (buffer.Length < required)
            {
                byte[] grown = new byte[required];
                Buffer.BlockCopy(buffer, 0, grown, 0, Math.Min(index, buffer.Length));
                buffer = grown;
            }

            int totalBytesRead = 0;
            while (totalBytesRead < bytesToRead)
            {
                int bytesRead = _stream.Read(buffer, index + totalBytesRead, bytesToRead - totalBytesRead);
                if (bytesRead == 0)
                {
                    // Read timed out or connection was closed
                    return -1;
                }

                totalBytesRead += bytesRead;
            }

            return totalBytesRead;
        }

        /// <summary>
        /// Reads a double asynchronously
        /// </summary>
        /// <param name="token">Cancellation token</param>
        /// <returns>Task that yields the double that was read</returns>
        public async Task<double> ReadDoubleAsync(CancellationToken token)
        {
            return BitConverter.ToDouble(await ReadExactBytesAsync(sizeof(double), token), 0);
        }

        /// <summary>
        /// Reads a float asynchronously
        /// </summary>
        /// <param name="token">Cancellation token</param>
        /// <returns>Task that yields the float that was read</returns>
        public async Task<float> ReadFloatAsync(CancellationToken token)
        {
            return BitConverter.ToSingle(await ReadExactBytesAsync(sizeof(float), token), 0);
        }

        /// <summary>
        /// Reads a long asynchronously
        /// </summary>
        /// <param name="token">Cancellation token</param>
        /// <returns>Task that yields the long that was read</returns>
        public async Task<long> ReadLongAsync(CancellationToken token)
        {
            return BitConverter.ToInt64(await ReadExactBytesAsync(sizeof(long), token), 0);
        }

        /// <summary>
        /// Reads a bool asynchronously
        /// </summary>
        /// <param name="token">Cancellation token</param>
        /// <returns>Task that yields the bool that was read</returns>
        public async Task<bool> ReadBooleanAsync(CancellationToken token)
        {
            return BitConverter.ToBoolean(await ReadExactBytesAsync(sizeof(bool), token), 0);
        }

        /// <summary>
        /// Reads a 32-bit integer asynchronously
        /// </summary>
        /// <param name="token">Cancellation token</param>
        /// <returns>Task that yields the integer that was read</returns>
        public async Task<int> ReadInt32Async(CancellationToken token)
        {
            return BitConverter.ToInt32(await ReadExactBytesAsync(sizeof(int), token), 0);
        }

        /// <summary>
        /// Reads a 16-bit integer asynchronously
        /// </summary>
        /// <param name="token">Cancellation token</param>
        /// <returns>Task that yields the short that was read</returns>
        public async Task<short> ReadInt16Async(CancellationToken token)
        {
            return BitConverter.ToInt16(await ReadExactBytesAsync(sizeof(Int16), token), 0);
        }

        /// <summary>
        /// Reads a string asynchronously. The string is expected to be encoded as a
        /// 32-bit byte count followed by that many bytes of raw UTF-16 code units.
        /// </summary>
        /// <param name="token">Cancellation token</param>
        /// <returns>Task that yields the string that was read, or null if the stream
        /// ended before the whole payload arrived</returns>
        public async Task<string> ReadStringAsync(CancellationToken token)
        {
            // The length prefix must be read asynchronously as well; the blocking
            // ReadInt32() would stall the caller and ignore the cancellation token.
            int length = await ReadInt32Async(token);
            CheckStringLength(length);

            byte[] stringByte = new byte[length];
            if (await ReadAsync(stringByte, 0, stringByte.Length, token) == -1)
            {
                return null;
            }

            return BytesToString(stringByte);
        }

        /// <summary>
        /// Reads exactly <paramref name="bytesToRead"/> bytes into the buffer asynchronously
        /// </summary>
        /// <param name="buffer">Byte array to fill</param>
        /// <param name="index">Starting index in the array</param>
        /// <param name="bytesToRead">Number of bytes to read</param>
        /// <param name="token">Cancellation token</param>
        /// <returns>Task that yields the number of bytes read on success, otherwise -1</returns>
        public async Task<int> ReadAsync(byte[] buffer, int index, int bytesToRead, CancellationToken token)
        {
            int totalBytesRead = 0;
            while (totalBytesRead < bytesToRead)
            {
                int bytesRead = await _stream.ReadAsync(buffer, totalBytesRead + index, bytesToRead - totalBytesRead, token);
                if (bytesRead == 0)
                {
                    // Read timed out or connection was closed
                    return -1;
                }

                totalBytesRead += bytesRead;
            }

            return bytesToRead;
        }

        /// <summary>
        /// Reads exactly <paramref name="size"/> bytes and reports a failure through
        /// Exceptions.Throw when the stream ends early.
        /// </summary>
        private byte[] ReadExactBytes(int size)
        {
            byte[] bytes = new byte[size];
            if (Read(ref bytes, 0, size) == -1)
            {
                Exceptions.Throw(new Exception("No bytes read"), Logger);
            }

            return bytes;
        }

        /// <summary>
        /// Asynchronous counterpart of <see cref="ReadExactBytes"/>.
        /// </summary>
        private async Task<byte[]> ReadExactBytesAsync(int size, CancellationToken token)
        {
            byte[] bytes = new byte[size];
            if (await ReadAsync(bytes, 0, size, token) == -1)
            {
                Exceptions.Throw(new Exception("No bytes read"), Logger);
            }

            return bytes;
        }

        /// <summary>
        /// Rejects a negative length prefix, which can only come from a corrupt stream.
        /// </summary>
        private void CheckStringLength(int length)
        {
            if (length < 0)
            {
                Exceptions.Throw(new Exception("Invalid string length read: " + length), Logger);
            }
        }

        /// <summary>
        /// Reinterprets raw UTF-16 code unit bytes as a string.
        /// </summary>
        private static string BytesToString(byte[] stringByte)
        {
            char[] stringChar = new char[stringByte.Length / sizeof(char)];
            Buffer.BlockCopy(stringByte, 0, stringChar, 0, stringByte.Length);
            return new string(stringChar);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs
new file mode 100644
index 0000000..b702e1c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs
@@ -0,0 +1,218 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// IDataWriter implementation that serializes primitive values directly onto a Stream.
    /// Primitives are written with BitConverter; strings are written as a 32-bit byte
    /// count followed by the raw UTF-16 code units.
    /// </summary>
    public class StreamDataWriter : IDataWriter
    {
        /// <summary>
        /// Stream to which to write
        /// </summary>
        private readonly Stream _stream;

        /// <summary>
        /// Constructs the StreamDataWriter
        /// </summary>
        /// <param name="stream">Stream to which to write</param>
        /// <exception cref="ArgumentNullException">If stream is null</exception>
        public StreamDataWriter(Stream stream)
        {
            if (stream == null)
            {
                throw new ArgumentNullException("stream");
            }

            _stream = stream;
        }

        /// <summary>
        /// write double
        /// </summary>
        /// <param name="obj">double to be written</param>
        public void WriteDouble(double obj)
        {
            _stream.Write(BitConverter.GetBytes(obj), 0, sizeof(double));
        }

        /// <summary>
        /// write float
        /// </summary>
        /// <param name="obj">float to be written</param>
        public void WriteFloat(float obj)
        {
            _stream.Write(BitConverter.GetBytes(obj), 0, sizeof(float));
        }

        /// <summary>
        /// write long
        /// </summary>
        /// <param name="obj">long to be written</param>
        public void WriteLong(long obj)
        {
            _stream.Write(BitConverter.GetBytes(obj), 0, sizeof(long));
        }

        /// <summary>
        /// write boolean
        /// </summary>
        /// <param name="obj">bool to be written</param>
        public void WriteBoolean(bool obj)
        {
            _stream.Write(BitConverter.GetBytes(obj), 0, sizeof(bool));
        }

        /// <summary>
        /// write integer
        /// </summary>
        /// <param name="obj">int to be written</param>
        public void WriteInt32(int obj)
        {
            _stream.Write(BitConverter.GetBytes(obj), 0, sizeof(int));
        }

        /// <summary>
        /// write short
        /// </summary>
        /// <param name="obj">short to be written</param>
        public void WriteInt16(short obj)
        {
            _stream.Write(BitConverter.GetBytes(obj), 0, sizeof(short));
        }

        /// <summary>
        /// write string as a 32-bit byte count followed by its UTF-16 code units
        /// </summary>
        /// <param name="obj">string to be written</param>
        /// <exception cref="ArgumentNullException">If obj is null</exception>
        public void WriteString(string obj)
        {
            byte[] byteString = GetStringBytes(obj);
            WriteInt32(byteString.Length);
            _stream.Write(byteString, 0, byteString.Length);
        }

        /// <summary>
        /// write bytes from the byte array to the stream
        /// </summary>
        /// <param name="buffer">byte array from which to read</param>
        /// <param name="index">starting index in the array</param>
        /// <param name="count">number of bytes to write</param>
        public void Write(byte[] buffer, int index, int count)
        {
            _stream.Write(buffer, index, count);
        }

        /// <summary>
        /// write double asynchronously
        /// </summary>
        /// <param name="obj">double to be written</param>
        /// <param name="token">Cancellation token</param>
        /// <returns>the handler to the task</returns>
        public async Task WriteDoubleAsync(double obj, CancellationToken token)
        {
            await _stream.WriteAsync(BitConverter.GetBytes(obj), 0, sizeof(double), token);
        }

        /// <summary>
        /// write float asynchronously
        /// </summary>
        /// <param name="obj">float to be written</param>
        /// <param name="token">Cancellation token</param>
        /// <returns>the handler to the task</returns>
        public async Task WriteFloatAsync(float obj, CancellationToken token)
        {
            await _stream.WriteAsync(BitConverter.GetBytes(obj), 0, sizeof(float), token);
        }

        /// <summary>
        /// write long asynchronously
        /// </summary>
        /// <param name="obj">long to be written</param>
        /// <param name="token">Cancellation token</param>
        /// <returns>the handler to the task</returns>
        public async Task WriteLongAsync(long obj, CancellationToken token)
        {
            await _stream.WriteAsync(BitConverter.GetBytes(obj), 0, sizeof(long), token);
        }

        /// <summary>
        /// write bool asynchronously
        /// </summary>
        /// <param name="obj">bool to be written</param>
        /// <param name="token">Cancellation token</param>
        /// <returns>the handler to the task</returns>
        public async Task WriteBooleanAsync(bool obj, CancellationToken token)
        {
            await _stream.WriteAsync(BitConverter.GetBytes(obj), 0, sizeof(bool), token);
        }

        /// <summary>
        /// write integer asynchronously
        /// </summary>
        /// <param name="obj">integer to be written</param>
        /// <param name="token">Cancellation token</param>
        /// <returns>the handler to the task</returns>
        public async Task WriteInt32Async(int obj, CancellationToken token)
        {
            await _stream.WriteAsync(BitConverter.GetBytes(obj), 0, sizeof(int), token);
        }

        /// <summary>
        /// write short asynchronously
        /// </summary>
        /// <param name="obj">short to be written</param>
        /// <param name="token">Cancellation token</param>
        /// <returns>the handler to the task</returns>
        public async Task WriteInt16Async(short obj, CancellationToken token)
        {
            await _stream.WriteAsync(BitConverter.GetBytes(obj), 0, sizeof(short), token);
        }

        /// <summary>
        /// write string asynchronously as a 32-bit byte count followed by its UTF-16 code units
        /// </summary>
        /// <param name="obj">string to be written</param>
        /// <param name="token">Cancellation token</param>
        /// <returns>the handler to the task</returns>
        /// <exception cref="ArgumentNullException">If obj is null</exception>
        public async Task WriteStringAsync(string obj, CancellationToken token)
        {
            byte[] byteString = GetStringBytes(obj);
            await WriteInt32Async(byteString.Length, token);
            await _stream.WriteAsync(byteString, 0, byteString.Length, token);
        }

        /// <summary>
        /// write bytes from the byte array to the stream asynchronously
        /// </summary>
        /// <param name="buffer">byte array from which to read</param>
        /// <param name="index">starting index in the array</param>
        /// <param name="count">number of bytes to write</param>
        /// <param name="token">Cancellation token</param>
        /// <returns>the handler to the task</returns>
        public async Task WriteAsync(byte[] buffer, int index, int count, CancellationToken token)
        {
            await _stream.WriteAsync(buffer, index, count, token);
        }

        /// <summary>
        /// Copies the UTF-16 code units of the string into a new byte array.
        /// </summary>
        private static byte[] GetStringBytes(string obj)
        {
            if (obj == null)
            {
                // Fail with a descriptive exception before anything is written to the stream.
                throw new ArgumentNullException("obj");
            }

            char[] charString = obj.ToCharArray();
            byte[] byteString = new byte[charString.Length * sizeof(char)];
            Buffer.BlockCopy(charString, 0, byteString, 0, byteString.Length);
            return byteString;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
new file mode 100644
index 0000000..f867240
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
@@ -0,0 +1,293 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
*/

using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Util;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Represents an open connection between remote hosts. Every message is sent as the
    /// assembly-qualified name of its type followed by whatever the message writes itself.
    /// This class is not thread safe.
    /// </summary>
    /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam>
    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
    public class WritableLink<T> : ILink<T> where T : IWritable
    {
        private static readonly Logger Logger = Logger.GetLogger(typeof (WritableLink<T>));

        private readonly IPEndPoint _localEndpoint;
        private bool _disposed;
        private readonly NetworkStream _stream;

        /// <summary>
        /// Cache structure to store the constructor functions for various types.
        /// </summary>
        private readonly TypeCache<T> _cache;

        /// <summary>
        /// Stream reader to be passed to IWritable
        /// </summary>
        private readonly StreamDataReader _reader;

        /// <summary>
        /// Stream writer to be passed to IWritable
        /// </summary>
        private readonly StreamDataWriter _writer;

        /// <summary>
        /// Constructs a Link object.
        /// Connects to the specified remote endpoint.
        /// </summary>
        /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
        /// <exception cref="ArgumentNullException">If remoteEndpoint is null</exception>
        public WritableLink(IPEndPoint remoteEndpoint)
            : this(Connect(remoteEndpoint))
        {
        }

        /// <summary>
        /// Constructs a Link object.
        /// Uses the already connected TcpClient.
        /// </summary>
        /// <param name="client">The already connected client</param>
        /// <exception cref="ArgumentNullException">If client is null</exception>
        public WritableLink(TcpClient client)
        {
            if (client == null)
            {
                throw new ArgumentNullException("client");
            }

            Client = client;
            _stream = Client.GetStream();
            _localEndpoint = GetLocalEndpoint();
            _disposed = false;
            _cache = new TypeCache<T>();
            _reader = new StreamDataReader(_stream);
            _writer = new StreamDataWriter(_stream);
        }

        /// <summary>
        /// Returns the local socket address
        /// </summary>
        public IPEndPoint LocalEndpoint
        {
            get { return _localEndpoint; }
        }

        /// <summary>
        /// Returns the remote socket address
        /// </summary>
        public IPEndPoint RemoteEndpoint
        {
            get { return (IPEndPoint) Client.Client.RemoteEndPoint; }
        }

        /// <summary>
        /// Gets the underlying TcpClient
        /// </summary>
        public TcpClient Client { get; private set; }

        /// <summary>
        /// Writes the message to the remote host
        /// </summary>
        /// <param name="value">The data to write</param>
        /// <exception cref="ArgumentNullException">If value is null</exception>
        public void Write(T value)
        {
            if (value == null)
            {
                throw new ArgumentNullException("value");
            }
            if (_disposed)
            {
                Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger);
            }

            _writer.WriteString(value.GetType().AssemblyQualifiedName);
            value.Write(_writer);
        }

        /// <summary>
        /// Writes the value to this link asynchronously
        /// </summary>
        /// <param name="value">The data to write</param>
        /// <param name="token">The cancellation token</param>
        /// <exception cref="ArgumentNullException">If value is null</exception>
        public async Task WriteAsync(T value, CancellationToken token)
        {
            // Same argument contract as the synchronous Write.
            if (value == null)
            {
                throw new ArgumentNullException("value");
            }
            if (_disposed)
            {
                Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger);
            }

            await _writer.WriteStringAsync(value.GetType().AssemblyQualifiedName, token);
            await value.WriteAsync(_writer, token);
        }

        /// <summary>
        /// Reads the value from the link synchronously
        /// </summary>
        /// <returns>The message that was read, or default(T) if no type name could be
        /// read or no instance could be created for it</returns>
        public T Read()
        {
            if (_disposed)
            {
                Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger);
            }

            string dataType = _reader.ReadString();
            if (dataType == null)
            {
                return default(T);
            }

            T value = _cache.GetInstance(dataType);
            if (value == null)
            {
                return default(T);
            }

            value.Read(_reader);
            return value;
        }

        /// <summary>
        /// Reads the value from the link asynchronously
        /// </summary>
        /// <param name="token">The cancellation token</param>
        /// <returns>The message that was read, or default(T) if no type name could be
        /// read or no instance could be created for it</returns>
        public async Task<T> ReadAsync(CancellationToken token)
        {
            if (_disposed)
            {
                Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger);
            }

            string dataType = await _reader.ReadStringAsync(token);
            if (dataType == null)
            {
                return default(T);
            }

            T value = _cache.GetInstance(dataType);
            if (value == null)
            {
                return default(T);
            }

            await value.ReadAsync(_reader, token);
            return value;
        }

        /// <summary>
        /// Close the client connection
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Subclasses of Links should overwrite this to handle disposing
        /// of the link
        /// </summary>
        /// <param name="disposing">To dispose or not</param>
        public virtual void Dispose(bool disposing)
        {
            if (_disposed)
            {
                return;
            }

            if (disposing)
            {
                try
                {
                    Client.GetStream().Close();
                }
                catch (InvalidOperationException)
                {
                    Logger.Log(Level.Warning, "failed to close stream on a non-connected socket.");
                }

                Client.Close();
            }
            _disposed = true;
        }

        /// <summary>
        /// Overrides Equals. Two Link objects are equal if they are connected
        /// to the same remote endpoint.
        /// </summary>
        /// <param name="obj">The object to compare</param>
        /// <returns>True if the object is equal to this Link, otherwise false</returns>
        public override bool Equals(object obj)
        {
            // Compare against this class; casting to Link<T> meant that two
            // WritableLink instances could never be equal.
            WritableLink<T> other = obj as WritableLink<T>;
            if (other == null)
            {
                return false;
            }

            return other.RemoteEndpoint.Equals(RemoteEndpoint);
        }

        /// <summary>
        /// Gets the hash code for the Link object.
        /// </summary>
        /// <returns>The object's hash code</returns>
        public override int GetHashCode()
        {
            return RemoteEndpoint.GetHashCode();
        }

        /// <summary>
        /// Creates a TcpClient connected to the given endpoint, releasing the
        /// client again if the connection attempt fails.
        /// </summary>
        /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
        /// <returns>The connected client</returns>
        private static TcpClient Connect(IPEndPoint remoteEndpoint)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }

            TcpClient client = new TcpClient();
            try
            {
                client.Connect(remoteEndpoint);
            }
            catch
            {
                client.Close();
                throw;
            }

            return client;
        }

        /// <summary>
        /// Discovers the IPEndpoint for the current machine.
        /// </summary>
        /// <returns>The local IPEndpoint</returns>
        private IPEndPoint GetLocalEndpoint()
        {
            IPAddress address = NetworkUtils.LocalIPAddress;
            int port = ((IPEndPoint) Client.Client.LocalEndPoint).Port;
            return new IPEndPoint(address, port);
        }
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
new file mode 100644
index 0000000..5e42fb7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
@@ -0,0 +1,128 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Establish connections to TransportServer for remote message passing
    /// </summary>
    /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam>
    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
    public class WritableTransportClient<T> : IDisposable where T : IWritable
    {
        private readonly ILink<T> _link;
        private readonly IObserver<TransportEvent<T>> _observer;
        private readonly CancellationTokenSource _cancellationSource;
        private bool _disposed;
        private static readonly Logger Logger = Logger.GetLogger(typeof(WritableTransportClient<T>));

        /// <summary>
        /// Construct a TransportClient.
        /// Used to send messages to the specified remote endpoint.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
        public WritableTransportClient(IPEndPoint remoteEndpoint)
        {
            Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", Logger);

            _link = new WritableLink<T>(remoteEndpoint);
            _cancellationSource = new CancellationTokenSource();
            _disposed = false;
        }

        /// <summary>
        /// Construct a TransportClient.
        /// Used to send messages to the specified remote endpoint. Responses from the
        /// remote host are read on a background task and handed to the observer.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
        /// <param name="observer">Callback used when receiving responses from remote host</param>
        public WritableTransportClient(IPEndPoint remoteEndpoint,
                                       IObserver<TransportEvent<T>> observer)
            : this(remoteEndpoint)
        {
            _observer = observer;
            Task.Run(() => ResponseLoop());
        }

        /// <summary>
        /// Gets the underlying transport link.
        /// </summary>
        public ILink<T> Link
        {
            get { return _link; }
        }

        /// <summary>
        /// Send the remote message.
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <exception cref="ArgumentNullException">If message is null</exception>
        public void Send(T message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            _link.Write(message);
        }

        /// <summary>
        /// Close all opened connections
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Stops the response loop and closes the link.
        /// </summary>
        /// <param name="disposing">True when called from Dispose()</param>
        protected void Dispose(bool disposing)
        {
            if (!_disposed && disposing)
            {
                // Signal the response loop first so that it stops instead of
                // treating the closed link as an unexpected failure.
                _cancellationSource.Cancel();
                _link.Dispose();
                _disposed = true;
            }
        }

        /// <summary>
        /// Continually read responses from remote host
        /// </summary>
        private async Task ResponseLoop()
        {
            try
            {
                while (!_cancellationSource.IsCancellationRequested)
                {
                    T message = await _link.ReadAsync(_cancellationSource.Token);
                    if (message == null)
                    {
                        break;
                    }

                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
                    _observer.OnNext(transportEvent);
                }
            }
            catch (Exception)
            {
                // A read that fails because the client is being disposed is expected;
                // anything else is a genuine error and must stay visible.
                if (!_cancellationSource.IsCancellationRequested)
                {
                    throw;
                }
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
new file mode 100644
index 0000000..05a520d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
@@ -0,0 +1,198 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Util;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Server to handle incoming remote messages.
    /// </summary>
    /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam>
    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
    public class WritableTransportServer<T> : IDisposable where T : IWritable
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof (WritableTransportServer<T>));

        /// <summary>
        /// How long Dispose waits for the accept loop to finish, in milliseconds.
        /// </summary>
        private const int ShutdownTimeoutMs = 500;

        private readonly TcpListener _listener;
        private readonly CancellationTokenSource _cancellationSource;
        private readonly IObserver<TransportEvent<T>> _remoteObserver;
        private bool _disposed;
        private Task _serverTask;

        /// <summary>
        /// Constructs a TransportServer to listen for remote events.
        /// Listens on the specified port of the local IP address. When it receives a
        /// remote event, it will invoke the specified remote handler.
        /// </summary>
        /// <param name="port">Port to listen on</param>
        /// <param name="remoteHandler">The handler to invoke when receiving incoming
        /// remote messages</param>
        public WritableTransportServer(int port, IObserver<TransportEvent<T>> remoteHandler)
            : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler)
        {
        }

        /// <summary>
        /// Constructs a TransportServer to listen for remote events.
        /// Listens on the specified local endpoint. When it receives a remote
        /// event, it will invoke the specified remote handler.
        /// </summary>
        /// <param name="localEndpoint">Endpoint to listen on</param>
        /// <param name="remoteHandler">The handler to invoke when receiving incoming
        /// remote messages</param>
        /// <exception cref="ArgumentNullException">If localEndpoint is null</exception>
        public WritableTransportServer(IPEndPoint localEndpoint,
                                       IObserver<TransportEvent<T>> remoteHandler)
        {
            if (localEndpoint == null)
            {
                throw new ArgumentNullException("localEndpoint");
            }

            _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port);
            _remoteObserver = remoteHandler;
            _cancellationSource = new CancellationTokenSource();
            _disposed = false;
        }

        /// <summary>
        /// Returns the listening endpoint for the TransportServer
        /// </summary>
        public IPEndPoint LocalEndpoint
        {
            get { return _listener.LocalEndpoint as IPEndPoint; }
        }

        /// <summary>
        /// Starts listening for incoming remote messages.
        /// </summary>
        public void Run()
        {
            _listener.Start();
            _serverTask = Task.Run(() => StartServer());
        }

        /// <summary>
        /// Close the TransportServer and all open connections
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Cancels the accept loop, stops the listener and waits a bounded time
        /// for the server task to finish.
        /// </summary>
        /// <param name="disposing">True when called from Dispose()</param>
        public void Dispose(bool disposing)
        {
            if (!_disposed && disposing)
            {
                _cancellationSource.Cancel();

                try
                {
                    _listener.Stop();
                }
                catch (SocketException)
                {
                    LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
                }

                if (_serverTask != null)
                {
                    // Give the TransportServer Task 500ms to shut down, ignore any timeout errors.
                    // There is deliberately no unbounded Wait() before this: it would defeat the
                    // timeout and let a faulted server task throw out of Dispose.
                    try
                    {
                        using (CancellationTokenSource serverDisposeTimeout = new CancellationTokenSource(ShutdownTimeoutMs))
                        {
                            _serverTask.Wait(serverDisposeTimeout.Token);
                        }
                    }
                    catch (Exception e)
                    {
                        LOGGER.Log(Level.Warning, "TransportServer task did not shut down cleanly: " + e);
                    }
                    finally
                    {
                        // Task.Dispose throws if the task has not reached a final state.
                        if (_serverTask.IsCompleted)
                        {
                            _serverTask.Dispose();
                        }
                    }
                }
            }

            _disposed = true;
        }

        /// <summary>
        /// Helper method to start TransportServer. This will
        /// be run in an asynchronous Task.
        /// </summary>
        /// <returns>An asynchronous Task for the running server.</returns>
        private async Task StartServer()
        {
            try
            {
                while (!_cancellationSource.Token.IsCancellationRequested)
                {
                    TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
                    ProcessClient(client).Forget();
                }
            }
            catch (InvalidOperationException)
            {
                LOGGER.Log(Level.Info, "TransportServer has been closed.");
            }
            catch (OperationCanceledException)
            {
                LOGGER.Log(Level.Info, "TransportServer has been closed.");
            }
        }

        /// <summary>
        /// Receives events from the connected TcpClient and invokes the handler on each event.
        /// </summary>
        /// <param name="client">The connected client</param>
        private async Task ProcessClient(TcpClient client)
        {
            // Keep reading messages from client until they disconnect or timeout
            CancellationToken token = _cancellationSource.Token;
            using (ILink<T> link = new WritableLink<T>(client))
            {
                while (!token.IsCancellationRequested)
                {
                    T message = await link.ReadAsync(token);

                    if (message == null)
                    {
                        // No message received: the remote side closed the connection.
                        break;
                    }

                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);

                    _remoteObserver.OnNext(transportEvent);
                }

                // Closing a link is part of normal operation, so it is not logged as an error.
                LOGGER.Log(Level.Info,
                    "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested);
            }
        }
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33b6edf1/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs
new file mode 100644
index 0000000..9bbe549
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs
@@ -0,0 +1,104 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Wake.Remote
{
    /// <summary>
    /// Cache used to store the constructor functions to instantiate various Types.
    /// It is assumed that all types are inherited from the base type T
    /// </summary>
    /// <typeparam name="T">Base type that every created instance is converted to</typeparam>
    public class TypeCache<T>
    {
        private const BindingFlags ConstructorFlags =
            BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance;

        /// <summary>
        /// Cache that stores the constructors for already used types, keyed by the
        /// assembly-qualified type name
        /// </summary>
        private readonly Dictionary<string, Func<T>> _typeConstructorMapping = new Dictionary<string, Func<T>>();

        /// <summary>
        /// Creates a new instance of the type with the given name using its
        /// parameterless constructor. The compiled constructor is cached per name.
        /// </summary>
        /// <param name="typeString">Type name as understood by Type.GetType, normally
        /// the assembly-qualified name</param>
        /// <returns>A new instance converted to T, or default(T) if the name cannot be
        /// resolved to a type</returns>
        public T GetInstance(string typeString)
        {
            Func<T> constructor;
            if (!_typeConstructorMapping.TryGetValue(typeString, out constructor))
            {
                Type type = Type.GetType(typeString);
                if (type == null)
                {
                    // Unknown type: report it through the return value (callers test for
                    // null) instead of failing with a KeyNotFoundException on the lookup.
                    return default(T);
                }

                constructor = GetActivator(type);
                _typeConstructorMapping[typeString] = constructor;
            }

            return constructor();
        }

        /// <summary>
        /// Returns the constructor for type T given actual type. Type can be
        /// that of inherited class.
        /// </summary>
        /// <param name="actualType">The actual type for which we want to create the constructor.</param>
        /// <returns>The constructor function</returns>
        /// <exception cref="MissingMethodException">If a reference type has no
        /// parameterless constructor, or is abstract or an interface</exception>
        private Func<T> GetActivator(Type actualType)
        {
            ConstructorInfo constructor;
            if (actualType.IsValueType)
            {
                // For struct types, there is an implicit default constructor.
                constructor = null;
            }
            else if (!TryGetDefaultConstructor(actualType, out constructor))
            {
                throw new MissingMethodException(
                    "could not get default constructor for type " + actualType.FullName);
            }

            NewExpression nex = constructor == null ? Expression.New(actualType) : Expression.New(constructor);
            var body = Expression.Convert(nex, typeof (T));
            Expression<Func<T>> lambda = Expression.Lambda<Func<T>>(body);

            return lambda.Compile();
        }

        /// <summary>
        /// Looks up the parameterless instance constructor (public or non-public) of a type
        /// </summary>
        /// <param name="type">The type for which constructor needs to be created</param>
        /// <param name="constructor">Receives the constructor, or null if there is none</param>
        /// <returns>True if a parameterless constructor was found, otherwise false</returns>
        private bool TryGetDefaultConstructor(Type type, out ConstructorInfo constructor)
        {
            // first, determine if there is a suitable constructor
            if (type.IsAbstract || type.IsInterface)
            {
                constructor = null;
                return false;
            }

            constructor = type.GetConstructor(ConstructorFlags, null, Type.EmptyTypes, null);
            return null != constructor;
        }
    }
}
