Repository: incubator-reef Updated Branches: refs/heads/master 535ed1501 -> cf4a460fd
[REEF-788] Improve memory footprint of IMRU Map and Update host tasks This addresses the issue by * no longer retaining input and output objects between iterations. * optionally invoking GC.Collect between iterations. JIRA: [REEF-788](https://issues.apache.org/jira/browse/REEF-788) Pull Request: This closes #527 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/cf4a460f Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/cf4a460f Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/cf4a460f Branch: refs/heads/master Commit: cf4a460fd85f3d2dad019592a316575baf168f4b Parents: 535ed15 Author: Dhruv <[email protected]> Authored: Sun Sep 27 15:25:03 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed Sep 30 12:50:48 2015 -0700 ---------------------------------------------------------------------- .../API/IMRUJobDefinition.cs | 14 ++++++- .../API/IMRUJobDefinitionBuilder.cs | 17 +++++++- .../OnREEF/Client/REEFIMRUClient.cs | 2 + .../OnREEF/Driver/IMRUDriver.cs | 4 ++ .../OnREEF/IMRUTasks/MapTaskHost.cs | 41 ++++++++++++++++---- .../OnREEF/IMRUTasks/UpdateTaskHost.cs | 39 ++++++++++++++----- .../MapInputWithControlMessage.cs | 8 +++- .../OnREEF/Parameters/InvokeGC .cs | 26 +++++++++++++ .../Org.Apache.REEF.IMRU.csproj | 1 + 9 files changed, 132 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cf4a460f/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs index 1d42711..2f0596d 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs @@ -41,6 +41,7 @@ namespace 
Org.Apache.REEF.IMRU.API private readonly int _memoryPerMapper; private readonly int _updateTaskMemory; private readonly ISet<IConfiguration> _perMapConfigGeneratorConfig; + private readonly bool _invokeGC; /// <summary> /// Constructor @@ -61,6 +62,7 @@ namespace Org.Apache.REEF.IMRU.API /// <param name="numberOfMappers">Number of mappers</param> /// <param name="memoryPerMapper">Per Mapper memory.</param> /// <param name="jobName">Job name</param> + /// <param name="invokeGC">Whether to call garbage collector after each iteration</param> internal IMRUJobDefinition( IConfiguration mapFunctionConfiguration, IConfiguration mapInputCodecConfiguration, @@ -74,7 +76,8 @@ namespace Org.Apache.REEF.IMRU.API int numberOfMappers, int memoryPerMapper, int updateTaskMemory, - string jobName) + string jobName, + bool invokeGC) { _mapFunctionConfiguration = mapFunctionConfiguration; _mapInputCodecConfiguration = mapInputCodecConfiguration; @@ -89,6 +92,7 @@ namespace Org.Apache.REEF.IMRU.API _memoryPerMapper = memoryPerMapper; _updateTaskMemory = updateTaskMemory; _perMapConfigGeneratorConfig = perMapConfigGeneratorConfig; + _invokeGC = invokeGC; } /// <summary> @@ -197,5 +201,13 @@ namespace Org.Apache.REEF.IMRU.API { get { return _perMapConfigGeneratorConfig; } } + + /// <summary> + /// Whether to call Garbage Collector after each iteration + /// </summary> + internal bool InvokeGarbageCollectorAfterIteration + { + get { return _invokeGC; } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cf4a460f/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs index 985c372..e8dafc8 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs @@ 
-51,6 +51,7 @@ namespace Org.Apache.REEF.IMRU.API private IConfiguration _mapInputPipelineDataConverterConfiguration; private IConfiguration _partitionedDatasetConfiguration; private readonly ISet<IConfiguration> _perMapConfigGeneratorConfig; + private bool _invokeGC; private static readonly IConfiguration EmptyConfiguration = TangFactory.GetTang().NewConfigurationBuilder().Build(); @@ -65,6 +66,7 @@ namespace Org.Apache.REEF.IMRU.API _partitionedDatasetConfiguration = EmptyConfiguration; _memoryPerMapper = 512; _updateTaskMemory = 512; + _invokeGC = true; _perMapConfigGeneratorConfig = new HashSet<IConfiguration>(); } @@ -220,6 +222,18 @@ namespace Org.Apache.REEF.IMRU.API } /// <summary> + /// Whether to invoke Garbage Collector after each IMRU iteration + /// </summary> + /// <param name="invokeGC">variable telling whether to invoke or not</param> + /// <returns>The modified definition builder</returns> + public IMRUJobDefinitionBuilder InvokeGarbageCollectorAfterIteration(bool invokeGC) + { + _invokeGC = invokeGC; + return this; + } + + + /// <summary> /// Instantiate the IMRUJobDefinition. 
/// </summary> /// <returns>The IMRUJobDefintion configured.</returns> @@ -271,7 +285,8 @@ namespace Org.Apache.REEF.IMRU.API _numberOfMappers, _memoryPerMapper, _updateTaskMemory, - _jobName); + _jobName, + _invokeGC); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cf4a460f/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs index 131fe4b..a7f0aed 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs @@ -130,6 +130,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client jobDefinition.MapperMemory.ToString(CultureInfo.InvariantCulture)) .BindNamedParameter(typeof (MemoryForUpdateTask), jobDefinition.UpdateTaskMemory.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter(typeof (InvokeGC), + jobDefinition.InvokeGarbageCollectorAfterIteration.ToString(CultureInfo.InvariantCulture)) .Build(); // The JobSubmission contains the Driver configuration as well as the files needed on the Driver. 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cf4a460f/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs index 5d55878..719533b 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs @@ -76,6 +76,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver private readonly int _allowedFailedEvaluators; private int _currentFailedEvaluators = 0; private bool _reachedUpdateTaskActiveContext = false; + private readonly bool _invokeGC; private readonly ServiceAndContextConfigurationProvider<TMapInput, TMapOutput> _serviceAndContextConfigurationProvider; @@ -90,6 +91,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver [Parameter(typeof (MemoryPerMapper))] int memoryPerMapper, [Parameter(typeof (MemoryForUpdateTask))] int memoryForUpdateTask, [Parameter(typeof (AllowedFailedEvaluatorsFraction))] double failedEvaluatorsFraction, + [Parameter(typeof(InvokeGC))] bool invokeGC, IGroupCommDriver groupCommDriver) { _dataSet = dataSet; @@ -103,6 +105,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver _perMapperConfigs = perMapperConfigs; _completedTasks = new ConcurrentBag<ICompletedTask>(); _allowedFailedEvaluators = (int) (failedEvaluatorsFraction*dataSet.Count); + _invokeGC = invokeGC; AddGroupCommunicationOperators(); _groupCommTaskStarter = new TaskStarter(_groupCommDriver, _dataSet.Count + 1); @@ -202,6 +205,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver _configurationManager.MapFunctionConfiguration, mapSpecificConfig }) + .BindNamedParameter(typeof (InvokeGC), _invokeGC.ToString()) .Build(); _commGroup.AddTask(taskId); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cf4a460f/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs index b1af870..c64706a 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs @@ -15,10 +15,14 @@ // specific language governing permissions and limitations // under the License. +using System; +using System.Diagnostics; +using System.Runtime; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.IMRU.API; using Org.Apache.REEF.IMRU.OnREEF.Driver; using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage; +using Org.Apache.REEF.IMRU.OnREEF.Parameters; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Tang.Annotations; @@ -38,18 +42,25 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks private readonly IBroadcastReceiver<MapInputWithControlMessage<TMapInput>> _dataAndMessageReceiver; private readonly IReduceSender<TMapOutput> _dataReducer; private readonly IMapFunction<TMapInput, TMapOutput> _mapTask; + private readonly bool _invokeGC; /// <summary> /// </summary> /// <param name="mapTask">The MapTask hosted in this REEF Task.</param> /// <param name="groupCommunicationsClient">Used to setup the communications.</param> + /// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param> [Inject] - private MapTaskHost(IMapFunction<TMapInput, TMapOutput> mapTask, IGroupCommClient groupCommunicationsClient) + private MapTaskHost( + IMapFunction<TMapInput, TMapOutput> mapTask, + IGroupCommClient groupCommunicationsClient, + [Parameter(typeof (InvokeGC))] bool invokeGC) { _mapTask = mapTask; var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName); - _dataAndMessageReceiver = 
cg.GetBroadcastReceiver<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName); + _dataAndMessageReceiver = + cg.GetBroadcastReceiver<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName); _dataReducer = cg.GetReduceSender<TMapOutput>(IMRUConstants.ReduceOperatorName); + _invokeGC = invokeGC; } /// <summary> @@ -59,15 +70,29 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// <returns></returns> public byte[] Call(byte[] memento) { - MapInputWithControlMessage<TMapInput> mapInput = _dataAndMessageReceiver.Receive(); - - while (mapInput.ControlMessage == MapControlMessage.AnotherRound) + while (true) { - var result = _mapTask.Map(mapInput.Message); + if (_invokeGC) + { + Logger.Log(Level.Verbose,"Calling Garbage Collector"); + GC.Collect(); + GC.WaitForPendingFinalizers(); + } + + TMapOutput result; + + using ( + MapInputWithControlMessage<TMapInput> mapInput = _dataAndMessageReceiver.Receive()) + { + if (mapInput.ControlMessage == MapControlMessage.Stop) + { + break; + } + result = _mapTask.Map(mapInput.Message); + } + _dataReducer.Send(result); - mapInput = _dataAndMessageReceiver.Receive(); } - return null; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cf4a460f/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs index f858eaf..a0d5af3 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs @@ -15,10 +15,13 @@ // specific language governing permissions and limitations // under the License. 
+using System; +using System.Diagnostics; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.IMRU.API; using Org.Apache.REEF.IMRU.OnREEF.Driver; using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage; +using Org.Apache.REEF.IMRU.OnREEF.Parameters; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Tang.Annotations; @@ -39,20 +42,24 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks private readonly IReduceReceiver<TMapOutput> _dataReceiver; private readonly IBroadcastSender<MapInputWithControlMessage<TMapInput>> _dataAndControlMessageSender; private readonly IUpdateFunction<TMapInput, TMapOutput, TResult> _updateTask; + private readonly bool _invokeGC; /// <summary> /// </summary> /// <param name="updateTask">The UpdateTask hosted in this REEF Task.</param> /// <param name="groupCommunicationsClient">Used to setup the communications.</param> + /// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param> [Inject] private UpdateTaskHost( IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask, - IGroupCommClient groupCommunicationsClient) + IGroupCommClient groupCommunicationsClient, + [Parameter(typeof(InvokeGC))] bool invokeGC) { _updateTask = updateTask; var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName); _dataAndControlMessageSender = cg.GetBroadcastSender<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName); _dataReceiver = cg.GetReduceReceiver<TMapOutput>(IMRUConstants.ReduceOperatorName); + _invokeGC = invokeGC; } /// <summary> @@ -63,22 +70,36 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks public byte[] Call(byte[] memento) { var updateResult = _updateTask.Initialize(); - MapInputWithControlMessage<TMapInput> message = - new MapInputWithControlMessage<TMapInput>(MapControlMessage.AnotherRound); while (updateResult.HasMapInput) - { - message.Message = updateResult.MapInput; - 
_dataAndControlMessageSender.Send(message); - updateResult = _updateTask.Update(_dataReceiver.Reduce()); + { + using ( + var message = new MapInputWithControlMessage<TMapInput>(updateResult.MapInput, + MapControlMessage.AnotherRound)) + { + _dataAndControlMessageSender.Send(message); + } + + var input = _dataReceiver.Reduce(); + + if (_invokeGC) + { + Logger.Log(Level.Verbose, "Calling Garbage Collector"); + GC.Collect(); + GC.WaitForPendingFinalizers(); + } + + updateResult = _updateTask.Update(input); + if (updateResult.HasResult) { // TODO[REEF-576]: Emit output somewhere. } } - message.ControlMessage = MapControlMessage.Stop; - _dataAndControlMessageSender.Send(message); + MapInputWithControlMessage<TMapInput> stopMessage = + new MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop); + _dataAndControlMessageSender.Send(stopMessage); if (updateResult.HasResult) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cf4a460f/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessage.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessage.cs index 8ca5eee..dd43612 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessage.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/MapInputWithControlMessage/MapInputWithControlMessage.cs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. 
+using System; + namespace Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage { /// <summary> @@ -23,7 +25,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage /// message from UpdateTask /// </summary> /// <typeparam name="TMapInput"></typeparam> - internal class MapInputWithControlMessage<TMapInput> + internal class MapInputWithControlMessage<TMapInput> : IDisposable { /// <summary> /// Internal constructor @@ -54,5 +56,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage /// Control message from Update Task to Map task /// </summary> internal MapControlMessage ControlMessage { get; set; } + + public void Dispose() + { + } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cf4a460f/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/InvokeGC .cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/InvokeGC .cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/InvokeGC .cs new file mode 100644 index 0000000..7f2d84e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/InvokeGC .cs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IMRU.OnREEF.Parameters +{ + [NamedParameter("Whether to invoke GC after each map or update step", "callgc", "false")] + internal sealed class InvokeGC : Name<bool> + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cf4a460f/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj index 8459279..491ff25 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj @@ -75,6 +75,7 @@ under the License. <Compile Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" /> <Compile Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessageCodec.cs" /> <Compile Include="OnREEF\MapInputWithControlMessage\MapInputwithControlMessagePipelineDataConverter.cs" /> + <Compile Include="OnREEF\Parameters\InvokeGC .cs" /> <Compile Include="OnREEF\Parameters\AllowedFailedEvaluatorsFraction.cs" /> <Compile Include="OnREEF\Parameters\CoresForUpdateTask.cs" /> <Compile Include="OnREEF\Parameters\CoresPerMapper.cs" />
