Repository: incubator-reef
Updated Branches:
refs/heads/master 9152ea788 -> 63e0c074c
[REEF-648] Add evaluator memory option to IMRU and its examples.
This addressed the issue by
* giving user an option to give the memory as input in Run.cs
* including the memory field in IMRUJobDefinition and
IMRUJobDefinitionBuilder
* including the memory bindings in driver configuration
* fixing a small bug in IMRUClient configuration for BroadcastReduce
in Run.cs
JIRA:
[REEF-648](https://issues.apache.org/jira/browse/REEF-648)
Pull Request:
This closes #414
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/63e0c074
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/63e0c074
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/63e0c074
Branch: refs/heads/master
Commit: 63e0c074c5df4453e1b7c5ecf10c6cd7bb0ccd80
Parents: 9152ea7
Author: Dhruv <[email protected]>
Authored: Tue Aug 25 19:45:04 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Aug 26 11:46:23 2015 -0700
----------------------------------------------------------------------
.../PipelinedBroadcastAndReduce.cs | 4 ++-
lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs | 18 ++++++++++---
.../API/IMRUJobDefinition.cs | 23 ++++++++++++++++
.../API/IMRUJobDefinitionBuilder.cs | 28 ++++++++++++++++++++
.../OnREEF/Client/REEFIMRUClient.cs | 4 +++
5 files changed, 73 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
index 896a0d0..8f1cbc7 100644
---
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
+++
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
@@ -43,7 +43,7 @@ namespace
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
/// <summary>
/// Runs the actual broadcast and reduce job
/// </summary>
- public void Run(int numberofMappers, int chunkSize, int numIterations,
int dim)
+ public void Run(int numberofMappers, int chunkSize, int numIterations,
int dim, int mapperMemory, int updateTaskMemory)
{
var updateFunctionConfig =
TangFactory.GetTang().NewConfigurationBuilder(IMRUUpdateConfiguration<int[],
int[], int[]>.ConfigurationModule
@@ -99,6 +99,8 @@ namespace
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
numberofMappers.ToString()).Build())
.SetJobName("BroadcastReduce")
.SetNumberOfMappers(numberofMappers)
+ .SetMapperMemory(mapperMemory)
+ .SetUpdateTaskMemory(updateTaskMemory)
.Build());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
index c0a6a3b..b1554d5 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
@@ -60,6 +60,8 @@ namespace Org.Apache.REEF.IMRU.Examples
int chunkSize = 2;
int dims = 10;
int iterations = 10;
+ int mapperMemory = 512;
+ int updateTaskMemory = 512;
if (args.Length > 0)
{
@@ -73,7 +75,17 @@ namespace Org.Apache.REEF.IMRU.Examples
if (args.Length > 2)
{
- iterations = Convert.ToInt32(args[2]);
+ mapperMemory = Convert.ToInt32(args[2]);
+ }
+
+ if (args.Length > 3)
+ {
+ updateTaskMemory = Convert.ToInt32(args[3]);
+ }
+
+ if (args.Length > 4)
+ {
+ iterations = Convert.ToInt32(args[4]);
}
IInjector injector;
@@ -87,10 +99,10 @@ namespace Org.Apache.REEF.IMRU.Examples
else
{
injector = TangFactory.GetTang()
- .NewInjector(OnREEFIMRURunTimeConfiguration<int, int,
int>.GetYarnIMRUConfiguration(), tcpPortConfig);
+ .NewInjector(OnREEFIMRURunTimeConfiguration<int[], int[],
int[]>.GetYarnIMRUConfiguration(), tcpPortConfig);
}
var broadcastReduceExample =
injector.GetInstance<PipelinedBroadcastAndReduce>();
- broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations,
dims);
+ broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations,
dims, mapperMemory, updateTaskMemory);
}
private static void Main(string[] args)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
index 6a955c2..ed8c211 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
@@ -37,6 +37,8 @@ namespace Org.Apache.REEF.IMRU.API
private readonly IConfiguration
_mapInputPipelineDataConverterConfiguration;
private readonly IConfiguration _partitionedDatasetConfiguration;
private readonly int _numberOfMappers;
+ private readonly int _memoryPerMapper;
+ private readonly int _updateTaskMemory;
/// <summary>
/// Constructor
@@ -54,6 +56,7 @@ namespace Org.Apache.REEF.IMRU.API
/// <param name="partitionedDatasetConfiguration">Configuration of
partitioned
/// dataset</param>
/// <param name="numberOfMappers">Number of mappers</param>
+ /// <param name="memoryPerMapper">Per Mapper memory.</param>
/// <param name="jobName">Job name</param>
internal IMRUJobDefinition(
IConfiguration mapFunctionConfiguration,
@@ -65,6 +68,8 @@ namespace Org.Apache.REEF.IMRU.API
IConfiguration mapInputPipelineDataConverterConfiguration,
IConfiguration partitionedDatasetConfiguration,
int numberOfMappers,
+ int memoryPerMapper,
+ int updateTaskMemory,
string jobName)
{
_mapFunctionConfiguration = mapFunctionConfiguration;
@@ -77,6 +82,8 @@ namespace Org.Apache.REEF.IMRU.API
_partitionedDatasetConfiguration = partitionedDatasetConfiguration;
_numberOfMappers = numberOfMappers;
_jobName = jobName;
+ _memoryPerMapper = memoryPerMapper;
+ _updateTaskMemory = updateTaskMemory;
}
/// <summary>
@@ -161,5 +168,21 @@ namespace Org.Apache.REEF.IMRU.API
internal int NumberOfMappers {
get { return _numberOfMappers; }
}
+
+ /// <summary>
+ /// Memory for each mapper in MB
+ /// </summary>
+ internal int MapperMemory
+ {
+ get { return _memoryPerMapper; }
+ }
+
+ /// <summary>
+ /// Memory for update task in MB
+ /// </summary>
+ internal int UpdateTaskMemory
+ {
+ get { return _updateTaskMemory; }
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
index 7f54459..730fb54 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
@@ -36,6 +36,8 @@ namespace Org.Apache.REEF.IMRU.API
private string _jobName;
private int _numberOfMappers;
+ private int _memoryPerMapper;
+ private int _updateTaskMemory;
private IConfiguration _mapFunctionConfiguration;
private IConfiguration _mapInputCodecConfiguration;
private IConfiguration _updateFunctionCodecsConfiguration;
@@ -56,6 +58,8 @@ namespace Org.Apache.REEF.IMRU.API
_mapInputPipelineDataConverterConfiguration = EmptyConfiguration;
_mapOutputPipelineDataConverterConfiguration = EmptyConfiguration;
_partitionedDatasetConfiguration = EmptyConfiguration;
+ _memoryPerMapper = 512;
+ _updateTaskMemory = 512;
}
/// <summary>
@@ -177,6 +181,28 @@ namespace Org.Apache.REEF.IMRU.API
}
/// <summary>
+ /// Sets mapper memory
+ /// </summary>
+ /// <param name="memory">memory in MB</param>
+ /// <returns></returns>
+ public IMRUJobDefinitionBuilder SetMapperMemory(int memory)
+ {
+ _memoryPerMapper = memory;
+ return this;
+ }
+
+ /// <summary>
+ /// Set update task memory
+ /// </summary>
+ /// <param name="memory">memory in MB</param>
+ /// <returns></returns>
+ public IMRUJobDefinitionBuilder SetUpdateTaskMemory(int memory)
+ {
+ _updateTaskMemory = memory;
+ return this;
+ }
+
+ /// <summary>
/// Instantiate the IMRUJobDefinition.
/// </summary>
/// <returns>The IMRUJobDefintion configured.</returns>
@@ -225,6 +251,8 @@ namespace Org.Apache.REEF.IMRU.API
_mapInputPipelineDataConverterConfiguration,
_partitionedDatasetConfiguration,
_numberOfMappers,
+ _memoryPerMapper,
+ _updateTaskMemory,
_jobName);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index 99f5313..9466086 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -102,6 +102,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
_configurationSerializer.ToString(jobDefinition.MapOutputPipelineDataConverterConfiguration))
.BindNamedParameter(typeof (SerializedReduceConfiguration),
_configurationSerializer.ToString(jobDefinition.ReduceFunctionConfiguration))
+ .BindNamedParameter(typeof (MemoryPerMapper),
+
jobDefinition.MapperMemory.ToString(CultureInfo.InvariantCulture))
+ .BindNamedParameter(typeof (MemoryForUpdateTask),
+
jobDefinition.UpdateTaskMemory.ToString(CultureInfo.InvariantCulture))
.Build();
// The JobSubmission contains the Driver configuration as well as
the files needed on the Driver.