IGNITE-2343: Hadoop: InputSplit is now singleton, it resolved the problem. This closes #399.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f5b2021 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f5b2021 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f5b2021 Branch: refs/heads/ignite-2324 Commit: 1f5b2021c5218a3b448a9445445d81c2363911a4 Parents: 5f4a113 Author: iveselovskiy <[email protected]> Authored: Mon Jan 25 12:15:56 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Mon Jan 25 12:15:56 2016 +0300 ---------------------------------------------------------------------- .../processors/hadoop/v2/HadoopV2Context.java | 2 ++ .../processors/hadoop/v2/HadoopV2MapTask.java | 23 +++++--------------- 2 files changed, 7 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1f5b2021/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java index 5108e2d..2ff2945 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java @@ -101,11 +101,13 @@ public class HadoopV2Context extends JobContextImpl implements MapContext, Reduc inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null); } else + { try { inputSplit = (InputSplit) ((HadoopV2TaskContext)ctx).getNativeSplit(split); } catch (IgniteCheckedException e) { throw new IllegalStateException(e); } + } } return inputSplit; http://git-wip-us.apache.org/repos/asf/ignite/blob/1f5b2021/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java index 989260c..fafa79b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java @@ -17,20 +17,16 @@ package org.apache.ignite.internal.processors.hadoop.v2; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobContextImpl; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; import org.apache.hadoop.util.ReflectionUtils; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; @@ -48,26 +44,17 @@ public class HadoopV2MapTask extends HadoopV2Task { /** {@inheritDoc} */ @SuppressWarnings({"ConstantConditions", "unchecked"}) @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException { - HadoopInputSplit split = info().inputSplit(); - - InputSplit nativeSplit; - - if (split instanceof HadoopFileBlock) { - HadoopFileBlock block = (HadoopFileBlock)split; - - nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), null); - } - else - nativeSplit = (InputSplit)taskCtx.getNativeSplit(split); - - assert nativeSplit != null; - OutputFormat outputFormat = null; Exception err = null; JobContextImpl jobCtx = taskCtx.jobContext(); try { + InputSplit nativeSplit = hadoopContext().getInputSplit(); + + if (nativeSplit == null) + throw new IgniteCheckedException("Input split cannot be null."); + InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(), hadoopContext().getConfiguration());
