Author: tgraves
Date: Fri Aug 31 20:45:34 2012
New Revision: 1379602
URL: http://svn.apache.org/viewvc?rev=1379602&view=rev
Log:
merge -r 1379598:1379599 from trunk. FIXES: MAPREDUCE-4611
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1379602&r1=1379601&r2=1379602&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Aug 31 20:45:34 2012
@@ -734,6 +734,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4614. Simplify debugging a job's tokens (daryn via bobby)
+ MAPREDUCE-4611. MR AM dies badly when Node is decommissioned (Robert
+ Evans via tgraves)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1379602&r1=1379601&r2=1379602&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Fri Aug 31 20:45:34 2012
@@ -99,8 +99,8 @@ public class JobHistoryEventHandler exte
protected static final Map<JobId, MetaInfo> fileMap =
Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
- // Has a signal (SIGTERM etc) been issued?
- protected volatile boolean isSignalled = false;
+ // should job completion be force when the AM shuts down?
+ protected volatile boolean forceJobCompletion = false;
public JobHistoryEventHandler(AppContext context, int startCount) {
super("JobHistoryEventHandler");
@@ -322,7 +322,7 @@ public class JobHistoryEventHandler exte
// Process JobUnsuccessfulCompletionEvent for jobIds which still haven't
// closed their event writers
Iterator<JobId> jobIt = fileMap.keySet().iterator();
- if(isSignalled) {
+ if(forceJobCompletion) {
while (jobIt.hasNext()) {
JobId toClose = jobIt.next();
MetaInfo mi = fileMap.get(toClose);
@@ -911,9 +911,9 @@ public class JobHistoryEventHandler exte
return tmpFileName.substring(0, tmpFileName.length()-4);
}
- public void setSignalled(boolean isSignalled) {
- this.isSignalled = isSignalled;
- LOG.info("JobHistoryEventHandler notified that isSignalled was "
- + isSignalled);
+ public void setForcejobCompletion(boolean forceJobCompletion) {
+ this.forceJobCompletion = forceJobCompletion;
+ LOG.info("JobHistoryEventHandler notified that forceJobCompletion is "
+ + forceJobCompletion);
}
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1379602&r1=1379601&r2=1379602&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Aug 31 20:45:34 2012
@@ -170,6 +170,8 @@ public class MRAppMaster extends Composi
private Credentials fsTokens = new Credentials(); // Filled during init
private UserGroupInformation currentUser; // Will be setup during init
+ private volatile boolean isLastAMRetry = false;
+
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime) {
@@ -195,11 +197,21 @@ public class MRAppMaster extends Composi
@Override
public void init(final Configuration conf) {
-
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
downloadTokensAndSetupUGI(conf);
-
+
+ //TODO this is a hack, we really need the RM to inform us when we
+ // are the last one. This would allow us to configure retries on
+ // a per application basis.
+ int numAMRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES);
+ isLastAMRetry = appAttemptID.getAttemptId() >= numAMRetries;
+ LOG.info("AM Retries: " + numAMRetries +
+ " attempt num: " + appAttemptID.getAttemptId() +
+ " is last retry: " + isLastAMRetry);
+
+
context = new RunningAppContext(conf);
// Job name is the same as the app name util we support DAG of jobs
@@ -417,6 +429,8 @@ public class MRAppMaster extends Composi
}
try {
+ //We are finishing cleanly so this is the last retry
+ isLastAMRetry = true;
// Stop all services
// This will also send the final report to the ResourceManager
LOG.info("Calling stop for all the services");
@@ -666,7 +680,11 @@ public class MRAppMaster extends Composi
}
public void setSignalled(boolean isSignalled) {
- ((RMCommunicator) containerAllocator).setSignalled(true);
+ ((RMCommunicator) containerAllocator).setSignalled(isSignalled);
+ }
+
+ public void setShouldUnregister(boolean shouldUnregister) {
+ ((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister);
}
}
@@ -717,7 +735,12 @@ public class MRAppMaster extends Composi
@Override
public synchronized void stop() {
try {
- cleanupStagingDir();
+ if(isLastAMRetry) {
+ cleanupStagingDir();
+ } else {
+ LOG.info("Skipping cleaning up the staging dir. "
+ + "assuming AM will be retried.");
+ }
} catch (IOException io) {
LOG.error("Failed to cleanup staging dir: ", io);
}
@@ -1016,14 +1039,19 @@ public class MRAppMaster extends Composi
public void run() {
LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
+ "JobHistoryEventHandler.");
+
// Notify the JHEH and RMCommunicator that a SIGTERM has been received so
// that they don't take too long in shutting down
if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
((ContainerAllocatorRouter) appMaster.containerAllocator)
.setSignalled(true);
+ ((ContainerAllocatorRouter) appMaster.containerAllocator)
+ .setShouldUnregister(appMaster.isLastAMRetry);
}
+
if(appMaster.jobHistoryEventHandler != null) {
- appMaster.jobHistoryEventHandler.setSignalled(true);
+ appMaster.jobHistoryEventHandler
+ .setForcejobCompletion(appMaster.isLastAMRetry);
}
appMaster.stop();
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1379602&r1=1379601&r2=1379602&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Aug 31 20:45:34 2012
@@ -84,6 +84,7 @@ public abstract class RMCommunicator ext
private Job job;
// Has a signal (SIGTERM etc) been issued?
protected volatile boolean isSignalled = false;
+ private volatile boolean shouldUnregister = true;
public RMCommunicator(ClientService clientService, AppContext context) {
super("RMCommunicator");
@@ -213,7 +214,9 @@ public abstract class RMCommunicator ext
} catch (InterruptedException ie) {
LOG.warn("InterruptedException while stopping", ie);
}
- unregister();
+ if(shouldUnregister) {
+ unregister();
+ }
super.stop();
}
@@ -288,8 +291,15 @@ public abstract class RMCommunicator ext
protected abstract void heartbeat() throws Exception;
+ public void setShouldUnregister(boolean shouldUnregister) {
+ this.shouldUnregister = shouldUnregister;
+ LOG.info("RMCommunicator notified that shouldUnregistered is: "
+ + shouldUnregister);
+ }
+
public void setSignalled(boolean isSignalled) {
this.isSignalled = isSignalled;
- LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
+ LOG.info("RMCommunicator notified that iSignalled is: "
+ + isSignalled);
}
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1379602&r1=1379601&r2=1379602&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Fri Aug 31 20:45:34 2012
@@ -330,7 +330,7 @@ public class TestJobHistoryEventHandler
Mockito.when(jobId.getAppId()).thenReturn(mockAppId);
jheh.addToFileMap(jobId);
- jheh.setSignalled(true);
+ jheh.setForcejobCompletion(true);
for(int i=0; i < numEvents; ++i) {
events[i] = getEventToEnqueue(jobId);
jheh.handle(events[i]);
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1379602&r1=1379601&r2=1379602&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Fri Aug 31 20:45:34 2012
@@ -23,6 +23,7 @@ import static org.mockito.Matchers.anyBo
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
import java.io.IOException;
@@ -47,6 +48,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -89,28 +91,94 @@ import org.junit.Test;
handler.handle(new JobFinishEvent(jobid));
verify(fs).delete(stagingJobPath, true);
}
+
+ @Test
+ public void testDeletionofStagingOnKill() throws IOException {
+ conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+ conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
+ fs = mock(FileSystem.class);
+ when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+ ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+ ApplicationAttemptId.class);
+ attemptId.setAttemptId(0);
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+ appId.setClusterTimestamp(System.currentTimeMillis());
+ appId.setId(0);
+ attemptId.setApplicationId(appId);
+ JobId jobid = recordFactory.newRecordInstance(JobId.class);
+ jobid.setAppId(appId);
+ ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+ appMaster.init(conf);
+ //simulate the process being killed
+ MRAppMaster.MRAppMasterShutdownHook hook =
+ new MRAppMaster.MRAppMasterShutdownHook(appMaster);
+ hook.run();
+ verify(fs, times(0)).delete(stagingJobPath, true);
+ }
+
+ @Test
+ public void testDeletionofStagingOnKillLastTry() throws IOException {
+ conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+ conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
+ fs = mock(FileSystem.class);
+ when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+ ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+ ApplicationAttemptId.class);
+ attemptId.setAttemptId(1);
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+ appId.setClusterTimestamp(System.currentTimeMillis());
+ appId.setId(0);
+ attemptId.setApplicationId(appId);
+ JobId jobid = recordFactory.newRecordInstance(JobId.class);
+ jobid.setAppId(appId);
+ ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+ appMaster.init(conf);
+ //simulate the process being killed
+ MRAppMaster.MRAppMasterShutdownHook hook =
+ new MRAppMaster.MRAppMasterShutdownHook(appMaster);
+ hook.run();
+ verify(fs).delete(stagingJobPath, true);
+ }
private class TestMRApp extends MRAppMaster {
+ ContainerAllocator allocator;
- public TestMRApp(ApplicationAttemptId applicationAttemptId) {
- super(applicationAttemptId, BuilderUtils.newContainerId(
- applicationAttemptId, 1), "testhost", 2222, 3333, System
- .currentTimeMillis());
- }
-
- @Override
- protected FileSystem getFileSystem(Configuration conf) {
- return fs;
- }
-
- @Override
- protected void sysexit() {
- }
-
- @Override
- public Configuration getConfig() {
- return conf;
- }
+ public TestMRApp(ApplicationAttemptId applicationAttemptId,
+ ContainerAllocator allocator) {
+ super(applicationAttemptId, BuilderUtils.newContainerId(
+ applicationAttemptId, 1), "testhost", 2222, 3333, System
+ .currentTimeMillis());
+ this.allocator = allocator;
+ }
+
+ public TestMRApp(ApplicationAttemptId applicationAttemptId) {
+ this(applicationAttemptId, null);
+ }
+
+ @Override
+ protected FileSystem getFileSystem(Configuration conf) {
+ return fs;
+ }
+
+ @Override
+ protected ContainerAllocator createContainerAllocator(
+ final ClientService clientService, final AppContext context) {
+ if(allocator == null) {
+ return super.createContainerAllocator(clientService, context);
+ }
+ return allocator;
+ }
+
+ @Override
+ protected void sysexit() {
+ }
+
+ @Override
+ public Configuration getConfig() {
+ return conf;
+ }
}
private final class MRAppTestCleanup extends MRApp {