IGNITE-2343: Hadoop: InputSplit is now singleton, it resolved the problem. This 
closes #399.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f5b2021
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f5b2021
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f5b2021

Branch: refs/heads/ignite-2324
Commit: 1f5b2021c5218a3b448a9445445d81c2363911a4
Parents: 5f4a113
Author: iveselovskiy <[email protected]>
Authored: Mon Jan 25 12:15:56 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Mon Jan 25 12:15:56 2016 +0300

----------------------------------------------------------------------
 .../processors/hadoop/v2/HadoopV2Context.java   |  2 ++
 .../processors/hadoop/v2/HadoopV2MapTask.java   | 23 +++++---------------
 2 files changed, 7 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1f5b2021/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
index 5108e2d..2ff2945 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
@@ -101,11 +101,13 @@ public class HadoopV2Context extends JobContextImpl 
implements MapContext, Reduc
                 inputSplit = new FileSplit(new Path(fileBlock.file()), 
fileBlock.start(), fileBlock.length(), null);
             }
             else
+            {
                 try {
                     inputSplit = (InputSplit) 
((HadoopV2TaskContext)ctx).getNativeSplit(split);
                 } catch (IgniteCheckedException e) {
                     throw new IllegalStateException(e);
                 }
+            }
         }
 
         return inputSplit;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f5b2021/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
index 989260c..fafa79b 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
@@ -17,20 +17,16 @@
 
 package org.apache.ignite.internal.processors.hadoop.v2;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobContextImpl;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 
@@ -48,26 +44,17 @@ public class HadoopV2MapTask extends HadoopV2Task {
     /** {@inheritDoc} */
     @SuppressWarnings({"ConstantConditions", "unchecked"})
     @Override public void run0(HadoopV2TaskContext taskCtx) throws 
IgniteCheckedException {
-        HadoopInputSplit split = info().inputSplit();
-
-        InputSplit nativeSplit;
-
-        if (split instanceof HadoopFileBlock) {
-            HadoopFileBlock block = (HadoopFileBlock)split;
-
-            nativeSplit = new FileSplit(new Path(block.file().toString()), 
block.start(), block.length(), null);
-        }
-        else
-            nativeSplit = (InputSplit)taskCtx.getNativeSplit(split);
-
-        assert nativeSplit != null;
-
         OutputFormat outputFormat = null;
         Exception err = null;
 
         JobContextImpl jobCtx = taskCtx.jobContext();
 
         try {
+            InputSplit nativeSplit = hadoopContext().getInputSplit();
+
+            if (nativeSplit == null)
+                throw new IgniteCheckedException("Input split cannot be 
null.");
+
             InputFormat inFormat = 
ReflectionUtils.newInstance(jobCtx.getInputFormatClass(),
                 hadoopContext().getConfiguration());
 

Reply via email to