Repository: incubator-reef
Updated Branches:
  refs/heads/master 3cac2d4af -> 582284e6d


[REEF-730] Fix issues with InProcessIMRU

This addresses the issue by
  * using codecs to copy the map input and output, so that there are no issues
    when the Map and Update tasks use/reuse the same fields in MapInput and
    MapOutput, and so that the codec functions are also debugged for On-REEF
    runs.
  * using the Per-Map configuration so that each mapper also receives its
    mapper-specific configuration.
  * using the PartitionedDataset configuration so that each partition's
    configuration is merged with the corresponding mapper configuration.

JIRA:
  [REEF-730](https://issues.apache.org/jira/browse/REEF-730)

Pull Request:
  This closes #473


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/582284e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/582284e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/582284e6

Branch: refs/heads/master
Commit: 582284e6db28848e149aa738bb96d1a1e5eda1a7
Parents: 3cac2d4
Author: Dhruv <[email protected]>
Authored: Tue Sep 8 19:11:35 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Sep 14 10:36:07 2015 -0700

----------------------------------------------------------------------
 .../InProcess/IMRURunner.cs                     | 40 +++++++++++++--
 .../InProcess/InProcessIMRUClient.cs            | 53 +++++++++++++++++---
 .../InProcess/InputCodecWrapper.cs              | 35 +++++++++++++
 .../InProcess/OutputCodecWrapper.cs             | 35 +++++++++++++
 .../Org.Apache.REEF.IMRU.csproj                 |  2 +
 5 files changed, 156 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/582284e6/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs 
b/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs
index 2009d4c..a08e3d6 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs
@@ -19,10 +19,12 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
-using System.Linq;
+using System.IO;
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
 
 namespace Org.Apache.REEF.IMRU.InProcess
 {
@@ -37,15 +39,21 @@ namespace Org.Apache.REEF.IMRU.InProcess
         private readonly ISet<IMapFunction<TMapInput, TMapOutput>> 
_mapfunctions;
         private readonly IReduceFunction<TMapOutput> _reduceTask;
         private readonly IUpdateFunction<TMapInput, TMapOutput, TResult> 
_updateTask;
+        private readonly IStreamingCodec<TMapInput> _mapInputCodec;
+        private readonly IStreamingCodec<TMapOutput> _mapOutputCodec;
 
         [Inject]
         private IMRURunner(MapFunctions<TMapInput, TMapOutput> mapfunctions,
             IReduceFunction<TMapOutput> reduceTask,
-            IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask)
+            IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask,
+            InputCodecWrapper<TMapInput> mapInputCodec,
+            OutputCodecWrapper<TMapOutput> mapOutputCodec)
         {
             _mapfunctions = mapfunctions.Mappers;
             _reduceTask = reduceTask;
             _updateTask = updateTask;
+            _mapInputCodec = mapInputCodec.Codec;
+            _mapOutputCodec = mapOutputCodec.Codec;
         }
 
         internal IList<TResult> Run()
@@ -59,16 +67,42 @@ namespace Org.Apache.REEF.IMRU.InProcess
                 {
                     results.Add(updateResult.Result);
                 }
+
                 Debug.Assert(updateResult.HasMapInput);
                 var mapinput = updateResult.MapInput;
-                var mapOutputs = _mapfunctions.Select(x => x.Map(mapinput));
+                var mapOutputs = new List<TMapOutput>();
+
+                foreach (var mapfunc in _mapfunctions)
+                {
+                    //We create a copy by doing coding and decoding since the 
map task might 
+                    //reuse the fields in next iteration and meanwhile update 
task might update it.
+                    using (MemoryStream mapInputStream = new MemoryStream(), 
mapOutputStream = new MemoryStream())
+                    {
+                        var mapInputWriter = new 
StreamDataWriter(mapInputStream);
+                        _mapInputCodec.Write(mapinput, mapInputWriter);
+                        mapInputStream.Position = 0;
+                        var mapInputReader = new 
StreamDataReader(mapInputStream);
+                        var output = 
mapfunc.Map(_mapInputCodec.Read(mapInputReader));
+
+                        var mapOutputWriter = new 
StreamDataWriter(mapOutputStream);
+                        _mapOutputCodec.Write(output, mapOutputWriter);
+                        mapOutputStream.Position = 0;
+                        var mapOutputReader = new 
StreamDataReader(mapOutputStream);
+                        output = _mapOutputCodec.Read(mapOutputReader);
+
+                        mapOutputs.Add(output);
+                    }
+                }
+
                 var mapOutput = _reduceTask.Reduce(mapOutputs);
                 updateResult = _updateTask.Update(mapOutput);
             }
+
             if (updateResult.HasResult)
             {
                 results.Add(updateResult.Result);
             }
+
             return results;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/582284e6/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs 
