Repository: incubator-reef Updated Branches: refs/heads/master a4c78c4a4 -> d92ab9885
[REEF-628] A Launcher for MapperCount and BroadcastReduce examples This adds a launcher for MapperCount and BroadcastReduce examples with the option for user to specify - example to run, number of nodes, runtime - local or yarn, port specification and some other example specific parameters JIRA: [REEF-628](https://issues.apache.org/jira/browse/REEF-628) Pull Request: This closes #401 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/d92ab988 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/d92ab988 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/d92ab988 Branch: refs/heads/master Commit: d92ab9885f6cd955b5f9098e6e47164fa216b6c2 Parents: a4c78c4 Author: Dhruv <[email protected]> Authored: Fri Aug 21 14:19:54 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Mon Aug 24 12:49:24 2015 -0700 ---------------------------------------------------------------------- .../OnREEFIMRURunTimeConfiguration.cs | 80 ++++++++++++ .../Org.Apache.REEF.IMRU.Examples.csproj | 1 + lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs | 128 ++++++++++++++++++- 3 files changed, 206 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d92ab988/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs new file mode 100644 index 0000000..870e010 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Globalization; +using Org.Apache.REEF.Client.Local; +using Org.Apache.REEF.Client.YARN; +using Org.Apache.REEF.IMRU.OnREEF.Client; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Interface; + +namespace Org.Apache.REEF.IMRU.Examples +{ + /// <summary> + /// Configuration for Runtime for IMRU on REEF. 
+    /// </summary>
+    /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam>
+    /// <typeparam name="TMapOutput">The return type of the Map function</typeparam>
+    /// <typeparam name="TResult">The return type of the computation.</typeparam>
+    internal static class OnREEFIMRURunTimeConfiguration<TMapInput, TMapOutput, TResult>
+    {
+        /// <summary>
+        /// Builds the configuration for running IMRU on the local runtime: the local
+        /// runtime client configuration merged with the IMRU client configuration.
+        /// </summary>
+        /// <param name="numNodes">Value bound to the local runtime's NumberOfEvaluators</param>
+        /// <param name="runTimeDir">Optional. When at least one entry is given, the first one is
+        /// bound to the local runtime's RuntimeFolder; any further entries are ignored.</param>
+        /// <returns>The local runtime configuration</returns>
+        internal static IConfiguration GetLocalIMRUConfiguration(int numNodes, params string[] runTimeDir)
+        {
+            IConfiguration imruClientConfig =
+                REEFIMRUClientConfiguration<TMapInput, TMapOutput, TResult>.ConfigurationModule.Build();
+
+            // The evaluator count is always set; the runtime folder is added only on request.
+            var runtimeModule = LocalRuntimeClientConfiguration.ConfigurationModule
+                .Set(LocalRuntimeClientConfiguration.NumberOfEvaluators,
+                    numNodes.ToString(CultureInfo.InvariantCulture));
+
+            // A params array is null when the caller passes null explicitly,
+            // so check for null before reading Length.
+            if (runTimeDir != null && runTimeDir.Length != 0)
+            {
+                runtimeModule = runtimeModule.Set(LocalRuntimeClientConfiguration.RuntimeFolder, runTimeDir[0]);
+            }
+
+            return Configurations.Merge(runtimeModule.Build(), imruClientConfig);
+        }
+
+        /// <summary>
+        /// Builds the configuration for running IMRU on the cluster: the YARN client
+        /// configuration merged with the IMRU client configuration.
+        /// </summary>
+        /// <returns>The yarn runtime configuration</returns>
+        internal static IConfiguration GetYarnIMRUConfiguration()
+        {
+            IConfiguration imruClientConfig =
+                REEFIMRUClientConfiguration<TMapInput, TMapOutput, TResult>.ConfigurationModule.Build();
+
+            IConfiguration runtimeConfig =
+                YARNClientConfiguration.ConfigurationModule.Build();
+            return Configurations.Merge(runtimeConfig, imruClientConfig);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d92ab988/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj index ceaa644..9096b1f 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj @@ -46,6 +46,7 @@ under the License. <Compile Include="MapperCount\IntSumReduceFunction.cs" /> <Compile Include="MapperCount\MapperCount.cs" /> <Compile Include="MapperCount\MapperCountUpdateFunction.cs" /> + <Compile Include="OnREEFIMRURunTimeConfiguration.cs" /> <Compile Include="PipelinedBroadcastReduce\BroadcastReceiverReduceSenderMapFunction.cs" /> <Compile Include="PipelinedBroadcastReduce\BroadcastReduceConfiguration.cs" /> <Compile Include="PipelinedBroadcastReduce\BroadcastSenderReduceReceiverUpdateFunction.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d92ab988/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs index 9d3892d..c0a6a3b 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs @@ -20,10 +20,11 @@ using System; using System.Globalization; using System.Linq; -using Org.Apache.REEF.IMRU.OnREEF; +using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce; using Org.Apache.REEF.Tang.Implementations.Tang; -using Org.Apache.REEF.Wake.Remote.Parameters; using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Wake.Remote.Parameters; +using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.IMRU.Examples { @@ -32,8 +33,129 @@ namespace 
Org.Apache.REEF.IMRU.Examples
     /// </summary>
     public class Run
     {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(Run));
+
+        /// <summary>
+        /// Runs the MapperCount example on either the local or the YARN runtime.
+        /// </summary>
+        /// <param name="tcpPortConfig">Configuration carrying the TCP port range</param>
+        /// <param name="runOnYarn">true to use the YARN runtime configuration, false for the local one</param>
+        /// <param name="numNodes">Number of nodes; the example itself is run with numNodes - 1</param>
+        private static void RunMapperTest(IConfiguration tcpPortConfig, bool runOnYarn, int numNodes)
+        {
+            IInjector injector;
+
+            if (!runOnYarn)
+            {
+                injector =
+                    TangFactory.GetTang()
+                        .NewInjector(OnREEFIMRURunTimeConfiguration<int, int, int>.GetLocalIMRUConfiguration(numNodes), tcpPortConfig);
+            }
+            else
+            {
+                injector = TangFactory.GetTang()
+                    .NewInjector(OnREEFIMRURunTimeConfiguration<int, int, int>.GetYarnIMRUConfiguration(), tcpPortConfig);
+            }
+
+            var mapperCountExample = injector.GetInstance<MapperCount.MapperCount>();
+
+            // NOTE(review): presumably one node is reserved for the update task, hence numNodes - 1 - confirm.
+            mapperCountExample.Run(numNodes - 1);
+        }
+
+        /// <summary>
+        /// Runs the pipelined broadcast and reduce example on either the local or the YARN runtime.
+        /// </summary>
+        /// <param name="tcpPortConfig">Configuration carrying the TCP port range</param>
+        /// <param name="runOnYarn">true to use the YARN runtime configuration, false for the local one</param>
+        /// <param name="numNodes">Number of nodes; the example itself is run with numNodes - 1</param>
+        /// <param name="args">Optional example parameters, in order: dimensions, chunk size, iterations.
+        /// Missing entries keep their defaults (10, 2 and 10 respectively).</param>
+        private static void RunBroadcastReduceTest(IConfiguration tcpPortConfig, bool runOnYarn, int numNodes, string[] args)
+        {
+            int chunkSize = 2;
+            int dims = 10;
+            int iterations = 10;
+
+            if (args.Length > 0)
+            {
+                dims = Convert.ToInt32(args[0]);
+            }
+
+            if (args.Length > 1)
+            {
+                chunkSize = Convert.ToInt32(args[1]);
+            }
+
+            if (args.Length > 2)
+            {
+                iterations = Convert.ToInt32(args[2]);
+            }
+
+            IInjector injector;
+
+            if (!runOnYarn)
+            {
+                injector =
+                    TangFactory.GetTang()
+                        .NewInjector(OnREEFIMRURunTimeConfiguration<int[], int[], int[]>.GetLocalIMRUConfiguration(numNodes), tcpPortConfig);
+            }
+            else
+            {
+                // Use the same int[] type arguments as the local branch so both runtimes
+                // build the IMRU client configuration for the same generic instantiation.
+                injector = TangFactory.GetTang()
+                    .NewInjector(OnREEFIMRURunTimeConfiguration<int[], int[], int[]>.GetYarnIMRUConfiguration(), tcpPortConfig);
+            }
+            var broadcastReduceExample = injector.GetInstance<PipelinedBroadcastAndReduce>();
+            broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, dims);
+        }
+
+        /// <summary>
+        /// Launcher entry point. Positional arguments, all optional:
+        /// [0] run on YARN (bool, default false), [1] number of nodes (default 2),
+        /// [2] first TCP port (default 8900), [3] TCP port range (default 1000),
+        /// [4] example name, "MapperCount" (default) or "BroadcastAndReduce", case-insensitive,
+        /// [5..] example specific parameters forwarded to the broadcast and reduce example.
+        /// </summary>
         private static void Main(string[] args)
-        {
+        {
+            Logger.Log(Level.Info, "start running client: " + DateTime.Now);
+            string methodName = "MapperCount";
+            bool runOnYarn = false;
+            int numNodes = 2;
+            int startPort = 8900;
+            int portRange = 1000;
+
+            if (args != null)
+            {
+                if (args.Length > 0)
+                {
+                    runOnYarn = bool.Parse(args[0].ToLower());
+                }
+
+                if (args.Length > 1)
+                {
+                    numNodes = int.Parse(args[1]);
+                }
+
+                if (args.Length > 2)
+                {
+                    startPort = int.Parse(args[2]);
+                }
+
+                if (args.Length > 3)
+                {
+                    portRange = int.Parse(args[3]);
+                }
+
+                if (args.Length > 4)
+                {
+                    methodName = args[4];
+                }
+            }
+
+            var tcpPortConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindNamedParameter(typeof(TcpPortRangeStart),
+                    startPort.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(TcpPortRangeCount),
+                    portRange.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            switch (methodName.ToLower())
+            {
+                case "mappercount":
+                    Logger.Log(Level.Info, "Running Mapper count");
+                    RunMapperTest(tcpPortConfig, runOnYarn, numNodes);
+                    Logger.Log(Level.Info, "Done Running Mapper count");
+                    return;
+
+                case "broadcastandreduce":
+                    Logger.Log(Level.Info, "Running Broadcast and Reduce");
+                    // The first five arguments belong to the launcher; the rest are example specific.
+                    RunBroadcastReduceTest(tcpPortConfig, runOnYarn, numNodes, args.Skip(5).ToArray());
+                    Logger.Log(Level.Info, "Done Running Broadcast and Reduce");
+                    return;
+
+                default:
+                    Logger.Log(Level.Info, "wrong test name");
+                    return;
+
+            }
         }
     }
 }
