Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 273d2f975 -> 6b03ec513


MAPREDUCE-6361. NPE in shuffle caused by a race between copySucceeded() in 
one thread and copyFailed() in another thread on the same host. Contributed 
by Junping Du.
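
For context, here is a minimal standalone sketch of the interleaving that
triggers the NPE. It is an illustration only, not Hadoop code: a plain
HashMap<String, Integer> stands in for the scheduler's hostFailures map of
IntWritable, and the three steps mirror hostFailed(), copySucceeded(), and
copyFailed() as exercised by the regression test in this commit.

    import java.util.HashMap;
    import java.util.Map;

    public class HostFailuresRaceSketch {
      public static void main(String[] args) {
        Map<String, Integer> hostFailures = new HashMap<String, Integer>();
        String hostname = "host1";

        // Thread A: a fetch from host1 fails; hostFailed() records it.
        hostFailures.put(hostname, 1);

        // Thread B: a fetch from host1 succeeds; copySucceeded() clears
        // the host's failure record.
        hostFailures.remove(hostname);

        // Thread A: copyFailed() then dereferences the entry without a
        // null check; get(hostname) is now null, hence the NPE.
        try {
          boolean hostFail = hostFailures.get(hostname).intValue() > 5;
          System.out.println("hostFail = " + hostFail);
        } catch (NullPointerException e) {
          System.out.println("NPE, as reported in MAPREDUCE-6361");
        }
      }
    }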


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6b03ec51
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6b03ec51
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6b03ec51

Branch: refs/heads/branch-2.7
Commit: 6b03ec51373b175fa44e5d5fb5fae3c64eb620d9
Parents: 273d2f9
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Wed May 13 00:28:17 2015 +0900
Committer: Junping Du <junping...@apache.org>
Committed: Tue May 19 12:01:14 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  4 ++
 .../task/reduce/ShuffleSchedulerImpl.java       | 14 +++-
 .../task/reduce/TestShuffleScheduler.java       | 70 ++++++++++++++++++++
 3 files changed, 85 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b03ec51/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index a8cbb8f..710018f 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -36,6 +36,10 @@ Release 2.7.1 - UNRELEASED
     that they don't fail on history-server backed by DFSes with not so strong
     guarantees. (Craig Welch via vinodkv)
 
+    MAPREDUCE-6361. NPE in shuffle caused by a race between copySucceeded()
+    in one thread and copyFailed() in another thread on the same host.
+    (Junping Du via ozawa)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b03ec51/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
index 37f4af3..cce36de 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
@@ -239,7 +239,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
   }
   
   private void updateStatus() {
-    updateStatus(null);        
+    updateStatus(null);
   }
 
   public synchronized void hostFailed(String hostname) {
@@ -263,9 +263,17 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
       failureCounts.put(mapId, new IntWritable(1));
     }
     String hostname = host.getHostName();
+    IntWritable hostFailedNum = hostFailures.get(hostname);
+    // MAPREDUCE-6361: the hostname entry may have been removed from
+    // hostFailures by copySucceeded() running in another thread.
+    // In that case, re-add the hostname to hostFailures to avoid an NPE.
+    if (hostFailedNum == null) {
+      hostFailures.put(hostname, new IntWritable(1));
+    }
     //report failure if already retried maxHostFailures times
-    boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures() ? true : false;
-    
+    boolean hostFail =
+        hostFailures.get(hostname).get() > getMaxHostFailures();
+
     if (failures >= abortFailureLimit) {
       try {
         throw new IOException(failures + " failures downloading " + mapId);
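
In words: the guard above re-creates the hostFailures entry before the
unconditional get() dereferences it. The scheduler's methods are
synchronized, so a concurrent map is not needed. The sketch below restates
the patched check in isolation; plain Integer replaces IntWritable and a
constant replaces getMaxHostFailures(), both substitutions for illustration
only, not the literal Hadoop source:

    import java.util.HashMap;
    import java.util.Map;

    class HostFailureGuardSketch {
      // Stands in for getMaxHostFailures().
      private static final int MAX_HOST_FAILURES = 5;
      private final Map<String, Integer> hostFailures =
          new HashMap<String, Integer>();

      synchronized boolean isHostFailed(String hostname) {
        // Re-create the entry if copySucceeded() removed it between
        // hostFailed() and this call, so get() below can never return null.
        if (hostFailures.get(hostname) == null) {
          hostFailures.put(hostname, 1);
        }
        return hostFailures.get(hostname) > MAX_HOST_FAILURES;
      }
    }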

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b03ec51/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
index 6ac2320..654b748 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
@@ -213,6 +213,76 @@ public class TestShuffleScheduler {
     Assert.assertEquals(copyMessage(10, 1, 2), progress.toString());
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test
+  public <K, V> void testSucceedAndFailedCopyMap() throws Exception {
+    JobConf job = new JobConf();
+    job.setNumMapTasks(2);
+    //mock creation
+    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+    Reporter mockReporter = mock(Reporter.class);
+    FileSystem mockFileSystem = mock(FileSystem.class);
+    Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = job.getCombinerClass();
+    @SuppressWarnings("unchecked")  // needed for mock with generic
+    CombineOutputCollector<K, V>  mockCombineOutputCollector =
+        (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+    org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+        mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+    CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+    Counter mockCounter = mock(Counter.class);
+    TaskStatus mockTaskStatus = mock(TaskStatus.class);
+    Progress mockProgress = mock(Progress.class);
+    MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+    Task mockTask = mock(Task.class);
+    @SuppressWarnings("unchecked")
+    MapOutput<K, V> output = mock(MapOutput.class);
+
+    ShuffleConsumerPlugin.Context<K, V> context =
+        new ShuffleConsumerPlugin.Context<K, V>(
+            mockTaskAttemptID, job, mockFileSystem,
+            mockUmbilical, mockLocalDirAllocator,
+            mockReporter, mockCompressionCodec,
+            combinerClass, mockCombineOutputCollector,
+            mockCounter, mockCounter, mockCounter,
+            mockCounter, mockCounter, mockCounter,
+            mockTaskStatus, mockProgress, mockProgress,
+            mockTask, mockMapOutputFile, null);
+    TaskStatus status = new TaskStatus() {
+      @Override
+      public boolean getIsMap() {
+        return false;
+      }
+      @Override
+      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+      }
+    };
+    Progress progress = new Progress();
+    ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job,
+        status, null, null, progress, context.getShuffledMapsCounter(),
+        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+
+    MapHost host1 = new MapHost("host1", null);
+    TaskAttemptID failedAttemptID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 0), 0);
+
+    TaskAttemptID succeedAttemptID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 1), 1);
+
+    // handle output fetch failure for failedAttemptID, part I
+    scheduler.hostFailed(host1.getHostName());
+
+    // handle output fetch success for succeedAttemptID
+    long bytes = (long)500 * 1024 * 1024;
+    scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output);
+
+    // handle output fetch failure for failedAttemptID, part II
+    // MAPREDUCE-6361: verify that no NPE is thrown
+    scheduler.copyFailed(failedAttemptID, host1, true, false);
+  }
+
  private static String copyMessage(int attemptNo, double rate1, double rate2) {
    int attemptZero = attemptNo - 1;
    return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"
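
A note on the regression test above: it serializes the racy interleaving in
a single thread (hostFailed(), then copySucceeded(), then copyFailed() on
the same host), so no explicit assertion is needed; JUnit fails the test if
copyFailed() throws the pre-patch NPE. To run just this test from a source
checkout (a standard Maven invocation, not part of the commit):

    mvn test -Dtest=TestShuffleScheduler \
        -pl hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core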
