http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs new file mode 100644 index 0000000..f2b1c4a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using System.Linq; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.Group.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks +{ + public class SlaveTask : ITask + { + private static readonly Logger _logger = Logger.GetLogger(typeof(SlaveTask)); + + private readonly IMpiClient _mpiClient; + private readonly ICommunicationGroupClient _commGroup; + private readonly IScatterReceiver<int> _scatterReceiver; + private readonly IReduceSender<int> _sumSender; + + [Inject] + public SlaveTask(IMpiClient mpiClient) + { + _logger.Log(Level.Info, "Hello from slave task"); + + _mpiClient = mpiClient; + _commGroup = _mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName); + _scatterReceiver = _commGroup.GetScatterReceiver<int>(GroupTestConstants.ScatterOperatorName); + _sumSender = _commGroup.GetReduceSender<int>(GroupTestConstants.ReduceOperatorName); + } + + public byte[] Call(byte[] memento) + { + List<int> data = _scatterReceiver.Receive(); + _logger.Log(Level.Info, "Received data: {0}", string.Join(" ", data)); + + int sum = data.Sum(); + _logger.Log(Level.Info, "Sending back sum: {0}", sum); + _sumSender.Send(sum); + + return null; + } + + public void Dispose() + { + _mpiClient.Dispose(); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/Org.Apache.REEF.Network.Examples.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/Org.Apache.REEF.Network.Examples.csproj b/lang/cs/Org.Apache.REEF.Network.Examples/Org.Apache.REEF.Network.Examples.csproj new file mode 100644 index 0000000..6914923 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples/Org.Apache.REEF.Network.Examples.csproj @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> + <PropertyGroup> + <ProjectGuid>{B1B43B60-DDD0-4805-A9B4-BA84A0CCB7C7}</ProjectGuid> + <OutputType>Library</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>Org.Apache.REEF.Network.Examples</RootNamespace> + <AssemblyName>Org.Apache.REEF.Network.Examples</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + <RestorePackages>true</RestorePackages> + <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..</SolutionDir> + </PropertyGroup> + <PropertyGroup> + <StartupObject /> + </PropertyGroup> + <Import Project="$(SolutionDir)\build.props" /> + <PropertyGroup> + <BuildPackage>false</BuildPackage> + </PropertyGroup> + <ItemGroup> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Xml.Linq" /> + <Reference Include="System.Data.DataSetExtensions" /> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="System.Data" /> + <Reference Include="System.Xml" /> + </ItemGroup> + <ItemGroup> + <Compile Include="GroupCommunication\BroadcastReduceDriverAndTasks\BroadcastReduceDriver.cs" /> + <Compile Include="GroupCommunication\BroadcastReduceDriverAndTasks\MasterTask.cs" /> + <Compile Include="GroupCommunication\BroadcastReduceDriverAndTasks\SlaveTask.cs" /> + <Compile Include="GroupCommunication\GroupTestConfig.cs" /> + <Compile Include="GroupCommunication\GroupTestConstants.cs" /> + <Compile Include="GroupCommunication\PipelineBroadcastReduceDriverAndTasks\PipelinedBroadcastReduceDriver.cs" /> + <Compile Include="GroupCommunication\PipelineBroadcastReduceDriverAndTasks\PipelinedMasterTask.cs" /> + <Compile Include="GroupCommunication\PipelineBroadcastReduceDriverAndTasks\PipelinedSlaveTask.cs" /> + <Compile Include="GroupCommunication\ScatterReduceDriverAndTasks\MasterTask.cs" /> + <Compile Include="GroupCommunication\ScatterReduceDriverAndTasks\ScatterReduceDriver.cs" /> + <Compile Include="GroupCommunication\ScatterReduceDriverAndTasks\SlaveTask.cs" /> + <Compile Include="Properties\AssemblyInfo.cs" /> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj"> + <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project> + <Name>Org.Apache.REEF.Common</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Driver\Org.Apache.REEF.Driver.csproj"> + <Project>{a6baa2a7-f52f-4329-884e-1bcf711d6805}</Project> + <Name>Org.Apache.REEF.Driver</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Network\Org.Apache.REEF.Network.csproj"> + <Project>{883ce800-6a6a-4e0a-b7fe-c054f4f2c1dc}</Project> + <Name>Org.Apache.REEF.Network</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj"> + <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project> + <Name>Org.Apache.REEF.Tang</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj"> + <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project> + <Name>Org.Apache.REEF.Utilities</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj"> + <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project> + <Name>Org.Apache.REEF.Wake</Name> + </ProjectReference> + </ItemGroup> + <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> + <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" /> + <!-- To modify your build process, add your task inside one of the targets below and uncomment it. + Other similar extension points exist, see Microsoft.Common.targets. + <Target Name="BeforeBuild"> + </Target> + <Target Name="AfterBuild"> + </Target> + --> +</Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Network.Examples/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..8fc8a33 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples/Properties/AssemblyInfo.cs @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +using System.Reflection; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Org.Apache.REEF.Network.Examples")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Org.Apache.REEF.Network.Examples")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("a3eafd9d-1be4-44a7-9f1c-9a81a1e59897")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/ConfigFiles/evaluator.conf ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/ConfigFiles/evaluator.conf b/lang/cs/Org.Apache.REEF.Tests/ConfigFiles/evaluator.conf deleted file mode 100644 index 67256f5..0000000 Binary files a/lang/cs/Org.Apache.REEF.Tests/ConfigFiles/evaluator.conf and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs new file mode 100644 index 0000000..94529c1 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using System.Globalization; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Common.Io; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Network.Examples.GroupCommunication; +using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks; +using Org.Apache.REEF.Network.Group.Config; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Group +{ + [TestClass] + public class BroadcastReduceTest : ReefFunctionalTest + { + [TestInitialize] + public void TestSetup() + { + CleanUp(); + } + + [TestCleanup] + public void TestCleanup() + { + CleanUp(); + } + + [TestMethod] + public void TestBroadcastAndReduceOnLocalRuntime() + { + int numTasks = 9; + TestBroadcastAndReduce(false, numTasks); + ValidateSuccessForLocalRuntime(numTasks); + } + + [Ignore] + [TestMethod] + public void TestBroadcastAndReduceOnYarn() + { + int numTasks = 9; + TestBroadcastAndReduce(true, numTasks); + } + + [TestMethod] + public void TestBroadcastAndReduce(bool runOnYarn, int numTasks) + { + IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder( + DriverBridgeConfiguration.ConfigurationModule + .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<BroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<BroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<BroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<BroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnContextActive, GenericType<BroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString()) + .Build()) + .BindNamedParameter<GroupTestConfig.NumIterations, int>( + GenericType<GroupTestConfig.NumIterations>.Class, + GroupTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<GroupTestConfig.NumEvaluators, int>( + GenericType<GroupTestConfig.NumEvaluators>.Class, + numTasks.ToString(CultureInfo.InvariantCulture)) + .Build(); + + IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindStringNamedParam<MpiConfigurationOptions.DriverId>(GroupTestConstants.DriverId) + .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(GroupTestConstants.MasterTaskId) + .BindStringNamedParam<MpiConfigurationOptions.GroupName>(GroupTestConstants.GroupName) + .BindIntNamedParam<MpiConfigurationOptions.FanOut>(GroupTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture)) + .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture)) + .Build(); + + IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig); + + HashSet<string> appDlls = new HashSet<string>(); + appDlls.Add(typeof(IDriver).Assembly.GetName().Name); + appDlls.Add(typeof(ITask).Assembly.GetName().Name); + appDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name); + appDlls.Add(typeof(INameClient).Assembly.GetName().Name); + appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); + + TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs new file mode 100644 index 0000000..0c918b5 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using System.Globalization; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Common.Io; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Network.Examples.GroupCommunication; +using Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks; +using Org.Apache.REEF.Network.Group.Config; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Group +{ + [TestClass] + public class PipelinedBroadcastReduceTest : ReefFunctionalTest + { + [TestInitialize] + public void TestSetup() + { + CleanUp(); + } + + [TestCleanup] + public void TestCleanup() + { + CleanUp(); + } + + [TestMethod] + public void TestPipelinedBroadcastAndReduceOnLocalRuntime() + { + const int numTasks = 9; + TestBroadcastAndReduce(false, numTasks); + ValidateSuccessForLocalRuntime(numTasks); + } + + [Ignore] + [TestMethod] + public void TestPipelinedBroadcastAndReduceOnYarn() + { + const int numTasks = 9; + TestBroadcastAndReduce(true, numTasks); + } + + public void TestBroadcastAndReduce(bool runOnYarn, int numTasks) + { + IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder( + DriverBridgeConfiguration.ConfigurationModule + .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnContextActive, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString()) + .Build()) + .BindNamedParameter<GroupTestConfig.NumIterations, int>( + GenericType<GroupTestConfig.NumIterations>.Class, + GroupTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<GroupTestConfig.NumEvaluators, int>( + GenericType<GroupTestConfig.NumEvaluators>.Class, + numTasks.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<GroupTestConfig.ChunkSize, int>( + GenericType<GroupTestConfig.ChunkSize>.Class, + GroupTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture)) + .Build(); + + IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindStringNamedParam<MpiConfigurationOptions.DriverId>(GroupTestConstants.DriverId) + .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(GroupTestConstants.MasterTaskId) + .BindStringNamedParam<MpiConfigurationOptions.GroupName>(GroupTestConstants.GroupName) + .BindIntNamedParam<MpiConfigurationOptions.FanOut>(GroupTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture)) + .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture)) + .Build(); + + IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig); + + HashSet<string> appDlls = new HashSet<string>(); + appDlls.Add(typeof(IDriver).Assembly.GetName().Name); + appDlls.Add(typeof(ITask).Assembly.GetName().Name); + appDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name); + appDlls.Add(typeof(INameClient).Assembly.GetName().Name); + appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); + + TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs new file mode 100644 index 0000000..083261a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using System.Globalization; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Common.Io; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Network.Examples.GroupCommunication; +using Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks; +using Org.Apache.REEF.Network.Group.Config; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Group +{ + [TestClass] + public class ScatterReduceTest : ReefFunctionalTest + { + [TestInitialize] + public void TestSetup() + { + CleanUp(); + } + + [TestCleanup] + public void TestCleanup() + { + CleanUp(); + } + + [TestMethod] + public void TestScatterAndReduceOnLocalRuntime() + { + int numTasks = 5; + TestScatterAndReduce(false, numTasks); + ValidateSuccessForLocalRuntime(numTasks); + } + + [Ignore] + [TestMethod] + public void TestScatterAndReduceOnYarn() + { + int numTasks = 5; + TestScatterAndReduce(true, numTasks); + } + + [TestMethod] + public void TestScatterAndReduce(bool runOnYarn, int numTasks) + { + IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder( + DriverBridgeConfiguration.ConfigurationModule + .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<ScatterReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<ScatterReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<ScatterReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<ScatterReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnContextActive, GenericType<ScatterReduceDriver>.Class) + .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString()) + .Build()) + .BindNamedParameter<GroupTestConfig.NumEvaluators, int>( + GenericType<GroupTestConfig.NumEvaluators>.Class, + numTasks.ToString(CultureInfo.InvariantCulture)) + .Build(); + + IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindStringNamedParam<MpiConfigurationOptions.DriverId>(GroupTestConstants.DriverId) + .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(GroupTestConstants.MasterTaskId) + .BindStringNamedParam<MpiConfigurationOptions.GroupName>(GroupTestConstants.GroupName) + .BindIntNamedParam<MpiConfigurationOptions.FanOut>(GroupTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture)) + .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString()) + .Build(); + + IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig); + + HashSet<string> appDlls = new HashSet<string>(); + appDlls.Add(typeof(IDriver).Assembly.GetName().Name); + appDlls.Add(typeof(ITask).Assembly.GetName().Name); + appDlls.Add(typeof(ScatterReduceDriver).Assembly.GetName().Name); + appDlls.Add(typeof(INameClient).Assembly.GetName().Name); + appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); + + TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs deleted file mode 100644 index 8cc32b8..0000000 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.Globalization; -using System.Linq; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Driver; -using Org.Apache.REEF.Driver.Bridge; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Network.Group.Config; -using Org.Apache.REEF.Network.Group.Driver; -using Org.Apache.REEF.Network.Group.Driver.Impl; -using Org.Apache.REEF.Network.Group.Operators; -using Org.Apache.REEF.Network.Group.Operators.Impl; -using Org.Apache.REEF.Network.NetworkService; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Implementations.Tang; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote.Impl; - -namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest -{ - public class BroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator> - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(BroadcastReduceDriver)); - - private readonly int _numEvaluators; - private readonly int _numIterations; - - private readonly IMpiDriver _mpiDriver; - private readonly ICommunicationGroupDriver _commGroup; - private readonly TaskStarter _mpiTaskStarter; - - [Inject] - public BroadcastReduceDriver( - [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators, - [Parameter(typeof(MpiTestConfig.NumIterations))] int numIterations, - MpiDriver mpiDriver) - { - Identifier = "BroadcastStartHandler"; - _numEvaluators = numEvaluators; - _numIterations = numIterations; - _mpiDriver = mpiDriver; - _commGroup = _mpiDriver.DefaultGroup - .AddBroadcast<int, IntCodec>( - MpiTestConstants.BroadcastOperatorName, - MpiTestConstants.MasterTaskId) - .AddReduce<int, IntCodec>( - MpiTestConstants.ReduceOperatorName, - MpiTestConstants.MasterTaskId, - new SumFunction()) - .Build(); - - _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators); - - CreateClassHierarchy(); - } - - public string Identifier { get; set; } - - public void OnNext(IEvaluatorRequestor evaluatorRequestor) - { - EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator"); - evaluatorRequestor.Submit(request); - } - - public void OnNext(IAllocatedEvaluator allocatedEvaluator) - { - IConfiguration contextConf = _mpiDriver.GetContextConfiguration(); - IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration(); - allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf); - } - - public void OnNext(IActiveContext activeContext) - { - if (_mpiDriver.IsMasterTaskContext(activeContext)) - { - // Configure Master Task - IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder( - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, MpiTestConstants.MasterTaskId) - .Set(TaskConfiguration.Task, GenericType<MasterTask>.Class) - .Build()) - .BindNamedParameter<MpiTestConfig.NumEvaluators, int>( - GenericType<MpiTestConfig.NumEvaluators>.Class, - _numEvaluators.ToString(CultureInfo.InvariantCulture)) - .BindNamedParameter<MpiTestConfig.NumIterations, int>( - GenericType<MpiTestConfig.NumIterations>.Class, - _numIterations.ToString(CultureInfo.InvariantCulture)) - .Build(); - - _commGroup.AddTask(MpiTestConstants.MasterTaskId); - _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); - } - else - { - // Configure Slave Task - string slaveTaskId = "SlaveTask-" + activeContext.Id; - IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder( - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, slaveTaskId) - .Set(TaskConfiguration.Task, GenericType<SlaveTask>.Class) - .Build()) - .BindNamedParameter<MpiTestConfig.NumEvaluators, int>( - GenericType<MpiTestConfig.NumEvaluators>.Class, - _numEvaluators.ToString(CultureInfo.InvariantCulture)) - .BindNamedParameter<MpiTestConfig.NumIterations, int>( - GenericType<MpiTestConfig.NumIterations>.Class, - _numIterations.ToString(CultureInfo.InvariantCulture)) - .Build(); - - _commGroup.AddTask(slaveTaskId); - _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); - } - } - - public void OnNext(IFailedEvaluator value) - { - } - - public void OnError(Exception error) - { - } - - public void OnCompleted() - { - } - - private void CreateClassHierarchy() - { - HashSet<string> clrDlls = new HashSet<string>(); - clrDlls.Add(typeof(IDriver).Assembly.GetName().Name); - clrDlls.Add(typeof(ITask).Assembly.GetName().Name); - clrDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name); - clrDlls.Add(typeof(INameClient).Assembly.GetName().Name); - clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); - - ClrHandlerHelper.GenerateClassHierarchy(clrDlls); - } - - private class SumFunction : IReduceFunction<int> - { - [Inject] - public SumFunction() - { - } - - public int Reduce(IEnumerable<int> elements) - { - return elements.Sum(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs deleted file mode 100644 index fea51de..0000000 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Collections.Generic; -using System.Globalization; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Driver; -using Org.Apache.REEF.Driver.Bridge; -using Org.Apache.REEF.Network.Group.Config; -using Org.Apache.REEF.Network.NetworkService; -using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Implementations.Configuration; -using Org.Apache.REEF.Tang.Implementations.Tang; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest -{ - [TestClass] - public class BroadcastReduceTest : ReefFunctionalTest - { - [TestInitialize] - public void TestSetup() - { - CleanUp(); - } - - [TestCleanup] - public void TestCleanup() - { - CleanUp(); - } - - [TestMethod] - public void TestBroadcastAndReduceOnLocalRuntime() - { - int numTasks = 9; - TestBroadcastAndReduce(false, numTasks); - ValidateSuccessForLocalRuntime(numTasks); - } - - [Ignore] - [TestMethod] - public void TestBroadcastAndReduceOnYarn() - { - int numTasks = 9; - TestBroadcastAndReduce(true, numTasks); - } - - [TestMethod] - public void TestBroadcastAndReduce(bool runOnYarn, int numTasks) - { - IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder( - DriverBridgeConfiguration.ConfigurationModule - .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<BroadcastReduceDriver>.Class) - .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<BroadcastReduceDriver>.Class) - .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<BroadcastReduceDriver>.Class) - .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<BroadcastReduceDriver>.Class) - .Set(DriverBridgeConfiguration.OnContextActive, GenericType<BroadcastReduceDriver>.Class) - .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString()) - .Build()) - .BindNamedParameter<MpiTestConfig.NumIterations, int>( - GenericType<MpiTestConfig.NumIterations>.Class, - MpiTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture)) - .BindNamedParameter<MpiTestConfig.NumEvaluators, int>( - GenericType<MpiTestConfig.NumEvaluators>.Class, - numTasks.ToString(CultureInfo.InvariantCulture)) - .Build(); - - IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder() - .BindStringNamedParam<MpiConfigurationOptions.DriverId>(MpiTestConstants.DriverId) - .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(MpiTestConstants.MasterTaskId) - .BindStringNamedParam<MpiConfigurationOptions.GroupName>(MpiTestConstants.GroupName) - .BindIntNamedParam<MpiConfigurationOptions.FanOut>(MpiTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture)) - .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture)) - .Build(); - - IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig); - - HashSet<string> appDlls = new HashSet<string>(); - appDlls.Add(typeof(IDriver).Assembly.GetName().Name); - appDlls.Add(typeof(ITask).Assembly.GetName().Name); - appDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name); - appDlls.Add(typeof(INameClient).Assembly.GetName().Name); - appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); - - TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs deleted file mode 100644 index fd3084d..0000000 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Linq; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Network.Group.Operators; -using Org.Apache.REEF.Network.Group.Task; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest -{ - public class MasterTask : ITask - { - private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask)); - - private readonly int _numIters; - private readonly int _numReduceSenders; - - private readonly IMpiClient _mpiClient; - private readonly ICommunicationGroupClient _commGroup; - private readonly IBroadcastSender<int> _broadcastSender; - private readonly IReduceReceiver<int> _sumReducer; - - [Inject] - public MasterTask( - [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters, - [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators, - IMpiClient mpiClient) - { - _logger.Log(Level.Info, "Hello from master task"); - _numIters = numIters; - _numReduceSenders = numEvaluators - 1; - _mpiClient = mpiClient; - - _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName); - _broadcastSender = _commGroup.GetBroadcastSender<int>(MpiTestConstants.BroadcastOperatorName); - _sumReducer = _commGroup.GetReduceReceiver<int>(MpiTestConstants.ReduceOperatorName); - } - - public byte[] Call(byte[] memento) - { - for (int i = 1; i <= _numIters; i++) - { - // Each slave task calculates the nth triangle number - _broadcastSender.Send(i); - - // Sum up all of the calculated triangle numbers - int sum = _sumReducer.Reduce(); - _logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i); - - int expected = TriangleNumber(i) * _numReduceSenders; - if (sum != TriangleNumber(i) * _numReduceSenders) - { - throw new Exception("Expected " + expected + " but got " + sum); - } - } - - return null; - } - - public void Dispose() - { - _mpiClient.Dispose(); - } - - private int TriangleNumber(int n) - { - return Enumerable.Range(1, n).Sum(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs deleted file mode 100644 index 08fbde4..0000000 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Linq; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Network.Group.Operators; -using Org.Apache.REEF.Network.Group.Task; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest -{ - public class SlaveTask : ITask - { - private static readonly Logger _logger = Logger.GetLogger(typeof(SlaveTask)); - - private readonly int _numIterations; - private readonly IMpiClient _mpiClient; - private readonly ICommunicationGroupClient _commGroup; - private readonly IBroadcastReceiver<int> _broadcastReceiver; - private readonly IReduceSender<int> _triangleNumberSender; - - [Inject] - public SlaveTask( - [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters, - IMpiClient mpiClient) - { - _logger.Log(Level.Info, "Hello from slave task"); - - _numIterations = numIters; - _mpiClient = mpiClient; - _commGroup = _mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName); - _broadcastReceiver = _commGroup.GetBroadcastReceiver<int>(MpiTestConstants.BroadcastOperatorName); - _triangleNumberSender = _commGroup.GetReduceSender<int>(MpiTestConstants.ReduceOperatorName); - } - - public byte[] Call(byte[] memento) - { - for (int i = 0; i < _numIterations; i++) - { - // Receive n from Master Task - int n = _broadcastReceiver.Receive(); - _logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", n); - - // Calculate the nth Triangle number and send it back to driver - int triangleNum = TriangleNumber(n); - _logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangleNum, i); - _triangleNumberSender.Send(triangleNum); - } - - return null; - } - - public void Dispose() - { - _mpiClient.Dispose(); - } - - private int TriangleNumber(int n) - { - return Enumerable.Range(1, n).Sum(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs deleted file mode 100644 index a3989a0..0000000 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Tests.Functional.MPI -{ - public class MpiTestConfig - { - [NamedParameter("Number of iterations of messages to send")] - public class NumIterations : Name<int> - { - } - - [NamedParameter("Number of Evaluators")] - public class NumEvaluators : Name<int> - { - } - - [NamedParameter("tree width")] - public class FanOut : Name<int> - { - } - - [NamedParameter("integer id of the evaluator")] - public class EvaluatorId : Name<string> - { - } - - [NamedParameter("Size of the array")] - public class ArraySize : Name<int> - { - } - - [NamedParameter("Chunk size for pipelining")] - public class ChunkSize : Name<int> - { - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs deleted file mode 100644 index 668add3..0000000 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -namespace Org.Apache.REEF.Tests.Functional.MPI -{ - internal class MpiTestConstants - { - public const string DriverId = "BroadcastReduceDriver"; - public const string GroupName = "BroadcastReduceGroup"; - public const string BroadcastOperatorName = "Broadcast"; - public const string ReduceOperatorName = "Reduce"; - public const string ScatterOperatorName = "Scatter"; - public const string MasterTaskId = "MasterTask"; - public const string SlaveTaskId = "SlaveTask-"; - public const int NumIterations = 10; - public const int FanOut = 2; - public const int ChunkSize = 2; - public const int ArrayLength = 6; - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs deleted file mode 100644 index 98a68dd..0000000 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs +++ /dev/null @@ -1,320 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.Globalization; -using System.Linq; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Driver; -using Org.Apache.REEF.Driver.Bridge; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Network.Group.Driver; -using Org.Apache.REEF.Network.Group.Driver.Impl; -using Org.Apache.REEF.Network.Group.Operators; -using Org.Apache.REEF.Network.Group.Pipelining; -using Org.Apache.REEF.Network.NetworkService; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Implementations.Tang; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Network.Group.Topology; - -namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest -{ - public class PipelinedBroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator> - { - private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedBroadcastReduceDriver)); - - private readonly int _numEvaluators; - private readonly int _numIterations; - private readonly int _chunkSize; - - private readonly IMpiDriver _mpiDriver; - private readonly ICommunicationGroupDriver _commGroup; - private readonly TaskStarter _mpiTaskStarter; - - [Inject] - public PipelinedBroadcastReduceDriver( - [Parameter(typeof (MpiTestConfig.NumEvaluators))] int numEvaluators, - [Parameter(typeof (MpiTestConfig.NumIterations))] int numIterations, - [Parameter(typeof (MpiTestConfig.ChunkSize))] int chunkSize, - MpiDriver mpiDriver) - { - Logger.Log(Level.Info, "*******entering the driver code " + chunkSize); - - Identifier = "BroadcastStartHandler"; - _numEvaluators = numEvaluators; - _numIterations = numIterations; - _chunkSize = chunkSize; - - _mpiDriver = mpiDriver; - - _commGroup = _mpiDriver.DefaultGroup - .AddBroadcast<int[], IntArrayCodec>( - MpiTestConstants.BroadcastOperatorName, - MpiTestConstants.MasterTaskId, - TopologyTypes.Tree, - new PipelineIntDataConverter(_chunkSize)) - .AddReduce<int[], IntArrayCodec>( - MpiTestConstants.ReduceOperatorName, - MpiTestConstants.MasterTaskId, - new ArraySumFunction(), - TopologyTypes.Tree, - new PipelineIntDataConverter(_chunkSize)) - .Build(); - - _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators); - - CreateClassHierarchy(); - } - - public string Identifier { get; set; } - - public void OnNext(IEvaluatorRequestor evaluatorRequestor) - { - EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator"); - evaluatorRequestor.Submit(request); - } - - public void OnNext(IAllocatedEvaluator allocatedEvaluator) - { - IConfiguration contextConf = _mpiDriver.GetContextConfiguration(); - IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration(); - allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf); - } - - public void OnNext(IActiveContext activeContext) - { - if (_mpiDriver.IsMasterTaskContext(activeContext)) - { - Logger.Log(Level.Info, "******* Master ID " + activeContext.Id ); - - // Configure Master Task - IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder( - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, MpiTestConstants.MasterTaskId) - .Set(TaskConfiguration.Task, GenericType<PipelinedMasterTask>.Class) - .Build()) - .BindNamedParameter<MpiTestConfig.NumEvaluators, int>( - GenericType<MpiTestConfig.NumEvaluators>.Class, - _numEvaluators.ToString(CultureInfo.InvariantCulture)) - .BindNamedParameter<MpiTestConfig.NumIterations, int>( - GenericType<MpiTestConfig.NumIterations>.Class, - _numIterations.ToString(CultureInfo.InvariantCulture)) - .BindNamedParameter<MpiTestConfig.ArraySize, int>( - GenericType<MpiTestConfig.ArraySize>.Class, - MpiTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture)) - .Build(); - - _commGroup.AddTask(MpiTestConstants.MasterTaskId); - _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); - } - else - { - // Configure Slave Task - string slaveTaskId = "SlaveTask-" + activeContext.Id; - IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder( - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, slaveTaskId) - .Set(TaskConfiguration.Task, GenericType<PipelinedSlaveTask>.Class) - .Build()) - .BindNamedParameter<MpiTestConfig.NumEvaluators, int>( - GenericType<MpiTestConfig.NumEvaluators>.Class, - _numEvaluators.ToString(CultureInfo.InvariantCulture)) - .BindNamedParameter<MpiTestConfig.NumIterations, int>( - GenericType<MpiTestConfig.NumIterations>.Class, - _numIterations.ToString(CultureInfo.InvariantCulture)) - .BindNamedParameter<MpiTestConfig.ArraySize, int>( - GenericType<MpiTestConfig.ArraySize>.Class, - MpiTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture)) - .Build(); - - _commGroup.AddTask(slaveTaskId); - _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); - } - } - - public void OnNext(IFailedEvaluator value) - { - } - - public void OnError(Exception error) - { - } - - public void OnCompleted() - { - } - - private void CreateClassHierarchy() - { - HashSet<string> clrDlls = new HashSet<string>(); - clrDlls.Add(typeof(IDriver).Assembly.GetName().Name); - clrDlls.Add(typeof(ITask).Assembly.GetName().Name); - clrDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name); - clrDlls.Add(typeof(INameClient).Assembly.GetName().Name); - clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); - - ClrHandlerHelper.GenerateClassHierarchy(clrDlls); - } - - private class SumFunction : IReduceFunction<int> - { - [Inject] - public SumFunction() - { - } - - public int Reduce(IEnumerable<int> elements) - { - return elements.Sum(); - } - } - - private class ArraySumFunction : IReduceFunction<int[]> - { - [Inject] - public ArraySumFunction() - { - } - - public int[] Reduce(IEnumerable<int[]> elements) - { - int[] result = null; - int count = 0; - - foreach (var element in elements) - { - if (count == 0) - { - result = element.Clone() as int[]; - } - else - { - if (element.Length != result.Length) - { - throw new Exception("integer arrays are of different sizes"); - } - - for (int i = 0; i < result.Length; i++) - { - result[i] += element[i]; - } - } - - count++; - } - - return result; - } - } - - - private class IntArrayCodec : ICodec<int[]> - { - [Inject] - public IntArrayCodec() - { - } - - public byte[] Encode(int[] obj) - { - byte[] result = new byte[sizeof(Int32) * obj.Length]; - Buffer.BlockCopy(obj, 0, result, 0, result.Length); - return result; - } - - public int[] Decode(byte[] data) - { - if (data.Length % sizeof(Int32) != 0) - { - throw new Exception("error inside integer array decoder, byte array length not a multiple of interger size"); - } - - int[] result = new int[data.Length / sizeof(Int32)]; - Buffer.BlockCopy(data, 0, result, 0, data.Length); - return result; - } - } - - public class PipelineIntDataConverter : IPipelineDataConverter<int[]> - { - readonly int _chunkSize; - - [Inject] - public PipelineIntDataConverter([Parameter(typeof(MpiTestConfig.ChunkSize))] int chunkSize) - { - _chunkSize = chunkSize; - } - - public List<PipelineMessage<int[]>> PipelineMessage(int[] message) - { - List<PipelineMessage<int[]>> messageList = new List<PipelineMessage<int[]>>(); - int totalChunks = message.Length / _chunkSize; - - if (message.Length % _chunkSize != 0) - { - totalChunks++; - } - - int counter = 0; - for (int i = 0; i < message.Length; i += _chunkSize) - { - int[] data = new int[Math.Min(_chunkSize, message.Length - i)]; - Buffer.BlockCopy(message, i * sizeof(int), data, 0, data.Length * sizeof(int)); - - messageList.Add(counter == totalChunks - 1 - ? new PipelineMessage<int[]>(data, true) - : new PipelineMessage<int[]>(data, false)); - - counter++; - } - - return messageList; - } - - public int[] FullMessage(List<PipelineMessage<int[]>> pipelineMessage) - { - int size = pipelineMessage.Select(x => x.Data.Length).Sum(); - int[] data = new int[size]; - int offset = 0; - - foreach (var message in pipelineMessage) - { - Buffer.BlockCopy(message.Data, 0, data, offset, message.Data.Length * sizeof(int)); - offset += message.Data.Length * sizeof(int); - } - - return data; - } - - public IConfiguration GetConfiguration() - { - return TangFactory.GetTang().NewConfigurationBuilder() - .BindNamedParameter<MpiTestConfig.ChunkSize, int>(GenericType<MpiTestConfig.ChunkSize>.Class, _chunkSize.ToString(CultureInfo.InvariantCulture)) - .Build(); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs deleted file mode 100644 index 05d0b9b..0000000 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Collections.Generic; -using System.Globalization; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Driver; -using Org.Apache.REEF.Driver.Bridge; -using Org.Apache.REEF.Network.Group.Config; -using Org.Apache.REEF.Network.NetworkService; -using Org.Apache.REEF.Tang.Implementations.Configuration; -using Org.Apache.REEF.Tang.Implementations.Tang; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest -{ - [TestClass] - public class PipelinedBroadcastReduceTest : ReefFunctionalTest - { - [TestInitialize] - public void TestSetup() - { - CleanUp(); - } - - [TestCleanup] - public void TestCleanup() - { - CleanUp(); - } - - [TestMethod] - public void TestPipelinedBroadcastAndReduceOnLocalRuntime() - { - const int numTasks = 9; - TestBroadcastAndReduce(false, numTasks); - ValidateSuccessForLocalRuntime(numTasks); - } - - [Ignore] - [TestMethod] - public void TestPipelinedBroadcastAndReduceOnYarn() - { - const int numTasks = 9; - TestBroadcastAndReduce(true, numTasks); - } - - public void TestBroadcastAndReduce(bool runOnYarn, int numTasks) - { - IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder( - DriverBridgeConfiguration.ConfigurationModule - .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<PipelinedBroadcastReduceDriver>.Class) - .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<PipelinedBroadcastReduceDriver>.Class) - .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<PipelinedBroadcastReduceDriver>.Class) - .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<PipelinedBroadcastReduceDriver>.Class) - .Set(DriverBridgeConfiguration.OnContextActive, GenericType<PipelinedBroadcastReduceDriver>.Class) - .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString()) - .Build()) - .BindNamedParameter<MpiTestConfig.NumIterations, int>( - GenericType<MpiTestConfig.NumIterations>.Class, - MpiTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture)) - .BindNamedParameter<MpiTestConfig.NumEvaluators, int>( - GenericType<MpiTestConfig.NumEvaluators>.Class, - numTasks.ToString(CultureInfo.InvariantCulture)) - .BindNamedParameter<MpiTestConfig.ChunkSize, int>( - GenericType<MpiTestConfig.ChunkSize>.Class, - MpiTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture)) - .Build(); - - IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder() - .BindStringNamedParam<MpiConfigurationOptions.DriverId>(MpiTestConstants.DriverId) - .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(MpiTestConstants.MasterTaskId) - .BindStringNamedParam<MpiConfigurationOptions.GroupName>(MpiTestConstants.GroupName) - .BindIntNamedParam<MpiConfigurationOptions.FanOut>(MpiTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture)) - .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture)) - .Build(); - - IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig); - - HashSet<string> appDlls = new HashSet<string>(); - appDlls.Add(typeof(IDriver).Assembly.GetName().Name); - appDlls.Add(typeof(ITask).Assembly.GetName().Name); - appDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name); - appDlls.Add(typeof(INameClient).Assembly.GetName().Name); - appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); - - TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs deleted file mode 100644 index 922f294..0000000 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Linq; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Network.Group.Operators; -using Org.Apache.REEF.Network.Group.Task; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest -{ - public class PipelinedMasterTask : ITask - { - private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedMasterTask)); - - private readonly int _numIters; - private readonly int _numReduceSenders; - private readonly int _arraySize; - - private readonly IMpiClient _mpiClient; - private readonly ICommunicationGroupClient _commGroup; - private readonly IBroadcastSender<int[]> _broadcastSender; - private readonly IReduceReceiver<int[]> _sumReducer; - - [Inject] - public PipelinedMasterTask( - [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters, - [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators, - [Parameter(typeof(MpiTestConfig.ArraySize))] int arraySize, - IMpiClient mpiClient) - { - Logger.Log(Level.Info, "Hello from master task"); - _numIters = numIters; - _numReduceSenders = numEvaluators - 1; - _arraySize = arraySize; - _mpiClient = mpiClient; - - _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName); - _broadcastSender = _commGroup.GetBroadcastSender<int[]>(MpiTestConstants.BroadcastOperatorName); - _sumReducer = _commGroup.GetReduceReceiver<int[]>(MpiTestConstants.ReduceOperatorName); - Logger.Log(Level.Info, "finished master task constructor"); - } - - public byte[] Call(byte[] memento) - { - int[] intArr = new int[_arraySize]; - - for (int i = 1; i <= _numIters; i++) - { - for (int j = 0; j < _arraySize; j++) - { - intArr[j] = i; - } - - _broadcastSender.Send(intArr); - int[] sum = _sumReducer.Reduce(); - - Logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i); - - int expected = TriangleNumber(i) * _numReduceSenders; - - for (int j = 0; j < intArr.Length; j++) - { - if (sum[j] != TriangleNumber(i) * _numReduceSenders) - { - throw new Exception("Expected " + expected + " but got " + sum); - } - } - } - - return null; - } - - public void Dispose() - { - _mpiClient.Dispose(); - } - - private int TriangleNumber(int n) - { - return Enumerable.Range(1, n).Sum(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs deleted file mode 100644 index 5455121..0000000 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Linq; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Network.Group.Operators; -using Org.Apache.REEF.Network.Group.Task; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest -{ - public class PipelinedSlaveTask : ITask - { - private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedSlaveTask)); - - private readonly int _numIterations; - private readonly IMpiClient _mpiClient; - private readonly ICommunicationGroupClient _commGroup; - private readonly IBroadcastReceiver<int[]> _broadcastReceiver; - private readonly IReduceSender<int[]> _triangleNumberSender; - - [Inject] - public PipelinedSlaveTask( - [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters, - IMpiClient mpiClient) - { - Logger.Log(Level.Info, "Hello from slave task"); - - _numIterations = numIters; - _mpiClient = mpiClient; - _commGroup = _mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName); - _broadcastReceiver = _commGroup.GetBroadcastReceiver<int[]>(MpiTestConstants.BroadcastOperatorName); - _triangleNumberSender = _commGroup.GetReduceSender<int[]>(MpiTestConstants.ReduceOperatorName); - } - - public byte[] Call(byte[] memento) - { - for (int i = 0; i < _numIterations; i++) - { - // Receive n from Master Task - int[] intVec = _broadcastReceiver.Receive(); - - Logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", intVec[0]); - - // Calculate the nth Triangle number and send it back to driver - int triangleNum = TriangleNumber(intVec[0]); - Logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangleNum, i); - - int[] resArr = new int[intVec.Length]; - - for (int j = 0; j < resArr.Length; j++) - { - resArr[j] = triangleNum; - } - - _triangleNumberSender.Send(resArr); - } - - return null; - } - - public void Dispose() - { - _mpiClient.Dispose(); - } - - private int TriangleNumber(int n) - { - return Enumerable.Range(1, n).Sum(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs deleted file mode 100644 index e563534..0000000 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Collections.Generic; -using System.Linq; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Network.Group.Operators; -using Org.Apache.REEF.Network.Group.Task; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest -{ - public class MasterTask : ITask - { - private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask)); - - private readonly IMpiClient _mpiClient; - private readonly ICommunicationGroupClient _commGroup; - private readonly IScatterSender<int> _scatterSender; - private readonly IReduceReceiver<int> _sumReducer; - - [Inject] - public MasterTask(IMpiClient mpiClient) - { - _logger.Log(Level.Info, "Hello from master task"); - _mpiClient = mpiClient; - - _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName); - _scatterSender = _commGroup.GetScatterSender<int>(MpiTestConstants.ScatterOperatorName); - _sumReducer = _commGroup.GetReduceReceiver<int>(MpiTestConstants.ReduceOperatorName); - } - - public byte[] Call(byte[] memento) - { - List<int> data = Enumerable.Range(1, 100).ToList(); - _scatterSender.Send(data); - - int sum = _sumReducer.Reduce(); - _logger.Log(Level.Info, "Received sum: {0}", sum); - - return null; - } - - public void Dispose() - { - _mpiClient.Dispose(); - } - - private List<string> GetScatterOrder() - { - return new List<string> { "SlaveTask-4", "SlaveTask-3", "SlaveTask-2", "SlaveTask-1" }; - } - } -}
