Author: acmurthy
Date: Fri Dec  7 02:36:33 2012
New Revision: 1418173

URL: http://svn.apache.org/viewvc?rev=1418173&view=rev
Log:
MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins. 
Contributed by Avner BenHanoch.

Added:
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1418173&r1=1418172&r2=1418173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Dec  7 
02:36:33 2012
@@ -11,6 +11,9 @@ Trunk (Unreleased)
     MAPREDUCE-2669. Add new examples for Mean, Median, and Standard Deviation.
     (Plamen Jeliazkov via shv)
 
+    MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins.
+    (Avner BenHanoch via acmurthy) 
+
   IMPROVEMENTS
 
     MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1418173&r1=1418172&r2=1418173&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
 Fri Dec  7 02:36:33 2012
@@ -340,6 +340,7 @@ public class ReduceTask extends Task {
     // Initialize the codec
     codec = initCodec();
     RawKeyValueIterator rIter = null;
+    ShuffleConsumerPlugin shuffleConsumerPlugin = null; 
     
     boolean isLocal = false; 
     // local if
@@ -358,8 +359,14 @@ public class ReduceTask extends Task {
         (null != combinerClass) ? 
             new CombineOutputCollector(reduceCombineOutputCounter, reporter, 
conf) : null;
 
-      Shuffle shuffle = 
-        new Shuffle(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
+      Class<? extends ShuffleConsumerPlugin> clazz =
+            job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, 
ShuffleConsumerPlugin.class);
+                                               
+      shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
+      LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
+
+      ShuffleConsumerPlugin.Context shuffleContext = 
+        new ShuffleConsumerPlugin.Context(getTaskID(), job, 
FileSystem.getLocal(job), umbilical, 
                     super.lDirAlloc, reporter, codec, 
                     combinerClass, combineCollector, 
                     spilledRecordsCounter, reduceCombineInputCounter,
@@ -368,7 +375,8 @@ public class ReduceTask extends Task {
                     mergedMapOutputsCounter,
                     taskStatus, copyPhase, sortPhase, this,
                     mapOutputFile);
-      rIter = shuffle.run();
+      shuffleConsumerPlugin.init(shuffleContext);
+      rIter = shuffleConsumerPlugin.run();
     } else {
       // local job runner doesn't have a copy phase
       copyPhase.complete();
@@ -399,6 +407,10 @@ public class ReduceTask extends Task {
       runOldReducer(job, umbilical, reporter, rIter, comparator, 
                     keyClass, valueClass);
     }
+
+    if (shuffleConsumerPlugin != null) {
+      shuffleConsumerPlugin.close();
+    }
     done(umbilical, reporter);
   }
 

Added: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java?rev=1418173&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
 (added)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
 Fri Dec  7 02:36:33 2012
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * ShuffleConsumerPlugin for serving Reducers.  It may shuffle MOF files from
+ * either the built-in ShuffleHandler or from a 3rd party AuxiliaryService.
+ *
+ */
+@InterfaceAudience.LimitedPrivate("mapreduce")
+@InterfaceStability.Unstable
+public interface ShuffleConsumerPlugin<K, V> {
+
+  public void init(Context<K, V> context);
+
+  public RawKeyValueIterator run() throws IOException, InterruptedException;
+
+  public void close();
+
+  @InterfaceAudience.LimitedPrivate("mapreduce")
+  @InterfaceStability.Unstable
+  public static class Context<K,V> {
+    private final org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
+    private final JobConf jobConf;
+    private final FileSystem localFS;
+    private final TaskUmbilicalProtocol umbilical;
+    private final LocalDirAllocator localDirAllocator;
+    private final Reporter reporter;
+    private final CompressionCodec codec;
+    private final Class<? extends Reducer> combinerClass;
+    private final CombineOutputCollector<K, V> combineCollector;
+    private final Counters.Counter spilledRecordsCounter;
+    private final Counters.Counter reduceCombineInputCounter;
+    private final Counters.Counter shuffledMapsCounter;
+    private final Counters.Counter reduceShuffleBytes;
+    private final Counters.Counter failedShuffleCounter;
+    private final Counters.Counter mergedMapOutputsCounter;
+    private final TaskStatus status;
+    private final Progress copyPhase;
+    private final Progress mergePhase;
+    private final Task reduceTask;
+    private final MapOutputFile mapOutputFile;
+
+    public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
+                   JobConf jobConf, FileSystem localFS,
+                   TaskUmbilicalProtocol umbilical,
+                   LocalDirAllocator localDirAllocator,
+                   Reporter reporter, CompressionCodec codec,
+                   Class<? extends Reducer> combinerClass,
+                   CombineOutputCollector<K,V> combineCollector,
+                   Counters.Counter spilledRecordsCounter,
+                   Counters.Counter reduceCombineInputCounter,
+                   Counters.Counter shuffledMapsCounter,
+                   Counters.Counter reduceShuffleBytes,
+                   Counters.Counter failedShuffleCounter,
+                   Counters.Counter mergedMapOutputsCounter,
+                   TaskStatus status, Progress copyPhase, Progress mergePhase,
+                   Task reduceTask, MapOutputFile mapOutputFile) {
+      this.reduceId = reduceId;
+      this.jobConf = jobConf;
+      this.localFS = localFS;
+      this. umbilical = umbilical;
+      this.localDirAllocator = localDirAllocator;
+      this.reporter = reporter;
+      this.codec = codec;
+      this.combinerClass = combinerClass;
+      this.combineCollector = combineCollector;
+      this.spilledRecordsCounter = spilledRecordsCounter;
+      this.reduceCombineInputCounter = reduceCombineInputCounter;
+      this.shuffledMapsCounter = shuffledMapsCounter;
+      this.reduceShuffleBytes = reduceShuffleBytes;
+      this.failedShuffleCounter = failedShuffleCounter;
+      this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+      this.status = status;
+      this.copyPhase = copyPhase;
+      this.mergePhase = mergePhase;
+      this.reduceTask = reduceTask;
+      this.mapOutputFile = mapOutputFile;
+    }
+
+    public org.apache.hadoop.mapreduce.TaskAttemptID getReduceId() {
+      return reduceId;
+    }
+    public JobConf getJobConf() {
+      return jobConf;
+    }
+    public FileSystem getLocalFS() {
+      return localFS;
+    }
+    public TaskUmbilicalProtocol getUmbilical() {
+      return umbilical;
+    }
+    public LocalDirAllocator getLocalDirAllocator() {
+      return localDirAllocator;
+    }
+    public Reporter getReporter() {
+      return reporter;
+    }
+    public CompressionCodec getCodec() {
+      return codec;
+    }
+    public Class<? extends Reducer> getCombinerClass() {
+      return combinerClass;
+    }
+    public CombineOutputCollector<K, V> getCombineCollector() {
+      return combineCollector;
+    }
+    public Counters.Counter getSpilledRecordsCounter() {
+      return spilledRecordsCounter;
+    }
+    public Counters.Counter getReduceCombineInputCounter() {
+      return reduceCombineInputCounter;
+    }
+    public Counters.Counter getShuffledMapsCounter() {
+      return shuffledMapsCounter;
+    }
+    public Counters.Counter getReduceShuffleBytes() {
+      return reduceShuffleBytes;
+    }
+    public Counters.Counter getFailedShuffleCounter() {
+      return failedShuffleCounter;
+    }
+    public Counters.Counter getMergedMapOutputsCounter() {
+      return mergedMapOutputsCounter;
+    }
+    public TaskStatus getStatus() {
+      return status;
+    }
+    public Progress getCopyPhase() {
+      return copyPhase;
+    }
+    public Progress getMergePhase() {
+      return mergePhase;
+    }
+    public Task getReduceTask() {
+      return reduceTask;
+    }
+    public MapOutputFile getMapOutputFile() {
+      return mapOutputFile;
+    }
+  } // end of public static class Context<K,V>
+
+}

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1418173&r1=1418172&r2=1418173&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
 Fri Dec  7 02:36:33 2012
@@ -85,6 +85,9 @@ public interface MRConfig {
 
   public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
 
+  public static final String SHUFFLE_CONSUMER_PLUGIN =
+    "mapreduce.job.reduce.shuffle.consumer.plugin.class";
+
   /**
    * Configuration key to enable/disable IFile readahead.
    */

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1418173&r1=1418172&r2=1418173&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
 Fri Dec  7 02:36:33 2012
@@ -34,73 +34,63 @@ import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.Task.CombineOutputCollector;
 import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progress;
 
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate("mapreduce")
 @InterfaceStability.Unstable
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class Shuffle<K, V> implements ExceptionReporter {
+public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, 
ExceptionReporter {
   private static final int PROGRESS_FREQUENCY = 2000;
   private static final int MAX_EVENTS_TO_FETCH = 10000;
   private static final int MIN_EVENTS_TO_FETCH = 100;
   private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
   
-  private final TaskAttemptID reduceId;
-  private final JobConf jobConf;
-  private final Reporter reporter;
-  private final ShuffleClientMetrics metrics;
-  private final TaskUmbilicalProtocol umbilical;
+  private ShuffleConsumerPlugin.Context context;
+
+  private TaskAttemptID reduceId;
+  private JobConf jobConf;
+  private Reporter reporter;
+  private ShuffleClientMetrics metrics;
+  private TaskUmbilicalProtocol umbilical;
   
-  private final ShuffleScheduler<K,V> scheduler;
-  private final MergeManager<K, V> merger;
+  private ShuffleScheduler<K,V> scheduler;
+  private MergeManager<K, V> merger;
   private Throwable throwable = null;
   private String throwingThreadName = null;
-  private final Progress copyPhase;
-  private final TaskStatus taskStatus;
-  private final Task reduceTask; //Used for status updates
-  
-  public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS,
-                 TaskUmbilicalProtocol umbilical,
-                 LocalDirAllocator localDirAllocator,  
-                 Reporter reporter,
-                 CompressionCodec codec,
-                 Class<? extends Reducer> combinerClass,
-                 CombineOutputCollector<K,V> combineCollector,
-                 Counters.Counter spilledRecordsCounter,
-                 Counters.Counter reduceCombineInputCounter,
-                 Counters.Counter shuffledMapsCounter,
-                 Counters.Counter reduceShuffleBytes,
-                 Counters.Counter failedShuffleCounter,
-                 Counters.Counter mergedMapOutputsCounter,
-                 TaskStatus status,
-                 Progress copyPhase,
-                 Progress mergePhase,
-                 Task reduceTask,
-                 MapOutputFile mapOutputFile) {
-    this.reduceId = reduceId;
-    this.jobConf = jobConf;
-    this.umbilical = umbilical;
-    this.reporter = reporter;
+  private Progress copyPhase;
+  private TaskStatus taskStatus;
+  private Task reduceTask; //Used for status updates
+
+  @Override
+  public void init(ShuffleConsumerPlugin.Context context) {
+    this.context = context;
+
+    this.reduceId = context.getReduceId();
+    this.jobConf = context.getJobConf();
+    this.umbilical = context.getUmbilical();
+    this.reporter = context.getReporter();
     this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
-    this.copyPhase = copyPhase;
-    this.taskStatus = status;
-    this.reduceTask = reduceTask;
+    this.copyPhase = context.getCopyPhase();
+    this.taskStatus = context.getStatus();
+    this.reduceTask = context.getReduceTask();
     
     scheduler = 
-      new ShuffleScheduler<K,V>(jobConf, status, this, copyPhase, 
-                                shuffledMapsCounter, 
-                                reduceShuffleBytes, failedShuffleCounter);
-    merger = new MergeManager<K, V>(reduceId, jobConf, localFS, 
-                                    localDirAllocator, reporter, codec, 
-                                    combinerClass, combineCollector, 
-                                    spilledRecordsCounter, 
-                                    reduceCombineInputCounter, 
-                                    mergedMapOutputsCounter, 
-                                    this, mergePhase, mapOutputFile);
+      new ShuffleScheduler<K,V>(jobConf, taskStatus, this, copyPhase, 
+                                context.getShuffledMapsCounter(), 
+                                context.getReduceShuffleBytes(), 
context.getFailedShuffleCounter());
+    merger = new MergeManager<K, V>(reduceId, jobConf, context.getLocalFS(),
+                                    context.getLocalDirAllocator(), reporter, 
context.getCodec(),
+                                    context.getCombinerClass(), 
context.getCombineCollector(), 
+                                    context.getSpilledRecordsCounter(), 
+                                    context.getReduceCombineInputCounter(), 
+                                    context.getMergedMapOutputsCounter(), 
+                                    this, context.getMergePhase(), 
context.getMapOutputFile());
   }
 
+  @Override
   public RawKeyValueIterator run() throws IOException, InterruptedException {
     // Scale the maximum events we fetch per RPC call to mitigate OOM issues
     // on the ApplicationMaster when a thundering herd of reducers fetch events
@@ -171,6 +161,10 @@ public class Shuffle<K, V> implements Ex
     return kvIter;
   }
 
+  @Override
+  public void close(){
+  }
+
   public synchronized void reportException(Throwable t) {
     if (throwable == null) {
       throwable = t;

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1418173&r1=1418172&r2=1418173&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
 Fri Dec  7 02:36:33 2012
@@ -748,6 +748,16 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
+  <value>org.apache.hadoop.mapreduce.task.reduce.Shuffle</value>
+  <description> 
+  Name of the class whose instance will be used 
+  to send shuffle requests by reducetasks of this job.
+  The class must implement 
org.apache.hadoop.mapred.ShuffleConsumerPlugin.
+  </description>
+</property>
+
 <!-- MR YARN Application properties -->
 
 <property>

Added: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java?rev=1418173&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
 (added)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
 Fri Dec  7 02:36:33 2012
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.task.reduce.Shuffle;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.ReduceTask;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+
+/**
+  * A JUnit for testing availability and accessibility of shuffle related API.
+  * It is needed for maintaining compatibility with external sub-classes of
+  * ShuffleConsumerPlugin and AuxiliaryService(s) like ShuffleHandler.
+  *
+  * The importance of this test is for preserving API with 3rd party plugins.
+  */
+public class TestShufflePlugin<K, V> {
+
+  static class TestShuffleConsumerPlugin<K, V> implements 
ShuffleConsumerPlugin<K, V> {
+
+    @Override
+    public void init(ShuffleConsumerPlugin.Context<K, V> context) {
+        // just verify that Context has kept its public interface
+      context.getReduceId();
+      context.getJobConf();
+      context.getLocalFS();
+      context.getUmbilical();
+      context.getLocalDirAllocator();
+      context.getReporter();
+      context.getCodec();
+      context.getCombinerClass();
+      context.getCombineCollector();
+      context.getSpilledRecordsCounter();
+      context.getReduceCombineInputCounter();
+      context.getShuffledMapsCounter();
+      context.getReduceShuffleBytes();
+      context.getFailedShuffleCounter();
+      context.getMergedMapOutputsCounter();
+      context.getStatus();
+      context.getCopyPhase();
+      context.getMergePhase();
+      context.getReduceTask();
+      context.getMapOutputFile();
+    }
+
+    @Override
+    public void close(){
+    }
+
+    @Override
+    public RawKeyValueIterator run() throws java.io.IOException, 
java.lang.InterruptedException{
+      return null;
+    }
+  }
+
+
+
+  @Test
+  /**
+   * A testing method instructing core hadoop to load an external 
ShuffleConsumerPlugin
+   * as if it came from a 3rd party.
+   */
+  public void testPluginAbility() {
+
+    try{
+      // create JobConf with 
mapreduce.job.reduce.shuffle.consumer.plugin.class=TestShuffleConsumerPlugin
+      JobConf jobConf = new JobConf();
+      jobConf.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN,
+                       TestShufflePlugin.TestShuffleConsumerPlugin.class,
+                       ShuffleConsumerPlugin.class);
+
+      ShuffleConsumerPlugin shuffleConsumerPlugin = null;
+      Class<? extends ShuffleConsumerPlugin> clazz =
+        jobConf.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, 
ShuffleConsumerPlugin.class);
+      assertNotNull("Unable to get " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, 
clazz);
+
+      // load 3rd party plugin through core's factory method
+      shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, jobConf);
+      assertNotNull("Unable to load " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, 
shuffleConsumerPlugin);
+    }
+    catch (Exception e) {
+      assertTrue("Threw exception:" + e, false);
+    }
+  }
+
+  @Test
+  /**
+   * A testing method verifying availability and accessibility of API that is 
needed
+   * for sub-classes of ShuffleConsumerPlugin
+   */
+  public void testConsumerApi() {
+
+    JobConf jobConf = new JobConf();
+    ShuffleConsumerPlugin<K, V> shuffleConsumerPlugin = new 
TestShuffleConsumerPlugin<K, V>();
+
+    //mock creation
+    ReduceTask mockReduceTask = mock(ReduceTask.class);
+    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+    Reporter mockReporter = mock(Reporter.class);
+    FileSystem mockFileSystem = mock(FileSystem.class);
+    Class<? extends org.apache.hadoop.mapred.Reducer>  combinerClass = 
jobConf.getCombinerClass();
+    @SuppressWarnings("unchecked")  // needed for mock with generic
+    CombineOutputCollector<K, V>  mockCombineOutputCollector =
+      (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+    org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+      mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+    CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+    Counter mockCounter = mock(Counter.class);
+    TaskStatus mockTaskStatus = mock(TaskStatus.class);
+    Progress mockProgress = mock(Progress.class);
+    MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+    Task mockTask = mock(Task.class);
+
+    try {
+      String [] dirs = jobConf.getLocalDirs();
+      // verify that these APIs are available through super class handler
+      ShuffleConsumerPlugin.Context<K, V> context =
+           new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, jobConf, 
mockFileSystem,
+                                                mockUmbilical, 
mockLocalDirAllocator,
+                                                mockReporter, 
mockCompressionCodec,
+                                                combinerClass, 
mockCombineOutputCollector,
+                                                mockCounter, mockCounter, 
mockCounter,
+                                                mockCounter, mockCounter, 
mockCounter,
+                                                mockTaskStatus, mockProgress, 
mockProgress,
+                                                mockTask, mockMapOutputFile);
+      shuffleConsumerPlugin.init(context);
+      shuffleConsumerPlugin.run();
+      shuffleConsumerPlugin.close();
+    }
+    catch (Exception e) {
+      assertTrue("Threw exception:" + e, false);
+    }
+
+    // verify that these APIs are available for 3rd party plugins
+    mockReduceTask.getTaskID();
+    mockReduceTask.getJobID();
+    mockReduceTask.getNumMaps();
+    mockReduceTask.getPartition();
+    mockReporter.progress();
+  }
+
+  @Test
+  /**
+   * A testing method verifying availability and accessibility of API needed 
for
+   * AuxiliaryService(s) which are "Shuffle-Providers" (ShuffleHandler and 3rd 
party plugins)
+   */
+  public void testProviderApi() {
+
+    ApplicationId mockApplicationId = mock(ApplicationId.class);
+    mockApplicationId.setClusterTimestamp(new Long(10));
+    mockApplicationId.setId(mock(JobID.class).getId());
+    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+    JobConf mockJobConf = mock(JobConf.class);
+    try {
+      mockLocalDirAllocator.getLocalPathToRead("", mockJobConf);
+    }
+    catch (Exception e) {
+      assertTrue("Threw exception:" + e, false);
+    }
+  }
+}


Reply via email to