IGNITE-4461: Hadoop: added automatic resolution of "raw" comparator for Text class.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f406887c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f406887c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f406887c Branch: refs/heads/ignite-3477 Commit: f406887c274550317e1b6fbbe1bb302f53a5eaad Parents: beb242b Author: devozerov <voze...@gridgain.com> Authored: Thu Jan 5 14:48:06 2017 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Thu Jan 5 14:48:35 2017 +0300 ---------------------------------------------------------------------- .../hadoop/impl/v2/HadoopV2TaskContext.java | 64 ++++++++++++++------ 1 file changed, 46 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f406887c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java index e9cae1c..d328550 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java @@ -41,6 +41,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.hadoop.io.PartiallyRawComparator; +import org.apache.ignite.hadoop.io.TextPartiallyRawComparator; import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit; @@ -76,6 +77,8 @@ import java.io.File; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; @@ -99,6 +102,9 @@ public class HadoopV2TaskContext extends HadoopTaskContext { private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = createHadoopLazyConcurrentMap(); + /** Default partial comparator mappings. */ + private static final Map<String, String> PARTIAL_COMPARATORS = new HashMap<>(); + /** * This method is called with reflection upon Job finish with class loader of each task. * This will clean up all the Fs created for specific task. @@ -111,24 +117,6 @@ public class HadoopV2TaskContext extends HadoopTaskContext { fsMap.close(); } - /** - * Check for combiner grouping support (available since Hadoop 2.3). - */ - static { - boolean ok; - - try { - JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator"); - - ok = true; - } - catch (NoSuchMethodException ignore) { - ok = false; - } - - COMBINE_KEY_GROUPING_SUPPORTED = ok; - } - /** Flag is set if new context-object code is used for running the mapper. */ private final boolean useNewMapper; @@ -153,6 +141,23 @@ public class HadoopV2TaskContext extends HadoopTaskContext { /** Counters for task. */ private final HadoopCounters cntrs = new HadoopCountersImpl(); + static { + boolean ok; + + try { + JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator"); + + ok = true; + } + catch (NoSuchMethodException ignore) { + ok = false; + } + + COMBINE_KEY_GROUPING_SUPPORTED = ok; + + PARTIAL_COMPARATORS.put(Text.class.getName(), TextPartiallyRawComparator.class.getName()); + } + /** * @param taskInfo Task info. * @param job Job. @@ -181,6 +186,8 @@ public class HadoopV2TaskContext extends HadoopTaskContext { // For map-reduce jobs prefer local writes. jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true); + initializePartiallyRawComparator(jobConf); + jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId())); useNewMapper = jobConf.getUseNewMapper(); @@ -447,6 +454,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public Comparator<Object> groupComparator() { Comparator<?> res; @@ -581,4 +589,24 @@ public class HadoopV2TaskContext extends HadoopTaskContext { throw new IgniteCheckedException(e); } } + + /** + * Try initializing partially raw comparator for job. + * + * @param conf Configuration. + */ + private void initializePartiallyRawComparator(JobConf conf) { + String clsName = conf.get(HadoopJobProperty.JOB_PARTIALLY_RAW_COMPARATOR.propertyName(), null); + + if (clsName == null) { + Class keyCls = conf.getMapOutputKeyClass(); + + if (keyCls != null) { + clsName = PARTIAL_COMPARATORS.get(keyCls.getName()); + + if (clsName != null) + conf.set(HadoopJobProperty.JOB_PARTIALLY_RAW_COMPARATOR.propertyName(), clsName); + } + } + } } \ No newline at end of file