Repository: incubator-reef
Updated Branches:
refs/heads/master d3a02c464 -> c43610fb1
[REEF-611] Introduce Integer Array BroadcastReduce IMRU example code
This addressed the issue by
* implementing an Update function that returns an integer array containing
the value k+1 in iteration k and receives as input the sum of the arrays
from all mappers.
* implementing a Map function that returns the same array it receives as
input from the Update function.
* specifying the corresponding job definition.
JIRA:
[REEF-611](https://issues.apache.org/jira/browse/REEF-611)
Pull Request:
This closes #400
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c43610fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c43610fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c43610fb
Branch: refs/heads/master
Commit: c43610fb154dc9a6026c9501b4ea6f902f7f1507
Parents: d3a02c4
Author: Dhruv <[email protected]>
Authored: Fri Aug 21 11:56:51 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 21 13:42:52 2015 -0700
----------------------------------------------------------------------
.../Org.Apache.REEF.IMRU.Examples.csproj | 6 ++
.../BroadcastReceiverReduceSenderMapFunction.cs | 61 +++++++++++
.../BroadcastReduceConfiguration.cs | 46 ++++++++
...oadcastSenderReduceReceiverUpdateFunction.cs | 98 +++++++++++++++++
.../IntArraySumReduceFunction.cs | 63 +++++++++++
.../PipelineIntDataConverter.cs | 92 ++++++++++++++++
.../PipelinedBroadcastAndReduce.cs | 105 +++++++++++++++++++
7 files changed, 471 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
index 2072827..ceaa644 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
@@ -46,6 +46,12 @@ under the License.
<Compile Include="MapperCount\IntSumReduceFunction.cs" />
<Compile Include="MapperCount\MapperCount.cs" />
<Compile Include="MapperCount\MapperCountUpdateFunction.cs" />
+ <Compile
Include="PipelinedBroadcastReduce\BroadcastReceiverReduceSenderMapFunction.cs"
/>
+ <Compile
Include="PipelinedBroadcastReduce\BroadcastReduceConfiguration.cs" />
+ <Compile
Include="PipelinedBroadcastReduce\BroadcastSenderReduceReceiverUpdateFunction.cs"
/>
+ <Compile Include="PipelinedBroadcastReduce\IntArraySumReduceFunction.cs" />
+ <Compile Include="PipelinedBroadcastReduce\PipelinedBroadcastAndReduce.cs"
/>
+ <Compile Include="PipelinedBroadcastReduce\PipelineIntDataConverter.cs" />
<Compile Include="Run.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReceiverReduceSenderMapFunction.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReceiverReduceSenderMapFunction.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReceiverReduceSenderMapFunction.cs
new file mode 100644
index 0000000..30b67b5
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReceiverReduceSenderMapFunction.cs
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// Map function of the integer array broadcast and reduce example.
+    /// It counts how many times it has been invoked, checks that the first
+    /// element of the received array equals that count, and hands the array
+    /// back unchanged.
+    /// </summary>
+    internal sealed class BroadcastReceiverReduceSenderMapFunction : IMapFunction<int[], int[]>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(BroadcastReceiverReduceSenderMapFunction));
+
+        // Number of Map invocations seen so far by this instance.
+        private int _iterations;
+
+        [Inject]
+        private BroadcastReceiverReduceSenderMapFunction()
+        {
+        }
+
+        /// <summary>
+        /// Validates the received array against the invocation count and echoes it.
+        /// </summary>
+        /// <param name="mapInput">Integer array received by this mapper</param>
+        /// <returns>The very same array instance that was passed in</returns>
+        int[] IMapFunction<int[], int[]>.Map(int[] mapInput)
+        {
+            // The counter is advanced before the input is inspected, so it moves
+            // forward even when reading the input fails.
+            var expectedValue = ++_iterations;
+            var receivedValue = mapInput[0];
+
+            Logger.Log(Level.Info, string.Format("Received value {0}", receivedValue));
+
+            if (receivedValue == expectedValue)
+            {
+                return mapInput;
+            }
+
+            Exceptions.Throw(new Exception("Expected value in mappers different from actual value"), Logger);
+            return mapInput;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReduceConfiguration.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReduceConfiguration.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReduceConfiguration.cs
new file mode 100644
index 0000000..dd27978
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastReduceConfiguration.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// Named parameters used to configure the pipelined broadcast and reduce example.
+    /// </summary>
+    internal static class BroadcastReduceConfiguration
+    {
+        /// <summary>
+        /// Injected into BroadcastSenderReduceReceiverUpdateFunction as the
+        /// iteration limit after which the update function reports it is done.
+        /// </summary>
+        [NamedParameter("Number of Iterations")]
+        internal class NumberOfIterations : Name<int>
+        {
+        }
+
+        /// <summary>
+        /// Injected into BroadcastSenderReduceReceiverUpdateFunction as the
+        /// length of the integer array it allocates and sends.
+        /// </summary>
+        [NamedParameter("Number of Dims.")]
+        internal class Dimensions : Name<int>
+        {
+        }
+
+        /// <summary>
+        /// Injected into BroadcastSenderReduceReceiverUpdateFunction as the
+        /// factor by which the expected reduced value is multiplied.
+        /// </summary>
+        [NamedParameter("Number of Workers")]
+        internal class NumWorkers : Name<int>
+        {
+        }
+
+        /// <summary>
+        /// Injected into PipelineIntDataConverter as the maximum number of
+        /// integers placed in one pipeline chunk.
+        /// </summary>
+        // Declared internal like its siblings; the enclosing class is internal,
+        // so the effective accessibility is the same as before.
+        [NamedParameter("ChunkSize")]
+        internal class ChunkSize : Name<int>
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastSenderReduceReceiverUpdateFunction.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastSenderReduceReceiverUpdateFunction.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastSenderReduceReceiverUpdateFunction.cs
new file mode 100644
index 0000000..868a914
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/BroadcastSenderReduceReceiverUpdateFunction.cs
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// Update function of the integer array broadcast and reduce example.
+    /// Each round it fills an array with the value (completed iterations + 1)
+    /// and requests another round, until the configured number of iterations
+    /// has been completed. Every reduced input is checked against
+    /// (completed iterations + 1) * number of workers.
+    /// </summary>
+    internal sealed class BroadcastSenderReduceReceiverUpdateFunction : IUpdateFunction<int[], int[], int[]>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(BroadcastSenderReduceReceiverUpdateFunction));
+
+        // Number of Update calls completed so far.
+        private int _iterations;
+        private readonly int _maxIters;
+        private readonly int _dim;
+
+        // Single buffer that is refilled and handed out again on every round.
+        private readonly int[] _intArr;
+        private readonly int _workers;
+
+        /// <summary>
+        /// Creates the update function and allocates the array that is sent each round.
+        /// </summary>
+        /// <param name="maxIters">Number of iterations after which Update returns a Done result</param>
+        /// <param name="dim">Length of the array that is sent</param>
+        /// <param name="numWorkers">Factor applied to the expected value of the reduced input</param>
+        [Inject]
+        private BroadcastSenderReduceReceiverUpdateFunction(
+            [Parameter(typeof(BroadcastReduceConfiguration.NumberOfIterations))] int maxIters,
+            [Parameter(typeof(BroadcastReduceConfiguration.Dimensions))] int dim,
+            [Parameter(typeof(BroadcastReduceConfiguration.NumWorkers))] int numWorkers)
+        {
+            _maxIters = maxIters;
+            _iterations = 0;
+            _dim = dim;
+            _intArr = new int[_dim];
+            _workers = numWorkers;
+        }
+
+        /// <summary>
+        /// Update function
+        /// </summary>
+        /// <param name="input">Input containing sum of all mappers arrays</param>
+        /// <returns>
+        /// Another round carrying the refilled array while fewer than the
+        /// configured number of iterations have completed; otherwise a Done
+        /// result carrying <paramref name="input"/>.
+        /// </returns>
+        UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Update(int[] input)
+        {
+            Logger.Log(Level.Info, string.Format("Received value {0}", input[0]));
+
+            // Only the first element is verified.
+            if (input[0] != (_iterations + 1) * _workers)
+            {
+                Exceptions.Throw(new Exception("Expected input to update function not same as actual input"), Logger);
+            }
+
+            _iterations++;
+
+            if (_iterations < _maxIters)
+            {
+                return NextRound();
+            }
+
+            return UpdateResult<int[], int[]>.Done(input);
+        }
+
+        /// <summary>
+        /// Initialize function. Sends integer array with value 1 to all mappers
+        /// </summary>
+        /// <returns>Map input</returns>
+        UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Initialize()
+        {
+            return NextRound();
+        }
+
+        /// <summary>
+        /// Fills the shared array with (completed iterations + 1) and wraps it
+        /// in a result that requests another round.
+        /// </summary>
+        private UpdateResult<int[], int[]> NextRound()
+        {
+            for (int i = 0; i < _dim; i++)
+            {
+                _intArr[i] = _iterations + 1;
+            }
+
+            return UpdateResult<int[], int[]>.AnotherRound(_intArr);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/IntArraySumReduceFunction.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/IntArraySumReduceFunction.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/IntArraySumReduceFunction.cs
new file mode 100644
index 0000000..6ccd529
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/IntArraySumReduceFunction.cs
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// A reduce function that sums integer arrays element by element.
+    /// </summary>
+    internal sealed class IntArraySumReduceFunction : IReduceFunction<int[]>
+    {
+        [Inject]
+        private IntArraySumReduceFunction()
+        {
+        }
+
+        /// <summary>
+        /// Adds up the given arrays position by position.
+        /// </summary>
+        /// <param name="elements">Arrays to be summed; all must have the same length</param>
+        /// <returns>
+        /// A newly allocated array holding the element-wise sums, or null when
+        /// <paramref name="elements"/> yields no arrays.
+        /// </returns>
+        /// <exception cref="System.ArgumentException">
+        /// An array has a different length than the first one.
+        /// </exception>
+        int[] IReduceFunction<int[]>.Reduce(IEnumerable<int[]> elements)
+        {
+            int[] sum = null;
+
+            foreach (var element in elements)
+            {
+                if (sum == null)
+                {
+                    // The first array fixes the length of the result; a fresh
+                    // array is used so no input is modified.
+                    sum = new int[element.Length];
+                }
+                else if (element.Length != sum.Length)
+                {
+                    // Fail with a clear message instead of an IndexOutOfRangeException
+                    // (shorter array) or a silently truncated sum (longer array).
+                    throw new System.ArgumentException(
+                        string.Format(
+                            "Cannot sum arrays of different lengths: expected {0} but got {1}",
+                            sum.Length,
+                            element.Length),
+                        "elements");
+                }
+
+                for (int i = 0; i < sum.Length; i++)
+                {
+                    sum[i] += element[i];
+                }
+            }
+
+            return sum;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelineIntDataConverter.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelineIntDataConverter.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelineIntDataConverter.cs
new file mode 100644
index 0000000..0d4762c
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelineIntDataConverter.cs
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// Pipeline data converter for integer arrays: splits an array into
+    /// chunks of at most a configured number of integers and joins such
+    /// chunks back into one array.
+    /// </summary>
+    internal sealed class PipelineIntDataConverter : IPipelineDataConverter<int[]>
+    {
+        // Maximum number of integers per chunk; always positive.
+        private readonly int _chunkSize;
+
+        /// <summary>
+        /// Creates a converter with the given chunk size.
+        /// </summary>
+        /// <param name="chunkSize">Maximum number of integers per chunk; must be positive</param>
+        /// <exception cref="ArgumentOutOfRangeException">
+        /// <paramref name="chunkSize"/> is zero or negative.
+        /// </exception>
+        [Inject]
+        private PipelineIntDataConverter([Parameter(typeof(BroadcastReduceConfiguration.ChunkSize))] int chunkSize)
+        {
+            // A zero or negative chunk size cannot be used for splitting (it would
+            // lead to a division by zero or a negative array length later), so it
+            // is rejected here.
+            if (chunkSize <= 0)
+            {
+                throw new ArgumentOutOfRangeException(
+                    "chunkSize", chunkSize, "Chunk size must be a positive number of integers.");
+            }
+
+            _chunkSize = chunkSize;
+        }
+
+        /// <summary>
+        /// Convert a integer array to chunks
+        /// </summary>
+        /// <param name="message">message to be chunked</param>
+        /// <returns>
+        /// The chunks in order; only the final chunk is flagged as last.
+        /// An empty message yields an empty list.
+        /// </returns>
+        List<PipelineMessage<int[]>> IPipelineDataConverter<int[]>.PipelineMessage(int[] message)
+        {
+            var messageList = new List<PipelineMessage<int[]>>();
+
+            for (var start = 0; start < message.Length; start += _chunkSize)
+            {
+                var length = Math.Min(_chunkSize, message.Length - start);
+                var chunk = new int[length];
+
+                // Buffer.BlockCopy works with byte offsets and byte counts.
+                Buffer.BlockCopy(message, start * sizeof(int), chunk, 0, length * sizeof(int));
+
+                // The chunk that reaches the end of the message is the last one.
+                var isLast = start + length == message.Length;
+                messageList.Add(new PipelineMessage<int[]>(chunk, isLast));
+            }
+
+            return messageList;
+        }
+
+        /// <summary>
+        /// Converts integer array chunks to integer array
+        /// </summary>
+        /// <param name="pipelineMessage">List of integer array chunks</param>
+        /// <returns>aggregated integer array</returns>
+        int[] IPipelineDataConverter<int[]>.FullMessage(List<PipelineMessage<int[]>> pipelineMessage)
+        {
+            var totalLength = pipelineMessage.Sum(chunk => chunk.Data.Length);
+            var data = new int[totalLength];
+            var byteOffset = 0;
+
+            foreach (var chunk in pipelineMessage)
+            {
+                // Offsets are tracked in bytes because Buffer.BlockCopy expects them so.
+                var byteCount = chunk.Data.Length * sizeof(int);
+                Buffer.BlockCopy(chunk.Data, 0, data, byteOffset, byteCount);
+                byteOffset += byteCount;
+            }
+
+            return data;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c43610fb/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
new file mode 100644
index 0000000..896a0d0
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Globalization;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IO.PartitionedData.Random;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// IMRU program that performs broadcast and reduce of integer arrays
+    /// </summary>
+    public sealed class PipelinedBroadcastAndReduce
+    {
+        private readonly IIMRUClient<int[], int[], int[]> _imruClient;
+
+        [Inject]
+        private PipelinedBroadcastAndReduce(IIMRUClient<int[], int[], int[]> imruClient)
+        {
+            _imruClient = imruClient;
+        }
+
+        /// <summary>
+        /// Builds the job definition and submits the broadcast and reduce job
+        /// </summary>
+        /// <param name="numberofMappers">
+        /// Number of mappers; also used as the number of data partitions and
+        /// bound to the NumWorkers named parameter.
+        /// </param>
+        /// <param name="chunkSize">Value bound to the ChunkSize named parameter of both data converters</param>
+        /// <param name="numIterations">Value bound to the NumberOfIterations named parameter</param>
+        /// <param name="dim">Value bound to the Dimensions named parameter</param>
+        public void Run(int numberofMappers, int chunkSize, int numIterations, int dim)
+        {
+            var updateFunctionConfig =
+                TangFactory.GetTang().NewConfigurationBuilder(
+                    IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule
+                        .Set(IMRUUpdateConfiguration<int[], int[], int[]>.UpdateFunction,
+                            GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class)
+                        .Build())
+                    .BindNamedParameter(typeof(BroadcastReduceConfiguration.NumberOfIterations),
+                        numIterations.ToString(CultureInfo.InvariantCulture))
+                    .BindNamedParameter(typeof(BroadcastReduceConfiguration.Dimensions),
+                        dim.ToString(CultureInfo.InvariantCulture))
+                    .BindNamedParameter(typeof(BroadcastReduceConfiguration.NumWorkers),
+                        numberofMappers.ToString(CultureInfo.InvariantCulture))
+                    .Build();
+
+            // The map input and map output converters are configured identically.
+            var mapInputConverterConfig =
+                TangFactory.GetTang()
+                    .NewConfigurationBuilder(
+                        IMRUPipelineDataConverterConfiguration<int[]>.ConfigurationModule
+                            .Set(IMRUPipelineDataConverterConfiguration<int[]>.MapInputPiplelineDataConverter,
+                                GenericType<PipelineIntDataConverter>.Class)
+                            .Build())
+                    .BindNamedParameter(typeof(BroadcastReduceConfiguration.ChunkSize),
+                        chunkSize.ToString(CultureInfo.InvariantCulture))
+                    .Build();
+
+            var mapOutputConverterConfig =
+                TangFactory.GetTang()
+                    .NewConfigurationBuilder(
+                        IMRUPipelineDataConverterConfiguration<int[]>.ConfigurationModule
+                            .Set(IMRUPipelineDataConverterConfiguration<int[]>.MapInputPiplelineDataConverter,
+                                GenericType<PipelineIntDataConverter>.Class)
+                            .Build())
+                    .BindNamedParameter(typeof(BroadcastReduceConfiguration.ChunkSize),
+                        chunkSize.ToString(CultureInfo.InvariantCulture))
+                    .Build();
+
+            // The value returned by Submit is not used by this example.
+            _imruClient.Submit(
+                new IMRUJobDefinitionBuilder()
+                    .SetMapFunctionConfiguration(IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                        .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                            GenericType<BroadcastReceiverReduceSenderMapFunction>.Class)
+                        .Build())
+                    .SetUpdateFunctionConfiguration(updateFunctionConfig)
+                    .SetMapInputCodecConfiguration(IMRUCodecConfiguration<int[]>.ConfigurationModule
+                        .Set(IMRUCodecConfiguration<int[]>.Codec, GenericType<IntArrayStreamingCodec>.Class)
+                        .Build())
+                    .SetUpdateFunctionCodecsConfiguration(IMRUCodecConfiguration<int[]>.ConfigurationModule
+                        .Set(IMRUCodecConfiguration<int[]>.Codec, GenericType<IntArrayStreamingCodec>.Class)
+                        .Build())
+                    .SetReduceFunctionConfiguration(IMRUReduceFunctionConfiguration<int[]>.ConfigurationModule
+                        .Set(IMRUReduceFunctionConfiguration<int[]>.ReduceFunction,
+                            GenericType<IntArraySumReduceFunction>.Class)
+                        .Build())
+                    .SetMapInputPipelineDataConverterConfiguration(mapInputConverterConfig)
+                    .SetMapOutputPipelineDataConverterConfiguration(mapOutputConverterConfig)
+                    .SetPartitionedDatasetConfiguration(
+                        RandomDataConfiguration.ConfigurationModule.Set(RandomDataConfiguration.NumberOfPartitions,
+                            numberofMappers.ToString(CultureInfo.InvariantCulture)).Build())
+                    .SetJobName("BroadcastReduce")
+                    .SetNumberOfMappers(numberofMappers)
+                    .Build());
+        }
+    }
+}
\ No newline at end of file