Repository: incubator-reef Updated Branches: refs/heads/master 35b099caf -> b65ff47fa
[REEF-261]: Introduce StreamingCodec This change adds `IStreamingCodec` as well as implementations for (arrays of) base types. Also, a wrapper is provided that turns any `ICodec` into an (inefficient) `IStreamingCodec`. JIRA: [REEF-261](https://issues.apache.org/jira/browse/REEF-261) Pull Request: This closes #236 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/b65ff47f Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/b65ff47f Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/b65ff47f Branch: refs/heads/master Commit: b65ff47fa82f8d7ab9c5a37563f841f1d1699e40 Parents: 35b099c Author: Dhruv <[email protected]> Authored: Sat Jun 20 11:55:56 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Tue Jun 23 10:47:53 2015 -0700 ---------------------------------------------------------------------- .../GroupCommunication/StreamingCodecTests.cs | 185 +++++++++++++++++++ .../Org.Apache.REEF.Network.Tests.csproj | 1 + .../Group/Config/StreamingCodecConfiguration.cs | 47 +++++ .../Pipelining/StreamingPipelineMessageCodec.cs | 97 ++++++++++ .../Org.Apache.REEF.Network.csproj | 10 + .../StreamingCodec/CodecToStreamingCodec.cs | 71 +++++++ .../DoubleArrayStreamingCodec.cs | 109 +++++++++++ .../DoubleStreamingCodec.cs | 82 ++++++++ .../FloatArrayStreamingCodec.cs | 109 +++++++++++ .../FloatStreamingCodec.cs | 82 ++++++++ .../IntArrayStreamingCodec.cs | 109 +++++++++++ .../CommonStreamingCodecs/IntStreamingCodec.cs | 82 ++++++++ .../StreamingCodec/IStreamingCodec.cs | 61 ++++++ .../Remote/Impl/StreamDataReader.cs | 8 +- .../Remote/Impl/StreamDataWriter.cs | 8 +- 15 files changed, 1055 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs new file mode 100644 index 0000000..6dea9f1 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Network.StreamingCodec; +using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Impl; + +namespace Org.Apache.REEF.Network.Tests.GroupCommunication +{ + /// <summary> + /// Defines streaming codec tests + /// </summary> + [TestClass] + public class StreamingCodecTests + { + [TestMethod] + public async Task TestCommonStreamingCodecs() + { + IInjector injector = TangFactory.GetTang().NewInjector(); + IStreamingCodec<int> intCodec = injector.GetInstance<IntStreamingCodec>(); + IStreamingCodec<double> doubleCodec = injector.GetInstance<DoubleStreamingCodec>(); + IStreamingCodec<float> floatCodec = injector.GetInstance<FloatStreamingCodec>(); + + IStreamingCodec<int[]> intArrCodec = injector.GetInstance<IntArrayStreamingCodec>(); + IStreamingCodec<double[]> doubleArrCodec = injector.GetInstance<DoubleArrayStreamingCodec>(); + IStreamingCodec<float[]> floatArrCodec = injector.GetInstance<FloatArrayStreamingCodec>(); + + CancellationToken token = new CancellationToken(); + + int obj = 5; + int[] intArr = {1, 2}; + double[] doubleArr = { 1, 2 }; + float[] floatArr = { 1, 2 }; + + var stream = new MemoryStream(); + IDataWriter writer = new StreamDataWriter(stream); + intCodec.Write(obj, writer); + await intCodec.WriteAsync(obj + 1, writer, token); + doubleCodec.Write(obj + 2, writer); + await doubleCodec.WriteAsync(obj + 3, writer, token); + floatCodec.Write(obj + 4, writer); + await floatCodec.WriteAsync(obj + 5, writer, token); + intArrCodec.Write(intArr, writer); + await intArrCodec.WriteAsync(intArr, writer, token); + doubleArrCodec.Write(doubleArr, writer); + await 
doubleArrCodec.WriteAsync(doubleArr, writer, token); + floatArrCodec.Write(floatArr, writer); + await floatArrCodec.WriteAsync(floatArr, writer, token); + + stream.Position = 0; + IDataReader reader = new StreamDataReader(stream); + int res1 = intCodec.Read(reader); + int res2 = await intCodec.ReadAsync(reader, token); + double res3 = doubleCodec.Read(reader); + double res4 = await doubleCodec.ReadAsync(reader, token); + float res5 = floatCodec.Read(reader); + float res6 = await floatCodec.ReadAsync(reader, token); + int[] resArr1 = intArrCodec.Read(reader); + int[] resArr2 = await intArrCodec.ReadAsync(reader, token); + double[] resArr3 = doubleArrCodec.Read(reader); + double[] resArr4 = await doubleArrCodec.ReadAsync(reader, token); + float[] resArr5 = floatArrCodec.Read(reader); + float[] resArr6 = await floatArrCodec.ReadAsync(reader, token); + + Assert.AreEqual(obj, res1); + Assert.AreEqual(obj + 1, res2); + Assert.AreEqual(obj + 2, res3); + Assert.AreEqual(obj + 3, res4); + Assert.AreEqual(obj + 4, res5); + Assert.AreEqual(obj + 5, res6); + + for (int i = 0; i < intArr.Length; i++) + { + Assert.AreEqual(intArr[i], resArr1[i]); + Assert.AreEqual(intArr[i], resArr2[i]); + } + + for (int i = 0; i < doubleArr.Length; i++) + { + Assert.AreEqual(doubleArr[i], resArr3[i]); + Assert.AreEqual(doubleArr[i], resArr4[i]); + } + + for (int i = 0; i < floatArr.Length; i++) + { + Assert.AreEqual(floatArr[i], resArr5[i]); + Assert.AreEqual(floatArr[i], resArr6[i]); + } + } + + [TestMethod] + [ExpectedException(typeof(ArgumentNullException))] + public void TestIntArrayStreamingCodecsNullException() + { + IInjector injector = TangFactory.GetTang().NewInjector(); + IStreamingCodec<int[]> intArrCodec = injector.GetInstance<IntArrayStreamingCodec>(); + var stream = new MemoryStream(); + IDataWriter writer = new StreamDataWriter(stream); + intArrCodec.Write(null, writer); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentNullException))] + public void 
TestFloatArrayStreamingCodecsNullException() + { + IInjector injector = TangFactory.GetTang().NewInjector(); + IStreamingCodec<float[]> floatArrCodec = injector.GetInstance<FloatArrayStreamingCodec>(); + var stream = new MemoryStream(); + IDataWriter writer = new StreamDataWriter(stream); + floatArrCodec.Write(null, writer); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentNullException))] + public void TestDoubleArrayStreamingCodecsNullException() + { + IInjector injector = TangFactory.GetTang().NewInjector(); + IStreamingCodec<double[]> doubleArrCodec = injector.GetInstance<DoubleArrayStreamingCodec>(); + var stream = new MemoryStream(); + IDataWriter writer = new StreamDataWriter(stream); + doubleArrCodec.Write(null, writer); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentNullException))] + public void TestNullStreamException() + { + IDataWriter writer = new StreamDataWriter(null); + writer.WriteFloat(2.0f); + } + + [TestMethod] + public async Task TestCodecToStreamingCodec() + { + var config = TangFactory.GetTang().NewConfigurationBuilder() + .BindImplementation(GenericType<ICodec<int>>.Class, GenericType<IntCodec>.Class) + .BindImplementation(GenericType<IStreamingCodec<int>>.Class, + GenericType<CodecToStreamingCodec<int>>.Class) + .Build(); + + IStreamingCodec<int> streamingCodec = + TangFactory.GetTang().NewInjector(config).GetInstance<IStreamingCodec<int>>(); + + CancellationToken token = new CancellationToken(); + + int obj = 5; + var stream = new MemoryStream(); + IDataWriter writer = new StreamDataWriter(stream); + streamingCodec.Write(obj, writer); + await streamingCodec.WriteAsync(obj + 1, writer, token); + + stream.Position = 0; + IDataReader reader = new StreamDataReader(stream); + int res1 = streamingCodec.Read(reader); + int res2 = await streamingCodec.ReadAsync(reader, token); + Assert.AreEqual(obj, res1); + Assert.AreEqual(obj + 1, res2); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj index 6cd5c05..e0568aa 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj @@ -53,6 +53,7 @@ under the License. <Compile Include="GroupCommunication\GroupCommuDriverTests.cs" /> <Compile Include="GroupCommunication\GroupCommunicationTests.cs" /> <Compile Include="GroupCommunication\GroupCommunicationTreeTopologyTests.cs" /> + <Compile Include="GroupCommunication\StreamingCodecTests.cs" /> <Compile Include="NamingService\NameServerTests.cs" /> <Compile Include="NetworkService\WritableNetworkServiceTests.cs" /> <Compile Include="NetworkService\NetworkServiceTests.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs new file mode 100644 index 0000000..f655b91 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Network.Group.Pipelining; +using Org.Apache.REEF.Network.StreamingCodec; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; + +namespace Org.Apache.REEF.Network.Group.Config +{ + /// <summary> + /// Defines configuration for streaming codecs and pipeline message + /// streaming codecs by taking streaming codec as input. + /// </summary> + /// <typeparam name="T">Generic type of message</typeparam> + public sealed class StreamingCodecConfiguration<T> : ConfigurationModuleBuilder + { + /// <summary> + /// RequiredImpl for Codec. 
Client needs to set implementation for this paramter + /// </summary> + public static readonly RequiredImpl<IStreamingCodec<T>> Codec = new RequiredImpl<IStreamingCodec<T>>(); + + /// <summary> + /// Configuration Module for Codec + /// </summary> + public static ConfigurationModule Conf = new StreamingCodecConfiguration<T>() + .BindImplementation(GenericType<IStreamingCodec<T>>.Class, Codec) + .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, GenericType<StreamingPipelineMessageCodec<T>>.Class) + .Build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs new file mode 100644 index 0000000..aff8558 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Network.StreamingCodec; + +namespace Org.Apache.REEF.Network.Group.Pipelining +{ + /// <summary> + /// The streaming codec for PipelineMessage + /// </summary> + /// <typeparam name="T">The message type</typeparam> + public sealed class StreamingPipelineMessageCodec<T> : IStreamingCodec<PipelineMessage<T>> + { + /// <summary> + /// Creates new PipelineMessageCodec + /// </summary> + /// <param name="baseCodec">The codec for actual message in PipelineMessage</param> + [Inject] + private StreamingPipelineMessageCodec(IStreamingCodec<T> baseCodec) + { + BaseCodec = baseCodec; + } + + /// <summary> + /// Codec for actual message T + /// </summary> + public IStreamingCodec<T> BaseCodec { get; private set; } + + /// <summary> + /// Instantiate the class from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + ///<returns>The Pipeline Message read from the reader</returns> + public PipelineMessage<T> Read(IDataReader reader) + { + var message = BaseCodec.Read(reader); + var isLast = reader.ReadBoolean(); + return new PipelineMessage<T>(message, isLast); + } + + /// <summary> + /// Writes the class fields to the writer. + /// </summary> + /// <param name="obj">The object to be encoded</param> + /// <param name="writer">The writer to which to write</param> + public void Write(PipelineMessage<T> obj, IDataWriter writer) + { + BaseCodec.Write(obj.Data, writer); + writer.WriteBoolean(obj.IsLast); + } + + /// <summary> + /// Instantiate the class from the reader. 
+ /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <param name="token">Cancellation token</param> + /// <returns>The Pipeline Message read from the reader</returns> + public async Task<PipelineMessage<T>> ReadAsync(IDataReader reader, CancellationToken token) + { + var message = await BaseCodec.ReadAsync(reader, token); + var isLast = await reader.ReadBooleanAsync(token); + return new PipelineMessage<T>(message, isLast); + } + + /// <summary> + /// Writes the class fields to the writer. + /// </summary> + /// <param name="obj">The object to be encoded</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">Cancellation token</param> + public async System.Threading.Tasks.Task WriteAsync(PipelineMessage<T> obj, IDataWriter writer, CancellationToken token) + { + await BaseCodec.WriteAsync(obj.Data, writer, token); + await writer.WriteBooleanAsync(obj.IsLast, token); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj index b8989ee..6a8fa0b 100644 --- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj +++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj @@ -50,12 +50,19 @@ under the License. 
<Reference Include="System.Xml" /> </ItemGroup> <ItemGroup> + <Compile Include="StreamingCodec\CommonStreamingCodecs\FloatArrayStreamingCodec.cs" /> + <Compile Include="StreamingCodec\CommonStreamingCodecs\DoubleArrayStreamingCodec.cs" /> + <Compile Include="StreamingCodec\CommonStreamingCodecs\FloatStreamingCodec.cs" /> + <Compile Include="StreamingCodec\CommonStreamingCodecs\IntArrayStreamingCodec.cs" /> + <Compile Include="StreamingCodec\CommonStreamingCodecs\DoubleStreamingCodec.cs" /> + <Compile Include="StreamingCodec\CommonStreamingCodecs\IntStreamingCodec.cs" /> <Compile Include="Group\Codec\GcmMessageProto.cs" /> <Compile Include="Group\Codec\GroupCommunicationMessageCodec.cs" /> <Compile Include="Group\Config\CodecConfiguration.cs" /> <Compile Include="Group\Config\GroupCommConfigurationOptions.cs" /> <Compile Include="Group\Config\PipelineDataConverterConfiguration.cs" /> <Compile Include="Group\Config\ReduceFunctionConfiguration.cs" /> + <Compile Include="Group\Config\StreamingCodecConfiguration.cs" /> <Compile Include="Group\Driver\ICommunicationGroupDriver.cs" /> <Compile Include="Group\Driver\IGroupCommDriver.cs" /> <Compile Include="Group\Driver\Impl\CommunicationGroupDriver.cs" /> @@ -79,6 +86,7 @@ under the License. <Compile Include="Group\Operators\Impl\ScatterSender.cs" /> <Compile Include="Group\Operators\Impl\Sender.cs" /> <Compile Include="Group\Operators\IOperatorSpec.cs" /> + <Compile Include="Group\Pipelining\StreamingPipelineMessageCodec.cs" /> <Compile Include="Group\Task\IOperatorTopology.cs" /> <Compile Include="Group\Operators\IReduceFunction.cs" /> <Compile Include="Group\Operators\IReduceReceiver.cs" /> @@ -147,6 +155,8 @@ under the License. 
<Compile Include="NetworkService\WritableNsConnection.cs" /> <Compile Include="NetworkService\WritableNsMessage.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="StreamingCodec\CodecToStreamingCodec.cs" /> + <Compile Include="StreamingCodec\IStreamingCodec.cs" /> <Compile Include="Utilities\BlockingCollectionExtensions.cs" /> <Compile Include="Utilities\Utils.cs" /> </ItemGroup> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs new file mode 100644 index 0000000..1964bb1 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.StreamingCodec +{ + /// <summary> + /// Converts codec to streaming codec + /// </summary> + /// <typeparam name="T">Message type</typeparam> + public sealed class CodecToStreamingCodec<T> : IStreamingCodec<T> + { + private readonly ICodec<T> _codec; + + [Inject] + private CodecToStreamingCodec(ICodec<T> codec) + { + _codec = codec; + } + + public T Read(IDataReader reader) + { + int length = reader.ReadInt32(); + byte[] byteArr = new byte[length]; + reader.Read(ref byteArr, 0, length); + return _codec.Decode(byteArr); + } + + public void Write(T obj, IDataWriter writer) + { + var byteArr = _codec.Encode(obj); + writer.WriteInt32(byteArr.Length); + writer.Write(byteArr, 0, byteArr.Length); + } + + public async Task<T> ReadAsync(IDataReader reader, CancellationToken token) + { + int length = await reader.ReadInt32Async(token); + byte[] byteArr = new byte[length]; + await reader.ReadAsync(byteArr, 0, length, token); + return _codec.Decode(byteArr); + } + + public async Task WriteAsync(T obj, IDataWriter writer, CancellationToken token) + { + var byteArr = _codec.Encode(obj); + await writer.WriteInt32Async(byteArr.Length, token); + await writer.WriteAsync(byteArr, 0, byteArr.Length, token); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs new file mode 100644 index 0000000..5caaaee --- /dev/null +++ 
b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs +{ + /// <summary> + /// Streaming codec for double array + /// </summary> + public sealed class DoubleArrayStreamingCodec : IStreamingCodec<double[]> + { + /// <summary> + /// Injectable constructor + /// </summary> + [Inject] + private DoubleArrayStreamingCodec() + { + } + + /// <summary> + /// Instantiate the class from the reader. 
+ /// </summary> + /// <param name="reader">The reader from which to read</param> + ///<returns>The double array read from the reader</returns> + public double[] Read(IDataReader reader) + { + int length = reader.ReadInt32(); + byte[] buffer = new byte[sizeof(double)*length]; + reader.Read(ref buffer, 0, buffer.Length); + double[] doubleArr = new double[length]; + Buffer.BlockCopy(buffer, 0, doubleArr, 0, buffer.Length); + return doubleArr; + } + + /// <summary> + /// Writes the double array to the writer. + /// </summary> + /// <param name="obj">The double array to be encoded</param> + /// <param name="writer">The writer to which to write</param> + public void Write(double[] obj, IDataWriter writer) + { + if (obj == null) + { + throw new ArgumentNullException("obj", "double array is null"); + } + + writer.WriteInt32(obj.Length); + byte[] buffer = new byte[sizeof(double) * obj.Length]; + Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length); + writer.Write(buffer, 0, buffer.Length); + } + + /// <summary> + /// Instantiate the class from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <param name="token">Cancellation token</param> + /// <returns>The double array read from the reader</returns> + public async Task<double[]> ReadAsync(IDataReader reader, CancellationToken token) + { + int length = await reader.ReadInt32Async(token); + byte[] buffer = new byte[sizeof(double) * length]; + await reader.ReadAsync(buffer, 0, buffer.Length, token); + double[] doubleArr = new double[length]; + Buffer.BlockCopy(buffer, 0, doubleArr, 0, sizeof(double) * length); + return doubleArr; + } + + /// <summary> + /// Writes the double array to the writer. 
+ /// </summary> + /// <param name="obj">The double array to be encoded</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">Cancellation token</param> + public async Task WriteAsync(double[] obj, IDataWriter writer, CancellationToken token) + { + if (obj == null) + { + throw new ArgumentNullException("obj", "double array is null"); + } + + await writer.WriteInt32Async(obj.Length, token); + byte[] buffer = new byte[sizeof(double) * obj.Length]; + Buffer.BlockCopy(obj, 0, buffer, 0, sizeof(double) * obj.Length); + await writer.WriteAsync(buffer, 0, buffer.Length, token); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs new file mode 100644 index 0000000..5f19f00 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs +{ + /// <summary> + /// Streaming codec for double + /// </summary> + public sealed class DoubleStreamingCodec : IStreamingCodec<double> + { + /// <summary> + /// Injectable constructor + /// </summary> + [Inject] + private DoubleStreamingCodec() + { + } + + /// <summary> + /// Instantiate the class from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + ///<returns>The double read from the reader</returns> + public double Read(IDataReader reader) + { + return reader.ReadDouble(); + } + + /// <summary> + /// Writes the double to the writer. + /// </summary> + /// <param name="obj">The double to be encoded</param> + /// <param name="writer">The writer to which to write</param> + public void Write(double obj, IDataWriter writer) + { + writer.WriteDouble(obj); + } + + /// <summary> + /// Instantiate the class from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <param name="token">Cancellation token</param> + /// <returns>The double read from the reader</returns> + public async Task<double> ReadAsync(IDataReader reader, CancellationToken token) + { + return await reader.ReadDoubleAsync(token); + } + + /// <summary> + /// Writes the double to the writer. 
+ /// </summary> + /// <param name="obj">The double to be encoded</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">Cancellation token</param> + public async Task WriteAsync(double obj, IDataWriter writer, CancellationToken token) + { + await writer.WriteDoubleAsync(obj, token); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs new file mode 100644 index 0000000..8d68749 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs +{ + /// <summary> + /// Streaming codec for float arrays, encoded as an Int32 length followed by the raw float bytes + /// </summary> + public sealed class FloatArrayStreamingCodec : IStreamingCodec<float[]> + { + /// <summary> + /// Injectable constructor + /// </summary> + [Inject] + private FloatArrayStreamingCodec() + { + } + + /// <summary> + /// Reads an Int32 element count and then that many floats from the reader. The count is used without validation. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <returns>The float array read from the reader</returns> + public float[] Read(IDataReader reader) + { + int length = reader.ReadInt32(); + byte[] buffer = new byte[sizeof(float)*length]; + reader.Read(ref buffer, 0, buffer.Length); + float[] floatArr = new float[length]; + Buffer.BlockCopy(buffer, 0, floatArr, 0, buffer.Length); + return floatArr; + } + + /// <summary> + /// Writes the array length as an Int32, followed by the raw float bytes, to the writer. + /// </summary> + /// <param name="obj">The float array to be encoded; a null array raises ArgumentNullException</param> + /// <param name="writer">The writer to which to write</param> + public void Write(float[] obj, IDataWriter writer) + { + if (obj == null) + { + throw new ArgumentNullException("obj", "float array is null"); + } + + writer.WriteInt32(obj.Length); + byte[] buffer = new byte[sizeof(float) * obj.Length]; + Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length); + writer.Write(buffer, 0, buffer.Length); + } + + /// <summary> + /// Asynchronously reads an Int32 element count and then that many floats from the reader. The count is used without validation.
+ /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <param name="token">Cancellation token passed through to the reader's asynchronous calls</param> + /// <returns>The float array read from the reader</returns> + public async Task<float[]> ReadAsync(IDataReader reader, CancellationToken token) + { + int length = await reader.ReadInt32Async(token); + byte[] buffer = new byte[sizeof(float) * length]; + await reader.ReadAsync(buffer, 0, buffer.Length, token); + float[] floatArr = new float[length]; + Buffer.BlockCopy(buffer, 0, floatArr, 0, sizeof(float) * length); + return floatArr; + } + + /// <summary> + /// Asynchronously writes the array length as an Int32, followed by the raw float bytes, to the writer. + /// </summary> + /// <param name="obj">The float array to be encoded; a null array raises ArgumentNullException</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">Cancellation token passed through to the writer's asynchronous calls</param> + public async Task WriteAsync(float[] obj, IDataWriter writer, CancellationToken token) + { + if (obj == null) + { + throw new ArgumentNullException("obj", "float array is null"); + } + + await writer.WriteInt32Async(obj.Length, token); + byte[] buffer = new byte[sizeof(float) * obj.Length]; + Buffer.BlockCopy(obj, 0, buffer, 0, sizeof(float) * obj.Length); + await writer.WriteAsync(buffer, 0, buffer.Length, token); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs new file mode 100644 index 0000000..22ed947 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs +{ + /// <summary> + /// Streaming codec for a single float value + /// </summary> + public sealed class FloatStreamingCodec : IStreamingCodec<float> + { + /// <summary> + /// Injectable constructor + /// </summary> + [Inject] + private FloatStreamingCodec() + { + } + + /// <summary> + /// Reads one float from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <returns>The float read from the reader</returns> + public float Read(IDataReader reader) + { + return reader.ReadFloat(); + } + + /// <summary> + /// Writes the float to the writer. + /// </summary> + /// <param name="obj">The float to be encoded</param> + /// <param name="writer">The writer to which to write</param> + public void Write(float obj, IDataWriter writer) + { + writer.WriteFloat(obj); + } + + /// <summary> + /// Asynchronously reads one float from the reader.
+ /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <param name="token">Cancellation token passed through to the reader's asynchronous call</param> + /// <returns>The float read from the reader</returns> + public async Task<float> ReadAsync(IDataReader reader, CancellationToken token) + { + return await reader.ReadFloatAsync(token); + } + + /// <summary> + /// Asynchronously writes the float to the writer. + /// </summary> + /// <param name="obj">The float to be encoded</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">Cancellation token passed through to the writer's asynchronous call</param> + public async Task WriteAsync(float obj, IDataWriter writer, CancellationToken token) + { + await writer.WriteFloatAsync(obj, token); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs new file mode 100644 index 0000000..090d99f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs +{ + /// <summary> + /// Streaming codec for integer arrays, encoded as an Int32 length followed by the raw integer bytes + /// </summary> + public sealed class IntArrayStreamingCodec : IStreamingCodec<int[]> + { + /// <summary> + /// Injectable constructor + /// </summary> + [Inject] + private IntArrayStreamingCodec() + { + } + + /// <summary> + /// Reads an Int32 element count and then that many integers from the reader. The count is used without validation. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <returns>The integer array read from the reader</returns> + public int[] Read(IDataReader reader) + { + int length = reader.ReadInt32(); + byte[] buffer = new byte[sizeof(int)*length]; + reader.Read(ref buffer, 0, buffer.Length); + int[] intArr = new int[length]; + Buffer.BlockCopy(buffer, 0, intArr, 0, buffer.Length); + return intArr; + } + + /// <summary> + /// Writes the array length as an Int32, followed by the raw integer bytes, to the writer.
+ /// </summary> + /// <param name="obj">The integer array to be encoded; a null array raises ArgumentNullException</param> + /// <param name="writer">The writer to which to write</param> + public void Write(int[] obj, IDataWriter writer) + { + if (obj == null) + { + throw new ArgumentNullException("obj", "integer array is null"); + } + + writer.WriteInt32(obj.Length); + byte[] buffer = new byte[sizeof(int) * obj.Length]; + Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length); + writer.Write(buffer, 0, buffer.Length); + } + + /// <summary> + /// Asynchronously reads an Int32 element count and then that many integers from the reader. The count is used without validation. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <param name="token">Cancellation token passed through to the reader's asynchronous calls</param> + /// <returns>The integer array read from the reader</returns> + public async Task<int[]> ReadAsync(IDataReader reader, CancellationToken token) + { + int length = await reader.ReadInt32Async(token); + byte[] buffer = new byte[sizeof(int) * length]; + await reader.ReadAsync(buffer, 0, buffer.Length, token); + int[] intArr = new int[length]; + Buffer.BlockCopy(buffer, 0, intArr, 0, sizeof(int) * length); + return intArr; + } + + /// <summary> + /// Asynchronously writes the array length as an Int32, followed by the raw integer bytes, to the writer.
+ /// </summary> + /// <param name="obj">The integer array to be encoded; a null array raises ArgumentNullException</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">Cancellation token passed through to the writer's asynchronous calls</param> + public async Task WriteAsync(int[] obj, IDataWriter writer, CancellationToken token) + { + if (obj == null) + { + throw new ArgumentNullException("obj", "integer array is null"); + } + + await writer.WriteInt32Async(obj.Length, token); + byte[] buffer = new byte[sizeof(int) * obj.Length]; + Buffer.BlockCopy(obj, 0, buffer, 0, sizeof(int) * obj.Length); + await writer.WriteAsync(buffer, 0, buffer.Length, token); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs new file mode 100644 index 0000000..22e5490 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs +{ + /// <summary> + /// Streaming codec for a single integer value + /// </summary> + public sealed class IntStreamingCodec : IStreamingCodec<int> + { + /// <summary> + /// Injectable constructor + /// </summary> + [Inject] + private IntStreamingCodec() + { + } + + /// <summary> + /// Reads one Int32 from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <returns>The integer read from the reader</returns> + public int Read(IDataReader reader) + { + return reader.ReadInt32(); + } + + /// <summary> + /// Writes the integer to the writer. + /// </summary> + /// <param name="obj">The integer to be encoded</param> + /// <param name="writer">The writer to which to write</param> + public void Write(int obj, IDataWriter writer) + { + writer.WriteInt32(obj); + } + + /// <summary> + /// Asynchronously reads one Int32 from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <param name="token">Cancellation token passed through to the reader's asynchronous call</param> + /// <returns>The integer read from the reader</returns> + public async Task<int> ReadAsync(IDataReader reader, CancellationToken token) + { + return await reader.ReadInt32Async(token); + } + + /// <summary> + /// Asynchronously writes the integer to the writer.
+ /// </summary> + /// <param name="obj">The integer to be encoded</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">Cancellation token passed through to the writer's asynchronous call</param> + public async Task WriteAsync(int obj, IDataWriter writer, CancellationToken token) + { + await writer.WriteInt32Async(obj, token); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs new file mode 100644 index 0000000..440b1c0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.StreamingCodec +{ + /// <summary> + /// Codec interface that external users should implement to read from and write directly to the stream + /// </summary> + public interface IStreamingCodec<T> + { + /// <summary> + /// Reads an instance of type T from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <returns>The instance of type T read from the reader</returns> + T Read(IDataReader reader); + + /// <summary> + /// Writes the given instance of type T to the writer. + /// </summary> + /// <param name="obj">The object of type T to be encoded</param> + /// <param name="writer">The writer to which to write</param> + void Write(T obj, IDataWriter writer); + + /// <summary> + /// Asynchronously reads an instance of type T from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <param name="token">Cancellation token</param> + /// <returns>The instance of type T read from the reader</returns> + Task<T> ReadAsync(IDataReader reader, CancellationToken token); + + /// <summary> + /// Asynchronously writes the given instance of type T to the writer.
+ /// </summary> + /// <param name="obj">The object of type T to be encoded</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">Cancellation token</param> + Task WriteAsync(T obj, IDataWriter writer, CancellationToken token); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs index 67098d0..2e88573 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataReader.cs @@ -18,10 +18,7 @@ */ using System; -using System.Collections.Generic; using System.IO; -using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; using Org.Apache.REEF.Utilities.Diagnostics; @@ -47,6 +44,11 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="stream">Stream from which to read</param> public StreamDataReader(Stream stream) { + if (stream == null) + { + throw new ArgumentNullException("stream", "input stream cannot be null"); + } + _stream = stream; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b65ff47f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs index b702e1c..976253b 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamDataWriter.cs @@ -18,10 +18,7 @@ */ using System; -using System.Collections.Generic; using System.IO; -using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; @@ -40,6 +37,11 @@
namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="stream">Stream from which to read</param> public StreamDataWriter(Stream stream) { + if (stream == null) + { + throw new ArgumentNullException("stream", "input stream cannot be null"); + } + _stream = stream; }
