[REEF-423]: Move Streaming Codec classes from Network to Wake layer

JIRA: [REEF-423](https://issues.apache.org/jira/browse/REEF-423)
Pull Request: This closes #257 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/e70bb56c Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/e70bb56c Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/e70bb56c Branch: refs/heads/master Commit: e70bb56c20834d96440b541bc5e425dfe53283c7 Parents: 6eac41a Author: Dhruv <[email protected]> Authored: Sun Jun 28 15:39:39 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Mon Jun 29 09:24:31 2015 -0700 ---------------------------------------------------------------------- .../BroadcastReduceDriver.cs | 2 +- .../PipelinedBroadcastReduceDriver.cs | 2 +- .../ScatterReduceDriver.cs | 2 +- .../GroupCommunicationTests.cs | 4 +- .../GroupCommunicationTreeTopologyTests.cs | 2 +- .../GroupCommunication/StreamingCodecTests.cs | 4 +- .../CodecToStreamingCodecConfiguration.cs | 2 +- .../Group/Config/StreamingCodecConfiguration.cs | 2 +- .../Driver/Impl/GroupCommunicationMessage.cs | 2 +- .../Pipelining/StreamingPipelineMessageCodec.cs | 2 +- .../Group/Task/Impl/OperatorTopology.cs | 2 +- .../Org.Apache.REEF.Network.csproj | 9 -- .../StreamingCodec/CodecToStreamingCodec.cs | 71 ------------ .../DoubleArrayStreamingCodec.cs | 109 ------------------- .../DoubleStreamingCodec.cs | 82 -------------- .../FloatArrayStreamingCodec.cs | 109 ------------------- .../FloatStreamingCodec.cs | 82 -------------- .../IntArrayStreamingCodec.cs | 109 ------------------- .../CommonStreamingCodecs/IntStreamingCodec.cs | 82 -------------- .../StringStreamingCodec.cs | 82 -------------- .../StreamingCodec/IStreamingCodec.cs | 61 ----------- .../Org.Apache.REEF.Wake.csproj | 9 ++ .../StreamingCodec/CodecToStreamingCodec.cs | 71 ++++++++++++ .../DoubleArrayStreamingCodec.cs | 109 +++++++++++++++++++ .../DoubleStreamingCodec.cs | 82 ++++++++++++++ .../FloatArrayStreamingCodec.cs | 109 +++++++++++++++++++ .../FloatStreamingCodec.cs 
| 82 ++++++++++++++ .../IntArrayStreamingCodec.cs | 109 +++++++++++++++++++ .../CommonStreamingCodecs/IntStreamingCodec.cs | 82 ++++++++++++++ .../StringStreamingCodec.cs | 82 ++++++++++++++ .../StreamingCodec/IStreamingCodec.cs | 61 +++++++++++ 31 files changed, 809 insertions(+), 809 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs index 85ae3c0..051b23d 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs @@ -27,7 +27,7 @@ using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs; +using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver; using Org.Apache.REEF.Network.Group.Driver.Impl; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs index f32de61..6675fb5 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs @@ -27,7 +27,6 @@ using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs; using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver; @@ -44,6 +43,7 @@ using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Parameters; +using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs; namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs index 8a9d3e2..10595fd 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs @@ -27,7 +27,6 @@ using Org.Apache.REEF.Driver; using 
Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs; using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks; using Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks; using Org.Apache.REEF.Network.Group.Config; @@ -44,6 +43,7 @@ using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs; namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs index f2bec7c..61813af 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs +++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs @@ -30,7 +30,6 @@ using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs; using Org.Apache.REEF.Network.Examples.GroupCommunication; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver; @@ -43,7 +42,6 @@ using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Network.Group.Topology; using Org.Apache.REEF.Network.Naming; using Org.Apache.REEF.Network.NetworkService; -using Org.Apache.REEF.Network.StreamingCodec; using 
Org.Apache.REEF.Network.Tests.NamingService; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Formats; @@ -53,6 +51,8 @@ using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Wake.StreamingCodec; +using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs; namespace Org.Apache.REEF.Network.Tests.GroupCommunication { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs index 88ae9b3..6b9b8c7 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs +++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs @@ -21,7 +21,6 @@ using System.Collections.Generic; using System.Globalization; using System.Linq; using Microsoft.VisualStudio.TestTools.UnitTesting; -using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver; using Org.Apache.REEF.Network.Group.Driver.Impl; @@ -34,6 +33,7 @@ using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs; namespace Org.Apache.REEF.Network.Tests.GroupCommunication { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs index d4f0647..1757c6c 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs +++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs @@ -22,13 +22,13 @@ using System.IO; using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; -using Org.Apache.REEF.Network.StreamingCodec; -using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Wake.StreamingCodec; +using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs; namespace Org.Apache.REEF.Network.Tests.GroupCommunication { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs index a270272..f8e1483 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs @@ -18,10 +18,10 @@ */ using Org.Apache.REEF.Network.Group.Pipelining; -using Org.Apache.REEF.Network.StreamingCodec; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.StreamingCodec; namespace Org.Apache.REEF.Network.Group.Config { 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs index f655b91..2a30047 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs @@ -18,9 +18,9 @@ */ using Org.Apache.REEF.Network.Group.Pipelining; -using Org.Apache.REEF.Network.StreamingCodec; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake.StreamingCodec; namespace Org.Apache.REEF.Network.Group.Config { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs index 33f9c92..ed7855b 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs @@ -19,9 +19,9 @@ using System; using System.Threading; -using Org.Apache.REEF.Network.StreamingCodec; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.StreamingCodec; namespace Org.Apache.REEF.Network.Group.Driver.Impl { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs index aff8558..9236e95 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs @@ -21,7 +21,7 @@ using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Wake.Remote; using System.Threading; using System.Threading.Tasks; -using Org.Apache.REEF.Network.StreamingCodec; +using Org.Apache.REEF.Wake.StreamingCodec; namespace Org.Apache.REEF.Network.Group.Pipelining { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs index 67e1da9..6ecf7f3 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs @@ -29,10 +29,10 @@ using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Operators.Impl; using Org.Apache.REEF.Network.NetworkService; -using Org.Apache.REEF.Network.StreamingCodec; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.StreamingCodec; namespace Org.Apache.REEF.Network.Group.Task.Impl { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj 
b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj index 390bbb4..7b167b4 100644 --- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj +++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj @@ -52,13 +52,6 @@ under the License. <ItemGroup> <Compile Include="Group\Config\CodecToStreamingCodecConfiguration.cs" /> <Compile Include="Group\Driver\Impl\GeneralGroupCommunicationMessage.cs" /> - <Compile Include="StreamingCodec\CommonStreamingCodecs\FloatArrayStreamingCodec.cs" /> - <Compile Include="StreamingCodec\CommonStreamingCodecs\DoubleArrayStreamingCodec.cs" /> - <Compile Include="StreamingCodec\CommonStreamingCodecs\FloatStreamingCodec.cs" /> - <Compile Include="StreamingCodec\CommonStreamingCodecs\IntArrayStreamingCodec.cs" /> - <Compile Include="StreamingCodec\CommonStreamingCodecs\DoubleStreamingCodec.cs" /> - <Compile Include="StreamingCodec\CommonStreamingCodecs\StringStreamingCodec.cs" /> - <Compile Include="StreamingCodec\CommonStreamingCodecs\IntStreamingCodec.cs" /> <Compile Include="Group\Config\CodecConfiguration.cs" /> <Compile Include="Group\Config\GroupCommConfigurationOptions.cs" /> <Compile Include="Group\Config\PipelineDataConverterConfiguration.cs" /> @@ -156,8 +149,6 @@ under the License. 
<Compile Include="NetworkService\WritableNsConnection.cs" /> <Compile Include="NetworkService\WritableNsMessage.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> - <Compile Include="StreamingCodec\CodecToStreamingCodec.cs" /> - <Compile Include="StreamingCodec\IStreamingCodec.cs" /> <Compile Include="Utilities\BlockingCollectionExtensions.cs" /> <Compile Include="Utilities\Utils.cs" /> </ItemGroup> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs deleted file mode 100644 index 1964bb1..0000000 --- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Remote; - -namespace Org.Apache.REEF.Network.StreamingCodec -{ - /// <summary> - /// Converts codec to streaming codec - /// </summary> - /// <typeparam name="T">Message type</typeparam> - public sealed class CodecToStreamingCodec<T> : IStreamingCodec<T> - { - private readonly ICodec<T> _codec; - - [Inject] - private CodecToStreamingCodec(ICodec<T> codec) - { - _codec = codec; - } - - public T Read(IDataReader reader) - { - int length = reader.ReadInt32(); - byte[] byteArr = new byte[length]; - reader.Read(ref byteArr, 0, length); - return _codec.Decode(byteArr); - } - - public void Write(T obj, IDataWriter writer) - { - var byteArr = _codec.Encode(obj); - writer.WriteInt32(byteArr.Length); - writer.Write(byteArr, 0, byteArr.Length); - } - - public async Task<T> ReadAsync(IDataReader reader, CancellationToken token) - { - int length = await reader.ReadInt32Async(token); - byte[] byteArr = new byte[length]; - await reader.ReadAsync(byteArr, 0, length, token); - return _codec.Decode(byteArr); - } - - public async Task WriteAsync(T obj, IDataWriter writer, CancellationToken token) - { - var byteArr = _codec.Encode(obj); - await writer.WriteInt32Async(byteArr.Length, token); - await writer.WriteAsync(byteArr, 0, byteArr.Length, token); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs deleted file mode 100644 index 5caaaee..0000000 --- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs +++ 
/dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Remote; - -namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs -{ - /// <summary> - /// Streaming codec for double array - /// </summary> - public sealed class DoubleArrayStreamingCodec : IStreamingCodec<double[]> - { - /// <summary> - /// Injectable constructor - /// </summary> - [Inject] - private DoubleArrayStreamingCodec() - { - } - - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - ///<returns>The double array read from the reader</returns> - public double[] Read(IDataReader reader) - { - int length = reader.ReadInt32(); - byte[] buffer = new byte[sizeof(double)*length]; - reader.Read(ref buffer, 0, buffer.Length); - double[] doubleArr = new double[length]; - Buffer.BlockCopy(buffer, 0, doubleArr, 0, buffer.Length); - return doubleArr; - } - - /// <summary> - /// Writes the double array to the writer. 
- /// </summary> - /// <param name="obj">The double array to be encoded</param> - /// <param name="writer">The writer to which to write</param> - public void Write(double[] obj, IDataWriter writer) - { - if (obj == null) - { - throw new ArgumentNullException("obj", "double array is null"); - } - - writer.WriteInt32(obj.Length); - byte[] buffer = new byte[sizeof(double) * obj.Length]; - Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length); - writer.Write(buffer, 0, buffer.Length); - } - - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - /// <param name="token">Cancellation token</param> - /// <returns>The double array read from the reader</returns> - public async Task<double[]> ReadAsync(IDataReader reader, CancellationToken token) - { - int length = await reader.ReadInt32Async(token); - byte[] buffer = new byte[sizeof(double) * length]; - await reader.ReadAsync(buffer, 0, buffer.Length, token); - double[] doubleArr = new double[length]; - Buffer.BlockCopy(buffer, 0, doubleArr, 0, sizeof(double) * length); - return doubleArr; - } - - /// <summary> - /// Writes the double array to the writer. 
- /// </summary> - /// <param name="obj">The double array to be encoded</param> - /// <param name="writer">The writer to which to write</param> - /// <param name="token">Cancellation token</param> - public async Task WriteAsync(double[] obj, IDataWriter writer, CancellationToken token) - { - if (obj == null) - { - throw new ArgumentNullException("obj", "double array is null"); - } - - await writer.WriteInt32Async(obj.Length, token); - byte[] buffer = new byte[sizeof(double) * obj.Length]; - Buffer.BlockCopy(obj, 0, buffer, 0, sizeof(double) * obj.Length); - await writer.WriteAsync(buffer, 0, buffer.Length, token); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs deleted file mode 100644 index 5f19f00..0000000 --- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Remote; - -namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs -{ - /// <summary> - /// Streaming codec for double - /// </summary> - public sealed class DoubleStreamingCodec : IStreamingCodec<double> - { - /// <summary> - /// Injectable constructor - /// </summary> - [Inject] - private DoubleStreamingCodec() - { - } - - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - ///<returns>The double read from the reader</returns> - public double Read(IDataReader reader) - { - return reader.ReadDouble(); - } - - /// <summary> - /// Writes the double to the writer. - /// </summary> - /// <param name="obj">The double to be encoded</param> - /// <param name="writer">The writer to which to write</param> - public void Write(double obj, IDataWriter writer) - { - writer.WriteDouble(obj); - } - - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - /// <param name="token">Cancellation token</param> - /// <returns>The double read from the reader</returns> - public async Task<double> ReadAsync(IDataReader reader, CancellationToken token) - { - return await reader.ReadDoubleAsync(token); - } - - /// <summary> - /// Writes the double to the writer. 
- /// </summary> - /// <param name="obj">The double to be encoded</param> - /// <param name="writer">The writer to which to write</param> - /// <param name="token">Cancellation token</param> - public async Task WriteAsync(double obj, IDataWriter writer, CancellationToken token) - { - await writer.WriteDoubleAsync(obj, token); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs deleted file mode 100644 index 8d68749..0000000 --- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using System; -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Remote; - -namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs -{ - /// <summary> - /// Streaming codec for float array - /// </summary> - public sealed class FloatArrayStreamingCodec : IStreamingCodec<float[]> - { - /// <summary> - /// Injectable constructor - /// </summary> - [Inject] - private FloatArrayStreamingCodec() - { - } - - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - ///<returns>The float array read from the reader</returns> - public float[] Read(IDataReader reader) - { - int length = reader.ReadInt32(); - byte[] buffer = new byte[sizeof(float)*length]; - reader.Read(ref buffer, 0, buffer.Length); - float[] floatArr = new float[length]; - Buffer.BlockCopy(buffer, 0, floatArr, 0, buffer.Length); - return floatArr; - } - - /// <summary> - /// Writes the float array to the writer. - /// </summary> - /// <param name="obj">The float array to be encoded</param> - /// <param name="writer">The writer to which to write</param> - public void Write(float[] obj, IDataWriter writer) - { - if (obj == null) - { - throw new ArgumentNullException("obj", "float array is null"); - } - - writer.WriteInt32(obj.Length); - byte[] buffer = new byte[sizeof(float) * obj.Length]; - Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length); - writer.Write(buffer, 0, buffer.Length); - } - - /// <summary> - /// Instantiate the class from the reader. 
- /// </summary> - /// <param name="reader">The reader from which to read</param> - /// <param name="token">Cancellation token</param> - /// <returns>The float array read from the reader</returns> - public async Task<float[]> ReadAsync(IDataReader reader, CancellationToken token) - { - int length = await reader.ReadInt32Async(token); - byte[] buffer = new byte[sizeof(float) * length]; - await reader.ReadAsync(buffer, 0, buffer.Length, token); - float[] floatArr = new float[length]; - Buffer.BlockCopy(buffer, 0, floatArr, 0, sizeof(float) * length); - return floatArr; - } - - /// <summary> - /// Writes the float array to the writer. - /// </summary> - /// <param name="obj">The float array to be encoded</param> - /// <param name="writer">The writer to which to write</param> - /// <param name="token">Cancellation token</param> - public async Task WriteAsync(float[] obj, IDataWriter writer, CancellationToken token) - { - if (obj == null) - { - throw new ArgumentNullException("obj", "float array is null"); - } - - await writer.WriteInt32Async(obj.Length, token); - byte[] buffer = new byte[sizeof(float) * obj.Length]; - Buffer.BlockCopy(obj, 0, buffer, 0, sizeof(float) * obj.Length); - await writer.WriteAsync(buffer, 0, buffer.Length, token); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs deleted file mode 100644 index 22ed947..0000000 --- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Remote; - -namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs -{ - /// <summary> - /// Streaming codec for float - /// </summary> - public sealed class FloatStreamingCodec : IStreamingCodec<float> - { - /// <summary> - /// Injectable constructor - /// </summary> - [Inject] - private FloatStreamingCodec() - { - } - - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - ///<returns>The float read from the reader</returns> - public float Read(IDataReader reader) - { - return reader.ReadFloat(); - } - - /// <summary> - /// Writes the float to the writer. - /// </summary> - /// <param name="obj">The float to be encoded</param> - /// <param name="writer">The writer to which to write</param> - public void Write(float obj, IDataWriter writer) - { - writer.WriteFloat(obj); - } - - /// <summary> - /// Instantiate the class from the reader. 
- /// </summary> - /// <param name="reader">The reader from which to read</param> - /// <param name="token">Cancellation token</param> - /// <returns>The float read from the reader</returns> - public async Task<float> ReadAsync(IDataReader reader, CancellationToken token) - { - return await reader.ReadFloatAsync(token); - } - - /// <summary> - /// Writes the float to the writer. - /// </summary> - /// <param name="obj">The float to be encoded</param> - /// <param name="writer">The writer to which to write</param> - /// <param name="token">Cancellation token</param> - public async Task WriteAsync(float obj, IDataWriter writer, CancellationToken token) - { - await writer.WriteFloatAsync(obj, token); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs deleted file mode 100644 index 090d99f..0000000 --- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Remote; - -namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs -{ - /// <summary> - /// Streaming codec for integer array - /// </summary> - public sealed class IntArrayStreamingCodec : IStreamingCodec<int[]> - { - /// <summary> - /// Injectable constructor - /// </summary> - [Inject] - private IntArrayStreamingCodec() - { - } - - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - ///<returns>The integer array read from the reader</returns> - public int[] Read(IDataReader reader) - { - int length = reader.ReadInt32(); - byte[] buffer = new byte[sizeof(int)*length]; - reader.Read(ref buffer, 0, buffer.Length); - int[] intArr = new int[length]; - Buffer.BlockCopy(buffer, 0, intArr, 0, buffer.Length); - return intArr; - } - - /// <summary> - /// Writes the integer array to the writer. 
- /// </summary> - /// <param name="obj">The integer array to be encoded</param> - /// <param name="writer">The writer to which to write</param> - public void Write(int[] obj, IDataWriter writer) - { - if (obj == null) - { - throw new ArgumentNullException("obj", "integer array is null"); - } - - writer.WriteInt32(obj.Length); - byte[] buffer = new byte[sizeof(int) * obj.Length]; - Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length); - writer.Write(buffer, 0, buffer.Length); - } - - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - /// <param name="token">Cancellation token</param> - /// <returns>The integer array read from the reader</returns> - public async Task<int[]> ReadAsync(IDataReader reader, CancellationToken token) - { - int length = await reader.ReadInt32Async(token); - byte[] buffer = new byte[sizeof(int) * length]; - await reader.ReadAsync(buffer, 0, buffer.Length, token); - int[] intArr = new int[length]; - Buffer.BlockCopy(buffer, 0, intArr, 0, sizeof(int) * length); - return intArr; - } - - /// <summary> - /// Writes the integer array to the writer. 
- /// </summary> - /// <param name="obj">The integer array to be encoded</param> - /// <param name="writer">The writer to which to write</param> - /// <param name="token">Cancellation token</param> - public async Task WriteAsync(int[] obj, IDataWriter writer, CancellationToken token) - { - if (obj == null) - { - throw new ArgumentNullException("obj", "integer array is null"); - } - - await writer.WriteInt32Async(obj.Length, token); - byte[] buffer = new byte[sizeof(int) * obj.Length]; - Buffer.BlockCopy(obj, 0, buffer, 0, sizeof(int) * obj.Length); - await writer.WriteAsync(buffer, 0, buffer.Length, token); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs deleted file mode 100644 index 22e5490..0000000 --- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Remote; - -namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs -{ - /// <summary> - /// Streaming codec for integer - /// </summary> - public sealed class IntStreamingCodec : IStreamingCodec<int> - { - /// <summary> - /// Injectable constructor - /// </summary> - [Inject] - private IntStreamingCodec() - { - } - - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - ///<returns>The iinteger read from the reader</returns> - public int Read(IDataReader reader) - { - return reader.ReadInt32(); - } - - /// <summary> - /// Writes the integer to the writer. - /// </summary> - /// <param name="obj">The integer to be encoded</param> - /// <param name="writer">The writer to which to write</param> - public void Write(int obj, IDataWriter writer) - { - writer.WriteInt32(obj); - } - - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - /// <param name="token">Cancellation token</param> - /// <returns>The integer read from the reader</returns> - public async Task<int> ReadAsync(IDataReader reader, CancellationToken token) - { - return await reader.ReadInt32Async(token); - } - - /// <summary> - /// Writes the integer to the writer. 
- /// </summary> - /// <param name="obj">The integer to be encoded</param> - /// <param name="writer">The writer to which to write</param> - /// <param name="token">Cancellation token</param> - public async Task WriteAsync(int obj, IDataWriter writer, CancellationToken token) - { - await writer.WriteInt32Async(obj, token); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs deleted file mode 100644 index 63036f5..0000000 --- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Remote; - -namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs -{ - /// <summary> - /// Streaming codec for string - /// </summary> - public sealed class StringStreamingCodec : IStreamingCodec<string> - { - /// <summary> - /// Injectable constructor - /// </summary> - [Inject] - private StringStreamingCodec() - { - } - - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - ///<returns>The string read from the reader</returns> - public string Read(IDataReader reader) - { - return reader.ReadString(); - } - - /// <summary> - /// Writes the string to the writer. - /// </summary> - /// <param name="obj">The string to be encoded</param> - /// <param name="writer">The writer to which to write</param> - public void Write(string obj, IDataWriter writer) - { - writer.WriteString(obj); - } - - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - /// <param name="token">Cancellation token</param> - /// <returns>The string read from the reader</returns> - public async Task<string> ReadAsync(IDataReader reader, CancellationToken token) - { - return await reader.ReadStringAsync(token); - } - - /// <summary> - /// Writes the string to the writer. 
- /// </summary> - /// <param name="obj">The string to be encoded</param> - /// <param name="writer">The writer to which to write</param> - /// <param name="token">Cancellation token</param> - public async Task WriteAsync(string obj, IDataWriter writer, CancellationToken token) - { - await writer.WriteStringAsync(obj, token); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs deleted file mode 100644 index 440b1c0..0000000 --- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Wake.Remote; - -namespace Org.Apache.REEF.Network.StreamingCodec -{ - /// <summary> - /// Codec Interface that external users should implement to directly write to the stream - /// </summary> - public interface IStreamingCodec<T> - { - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - ///<returns>The instance of type T read from the reader</returns> - T Read(IDataReader reader); - - /// <summary> - /// Writes the class fields to the writer. - /// </summary> - /// <param name="obj">The object of type T to be encoded</param> - /// <param name="writer">The writer to which to write</param> - void Write(T obj, IDataWriter writer); - - /// <summary> - /// Instantiate the class from the reader. - /// </summary> - /// <param name="reader">The reader from which to read</param> - /// <param name="token">Cancellation token</param> - /// <returns>The instance of type T read from the reader</returns> - Task<T> ReadAsync(IDataReader reader, CancellationToken token); - - /// <summary> - /// Writes the class fields to the writer. 
- /// </summary> - /// <param name="obj">The object of type T to be encoded</param> - /// <param name="writer">The writer to which to write</param> - /// <param name="token">Cancellation token</param> - Task WriteAsync(T obj, IDataWriter writer, CancellationToken token); - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj index 170c967..d2b2970 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj +++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj @@ -131,6 +131,15 @@ under the License. <Compile Include="RX\IStaticObservable.cs" /> <Compile Include="RX\ISubject.cs" /> <Compile Include="RX\ObserverCompletedException.cs" /> + <Compile Include="StreamingCodec\CodecToStreamingCodec.cs" /> + <Compile Include="StreamingCodec\CommonStreamingCodecs\DoubleArrayStreamingCodec.cs" /> + <Compile Include="StreamingCodec\CommonStreamingCodecs\DoubleStreamingCodec.cs" /> + <Compile Include="StreamingCodec\CommonStreamingCodecs\FloatArrayStreamingCodec.cs" /> + <Compile Include="StreamingCodec\CommonStreamingCodecs\FloatStreamingCodec.cs" /> + <Compile Include="StreamingCodec\CommonStreamingCodecs\IntArrayStreamingCodec.cs" /> + <Compile Include="StreamingCodec\CommonStreamingCodecs\IntStreamingCodec.cs" /> + <Compile Include="StreamingCodec\CommonStreamingCodecs\StringStreamingCodec.cs" /> + <Compile Include="StreamingCodec\IStreamingCodec.cs" /> <Compile Include="Time\Event\Alarm.cs" /> <Compile Include="Time\Event\StartTime.cs" /> <Compile Include="Time\Event\StopTime.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CodecToStreamingCodec.cs 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CodecToStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CodecToStreamingCodec.cs new file mode 100644 index 0000000..61b39ab --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CodecToStreamingCodec.cs @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Wake.StreamingCodec +{ + /// <summary> + /// Converts codec to streaming codec + /// </summary> + /// <typeparam name="T">Message type</typeparam> + public sealed class CodecToStreamingCodec<T> : IStreamingCodec<T> + { + private readonly ICodec<T> _codec; + + [Inject] + private CodecToStreamingCodec(ICodec<T> codec) + { + _codec = codec; + } + + public T Read(IDataReader reader) + { + int length = reader.ReadInt32(); + byte[] byteArr = new byte[length]; + reader.Read(ref byteArr, 0, length); + return _codec.Decode(byteArr); + } + + public void Write(T obj, IDataWriter writer) + { + var byteArr = _codec.Encode(obj); + writer.WriteInt32(byteArr.Length); + writer.Write(byteArr, 0, byteArr.Length); + } + + public async Task<T> ReadAsync(IDataReader reader, CancellationToken token) + { + int length = await reader.ReadInt32Async(token); + byte[] byteArr = new byte[length]; + await reader.ReadAsync(byteArr, 0, length, token); + return _codec.Decode(byteArr); + } + + public async Task WriteAsync(T obj, IDataWriter writer, CancellationToken token) + { + var byteArr = _codec.Encode(obj); + await writer.WriteInt32Async(byteArr.Length, token); + await writer.WriteAsync(byteArr, 0, byteArr.Length, token); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs new file mode 100644 index 0000000..a4d8654 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs @@ -0,0 
+1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs +{ + /// <summary> + /// Streaming codec for double array + /// </summary> + public sealed class DoubleArrayStreamingCodec : IStreamingCodec<double[]> + { + /// <summary> + /// Injectable constructor + /// </summary> + [Inject] + private DoubleArrayStreamingCodec() + { + } + + /// <summary> + /// Instantiate the class from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + ///<returns>The double array read from the reader</returns> + public double[] Read(IDataReader reader) + { + int length = reader.ReadInt32(); + byte[] buffer = new byte[sizeof(double)*length]; + reader.Read(ref buffer, 0, buffer.Length); + double[] doubleArr = new double[length]; + Buffer.BlockCopy(buffer, 0, doubleArr, 0, buffer.Length); + return doubleArr; + } + + /// <summary> + /// Writes the double array to the writer. 
+ /// </summary> + /// <param name="obj">The double array to be encoded</param> + /// <param name="writer">The writer to which to write</param> + public void Write(double[] obj, IDataWriter writer) + { + if (obj == null) + { + throw new ArgumentNullException("obj", "double array is null"); + } + + writer.WriteInt32(obj.Length); + byte[] buffer = new byte[sizeof(double) * obj.Length]; + Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length); + writer.Write(buffer, 0, buffer.Length); + } + + /// <summary> + /// Instantiate the class from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <param name="token">Cancellation token</param> + /// <returns>The double array read from the reader</returns> + public async Task<double[]> ReadAsync(IDataReader reader, CancellationToken token) + { + int length = await reader.ReadInt32Async(token); + byte[] buffer = new byte[sizeof(double) * length]; + await reader.ReadAsync(buffer, 0, buffer.Length, token); + double[] doubleArr = new double[length]; + Buffer.BlockCopy(buffer, 0, doubleArr, 0, sizeof(double) * length); + return doubleArr; + } + + /// <summary> + /// Writes the double array to the writer. 
+ /// </summary> + /// <param name="obj">The double array to be encoded</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">Cancellation token</param> + public async Task WriteAsync(double[] obj, IDataWriter writer, CancellationToken token) + { + if (obj == null) + { + throw new ArgumentNullException("obj", "double array is null"); + } + + await writer.WriteInt32Async(obj.Length, token); + byte[] buffer = new byte[sizeof(double) * obj.Length]; + Buffer.BlockCopy(obj, 0, buffer, 0, sizeof(double) * obj.Length); + await writer.WriteAsync(buffer, 0, buffer.Length, token); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs new file mode 100644 index 0000000..6cca28f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs +{ + /// <summary> + /// Streaming codec for double + /// </summary> + public sealed class DoubleStreamingCodec : IStreamingCodec<double> + { + /// <summary> + /// Injectable constructor + /// </summary> + [Inject] + private DoubleStreamingCodec() + { + } + + /// <summary> + /// Instantiate the class from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + ///<returns>The double read from the reader</returns> + public double Read(IDataReader reader) + { + return reader.ReadDouble(); + } + + /// <summary> + /// Writes the double to the writer. + /// </summary> + /// <param name="obj">The double to be encoded</param> + /// <param name="writer">The writer to which to write</param> + public void Write(double obj, IDataWriter writer) + { + writer.WriteDouble(obj); + } + + /// <summary> + /// Instantiate the class from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <param name="token">Cancellation token</param> + /// <returns>The double read from the reader</returns> + public async Task<double> ReadAsync(IDataReader reader, CancellationToken token) + { + return await reader.ReadDoubleAsync(token); + } + + /// <summary> + /// Writes the double to the writer. 
+ /// </summary> + /// <param name="obj">The double to be encoded</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">Cancellation token</param> + public async Task WriteAsync(double obj, IDataWriter writer, CancellationToken token) + { + await writer.WriteDoubleAsync(obj, token); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs new file mode 100644 index 0000000..72b6140 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs +{ + /// <summary> + /// Streaming codec for float array + /// </summary> + public sealed class FloatArrayStreamingCodec : IStreamingCodec<float[]> + { + /// <summary> + /// Injectable constructor + /// </summary> + [Inject] + private FloatArrayStreamingCodec() + { + } + + /// <summary> + /// Instantiate the class from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + ///<returns>The float array read from the reader</returns> + public float[] Read(IDataReader reader) + { + int length = reader.ReadInt32(); + byte[] buffer = new byte[sizeof(float)*length]; + reader.Read(ref buffer, 0, buffer.Length); + float[] floatArr = new float[length]; + Buffer.BlockCopy(buffer, 0, floatArr, 0, buffer.Length); + return floatArr; + } + + /// <summary> + /// Writes the float array to the writer. + /// </summary> + /// <param name="obj">The float array to be encoded</param> + /// <param name="writer">The writer to which to write</param> + public void Write(float[] obj, IDataWriter writer) + { + if (obj == null) + { + throw new ArgumentNullException("obj", "float array is null"); + } + + writer.WriteInt32(obj.Length); + byte[] buffer = new byte[sizeof(float) * obj.Length]; + Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length); + writer.Write(buffer, 0, buffer.Length); + } + + /// <summary> + /// Instantiate the class from the reader. 
+ /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <param name="token">Cancellation token</param> + /// <returns>The float array read from the reader</returns> + public async Task<float[]> ReadAsync(IDataReader reader, CancellationToken token) + { + int length = await reader.ReadInt32Async(token); + byte[] buffer = new byte[sizeof(float) * length]; + await reader.ReadAsync(buffer, 0, buffer.Length, token); + float[] floatArr = new float[length]; + Buffer.BlockCopy(buffer, 0, floatArr, 0, sizeof(float) * length); + return floatArr; + } + + /// <summary> + /// Writes the float array to the writer. + /// </summary> + /// <param name="obj">The float array to be encoded</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">Cancellation token</param> + public async Task WriteAsync(float[] obj, IDataWriter writer, CancellationToken token) + { + if (obj == null) + { + throw new ArgumentNullException("obj", "float array is null"); + } + + await writer.WriteInt32Async(obj.Length, token); + byte[] buffer = new byte[sizeof(float) * obj.Length]; + Buffer.BlockCopy(obj, 0, buffer, 0, sizeof(float) * obj.Length); + await writer.WriteAsync(buffer, 0, buffer.Length, token); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs new file mode 100644 index 0000000..7bc6215 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec for a single float value; every operation is
+    /// forwarded to the matching float method of the reader or writer.
+    /// </summary>
+    public sealed class FloatStreamingCodec : IStreamingCodec<float>
+    {
+        /// <summary>
+        /// Private constructor, marked for injection.
+        /// </summary>
+        [Inject]
+        private FloatStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads one float from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The float read from the reader</returns>
+        public float Read(IDataReader reader)
+        {
+            float value = reader.ReadFloat();
+            return value;
+        }
+
+        /// <summary>
+        /// Writes one float to the writer.
+        /// </summary>
+        /// <param name="obj">The float to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(float obj, IDataWriter writer)
+        {
+            writer.WriteFloat(obj);
+        }
+
+        /// <summary>
+        /// Asynchronously reads one float from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The float read from the reader</returns>
+        public async Task<float> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            float value = await reader.ReadFloatAsync(token);
+            return value;
+        }
+
+        /// <summary>
+        /// Asynchronously writes one float to the writer.
+        /// </summary>
+        /// <param name="obj">The float to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        public async Task WriteAsync(float obj, IDataWriter writer, CancellationToken token)
+        {
+            var pendingWrite = writer.WriteFloatAsync(obj, token);
+            await pendingWrite;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs
new file mode 100644
index 0000000..186cae0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec for integer array
+    /// </summary>
+    /// <remarks>
+    /// Layout produced by this class: the element count (WriteInt32) followed by
+    /// sizeof(int) * count raw bytes copied from the array with Buffer.BlockCopy.
+    /// </remarks>
+    public sealed class IntArrayStreamingCodec : IStreamingCodec<int[]>
+    {
+        /// <summary>
+        /// Largest element count whose byte size still fits in an int.
+        /// </summary>
+        private const int MaxLength = int.MaxValue / sizeof(int);
+
+        /// <summary>
+        /// Injectable constructor
+        /// </summary>
+        [Inject]
+        private IntArrayStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads a length-prefixed integer array from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The integer array read from the reader</returns>
+        /// <exception cref="System.IO.InvalidDataException">
+        /// The length prefix is negative or too large to be buffered.
+        /// </exception>
+        public int[] Read(IDataReader reader)
+        {
+            int length = reader.ReadInt32();
+            byte[] buffer = AllocateBuffer(length);
+
+            // NOTE(review): nothing here verifies that the reader supplied buffer.Length
+            // bytes (same as before this change) - confirm the IDataReader.Read contract.
+            reader.Read(ref buffer, 0, buffer.Length);
+            return ToInts(buffer, length);
+        }
+
+        /// <summary>
+        /// Writes the integer array to the writer as a length prefix followed by the raw bytes.
+        /// </summary>
+        /// <param name="obj">The integer array to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <exception cref="ArgumentNullException">obj is null</exception>
+        public void Write(int[] obj, IDataWriter writer)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj", "integer array is null");
+            }
+
+            // Build the payload first so that a failure here cannot leave a
+            // length prefix on the wire without its data.
+            byte[] buffer = ToBytes(obj);
+            writer.WriteInt32(obj.Length);
+            writer.Write(buffer, 0, buffer.Length);
+        }
+
+        /// <summary>
+        /// Asynchronously reads a length-prefixed integer array from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The integer array read from the reader</returns>
+        /// <exception cref="System.IO.InvalidDataException">
+        /// The length prefix is negative or too large to be buffered.
+        /// </exception>
+        public async Task<int[]> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            int length = await reader.ReadInt32Async(token);
+            byte[] buffer = AllocateBuffer(length);
+
+            // NOTE(review): as in Read, the number of bytes actually received is not checked.
+            await reader.ReadAsync(buffer, 0, buffer.Length, token);
+            return ToInts(buffer, length);
+        }
+
+        /// <summary>
+        /// Asynchronously writes the integer array to the writer as a length prefix
+        /// followed by the raw bytes.
+        /// </summary>
+        /// <param name="obj">The integer array to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        /// <exception cref="ArgumentNullException">obj is null</exception>
+        public async Task WriteAsync(int[] obj, IDataWriter writer, CancellationToken token)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj", "integer array is null");
+            }
+
+            // Payload is prepared before the length prefix is sent; see Write.
+            byte[] buffer = ToBytes(obj);
+            await writer.WriteInt32Async(obj.Length, token);
+            await writer.WriteAsync(buffer, 0, buffer.Length, token);
+        }
+
+        /// <summary>
+        /// Validates a length prefix taken from the stream and allocates the matching byte buffer.
+        /// </summary>
+        private static byte[] AllocateBuffer(int length)
+        {
+            // A negative length would fail inside "new byte[]" with an unhelpful overflow error,
+            // and a length above MaxLength would make sizeof(int) * length wrap around.
+            if (length < 0 || length > MaxLength)
+            {
+                throw new System.IO.InvalidDataException(
+                    "Invalid integer array length read from the stream: " + length);
+            }
+
+            return new byte[sizeof(int) * length];
+        }
+
+        /// <summary>
+        /// Copies the raw bytes in the buffer into a new integer array of the given length.
+        /// </summary>
+        private static int[] ToInts(byte[] buffer, int length)
+        {
+            int[] values = new int[length];
+            Buffer.BlockCopy(buffer, 0, values, 0, buffer.Length);
+            return values;
+        }
+
+        /// <summary>
+        /// Copies the raw bytes of the integer array into a new byte buffer.
+        /// </summary>
+        private static byte[] ToBytes(int[] values)
+        {
+            // checked: fail loudly instead of wrapping around for extremely large arrays.
+            byte[] buffer = new byte[checked(sizeof(int) * values.Length)];
+            Buffer.BlockCopy(values, 0, buffer, 0, buffer.Length);
+            return buffer;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs
new file mode 100644
index 0000000..127dec6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec for a single integer value; every operation is
+    /// forwarded to the matching Int32 method of the reader or writer.
+    /// </summary>
+    public sealed class IntStreamingCodec : IStreamingCodec<int>
+    {
+        /// <summary>
+        /// Private constructor, marked for injection.
+        /// </summary>
+        [Inject]
+        private IntStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads one integer from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The integer read from the reader</returns>
+        public int Read(IDataReader reader)
+        {
+            int value = reader.ReadInt32();
+            return value;
+        }
+
+        /// <summary>
+        /// Writes one integer to the writer.
+        /// </summary>
+        /// <param name="obj">The integer to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(int obj, IDataWriter writer)
+        {
+            writer.WriteInt32(obj);
+        }
+
+        /// <summary>
+        /// Asynchronously reads one integer from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The integer read from the reader</returns>
+        public async Task<int> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            int value = await reader.ReadInt32Async(token);
+            return value;
+        }
+
+        /// <summary>
+        /// Asynchronously writes one integer to the writer.
+        /// </summary>
+        /// <param name="obj">The integer to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        public async Task WriteAsync(int obj, IDataWriter writer, CancellationToken token)
+        {
+            var pendingWrite = writer.WriteInt32Async(obj, token);
+            await pendingWrite;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs
new file mode 100644
index 0000000..8bbfb8e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec for a single string value; every operation is
+    /// forwarded to the matching string method of the reader or writer.
+    /// </summary>
+    public sealed class StringStreamingCodec : IStreamingCodec<string>
+    {
+        /// <summary>
+        /// Private constructor, marked for injection.
+        /// </summary>
+        [Inject]
+        private StringStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads one string from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The string read from the reader</returns>
+        public string Read(IDataReader reader)
+        {
+            string value = reader.ReadString();
+            return value;
+        }
+
+        /// <summary>
+        /// Writes one string to the writer.
+        /// </summary>
+        /// <param name="obj">The string to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(string obj, IDataWriter writer)
+        {
+            writer.WriteString(obj);
+        }
+
+        /// <summary>
+        /// Asynchronously reads one string from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The string read from the reader</returns>
+        public async Task<string> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            string value = await reader.ReadStringAsync(token);
+            return value;
+        }
+
+        /// <summary>
+        /// Asynchronously writes one string to the writer.
+        /// </summary>
+        /// <param name="obj">The string to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        public async Task WriteAsync(string obj, IDataWriter writer, CancellationToken token)
+        {
+            var pendingWrite = writer.WriteStringAsync(obj, token);
+            await pendingWrite;
+        }
+    }
+}
