Repository: incubator-reef Updated Branches: refs/heads/master ee1a6295e -> 61e882a15
[REEF-735] Update test cases and examples to use new REEF Client API Updated existing tests cases and examples that still use old REEF Client and mark ClrClientHelper.Run as Obsolete JIRA: [REEF-735](https://issues.apache.org/jira/browse/REEF-735) Pull Request: This closes #481 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/61e882a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/61e882a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/61e882a1 Branch: refs/heads/master Commit: 61e882a159f03ea62ae22bc4da730d9926b299ce Parents: ee1a629 Author: Julia Wang <[email protected]> Authored: Thu Sep 10 13:27:41 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Thu Sep 10 18:27:44 2015 -0700 ---------------------------------------------------------------------- .../Bridge/ClrClientHelper.cs | 99 ++++++++++---------- .../BroadcastAndReduceClient.cs | 52 +++++++--- .../PipelineBroadcastAndReduceClient.cs | 18 +--- .../Functional/Bridge/TestBridgeClient.cs | 10 +- .../Bridge/TestSimpleEventHandlers.cs | 5 +- .../Functional/Driver/TestDriver.cs | 18 ++-- .../Functional/Group/BroadcastReduceTest.cs | 33 +++---- .../Group/PipelinedBroadcastReduceTest.cs | 32 +++---- .../Functional/Group/ScatterReduceTest.cs | 56 +++++------ .../Functional/ML/KMeans/TestKMeans.cs | 19 ++-- .../Functional/Messaging/TestTaskMessage.cs | 33 +++---- .../Functional/ReefFunctionalTest.cs | 21 +---- 12 files changed, 195 insertions(+), 201 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/61e882a1/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrClientHelper.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrClientHelper.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrClientHelper.cs index d032241..72b740a 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrClientHelper.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrClientHelper.cs @@ -32,7 +32,10 @@ namespace Org.Apache.REEF.Driver.Bridge { private static readonly Logger LOGGER = Logger.GetLogger(typeof(ClrClientHelper)); - public static void Run(HashSet<string> appDlls, IConfiguration driverBridgeConfig, DriverSubmissionSettings driverSubmissionSettings, string reefJar = Constants.JavaBridgeJarFileName, string runCommand = "run.cmd", string clrFolder = ".", string className = Constants.BridgeLaunchClass) + [Obsolete("please use ReefClient API")] + public static void Run(HashSet<string> appDlls, IConfiguration driverBridgeConfig, + DriverSubmissionSettings driverSubmissionSettings, string reefJar = Constants.JavaBridgeJarFileName, + string runCommand = "run.cmd", string clrFolder = ".", string className = Constants.BridgeLaunchClass) { using (LOGGER.LogFunction("ClrHandlerHelper::Run")) { @@ -44,12 +47,12 @@ namespace Org.Apache.REEF.Driver.Bridge using (LOGGER.LogScope("ClrHandlerHelper::serialize driverBridgeConfig to clrRuntimeConfigFile")) { - string clrRuntimeConfigFile = Path.Combine(clrFolder, Constants.DriverBridgeConfiguration); + var clrRuntimeConfigFile = Path.Combine(clrFolder, Constants.DriverBridgeConfiguration); new AvroConfigurationSerializer().ToFile(driverBridgeConfig, clrRuntimeConfigFile); LOGGER.Log(Level.Info, "CLR driver bridge configurations written to " + clrRuntimeConfigFile); } - ProcessStartInfo startInfo = new ProcessStartInfo(); + var startInfo = new ProcessStartInfo(); if (driverSubmissionSettings.RunOnYarn) { startInfo.FileName = runCommand; @@ -59,7 +62,7 @@ namespace Org.Apache.REEF.Driver.Bridge else { startInfo.FileName = GetJavaBinary(); - string loggingPrefix = string.Empty; + var loggingPrefix = string.Empty; if (driverSubmissionSettings.JavaLogLevel == JavaLoggingSetting.VERBOSE_TO_CLR) { loggingPrefix = Constants.JavaToCLRLoggingConfig + " "; @@ -75,39 +78,40 @@ namespace Org.Apache.REEF.Driver.Bridge startInfo.UseShellExecute = false; startInfo.CreateNoWindow = false; LOGGER.Log(Level.Info, "Executing\r\n" + startInfo.FileName + "\r\n" + startInfo.Arguments); - using (Process process = Process.Start(startInfo)) + using (var process = Process.Start(startInfo)) { process.WaitForExit(); } } } + [Obsolete("please use ReefClient API")] public static void UpdateJarFileWithAssemblies(string reefJar) { using (LOGGER.LogFunction("ClrHandlerHelper::UpdateJarFileWithAssemblies")) { - string assembliesList = ClrHandlerHelper.GetAssembliesListForReefDriverApp(); + var assembliesList = ClrHandlerHelper.GetAssembliesListForReefDriverApp(); if (!File.Exists(reefJar)) { throw new InvalidOperationException("cannot find reef jar file: " + reefJar); } - ProcessStartInfo startInfo = new ProcessStartInfo() - { - FileName = GetJarBinary(), - Arguments = @"uf " + reefJar + " " + assembliesList, - RedirectStandardOutput = true, - RedirectStandardError = true, - UseShellExecute = false, - CreateNoWindow = true - }; + var startInfo = new ProcessStartInfo + { + FileName = GetJarBinary(), + Arguments = @"uf " + reefJar + " " + assembliesList, + RedirectStandardOutput = true, + RedirectStandardError = true, + UseShellExecute = false, + CreateNoWindow = true + }; LOGGER.Log(Level.Info, "updating jar file with \r\n" + startInfo.FileName + "\r\n" + startInfo.Arguments); - using (Process process = Process.Start(startInfo)) + using (var process = Process.Start(startInfo)) { - StreamReader outReader = process.StandardOutput; - StreamReader errorReader = process.StandardError; - string output = outReader.ReadToEnd(); - string error = errorReader.ReadToEnd(); + var outReader = process.StandardOutput; + var errorReader = process.StandardError; + var output = outReader.ReadToEnd(); + var error = errorReader.ReadToEnd(); process.WaitForExit(); if (process.ExitCode != 0) { @@ -121,37 +125,38 @@ namespace Org.Apache.REEF.Driver.Bridge public static void ExtractConfigfileFromJar(string reefJar, IList<string> configFiles, string dropFolder) { - var configFileNames = string.Join(" ", configFiles.ToArray()); - ProcessStartInfo startInfo = new ProcessStartInfo() - { - FileName = GetJarBinary(), - Arguments = @"xf " + reefJar + " " + configFileNames, - RedirectStandardOutput = true, - RedirectStandardError = true, - UseShellExecute = false, - CreateNoWindow = true - }; + var configFileNames = string.Join(" ", configFiles.ToArray()); + var startInfo = new ProcessStartInfo + { + FileName = GetJarBinary(), + Arguments = @"xf " + reefJar + " " + configFileNames, + RedirectStandardOutput = true, + RedirectStandardError = true, + UseShellExecute = false, + CreateNoWindow = true + }; - LOGGER.Log(Level.Info, "extracting files from jar file with \r\n" + startInfo.FileName + "\r\n" + startInfo.Arguments); - using (Process process = Process.Start(startInfo)) + LOGGER.Log(Level.Info, + "extracting files from jar file with \r\n" + startInfo.FileName + "\r\n" + startInfo.Arguments); + using (var process = Process.Start(startInfo)) + { + var outReader = process.StandardOutput; + var errorReader = process.StandardError; + var output = outReader.ReadToEnd(); + var error = errorReader.ReadToEnd(); + process.WaitForExit(); + if (process.ExitCode != 0) { - StreamReader outReader = process.StandardOutput; - StreamReader errorReader = process.StandardError; - string output = outReader.ReadToEnd(); - string error = errorReader.ReadToEnd(); - process.WaitForExit(); - if (process.ExitCode != 0) - { - throw new InvalidOperationException("Failed to extract files from jar file with stdout :" + output + - "and stderr:" + error); - } + throw new InvalidOperationException("Failed to extract files from jar file with stdout :" + output + + "and stderr:" + error); } - LOGGER.Log(Level.Info, "files are extracted."); + } + LOGGER.Log(Level.Info, "files are extracted."); } - + private static string GetJarBinary() { - string javaHome = Environment.GetEnvironmentVariable("JAVA_HOME"); + var javaHome = Environment.GetEnvironmentVariable("JAVA_HOME"); if (string.IsNullOrWhiteSpace(javaHome)) { LOGGER.Log(Level.Info, "JAVA_HOME not set. Please set JAVA_HOME environment variable first. Exiting..."); @@ -162,7 +167,7 @@ namespace Org.Apache.REEF.Driver.Bridge private static string GetJavaBinary() { - string javaHome = Environment.GetEnvironmentVariable("JAVA_HOME"); + var javaHome = Environment.GetEnvironmentVariable("JAVA_HOME"); if (string.IsNullOrWhiteSpace(javaHome)) { LOGGER.Log(Level.Info, "JAVA_HOME not set. Please set JAVA_HOME environment variable first. Exiting..."); @@ -171,4 +176,4 @@ namespace Org.Apache.REEF.Driver.Bridge return Path.Combine(javaHome, "bin", "java.exe"); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/61e882a1/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs index 9d21d8b..1c02ecd 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs @@ -17,17 +17,18 @@ * under the License. */ -using System.Collections.Generic; +using System; using System.Globalization; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Common.Tasks; +using System.IO; +using Org.Apache.REEF.Client.API; +using Org.Apache.REEF.Client.Local; +using Org.Apache.REEF.Client.YARN; using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Network.Examples.GroupCommunication; using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Naming; -using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; @@ -38,6 +39,10 @@ namespace Org.Apache.REEF.Network.Examples.Client { class BroadcastAndReduceClient { + const string Local = "local"; + const string Yarn = "yarn"; + const string DefaultRuntimeFolder = "REEF_LOCAL_RUNTIME"; + public void RunBroadcastAndReduce(bool runOnYarn, int numTasks, int startingPortNo, int portRange) { const int numIterations = 10; @@ -85,14 +90,39 @@ namespace Org.Apache.REEF.Network.Examples.Client merged = Configurations.Merge(merged, taskConfig); - HashSet<string> appDlls = new HashSet<string>(); - appDlls.Add(typeof(IDriver).Assembly.GetName().Name); - appDlls.Add(typeof(ITask).Assembly.GetName().Name); - appDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name); - appDlls.Add(typeof(INameClient).Assembly.GetName().Name); - appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); + string runPlatform = runOnYarn ? "yarn" : "local"; + TestRun(merged, typeof(BroadcastReduceDriver), numTasks, "BroadcastReduceDriver", runPlatform); + } + + internal static void TestRun(IConfiguration driverCondig, Type globalAssemblyType, int numberOfEvaluator, string jobIdentifier = "myDriver", string runOnYarn = "local", string runtimeFolder = DefaultRuntimeFolder) + { + IInjector injector = TangFactory.GetTang().NewInjector(GetRuntimeConfiguration(runOnYarn, numberOfEvaluator, runtimeFolder)); + var reefClient = injector.GetInstance<IREEFClient>(); + var jobSubmissionBuilderFactory = injector.GetInstance<JobSubmissionBuilderFactory>(); + var jobSubmission = jobSubmissionBuilderFactory.GetJobSubmissionBuilder() + .AddDriverConfiguration(driverCondig) + .AddGlobalAssemblyForType(globalAssemblyType) + .SetJobIdentifier(jobIdentifier) + .Build(); + + reefClient.Submit(jobSubmission); + } - ClrClientHelper.Run(appDlls, merged, new DriverSubmissionSettings() { RunOnYarn = runOnYarn, JavaLogLevel = JavaLoggingSetting.VERBOSE }); + internal static IConfiguration GetRuntimeConfiguration(string runOnYarn, int numberOfEvaluator, string runtimeFolder) + { + switch (runOnYarn) + { + case Local: + var dir = Path.Combine(".", runtimeFolder); + return LocalRuntimeClientConfiguration.ConfigurationModule + .Set(LocalRuntimeClientConfiguration.NumberOfEvaluators, numberOfEvaluator.ToString()) + .Set(LocalRuntimeClientConfiguration.RuntimeFolder, dir) + .Build(); + case Yarn: + return YARNClientConfiguration.ConfigurationModule.Build(); + default: + throw new Exception("Unknown runtime: " + runOnYarn); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/61e882a1/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs index 76aa58b..40e1449 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs @@ -17,21 +17,13 @@ * under the License. */ -using System; -using System.Collections.Generic; using System.Globalization; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Network.Examples.GroupCommunication; using Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Naming; -using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; @@ -90,14 +82,8 @@ namespace Org.Apache.REEF.Network.Examples.Client merged = Configurations.Merge(merged, taskConfig); - HashSet<string> appDlls = new HashSet<string>(); - appDlls.Add(typeof(IDriver).Assembly.GetName().Name); - appDlls.Add(typeof(ITask).Assembly.GetName().Name); - appDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name); - appDlls.Add(typeof(INameClient).Assembly.GetName().Name); - appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); - - ClrClientHelper.Run(appDlls, merged, new DriverSubmissionSettings() { RunOnYarn = runOnYarn, JavaLogLevel = JavaLoggingSetting.VERBOSE }); + string runPlatform = runOnYarn ? "yarn" : "local"; + BroadcastAndReduceClient.TestRun(merged, typeof(PipelinedBroadcastReduceDriver), numTasks, "PipelinedBroadcastReduceDriver", runPlatform); } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/61e882a1/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs index 77f3154..aa582c4 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestBridgeClient.cs @@ -47,7 +47,8 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge [Ignore] //This test needs to be run on Yarn environment with test framework installed. public void CanRunClrBridgeExampleOnYarn() { - RunClrBridgeClient(true); + string testRuntimeFolder = DefaultRuntimeFolder + TestNumber++; + RunClrBridgeClient(true, testRuntimeFolder); } [TestMethod, Priority(1), TestCategory("FunctionalGated")] @@ -56,12 +57,13 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge [Timeout(180 * 1000)] public void CanRunClrBridgeExampleOnLocalRuntime() { - RunClrBridgeClient(false); + string testRuntimeFolder = DefaultRuntimeFolder + TestNumber++; + CleanUp(testRuntimeFolder); + RunClrBridgeClient(false, testRuntimeFolder); } - private void RunClrBridgeClient(bool runOnYarn) + private void RunClrBridgeClient(bool runOnYarn, string testRuntimeFolder) { - string testRuntimeFolder = DefaultRuntimeFolder + TestNumber++; string[] a = new[] { runOnYarn ? "yarn" : "local", testRuntimeFolder }; AllHandlers.Run(a); ValidateSuccessForLocalRuntime(2, testRuntimeFolder); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/61e882a1/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleEventHandlers.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleEventHandlers.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleEventHandlers.cs index eac060f..660af9c 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleEventHandlers.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleEventHandlers.cs @@ -49,13 +49,14 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge public void RunSimpleEventHandlerOnLocalRuntime() { string testFolder = DefaultRuntimeFolder + TestNumber++; - TestRun(DriverConfiguration(), typeof(HelloSimpleEventHandlers), "simpleHandler", "local", testFolder); + CleanUp(testFolder); + TestRun(DriverConfigurations(), typeof(HelloSimpleEventHandlers), 2, "simpleHandler", "local", testFolder); ValidateSuccessForLocalRuntime(1, testFolder); ValidateEvaluatorSetting(testFolder); CleanUp(testFolder); } - public IConfiguration DriverConfiguration() + public IConfiguration DriverConfigurations() { var helloDriverConfiguration = REEF.Driver.DriverConfiguration.ConfigurationModule .Set(REEF.Driver.DriverConfiguration.OnDriverStarted, GenericType<HelloSimpleEventHandlers>.Class) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/61e882a1/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs index 3de3ed2..02e5b3d 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs @@ -44,7 +44,7 @@ namespace Org.Apache.REEF.Tests.Functional.Driver } /// <summary> - /// This is to test DriverTestStartHandler. No evaluator and tasks are involked. + /// This is to test DriverTestStartHandler. No evaluator and tasks are involved. /// </summary> [TestMethod, Priority(1), TestCategory("FunctionalGated")] [Description("Test DriverTestStartHandler. No evaluator and tasks are invoked")] @@ -52,18 +52,20 @@ namespace Org.Apache.REEF.Tests.Functional.Driver [Timeout(180 * 1000)] public void TestDriverStart() { + string testFolder = DefaultRuntimeFolder + TestNumber++; + CleanUp(testFolder); + TestRun(DriverConfigurations(), typeof(DriverTestStartHandler), 0, "DriverTestStartHandler", "local", testFolder); + ValidateSuccessForLocalRuntime(0, testFolder); + } + + public IConfiguration DriverConfigurations() + { IConfiguration driverConfig = DriverConfiguration.ConfigurationModule .Set(DriverConfiguration.OnDriverStarted, GenericType<DriverTestStartHandler>.Class) .Set(DriverConfiguration.CustomTraceListeners, GenericType<DefaultCustomTraceListener>.Class) .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString()) .Build(); - - HashSet<string> appDlls = new HashSet<string>(); - appDlls.Add(typeof(DriverTestStartHandler).Assembly.GetName().Name); - - TestRun(appDlls, driverConfig); - - ValidateSuccessForLocalRuntime(0); + return driverConfig; } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/61e882a1/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs index 6150d41..7119ffb 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs @@ -17,18 +17,14 @@ * under the License. */ -using System.Collections.Generic; using System.Globalization; using Microsoft.VisualStudio.TestTools.UnitTesting; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Network.Examples.GroupCommunication; using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Naming; -using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; @@ -56,8 +52,10 @@ namespace Org.Apache.REEF.Tests.Functional.Group public void TestBroadcastAndReduceOnLocalRuntime() { int numTasks = 9; - TestBroadcastAndReduce(false, numTasks); - ValidateSuccessForLocalRuntime(numTasks); + string testFolder = DefaultRuntimeFolder + TestNumber++; + CleanUp(testFolder); + TestBroadcastAndReduce(false, numTasks, testFolder); + ValidateSuccessForLocalRuntime(numTasks, testFolder); } [Ignore] @@ -65,11 +63,17 @@ namespace Org.Apache.REEF.Tests.Functional.Group public void TestBroadcastAndReduceOnYarn() { int numTasks = 9; - TestBroadcastAndReduce(true, numTasks); + string testFolder = DefaultRuntimeFolder + TestNumber++; + TestBroadcastAndReduce(true, numTasks, testFolder); } - [TestMethod] - public void TestBroadcastAndReduce(bool runOnYarn, int numTasks) + private void TestBroadcastAndReduce(bool runOnYarn, int numTasks, string testFolder) + { + string runPlatform = runOnYarn ? "yarn" : "local"; + TestRun(DriverConfigurations(numTasks), typeof(BroadcastReduceDriver), numTasks, "BroadcastReduceDriver", runPlatform, testFolder); + } + + public IConfiguration DriverConfigurations(int numTasks) { IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder( DriverConfiguration.ConfigurationModule @@ -102,16 +106,7 @@ namespace Org.Apache.REEF.Tests.Functional.Group .BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, string>(typeof(NameClient).Assembly.GetName().Name) .Build(); - merged = Configurations.Merge(merged, taskConfig); - - HashSet<string> appDlls = new HashSet<string>(); - appDlls.Add(typeof(IDriver).Assembly.GetName().Name); - appDlls.Add(typeof(ITask).Assembly.GetName().Name); - appDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name); - appDlls.Add(typeof(INameClient).Assembly.GetName().Name); - appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); - - TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE); + return Configurations.Merge(merged, taskConfig); } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/61e882a1/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs index 583960e..0be72e3 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs @@ -17,18 +17,14 @@ * under the License. */ -using System.Collections.Generic; using System.Globalization; using Microsoft.VisualStudio.TestTools.UnitTesting; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Network.Examples.GroupCommunication; using Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Naming; -using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; @@ -56,8 +52,10 @@ namespace Org.Apache.REEF.Tests.Functional.Group public void TestPipelinedBroadcastAndReduceOnLocalRuntime() { const int numTasks = 9; - TestBroadcastAndReduce(false, numTasks); - ValidateSuccessForLocalRuntime(numTasks); + string testFolder = DefaultRuntimeFolder + TestNumber++; + CleanUp(testFolder); + TestBroadcastAndReduce(false, numTasks, testFolder); + ValidateSuccessForLocalRuntime(numTasks, testFolder); } [Ignore] @@ -65,10 +63,17 @@ namespace Org.Apache.REEF.Tests.Functional.Group public void TestPipelinedBroadcastAndReduceOnYarn() { const int numTasks = 9; - TestBroadcastAndReduce(true, numTasks); + string testFolder = DefaultRuntimeFolder + TestNumber++; + TestBroadcastAndReduce(true, numTasks, testFolder); } - public void TestBroadcastAndReduce(bool runOnYarn, int numTasks) + private void TestBroadcastAndReduce(bool runOnYarn, int numTasks, string testFolder) + { + string runPlatform = runOnYarn ? "yarn" : "local"; + TestRun(DriverConfigurations(numTasks), typeof(PipelinedBroadcastReduceDriver), numTasks, "PipelinedBroadcastReduceDriver", runPlatform, testFolder); + } + + public IConfiguration DriverConfigurations(int numTasks) { IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder( DriverConfiguration.ConfigurationModule @@ -104,16 +109,7 @@ namespace Org.Apache.REEF.Tests.Functional.Group .BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, string>(typeof(NameClient).Assembly.GetName().Name) .Build(); - merged = Configurations.Merge(merged, taskConfig); - - HashSet<string> appDlls = new HashSet<string>(); - appDlls.Add(typeof(IDriver).Assembly.GetName().Name); - appDlls.Add(typeof(ITask).Assembly.GetName().Name); - appDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name); - appDlls.Add(typeof(INameClient).Assembly.GetName().Name); - appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); - - TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE); + return Configurations.Merge(merged, taskConfig); } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/61e882a1/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs index 5b4e28b..0404cfb 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs @@ -17,18 +17,14 @@ * under the License. */ -using System.Collections.Generic; using System.Globalization; using Microsoft.VisualStudio.TestTools.UnitTesting; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Network.Examples.GroupCommunication; using Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Naming; -using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; @@ -56,8 +52,10 @@ namespace Org.Apache.REEF.Tests.Functional.Group public void TestScatterAndReduceOnLocalRuntime() { int numTasks = 5; - TestScatterAndReduce(false, numTasks); - ValidateSuccessForLocalRuntime(numTasks); + string testFolder = DefaultRuntimeFolder + TestNumber++; + CleanUp(testFolder); + TestScatterAndReduce(false, numTasks, testFolder); + ValidateSuccessForLocalRuntime(numTasks, testFolder); } [Ignore] @@ -65,24 +63,31 @@ namespace Org.Apache.REEF.Tests.Functional.Group public void TestScatterAndReduceOnYarn() { int numTasks = 5; - TestScatterAndReduce(true, numTasks); + string testFolder = DefaultRuntimeFolder + TestNumber++; + CleanUp(testFolder); + TestScatterAndReduce(true, numTasks, testFolder); } - [TestMethod] - public void TestScatterAndReduce(bool runOnYarn, int numTasks) + private void TestScatterAndReduce(bool runOnYarn, int numTasks, string testFolder) + { + string runPlatform = runOnYarn ? "yarn" : "local"; + TestRun(DriverConfigurations(numTasks), typeof(ScatterReduceDriver), numTasks, "ScatterReduceDriver", runPlatform, testFolder); + } + + public IConfiguration DriverConfigurations(int numTasks) { IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder( - DriverConfiguration.ConfigurationModule - .Set(DriverConfiguration.OnDriverStarted, GenericType<ScatterReduceDriver>.Class) - .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<ScatterReduceDriver>.Class) - .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<ScatterReduceDriver>.Class) - .Set(DriverConfiguration.OnContextActive, GenericType<ScatterReduceDriver>.Class) - .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString()) - .Build()) - .BindNamedParameter<GroupTestConfig.NumEvaluators, int>( - GenericType<GroupTestConfig.NumEvaluators>.Class, - numTasks.ToString(CultureInfo.InvariantCulture)) - .Build(); + DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<ScatterReduceDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<ScatterReduceDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<ScatterReduceDriver>.Class) + .Set(DriverConfiguration.OnContextActive, GenericType<ScatterReduceDriver>.Class) + .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString()) + .Build()) + .BindNamedParameter<GroupTestConfig.NumEvaluators, int>( + GenericType<GroupTestConfig.NumEvaluators>.Class, + numTasks.ToString(CultureInfo.InvariantCulture)) + .Build(); IConfiguration groupCommDriverConfig = TangFactory.GetTang().NewConfigurationBuilder() .BindStringNamedParam<GroupCommConfigurationOptions.DriverId>(GroupTestConstants.DriverId) @@ -99,16 +104,7 @@ namespace Org.Apache.REEF.Tests.Functional.Group .BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, string>(typeof(NameClient).Assembly.GetName().Name) .Build(); - merged = Configurations.Merge(merged, taskConfig); - - HashSet<string> appDlls = new HashSet<string>(); - appDlls.Add(typeof(IDriver).Assembly.GetName().Name); - appDlls.Add(typeof(ITask).Assembly.GetName().Name); - appDlls.Add(typeof(ScatterReduceDriver).Assembly.GetName().Name); - appDlls.Add(typeof(INameClient).Assembly.GetName().Name); - appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); - - TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE); + return Configurations.Merge(merged, taskConfig); } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/61e882a1/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs index 87ce4db..a088c3c 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs @@ -127,8 +127,10 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans public void TestKMeansOnLocalRuntimeWithGroupCommunications() { IsOnLocalRuntiime = true; - TestRun(AssembliesToCopy(), DriverConfiguration()); - ValidateSuccessForLocalRuntime(Partitions + 1); + string testFolder = DefaultRuntimeFolder + TestNumber++; + CleanUp(testFolder); + TestRun(DriverConfiguration(), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "local", testFolder); + ValidateSuccessForLocalRuntime(Partitions + 1, testFolder); } [TestMethod, Priority(1), TestCategory("FunctionalGated")] @@ -139,7 +141,8 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans [Ignore] // ignored by default public void TestKMeansOnYarnOneBoxWithGroupCommunications() { - TestRun(AssembliesToCopy(), DriverConfiguration(), runOnYarn: true); + string testFolder = DefaultRuntimeFolder + TestNumber++; + TestRun(DriverConfiguration(), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "yarn", testFolder); Assert.IsNotNull("BreakPointChecker"); } @@ -177,15 +180,5 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans return Configurations.Merge(merged, taskConfig); } - - private HashSet<string> AssembliesToCopy() - { - HashSet<string> appDlls = new HashSet<string>(); - appDlls.Add(typeof(LegacyKMeansTask).Assembly.GetName().Name); - appDlls.Add(typeof(INameClient).Assembly.GetName().Name); - appDlls.Add(typeof(KMeansDriverHandlers).Assembly.GetName().Name); - appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); - return appDlls; - } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/61e882a1/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs index 063e0ff..009c1eb 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs @@ -17,7 +17,6 @@ * under the License. */ -using System.Collections.Generic; using Microsoft.VisualStudio.TestTools.UnitTesting; using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Bridge; @@ -57,28 +56,30 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging [Timeout(180 * 1000)] public void TestSendTaskMessage() { + string testFolder = DefaultRuntimeFolder + TestNumber++; + CleanUp(testFolder); + TestRun(DriverConfigurations(), typeof(MessageDriver), 1, "simpleHandler", "local", testFolder); + ValidateSuccessForLocalRuntime(1, testFolder); + CleanUp(testFolder); + } + + public IConfiguration DriverConfigurations() + { IConfiguration driverConfig = DriverConfiguration.ConfigurationModule - .Set(DriverConfiguration.OnDriverStarted, GenericType<MessageDriver>.Class) - .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<MessageDriver>.Class) - .Set(DriverConfiguration.OnTaskMessage, GenericType<MessageDriver>.Class) - .Set(DriverConfiguration.OnTaskRunning, GenericType<MessageDriver>.Class) - .Set(DriverConfiguration.CustomTraceListeners, GenericType<DefaultCustomTraceListener>.Class) - .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString()) - .Build(); + .Set(DriverConfiguration.OnDriverStarted, GenericType<MessageDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<MessageDriver>.Class) + .Set(DriverConfiguration.OnTaskMessage, GenericType<MessageDriver>.Class) + .Set(DriverConfiguration.OnTaskRunning, GenericType<MessageDriver>.Class) + .Set(DriverConfiguration.CustomTraceListeners, GenericType<DefaultCustomTraceListener>.Class) + .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString()) + .Build(); IConfiguration taskConfig = TangFactory.GetTang().NewConfigurationBuilder() .BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, string>(typeof(MessageTask).Assembly.GetName().Name) .BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, string>(typeof(NameClient).Assembly.GetName().Name) .Build(); - IConfiguration merged = Configurations.Merge(driverConfig, taskConfig); - - HashSet<string> appDlls = new HashSet<string>(); - appDlls.Add(typeof(MessageDriver).Assembly.GetName().Name); - appDlls.Add(typeof(MessageTask).Assembly.GetName().Name); - - TestRun(appDlls, merged); - ValidateSuccessForLocalRuntime(MessageDriver.NumerOfEvaluator); + return Configurations.Merge(driverConfig, taskConfig); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/61e882a1/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs index 597b0fe..38a773d 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs @@ -18,7 +18,6 @@ */ using System; -using System.Collections.Generic; using System.Globalization; using System.IO; using System.Linq; @@ -31,9 +30,6 @@ using Microsoft.WindowsAzure.Storage.Blob; using Org.Apache.REEF.Client.API; using Org.Apache.REEF.Client.Local; using Org.Apache.REEF.Client.YARN; -using Org.Apache.REEF.Driver; -using Org.Apache.REEF.Driver.Bridge; -using Org.Apache.REEF.Examples.AllHandlers; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities; @@ -59,10 +55,6 @@ namespace Org.Apache.REEF.Tests.Functional private readonly static Logger Logger = Logger.GetLogger(typeof(ReefFunctionalTest)); private const string StorageAccountKeyEnvironmentVariable = "REEFTestStorageAccountKey"; private const string StorageAccountNameEnvironmentVariable = "REEFTestStorageAccountName"; - private readonly string _className = Constants.BridgeLaunchClass; - private readonly string _clrFolder = "."; - private readonly string _reefJar = Path.Combine(_binFolder, Constants.JavaBridgeJarFileName); - private readonly string _runCommand = Path.Combine(_binFolder, _cmdFile); private bool _testSuccess = false; private bool _onLocalRuntime = false; @@ -114,11 +106,6 @@ namespace Org.Apache.REEF.Tests.Functional } } - protected void TestRun(HashSet<string> appDlls, IConfiguration driverBridgeConfig, bool runOnYarn = false, JavaLoggingSetting javaLogSettings = JavaLoggingSetting.INFO) - { - ClrClientHelper.Run(appDlls, driverBridgeConfig, new DriverSubmissionSettings() { RunOnYarn = runOnYarn, JavaLogLevel = javaLogSettings }, _reefJar, _runCommand, _clrFolder, _className); - } - protected void CleanUp(string testFolder = DefaultRuntimeFolder) { Console.WriteLine("Cleaning up test."); @@ -250,9 +237,9 @@ namespace Org.Apache.REEF.Tests.Functional return result; } - protected void TestRun(IConfiguration driverCondig, Type globalAssemblyType, string jobIdentifier = "myDriver", string runOnYarn = "local", string runtimeFolder = DefaultRuntimeFolder) + protected void TestRun(IConfiguration driverCondig, Type globalAssemblyType, int numberOfEvaluator, string jobIdentifier = "myDriver", string runOnYarn = "local", string runtimeFolder = DefaultRuntimeFolder) { - IInjector injector = TangFactory.GetTang().NewInjector(GetRuntimeConfiguration(runOnYarn, runtimeFolder)); + IInjector injector = TangFactory.GetTang().NewInjector(GetRuntimeConfiguration(runOnYarn, numberOfEvaluator, runtimeFolder)); var reefClient = injector.GetInstance<IREEFClient>(); var jobSubmissionBuilderFactory = injector.GetInstance<JobSubmissionBuilderFactory>(); var jobSubmission = jobSubmissionBuilderFactory.GetJobSubmissionBuilder() @@ -264,14 +251,14 @@ namespace Org.Apache.REEF.Tests.Functional reefClient.Submit(jobSubmission); } - private IConfiguration GetRuntimeConfiguration(string runOnYarn, string runtimeFolder) + private IConfiguration GetRuntimeConfiguration(string runOnYarn, int numberOfEvaluator, string runtimeFolder) { switch (runOnYarn) { case Local: var dir = Path.Combine(".", runtimeFolder); return LocalRuntimeClientConfiguration.ConfigurationModule - .Set(LocalRuntimeClientConfiguration.NumberOfEvaluators, "2") + .Set(LocalRuntimeClientConfiguration.NumberOfEvaluators, numberOfEvaluator.ToString()) .Set(LocalRuntimeClientConfiguration.RuntimeFolder, dir) .Build(); case YARN:
