Repository: incubator-reef
Updated Branches:
refs/heads/master 3cac2d4af -> 582284e6d
[REEF-730] Fix issues with InProcessIMRU
This addressed the issue by
* using codecs, so that there are no issues when the Map and Update tasks try
to use/reuse the same fields in MapInput and MapOutput, and so that the codec
functions are also debugged for On-REEF runs.
* using Per-Map configuration to also give each mapper its mapper-specific configuration.
* using the PartitionedDataset configuration, so that the partition
configuration is merged with each mapper's configuration.
JIRA:
[REEF-730](https://issues.apache.org/jira/browse/REEF-730)
Pull Request:
This closes #473
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/582284e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/582284e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/582284e6
Branch: refs/heads/master
Commit: 582284e6db28848e149aa738bb96d1a1e5eda1a7
Parents: 3cac2d4
Author: Dhruv <[email protected]>
Authored: Tue Sep 8 19:11:35 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Sep 14 10:36:07 2015 -0700
----------------------------------------------------------------------
.../InProcess/IMRURunner.cs | 40 +++++++++++++--
.../InProcess/InProcessIMRUClient.cs | 53 +++++++++++++++++---
.../InProcess/InputCodecWrapper.cs | 35 +++++++++++++
.../InProcess/OutputCodecWrapper.cs | 35 +++++++++++++
.../Org.Apache.REEF.IMRU.csproj | 2 +
5 files changed, 156 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/582284e6/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs
b/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs
index 2009d4c..a08e3d6 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs
@@ -19,10 +19,12 @@
using System.Collections.Generic;
using System.Diagnostics;
-using System.Linq;
+using System.IO;
using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.Network.Group.Operators;
using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.IMRU.InProcess
{
@@ -37,15 +39,21 @@ namespace Org.Apache.REEF.IMRU.InProcess
private readonly ISet<IMapFunction<TMapInput, TMapOutput>>
_mapfunctions;
private readonly IReduceFunction<TMapOutput> _reduceTask;
private readonly IUpdateFunction<TMapInput, TMapOutput, TResult>
_updateTask;
+ private readonly IStreamingCodec<TMapInput> _mapInputCodec;
+ private readonly IStreamingCodec<TMapOutput> _mapOutputCodec;
[Inject]
private IMRURunner(MapFunctions<TMapInput, TMapOutput> mapfunctions,
IReduceFunction<TMapOutput> reduceTask,
- IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask)
+ IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask,
+ InputCodecWrapper<TMapInput> mapInputCodec,
+ OutputCodecWrapper<TMapOutput> mapOutputCodec)
{
_mapfunctions = mapfunctions.Mappers;
_reduceTask = reduceTask;
_updateTask = updateTask;
+ _mapInputCodec = mapInputCodec.Codec;
+ _mapOutputCodec = mapOutputCodec.Codec;
}
internal IList<TResult> Run()
@@ -59,16 +67,42 @@ namespace Org.Apache.REEF.IMRU.InProcess
{
results.Add(updateResult.Result);
}
+
Debug.Assert(updateResult.HasMapInput);
var mapinput = updateResult.MapInput;
- var mapOutputs = _mapfunctions.Select(x => x.Map(mapinput));
+ var mapOutputs = new List<TMapOutput>();
+
+ foreach (var mapfunc in _mapfunctions)
+ {
+ //We create a copy by doing coding and decoding since the
map task might
+ //reuse the fields in next iteration and meanwhile update
task might update it.
+ using (MemoryStream mapInputStream = new MemoryStream(),
mapOutputStream = new MemoryStream())
+ {
+ var mapInputWriter = new
StreamDataWriter(mapInputStream);
+ _mapInputCodec.Write(mapinput, mapInputWriter);
+ mapInputStream.Position = 0;
+ var mapInputReader = new
StreamDataReader(mapInputStream);
+ var output =
mapfunc.Map(_mapInputCodec.Read(mapInputReader));
+
+ var mapOutputWriter = new
StreamDataWriter(mapOutputStream);
+ _mapOutputCodec.Write(output, mapOutputWriter);
+ mapOutputStream.Position = 0;
+ var mapOutputReader = new
StreamDataReader(mapOutputStream);
+ output = _mapOutputCodec.Read(mapOutputReader);
+
+ mapOutputs.Add(output);
+ }
+ }
+
var mapOutput = _reduceTask.Reduce(mapOutputs);
updateResult = _updateTask.Update(mapOutput);
}
+
if (updateResult.HasResult)
{
results.Add(updateResult.Result);
}
+
return results;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/582284e6/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
index bb22945..77f7d9b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
@@ -20,13 +20,18 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.Linq;
using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.IMRU.InProcess.Parameters;
+using Org.Apache.REEF.IO.PartitionedData;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.IMRU.InProcess
{
@@ -41,6 +46,9 @@ namespace Org.Apache.REEF.IMRU.InProcess
/// <typeparam name="TResult">The return type of the
computation.</typeparam>
public class InProcessIMRUClient<TMapInput, TMapOutput, TResult> :
IIMRUClient<TMapInput, TMapOutput, TResult>
{
+ private static readonly Logger Logger =
+ Logger.GetLogger(typeof (InProcessIMRUClient<TMapInput,
TMapOutput, TResult>));
+
private readonly int _numberOfMappers;
/// <summary>
@@ -61,27 +69,60 @@ namespace Org.Apache.REEF.IMRU.InProcess
/// <returns>The result of the job</returns>
public IEnumerable<TResult> Submit(IMRUJobDefinition jobDefinition)
{
+ IConfiguration overallPerMapConfig = null;
+ try
+ {
+ overallPerMapConfig =
Configurations.Merge(jobDefinition.PerMapConfigGeneratorConfig.ToArray());
+ }
+ catch (Exception e)
+ {
+ Exceptions.Throw(e, "Issues in merging PerMapCOnfigGenerator
configurations", Logger);
+ }
+
var mergedConfig = Configurations.Merge(
jobDefinition.ReduceFunctionConfiguration,
- jobDefinition.UpdateFunctionConfiguration);
+ jobDefinition.UpdateFunctionConfiguration,
+ jobDefinition.UpdateFunctionCodecsConfiguration,
+ overallPerMapConfig);
var injector = TangFactory.GetTang().NewInjector(mergedConfig);
+ ISet<IPerMapperConfigGenerator> perMapConfigGenerators =
+ (ISet<IPerMapperConfigGenerator>)
injector.GetNamedInstance(typeof (PerMapConfigGeneratorSet));
+
injector.BindVolatileInstance(GenericType<MapFunctions<TMapInput,
TMapOutput>>.Class,
- MakeMapFunctions(jobDefinition.MapFunctionConfiguration));
+ MakeMapFunctions(jobDefinition.MapFunctionConfiguration,
jobDefinition.PartitionedDatasetConfiguration, perMapConfigGenerators));
var runner = injector.GetInstance<IMRURunner<TMapInput,
TMapOutput, TResult>>();
return runner.Run();
}
- private MapFunctions<TMapInput, TMapOutput>
MakeMapFunctions(IConfiguration configuration)
+ /// <summary>
+ /// We also need IPartition at each map function
+ /// </summary>
+ /// <param name="mapConfiguration">Map configuration given by
user</param>
+ /// <param name="partitionedDataSetConfig">Partitioned dataset
configuration</param>
+ /// <param name="perMapConfigGenerators">Per map configuration
generators</param>
+ /// <returns></returns>
+ private MapFunctions<TMapInput, TMapOutput>
MakeMapFunctions(IConfiguration mapConfiguration, IConfiguration
partitionedDataSetConfig, ISet<IPerMapperConfigGenerator>
perMapConfigGenerators)
{
- var injector = TangFactory.GetTang().NewInjector(configuration);
+ IPartitionedDataSet dataset =
+
TangFactory.GetTang().NewInjector(partitionedDataSetConfig).GetInstance<IPartitionedDataSet>();
ISet<IMapFunction<TMapInput, TMapOutput>> mappers = new
HashSet<IMapFunction<TMapInput, TMapOutput>>();
- for (var i = 0; i < _numberOfMappers; ++i)
+
+ int counter = 0;
+ foreach(var descriptor in dataset )
{
-
mappers.Add(injector.ForkInjector().GetInstance<IMapFunction<TMapInput,
TMapOutput>>());
+ var emptyConfig =
TangFactory.GetTang().NewConfigurationBuilder().Build();
+ IConfiguration perMapConfig =
perMapConfigGenerators.Aggregate(emptyConfig,
+ (current, configGenerator) =>
+ Configurations.Merge(current,
configGenerator.GetMapperConfiguration(counter, dataset.Count)));
+
+ var injector = TangFactory.GetTang()
+ .NewInjector(mapConfiguration,
descriptor.GetPartitionConfiguration(), perMapConfig);
+ mappers.Add(injector.GetInstance<IMapFunction<TMapInput,
TMapOutput>>());
+ counter++;
}
return new MapFunctions<TMapInput, TMapOutput>(mappers);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/582284e6/lang/cs/Org.Apache.REEF.IMRU/InProcess/InputCodecWrapper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InputCodecWrapper.cs
b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InputCodecWrapper.cs
new file mode 100644
index 0000000..31d28c4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InputCodecWrapper.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.IMRU.InProcess
+{
+ internal class InputCodecWrapper<T>
+ {
+ [Inject]
+ private InputCodecWrapper(IStreamingCodec<T> codec)
+ {
+ Codec = codec;
+ }
+
+ internal IStreamingCodec<T> Codec { get; private set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/582284e6/lang/cs/Org.Apache.REEF.IMRU/InProcess/OutputCodecWrapper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/OutputCodecWrapper.cs
b/lang/cs/Org.Apache.REEF.IMRU/InProcess/OutputCodecWrapper.cs
new file mode 100644
index 0000000..fd6cc47
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/OutputCodecWrapper.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.IMRU.InProcess
+{
+ internal class OutputCodecWrapper<T>
+ {
+ [Inject]
+ private OutputCodecWrapper(IStreamingCodec<T> codec)
+ {
+ Codec = codec;
+ }
+
+ internal IStreamingCodec<T> Codec { get; private set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/582284e6/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index e3fecf9..332137b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -58,6 +58,8 @@ under the License.
<Compile Include="InProcess\IMRURunner.cs" />
<Compile Include="InProcess\InProcessIMRUClient.cs" />
<Compile Include="InProcess\InProcessIMRUConfiguration.cs" />
+ <Compile Include="InProcess\OutputCodecWrapper.cs" />
+ <Compile Include="InProcess\InputCodecWrapper.cs" />
<Compile Include="InProcess\MapFunctions.cs" />
<Compile Include="InProcess\Parameters\NumberOfMappers.cs" />
<Compile Include="OnREEF\Client\REEFIMRUClientConfiguration.cs" />