Repository: incubator-reef
Updated Branches:
refs/heads/master 2a09c826a -> 4151f059c
[REEF-555] Introduce MapInputWithControlMessage
This addressed the issue by implementing classes
* TMapInputWithControlMessage - include both input for mapper as well
as the message of whether to continue or stop. Input will be present
only if we want to continue.
* TMapInputWithControlMessageCodec
* TMapInputWithControlMessagePipelineDataConverter that uses
PipelineConverter of TMapInput as the base converter.
JIRA:
[REEF-555](https://issues.apache.org/jira/browse/REEF-555)
Pull Request:
This closes #338
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/4151f059
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/4151f059
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/4151f059
Branch: refs/heads/master
Commit: 4151f059c6668839f9bfa01fd673bbaadcdf4aa1
Parents: 2a09c82
Author: Dhruv <[email protected]>
Authored: Wed Aug 5 14:44:27 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Aug 6 15:38:23 2015 -0700
----------------------------------------------------------------------
.../MapInputWithControlMessageTests.cs | 195 +++++++++++++++++++
.../Org.Apache.REEF.IMRU.Tests.csproj | 1 +
.../MapControlMessage.cs | 28 +++
.../MapInputWithControlMessage.cs | 58 ++++++
.../MapInputWithControlMessageCodec.cs | 141 ++++++++++++++
...utwithControlMessagePipelineDataConverter.cs | 93 +++++++++
.../Org.Apache.REEF.IMRU.csproj | 4 +
7 files changed, 520 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU.Tests/MapInputWithControlMessageTests.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Tests/MapInputWithControlMessageTests.cs
b/lang/cs/Org.Apache.REEF.IMRU.Tests/MapInputWithControlMessageTests.cs
new file mode 100644
index 0000000..cf7d9e1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/MapInputWithControlMessageTests.cs
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.CodeDom;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using System.Security.Cryptography;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.IMRU.OnREEF;
+using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
+
+namespace Org.Apache.REEF.IMRU.Tests
+{
+ [TestClass]
+ public class MapInputWithControlMessageTests
+ {
+ /// <summary>
+ /// Tests the codec for TMapInputWithControlMessage
+ /// </summary>
+ [TestMethod]
+ public void TestMapInputWithControlMessageCodec()
+ {
+ float[] baseMessage = {0, 1};
+
+ var config = TangFactory.GetTang().NewConfigurationBuilder()
+
.BindImplementation(GenericType<IStreamingCodec<float[]>>.Class,
+ GenericType<FloatArrayStreamingCodec>.Class)
+ .Build();
+
+ IStreamingCodec<MapInputWithControlMessage<float[]>> codec =
+
TangFactory.GetTang().NewInjector(config).GetInstance<MapInputWithControlMessageCodec<float[]>>();
+
+ MemoryStream stream = new MemoryStream();
+ IDataWriter writer = new StreamDataWriter(stream);
+
+ codec.Write(new MapInputWithControlMessage<float[]>(baseMessage,
MapControlMessage.AnotherRound), writer);
+ codec.Write(new
MapInputWithControlMessage<float[]>(MapControlMessage.Stop), writer);
+
+ stream.Position = 0;
+ IDataReader reader = new StreamDataReader(stream);
+
+ var message1 = codec.Read(reader);
+ var message2 = codec.Read(reader);
+
+ Assert.AreEqual(message1.Message[0], baseMessage[0]);
+ Assert.AreEqual(message1.Message[1], baseMessage[1]);
+ Assert.IsNull(message2.Message);
+ Assert.AreEqual(message1.ControlMessage,
MapControlMessage.AnotherRound);
+ Assert.AreEqual(message2.ControlMessage, MapControlMessage.Stop);
+ }
+
+ /// <summary>
+ /// Tests the pipelining Data converter for TMapInputWithControlMessage
+ /// </summary>
+ [TestMethod]
+ public void TestMapInputPipelining()
+ {
+ int chunkSize = 2;
+
+ var config = TangFactory.GetTang().NewConfigurationBuilder(
+ PipelineDataConverterConfiguration<int[]>.Conf
+
.Set(PipelineDataConverterConfiguration<int[]>.DataConverter,
+ GenericType<PipelineIntDataConverter>.Class)
+ .Build()).BindNamedParameter<ChunkSize, int>(
+ GenericType<ChunkSize>.Class,
+
chunkSize.ToString(CultureInfo.InvariantCulture)).Build();
+
+ IPipelineDataConverter<MapInputWithControlMessage<int[]>>
dataConverter =
+ TangFactory.GetTang()
+ .NewInjector(config)
+
.GetInstance<MapInputwithControlMessagePipelineDataConverter<int[]>>();
+
+ int[] baseMessage = {1, 2, 3};
+
+ var chunks1 = dataConverter.PipelineMessage(new
MapInputWithControlMessage<int[]>(baseMessage,
+ MapControlMessage.AnotherRound));
+
+ var chunks2 = dataConverter.PipelineMessage(new
MapInputWithControlMessage<int[]>(MapControlMessage.Stop));
+
+ Assert.AreEqual(chunks1.Count, 2);
+ Assert.IsTrue(chunks1[0].Data.Message.Length == 2);
+ Assert.IsTrue(chunks1[1].Data.Message.Length == 1);
+ Assert.AreEqual(chunks1[0].Data.Message[0], baseMessage[0]);
+ Assert.AreEqual(chunks1[0].Data.Message[1], baseMessage[1]);
+ Assert.AreEqual(chunks1[1].Data.Message[0], baseMessage[2]);
+ Assert.AreEqual(chunks1[0].Data.ControlMessage,
MapControlMessage.AnotherRound);
+ Assert.AreEqual(chunks1[1].Data.ControlMessage,
MapControlMessage.AnotherRound);
+ Assert.AreEqual(chunks1[0].IsLast, false);
+ Assert.AreEqual(chunks1[1].IsLast, true);
+
+ Assert.AreEqual(chunks2.Count, 1);
+ Assert.IsNull(chunks2[0].Data.Message);
+ Assert.AreEqual(chunks2[0].Data.ControlMessage,
MapControlMessage.Stop);
+ Assert.AreEqual(chunks2[0].IsLast, true);
+
+ var fullMessage1 = dataConverter.FullMessage(chunks1);
+ var fullMessage2 = dataConverter.FullMessage(chunks2);
+
+ Assert.AreEqual(fullMessage1.Message[0], baseMessage[0]);
+ Assert.AreEqual(fullMessage1.Message[1], baseMessage[1]);
+ Assert.AreEqual(fullMessage1.Message[2], baseMessage[2]);
+ Assert.AreEqual(fullMessage1.ControlMessage,
chunks1[0].Data.ControlMessage);
+ Assert.IsNull(fullMessage2.Message);
+ Assert.AreEqual(fullMessage2.ControlMessage,
chunks2[0].Data.ControlMessage);
+ }
+
+ [NamedParameter("Chunk size.")]
+ private sealed class ChunkSize : Name<int>
+ {
+ }
+
+ private class PipelineIntDataConverter : IPipelineDataConverter<int[]>
+ {
+ private readonly int _chunkSize;
+
+ [Inject]
+ private PipelineIntDataConverter([Parameter(typeof(ChunkSize))]
int chunkSize)
+ {
+ _chunkSize = chunkSize;
+ }
+
+ public List<PipelineMessage<int[]>> PipelineMessage(int[] message)
+ {
+ var messageList = new List<PipelineMessage<int[]>>();
+ var totalChunks = message.Length / _chunkSize;
+
+ if (message.Length % _chunkSize != 0)
+ {
+ totalChunks++;
+ }
+
+ var counter = 0;
+ for (var i = 0; i < message.Length; i += _chunkSize)
+ {
+ var data = new int[Math.Min(_chunkSize, message.Length -
i)];
+ Buffer.BlockCopy(message, i * sizeof(int), data, 0,
data.Length * sizeof(int));
+
+ messageList.Add(counter == totalChunks - 1
+ ? new PipelineMessage<int[]>(data, true)
+ : new PipelineMessage<int[]>(data, false));
+
+ counter++;
+ }
+
+ return messageList;
+ }
+
+ public int[] FullMessage(List<PipelineMessage<int[]>>
pipelineMessage)
+ {
+ var size = pipelineMessage.Select(x => x.Data.Length).Sum();
+ var data = new int[size];
+ var offset = 0;
+
+ foreach (var message in pipelineMessage)
+ {
+ Buffer.BlockCopy(message.Data, 0, data, offset,
message.Data.Length * sizeof(int));
+ offset += message.Data.Length * sizeof(int);
+ }
+
+ return data;
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
index 9807cbe..8288c77 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
@@ -40,6 +40,7 @@ under the License.
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
+ <Compile Include="MapInputWithControlMessageTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="MapperCountTest.cs" />
</ItemGroup>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapControlMessage.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapControlMessage.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapControlMessage.cs
new file mode 100644
index 0000000..e0a6eb1
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapControlMessage.cs
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage
+{
+ /// <summary>
+ /// Control Message telling Map Tasks what to do
+ /// </summary>
+ internal enum MapControlMessage
+ {
+ AnotherRound, //Do another round of map function
+ Stop //Stop
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessage.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessage.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessage.cs
new file mode 100644
index 0000000..8ca5eee
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessage.cs
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage
+{
+ /// <summary>
+ /// Input to Map task
+ /// Containes both actual message of type TMapInput and control
+ /// message from UpdateTask
+ /// </summary>
+ /// <typeparam name="TMapInput"></typeparam>
+ internal class MapInputWithControlMessage<TMapInput>
+ {
+ /// <summary>
+ /// Internal constructor
+ /// </summary>
+ /// <param name="controlMessage">Control message from Update
Function</param>
+ internal MapInputWithControlMessage(MapControlMessage controlMessage)
+ {
+ ControlMessage = controlMessage;
+ }
+
+ /// <summary>
+ /// Internal constructor
+ /// </summary>
+ /// <param name="input">Actual map input</param>
+ /// <param name="controlMessage">Control message from Update
Function</param>
+ internal MapInputWithControlMessage(TMapInput input, MapControlMessage
controlMessage)
+ {
+ Message = input;
+ ControlMessage = controlMessage;
+ }
+
+ /// <summary>
+ /// Actual input for Mappers
+ /// </summary>
+ internal TMapInput Message { get; set; }
+
+ /// <summary>
+ /// Control message from Update Task to Map task
+ /// </summary>
+ internal MapControlMessage ControlMessage { get; set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessageCodec.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessageCodec.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessageCodec.cs
new file mode 100644
index 0000000..644b487
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessageCodec.cs
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage
+{
+ /// <summary>
+ /// Streaming codec for MapInputWithControlMessage
+ /// </summary>
+ /// <typeparam name="TMapInput"></typeparam>
+ internal class MapInputWithControlMessageCodec<TMapInput> :
IStreamingCodec<MapInputWithControlMessage<TMapInput>>
+ {
+ private static Logger Logger =
Logger.GetLogger(typeof(MapInputWithControlMessage<>));
+ private readonly IStreamingCodec<TMapInput> _baseCodec;
+
+ [Inject]
+ private MapInputWithControlMessageCodec(IStreamingCodec<TMapInput>
baseCodec)
+ {
+ _baseCodec = baseCodec;
+ }
+
+ /// <summary>
+ /// Reads message from reader
+ /// </summary>
+ /// <param name="reader">reader from which to read the message</param>
+ /// <returns>Read message</returns>
+ MapInputWithControlMessage<TMapInput>
IStreamingCodec<MapInputWithControlMessage<TMapInput>>.Read(
+ IDataReader reader)
+ {
+ byte[] messageType = new byte[1];
+ reader.Read(ref messageType, 0, 1);
+ MapControlMessage controlMessage;
+
+ switch (messageType[0])
+ {
+ case 0:
+ controlMessage = MapControlMessage.AnotherRound;
+ TMapInput message = _baseCodec.Read(reader);
+ return new MapInputWithControlMessage<TMapInput>(message,
controlMessage);
+ case 1:
+ controlMessage = MapControlMessage.Stop;
+ return new
MapInputWithControlMessage<TMapInput>(controlMessage);
+ }
+
+ Exceptions.Throw(new Exception("Control message type not valid in
Codec read"), Logger);
+ return null;
+ }
+
+ /// <summary>
+ /// Writes message to the writer
+ /// </summary>
+ /// <param name="obj">Message to write</param>
+ /// <param name="writer">Writer used to write the message</param>
+ void
IStreamingCodec<MapInputWithControlMessage<TMapInput>>.Write(MapInputWithControlMessage<TMapInput>
obj,
+ IDataWriter writer)
+ {
+ switch (obj.ControlMessage)
+ {
+ case MapControlMessage.AnotherRound:
+ writer.Write(new byte[] {0}, 0, 1);
+ _baseCodec.Write(obj.Message, writer);
+ break;
+ case MapControlMessage.Stop:
+ writer.Write(new byte[] {1}, 0, 1);
+ break;
+ }
+ }
+
+ /// <summary>
+ /// Reads message asynchronously from reader
+ /// </summary>
+ /// <param name="reader">reader from which to read the message</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>Read message</returns>
+ async Task<MapInputWithControlMessage<TMapInput>>
IStreamingCodec<MapInputWithControlMessage<TMapInput>>.
+ ReadAsync(IDataReader reader, CancellationToken token)
+ {
+ byte[] messageType = new byte[1];
+ await reader.ReadAsync(messageType, 0, 1, token);
+ MapControlMessage controlMessage = MapControlMessage.AnotherRound;
+
+ switch (messageType[0])
+ {
+ case 0:
+ controlMessage = MapControlMessage.AnotherRound;
+ TMapInput message = await _baseCodec.ReadAsync(reader,
token);
+ return new MapInputWithControlMessage<TMapInput>(message,
controlMessage);
+ case 1:
+ controlMessage = MapControlMessage.Stop;
+ return new
MapInputWithControlMessage<TMapInput>(controlMessage);
+ }
+
+ Exceptions.Throw(new Exception("Control message type not valis in
Codec read"), Logger);
+ return null;
+ }
+
+ /// <summary>
+ /// Writes message asynchronously to the writer
+ /// </summary>
+ /// <param name="obj">Message to write</param>
+ /// <param name="writer">Writer used to write the message</param>
+ /// <param name="token">Cancellation token</param>
+ async Task
IStreamingCodec<MapInputWithControlMessage<TMapInput>>.WriteAsync(
+ MapInputWithControlMessage<TMapInput> obj, IDataWriter writer,
CancellationToken token)
+ {
+ switch (obj.ControlMessage)
+ {
+ case MapControlMessage.AnotherRound:
+ await writer.WriteAsync(new byte[] { 0 }, 0, 1, token);
+ await _baseCodec.WriteAsync(obj.Message, writer, token);
+ break;
+ case MapControlMessage.Stop:
+ writer.Write(new byte[] { 1 }, 0, 1);
+ break;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputwithControlMessagePipelineDataConverter.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputwithControlMessagePipelineDataConverter.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputwithControlMessagePipelineDataConverter.cs
new file mode 100644
index 0000000..a8a1c47
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputwithControlMessagePipelineDataConverter.cs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage
+{
+ /// <summary>
+ /// Pipeline Data Converter for MapInputwithControlMessage to chunk and
dechunk the message for communication
+ /// </summary>
+ /// <typeparam name="TMapInput"></typeparam>
+ internal class MapInputwithControlMessagePipelineDataConverter<TMapInput> :
+ IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>
+ {
+ private readonly IPipelineDataConverter<TMapInput>
_basePipelineDataConverter;
+
+ [Inject]
+ internal MapInputwithControlMessagePipelineDataConverter(
+ IPipelineDataConverter<TMapInput> basePipelineDataConverter)
+ {
+ _basePipelineDataConverter = basePipelineDataConverter;
+ }
+
+ /// <summary>
+ /// Chunks the message
+ /// </summary>
+ /// <param name="message">Message to be chunked</param>
+ /// <returns>List of message chunks</returns>
+ List<PipelineMessage<MapInputWithControlMessage<TMapInput>>>
+
IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>.PipelineMessage(
+ MapInputWithControlMessage<TMapInput> message)
+ {
+ List<PipelineMessage<MapInputWithControlMessage<TMapInput>>>
messageChunks =
+ new
List<PipelineMessage<MapInputWithControlMessage<TMapInput>>>();
+
+ if (message.ControlMessage == MapControlMessage.Stop)
+ {
+ messageChunks.Add(new
PipelineMessage<MapInputWithControlMessage<TMapInput>>(message, true));
+ return messageChunks;
+ }
+
+ var baseMessageChunks =
_basePipelineDataConverter.PipelineMessage(message.Message);
+
+ messageChunks.AddRange(
+ baseMessageChunks.Select(
+ t =>
+ new
PipelineMessage<MapInputWithControlMessage<TMapInput>>(
+ new MapInputWithControlMessage<TMapInput>(t.Data,
message.ControlMessage), t.IsLast)));
+
+ return messageChunks;
+ }
+
+ /// <summary>
+ /// Dechunks the message
+ /// </summary>
+ /// <param name="pipelineMessage">Message chunks</param>
+ /// <returns>Single aggregated message</returns>
+ MapInputWithControlMessage<TMapInput>
IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>.FullMessage(
+ List<PipelineMessage<MapInputWithControlMessage<TMapInput>>>
pipelineMessage)
+ {
+ if (pipelineMessage.Count == 1)
+ {
+ return pipelineMessage[0].Data;
+ }
+
+ var baseMessageChunks =
+ pipelineMessage.Select(x => new
PipelineMessage<TMapInput>(x.Data.Message, false)).ToList();
+
+ MapInputWithControlMessage<TMapInput> combinedMessage =
+ new
MapInputWithControlMessage<TMapInput>(_basePipelineDataConverter.FullMessage(baseMessageChunks),
+ pipelineMessage[0].Data.ControlMessage);
+
+ return combinedMessage;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index bdafd6b..ca0ae8e 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -60,6 +60,10 @@ under the License.
<Compile Include="InProcess\InProcessIMRUConfiguration.cs" />
<Compile Include="InProcess\MapFunctions.cs" />
<Compile Include="InProcess\Parameters\NumberOfMappers.cs" />
+ <Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs"
/>
+ <Compile
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" />
+ <Compile
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessageCodec.cs"
/>
+ <Compile
Include="OnREEF\MapInputWithControlMessage\MapInputwithControlMessagePipelineDataConverter.cs"
/>
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>