Author: szetszwo Date: Fri Apr 5 02:43:29 2013 New Revision: 1464815 URL: http://svn.apache.org/r1464815 Log: Merge r1462698 through r1464807 from trunk.
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (contents, props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java 
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project:r1463804 Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1462698-1464807 Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1464815&r1=1464814&r2=1464815&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Fri Apr 5 02:43:29 2013 @@ -158,6 +158,26 @@ Trunk (Unreleased) MAPREDUCE-5006. Fix failing streaming tests due to MAPREDUCE-4994. (Sandy Ryza via tomwhite) +Release 2.0.5-alpha - UNRELEASED + + INCOMPATIBLE CHANGES + + NEW FEATURES + + IMPROVEMENTS + + OPTIMIZATIONS + + BUG FIXES + + MAPREDUCE-5113. Streaming input/output types are ignored with java + mapper/reducer. 
(sandyr via tucu) + + MAPREDUCE-5098. Fix findbugs warnings in gridmix. (kkambatl via tucu) + + MAPREDUCE-5086. MR app master deletes staging dir when sent a reboot + command from the RM. (Jian He via jlowe) + Release 2.0.4-beta - UNRELEASED INCOMPATIBLE CHANGES @@ -195,6 +215,13 @@ Release 2.0.4-beta - UNRELEASED MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER. (Sandy Ryza via tomwhite) + MAPREDUCE-5117. Changed MRClientProtocolPBClientImpl to be closeable and thus + fix failures in renewal of HistoryServer's delegations tokens. (Siddharth + Seth via vinodkv) + + MAPREDUCE-5088. MR Client gets an renewer token exception while Oozie is + submitting a job (Daryn Sharp via cos) + Release 2.0.3-alpha - 2013-02-06 INCOMPATIBLE CHANGES @@ -744,6 +771,7 @@ Release 0.23.7 - UNRELEASED MAPREDUCE-5009. Killing the Task Attempt slated for commit does not clear the value from the Task commitAttempt member (Robert Parker via jeagles) + MAPREDUCE-4991. coverage for gridmix (Aleksey Gorshkov via tgraves) Release 0.23.6 - UNRELEASED Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt:r1463804 Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1462698-1464807 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1462698-1464807 Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1464815&r1=1464814&r2=1464815&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Apr 5 02:43:29 2013 @@ -549,8 +549,14 @@ public class MRAppMaster extends Composi } try { - //We are finishing cleanly so this is the last retry - isLastAMRetry = true; + //if isLastAMRetry comes as true, should never set it to false + if ( !isLastAMRetry){ + if (((JobImpl)job).getInternalState() != JobStateInternal.REBOOT) { + LOG.info("We are finishing cleanly so this is the last retry"); + isLastAMRetry = true; + } + } + notifyIsLastAMRetry(isLastAMRetry); // Stop all services // This will also send the final report to the ResourceManager LOG.info("Calling stop for all the services"); @@ -1272,19 +1278,25 @@ public class MRAppMaster extends Composi // that they don't take too long in shutting down if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) { ((ContainerAllocatorRouter) appMaster.containerAllocator) - .setSignalled(true); - ((ContainerAllocatorRouter) appMaster.containerAllocator) - .setShouldUnregister(appMaster.isLastAMRetry); - } - - if(appMaster.jobHistoryEventHandler != null) { - appMaster.jobHistoryEventHandler - .setForcejobCompletion(appMaster.isLastAMRetry); + .setSignalled(true); } + appMaster.notifyIsLastAMRetry(appMaster.isLastAMRetry); appMaster.stop(); } } + public void notifyIsLastAMRetry(boolean isLastAMRetry){ + 
if(containerAllocator instanceof ContainerAllocatorRouter) { + LOG.info("Notify RMCommunicator isAMLastRetry: " + isLastAMRetry); + ((ContainerAllocatorRouter) containerAllocator) + .setShouldUnregister(isLastAMRetry); + } + if(jobHistoryEventHandler != null) { + LOG.info("Notify JHEH isAMLastRetry: " + isLastAMRetry); + jobHistoryEventHandler.setForcejobCompletion(isLastAMRetry); + } + } + protected static void initAndStartAppMaster(final MRAppMaster appMaster, final YarnConfiguration conf, String jobUserName) throws IOException, InterruptedException { Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java?rev=1464815&r1=1464814&r2=1464815&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java Fri Apr 5 02:43:29 2013 @@ -30,5 +30,6 @@ public enum JobStateInternal { KILL_WAIT, KILL_ABORT, KILLED, - ERROR + ERROR, + REBOOT } Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1464815&r1=1464814&r2=1464815&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Fri Apr 5 02:43:29 2013 @@ -54,6 +54,6 @@ public enum JobEventType { JOB_TASK_ATTEMPT_FETCH_FAILURE, //Producer:RMContainerAllocator - JOB_UPDATED_NODES - + JOB_UPDATED_NODES, + JOB_AM_REBOOT } Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1464815&r1=1464814&r2=1464815&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Apr 5 02:43:29 2013 @@ -215,6 +215,8 @@ public class JobImpl implements org.apac DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); private 
static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); + private static final InternalRebootTransition + INTERNAL_REBOOT_TRANSITION = new InternalRebootTransition(); private static final TaskAttemptCompletedEventTransition TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION = new TaskAttemptCompletedEventTransition(); @@ -246,6 +248,9 @@ public class JobImpl implements org.apac .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) + .addTransition(JobStateInternal.NEW, JobStateInternal.REBOOT, + JobEventType.JOB_AM_REBOOT, + INTERNAL_REBOOT_TRANSITION) // Ignore-able events .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, JobEventType.JOB_UPDATED_NODES) @@ -265,6 +270,9 @@ public class JobImpl implements org.apac .addTransition(JobStateInternal.INITED, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) + .addTransition(JobStateInternal.INITED, JobStateInternal.REBOOT, + JobEventType.JOB_AM_REBOOT, + INTERNAL_REBOOT_TRANSITION) // Ignore-able events .addTransition(JobStateInternal.INITED, JobStateInternal.INITED, JobEventType.JOB_UPDATED_NODES) @@ -287,6 +295,9 @@ public class JobImpl implements org.apac .addTransition(JobStateInternal.SETUP, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) + .addTransition(JobStateInternal.SETUP, JobStateInternal.REBOOT, + JobEventType.JOB_AM_REBOOT, + INTERNAL_REBOOT_TRANSITION) // Ignore-able events .addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP, JobEventType.JOB_UPDATED_NODES) @@ -327,6 +338,9 @@ public class JobImpl implements org.apac JobStateInternal.RUNNING, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) + .addTransition(JobStateInternal.RUNNING, JobStateInternal.REBOOT, + JobEventType.JOB_AM_REBOOT, + INTERNAL_REBOOT_TRANSITION) // Transitions from KILL_WAIT state. 
.addTransition @@ -352,7 +366,8 @@ public class JobImpl implements org.apac EnumSet.of(JobEventType.JOB_KILL, JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_MAP_TASK_RESCHEDULED, - JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) + JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, + JobEventType.JOB_AM_REBOOT)) // Transitions from COMMITTING state .addTransition(JobStateInternal.COMMITTING, @@ -377,7 +392,10 @@ public class JobImpl implements org.apac .addTransition(JobStateInternal.COMMITTING, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Ignore-able events + .addTransition(JobStateInternal.COMMITTING, JobStateInternal.REBOOT, + JobEventType.JOB_AM_REBOOT, + INTERNAL_REBOOT_TRANSITION) + // Ignore-able events .addTransition(JobStateInternal.COMMITTING, JobStateInternal.COMMITTING, EnumSet.of(JobEventType.JOB_UPDATED_NODES, @@ -397,7 +415,8 @@ public class JobImpl implements org.apac .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED, EnumSet.of(JobEventType.JOB_KILL, JobEventType.JOB_UPDATED_NODES, - JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) + JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, + JobEventType.JOB_AM_REBOOT)) // Transitions from FAIL_ABORT state .addTransition(JobStateInternal.FAIL_ABORT, @@ -425,7 +444,8 @@ public class JobImpl implements org.apac JobEventType.JOB_MAP_TASK_RESCHEDULED, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.JOB_COMMIT_COMPLETED, - JobEventType.JOB_COMMIT_FAILED)) + JobEventType.JOB_COMMIT_FAILED, + JobEventType.JOB_AM_REBOOT)) // Transitions from KILL_ABORT state .addTransition(JobStateInternal.KILL_ABORT, @@ -452,7 +472,8 @@ public class JobImpl implements org.apac JobEventType.JOB_SETUP_COMPLETED, JobEventType.JOB_SETUP_FAILED, JobEventType.JOB_COMMIT_COMPLETED, - JobEventType.JOB_COMMIT_FAILED)) + JobEventType.JOB_COMMIT_FAILED, + JobEventType.JOB_AM_REBOOT)) // Transitions from FAILED state .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED, @@ 
-476,7 +497,8 @@ public class JobImpl implements org.apac JobEventType.JOB_SETUP_FAILED, JobEventType.JOB_COMMIT_COMPLETED, JobEventType.JOB_COMMIT_FAILED, - JobEventType.JOB_ABORT_COMPLETED)) + JobEventType.JOB_ABORT_COMPLETED, + JobEventType.JOB_AM_REBOOT)) // Transitions from KILLED state .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED, @@ -498,7 +520,8 @@ public class JobImpl implements org.apac JobEventType.JOB_SETUP_FAILED, JobEventType.JOB_COMMIT_COMPLETED, JobEventType.JOB_COMMIT_FAILED, - JobEventType.JOB_ABORT_COMPLETED)) + JobEventType.JOB_ABORT_COMPLETED, + JobEventType.JOB_AM_REBOOT)) // No transitions from INTERNAL_ERROR state. Ignore all. .addTransition( @@ -517,9 +540,33 @@ public class JobImpl implements org.apac JobEventType.JOB_COMMIT_COMPLETED, JobEventType.JOB_COMMIT_FAILED, JobEventType.JOB_ABORT_COMPLETED, - JobEventType.INTERNAL_ERROR)) + JobEventType.INTERNAL_ERROR, + JobEventType.JOB_AM_REBOOT)) .addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR, JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) + + // No transitions from REBOOT state. Ignore all. 
+ .addTransition( + JobStateInternal.REBOOT, + JobStateInternal.REBOOT, + EnumSet.of(JobEventType.JOB_INIT, + JobEventType.JOB_KILL, + JobEventType.JOB_TASK_COMPLETED, + JobEventType.JOB_TASK_ATTEMPT_COMPLETED, + JobEventType.JOB_MAP_TASK_RESCHEDULED, + JobEventType.JOB_DIAGNOSTIC_UPDATE, + JobEventType.JOB_UPDATED_NODES, + JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, + JobEventType.JOB_SETUP_COMPLETED, + JobEventType.JOB_SETUP_FAILED, + JobEventType.JOB_COMMIT_COMPLETED, + JobEventType.JOB_COMMIT_FAILED, + JobEventType.JOB_ABORT_COMPLETED, + JobEventType.INTERNAL_ERROR, + JobEventType.JOB_AM_REBOOT)) + .addTransition(JobStateInternal.REBOOT, JobStateInternal.REBOOT, + JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) + // create the topology tables .installTopology(); @@ -904,6 +951,8 @@ public class JobImpl implements org.apac return JobState.RUNNING; case FAIL_ABORT: return JobState.FAILED; + case REBOOT: + return JobState.ERROR; default: return JobState.valueOf(smState.name()); } @@ -972,6 +1021,7 @@ public class JobImpl implements org.apac case KILLED: metrics.killedJob(this); break; + case REBOOT: case ERROR: case FAILED: metrics.failedJob(this); @@ -1898,8 +1948,17 @@ public class JobImpl implements org.apac } } - private static class InternalErrorTransition implements + private static class InternalTerminationTransition implements SingleArcTransition<JobImpl, JobEvent> { + JobStateInternal terminationState = null; + String jobHistoryString = null; + public InternalTerminationTransition(JobStateInternal stateInternal, + String jobHistoryString) { + this.terminationState = stateInternal; + //mostly a hack for the job history server + this.jobHistoryString = jobHistoryString; + } + @Override public void transition(JobImpl job, JobEvent event) { //TODO Is this JH event required. 
@@ -1907,9 +1966,21 @@ public class JobImpl implements org.apac JobUnsuccessfulCompletionEvent failedEvent = new JobUnsuccessfulCompletionEvent(job.oldJobId, job.finishTime, 0, 0, - JobStateInternal.ERROR.toString()); + jobHistoryString); job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); - job.finished(JobStateInternal.ERROR); + job.finished(terminationState); + } + } + + private static class InternalErrorTransition extends InternalTerminationTransition { + public InternalErrorTransition(){ + super(JobStateInternal.ERROR, JobStateInternal.ERROR.toString()); + } + } + + private static class InternalRebootTransition extends InternalTerminationTransition { + public InternalRebootTransition(){ + super(JobStateInternal.REBOOT, JobStateInternal.ERROR.toString()); } } Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1464815&r1=1464814&r2=1464815&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Apr 5 02:43:29 2013 @@ -123,7 +123,7 @@ public class LocalContainerAllocator ext // This can happen if the RM has been restarted. If it is in that state, // this application must clean itself up. 
eventHandler.handle(new JobEvent(this.getJob().getID(), - JobEventType.INTERNAL_ERROR)); + JobEventType.JOB_AM_REBOOT)); throw new YarnException("Resource Manager doesn't recognize AttemptId: " + this.getContext().getApplicationID()); } Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1464815&r1=1464814&r2=1464815&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Apr 5 02:43:29 2013 @@ -574,7 +574,7 @@ public class RMContainerAllocator extend // This can happen if the RM has been restarted. If it is in that state, // this application must clean itself up. 
eventHandler.handle(new JobEvent(this.getJob().getID(), - JobEventType.INTERNAL_ERROR)); + JobEventType.JOB_AM_REBOOT)); throw new YarnException("Resource Manager doesn't recognize AttemptId: " + this.getContext().getApplicationID()); } Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1464815&r1=1464814&r2=1464815&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Fri Apr 5 02:43:29 2013 @@ -33,7 +33,9 @@ import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; @@ -45,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRApps; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import 
org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -86,9 +89,68 @@ import org.junit.Test; attemptId.setApplicationId(appId); JobId jobid = recordFactory.newRecordInstance(JobId.class); jobid.setAppId(appId); - MRAppMaster appMaster = new TestMRApp(attemptId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + appMaster.init(conf); + appMaster.start(); + appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry()); + verify(fs).delete(stagingJobPath, true); + } + + @Test (timeout = 30000) + public void testNoDeletionofStagingOnReboot() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true); + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path stagingDir = MRApps.getStagingAreaDir(conf, user); + when(fs.exists(stagingDir)).thenReturn(true); + ApplicationAttemptId attemptId = recordFactory.newRecordInstance( + ApplicationAttemptId.class); + attemptId.setAttemptId(0); + ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); + appId.setClusterTimestamp(System.currentTimeMillis()); + appId.setId(0); + attemptId.setApplicationId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.REBOOT, 4); + appMaster.init(conf); + appMaster.start(); + //shutdown the job, not the lastRetry + appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(false, ((TestMRApp)appMaster).getTestIsLastAMRetry()); + verify(fs, times(0)).delete(stagingJobPath, true); 
+ } + + @Test (timeout = 30000) + public void testDeletionofStagingOnReboot() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true); + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path stagingDir = MRApps.getStagingAreaDir(conf, user); + when(fs.exists(stagingDir)).thenReturn(true); + ApplicationAttemptId attemptId = recordFactory.newRecordInstance( + ApplicationAttemptId.class); + attemptId.setAttemptId(1); + ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); + appId.setClusterTimestamp(System.currentTimeMillis()); + appId.setId(0); + attemptId.setApplicationId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.REBOOT, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); appMaster.init(conf); + appMaster.start(); + //shutdown the job, is lastRetry appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry()); verify(fs).delete(stagingJobPath, true); } @@ -151,6 +213,8 @@ import org.junit.Test; private class TestMRApp extends MRAppMaster { ContainerAllocator allocator; + boolean testIsLastAMRetry = false; + JobStateInternal jobStateInternal; public TestMRApp(ApplicationAttemptId applicationAttemptId, ContainerAllocator allocator, int maxAppAttempts) { @@ -160,9 +224,11 @@ import org.junit.Test; this.allocator = allocator; } - public TestMRApp(ApplicationAttemptId applicationAttemptId) { - this(applicationAttemptId, null, - MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + public TestMRApp(ApplicationAttemptId applicationAttemptId, + ContainerAllocator allocator, JobStateInternal jobStateInternal, + int maxAppAttempts) { + this(applicationAttemptId, allocator, maxAppAttempts); + this.jobStateInternal = jobStateInternal; } @Override 
@@ -180,6 +246,31 @@ import org.junit.Test; } @Override + protected Job createJob(Configuration conf, JobStateInternal forcedState, + String diagnostic) { + JobImpl jobImpl = mock(JobImpl.class); + when(jobImpl.getInternalState()).thenReturn(this.jobStateInternal); + JobID jobID = JobID.forName("job_1234567890000_0001"); + JobId jobId = TypeConverter.toYarn(jobID); + when(jobImpl.getID()).thenReturn(jobId); + ((AppContext) getContext()) + .getAllJobs().put(jobImpl.getID(), jobImpl); + return jobImpl; + } + + @Override + public void start() { + super.start(); + DefaultMetricsSystem.shutdown(); + } + + @Override + public void notifyIsLastAMRetry(boolean isLastAMRetry){ + testIsLastAMRetry = isLastAMRetry; + super.notifyIsLastAMRetry(isLastAMRetry); + } + + @Override public RMHeartbeatHandler getRMHeartbeatHandler() { return getStubbedHeartbeatHandler(getContext()); } @@ -197,6 +288,9 @@ import org.junit.Test; protected void downloadTokensAndSetupUGI(Configuration conf) { } + public boolean getTestIsLastAMRetry(){ + return testIsLastAMRetry; + } } private final class MRAppTestCleanup extends MRApp { @@ -288,7 +382,7 @@ import org.junit.Test; }; } - @Test + @Test(timeout=20000) public void testStagingCleanupOrder() throws Exception { MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true, this.getClass().getName(), true); Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1464815&r1=1464814&r2=1464815&view=diff ============================================================================== --- 
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Apr 5 02:43:29 2013 @@ -193,6 +193,68 @@ public class TestJobImpl { } @Test(timeout=20000) + public void testRebootedDuringSetup() throws Exception{ + Configuration conf = new Configuration(); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + OutputCommitter committer = new StubbedOutputCommitter() { + @Override + public synchronized void setupJob(JobContext jobContext) + throws IOException { + while(!Thread.interrupted()){ + try{ + wait(); + }catch (InterruptedException e) { + } + } + } + }; + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + JobImpl job = createStubbedJob(conf, dispatcher, 2); + JobId jobId = job.getID(); + job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); + assertJobState(job, JobStateInternal.INITED); + job.handle(new JobEvent(jobId, JobEventType.JOB_START)); + assertJobState(job, JobStateInternal.SETUP); + + job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT)); + assertJobState(job, JobStateInternal.REBOOT); + dispatcher.stop(); + commitHandler.stop(); + } + + @Test(timeout=20000) + public void testRebootedDuringCommit() throws Exception { + Configuration conf = new Configuration(); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + CyclicBarrier syncBarrier = new CyclicBarrier(2); + OutputCommitter committer = new 
WaitingOutputCommitter(syncBarrier, true); + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); + completeJobTasks(job); + assertJobState(job, JobStateInternal.COMMITTING); + + syncBarrier.await(); + job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT)); + assertJobState(job, JobStateInternal.REBOOT); + dispatcher.stop(); + commitHandler.stop(); + } + + @Test(timeout=20000) public void testKilledDuringSetup() throws Exception { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java?rev=1464815&r1=1464814&r2=1464815&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java Fri Apr 5 02:43:29 2013 @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.api.impl.pb.client; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; @@ -101,7 +102,8 @@ import org.apache.hadoop.yarn.exceptions import 
com.google.protobuf.ServiceException; -public class MRClientProtocolPBClientImpl implements MRClientProtocol { +public class MRClientProtocolPBClientImpl implements MRClientProtocol, + Closeable { protected MRClientProtocolPB proxy; @@ -118,6 +120,13 @@ public class MRClientProtocolPBClientImp } @Override + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + + @Override public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException { GetJobReportRequestProto requestProto = ((GetJobReportRequestPBImpl)request).getProto(); Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java?rev=1464815&r1=1464814&r2=1464815&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java Fri Apr 5 02:43:29 2013 @@ -138,15 +138,6 @@ import org.apache.hadoop.util.ToolRunner public class JobClient extends CLI { public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; - /* notes that get delegation token was called. Again this is hack for oozie - * to make sure we add history server delegation tokens to the credentials - * for the job. Since the api only allows one delegation token to be returned, - * we have to add this hack. 
- */ - private boolean getDelegationTokenCalled = false; - /* do we need a HS delegation token for this client */ - static final String HS_DELEGATION_TOKEN_REQUIRED - = "mapreduce.history.server.delegationtoken.required"; static{ ConfigUtil.loadResources(); @@ -569,10 +560,6 @@ public class JobClient extends CLI { try { conf.setBooleanIfUnset("mapred.mapper.new-api", false); conf.setBooleanIfUnset("mapred.reducer.new-api", false); - if (getDelegationTokenCalled) { - conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled); - getDelegationTokenCalled = false; - } Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () { @Override public Job run() throws IOException, ClassNotFoundException, @@ -1173,7 +1160,6 @@ public class JobClient extends CLI { */ public Token<DelegationTokenIdentifier> getDelegationToken(final Text renewer) throws IOException, InterruptedException { - getDelegationTokenCalled = true; return clientUgi.doAs(new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() { public Token<DelegationTokenIdentifier> run() throws IOException, Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1462698-1464807 Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1464815&r1=1464814&r2=1464815&view=diff 
============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Fri Apr 5 02:43:29 2013 @@ -19,6 +19,7 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import java.net.InetSocketAddress; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -87,6 +88,10 @@ public class ResourceMgrDelegate extends return oldMetrics; } + InetSocketAddress getConnectAddress() { + return rmAddress; + } + @SuppressWarnings("rawtypes") public Token getDelegationToken(Text renewer) throws IOException, InterruptedException { Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1464815&r1=1464814&r2=1464815&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Fri Apr 5 02:43:29 2013 @@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import 
org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; @@ -81,6 +82,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.client.RMTokenSelector; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ProtoUtils; @@ -90,7 +92,7 @@ import com.google.common.annotations.Vis /** * This class enables the current JobClient (0.22 hadoop) to run on YARN. */ -@SuppressWarnings({ "rawtypes", "unchecked" }) +@SuppressWarnings("unchecked") public class YARNRunner implements ClientProtocol { private static final Log LOG = LogFactory.getLog(YARNRunner.class); @@ -101,14 +103,6 @@ public class YARNRunner implements Clien private Configuration conf; private final FileContext defaultFileContext; - /* usually is false unless the jobclient get delegation token is - * called. This is a hack wherein we do return a token from RM - * on getDelegationtoken but due to the restricted api on jobclient - * we just add a job history DT token when submitting a job. 
- */ - private static final boolean DEFAULT_HS_DELEGATION_TOKEN_REQUIRED = - false; - /** * Yarn runner incapsulates the client interface of * yarn @@ -186,6 +180,28 @@ public class YARNRunner implements Clien } @VisibleForTesting + void addHistoyToken(Credentials ts) throws IOException, InterruptedException { + /* check if we have a hsproxy, if not, no need */ + MRClientProtocol hsProxy = clientCache.getInitializedHSProxy(); + if (UserGroupInformation.isSecurityEnabled() && (hsProxy != null)) { + /* + * note that get delegation token was called. Again this is hack for oozie + * to make sure we add history server delegation tokens to the credentials + */ + RMTokenSelector tokenSelector = new RMTokenSelector(); + Text service = SecurityUtil.buildTokenService(resMgrDelegate + .getConnectAddress()); + if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) { + Text hsService = SecurityUtil.buildTokenService(hsProxy + .getConnectAddress()); + if (ts.getToken(hsService) == null) { + ts.addToken(hsService, getDelegationTokenFromHS(hsProxy)); + } + } + } + } + + @VisibleForTesting Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy) throws IOException, InterruptedException { GetDelegationTokenRequest request = recordFactory @@ -263,18 +279,8 @@ public class YARNRunner implements Clien public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException { - /* check if we have a hsproxy, if not, no need */ - MRClientProtocol hsProxy = clientCache.getInitializedHSProxy(); - if (hsProxy != null) { - // JobClient will set this flag if getDelegationToken is called, if so, get - // the delegation tokens for the HistoryServer also. 
- if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED, - DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) { - Token hsDT = getDelegationTokenFromHS(hsProxy); - ts.addToken(hsDT.getService(), hsDT); - } - } - + addHistoyToken(ts); + // Upload only in security mode: TODO Path applicationTokensFile = new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE); Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java?rev=1464815&r1=1464814&r2=1464815&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java Fri Apr 5 02:43:29 2013 @@ -20,8 +20,10 @@ package org.apache.hadoop.mapred; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -30,6 +32,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.List; @@ -39,28 +42,24 @@ import junit.framework.TestCase; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.ClientCache; -import org.apache.hadoop.mapred.ClientServiceDelegate; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Master; -import org.apache.hadoop.mapred.ResourceMgrDelegate; -import org.apache.hadoop.mapred.YARNRunner; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -69,21 +68,27 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import 
org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.Appender; import org.apache.log4j.Layout; import org.apache.log4j.Logger; @@ -146,7 +151,7 @@ public class TestYARNRunner extends Test } - @Test + @Test(timeout=20000) public void testJobKill() throws Exception { clientDelegate = mock(ClientServiceDelegate.class); when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new @@ -171,7 +176,7 @@ public class TestYARNRunner extends Test 
verify(clientDelegate).killJob(jobId); } - @Test + @Test(timeout=20000) public void testJobSubmissionFailure() throws Exception { when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))). thenReturn(appId); @@ -193,7 +198,7 @@ public class TestYARNRunner extends Test } } - @Test + @Test(timeout=20000) public void testResourceMgrDelegate() throws Exception { /* we not want a mock of resource mgr delegate */ final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class); @@ -259,8 +264,88 @@ public class TestYARNRunner extends Test delegate.getQueueAclsForCurrentUser(); verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class)); } - - @Test + + @Test(timeout=20000) + public void testGetHSDelegationToken() throws Exception { + try { + Configuration conf = new Configuration(); + + // Setup mock service + InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444); + Text rmTokenSevice = SecurityUtil.buildTokenService(mockRmAddress); + + InetSocketAddress mockHsAddress = new InetSocketAddress("localhost", 9200); + Text hsTokenSevice = SecurityUtil.buildTokenService(mockHsAddress); + + // Setup mock rm token + RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier( + new Text("owner"), new Text("renewer"), new Text("real")); + Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>( + new byte[0], new byte[0], tokenIdentifier.getKind(), rmTokenSevice); + token.setKind(RMDelegationTokenIdentifier.KIND_NAME); + + // Setup mock history token + DelegationToken historyToken = BuilderUtils.newDelegationToken( + new byte[0], MRDelegationTokenIdentifier.KIND_NAME.toString(), + new byte[0], hsTokenSevice.toString()); + GetDelegationTokenResponse getDtResponse = Records + .newRecord(GetDelegationTokenResponse.class); + getDtResponse.setDelegationToken(historyToken); + + // mock services + MRClientProtocol mockHsProxy = mock(MRClientProtocol.class); + 
doReturn(mockHsAddress).when(mockHsProxy).getConnectAddress(); + doReturn(getDtResponse).when(mockHsProxy).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class); + doReturn(mockRmAddress).when(rmDelegate).getConnectAddress(); + + ClientCache clientCache = mock(ClientCache.class); + doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy(); + + Credentials creds = new Credentials(); + + YARNRunner yarnRunner = new YARNRunner(conf, rmDelegate, clientCache); + + // No HS token if no RM token + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(0)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + // No HS token if RM token, but secirity disabled. + creds.addToken(new Text("rmdt"), token); + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(0)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + creds = new Credentials(); + + // No HS token if no RM token, security enabled + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(0)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + // HS token if RM token present, security enabled + creds.addToken(new Text("rmdt"), token); + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(1)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + // No additional call to get HS token if RM and HS token present + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(1)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + } finally { + // Back to defaults. 
+ UserGroupInformation.setConfiguration(new Configuration()); + } + } + + @Test(timeout=20000) public void testHistoryServerToken() throws Exception { //Set the master principal in the config conf.set(YarnConfiguration.RM_PRINCIPAL,"foo@LOCAL"); @@ -303,7 +388,7 @@ public class TestYARNRunner extends Test }); } - @Test + @Test(timeout=20000) public void testAMAdminCommandOpts() throws Exception { JobConf jobConf = new JobConf(); @@ -366,7 +451,7 @@ public class TestYARNRunner extends Test assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex); } } - @Test + @Test(timeout=20000) public void testWarnCommandOpts() throws Exception { Logger logger = Logger.getLogger(YARNRunner.class);