Repository: incubator-reef
Updated Branches:
  refs/heads/master d3a02c464 -> c43610fb1


[REEF-611] Introduce Integer Array BroadcastReduce IMRU example code

This addressed the issue by
  * implementing Update function returns an integer array containing
    value k+1 in iteration k and gets as input the sum of arrays from
    all mappers.
  * implementing Map function returns the same array it gets as input
    from update function.
  * specifying corresponding job definition

JIRA:
  [REEF-611](https://issues.apache.org/jira/browse/REEF-611)

Pull Request:
  This closes #400


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c43610fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c43610fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c43610fb

Branch: refs/heads/master
Commit: c43610fb154dc9a6026c9501b4ea6f902f7f1507
Parents: d3a02c4
Author: Dhruv <[email protected]>
Authored: Fri Aug 21 11:56:51 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 21 13:42:52 2015 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.IMRU.Examples.csproj        |   6 ++
 .../BroadcastReceiverReduceSenderMapFunction.cs |  61 +++++++++++
 .../BroadcastReduceConfiguration.cs             |  46 ++++++++
 ...oadcastSenderReduceReceiverUpdateFunction.cs |  98 +++++++++++++++++
 .../IntArraySumReduceFunction.cs                |  63 +++++++++++
 .../PipelineIntDataConverter.cs                 |  92 ++++++++++++++++
 .../PipelinedBroadcastAndReduce.cs              | 105 +++++++++++++++++++
 7 files changed, 471 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
index 2072827..ceaa644 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
@@ -46,6 +46,12 @@ under the License.
     <Compile Include="MapperCount\IntSumReduceFunction.cs" />
     <Compile Include="MapperCount\MapperCount.cs" />
     <Compile Include="MapperCount\MapperCountUpdateFunction.cs" />
+    <Compile 
Include="PipelinedBroadcastReduce\BroadcastReceiverReduceSenderMapFunction.cs" 
/>
+    <Compile 
Include="PipelinedBroadcastReduce\BroadcastReduceConfiguration.cs" />
+    <Compile 
Include="PipelinedBroadcastReduce\BroadcastSenderReduceReceiverUpdateFunction.cs"
 />
+    <Compile Include="PipelinedBroadcastReduce\IntArraySumReduceFunction.cs" />
+    <Compile Include="PipelinedBroadcastReduce\PipelinedBroadcastAndReduce.cs" 
/>
+    <Compile Include="PipelinedBroadcastReduce\PipelineIntDataConverter.cs" />
     <Compile Include="Run.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
   </ItemGroup>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReceiverReduceSenderMapFunction.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReceiverReduceSenderMapFunction.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReceiverReduceSenderMapFunction.cs
new file mode 100644
index 0000000..30b67b5
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReceiverReduceSenderMapFunction.cs
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// Map function for integer array broadcast and reduce
+    /// </summary>
+    internal sealed class BroadcastReceiverReduceSenderMapFunction : 
IMapFunction<int[], int[]>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof 
(BroadcastReceiverReduceSenderMapFunction));
+
+        private int _iterations;
+
+        [Inject]
+        private BroadcastReceiverReduceSenderMapFunction()
+        {
+        }
+
+        /// <summary>
+        /// Map function
+        /// </summary>
+        /// <param name="mapInput">integer array</param>
+        /// <returns>The same integer array</returns>
+        int[] IMapFunction<int[], int[]>.Map(int[] mapInput)
+        {
+            _iterations++;
+
+            Logger.Log(Level.Info, string.Format("Received value {0}", 
mapInput[0]));
+
+            if (mapInput[0] != _iterations)
+            {
+                Exceptions.Throw(new Exception("Expected value in mappers 
different from actual value"), Logger);
+            }
+
+            return mapInput;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReduceConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReduceConfiguration.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReduceConfiguration.cs
new file mode 100644
index 0000000..dd27978
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReduceConfiguration.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    internal static class BroadcastReduceConfiguration
+    {
+        [NamedParameter("Number of Iterations")]
+        internal class NumberOfIterations : Name<int>
+        {
+        }
+
+        [NamedParameter("Number of Dims.")]
+        internal class Dimensions : Name<int>
+        {
+        }
+
+        [NamedParameter("Number of Workers")]
+        internal class NumWorkers : Name<int>
+        {
+        }
+
+        [NamedParameter("ChunkSize")]
+        public class ChunkSize : Name<int>
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastSenderReduceReceiverUpdateFunction.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastSenderReduceReceiverUpdateFunction.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastSenderReduceReceiverUpdateFunction.cs
new file mode 100644
index 0000000..868a914
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastSenderReduceReceiverUpdateFunction.cs
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// The Update function for integer array broadcast and reduce
+    /// </summary>
+    internal sealed class BroadcastSenderReduceReceiverUpdateFunction : 
IUpdateFunction<int[], int[], int[]>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof 
(BroadcastSenderReduceReceiverUpdateFunction));
+
+        private int _iterations;
+        private readonly int _maxIters;
+        private readonly int _dim;
+        private readonly int[] _intArr;
+        private readonly int _workers;
+
+        [Inject]
+        private BroadcastSenderReduceReceiverUpdateFunction(
+            [Parameter(typeof 
(BroadcastReduceConfiguration.NumberOfIterations))] int maxIters,
+            [Parameter(typeof (BroadcastReduceConfiguration.Dimensions))] int 
dim,
+            [Parameter(typeof (BroadcastReduceConfiguration.NumWorkers))] int 
numWorkers
+            )
+        {
+            _maxIters = maxIters;
+            _iterations = 0;
+            _dim = dim;
+            _intArr = new int[_dim];
+            _workers = numWorkers;
+        }
+
+        /// <summary>
+        /// Update function
+        /// </summary>
+        /// <param name="input">Input containing sum of all mappers 
arrays</param>
+        /// <returns>The Update Result</returns>
+        UpdateResult<int[], int[]> IUpdateFunction<int[], int[], 
int[]>.Update(int[] input)
+        {
+            Logger.Log(Level.Info, string.Format("Received value {0}", 
input[0]));
+
+            if (input[0] != (_iterations + 1)*_workers)
+            {
+                Exceptions.Throw(new Exception("Expected input to update 
functon not same as actual input"), Logger);
+            }
+
+            _iterations++;
+
+            if (_iterations < _maxIters)
+            {
+                for (int i = 0; i < _dim; i++)
+                {
+                    _intArr[i] = _iterations + 1;
+                }
+
+                return UpdateResult<int[], int[]>.AnotherRound(_intArr);
+            }
+
+            return UpdateResult<int[], int[]>.Done(input);
+        }
+
+        /// <summary>
+        /// Initialize function. Sends integer array with value 1 to all 
mappers
+        /// </summary>
+        /// <returns>Map input</returns>
+        UpdateResult<int[], int[]> IUpdateFunction<int[], int[], 
int[]>.Initialize()
+        {
+            for (int i = 0; i < _dim; i++)
+            {
+                _intArr[i] = _iterations + 1;
+            }
+
+            return UpdateResult<int[], int[]>.AnotherRound(_intArr);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/IntArraySumReduceFunction.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/IntArraySumReduceFunction.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/IntArraySumReduceFunction.cs
new file mode 100644
index 0000000..6ccd529
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/IntArraySumReduceFunction.cs
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// A reduce function that sums integer arrays.
+    /// </summary>
+    internal sealed class IntArraySumReduceFunction : IReduceFunction<int[]>
+    {
+        [Inject]
+        private IntArraySumReduceFunction()
+        {
+        }
+
+        /// <summary>
+        /// Reduce function that returns the sum of elements of int array
+        /// </summary>
+        /// <param name="elements">List of elements</param>
+        /// <returns>The sum of elements</returns>
+        int[] IReduceFunction<int[]>.Reduce(IEnumerable<int[]> elements)
+        {
+            int counter = 0;
+            int[] resArr = null;
+
+            foreach (var element in elements)
+            {
+                if (counter == 0)
+                {
+                    counter++;
+                    resArr = new int[element.Length];
+                }
+
+                for (int i = 0; i < resArr.Length; i++)
+                {
+                    resArr[i] += element[i];
+                }
+            }
+
+            return resArr;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelineIntDataConverter.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelineIntDataConverter.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelineIntDataConverter.cs
new file mode 100644
index 0000000..0d4762c
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelineIntDataConverter.cs
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// Pipeline Data Converter for integer array
+    /// </summary>
+    internal class PipelineIntDataConverter : IPipelineDataConverter<int[]>
+    {
+        private readonly int _chunkSize;
+
+        [Inject]
+        private 
PipelineIntDataConverter([Parameter(typeof(BroadcastReduceConfiguration.ChunkSize))]
 int chunkSize)
+        {
+            _chunkSize = chunkSize;
+        }
+
+        /// <summary>
+        /// Convert a integer array to chunks
+        /// </summary>
+        /// <param name="message">message to be chunked</param>
+        /// <returns>chunked integer array</returns>
+        List<PipelineMessage<int[]>> 
IPipelineDataConverter<int[]>.PipelineMessage(int[] message)
+        {
+            var messageList = new List<PipelineMessage<int[]>>();
+            var totalChunks = message.Length / _chunkSize;
+
+            if (message.Length % _chunkSize != 0)
+            {
+                totalChunks++;
+            }
+
+            var counter = 0;
+            for (var i = 0; i < message.Length; i += _chunkSize)
+            {
+                var data = new int[Math.Min(_chunkSize, message.Length - i)];
+                Buffer.BlockCopy(message, i * sizeof(int), data, 0, 
data.Length * sizeof(int));
+
+                messageList.Add(counter == totalChunks - 1
+                    ? new PipelineMessage<int[]>(data, true)
+                    : new PipelineMessage<int[]>(data, false));
+
+                counter++;
+            }
+
+            return messageList;
+        }
+
+        /// <summary>
+        /// Converts integer array chunks to integer array
+        /// </summary>
+        /// <param name="pipelineMessage">List of integer array chunks</param>
+        /// <returns>aggregated integer array</returns>
+        int[] 
IPipelineDataConverter<int[]>.FullMessage(List<PipelineMessage<int[]>> 
pipelineMessage)
+        {
+            var size = pipelineMessage.Select(x => x.Data.Length).Sum();
+            var data = new int[size];
+            var offset = 0;
+
+            foreach (var message in pipelineMessage)
+            {
+                Buffer.BlockCopy(message.Data, 0, data, offset, 
message.Data.Length * sizeof(int));
+                offset += message.Data.Length * sizeof(int);
+            }
+
+            return data;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
new file mode 100644
index 0000000..896a0d0
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Globalization;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IO.PartitionedData.Random;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// IMRU program that performs broadcast and reduce
+    /// </summary>
+    public sealed class PipelinedBroadcastAndReduce
+    {
+        private readonly IIMRUClient<int[], int[], int[]> _imruClient;
+
+        [Inject]
+        private PipelinedBroadcastAndReduce(IIMRUClient<int[], int[], int[]> 
imruClient)
+        {
+            _imruClient = imruClient;
+        }
+
+        /// <summary>
+        /// Runs the actual broadcast and reduce job
+        /// </summary>
+        public void Run(int numberofMappers, int chunkSize, int numIterations, 
int dim)
+        {
+            var updateFunctionConfig =
+                
TangFactory.GetTang().NewConfigurationBuilder(IMRUUpdateConfiguration<int[], 
int[], int[]>.ConfigurationModule
+                    .Set(IMRUUpdateConfiguration<int[], int[], 
int[]>.UpdateFunction,
+                        
GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class).Build())
+                    .BindNamedParameter(typeof 
(BroadcastReduceConfiguration.NumberOfIterations),
+                        numIterations.ToString(CultureInfo.InvariantCulture))
+                    .BindNamedParameter(typeof 
(BroadcastReduceConfiguration.Dimensions),
+                        dim.ToString(CultureInfo.InvariantCulture))
+                    .BindNamedParameter(typeof 
(BroadcastReduceConfiguration.NumWorkers),
+                        numberofMappers.ToString(CultureInfo.InvariantCulture))
+                    .Build();
+
+            var dataConverterConfig1 =
+                TangFactory.GetTang()
+                    
.NewConfigurationBuilder(IMRUPipelineDataConverterConfiguration<int[]>.ConfigurationModule
+                        
.Set(IMRUPipelineDataConverterConfiguration<int[]>.MapInputPiplelineDataConverter,
+                            
GenericType<PipelineIntDataConverter>.Class).Build())
+                    .BindNamedParameter(typeof 
(BroadcastReduceConfiguration.ChunkSize),
+                        chunkSize.ToString(CultureInfo.InvariantCulture))
+                    .Build();
+
+            var dataConverterConfig2 =
+                TangFactory.GetTang()
+                    
.NewConfigurationBuilder(IMRUPipelineDataConverterConfiguration<int[]>.ConfigurationModule
+                        
.Set(IMRUPipelineDataConverterConfiguration<int[]>.MapInputPiplelineDataConverter,
+                            
GenericType<PipelineIntDataConverter>.Class).Build())
+                    .BindNamedParameter(typeof 
(BroadcastReduceConfiguration.ChunkSize),
+                        chunkSize.ToString(CultureInfo.InvariantCulture))
+                    .Build();
+
+            var results = _imruClient.Submit(
+                new IMRUJobDefinitionBuilder()
+                    .SetMapFunctionConfiguration(IMRUMapConfiguration<int[], 
int[]>.ConfigurationModule
+                        .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                            
GenericType<BroadcastReceiverReduceSenderMapFunction>.Class)
+                        .Build())
+                    .SetUpdateFunctionConfiguration(updateFunctionConfig)
+                    
.SetMapInputCodecConfiguration(IMRUCodecConfiguration<int[]>.ConfigurationModule
+                        .Set(IMRUCodecConfiguration<int[]>.Codec, 
GenericType<IntArrayStreamingCodec>.Class)
+                        .Build())
+                    
.SetUpdateFunctionCodecsConfiguration(IMRUCodecConfiguration<int[]>.ConfigurationModule
+                        .Set(IMRUCodecConfiguration<int[]>.Codec, 
GenericType<IntArrayStreamingCodec>.Class)
+                        .Build())
+                    
.SetReduceFunctionConfiguration(IMRUReduceFunctionConfiguration<int[]>.ConfigurationModule
+                        
.Set(IMRUReduceFunctionConfiguration<int[]>.ReduceFunction,
+                            GenericType<IntArraySumReduceFunction>.Class)
+                        .Build())
+                    
.SetMapInputPipelineDataConverterConfiguration(dataConverterConfig1)
+                    
.SetMapOutputPipelineDataConverterConfiguration(dataConverterConfig2)
+                    .SetPartitionedDatasetConfiguration(
+                        
RandomDataConfiguration.ConfigurationModule.Set(RandomDataConfiguration.NumberOfPartitions,
+                            numberofMappers.ToString()).Build())
+                    .SetJobName("BroadcastReduce")
+                    .SetNumberOfMappers(numberofMappers)
+                    .Build());
+        }
+    }
+}
\ No newline at end of file

Reply via email to