Author: acmurthy Date: Fri Dec 7 02:36:33 2012 New Revision: 1418173 URL: http://svn.apache.org/viewvc?rev=1418173&view=rev Log: MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins. Contributed by Avner BenHanoch.
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1418173&r1=1418172&r2=1418173&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Dec 7 02:36:33 2012 @@ -11,6 +11,9 @@ Trunk (Unreleased) MAPREDUCE-2669. Add new examples for Mean, Median, and Standard Deviation. (Plamen Jeliazkov via shv) + MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins. + (Avner BenHanoch via acmurthy) + IMPROVEMENTS MAPREDUCE-3787. 
[Gridmix] Optimize job monitoring and STRESS mode for Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1418173&r1=1418172&r2=1418173&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Fri Dec 7 02:36:33 2012 @@ -340,6 +340,7 @@ public class ReduceTask extends Task { // Initialize the codec codec = initCodec(); RawKeyValueIterator rIter = null; + ShuffleConsumerPlugin shuffleConsumerPlugin = null; boolean isLocal = false; // local if @@ -358,8 +359,14 @@ public class ReduceTask extends Task { (null != combinerClass) ? new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null; - Shuffle shuffle = - new Shuffle(getTaskID(), job, FileSystem.getLocal(job), umbilical, + Class<? 
extends ShuffleConsumerPlugin> clazz = + job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class); + + shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job); + LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin); + + ShuffleConsumerPlugin.Context shuffleContext = + new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, codec, combinerClass, combineCollector, spilledRecordsCounter, reduceCombineInputCounter, @@ -368,7 +375,8 @@ public class ReduceTask extends Task { mergedMapOutputsCounter, taskStatus, copyPhase, sortPhase, this, mapOutputFile); - rIter = shuffle.run(); + shuffleConsumerPlugin.init(shuffleContext); + rIter = shuffleConsumerPlugin.run(); } else { // local job runner doesn't have a copy phase copyPhase.complete(); @@ -399,6 +407,10 @@ public class ReduceTask extends Task { runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } + + if (shuffleConsumerPlugin != null) { + shuffleConsumerPlugin.close(); + } done(umbilical, reporter); } Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java?rev=1418173&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java Fri Dec 7 02:36:33 2012 @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import org.apache.hadoop.mapred.Task.CombineOutputCollector; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.util.Progress; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * ShuffleConsumerPlugin for serving Reducers. It may shuffle MOF files from + * either the built-in ShuffleHandler or from a 3rd party AuxiliaryService. 
+ * + */ +@InterfaceAudience.LimitedPrivate("mapreduce") +@InterfaceStability.Unstable +public interface ShuffleConsumerPlugin<K, V> { + + public void init(Context<K, V> context); + + public RawKeyValueIterator run() throws IOException, InterruptedException; + + public void close(); + + @InterfaceAudience.LimitedPrivate("mapreduce") + @InterfaceStability.Unstable + public static class Context<K,V> { + private final org.apache.hadoop.mapreduce.TaskAttemptID reduceId; + private final JobConf jobConf; + private final FileSystem localFS; + private final TaskUmbilicalProtocol umbilical; + private final LocalDirAllocator localDirAllocator; + private final Reporter reporter; + private final CompressionCodec codec; + private final Class<? extends Reducer> combinerClass; + private final CombineOutputCollector<K, V> combineCollector; + private final Counters.Counter spilledRecordsCounter; + private final Counters.Counter reduceCombineInputCounter; + private final Counters.Counter shuffledMapsCounter; + private final Counters.Counter reduceShuffleBytes; + private final Counters.Counter failedShuffleCounter; + private final Counters.Counter mergedMapOutputsCounter; + private final TaskStatus status; + private final Progress copyPhase; + private final Progress mergePhase; + private final Task reduceTask; + private final MapOutputFile mapOutputFile; + + public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId, + JobConf jobConf, FileSystem localFS, + TaskUmbilicalProtocol umbilical, + LocalDirAllocator localDirAllocator, + Reporter reporter, CompressionCodec codec, + Class<? 
extends Reducer> combinerClass, + CombineOutputCollector<K,V> combineCollector, + Counters.Counter spilledRecordsCounter, + Counters.Counter reduceCombineInputCounter, + Counters.Counter shuffledMapsCounter, + Counters.Counter reduceShuffleBytes, + Counters.Counter failedShuffleCounter, + Counters.Counter mergedMapOutputsCounter, + TaskStatus status, Progress copyPhase, Progress mergePhase, + Task reduceTask, MapOutputFile mapOutputFile) { + this.reduceId = reduceId; + this.jobConf = jobConf; + this.localFS = localFS; + this. umbilical = umbilical; + this.localDirAllocator = localDirAllocator; + this.reporter = reporter; + this.codec = codec; + this.combinerClass = combinerClass; + this.combineCollector = combineCollector; + this.spilledRecordsCounter = spilledRecordsCounter; + this.reduceCombineInputCounter = reduceCombineInputCounter; + this.shuffledMapsCounter = shuffledMapsCounter; + this.reduceShuffleBytes = reduceShuffleBytes; + this.failedShuffleCounter = failedShuffleCounter; + this.mergedMapOutputsCounter = mergedMapOutputsCounter; + this.status = status; + this.copyPhase = copyPhase; + this.mergePhase = mergePhase; + this.reduceTask = reduceTask; + this.mapOutputFile = mapOutputFile; + } + + public org.apache.hadoop.mapreduce.TaskAttemptID getReduceId() { + return reduceId; + } + public JobConf getJobConf() { + return jobConf; + } + public FileSystem getLocalFS() { + return localFS; + } + public TaskUmbilicalProtocol getUmbilical() { + return umbilical; + } + public LocalDirAllocator getLocalDirAllocator() { + return localDirAllocator; + } + public Reporter getReporter() { + return reporter; + } + public CompressionCodec getCodec() { + return codec; + } + public Class<? 
extends Reducer> getCombinerClass() { + return combinerClass; + } + public CombineOutputCollector<K, V> getCombineCollector() { + return combineCollector; + } + public Counters.Counter getSpilledRecordsCounter() { + return spilledRecordsCounter; + } + public Counters.Counter getReduceCombineInputCounter() { + return reduceCombineInputCounter; + } + public Counters.Counter getShuffledMapsCounter() { + return shuffledMapsCounter; + } + public Counters.Counter getReduceShuffleBytes() { + return reduceShuffleBytes; + } + public Counters.Counter getFailedShuffleCounter() { + return failedShuffleCounter; + } + public Counters.Counter getMergedMapOutputsCounter() { + return mergedMapOutputsCounter; + } + public TaskStatus getStatus() { + return status; + } + public Progress getCopyPhase() { + return copyPhase; + } + public Progress getMergePhase() { + return mergePhase; + } + public Task getReduceTask() { + return reduceTask; + } + public MapOutputFile getMapOutputFile() { + return mapOutputFile; + } + } // end of public static class Context<K,V> + +} Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1418173&r1=1418172&r2=1418173&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Fri Dec 7 02:36:33 2012 @@ -85,6 +85,9 @@ public interface MRConfig { public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false; + 
public static final String SHUFFLE_CONSUMER_PLUGIN = + "mapreduce.job.reduce.shuffle.consumer.plugin.class"; + /** * Configuration key to enable/disable IFile readahead. */ Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1418173&r1=1418172&r2=1418173&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Fri Dec 7 02:36:33 2012 @@ -34,73 +34,63 @@ import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.Task.CombineOutputCollector; import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; +import org.apache.hadoop.mapred.ShuffleConsumerPlugin; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.util.Progress; -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate("mapreduce") @InterfaceStability.Unstable @SuppressWarnings({"unchecked", "rawtypes"}) -public class Shuffle<K, V> implements ExceptionReporter { +public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter { private static final int PROGRESS_FREQUENCY = 2000; private static final int MAX_EVENTS_TO_FETCH = 10000; private static final int MIN_EVENTS_TO_FETCH = 100; private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000; - private final 
TaskAttemptID reduceId; - private final JobConf jobConf; - private final Reporter reporter; - private final ShuffleClientMetrics metrics; - private final TaskUmbilicalProtocol umbilical; + private ShuffleConsumerPlugin.Context context; + + private TaskAttemptID reduceId; + private JobConf jobConf; + private Reporter reporter; + private ShuffleClientMetrics metrics; + private TaskUmbilicalProtocol umbilical; - private final ShuffleScheduler<K,V> scheduler; - private final MergeManager<K, V> merger; + private ShuffleScheduler<K,V> scheduler; + private MergeManager<K, V> merger; private Throwable throwable = null; private String throwingThreadName = null; - private final Progress copyPhase; - private final TaskStatus taskStatus; - private final Task reduceTask; //Used for status updates - - public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS, - TaskUmbilicalProtocol umbilical, - LocalDirAllocator localDirAllocator, - Reporter reporter, - CompressionCodec codec, - Class<? 
extends Reducer> combinerClass, - CombineOutputCollector<K,V> combineCollector, - Counters.Counter spilledRecordsCounter, - Counters.Counter reduceCombineInputCounter, - Counters.Counter shuffledMapsCounter, - Counters.Counter reduceShuffleBytes, - Counters.Counter failedShuffleCounter, - Counters.Counter mergedMapOutputsCounter, - TaskStatus status, - Progress copyPhase, - Progress mergePhase, - Task reduceTask, - MapOutputFile mapOutputFile) { - this.reduceId = reduceId; - this.jobConf = jobConf; - this.umbilical = umbilical; - this.reporter = reporter; + private Progress copyPhase; + private TaskStatus taskStatus; + private Task reduceTask; //Used for status updates + + @Override + public void init(ShuffleConsumerPlugin.Context context) { + this.context = context; + + this.reduceId = context.getReduceId(); + this.jobConf = context.getJobConf(); + this.umbilical = context.getUmbilical(); + this.reporter = context.getReporter(); this.metrics = new ShuffleClientMetrics(reduceId, jobConf); - this.copyPhase = copyPhase; - this.taskStatus = status; - this.reduceTask = reduceTask; + this.copyPhase = context.getCopyPhase(); + this.taskStatus = context.getStatus(); + this.reduceTask = context.getReduceTask(); scheduler = - new ShuffleScheduler<K,V>(jobConf, status, this, copyPhase, - shuffledMapsCounter, - reduceShuffleBytes, failedShuffleCounter); - merger = new MergeManager<K, V>(reduceId, jobConf, localFS, - localDirAllocator, reporter, codec, - combinerClass, combineCollector, - spilledRecordsCounter, - reduceCombineInputCounter, - mergedMapOutputsCounter, - this, mergePhase, mapOutputFile); + new ShuffleScheduler<K,V>(jobConf, taskStatus, this, copyPhase, + context.getShuffledMapsCounter(), + context.getReduceShuffleBytes(), context.getFailedShuffleCounter()); + merger = new MergeManager<K, V>(reduceId, jobConf, context.getLocalFS(), + context.getLocalDirAllocator(), reporter, context.getCodec(), + context.getCombinerClass(), context.getCombineCollector(), + 
+ context.getSpilledRecordsCounter(), + context.getReduceCombineInputCounter(), + context.getMergedMapOutputsCounter(), + this, context.getMergePhase(), context.getMapOutputFile()); } + @Override public RawKeyValueIterator run() throws IOException, InterruptedException { // Scale the maximum events we fetch per RPC call to mitigate OOM issues // on the ApplicationMaster when a thundering herd of reducers fetch events @@ -171,6 +161,10 @@ public class Shuffle<K, V> implements Ex return kvIter; } + @Override + public void close(){ + } + public synchronized void reportException(Throwable t) { if (throwable == null) { throwable = t; Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1418173&r1=1418172&r2=1418173&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Dec 7 02:36:33 2012 @@ -748,6 +748,16 @@ </description> </property> +<property> + <name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name> + <value>org.apache.hadoop.mapreduce.task.reduce.Shuffle</value> + <description> + Name of the class whose instance will be used + to send shuffle requests by reduce tasks of this job. + The class must implement org.apache.hadoop.mapred.ShuffleConsumerPlugin. 
+ </description> +</property> + <!-- MR YARN Application properties --> <property> Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java?rev=1418173&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java Fri Dec 7 02:36:33 2012 @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.hadoop.mapreduce; + +import org.junit.Test; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.Task.CombineOutputCollector; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.util.Progress; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.mapreduce.task.reduce.Shuffle; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.MapOutputFile; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Task; +import org.apache.hadoop.mapred.ReduceTask; +import org.apache.hadoop.mapred.TaskStatus; +import org.apache.hadoop.mapred.TaskUmbilicalProtocol; +import org.apache.hadoop.mapred.ShuffleConsumerPlugin; +import org.apache.hadoop.mapred.RawKeyValueIterator; +import org.apache.hadoop.mapred.Reducer; + +/** + * A JUnit for testing availability and accessibility of shuffle related API. + * It is needed for maintaining compatibility with external sub-classes of + * ShuffleConsumerPlugin and AuxiliaryService(s) like ShuffleHandler. + * + * The importance of this test is for preserving API with 3rd party plugins. 
+ */ +public class TestShufflePlugin<K, V> { + + static class TestShuffleConsumerPlugin<K, V> implements ShuffleConsumerPlugin<K, V> { + + @Override + public void init(ShuffleConsumerPlugin.Context<K, V> context) { + // just verify that Context has kept its public interface + context.getReduceId(); + context.getJobConf(); + context.getLocalFS(); + context.getUmbilical(); + context.getLocalDirAllocator(); + context.getReporter(); + context.getCodec(); + context.getCombinerClass(); + context.getCombineCollector(); + context.getSpilledRecordsCounter(); + context.getReduceCombineInputCounter(); + context.getShuffledMapsCounter(); + context.getReduceShuffleBytes(); + context.getFailedShuffleCounter(); + context.getMergedMapOutputsCounter(); + context.getStatus(); + context.getCopyPhase(); + context.getMergePhase(); + context.getReduceTask(); + context.getMapOutputFile(); + } + + @Override + public void close(){ + } + + @Override + public RawKeyValueIterator run() throws java.io.IOException, java.lang.InterruptedException{ + return null; + } + } + + + + @Test + /** + * A testing method instructing core hadoop to load an external ShuffleConsumerPlugin + * as if it came from a 3rd party. + */ + public void testPluginAbility() { + + try{ + // create JobConf with mapreduce.job.shuffle.consumer.plugin=TestShuffleConsumerPlugin + JobConf jobConf = new JobConf(); + jobConf.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, + TestShufflePlugin.TestShuffleConsumerPlugin.class, + ShuffleConsumerPlugin.class); + + ShuffleConsumerPlugin shuffleConsumerPlugin = null; + Class<? 
extends ShuffleConsumerPlugin> clazz = + jobConf.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class); + assertNotNull("Unable to get " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, clazz); + + // load 3rd party plugin through core's factory method + shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, jobConf); + assertNotNull("Unable to load " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, shuffleConsumerPlugin); + } + catch (Exception e) { + assertTrue("Threw exception:" + e, false); + } + } + + @Test + /** + * A testing method verifying availability and accessibility of API that is needed + * for sub-classes of ShuffleConsumerPlugin + */ + public void testConsumerApi() { + + JobConf jobConf = new JobConf(); + ShuffleConsumerPlugin<K, V> shuffleConsumerPlugin = new TestShuffleConsumerPlugin<K, V>(); + + //mock creation + ReduceTask mockReduceTask = mock(ReduceTask.class); + TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class); + Reporter mockReporter = mock(Reporter.class); + FileSystem mockFileSystem = mock(FileSystem.class); + Class<? 
extends org.apache.hadoop.mapred.Reducer> combinerClass = jobConf.getCombinerClass(); + @SuppressWarnings("unchecked") // needed for mock with generic + CombineOutputCollector<K, V> mockCombineOutputCollector = + (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class); + org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID = + mock(org.apache.hadoop.mapreduce.TaskAttemptID.class); + LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class); + CompressionCodec mockCompressionCodec = mock(CompressionCodec.class); + Counter mockCounter = mock(Counter.class); + TaskStatus mockTaskStatus = mock(TaskStatus.class); + Progress mockProgress = mock(Progress.class); + MapOutputFile mockMapOutputFile = mock(MapOutputFile.class); + Task mockTask = mock(Task.class); + + try { + String [] dirs = jobConf.getLocalDirs(); + // verify that these APIs are available through super class handler + ShuffleConsumerPlugin.Context<K, V> context = + new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, jobConf, mockFileSystem, + mockUmbilical, mockLocalDirAllocator, + mockReporter, mockCompressionCodec, + combinerClass, mockCombineOutputCollector, + mockCounter, mockCounter, mockCounter, + mockCounter, mockCounter, mockCounter, + mockTaskStatus, mockProgress, mockProgress, + mockTask, mockMapOutputFile); + shuffleConsumerPlugin.init(context); + shuffleConsumerPlugin.run(); + shuffleConsumerPlugin.close(); + } + catch (Exception e) { + assertTrue("Threw exception:" + e, false); + } + + // verify that these APIs are available for 3rd party plugins + mockReduceTask.getTaskID(); + mockReduceTask.getJobID(); + mockReduceTask.getNumMaps(); + mockReduceTask.getPartition(); + mockReporter.progress(); + } + + @Test + /** + * A testing method verifying availability and accessibility of API needed for + * AuxiliaryService(s) which are "Shuffle-Providers" (ShuffleHandler and 3rd party plugins) + */ + public void testProviderApi() { + + ApplicationId mockApplicationId = 
mock(ApplicationId.class); + mockApplicationId.setClusterTimestamp(new Long(10)); + mockApplicationId.setId(mock(JobID.class).getId()); + LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class); + JobConf mockJobConf = mock(JobConf.class); + try { + mockLocalDirAllocator.getLocalPathToRead("", mockJobConf); + } + catch (Exception e) { + assertTrue("Threw exception:" + e, false); + } + } +}