Author: sseth
Date: Fri Sep 21 18:10:19 2012
New Revision: 1388595

URL: http://svn.apache.org/viewvc?rev=1388595&view=rev
Log:
MAPREDUCE-4664. ContainerHeartbeatHandler should be pinged on a getTask call (sseth)
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1388595&r1=1388594&r2=1388595&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Sep 21 18:10:19 2012 @@ -22,3 +22,5 @@ Branch MR-3902 MAPREDUCE-4618. Re-wire LocalContainerAllocator/UberAM (sseth) MAPREDUCE-4665. Use the configured shuffle port and application ACLs (sseth) + + MAPREDUCE-4664. 
ContainerHeartbeatHandler should be pinged on a getTask call (sseth) Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java?rev=1388595&r1=1388594&r2=1388595&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java Fri Sep 21 18:10:19 2012 @@ -70,12 +70,10 @@ import org.apache.hadoop.yarn.service.Co public class TaskAttemptListenerImpl2 extends CompositeService implements TaskUmbilicalProtocol, TaskAttemptListener { - // TODO XXX: Ideally containerId registration and unregistration should be taken care of by the Container. - // .... TaskAttemptId registration and unregistration by the TaskAttempt. Can this be split into a - // ContainerListener + TaskAttemptListener ? - - // TODO XXX. Re-look at big chunks. Possibly redo bits. - // ..launchedJvm map etc. + // TODO: Eventually, split this into a ContainerListener (getTask) and a + // TaskAttemptListener (attempt specific requests). After that, AMContainer + // registers containers, AMTask registers attempts. + // ..Sending back errors for unknown tasks. 
private static final JvmTask TASK_FOR_INVALID_JVM = new JvmTask(null, true); @@ -90,17 +88,13 @@ public class TaskAttemptListenerImpl2 ex private InetSocketAddress address; private Server server; - // TODO XXX: Use this to figure out whether an incoming ping is valid. + // TODO Use this to figure out whether an incoming ping is valid. private ConcurrentMap<TaskAttemptID, WrappedJvmID> attemptToJvmIdMap = new ConcurrentHashMap<TaskAttemptID, WrappedJvmID>(); // jvmIdToContainerIdMap also serving to check whether the container is still running. private ConcurrentMap<WrappedJvmID, ContainerId> jvmIDToContainerIdMap = - new ConcurrentHashMap<WrappedJvmID, ContainerId>(); -// private Set<WrappedJvmID> launchedJVMs = Collections -// .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); - - - + new ConcurrentHashMap<WrappedJvmID, ContainerId>(); + public TaskAttemptListenerImpl2(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, JobTokenSecretManager jobTokenSecretManager) { super(TaskAttemptListenerImpl2.class.getName()); @@ -112,7 +106,6 @@ public class TaskAttemptListenerImpl2 ex @Override public void start() { - LOG.info("XXX: Starting TAL2"); startRpcServer(); super.start(); } @@ -160,10 +153,26 @@ public class TaskAttemptListenerImpl2 ex return address; } + private void pingContainerHeartbeatHandler(WrappedJvmID jvmId) { + ContainerId containerId = jvmIDToContainerIdMap.get(jvmId); + if (containerId != null) { + containerHeartbeatHandler.pinged(containerId); + } else { + LOG.warn("Handling communication from JvmId: " + jvmId + + ", ContainerId not known for this jvm"); + } + } + private void pingContainerHeartbeatHandler(TaskAttemptID attemptID) { - containerHeartbeatHandler.pinged(jvmIDToContainerIdMap.get(attemptToJvmIdMap.get(attemptID))); + WrappedJvmID jvmId = attemptToJvmIdMap.get(attemptID); + if (jvmId != null) { + pingContainerHeartbeatHandler(jvmId); + } else { + LOG.warn("Handling communication from attempt: " + 
attemptID + + ", JvmID not know for this attempt"); + } } - + /** * Child checking whether it can commit. * @@ -418,10 +427,11 @@ public class TaskAttemptListenerImpl2 ex public JvmTask getTask(JvmContext jvmContext) throws IOException { // A rough imitation of code from TaskTracker. - - // TODO XXX: Does ContainerHeartbeatHandler need to be pinged on getTask() ? JVMId jvmId = jvmContext.jvmId; - LOG.info("ZZZ: JVM with ID : " + jvmId + " asked for a task"); + if (LOG.isDebugEnabled()) { + LOG.debug("JVM with ID : " + jvmId + " asked for a task"); + } + JvmTask jvmTask = null; // TODO: Is it an authorized container to get a task? Otherwise return null. @@ -431,6 +441,7 @@ public class TaskAttemptListenerImpl2 ex WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap, jvmId.getId()); + pingContainerHeartbeatHandler(wJvmID); ContainerId containerId = jvmIDToContainerIdMap.get(wJvmID); if (containerId == null) { @@ -453,46 +464,27 @@ public class TaskAttemptListenerImpl2 ex } } return jvmTask; - -// -// // Try to look up the task. We remove it directly as we don't give -// // multiple tasks to a JVM -// if (!jvmIDToActiveAttemptMap.containsKey(wJvmID)) { -// LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed."); -// jvmTask = TASK_FOR_INVALID_JVM; -// } else { -// if (!launchedJVMs.contains(wJvmID)) { -// jvmTask = null; -// LOG.info("JVM with ID: " + jvmId -// + " asking for task before AM launch registered. Given null task"); -// } else { -// // remove the task as it is no more needed and free up the memory. -// // Also we have already told the JVM to process a task, so it is no -// // longer pending, and further request should ask it to exit. 
-// org.apache.hadoop.mapred.Task task = -// jvmIDToActiveAttemptMap.remove(wJvmID); -// launchedJVMs.remove(wJvmID); -// LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); -// jvmTask = new JvmTask(task, false); -// } -// } -// return jvmTask; } @Override public void registerRunningJvm(WrappedJvmID jvmID, ContainerId containerId) { - LOG.info("XXX: JvmRegistration: " + jvmID + ", ContaienrId: " + containerId); + if (LOG.isDebugEnabled()) { + LOG.debug("JvmRegistartion: " + jvmID + ", ContainerId: " + containerId); + } jvmIDToContainerIdMap.putIfAbsent(jvmID, containerId); } - + @Override public void unregisterRunningJvm(WrappedJvmID jvmID) { - LOG.info("TOREMOVE: Unregistering jvmId: " + jvmID); + if (LOG.isDebugEnabled()) { + LOG.debug("JVM Unregister: " + jvmID); + } if (jvmIDToContainerIdMap.remove(jvmID) == null) { LOG.warn("Attempt to unregister unknwon jvmtoContainerMap: " + jvmID); } } - + + @Override public void registerTaskAttempt(TaskAttemptId attemptId, WrappedJvmID jvmId) { attemptToJvmIdMap.put(TypeConverter.fromYarn(attemptId), jvmId); } @@ -505,58 +497,15 @@ public class TaskAttemptListenerImpl2 ex } public org.apache.hadoop.mapred.Task pullTaskAttempt(ContainerId containerId) { - // TODO XXX: pullTaskAttempt as part of the interface. AMContainerImpl container = (AMContainerImpl) context.getAllContainers() .get(containerId); return container.pullTaskAttempt(); } -// @Override -// public void registerPendingTask( -// org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) { -// // Create the mapping so that it is easy to look up -// // when the jvm comes back to ask for Task. -// -// // A JVM not present in this map is an illegal task/JVM. 
-// jvmIDToActiveAttemptMap.put(jvmID, task); -// } -// -// @Override -// public void registerLaunchedTask( -// org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, -// WrappedJvmID jvmId) { -// // The AM considers the task to be launched (Has asked the NM to launch it) -// // The JVM will only be given a task after this registartion. -// launchedJVMs.add(jvmId); -// -// taskHeartbeatHandler.register(attemptID); -// } -// -// @Override -// public void unregister( -// org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, -// WrappedJvmID jvmID) { -// -// // Unregistration also comes from the same TaskAttempt which does the -// // registration. Events are ordered at TaskAttempt, so unregistration will -// // always come after registration. -// -// // Remove from launchedJVMs before jvmIDToActiveAttemptMap to avoid -// // synchronization issue with getTask(). getTask should be checking -// // jvmIDToActiveAttemptMap before it checks launchedJVMs. -// -// // remove the mappings if not already removed -// launchedJVMs.remove(jvmID); -// jvmIDToActiveAttemptMap.remove(jvmID); -// -// //unregister this attempt -// taskHeartbeatHandler.unregister(attemptID); -// } - @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException { return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash); } -} +} \ No newline at end of file Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1388595&r1=1388594&r2=1388595&view=diff 
============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Sep 21 18:10:19 2012 @@ -24,11 +24,15 @@ import static org.junit.Assert.assertTru import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.any; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.v2.app2.AppContext; import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler; @@ -48,8 +52,8 @@ public class TestTaskAttemptListenerImpl public MockTaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager, - TaskHeartbeatHandler hbHandler) { - super(context, hbHandler, null, jobTokenSecretManager); + ContainerHeartbeatHandler chh, TaskHeartbeatHandler thh) { + super(context, thh, chh, jobTokenSecretManager); } @Override @@ -91,10 +95,11 @@ public class TestTaskAttemptListenerImpl when(appCtx.getEventHandler()).thenReturn(mockHandler); JobTokenSecretManager secret = mock(JobTokenSecretManager.class); - TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); + TaskHeartbeatHandler thh = mock(TaskHeartbeatHandler.class); + ContainerHeartbeatHandler chh = mock(ContainerHeartbeatHandler.class); MockTaskAttemptListenerImpl listener = - new 
MockTaskAttemptListenerImpl(appCtx, secret, hbHandler); + new MockTaskAttemptListenerImpl(appCtx, secret, chh, thh); Configuration conf = new Configuration(); listener.init(conf); listener.start(); @@ -116,11 +121,13 @@ public class TestTaskAttemptListenerImpl result = listener.getTask(context1); assertNotNull(result); assertTrue(result.shouldDie); + verify(chh, never()).pinged(any(ContainerId.class)); // Verify ask after JVM registration, but before container is assigned a task. listener.registerRunningJvm(wid1, containerId1); result = listener.getTask(context1); assertNull(result); + verify(chh, times(1)).pinged(any(ContainerId.class)); // Verify ask after JVM registration, and when the container has a task. listener.registerRunningJvm(wid2, containerId2); @@ -128,6 +135,7 @@ public class TestTaskAttemptListenerImpl assertNotNull(result); assertFalse(result.shouldDie); assertTrue(result.getTask() == task); + verify(chh, times(2)).pinged(any(ContainerId.class)); ArgumentCaptor<Event> ac = ArgumentCaptor.forClass(Event.class); verify(mockHandler).handle(ac.capture()); Event cEvent = ac.getValue();