Repository: incubator-reef
Updated Branches:
  refs/heads/master a4c78c4a4 -> d92ab9885


[REEF-628] A Launcher for MapperCount and BroadcastReduce examples

This adds a launcher for MapperCount and BroadcastReduce examples with
the option for user to specify - example to run, number of nodes,
runtime - local or yarn, port specification and some other example
specific parameters

JIRA:
  [REEF-628](https://issues.apache.org/jira/browse/REEF-628)

Pull Request:
  This closes #401


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/d92ab988
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/d92ab988
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/d92ab988

Branch: refs/heads/master
Commit: d92ab9885f6cd955b5f9098e6e47164fa216b6c2
Parents: a4c78c4
Author: Dhruv <[email protected]>
Authored: Fri Aug 21 14:19:54 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Aug 24 12:49:24 2015 -0700

----------------------------------------------------------------------
 .../OnREEFIMRURunTimeConfiguration.cs           |  80 ++++++++++++
 .../Org.Apache.REEF.IMRU.Examples.csproj        |   1 +
 lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs    | 128 ++++++++++++++++++-
 3 files changed, 206 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d92ab988/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
new file mode 100644
index 0000000..870e010
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Globalization;
+using Org.Apache.REEF.Client.Local;
+using Org.Apache.REEF.Client.YARN;
+using Org.Apache.REEF.IMRU.OnREEF.Client;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.IMRU.Examples
+{
+    /// <summary>
+    /// Configuration for Runtime for IMRU on REEF.
+    /// </summary>
+    /// <typeparam name="TMapInput">The type of the side information provided 
to the Map function</typeparam>
+    /// <typeparam name="TMapOutput">The return type of the Map 
function</typeparam>
+    /// <typeparam name="TResult">The return type of the 
computation.</typeparam>
+    internal static class OnREEFIMRURunTimeConfiguration<TMapInput, 
TMapOutput, TResult>
+    {
+        /// <summary>
+        /// Function that specifies local runtime configuration for IMRU
+        /// </summary>
+        /// <returns>The local runtime configuration</returns>
+        internal static IConfiguration GetLocalIMRUConfiguration(int numNodes, 
params string[] runTimeDir)
+        {
+            IConfiguration runtimeConfig;
+            IConfiguration imruClientConfig =
+                REEFIMRUClientConfiguration<TMapInput, TMapOutput, 
TResult>.ConfigurationModule.Build();
+
+            if (runTimeDir.Length != 0)
+            {
+                runtimeConfig = 
LocalRuntimeClientConfiguration.ConfigurationModule
+                    .Set(LocalRuntimeClientConfiguration.NumberOfEvaluators,
+                        numNodes.ToString(CultureInfo.InvariantCulture))
+                    .Set(LocalRuntimeClientConfiguration.RuntimeFolder, 
runTimeDir[0])
+                    .Build();
+            }
+            else
+            {
+                runtimeConfig = 
LocalRuntimeClientConfiguration.ConfigurationModule
+                   .Set(LocalRuntimeClientConfiguration.NumberOfEvaluators,
+                       numNodes.ToString(CultureInfo.InvariantCulture))
+                   .Build();
+            }
+
+            return Configurations.Merge(runtimeConfig, imruClientConfig);
+        }
+
+        /// <summary>
+        /// Function that specifies yarn runtime configuration for IMRU on the 
cluster
+        /// </summary>
+        /// <returns>The yarn runtime configuration</returns>
+        internal static IConfiguration GetYarnIMRUConfiguration()
+        {
+            IConfiguration imruClientConfig =
+                REEFIMRUClientConfiguration<TMapInput, TMapOutput, 
TResult>.ConfigurationModule.Build();
+
+            IConfiguration runtimeConfig =
+                YARNClientConfiguration.ConfigurationModule.Build();
+            return Configurations.Merge(runtimeConfig, imruClientConfig);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d92ab988/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
index ceaa644..9096b1f 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
@@ -46,6 +46,7 @@ under the License.
     <Compile Include="MapperCount\IntSumReduceFunction.cs" />
     <Compile Include="MapperCount\MapperCount.cs" />
     <Compile Include="MapperCount\MapperCountUpdateFunction.cs" />
+    <Compile Include="OnREEFIMRURunTimeConfiguration.cs" />
     <Compile 
Include="PipelinedBroadcastReduce\BroadcastReceiverReduceSenderMapFunction.cs" 
/>
     <Compile 
Include="PipelinedBroadcastReduce\BroadcastReduceConfiguration.cs" />
     <Compile 
Include="PipelinedBroadcastReduce\BroadcastSenderReduceReceiverUpdateFunction.cs"
 />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/d92ab988/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
index 9d3892d..c0a6a3b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
@@ -20,10 +20,11 @@
 using System;
 using System.Globalization;
 using System.Linq;
-using Org.Apache.REEF.IMRU.OnREEF;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
 using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Wake.Remote.Parameters;
 using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.IMRU.Examples
 {
@@ -32,8 +33,129 @@ namespace Org.Apache.REEF.IMRU.Examples
     /// </summary>
     public class Run
     {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(Run));
+
+        private static void RunMapperTest(IConfiguration tcpPortConfig, bool 
runOnYarn, int numNodes)
+        {
+            IInjector injector;
+
+            if (!runOnYarn)
+            {
+                injector =
+                    TangFactory.GetTang()
+                        .NewInjector(OnREEFIMRURunTimeConfiguration<int, int, 
int>.GetLocalIMRUConfiguration(numNodes), tcpPortConfig);
+            }
+            else
+            {
+                injector = TangFactory.GetTang()
+                    .NewInjector(OnREEFIMRURunTimeConfiguration<int, int, 
int>.GetYarnIMRUConfiguration(), tcpPortConfig);
+            }
+
+            var mapperCountExample = 
injector.GetInstance<MapperCount.MapperCount>();
+            mapperCountExample.Run(numNodes - 1);
+        }
+
+        private static void RunBroadcastReduceTest(IConfiguration 
tcpPortConfig, bool runOnYarn, int numNodes, string[] args)
+        {
+            int chunkSize = 2;
+            int dims = 10;
+            int iterations = 10;
+
+            if (args.Length > 0)
+            {
+                dims = Convert.ToInt32(args[0]);
+            }
+
+            if (args.Length > 1)
+            {
+                chunkSize = Convert.ToInt32(args[1]);
+            }
+
+            if (args.Length > 2)
+            {
+                iterations = Convert.ToInt32(args[2]);
+            }
+
+            IInjector injector;
+
+            if (!runOnYarn)
+            {
+                injector =
+                    TangFactory.GetTang()
+                        .NewInjector(OnREEFIMRURunTimeConfiguration<int[], 
int[], int[]>.GetLocalIMRUConfiguration(numNodes), tcpPortConfig);
+            }
+            else
+            {
+                injector = TangFactory.GetTang()
+                    .NewInjector(OnREEFIMRURunTimeConfiguration<int, int, 
int>.GetYarnIMRUConfiguration(), tcpPortConfig);
+            }
+            var broadcastReduceExample = 
injector.GetInstance<PipelinedBroadcastAndReduce>();
+            broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, 
dims);
+        }
+
         private static void Main(string[] args)
-        {      
+        {
+            Logger.Log(Level.Info, "start running client: " + DateTime.Now);
+            string methodName = "MapperCount";
+            bool runOnYarn = false;
+            int numNodes = 2;
+            int startPort = 8900;
+            int portRange = 1000;
+
+            if (args != null)
+            {
+                if (args.Length > 0)
+                {
+                    runOnYarn = bool.Parse(args[0].ToLower());
+                }
+
+                if (args.Length > 1)
+                {
+                    numNodes = int.Parse(args[1]);
+                }
+
+                if (args.Length > 2)
+                {
+                    startPort = int.Parse(args[2]);
+                }
+
+                if (args.Length > 3)
+                {
+                    portRange = int.Parse(args[3]);
+                }
+
+                if (args.Length > 4)
+                {
+                    methodName = args[4];
+                }
+            }
+
+            var tcpPortConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindNamedParameter(typeof(TcpPortRangeStart),
+                    startPort.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(TcpPortRangeCount),
+                    portRange.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            switch (methodName.ToLower())
+            {
+                case "mappercount":
+                    Logger.Log(Level.Info, "Running Mapper count");
+                    RunMapperTest(tcpPortConfig, runOnYarn, numNodes);
+                    Logger.Log(Level.Info, "Done Running Mapper count");
+                    return;
+
+                case "broadcastandreduce":
+                    Logger.Log(Level.Info, "Running Broadcast and Reduce");
+                    RunBroadcastReduceTest(tcpPortConfig, runOnYarn, numNodes, 
args.Skip(5).ToArray());
+                    Logger.Log(Level.Info, "Done Running Broadcast and 
Reduce");
+                    return;
+
+                default:
+                    Logger.Log(Level.Info, "wrong test name");
+                    return;
+
+            }
         }
     }
 }

Reply via email to