Author: vinodkv
Date: Fri Oct 19 20:21:32 2012
New Revision: 1400267

URL: http://svn.apache.org/viewvc?rev=1400267&view=rev
Log:
MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many reducers complete consecutively. Contributed by Jason Lowe.
svn merge --ignore-ancestry -c 1400264 ../../trunk/

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1400267&r1=1400266&r2=1400267&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Oct 19 20:21:32 2012 @@ -37,6 +37,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4479. Fix parameter order in assertEquals() in TestCombineInputFileFormat.java (Mariappan Asokan via bobby) + MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many + reducers complete consecutively. (Jason Lowe via vinodkv) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1400267&r1=1400266&r2=1400267&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Fri Oct 19 20:21:32 2012 @@ -22,7 +22,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -38,7 +37,6 @@ import 
org.apache.hadoop.mapred.SortedRa import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; -import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; @@ -253,31 +251,23 @@ public class TaskAttemptListenerImpl ext @Override public MapTaskCompletionEventsUpdate getMapCompletionEvents( - JobID jobIdentifier, int fromEventId, int maxEvents, + JobID jobIdentifier, int startIndex, int maxEvents, TaskAttemptID taskAttemptID) throws IOException { LOG.info("MapCompletionEvents request from " + taskAttemptID.toString() - + ". fromEventID " + fromEventId + " maxEvents " + maxEvents); + + ". startIndex " + startIndex + " maxEvents " + maxEvents); // TODO: shouldReset is never used. See TT. Ask for Removal. boolean shouldReset = false; org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(taskAttemptID); org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[] events = - context.getJob(attemptID.getTaskId().getJobId()).getTaskAttemptCompletionEvents( - fromEventId, maxEvents); + context.getJob(attemptID.getTaskId().getJobId()).getMapAttemptCompletionEvents( + startIndex, maxEvents); taskHeartbeatHandler.progressing(attemptID); - - // filter the events to return only map completion events in old format - List<TaskCompletionEvent> mapEvents = new ArrayList<TaskCompletionEvent>(); - for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent event : events) { - if (TaskType.MAP.equals(event.getAttemptId().getTaskId().getTaskType())) { - mapEvents.add(TypeConverter.fromYarn(event)); - } - } return new MapTaskCompletionEventsUpdate( - mapEvents.toArray(new TaskCompletionEvent[0]), shouldReset); + 
TypeConverter.fromYarn(events), shouldReset); } @Override Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1400267&r1=1400266&r2=1400267&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Fri Oct 19 20:21:32 2012 @@ -88,6 +88,9 @@ public interface Job { TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(int fromEventId, int maxEvents); + TaskAttemptCompletionEvent[] + getMapAttemptCompletionEvents(int startIndex, int maxEvents); + /** * @return information for MR AppMasters (previously failed and current) */ Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1400267&r1=1400266&r2=1400267&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java 
(original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Oct 19 20:21:32 2012 @@ -178,6 +178,7 @@ public class JobImpl implements org.apac private int allowedMapFailuresPercent = 0; private int allowedReduceFailuresPercent = 0; private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents; + private List<TaskAttemptCompletionEvent> mapAttemptCompletionEvents; private final List<String> diagnostics = new ArrayList<String>(); //task/attempt related datastructures @@ -520,14 +521,28 @@ public class JobImpl implements org.apac @Override public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents( int fromEventId, int maxEvents) { + return getAttemptCompletionEvents(taskAttemptCompletionEvents, + fromEventId, maxEvents); + } + + @Override + public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents( + int startIndex, int maxEvents) { + return getAttemptCompletionEvents(mapAttemptCompletionEvents, + startIndex, maxEvents); + } + + private TaskAttemptCompletionEvent[] getAttemptCompletionEvents( + List<TaskAttemptCompletionEvent> eventList, + int startIndex, int maxEvents) { TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS; readLock.lock(); try { - if (taskAttemptCompletionEvents.size() > fromEventId) { + if (eventList.size() > startIndex) { int actualMax = Math.min(maxEvents, - (taskAttemptCompletionEvents.size() - fromEventId)); - events = taskAttemptCompletionEvents.subList(fromEventId, - actualMax + fromEventId).toArray(events); + (eventList.size() - startIndex)); + events = eventList.subList(startIndex, + actualMax + startIndex).toArray(events); } return events; } finally { @@ -1023,6 +1038,8 @@ public class JobImpl implements org.apac job.taskAttemptCompletionEvents = new ArrayList<TaskAttemptCompletionEvent>( job.numMapTasks + job.numReduceTasks + 10); + 
job.mapAttemptCompletionEvents = + new ArrayList<TaskAttemptCompletionEvent>(job.numMapTasks + 10); job.allowedMapFailuresPercent = job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0); @@ -1294,6 +1311,9 @@ public class JobImpl implements org.apac //eventId is equal to index in the arraylist tce.setEventId(job.taskAttemptCompletionEvents.size()); job.taskAttemptCompletionEvents.add(tce); + if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) { + job.mapAttemptCompletionEvents.add(tce); + } //make the previous completion event as obsolete if it exists Object successEventNo = Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1400267&r1=1400266&r2=1400267&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Oct 19 20:21:32 2012 @@ -17,20 +17,33 @@ */ package org.apache.hadoop.mapred; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; 
import java.io.IOException; +import java.util.Arrays; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.junit.Test; public class TestTaskAttemptListenerImpl { @@ -115,4 +128,67 @@ public class TestTaskAttemptListenerImpl listener.stop(); } + + @Test + public void testGetMapCompletionEvents() throws IOException { + TaskAttemptCompletionEvent[] empty = {}; + TaskAttemptCompletionEvent[] taskEvents = { + createTce(0, true, TaskAttemptCompletionEventStatus.OBSOLETE), + createTce(1, false, TaskAttemptCompletionEventStatus.FAILED), + createTce(2, true, TaskAttemptCompletionEventStatus.SUCCEEDED), + createTce(3, false, TaskAttemptCompletionEventStatus.FAILED) }; + TaskAttemptCompletionEvent[] mapEvents = { taskEvents[0], taskEvents[2] }; + Job mockJob = mock(Job.class); + when(mockJob.getTaskAttemptCompletionEvents(0, 100)) + .thenReturn(taskEvents); + when(mockJob.getTaskAttemptCompletionEvents(0, 2)) + .thenReturn(Arrays.copyOfRange(taskEvents, 0, 2)); + when(mockJob.getTaskAttemptCompletionEvents(2, 100)) + .thenReturn(Arrays.copyOfRange(taskEvents, 2, 4)); + when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(mapEvents); + 
when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(mapEvents); + when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(empty); + + AppContext appCtx = mock(AppContext.class); + when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); + JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); + TaskAttemptListenerImpl listener = + new TaskAttemptListenerImpl(appCtx, secret) { + @Override + protected void registerHeartbeatHandler(Configuration conf) { + taskHeartbeatHandler = hbHandler; + } + }; + Configuration conf = new Configuration(); + listener.init(conf); + listener.start(); + + JobID jid = new JobID("12345", 1); + TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0); + MapTaskCompletionEventsUpdate update = + listener.getMapCompletionEvents(jid, 0, 100, tid); + assertEquals(2, update.events.length); + update = listener.getMapCompletionEvents(jid, 0, 2, tid); + assertEquals(2, update.events.length); + update = listener.getMapCompletionEvents(jid, 2, 100, tid); + assertEquals(0, update.events.length); + } + + private static TaskAttemptCompletionEvent createTce(int eventId, + boolean isMap, TaskAttemptCompletionEventStatus status) { + JobId jid = MRBuilderUtils.newJobId(12345, 1, 1); + TaskId tid = MRBuilderUtils.newTaskId(jid, 0, + isMap ? 
org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP + : org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0); + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + TaskAttemptCompletionEvent tce = recordFactory + .newRecordInstance(TaskAttemptCompletionEvent.class); + tce.setEventId(eventId); + tce.setAttemptId(attemptId); + tce.setStatus(status); + return tce; + } + } Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1400267&r1=1400266&r2=1400267&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Fri Oct 19 20:21:32 2012 @@ -550,6 +550,12 @@ public class MockJobs extends MockApps { } @Override + public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents( + int startIndex, int maxEvents) { + return null; + } + + @Override public Map<TaskId, Task> getTasks(TaskType taskType) { throw new UnsupportedOperationException("Not supported yet."); } Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1400267&r1=1400266&r2=1400267&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Fri Oct 19 20:21:32 2012 @@ -21,8 +21,6 @@ package org.apache.hadoop.mapreduce.v2.a import java.util.Arrays; import java.util.Iterator; -import junit.framework.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; @@ -40,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.yarn.event.EventHandler; +import org.junit.Assert; import org.junit.Test; public class TestFetchFailure { @@ -144,6 +143,15 @@ public class TestFetchFailure { TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus()); Assert.assertEquals("Event status not correct for reduce attempt1", TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus()); + + TaskAttemptCompletionEvent mapEvents[] = + job.getMapAttemptCompletionEvents(0, 2); + Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length); + Assert.assertArrayEquals("Unexpected map events", + Arrays.copyOfRange(events, 0, 2), mapEvents); + mapEvents = job.getMapAttemptCompletionEvents(2, 200); + Assert.assertEquals("Incorrect number 
of map events", 1, mapEvents.length); + Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]); } /** Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1400267&r1=1400266&r2=1400267&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Fri Oct 19 20:21:32 2012 @@ -440,6 +440,12 @@ public class TestRuntimeEstimators { } @Override + public TaskAttemptCompletionEvent[] + getMapAttemptCompletionEvents(int startIndex, int maxEvents) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override public String getName() { throw new UnsupportedOperationException("Not supported yet."); } Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1400267&r1=1400266&r2=1400267&view=diff ============================================================================== --- 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Fri Oct 19 20:21:32 2012 @@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.h import java.io.IOException; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -81,6 +82,7 @@ public class CompletedJob implements org private Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>(); private Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>(); private List<TaskAttemptCompletionEvent> completionEvents = null; + private List<TaskAttemptCompletionEvent> mapCompletionEvents = null; private JobACLsManager aclsMgr; @@ -176,11 +178,28 @@ public class CompletedJob implements org if (completionEvents == null) { constructTaskAttemptCompletionEvents(); } + return getAttemptCompletionEvents(completionEvents, + fromEventId, maxEvents); + } + + @Override + public synchronized TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents( + int startIndex, int maxEvents) { + if (mapCompletionEvents == null) { + constructTaskAttemptCompletionEvents(); + } + return getAttemptCompletionEvents(mapCompletionEvents, + startIndex, maxEvents); + } + + private static TaskAttemptCompletionEvent[] getAttemptCompletionEvents( + List<TaskAttemptCompletionEvent> eventList, + int startIndex, int maxEvents) { TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0]; - if (completionEvents.size() > fromEventId) { + if (eventList.size() > startIndex) { int actualMax = Math.min(maxEvents, - (completionEvents.size() - fromEventId)); - events = completionEvents.subList(fromEventId, actualMax + fromEventId) + 
(eventList.size() - startIndex)); + events = eventList.subList(startIndex, actualMax + startIndex) .toArray(events); } return events; @@ -190,11 +209,15 @@ public class CompletedJob implements org loadAllTasks(); completionEvents = new LinkedList<TaskAttemptCompletionEvent>(); List<TaskAttempt> allTaskAttempts = new LinkedList<TaskAttempt>(); + int numMapAttempts = 0; for (TaskId taskId : tasks.keySet()) { Task task = tasks.get(taskId); for (TaskAttemptId taskAttemptId : task.getAttempts().keySet()) { TaskAttempt taskAttempt = task.getAttempts().get(taskAttemptId); allTaskAttempts.add(taskAttempt); + if (task.getType() == TaskType.MAP) { + ++numMapAttempts; + } } } Collections.sort(allTaskAttempts, new Comparator<TaskAttempt>() { @@ -223,6 +246,8 @@ public class CompletedJob implements org } }); + mapCompletionEvents = + new ArrayList<TaskAttemptCompletionEvent>(numMapAttempts); int eventId = 0; for (TaskAttempt taskAttempt : allTaskAttempts) { @@ -253,6 +278,9 @@ public class CompletedJob implements org .getAssignedContainerMgrAddress()); tace.setStatus(taceStatus); completionEvents.add(tace); + if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP) { + mapCompletionEvents.add(tace); + } } } Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1400267&r1=1400266&r2=1400267&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original) +++ 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Fri Oct 19 20:21:32 2012 @@ -154,6 +154,12 @@ public class PartialJob implements org.a } @Override + public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents( + int startIndex, int maxEvents) { + return null; + } + + @Override public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) { return true; } Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java?rev=1400267&r1=1400266&r2=1400267&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java Fri Oct 19 20:21:32 2012 @@ -126,6 +126,12 @@ public class MockHistoryJobs extends Moc } @Override + public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents( + int startIndex, int maxEvents) { + return job.getMapAttemptCompletionEvents(startIndex, maxEvents); + } + + @Override public Map<TaskId, Task> getTasks() { return job.getTasks(); }