b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
index bb22945..77f7d9b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
@@ -20,13 +20,18 @@
 using System;
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Linq;
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IMRU.InProcess.Parameters;
+using Org.Apache.REEF.IO.PartitionedData;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
 
 namespace Org.Apache.REEF.IMRU.InProcess
 {
@@ -41,6 +46,9 @@ namespace Org.Apache.REEF.IMRU.InProcess
     /// <typeparam name="TResult">The return type of the 
computation.</typeparam>
     public class InProcessIMRUClient<TMapInput, TMapOutput, TResult> : 
IIMRUClient<TMapInput, TMapOutput, TResult>
     {
+        private static readonly Logger Logger =
+            Logger.GetLogger(typeof (InProcessIMRUClient<TMapInput, 
TMapOutput, TResult>));
+
         private readonly int _numberOfMappers;
 
         /// <summary>
@@ -61,27 +69,60 @@ namespace Org.Apache.REEF.IMRU.InProcess
         /// <returns>The result of the job</returns>
         public IEnumerable<TResult> Submit(IMRUJobDefinition jobDefinition)
         {
+            IConfiguration overallPerMapConfig = null;
+            try
+            {
+                overallPerMapConfig = 
Configurations.Merge(jobDefinition.PerMapConfigGeneratorConfig.ToArray());
+            }
+            catch (Exception e)
+            {
+                Exceptions.Throw(e, "Issues in merging PerMapCOnfigGenerator 
configurations", Logger);
+            }
+
             var mergedConfig = Configurations.Merge(
                 jobDefinition.ReduceFunctionConfiguration,
-                jobDefinition.UpdateFunctionConfiguration);
+                jobDefinition.UpdateFunctionConfiguration,
+                jobDefinition.UpdateFunctionCodecsConfiguration,
+                overallPerMapConfig);
 
             var injector = TangFactory.GetTang().NewInjector(mergedConfig);
 
+            ISet<IPerMapperConfigGenerator> perMapConfigGenerators =
+                (ISet<IPerMapperConfigGenerator>) 
injector.GetNamedInstance(typeof (PerMapConfigGeneratorSet));
+
             injector.BindVolatileInstance(GenericType<MapFunctions<TMapInput, 
TMapOutput>>.Class,
-                MakeMapFunctions(jobDefinition.MapFunctionConfiguration));
+                MakeMapFunctions(jobDefinition.MapFunctionConfiguration, 
jobDefinition.PartitionedDatasetConfiguration, perMapConfigGenerators));
 
             var runner = injector.GetInstance<IMRURunner<TMapInput, 
TMapOutput, TResult>>();
             return runner.Run();
         }
 
-        private MapFunctions<TMapInput, TMapOutput> 
MakeMapFunctions(IConfiguration configuration)
+        /// <summary>
+        /// We also need IPartition at each map function
+        /// </summary>
+        /// <param name="mapConfiguration">Map configuration given by 
user</param>
+        /// <param name="partitionedDataSetConfig">Partitioned dataset 
configuration</param>
+        /// <param name="perMapConfigGenerators">Per map configuration 
generators</param>
+        /// <returns></returns>
+        private MapFunctions<TMapInput, TMapOutput> 
MakeMapFunctions(IConfiguration mapConfiguration, IConfiguration 
partitionedDataSetConfig, ISet<IPerMapperConfigGenerator> 
perMapConfigGenerators)
         {
-            var injector = TangFactory.GetTang().NewInjector(configuration);
+            IPartitionedDataSet dataset =
+                
TangFactory.GetTang().NewInjector(partitionedDataSetConfig).GetInstance<IPartitionedDataSet>();
 
             ISet<IMapFunction<TMapInput, TMapOutput>> mappers = new 
HashSet<IMapFunction<TMapInput, TMapOutput>>();
-            for (var i = 0; i < _numberOfMappers; ++i)
+
+            int counter = 0;
+            foreach(var descriptor in dataset )
             {
-                
mappers.Add(injector.ForkInjector().GetInstance<IMapFunction<TMapInput, 
TMapOutput>>());
+                var emptyConfig = 
TangFactory.GetTang().NewConfigurationBuilder().Build();
+                IConfiguration perMapConfig = 
perMapConfigGenerators.Aggregate(emptyConfig,
+                    (current, configGenerator) =>
+                        Configurations.Merge(current, 
configGenerator.GetMapperConfiguration(counter, dataset.Count)));
+
+                var injector = TangFactory.GetTang()
+                    .NewInjector(mapConfiguration, 
descriptor.GetPartitionConfiguration(), perMapConfig);
+                mappers.Add(injector.GetInstance<IMapFunction<TMapInput, 
TMapOutput>>());
+                counter++;
             }
             return new MapFunctions<TMapInput, TMapOutput>(mappers);
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/582284e6/lang/cs/Org.Apache.REEF.IMRU/InProcess/InputCodecWrapper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InputCodecWrapper.cs 
b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InputCodecWrapper.cs
new file mode 100644
index 0000000..31d28c4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InputCodecWrapper.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.IMRU.InProcess
+{
+    internal class InputCodecWrapper<T>
+    {
+        [Inject]
+        private InputCodecWrapper(IStreamingCodec<T> codec)
+        {
+            Codec = codec;
+        }
+
+        internal IStreamingCodec<T> Codec { get; private set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/582284e6/lang/cs/Org.Apache.REEF.IMRU/InProcess/OutputCodecWrapper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/OutputCodecWrapper.cs 
b/lang/cs/Org.Apache.REEF.IMRU/InProcess/OutputCodecWrapper.cs
new file mode 100644
index 0000000..fd6cc47
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/OutputCodecWrapper.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.IMRU.InProcess
+{
+    internal class OutputCodecWrapper<T>
+    {
+        [Inject]
+        private OutputCodecWrapper(IStreamingCodec<T> codec)
+        {
+            Codec = codec;
+        }
+
+        internal IStreamingCodec<T> Codec { get; private set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/582284e6/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj 
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index e3fecf9..332137b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -58,6 +58,8 @@ under the License.
     <Compile Include="InProcess\IMRURunner.cs" />
     <Compile Include="InProcess\InProcessIMRUClient.cs" />
     <Compile Include="InProcess\InProcessIMRUConfiguration.cs" />
+    <Compile Include="InProcess\OutputCodecWrapper.cs" />
+    <Compile Include="InProcess\InputCodecWrapper.cs" />
     <Compile Include="InProcess\MapFunctions.cs" />
     <Compile Include="InProcess\Parameters\NumberOfMappers.cs" />
     <Compile Include="OnREEF\Client\REEFIMRUClientConfiguration.cs" />

Reply via email to