Repository: incubator-reef Updated Branches: refs/heads/master 3670eabfc -> 2f0cf34d8
[REEF-554] Update IMRU APIs to enable writing of REEF IMRU This addressed the issue by * modifying the APIs * modifying MapperCount example to be compatible with new API * modifying InProcessIMRU to be compatible with the current API JIRA: [REEF-554](https://issues.apache.org/jira/browse/REEF-554) Pull Request: This closes #334 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2f0cf34d Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2f0cf34d Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2f0cf34d Branch: refs/heads/master Commit: 2f0cf34d85c988c9f4340f1f7a702ec588f22790 Parents: 3670eab Author: Dhruv <[email protected]> Authored: Mon Aug 3 15:52:57 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Fri Aug 7 08:55:18 2015 -0700 ---------------------------------------------------------------------- .../MapperCountTest.cs | 2 +- lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs | 2 +- .../API/IMRUCodecConfiguration.cs | 46 +++++ .../API/IMRUConfiguration.cs | 82 --------- .../API/IMRUJobDefinition.cs | 130 ++++++++++++- .../API/IMRUJobDefinitionBuilder.cs | 184 +++++++++++++++++-- .../API/IMRUMapConfiguration.cs | 46 +++++ .../IMRUPipelineDataConverterConfiguration.cs | 49 +++++ .../API/IMRUReduceFunctionConfiguration.cs | 46 +++++ .../API/IMRUUpdateConfiguration.cs | 47 +++++ .../cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs | 2 +- .../Org.Apache.REEF.IMRU/API/IUpdateFunction.cs | 2 +- .../API/Parameters/MapInputCodec.cs | 33 ---- .../API/Parameters/MapOutputCodec.cs | 33 ---- .../API/Parameters/ResultCodec.cs | 33 ---- .../Examples/MapperCount/IdentityMapFunction.cs | 5 + .../MapperCount/IntSumReduceFunction.cs | 5 + .../Examples/MapperCount/MapperCount.cs | 46 +++-- .../MapperCount/MapperCountUpdateFunction.cs | 9 + .../InProcess/IMRURunner.cs | 6 +- .../InProcess/InProcessIMRUClient.cs | 19 +- 
.../InProcess/InProcessIMRUConfiguration.cs | 3 + .../Org.Apache.REEF.IMRU.csproj | 13 +- 23 files changed, 612 insertions(+), 231 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs index 42f165f..ad0e477 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs @@ -46,7 +46,7 @@ namespace Org.Apache.REEF.IMRU.Tests .Build() ) .GetInstance<MapperCount>(); - var result = tested.Run(); + var result = tested.Run(NumberOfMappers); Assert.AreEqual(NumberOfMappers, result, "The result of the run should be the number of Mappers."); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs index 29c45cf..247d721 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs @@ -27,7 +27,7 @@ namespace Org.Apache.REEF.IMRU.API /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> /// <typeparam name="TResult">The return type of the computation.</typeparam> - public interface IIMRUClient<TMapInput, TMapOutput, TResult> + public interface IIMRUClient<TMapInput, TMapOutput, out TResult> { /// <summary> /// Submit the given job for execution. 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUCodecConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUCodecConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUCodecConfiguration.cs new file mode 100644 index 0000000..00df10c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUCodecConfiguration.cs @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake.StreamingCodec; + +namespace Org.Apache.REEF.IMRU.API +{ + /// <summary> + /// A configuration module for specifying codecs for IMRU + /// </summary> + /// <typeparam name="T">Generic type</typeparam> + public sealed class IMRUCodecConfiguration<T> : ConfigurationModuleBuilder + { + /// <summary> + /// The codec to be used + /// </summary> + public static readonly RequiredImpl<IStreamingCodec<T>> Codec = + new RequiredImpl<IStreamingCodec<T>>(); + + /// <summary> + /// Configuration module + /// </summary> + public static ConfigurationModule ConfigurationModule = + new IMRUCodecConfiguration<T>() + .BindImplementation(GenericType<IStreamingCodec<T>>.Class, Codec) + .Build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUConfiguration.cs deleted file mode 100644 index 595f64a..0000000 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUConfiguration.cs +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.IMRU.API.Parameters; -using Org.Apache.REEF.Network.Group.Operators; -using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Wake.StreamingCodec; - -namespace Org.Apache.REEF.IMRU.API -{ - /// <summary> - /// A configuration module for IMRU. - /// </summary> - /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> - /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> - /// <typeparam name="TResult">The return type of the computation.</typeparam> - public sealed class IMRUConfiguration<TMapInput, TMapOutput, TResult> : ConfigurationModuleBuilder - { - /// <summary> - /// The codec to be used for the map input. - /// </summary> - public static readonly RequiredImpl<IStreamingCodec<TMapInput>> MapInputCodec = - new RequiredImpl<IStreamingCodec<TMapInput>>(); - - /// <summary> - /// The codec to be used for the map output. - /// </summary> - public static readonly RequiredImpl<IStreamingCodec<TMapOutput>> MapOutputCodec = - new RequiredImpl<IStreamingCodec<TMapOutput>>(); - - /// <summary> - /// The codec to be used for the result. - /// </summary> - public static readonly RequiredImpl<IStreamingCodec<TResult>> ResultCodec = - new RequiredImpl<IStreamingCodec<TResult>>(); - - /// <summary> - /// The IReduceFunction type to use. - /// </summary> - public static readonly RequiredImpl<IReduceFunction<TMapOutput>> ReduceFunction = - new RequiredImpl<IReduceFunction<TMapOutput>>(); - - /// <summary> - /// The IUpdateFunction type to use. - /// </summary> - public static readonly RequiredImpl<IUpdateFunction<TMapInput, TMapOutput, TResult>> UpdateFunction = - new RequiredImpl<IUpdateFunction<TMapInput, TMapOutput, TResult>>(); - - /// <summary> - /// The IMapFunction type to use. 
- /// </summary> - public static readonly RequiredImpl<IMapFunction<TMapInput, TMapOutput>> MapFunction = - new RequiredImpl<IMapFunction<TMapInput, TMapOutput>>(); - - public static ConfigurationModule ConfigurationModule = - new IMRUConfiguration<TMapInput, TMapOutput, TResult>() - .BindNamedParameter(GenericType<MapInputCodec<TMapInput>>.Class, MapInputCodec) - .BindNamedParameter(GenericType<MapOutputCodec<TMapOutput>>.Class, MapOutputCodec) - .BindNamedParameter(GenericType<ResultCodec<TResult>>.Class, ResultCodec) - .BindImplementation(GenericType<IReduceFunction<TMapOutput>>.Class, ReduceFunction) - .BindImplementation(GenericType<IUpdateFunction<TMapInput, TMapOutput, TResult>>.Class, UpdateFunction) - .BindImplementation(GenericType<IMapFunction<TMapInput, TMapOutput>>.Class, MapFunction) - .Build(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs index 5744be8..2f2bf26 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs @@ -27,23 +27,139 @@ namespace Org.Apache.REEF.IMRU.API /// <seealso cref="IMRUJobDefinitionBuilder" /> public sealed class IMRUJobDefinition { - private readonly IConfiguration _configuration; private readonly string _jobName; + private readonly IConfiguration _mapFunctionConfiguration; + private readonly IConfiguration _mapInputCodecConfiguration; + private readonly IConfiguration _updateFunctionCodecsConfiguration; + private readonly IConfiguration _reduceFunctionConfiguration; + private readonly IConfiguration _updateFunctionConfiguration; + private readonly IConfiguration _mapOutputPipelineDataConverterConfiguration; + private readonly IConfiguration 
_mapInputPipelineDataConverterConfiguration; + private readonly IConfiguration _partitionedDatasetConfiguration; + private readonly int _numberOfMappers; - internal IMRUJobDefinition(IConfiguration configuration, string jobName) + /// <summary> + /// Constructor + /// </summary> + /// <param name="mapFunctionConfiguration">Map function configuration</param> + /// <param name="mapInputCodecConfiguration">Map input codec configuration</param> + /// <param name="updateFunctionCodecsConfiguration">codec configuration for update + /// function. It is union of TMapInput, TMapOutput and TResult configuration</param> + /// <param name="reduceFunctionConfiguration">Reduce function configuration</param> + /// <param name="updateFunctionConfiguration">Update function configuration</param> + /// <param name="mapOutputPipelineDataConverterConfiguration">Configuration of + /// PipelineDataConverter for TMapOutput</param> + /// <param name="mapInputPipelineDataConverterConfiguration">Configuration of + /// PipelineDataConverter for TMapInput</param> + /// <param name="partitionedDatasetConfiguration">Configuration of partitioned + /// dataset</param> + /// <param name="numberOfMappers">Number of mappers</param> + /// <param name="jobName">Job name</param> + internal IMRUJobDefinition( + IConfiguration mapFunctionConfiguration, + IConfiguration mapInputCodecConfiguration, + IConfiguration updateFunctionCodecsConfiguration, + IConfiguration reduceFunctionConfiguration, + IConfiguration updateFunctionConfiguration, + IConfiguration mapOutputPipelineDataConverterConfiguration, + IConfiguration mapInputPipelineDataConverterConfiguration, + IConfiguration partitionedDatasetConfiguration, + int numberOfMappers, + string jobName) { - _configuration = configuration; + _mapFunctionConfiguration = mapFunctionConfiguration; + _mapInputCodecConfiguration = mapInputCodecConfiguration; + _updateFunctionCodecsConfiguration = updateFunctionCodecsConfiguration; + _reduceFunctionConfiguration = 
reduceFunctionConfiguration; + _updateFunctionConfiguration = updateFunctionConfiguration; + _mapOutputPipelineDataConverterConfiguration = mapOutputPipelineDataConverterConfiguration; + _mapInputPipelineDataConverterConfiguration = mapInputPipelineDataConverterConfiguration; + _partitionedDatasetConfiguration = partitionedDatasetConfiguration; + _numberOfMappers = numberOfMappers; _jobName = jobName; } - internal IConfiguration Configuration + /// <summary> + /// Name of the job + /// </summary> + internal string JobName { - get { return _configuration; } + get { return _jobName; } } - internal string JobName + /// <summary> + /// Configuration of map function + /// </summary> + internal IConfiguration MapFunctionConfiguration { - get { return _jobName; } + get { return _mapFunctionConfiguration; } + } + + /// <summary> + /// Configuration of codec for TMapInput + /// </summary> + internal IConfiguration MapInputCodecConfiguration + { + get { return _mapInputCodecConfiguration; } + } + + /// <summary> + /// Configuration of codecs needed by Update function + /// </summary> + internal IConfiguration UpdateFunctionCodecsConfiguration + { + get { return _updateFunctionCodecsConfiguration; } + } + + /// <summary> + /// Configuration of reduce function + /// </summary> + internal IConfiguration ReduceFunctionConfiguration + { + get { return _reduceFunctionConfiguration; } + } + + /// <summary> + /// Configuration of update function + /// </summary> + internal IConfiguration UpdateFunctionConfiguration + { + get { return _updateFunctionConfiguration; } + } + + /// <summary> + /// Configuration of PipelineDataConverter for Map output + /// </summary> + internal IConfiguration MapOutputPipelineDataConverterConfiguration + { + get { return _mapOutputPipelineDataConverterConfiguration; } + } + + /// <summary> + /// Configuration of PipelineDataConverter for Map Input + /// </summary> + internal IConfiguration MapInputPipelineDataConverterConfiguration + { + get { return 
_mapInputPipelineDataConverterConfiguration; } + } + + /// <summary> + /// Configuration of partitioned dataset + /// </summary> + internal IConfiguration PartitionedDatasetConfgiuration + { + get { return _partitionedDatasetConfiguration; } + } + + /// <summary> + /// Number of mappers + /// </summary> + /// TODO: This is duplicate in a sense that it can be determined + /// TODO: automatically from IPartitionedDataset. However, right now + /// TODO: GroupComm. instantiated in IMRUDriver needs this parameter + /// TODO: in constructor. This will be removed once we remove it from GroupComm. + internal int NumberOfMappers { + get { return _numberOfMappers; } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs index 2427016..7f54459 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs @@ -18,7 +18,11 @@ */ using System; +using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.IMRU.API { @@ -28,19 +32,30 @@ namespace Org.Apache.REEF.IMRU.API /// <seealso cref="IMRUJobDefinition" /> public sealed class IMRUJobDefinitionBuilder { - private IConfiguration _configuration; + private static readonly Logger Logger = Logger.GetLogger(typeof (IMRUJobDefinitionBuilder)); + private string _jobName; + private int _numberOfMappers; + private IConfiguration _mapFunctionConfiguration; + private IConfiguration _mapInputCodecConfiguration; + private IConfiguration 
_updateFunctionCodecsConfiguration; + private IConfiguration _reduceFunctionConfiguration; + private IConfiguration _updateFunctionConfiguration; + private IConfiguration _mapOutputPipelineDataConverterConfiguration; + private IConfiguration _mapInputPipelineDataConverterConfiguration; + private IConfiguration _partitionedDatasetConfiguration; + + private static readonly IConfiguration EmptyConfiguration = + TangFactory.GetTang().NewConfigurationBuilder().Build(); /// <summary> - /// Set the Configuration used to instantiate the IMapFunction, IReduceFunction, IUpdateFunction and all codec instances + /// Constructor /// </summary> - /// <param name="configuration">The Configuration used to instantiate the IMapFunction instance.</param> - /// <seealso cref="IMRUConfiguration{TMapInput,TMapOutput,TResult}" /> - /// <returns>this</returns> - public IMRUJobDefinitionBuilder SetConfiguration(IConfiguration configuration) + public IMRUJobDefinitionBuilder() { - _configuration = configuration; - return this; + _mapInputPipelineDataConverterConfiguration = EmptyConfiguration; + _mapOutputPipelineDataConverterConfiguration = EmptyConfiguration; + _partitionedDatasetConfiguration = EmptyConfiguration; } /// <summary> @@ -55,21 +70,162 @@ namespace Org.Apache.REEF.IMRU.API } /// <summary> + /// Sets configuration of map function + /// </summary> + /// <param name="mapFunctionConfiguration">Configuration</param> + /// <returns>this</returns> + public IMRUJobDefinitionBuilder SetMapFunctionConfiguration(IConfiguration mapFunctionConfiguration) + { + _mapFunctionConfiguration = mapFunctionConfiguration; + return this; + } + + /// <summary> + /// Sets configuration of codec for TMapInput + /// </summary> + /// <param name="mapInputCodecConfiguration">Configuration</param> + /// <returns>this</returns> + public IMRUJobDefinitionBuilder SetMapInputCodecConfiguration(IConfiguration mapInputCodecConfiguration) + { + _mapInputCodecConfiguration = mapInputCodecConfiguration; + return 
this; + } + + /// <summary> + /// Sets configuration of codecs needed by Update function + /// </summary> + /// <param name="updateFunctionCodecsConfiguration">Configuration</param> + /// <returns>this</returns> + public IMRUJobDefinitionBuilder SetUpdateFunctionCodecsConfiguration( + IConfiguration updateFunctionCodecsConfiguration) + { + _updateFunctionCodecsConfiguration = updateFunctionCodecsConfiguration; + return this; + } + + /// <summary> + /// Sets configuration of reduce function + /// </summary> + /// <param name="reduceFunctionConfiguration">Configuration</param> + /// <returns>this</returns> + public IMRUJobDefinitionBuilder SetReduceFunctionConfiguration(IConfiguration reduceFunctionConfiguration) + { + _reduceFunctionConfiguration = reduceFunctionConfiguration; + return this; + } + + /// <summary> + /// Sets configuration of update function + /// </summary> + /// <param name="updateFunctionConfiguration">Configuration</param> + /// <returns>this</returns> + public IMRUJobDefinitionBuilder SetUpdateFunctionConfiguration(IConfiguration updateFunctionConfiguration) + { + _updateFunctionConfiguration = updateFunctionConfiguration; + return this; + } + + /// <summary> + /// Sets configuration of PipelineDataConverter for Map output + /// </summary> + /// <param name="mapOutputPipelineDataConverterConfiguration">Configuration</param> + /// <returns>this</returns> + public IMRUJobDefinitionBuilder SetMapOutputPipelineDataConverterConfiguration( + IConfiguration mapOutputPipelineDataConverterConfiguration) + { + _mapOutputPipelineDataConverterConfiguration = mapOutputPipelineDataConverterConfiguration; + return this; + } + + /// <summary> + /// Sets configuration of PipelineDataConverter for Map Input + /// </summary> + /// <param name="mapInputPipelineDataConverterConfiguration">Configuration</param> + /// <returns>this</returns> + public IMRUJobDefinitionBuilder SetMapInputPipelineDataConverterConfiguration( + IConfiguration 
mapInputPipelineDataConverterConfiguration) + { + _mapInputPipelineDataConverterConfiguration = mapInputPipelineDataConverterConfiguration; + return this; + } + + /// <summary> + /// Sets configuration of partitioned dataset + /// </summary> + /// <param name="partitionedDatasetConfiguration">Configuration</param> + /// <returns>this</returns> + public IMRUJobDefinitionBuilder SetPartitionedDatasetConfiguration( + IConfiguration partitionedDatasetConfiguration) + { + _partitionedDatasetConfiguration = partitionedDatasetConfiguration; + return this; + } + + /// <summary> + /// Sets Number of mappers + /// </summary> + /// <param name="numberOfMappers">Number of mappers</param> + /// <returns>this</returns> + /// TODO: This is duplicate in a sense that it can be determined + /// TODO: automatically from IPartitionedDataset. However, right now + /// TODO: GroupComm. instantiated in IMRUDriver needs this parameter + /// TODO: in constructor. This will be removed once we remove it from GroupComm. + public IMRUJobDefinitionBuilder SetNumberOfMappers(int numberOfMappers) + { + _numberOfMappers = numberOfMappers; + return this; + } + + /// <summary> /// Instantiate the IMRUJobDefinition. 
/// </summary> /// <returns>The IMRUJobDefintion configured.</returns> - /// <exception cref="NullReferenceException">If any of the required paremeters is not set.</exception> + /// <exception cref="NullReferenceException">If any of the required parameters is not set.</exception> public IMRUJobDefinition Build() { - if (null == _configuration) + if (null == _jobName) { - throw new NullReferenceException("Configuration can't be null."); + Exceptions.Throw(new NullReferenceException("Job name cannot be null"), + Logger); } - if (null == _jobName) + + if (null == _mapFunctionConfiguration) + { + Exceptions.Throw(new NullReferenceException("Map function configuration cannot be null"), Logger); + } + + if (null == _mapInputCodecConfiguration) { - throw new NullReferenceException("JobName can't be null."); + Exceptions.Throw(new NullReferenceException("Map input codec configuration cannot be null"), Logger); } - return new IMRUJobDefinition(_configuration, _jobName); + + if (null == _updateFunctionCodecsConfiguration) + { + Exceptions.Throw(new NullReferenceException("Update function codecs configuration cannot be null"), + Logger); + } + + if (null == _reduceFunctionConfiguration) + { + Exceptions.Throw(new NullReferenceException("Reduce function configuration cannot be null"), Logger); + } + + if (null == _updateFunctionConfiguration) + { + Exceptions.Throw(new NullReferenceException("Update function configuration cannot be null"), Logger); + } + + return new IMRUJobDefinition( + _mapFunctionConfiguration, + _mapInputCodecConfiguration, + _updateFunctionCodecsConfiguration, + _reduceFunctionConfiguration, + _updateFunctionConfiguration, + _mapOutputPipelineDataConverterConfiguration, + _mapInputPipelineDataConverterConfiguration, + _partitionedDatasetConfiguration, + _numberOfMappers, + _jobName); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUMapConfiguration.cs 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUMapConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUMapConfiguration.cs new file mode 100644 index 0000000..7925d63 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUMapConfiguration.cs @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; + +namespace Org.Apache.REEF.IMRU.API +{ + /// <summary> + /// A configuration module for IMRU IMapFunction. + /// </summary> + /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> + /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> + public sealed class IMRUMapConfiguration<TMapInput, TMapOutput> : ConfigurationModuleBuilder + { + /// <summary> + /// The IMapFunction type to use. 
+ /// </summary> + public static readonly RequiredImpl<IMapFunction<TMapInput, TMapOutput>> MapFunction = + new RequiredImpl<IMapFunction<TMapInput, TMapOutput>>(); + + /// <summary> + /// Configuration module + /// </summary> + public static ConfigurationModule ConfigurationModule = + new IMRUMapConfiguration<TMapInput, TMapOutput>() + .BindImplementation(GenericType<IMapFunction<TMapInput, TMapOutput>>.Class, MapFunction) + .Build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUPipelineDataConverterConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUPipelineDataConverterConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUPipelineDataConverterConfiguration.cs new file mode 100644 index 0000000..fb63f2b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUPipelineDataConverterConfiguration.cs @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.REEF.Network.Group.Pipelining; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; + +namespace Org.Apache.REEF.IMRU.API +{ + /// <summary> + /// A configuration module for The PipelineDataConverter used for chunking + /// and dechunking TMapInput and TMapOutput + /// </summary> + /// <typeparam name="T">Generic type</typeparam> + public sealed class IMRUPipelineDataConverterConfiguration<T> : ConfigurationModuleBuilder + { + /// <summary> + /// The PipelineDataConverter used for chunking and + /// dechunking TMapInput and TMapOutput + /// </summary> + public static readonly + RequiredImpl<IPipelineDataConverter<T>> MapInputPiplelineDataConverter = + new RequiredImpl<IPipelineDataConverter<T>>(); + + /// <summary> + /// Configuration module + /// </summary> + public static ConfigurationModule ConfigurationModule = + new IMRUPipelineDataConverterConfiguration<T>() + .BindImplementation(GenericType<IPipelineDataConverter<T>>.Class, MapInputPiplelineDataConverter) + .Build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUReduceFunctionConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUReduceFunctionConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUReduceFunctionConfiguration.cs new file mode 100644 index 0000000..033ed69 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUReduceFunctionConfiguration.cs @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; + +namespace Org.Apache.REEF.IMRU.API +{ + /// <summary> + /// A configuration module for IMRU Reduce function. + /// </summary> + /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> + public sealed class IMRUReduceFunctionConfiguration<TMapOutput> : ConfigurationModuleBuilder + { + /// <summary> + /// The IReduceFunction type to use. + /// </summary> + public static readonly RequiredImpl<IReduceFunction<TMapOutput>> ReduceFunction = + new RequiredImpl<IReduceFunction<TMapOutput>>(); + + /// <summary> + /// Configuration module + /// </summary> + public static ConfigurationModule ConfigurationModule = + new IMRUReduceFunctionConfiguration<TMapOutput>() + .BindImplementation(GenericType<IReduceFunction<TMapOutput>>.Class, ReduceFunction) + .Build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUUpdateConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUUpdateConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUUpdateConfiguration.cs new file mode 100644 index 0000000..de999ce --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUUpdateConfiguration.cs @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; + +namespace Org.Apache.REEF.IMRU.API +{ + /// <summary> + /// A configuration module for IMRU Update function. + /// </summary> + /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> + /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> + /// <typeparam name="TResult">The return type of the computation.</typeparam> + public sealed class IMRUUpdateConfiguration<TMapInput, TMapOutput, TResult> : ConfigurationModuleBuilder + { + /// <summary> + /// The IUpdateFunction type to use. 
+ /// </summary> + public static readonly RequiredImpl<IUpdateFunction<TMapInput, TMapOutput, TResult>> UpdateFunction = + new RequiredImpl<IUpdateFunction<TMapInput, TMapOutput, TResult>>(); + + /// <summary> + /// Configuration module + /// </summary> + public static ConfigurationModule ConfigurationModule = + new IMRUUpdateConfiguration<TMapInput, TMapOutput, TResult>() + .BindImplementation(GenericType<IUpdateFunction<TMapInput, TMapOutput, TResult>>.Class, UpdateFunction) + .Build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs index 9c6479c..58c6088 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs @@ -29,7 +29,7 @@ namespace Org.Apache.REEF.IMRU.API /// </remarks> /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> - public interface IMapFunction<TMapInput, TMapOutput> + public interface IMapFunction<in TMapInput, out TMapOutput> { /// <summary> /// Computes new output based on the given side information and data. 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs index bae1448..ce23085 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs @@ -28,7 +28,7 @@ namespace Org.Apache.REEF.IMRU.API /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> /// <typeparam name="TResult">The return type of the computation.</typeparam> - public interface IUpdateFunction<TMapInput, TMapOutput, TResult> + public interface IUpdateFunction<TMapInput, in TMapOutput, TResult> { /// <summary> /// The Update task for IMRU. http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapInputCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapInputCodec.cs b/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapInputCodec.cs deleted file mode 100644 index fc21e79..0000000 --- a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapInputCodec.cs +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.StreamingCodec; - -namespace Org.Apache.REEF.IMRU.API.Parameters -{ - /// <summary> - /// The codec to be used for the map input. - /// </summary> - /// <typeparam name="TMapInput"></typeparam> - [NamedParameter("The codec to be used for the map input.")] - public sealed class MapInputCodec<TMapInput> : Name<IStreamingCodec<TMapInput>> - { - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapOutputCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapOutputCodec.cs b/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapOutputCodec.cs deleted file mode 100644 index 095415d..0000000 --- a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapOutputCodec.cs +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.StreamingCodec; - -namespace Org.Apache.REEF.IMRU.API.Parameters -{ - /// <summary> - /// The codec to be used for the map output. - /// </summary> - /// <typeparam name="TMapOutput"></typeparam> - [NamedParameter("The codec to be used for the map output.")] - public sealed class MapOutputCodec<TMapOutput> : Name<IStreamingCodec<TMapOutput>> - { - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/ResultCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/ResultCodec.cs b/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/ResultCodec.cs deleted file mode 100644 index d6a35a5..0000000 --- a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/ResultCodec.cs +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.StreamingCodec; - -namespace Org.Apache.REEF.IMRU.API.Parameters -{ - /// <summary> - /// The codec to be used for the result. - /// </summary> - /// <typeparam name="TResult"></typeparam> - [NamedParameter("The codec to be used for the result.")] - public class ResultCodec<TResult> : Name<IStreamingCodec<TResult>> - { - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs index 366881b..19eaeb5 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs @@ -32,6 +32,11 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount { } + /// <summary> + /// Identity map function + /// </summary> + /// <param name="mapInput"></param> + /// <returns>mapInput itself</returns> public int Map(int mapInput) { return mapInput; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs 
b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs index 03d9605..94c4332 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs @@ -34,6 +34,11 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount { } + /// <summary> + /// Reduce function that returns the sum of elements + /// </summary> + /// <param name="elements">List of elements</param> + /// <returns>The sum of elements</returns> public int Reduce(IEnumerable<int> elements) { return elements.Aggregate((x, y) => x + y); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs index 9b5b247..ae57e48 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs @@ -19,6 +19,7 @@ using System.Linq; using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IO.PartitionedData.Random; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs; @@ -39,29 +40,44 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount } /// <summary> + /// Runs the actual mapper count job /// </summary> /// <returns>The number of MapFunction instances that are part of the job.</returns> - public int Run() + public int Run(int numberofMappers) { var results = _imruClient.Submit( new IMRUJobDefinitionBuilder() - .SetConfiguration( - IMRUConfiguration<int, int, int>.ConfigurationModule - .Set(IMRUConfiguration<int, int, int>.MapFunction, GenericType<IdentityMapFunction>.Class) - .Set(IMRUConfiguration<int, int, int>.ReduceFunction, - 
GenericType<IntSumReduceFunction>.Class) - .Set(IMRUConfiguration<int, int, int>.UpdateFunction, + .SetMapFunctionConfiguration(IMRUMapConfiguration<int, int>.ConfigurationModule + .Set(IMRUMapConfiguration<int, int>.MapFunction, GenericType<IdentityMapFunction>.Class) + .Build()) + .SetUpdateFunctionConfiguration( + IMRUUpdateConfiguration<int, int, int>.ConfigurationModule + .Set(IMRUUpdateConfiguration<int, int, int>.UpdateFunction, GenericType<MapperCountUpdateFunction>.Class) - .Set(IMRUConfiguration<int, int, int>.MapInputCodec, - GenericType<IntStreamingCodec>.Class) - .Set(IMRUConfiguration<int, int, int>.MapOutputCodec, - GenericType<IntStreamingCodec>.Class) - .Set(IMRUConfiguration<int, int, int>.ResultCodec, GenericType<IntStreamingCodec>.Class) .Build()) + .SetMapInputCodecConfiguration(IMRUCodecConfiguration<int>.ConfigurationModule + .Set(IMRUCodecConfiguration<int>.Codec, GenericType<IntStreamingCodec>.Class) + .Build()) + .SetUpdateFunctionCodecsConfiguration(IMRUCodecConfiguration<int>.ConfigurationModule + .Set(IMRUCodecConfiguration<int>.Codec, GenericType<IntStreamingCodec>.Class) + .Build()) + .SetReduceFunctionConfiguration(IMRUReduceFunctionConfiguration<int>.ConfigurationModule + .Set(IMRUReduceFunctionConfiguration<int>.ReduceFunction, + GenericType<IntSumReduceFunction>.Class) + .Build()) + .SetPartitionedDatasetConfiguration( + RandomDataConfiguration.ConfigurationModule.Set(RandomDataConfiguration.NumberOfPartitions, + numberofMappers.ToString()).Build()) .SetJobName("MapperCount") - .Build() - ); - return results.First(); + .SetNumberOfMappers(numberofMappers) + .Build()); + + if (results != null) + { + return results.First(); + } + + return -1; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs index f69605f..72f25ec 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs @@ -36,11 +36,20 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount { } + /// <summary> + /// Update function + /// </summary> + /// <param name="input">Input containing sum of all mappers</param> + /// <returns>The Update Result with only result</returns> public UpdateResult<int, int> Update(int input) { return UpdateResult<int, int>.Done(input); } + /// <summary> + /// Initialize function. Sends 1 to all mappers + /// </summary> + /// <returns>Map input</returns> public UpdateResult<int, int> Initialize() { return UpdateResult<int, int>.AnotherRound(1); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs b/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs index 8ff36ab..2009d4c 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs @@ -29,9 +29,9 @@ namespace Org.Apache.REEF.IMRU.InProcess /// <summary> /// Simple, single-threaded executor for IMRU Jobs. 
/// </summary> - /// <typeparam name="TMapInput"></typeparam> - /// <typeparam name="TMapOutput"></typeparam> - /// <typeparam name="TResult"></typeparam> + /// <typeparam name="TMapInput">Input to map function</typeparam> + /// <typeparam name="TMapOutput">Output of map function</typeparam> + /// <typeparam name="TResult">Final result</typeparam> internal sealed class IMRURunner<TMapInput, TMapOutput, TResult> { private readonly ISet<IMapFunction<TMapInput, TMapOutput>> _mapfunctions; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs index 4207bd4..bb22945 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs @@ -17,11 +17,13 @@ * under the License. 
*/ +using System; using System.Collections.Generic; using System.Diagnostics; using Org.Apache.REEF.IMRU.API; using Org.Apache.REEF.IMRU.InProcess.Parameters; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; @@ -52,19 +54,30 @@ namespace Org.Apache.REEF.IMRU.InProcess _numberOfMappers = numberOfMappers; } + /// <summary> + /// Submits the map job + /// </summary> + /// <param name="jobDefinition">Job definition given by the user</param> + /// <returns>The result of the job</returns> public IEnumerable<TResult> Submit(IMRUJobDefinition jobDefinition) { - var injector = TangFactory.GetTang().NewInjector(jobDefinition.Configuration); + var mergedConfig = Configurations.Merge( + jobDefinition.ReduceFunctionConfiguration, + jobDefinition.UpdateFunctionConfiguration); + + var injector = TangFactory.GetTang().NewInjector(mergedConfig); injector.BindVolatileInstance(GenericType<MapFunctions<TMapInput, TMapOutput>>.Class, - MakeMapFunctions(injector)); + MakeMapFunctions(jobDefinition.MapFunctionConfiguration)); var runner = injector.GetInstance<IMRURunner<TMapInput, TMapOutput, TResult>>(); return runner.Run(); } - private MapFunctions<TMapInput, TMapOutput> MakeMapFunctions(IInjector injector) + private MapFunctions<TMapInput, TMapOutput> MakeMapFunctions(IConfiguration configuration) { + var injector = TangFactory.GetTang().NewInjector(configuration); + ISet<IMapFunction<TMapInput, TMapOutput>> mappers = new HashSet<IMapFunction<TMapInput, TMapOutput>>(); for (var i = 0; i < _numberOfMappers; ++i) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs 
b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs index 75346ea..6a2f01b 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs @@ -37,6 +37,9 @@ namespace Org.Apache.REEF.IMRU.InProcess /// </summary> public static readonly OptionalParameter<int> NumberOfMappers = new OptionalParameter<int>(); + /// <summary> + /// Configuration module + /// </summary> public static ConfigurationModule ConfigurationModule = new InProcessIMRUConfiguration<TMapInput, TMapOutput, TResult>() .BindImplementation(GenericType<IIMRUClient<TMapInput, TMapOutput, TResult>>.Class, http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj index ca0ae8e..5ae900f 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj @@ -41,15 +41,16 @@ under the License. 
<Reference Include="System.Xml" /> </ItemGroup> <ItemGroup> - <Compile Include="API\IMRUConfiguration.cs" /> <Compile Include="API\IIMRUClient.cs" /> <Compile Include="API\IMapFunction.cs" /> + <Compile Include="API\IMRUCodecConfiguration.cs" /> + <Compile Include="API\IMRUPipelineDataConverterConfiguration.cs" /> <Compile Include="API\IMRUJobDefinition.cs" /> <Compile Include="API\IMRUJobDefinitionBuilder.cs" /> + <Compile Include="API\IMRUMapConfiguration.cs" /> + <Compile Include="API\IMRUReduceFunctionConfiguration.cs" /> + <Compile Include="API\IMRUUpdateConfiguration.cs" /> <Compile Include="API\IUpdateFunction.cs" /> - <Compile Include="API\Parameters\MapInputCodec.cs" /> - <Compile Include="API\Parameters\MapOutputCodec.cs" /> - <Compile Include="API\Parameters\ResultCodec.cs" /> <Compile Include="API\UpdateResult.cs" /> <Compile Include="Examples\MapperCount\MapperCount.cs" /> <Compile Include="Examples\MapperCount\IdentityMapFunction.cs" /> @@ -95,6 +96,10 @@ under the License. <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project> <Name>Org.Apache.REEF.Wake</Name> </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.IO\Org.Apache.REEF.IO.csproj"> + <Project>{dec0f0a8-dbef-4ebf-b69c-e2369c15abf1}</Project> + <Name>Org.Apache.REEF.IO</Name> + </ProjectReference> </ItemGroup> <ItemGroup> <None Include="Org.Apache.REEF.IMRU.nuspec" />
