Repository: incubator-reef
Updated Branches:
  refs/heads/master 2a09c826a -> 4151f059c


[REEF-555] Introduce MapInputWithControlMessage

This addresses the issue by implementing the following classes:
  * MapInputWithControlMessage - contains both the input for the mapper
    and a control message saying whether to continue or stop. The input
    is present only if we want to continue.
  * MapInputWithControlMessageCodec
  * MapInputwithControlMessagePipelineDataConverter, which uses the
    PipelineDataConverter of TMapInput as the base converter.

JIRA:
  [REEF-555](https://issues.apache.org/jira/browse/REEF-555)

Pull Request:
  This closes #338


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/4151f059
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/4151f059
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/4151f059

Branch: refs/heads/master
Commit: 4151f059c6668839f9bfa01fd673bbaadcdf4aa1
Parents: 2a09c82
Author: Dhruv <[email protected]>
Authored: Wed Aug 5 14:44:27 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Aug 6 15:38:23 2015 -0700

----------------------------------------------------------------------
 .../MapInputWithControlMessageTests.cs          | 195 +++++++++++++++++++
 .../Org.Apache.REEF.IMRU.Tests.csproj           |   1 +
 .../MapControlMessage.cs                        |  28 +++
 .../MapInputWithControlMessage.cs               |  58 ++++++
 .../MapInputWithControlMessageCodec.cs          | 141 ++++++++++++++
 ...utwithControlMessagePipelineDataConverter.cs |  93 +++++++++
 .../Org.Apache.REEF.IMRU.csproj                 |   4 +
 7 files changed, 520 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU.Tests/MapInputWithControlMessageTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Tests/MapInputWithControlMessageTests.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Tests/MapInputWithControlMessageTests.cs
new file mode 100644
index 0000000..cf7d9e1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/MapInputWithControlMessageTests.cs
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.CodeDom;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using System.Security.Cryptography;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.IMRU.OnREEF;
+using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
+
+namespace Org.Apache.REEF.IMRU.Tests
+{
+    [TestClass]
+    public class MapInputWithControlMessageTests
+    {
+        /// <summary>
+        /// Round-trips two messages through the codec for MapInputWithControlMessage:
+        /// an AnotherRound message carrying a float[] payload followed by a Stop
+        /// message without payload, both written to and read back from one stream
+        /// </summary>
+        [TestMethod]
+        public void TestMapInputWithControlMessageCodec()
+        {
+            float[] baseMessage = {0, 1};
+
+            var config = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindImplementation(GenericType<IStreamingCodec<float[]>>.Class,
+                    GenericType<FloatArrayStreamingCodec>.Class)
+                .Build();
+
+            IStreamingCodec<MapInputWithControlMessage<float[]>> codec =
+                TangFactory.GetTang().NewInjector(config).GetInstance<MapInputWithControlMessageCodec<float[]>>();
+
+            using (var stream = new MemoryStream())
+            {
+                IDataWriter writer = new StreamDataWriter(stream);
+
+                codec.Write(new MapInputWithControlMessage<float[]>(baseMessage, MapControlMessage.AnotherRound), writer);
+                codec.Write(new MapInputWithControlMessage<float[]>(MapControlMessage.Stop), writer);
+
+                // Rewind so that the reader starts at the first written message
+                stream.Position = 0;
+                IDataReader reader = new StreamDataReader(stream);
+
+                var message1 = codec.Read(reader);
+                var message2 = codec.Read(reader);
+
+                // MSTest convention: expected value first, actual value second
+                Assert.AreEqual(MapControlMessage.AnotherRound, message1.ControlMessage);
+                Assert.AreEqual(baseMessage.Length, message1.Message.Length);
+                Assert.AreEqual(baseMessage[0], message1.Message[0]);
+                Assert.AreEqual(baseMessage[1], message1.Message[1]);
+
+                Assert.AreEqual(MapControlMessage.Stop, message2.ControlMessage);
+                Assert.IsNull(message2.Message);
+            }
+        }
+
+        /// <summary>
+        /// Tests the pipelining data converter for MapInputWithControlMessage:
+        /// a three element AnotherRound message is split with chunk size two,
+        /// a Stop message stays a single chunk, and both are reassembled again
+        /// </summary>
+        [TestMethod]
+        public void TestMapInputPipelining()
+        {
+            int chunkSize = 2;
+
+            var config = TangFactory.GetTang().NewConfigurationBuilder(
+                PipelineDataConverterConfiguration<int[]>.Conf
+                    .Set(PipelineDataConverterConfiguration<int[]>.DataConverter,
+                        GenericType<PipelineIntDataConverter>.Class)
+                    .Build()).BindNamedParameter<ChunkSize, int>(
+                        GenericType<ChunkSize>.Class,
+                        chunkSize.ToString(CultureInfo.InvariantCulture)).Build();
+
+            IPipelineDataConverter<MapInputWithControlMessage<int[]>> dataConverter =
+                TangFactory.GetTang()
+                    .NewInjector(config)
+                    .GetInstance<MapInputwithControlMessagePipelineDataConverter<int[]>>();
+
+            int[] baseMessage = {1, 2, 3};
+
+            var chunks1 = dataConverter.PipelineMessage(new MapInputWithControlMessage<int[]>(baseMessage,
+                MapControlMessage.AnotherRound));
+
+            var chunks2 = dataConverter.PipelineMessage(new MapInputWithControlMessage<int[]>(MapControlMessage.Stop));
+
+            // MSTest convention: expected value first, actual value second
+            Assert.AreEqual(2, chunks1.Count);
+            Assert.AreEqual(2, chunks1[0].Data.Message.Length);
+            Assert.AreEqual(1, chunks1[1].Data.Message.Length);
+            Assert.AreEqual(baseMessage[0], chunks1[0].Data.Message[0]);
+            Assert.AreEqual(baseMessage[1], chunks1[0].Data.Message[1]);
+            Assert.AreEqual(baseMessage[2], chunks1[1].Data.Message[0]);
+            Assert.AreEqual(MapControlMessage.AnotherRound, chunks1[0].Data.ControlMessage);
+            Assert.AreEqual(MapControlMessage.AnotherRound, chunks1[1].Data.ControlMessage);
+            Assert.IsFalse(chunks1[0].IsLast);
+            Assert.IsTrue(chunks1[1].IsLast);
+
+            Assert.AreEqual(1, chunks2.Count);
+            Assert.IsNull(chunks2[0].Data.Message);
+            Assert.AreEqual(MapControlMessage.Stop, chunks2[0].Data.ControlMessage);
+            Assert.IsTrue(chunks2[0].IsLast);
+
+            var fullMessage1 = dataConverter.FullMessage(chunks1);
+            var fullMessage2 = dataConverter.FullMessage(chunks2);
+
+            Assert.AreEqual(baseMessage.Length, fullMessage1.Message.Length);
+            Assert.AreEqual(baseMessage[0], fullMessage1.Message[0]);
+            Assert.AreEqual(baseMessage[1], fullMessage1.Message[1]);
+            Assert.AreEqual(baseMessage[2], fullMessage1.Message[2]);
+            Assert.AreEqual(chunks1[0].Data.ControlMessage, fullMessage1.ControlMessage);
+            Assert.IsNull(fullMessage2.Message);
+            Assert.AreEqual(chunks2[0].Data.ControlMessage, fullMessage2.ControlMessage);
+        }
+
+        /// <summary>
+        /// Named parameter supplying the chunk size to PipelineIntDataConverter
+        /// </summary>
+        [NamedParameter("Chunk size.")]
+        private sealed class ChunkSize : Name<int>
+        {
+        }
+
+        /// <summary>
+        /// Test-only converter that splits an int[] into consecutive chunks of at
+        /// most ChunkSize elements and concatenates such chunks back into one array
+        /// </summary>
+        private class PipelineIntDataConverter : IPipelineDataConverter<int[]>
+        {
+            private readonly int _chunkSize;
+
+            [Inject]
+            private PipelineIntDataConverter([Parameter(typeof(ChunkSize))] int chunkSize)
+            {
+                _chunkSize = chunkSize;
+            }
+
+            /// <summary>
+            /// Splits the message into chunks; only the final chunk is flagged as last
+            /// </summary>
+            /// <param name="message">Array to be chunked</param>
+            /// <returns>List of chunks in their original order</returns>
+            public List<PipelineMessage<int[]>> PipelineMessage(int[] message)
+            {
+                var chunks = new List<PipelineMessage<int[]>>();
+
+                // Number of chunks, rounding up when the length is not a multiple of the chunk size
+                var remainder = message.Length % _chunkSize;
+                var chunkCount = (message.Length / _chunkSize) + (remainder == 0 ? 0 : 1);
+
+                for (var start = 0; start < message.Length; start += _chunkSize)
+                {
+                    var chunk = new int[Math.Min(_chunkSize, message.Length - start)];
+
+                    // Buffer.BlockCopy takes byte offsets and byte counts
+                    Buffer.BlockCopy(message, start * sizeof(int), chunk, 0, chunk.Length * sizeof(int));
+
+                    // The number of chunks added so far is the index of the current chunk
+                    var isLast = chunks.Count == chunkCount - 1;
+                    chunks.Add(new PipelineMessage<int[]>(chunk, isLast));
+                }
+
+                return chunks;
+            }
+
+            /// <summary>
+            /// Concatenates the chunk payloads in list order
+            /// </summary>
+            /// <param name="pipelineMessage">Chunks to be combined</param>
+            /// <returns>Single array holding all chunk elements</returns>
+            public int[] FullMessage(List<PipelineMessage<int[]>> pipelineMessage)
+            {
+                var combined = new int[pipelineMessage.Sum(x => x.Data.Length)];
+                var byteOffset = 0;
+
+                foreach (var chunk in pipelineMessage)
+                {
+                    var byteCount = chunk.Data.Length * sizeof(int);
+                    Buffer.BlockCopy(chunk.Data, 0, combined, byteOffset, byteCount);
+                    byteOffset += byteCount;
+                }
+
+                return combined;
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj 
b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
index 9807cbe..8288c77 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
@@ -40,6 +40,7 @@ under the License.
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="MapInputWithControlMessageTests.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="MapperCountTest.cs" />
   </ItemGroup>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapControlMessage.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapControlMessage.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapControlMessage.cs
new file mode 100644
index 0000000..e0a6eb1
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapControlMessage.cs
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage
+{
+    /// <summary>
+    /// Control message telling Map tasks what to do
+    /// </summary>
+    internal enum MapControlMessage
+    {
+        /// <summary>
+        /// Do another round of the map function
+        /// </summary>
+        AnotherRound = 0,
+
+        /// <summary>
+        /// Stop
+        /// </summary>
+        Stop = 1
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessage.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessage.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessage.cs
new file mode 100644
index 0000000..8ca5eee
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessage.cs
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage
+{
+    /// <summary>
+    /// Input to a Map task.
+    /// Contains both the actual message of type TMapInput and the control
+    /// message from the UpdateTask
+    /// </summary>
+    /// <typeparam name="TMapInput">Type of the actual map input</typeparam>
+    internal class MapInputWithControlMessage<TMapInput>
+    {
+        /// <summary>
+        /// Creates a message that carries only a control message; the map
+        /// input is left at the default value of TMapInput
+        /// </summary>
+        /// <param name="controlMessage">Control message from Update Function</param>
+        internal MapInputWithControlMessage(MapControlMessage controlMessage)
+            : this(default(TMapInput), controlMessage)
+        {
+        }
+
+        /// <summary>
+        /// Creates a message that carries both map input and control message
+        /// </summary>
+        /// <param name="input">Actual map input</param>
+        /// <param name="controlMessage">Control message from Update Function</param>
+        internal MapInputWithControlMessage(TMapInput input, MapControlMessage controlMessage)
+        {
+            ControlMessage = controlMessage;
+            Message = input;
+        }
+
+        /// <summary>
+        /// Control message from Update Task to Map task
+        /// </summary>
+        internal MapControlMessage ControlMessage { get; set; }
+
+        /// <summary>
+        /// Actual input for Mappers
+        /// </summary>
+        internal TMapInput Message { get; set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessageCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessageCodec.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessageCodec.cs
new file mode 100644
index 0000000..644b487
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessageCodec.cs
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage
+{
+    /// <summary>
+    /// Streaming codec for MapInputWithControlMessage
+    /// </summary>
+    /// <typeparam name="TMapInput"></typeparam>
+    internal class MapInputWithControlMessageCodec<TMapInput> : 
IStreamingCodec<MapInputWithControlMessage<TMapInput>>
+    {
+        // Logger is created for the codec type itself so that log entries are attributed to this class
+        private static readonly Logger Logger = Logger.GetLogger(typeof(MapInputWithControlMessageCodec<>));
+
+        // Codec used for the TMapInput payload of AnotherRound messages
+        private readonly IStreamingCodec<TMapInput> _baseCodec;
+
+        /// <summary>
+        /// Injectable constructor
+        /// </summary>
+        /// <param name="baseCodec">Streaming codec for the actual map input</param>
+        [Inject]
+        private MapInputWithControlMessageCodec(IStreamingCodec<TMapInput> baseCodec)
+        {
+            _baseCodec = baseCodec;
+        }
+
+        /// <summary>
+        /// Reads one message from the reader: a single tag byte, followed by
+        /// the map input decoded with the base codec when the tag is 0
+        /// </summary>
+        /// <param name="reader">reader from which to read the message</param>
+        /// <returns>Read message</returns>
+        MapInputWithControlMessage<TMapInput> IStreamingCodec<MapInputWithControlMessage<TMapInput>>.Read(
+            IDataReader reader)
+        {
+            byte[] tag = new byte[1];
+            reader.Read(ref tag, 0, 1);
+
+            // Tag 0: another round, the map input follows the tag
+            if (tag[0] == 0)
+            {
+                TMapInput payload = _baseCodec.Read(reader);
+                return new MapInputWithControlMessage<TMapInput>(payload, MapControlMessage.AnotherRound);
+            }
+
+            // Tag 1: stop, nothing follows the tag
+            if (tag[0] == 1)
+            {
+                return new MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop);
+            }
+
+            Exceptions.Throw(new Exception("Control message type not valid in Codec read"), Logger);
+            return null;
+        }
+
+        /// <summary>
+        /// Writes one message to the writer: tag byte 0 plus the map input
+        /// for AnotherRound, or tag byte 1 alone for Stop
+        /// </summary>
+        /// <param name="obj">Message to write</param>
+        /// <param name="writer">Writer used to write the message</param>
+        void IStreamingCodec<MapInputWithControlMessage<TMapInput>>.Write(MapInputWithControlMessage<TMapInput> obj,
+            IDataWriter writer)
+        {
+            MapControlMessage control = obj.ControlMessage;
+
+            if (control == MapControlMessage.AnotherRound)
+            {
+                writer.Write(new byte[] {0}, 0, 1);
+                _baseCodec.Write(obj.Message, writer);
+            }
+            else if (control == MapControlMessage.Stop)
+            {
+                writer.Write(new byte[] {1}, 0, 1);
+            }
+        }
+
+        /// <summary>
+        /// Reads message asynchronously from reader: a single tag byte, followed
+        /// by the map input decoded with the base codec when the tag is 0
+        /// </summary>
+        /// <param name="reader">reader from which to read the message</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>Read message</returns>
+        async Task<MapInputWithControlMessage<TMapInput>> IStreamingCodec<MapInputWithControlMessage<TMapInput>>.
+            ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            byte[] messageType = new byte[1];
+            await reader.ReadAsync(messageType, 0, 1, token);
+
+            switch (messageType[0])
+            {
+                case 0:
+                    // Another round: the map input follows the tag byte
+                    TMapInput message = await _baseCodec.ReadAsync(reader, token);
+                    return new MapInputWithControlMessage<TMapInput>(message, MapControlMessage.AnotherRound);
+                case 1:
+                    // Stop: nothing follows the tag byte
+                    return new MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop);
+            }
+
+            // Same message as the synchronous Read uses for an unknown tag
+            Exceptions.Throw(new Exception("Control message type not valid in Codec read"), Logger);
+            return null;
+        }
+
+        /// <summary>
+        /// Writes message asynchronously to the writer: tag byte 0 plus the map
+        /// input for AnotherRound, or tag byte 1 alone for Stop
+        /// </summary>
+        /// <param name="obj">Message to write</param>
+        /// <param name="writer">Writer used to write the message</param>
+        /// <param name="token">Cancellation token</param>
+        async Task IStreamingCodec<MapInputWithControlMessage<TMapInput>>.WriteAsync(
+            MapInputWithControlMessage<TMapInput> obj, IDataWriter writer, CancellationToken token)
+        {
+            switch (obj.ControlMessage)
+            {
+                case MapControlMessage.AnotherRound:
+                    await writer.WriteAsync(new byte[] { 0 }, 0, 1, token);
+                    await _baseCodec.WriteAsync(obj.Message, writer, token);
+                    break;
+                case MapControlMessage.Stop:
+                    // Written asynchronously as well, so that the call does not block and the token is honored
+                    await writer.WriteAsync(new byte[] { 1 }, 0, 1, token);
+                    break;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputwithControlMessagePipelineDataConverter.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputwithControlMessagePipelineDataConverter.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputwithControlMessagePipelineDataConverter.cs
new file mode 100644
index 0000000..a8a1c47
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputwithControlMessagePipelineDataConverter.cs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage
+{
+    /// <summary>
+    /// Pipeline Data Converter for MapInputWithControlMessage to chunk and dechunk
+    /// the message for communication. The map input itself is chunked and dechunked
+    /// by the injected converter for TMapInput
+    /// </summary>
+    /// <typeparam name="TMapInput">Type of the actual map input</typeparam>
+    internal class MapInputwithControlMessagePipelineDataConverter<TMapInput> :
+        IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>
+    {
+        private readonly IPipelineDataConverter<TMapInput> _basePipelineDataConverter;
+
+        /// <summary>
+        /// Injectable constructor
+        /// </summary>
+        /// <param name="basePipelineDataConverter">Converter used for the actual map input</param>
+        [Inject]
+        internal MapInputwithControlMessagePipelineDataConverter(
+            IPipelineDataConverter<TMapInput> basePipelineDataConverter)
+        {
+            _basePipelineDataConverter = basePipelineDataConverter;
+        }
+
+        /// <summary>
+        /// Chunks the message. A Stop message is forwarded unchanged as a single,
+        /// last chunk; otherwise the map input is chunked by the base converter and
+        /// every chunk is wrapped together with the control message
+        /// </summary>
+        /// <param name="message">Message to be chunked</param>
+        /// <returns>List of message chunks</returns>
+        List<PipelineMessage<MapInputWithControlMessage<TMapInput>>>
+            IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>.PipelineMessage(
+            MapInputWithControlMessage<TMapInput> message)
+        {
+            List<PipelineMessage<MapInputWithControlMessage<TMapInput>>> messageChunks =
+                new List<PipelineMessage<MapInputWithControlMessage<TMapInput>>>();
+
+            if (message.ControlMessage == MapControlMessage.Stop)
+            {
+                messageChunks.Add(new PipelineMessage<MapInputWithControlMessage<TMapInput>>(message, true));
+                return messageChunks;
+            }
+
+            var baseMessageChunks = _basePipelineDataConverter.PipelineMessage(message.Message);
+
+            // Each wrapped chunk keeps the IsLast flag assigned by the base converter
+            messageChunks.AddRange(
+                baseMessageChunks.Select(
+                    t =>
+                        new PipelineMessage<MapInputWithControlMessage<TMapInput>>(
+                            new MapInputWithControlMessage<TMapInput>(t.Data, message.ControlMessage), t.IsLast)));
+
+            return messageChunks;
+        }
+
+        /// <summary>
+        /// Dechunks the message. A single chunk is returned as is; otherwise the
+        /// map inputs are combined by the base converter and paired with the
+        /// control message of the first chunk
+        /// </summary>
+        /// <param name="pipelineMessage">Message chunks</param>
+        /// <returns>Single aggregated message</returns>
+        MapInputWithControlMessage<TMapInput> IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>.FullMessage(
+            List<PipelineMessage<MapInputWithControlMessage<TMapInput>>> pipelineMessage)
+        {
+            if (pipelineMessage.Count == 1)
+            {
+                return pipelineMessage[0].Data;
+            }
+
+            // Hand the base converter the same IsLast flags that PipelineMessage
+            // propagated, instead of marking every chunk as not last
+            var baseMessageChunks =
+                pipelineMessage.Select(x => new PipelineMessage<TMapInput>(x.Data.Message, x.IsLast)).ToList();
+
+            MapInputWithControlMessage<TMapInput> combinedMessage =
+                new MapInputWithControlMessage<TMapInput>(_basePipelineDataConverter.FullMessage(baseMessageChunks),
+                    pipelineMessage[0].Data.ControlMessage);
+
+            return combinedMessage;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4151f059/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj 
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index bdafd6b..ca0ae8e 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -60,6 +60,10 @@ under the License.
     <Compile Include="InProcess\InProcessIMRUConfiguration.cs" />
     <Compile Include="InProcess\MapFunctions.cs" />
     <Compile Include="InProcess\Parameters\NumberOfMappers.cs" />
+    <Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs" 
/>
+    <Compile 
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" />
+    <Compile 
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessageCodec.cs" 
/>
+    <Compile 
Include="OnREEF\MapInputWithControlMessage\MapInputwithControlMessagePipelineDataConverter.cs"
 />
     <Compile Include="Properties\AssemblyInfo.cs" />
   </ItemGroup>
   <ItemGroup>

Reply via email to