Author: acmurthy Date: Tue Sep 25 17:46:34 2012 New Revision: 1390016 URL: http://svn.apache.org/viewvc?rev=1390016&view=rev Log: Merge -c 1377714 from branch-1 to branch-1.1 to fix MAPREDUCE-4328. Add a JobTracker safemode to allow it to be resilient to NameNode failures.
Added: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java - copied unchanged from r1377714, hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java - copied unchanged from r1377714, hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java Modified: hadoop/common/branches/branch-1.1/CHANGES.txt hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java hadoop/common/branches/branch-1.1/src/webapps/job/jobtracker.jsp Modified: hadoop/common/branches/branch-1.1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1.1/CHANGES.txt Tue Sep 25 17:46:34 2012 @@ -54,6 +54,13 @@ Release 1.1.0 - 2012.09.16 configured timeout and are selected as the last location to read from. (Jing Zhao via szetszwo) + MAPREDUCE-4328. Add a JobTracker safemode to allow it to be resilient to + NameNode failures. The safemode can be entered either automatically via + the configurable background thread to monitor the NameNode or by the + admin. In the safemode the JobTracker doesn't schedule new tasks, marks + all failed tasks as KILLED for future retries and doesn't accept new job + submissions. (acmurthy) + IMPROVEMENTS HADOOP-8656. Backport forced daemon shutdown of HADOOP-8353 into branch-1 Modified: hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original) +++ hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue Sep 25 17:46:34 2012 @@ -1042,10 +1042,20 @@ class CapacityTaskScheduler extends Task */ updateAllQueues(mapClusterCapacity, reduceClusterCapacity); - // schedule tasks + /* + * Schedule tasks + */ + List<Task> result = new ArrayList<Task>(); - addMapTasks(taskTracker, result, maxMapSlots, currentMapSlots); - addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots); + + // Check for JT safe-mode + if (taskTrackerManager.isInSafeMode()) { + LOG.info("JobTracker is in safe-mode, not scheduling any tasks."); + } else { + addMapTasks(taskTracker, result, maxMapSlots, currentMapSlots); + addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots); + } + return result; } Modified: hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original) +++ hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Sep 25 17:46:34 2012 @@ -688,6 +688,12 @@ public class TestCapacityScheduler exten public QueueManager getQueueManager() { return qm; } + + @Override + public boolean isInSafeMode() { + // TODO Auto-generated method stub + return false; + } } // represents a fake queue configuration info Modified: hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original) +++ hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Tue Sep 25 17:46:34 2012 @@ -412,7 +412,13 @@ public class FairScheduler extends TaskS // Update time waited for local maps for jobs skipped on last heartbeat updateLocalityWaitTimes(currentTime); - + + // Check for JT safe-mode + if (taskTrackerManager.isInSafeMode()) { + LOG.info("JobTracker is in safe-mode, not scheduling any tasks."); + return null; + } + TaskTrackerStatus tts = tracker.getStatus(); int mapsAssigned = 0; // loop counter for map in the below while loop Modified: hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original) +++ hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Sep 25 17:46:34 2012 @@ -513,6 +513,12 @@ public class TestFairScheduler extends T trackerForTip.get(attemptIdStr).getTaskReports().remove(status); return true; } + + @Override + public boolean isInSafeMode() { + // TODO Auto-generated method stub + return false; + } } protected JobConf conf; Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java (original) +++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java Tue Sep 25 17:46:34 2012 @@ -50,4 +50,13 @@ public interface AdminOperationsProtocol * Refresh the node list at the {@link JobTracker} */ void refreshNodes() throws IOException; + + /** + * Set safe mode for the JobTracker. + * @param safeModeAction safe mode action + * @return current safemode + * @throws IOException + */ + boolean setSafeMode(JobTracker.SafeModeAction safeModeAction) + throws IOException; } Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java (original) +++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java Tue Sep 25 17:46:34 2012 @@ -34,6 +34,9 @@ class AuditLogger { static final String REFRESH_QUEUE = "REFRESH_QUEUE"; static final String REFRESH_NODES = "REFRESH_NODES"; + static final String GET_SAFEMODE = "GET_SAFEMODE"; + static final String SET_SAFEMODE = "SET_SAFEMODE"; + // Some commonly used descriptions static final String UNAUTHORIZED_USER = "Unauthorized user"; } Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original) +++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Tue Sep 25 17:46:34 2012 @@ -80,6 +80,12 @@ class JobQueueTaskScheduler extends Task @Override public synchronized List<Task> assignTasks(TaskTracker taskTracker) throws IOException { + // Check for JT safe-mode + if (taskTrackerManager.isInSafeMode()) { + LOG.info("JobTracker is in safe-mode, not scheduling any tasks."); + return null; + } + TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus(); final int numTaskTrackers = clusterStatus.getTaskTrackers(); Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Sep 25 17:46:34 2012 @@ -18,13 +18,9 @@ package org.apache.hadoop.mapred; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; -import java.io.InputStreamReader; import java.io.Writer; import java.lang.management.ManagementFactory; import java.net.BindException; @@ -53,6 +49,7 @@ import java.util.TreeSet; import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -61,27 +58,33 @@ import org.apache.hadoop.fs.FSDataInputS import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.mapred.JobSubmissionProtocol; -import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.RPC.VersionMismatch; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.mapred.AuditLogger.Constants; -import org.apache.hadoop.mapred.Counters.CountersExceededException; import org.apache.hadoop.mapred.JobHistory.Keys; -import org.apache.hadoop.mapred.JobHistory.Listener; import org.apache.hadoop.mapred.JobHistory.Values; import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException; import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType; import org.apache.hadoop.mapred.QueueManager.QueueACL; import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus; +import org.apache.hadoop.mapreduce.ClusterMetrics; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal; +import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager; +import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; @@ -89,6 +92,7 @@ import org.apache.hadoop.net.Node; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.ScriptBasedMapping; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.security.SecurityUtil; @@ -96,6 +100,7 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; @@ -104,16 +109,6 @@ import org.apache.hadoop.util.HostsFileR import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionInfo; - -import org.apache.hadoop.mapreduce.ClusterMetrics; -import org.apache.hadoop.mapreduce.JobSubmissionFiles; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal; -import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; -import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.security.Credentials; import org.mortbay.util.ajax.JSON; /******************************************************* @@ -203,7 +198,7 @@ public class JobTracker implements MRCon public static enum State { INITIALIZING, RUNNING } State state = State.INITIALIZING; - private static final int FS_ACCESS_RETRY_PERIOD = 10000; + private static final int FS_ACCESS_RETRY_PERIOD = 1000; static final String JOB_INFO_FILE = "job-info"; static final String JOB_TOKEN_FILE = "jobToken"; private DNSToSwitchMapping dnsToSwitchMapping; @@ -276,6 +271,16 @@ public class JobTracker implements MRCon return clock; } + static final String JT_HDFS_MONITOR_ENABLE = + "mapreduce.jt.hdfs.monitor.enable"; + static final boolean DEFAULT_JT_HDFS_MONITOR_THREAD_ENABLE = false; + + static final String JT_HDFS_MONITOR_THREAD_INTERVAL = + "mapreduce.jt.hdfs.monitor.interval.ms"; + static final int DEFAULT_JT_HDFS_MONITOR_THREAD_INTERVAL_MS = 5000; + + private Thread hdfsMonitor; + /** * Start the JobTracker with given configuration. * @@ -1872,8 +1877,179 @@ public class JobTracker implements MRCon this(conf, identifier, clock, new QueueManager(new Configuration(conf))); } + private void initJTConf(JobConf conf) { + if (conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false)) { + LOG.warn(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY + + " is enabled, disabling it"); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false); + } + } + + private void initializeFilesystem() throws IOException, InterruptedException { + // Connect to HDFS NameNode + while (!Thread.currentThread().isInterrupted() && fs == null) { + try { + fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() { + public FileSystem run() throws IOException { + return FileSystem.get(conf); + }}); + } catch (IOException ie) { + fs = null; + LOG.info("Problem connecting to HDFS Namenode... re-trying", ie); + Thread.sleep(FS_ACCESS_RETRY_PERIOD); + } + } + + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + + // Ensure HDFS is healthy + if ("hdfs".equalsIgnoreCase(fs.getUri().getScheme())) { + while (!DistributedFileSystem.isHealthy(fs.getUri())) { + LOG.info("HDFS initialized but not 'healthy' yet, waiting..."); + Thread.sleep(FS_ACCESS_RETRY_PERIOD); + } + } + } + + private void initialize() + throws IOException, InterruptedException { + // initialize history parameters. + final JobTracker jtFinal = this; + + getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() { + @Override + public Boolean run() throws Exception { + JobHistory.init(jtFinal, conf, jtFinal.localMachine, + jtFinal.startTime); + return true; + } + }); + + // start the recovery manager + recoveryManager = new RecoveryManager(); + + while (!Thread.currentThread().isInterrupted()) { + try { + // if we haven't contacted the namenode go ahead and do it + // clean up the system dir, which will only work if hdfs is out of + // safe mode + if(systemDir == null) { + systemDir = new Path(getSystemDir()); + } + try { + FileStatus systemDirStatus = fs.getFileStatus(systemDir); + if (!systemDirStatus.getOwner().equals( + getMROwner().getShortUserName())) { + throw new AccessControlException("The systemdir " + systemDir + + " is not owned by " + getMROwner().getShortUserName()); + } + if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) { + LOG.warn("Incorrect permissions on " + systemDir + + ". Setting it to " + SYSTEM_DIR_PERMISSION); + fs.setPermission(systemDir,new FsPermission(SYSTEM_DIR_PERMISSION)); + } + } catch (FileNotFoundException fnf) {} //ignore + // Make sure that the backup data is preserved + FileStatus[] systemDirData = fs.listStatus(this.systemDir); + // Check if the history is enabled .. as we cant have persistence with + // history disabled + if (conf.getBoolean("mapred.jobtracker.restart.recover", false) + && systemDirData != null) { + for (FileStatus status : systemDirData) { + try { + recoveryManager.checkAndAddJob(status); + } catch (Throwable t) { + LOG.warn("Failed to add the job " + status.getPath().getName(), + t); + } + } + + // Check if there are jobs to be recovered + hasRestarted = recoveryManager.shouldRecover(); + if (hasRestarted) { + break; // if there is something to recover else clean the sys dir + } + } + LOG.info("Cleaning up the system directory"); + fs.delete(systemDir, true); + if (FileSystem.mkdirs(fs, systemDir, + new FsPermission(SYSTEM_DIR_PERMISSION))) { + break; + } + LOG.error("Mkdirs failed to create " + systemDir); + } catch (AccessControlException ace) { + LOG.warn("Failed to operate on mapred.system.dir (" + systemDir + + ") because of permissions."); + LOG.warn("Manually delete the mapred.system.dir (" + systemDir + + ") and then start the JobTracker."); + LOG.warn("Bailing out ... ", ace); + throw ace; + } catch (IOException ie) { + LOG.info("problem cleaning system directory: " + systemDir, ie); + } + Thread.sleep(FS_ACCESS_RETRY_PERIOD); + } + + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + + // Same with 'localDir' except it's always on the local disk. + if (!hasRestarted) { + conf.deleteLocalFiles(SUBDIR); + } + + // Initialize history DONE folder + FileSystem historyFS = getMROwner().doAs( + new PrivilegedExceptionAction<FileSystem>() { + public FileSystem run() throws IOException { + JobHistory.initDone(conf, fs); + final String historyLogDir = + JobHistory.getCompletedJobHistoryLocation().toString(); + infoServer.setAttribute("historyLogDir", historyLogDir); + + infoServer.setAttribute + ("serialNumberDirectoryDigits", + Integer.valueOf(JobHistory.serialNumberDirectoryDigits())); + + infoServer.setAttribute + ("serialNumberTotalDigits", + Integer.valueOf(JobHistory.serialNumberTotalDigits())); + + return new Path(historyLogDir).getFileSystem(conf); + } + }); + infoServer.setAttribute("fileSys", historyFS); + infoServer.setAttribute("jobConf", conf); + infoServer.setAttribute("aclManager", aclsManager); + + if (JobHistoryServer.isEmbedded(conf)) { + LOG.info("History server being initialized in embedded mode"); + jobHistoryServer = new JobHistoryServer(conf, aclsManager, infoServer); + jobHistoryServer.start(); + LOG.info("Job History Server web address: " + JobHistoryServer.getAddress(conf)); + } + + //initializes the job status store + completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager); + + // Setup HDFS monitoring + if (this.conf.getBoolean( + JT_HDFS_MONITOR_ENABLE, DEFAULT_JT_HDFS_MONITOR_THREAD_ENABLE)) { + hdfsMonitor = new HDFSMonitorThread(this.conf, this, this.fs); + hdfsMonitor.start(); + } + + } + JobTracker(final JobConf conf, String identifier, Clock clock, QueueManager qm) - throws IOException, InterruptedException { + throws IOException, InterruptedException { + + initJTConf(conf); + this.queueManager = qm; this.clock = clock; // Set ports, start RPC servers, setup security policy etc. @@ -1979,7 +2155,12 @@ public class JobTracker implements MRCon // Set service-level authorization security policy if (conf.getBoolean( ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) { - ServiceAuthorizationManager.refresh(conf, new MapReducePolicyProvider()); + PolicyProvider policyProvider = + (PolicyProvider)(ReflectionUtils.newInstance( + conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, + MapReducePolicyProvider.class, PolicyProvider.class), + conf)); + ServiceAuthorizationManager.refresh(conf, policyProvider); } int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10); @@ -2006,16 +2187,6 @@ public class JobTracker implements MRCon infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, tmpInfoPort == 0, conf, aclsManager.getAdminsAcl()); infoServer.setAttribute("job.tracker", this); - // initialize history parameters. - final JobTracker jtFinal = this; - getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() { - @Override - public Boolean run() throws Exception { - JobHistory.init(jtFinal, conf,jtFinal.localMachine, - jtFinal.startTime); - return true; - } - }); infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class); infoServer.start(); @@ -2035,125 +2206,12 @@ public class JobTracker implements MRCon infoBindAddress + ":" + this.infoPort); LOG.info("JobTracker webserver: " + this.infoServer.getPort()); - // start the recovery manager - recoveryManager = new RecoveryManager(); - - while (!Thread.currentThread().isInterrupted()) { - try { - // if we haven't contacted the namenode go ahead and do it - if (fs == null) { - fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() { - public FileSystem run() throws IOException { - return FileSystem.get(conf); - }}); - } - // clean up the system dir, which will only work if hdfs is out of - // safe mode - if(systemDir == null) { - systemDir = new Path(getSystemDir()); - } - try { - FileStatus systemDirStatus = fs.getFileStatus(systemDir); - if (!systemDirStatus.getOwner().equals( - getMROwner().getShortUserName())) { - throw new AccessControlException("The systemdir " + systemDir + - " is not owned by " + getMROwner().getShortUserName()); - } - if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) { - LOG.warn("Incorrect permissions on " + systemDir + - ". Setting it to " + SYSTEM_DIR_PERMISSION); - fs.setPermission(systemDir,new FsPermission(SYSTEM_DIR_PERMISSION)); - } - } catch (FileNotFoundException fnf) {} //ignore - // Make sure that the backup data is preserved - FileStatus[] systemDirData = fs.listStatus(this.systemDir); - // Check if the history is enabled .. as we cant have persistence with - // history disabled - if (conf.getBoolean("mapred.jobtracker.restart.recover", false) - && systemDirData != null) { - for (FileStatus status : systemDirData) { - try { - recoveryManager.checkAndAddJob(status); - } catch (Throwable t) { - LOG.warn("Failed to add the job " + status.getPath().getName(), - t); - } - } - - // Check if there are jobs to be recovered - hasRestarted = recoveryManager.shouldRecover(); - if (hasRestarted) { - break; // if there is something to recover else clean the sys dir - } - } - LOG.info("Cleaning up the system directory"); - fs.delete(systemDir, true); - if (FileSystem.mkdirs(fs, systemDir, - new FsPermission(SYSTEM_DIR_PERMISSION))) { - break; - } - LOG.error("Mkdirs failed to create " + systemDir); - } catch (AccessControlException ace) { - LOG.warn("Failed to operate on mapred.system.dir (" + systemDir - + ") because of permissions."); - LOG.warn("Manually delete the mapred.system.dir (" + systemDir - + ") and then start the JobTracker."); - LOG.warn("Bailing out ... ", ace); - throw ace; - } catch (IOException ie) { - LOG.info("problem cleaning system directory: " + systemDir, ie); - } - Thread.sleep(FS_ACCESS_RETRY_PERIOD); - } - - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException(); - } - - // Same with 'localDir' except it's always on the local disk. - if (!hasRestarted) { - jobConf.deleteLocalFiles(SUBDIR); - } - - // Initialize history DONE folder - FileSystem historyFS = getMROwner().doAs( - new PrivilegedExceptionAction<FileSystem>() { - public FileSystem run() throws IOException { - JobHistory.initDone(conf, fs); - final String historyLogDir = - JobHistory.getCompletedJobHistoryLocation().toString(); - infoServer.setAttribute("historyLogDir", historyLogDir); - - infoServer.setAttribute - ("serialNumberDirectoryDigits", - Integer.valueOf(JobHistory.serialNumberDirectoryDigits())); - - infoServer.setAttribute - ("serialNumberTotalDigits", - Integer.valueOf(JobHistory.serialNumberTotalDigits())); - - return new Path(historyLogDir).getFileSystem(conf); - } - }); - infoServer.setAttribute("fileSys", historyFS); - infoServer.setAttribute("jobConf", conf); - infoServer.setAttribute("aclManager", aclsManager); - - if (JobHistoryServer.isEmbedded(conf)) { - LOG.info("History server being initialized in embedded mode"); - jobHistoryServer = new JobHistoryServer(conf, aclsManager, infoServer); - jobHistoryServer.start(); - LOG.info("Job History Server web address: " + JobHistoryServer.getAddress(conf)); - } - this.dnsToSwitchMapping = ReflectionUtils.newInstance( conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class, DNSToSwitchMapping.class), conf); this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", NetworkTopology.DEFAULT_HOST_LEVEL); - //initializes the job status store - completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager); } private static SimpleDateFormat getDateFormat() { @@ -2244,6 +2302,17 @@ public class JobTracker implements MRCon * Run forever */ public void offerService() throws InterruptedException, IOException { + // start the inter-tracker server + this.interTrackerServer.start(); + + // Initialize the JobTracker FileSystem within safemode + setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER); + initializeFilesystem(); + setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE); + + // Initialize JobTracker + initialize(); + // Prepare for recovery. This is done irrespective of the status of restart // flag. while (true) { @@ -2283,12 +2352,10 @@ public class JobTracker implements MRCon completedJobsStoreThread.start(); } - // start the inter-tracker server once the jt is ready - this.interTrackerServer.start(); - synchronized (this) { state = State.RUNNING; } + LOG.info("Starting RUNNING"); this.interTrackerServer.join(); @@ -3488,6 +3555,12 @@ public class JobTracker implements MRCon // returns cleanup tasks first, then setup tasks. synchronized List<Task> getSetupAndCleanupTasks( TaskTrackerStatus taskTracker) throws IOException { + + // Don't assign *any* new task in safemode + if (isInSafeMode()) { + return null; + } + int maxMapTasks = taskTracker.getMaxMapSlots(); int maxReduceTasks = taskTracker.getMaxReduceSlots(); int numMaps = taskTracker.countOccupiedMapSlots(); @@ -3622,6 +3695,10 @@ public class JobTracker implements MRCon public JobStatus submitJob(JobID jobId, String jobSubmitDir, UserGroupInformation ugi, Credentials ts, boolean recovered) throws IOException { + if (isInSafeMode()) { + throw new IOException("JobTracker in safemode"); + } + JobInfo jobInfo = null; if (ugi == null) { ugi = UserGroupInformation.getCurrentUser(); @@ -4315,6 +4392,11 @@ public class JobTracker implements MRCon * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir() */ public String getSystemDir() { + // Might not be initialized yet, TT handles this + if (isInSafeMode()) { + return null; + } + Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system")); return fs.makeQualified(sysDir).toString(); } @@ -5083,4 +5165,81 @@ public class JobTracker implements MRCon return map; } // End MXbean implementaiton + + /** + * JobTracker SafeMode + */ + // SafeMode actions + public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; } + + private AtomicBoolean safeMode = new AtomicBoolean(false); + private AtomicBoolean adminSafeMode = new AtomicBoolean(false); + private String adminSafeModeUser = ""; + + public boolean setSafeMode(JobTracker.SafeModeAction safeModeAction) + throws IOException { + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + + // Anyone can check JT safe-mode + if (safeModeAction == SafeModeAction.SAFEMODE_GET) { + boolean safeMode = this.safeMode.get(); + LOG.info("Getting safemode information: safemode=" + safeMode + ". " + + "Requested by : " + + UserGroupInformation.getCurrentUser().getShortUserName()); + AuditLogger.logSuccess(user, Constants.GET_SAFEMODE, + Constants.JOBTRACKER); + return safeMode; + } + + // Check access for modifications to safe-mode + if (!aclsManager.isMRAdmin(UserGroupInformation.getCurrentUser())) { + AuditLogger.logFailure(user, Constants.SET_SAFEMODE, + aclsManager.getAdminsAcl().toString(), Constants.JOBTRACKER, + Constants.UNAUTHORIZED_USER); + throw new AccessControlException(user + + " is not authorized to set " + + " JobTracker safemode."); + } + AuditLogger.logSuccess(user, Constants.SET_SAFEMODE, Constants.JOBTRACKER); + + boolean currSafeMode = setSafeModeInternal(safeModeAction); + adminSafeMode.set(currSafeMode); + adminSafeModeUser = user; + return currSafeMode; + } + + boolean isInAdminSafeMode() { + return adminSafeMode.get(); + } + + boolean setSafeModeInternal(JobTracker.SafeModeAction safeModeAction) + throws IOException { + if (safeModeAction != SafeModeAction.SAFEMODE_GET) { + boolean safeMode = false; + if (safeModeAction == SafeModeAction.SAFEMODE_ENTER) { + safeMode = true; + } else if (safeModeAction == SafeModeAction.SAFEMODE_LEAVE) { + safeMode = false; + } + LOG.info("Setting safe mode to " + safeMode + ". Requested by : " + + UserGroupInformation.getCurrentUser().getShortUserName()); + this.safeMode.set(safeMode); + } + return this.safeMode.get(); + } + + public boolean isInSafeMode() { + return safeMode.get(); + } + + String getSafeModeText() { + if (!isInSafeMode()) + return "OFF"; + String safeModeInfo = + adminSafeMode.get() ? + "Set by admin <strong>" + adminSafeModeUser + "</strong>": + "HDFS unavailable"; + return "<em>ON - " + safeModeInfo + "</em>"; + } + } Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Tue Sep 25 17:46:34 2012 @@ -31,6 +31,7 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapred.SortedRanges.Range; +import org.apache.hadoop.mapred.TaskStatus.State; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.net.Node; @@ -604,10 +605,18 @@ class TaskInProgress { changed = oldState != newState; } + // if task is a cleanup attempt, do not replace the complete status, // update only specific fields. // For example, startTime should not be updated, // but finishTime has to be updated. + + // Don't fail tasks when JobTracker is in safe-mode + if (status.getRunState() == State.FAILED && jobtracker.isInSafeMode()) { + LOG.info("JT is in safe-mode; marking " + taskid + " as KILLED"); + status.setRunState(State.KILLED); + } + if (!isCleanupAttempt(taskid)) { taskStatuses.put(taskid, status); } else { Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Sep 25 17:46:34 2012 @@ -1704,8 +1704,17 @@ public class TaskTracker implements MRCo } String dir = jobClient.getSystemDir(); - if (dir == null) { - throw new IOException("Failed to get system directory"); + while (dir == null) { + LOG.info("Failed to get system directory..."); + + // Re-try + try { + // Sleep interval: 1000 ms - 5000 ms + int sleepInterval = 1000 + r.nextInt(4000); + Thread.sleep(sleepInterval); + } catch (InterruptedException ie) + {} + dir = jobClient.getSystemDir(); } systemDirectory = new Path(dir); systemFS = systemDirectory.getFileSystem(fConf); Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java (original) +++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java Tue Sep 25 17:46:34 2012 @@ -112,4 +112,10 @@ interface TaskTrackerManager { * @param job JobInProgress object */ public void failJob(JobInProgress job); + + /** + * Get safe mode. + * @return + */ + public boolean isInSafeMode(); } Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java (original) +++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java Tue Sep 25 17:46:34 2012 @@ -22,7 +22,6 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapred.AdminOperationsProtocol; @@ -57,7 +56,9 @@ public class MRAdmin extends Configured "The full syntax is: \n\n" + "hadoop mradmin [-refreshServiceAcl] [-refreshQueues] " + "[-refreshNodes] [-refreshUserToGroupsMappings] " + - "[-refreshSuperUserGroupsConfiguration] [-help [cmd]]\n"; + "[-refreshSuperUserGroupsConfiguration] " + + "[-safemode <enter | leave | wait | get> " + + "[-help [cmd]]\n"; String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" + "\t\tJobtracker will reload the authorization policy file.\n"; @@ -74,6 +75,14 @@ public class MRAdmin extends Configured String refreshNodes = "-refreshNodes: Refresh the hosts information at the jobtracker.\n"; + String safemode = "-safemode <enter|leave|get|wait>: Safe mode maintenance command.\n" + + "\t\tSafe mode is a JobTracker state in which it\n" + + "\t\t\t1. does not accept new job submissions\n" + + "\t\t\t2. does not schedule any new tasks\n" + + "\t\t\t3. does not fail any tasks due to any error\n" + + "\t\tSafe mode can be entered manually, but then\n" + + "\t\tit can only be turned off manually as well.\n"; + String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" + "\t\tis specified.\n"; @@ -87,6 +96,8 @@ public class MRAdmin extends Configured System.out.println(refreshSuperUserGroupsConfiguration); } else if ("refreshNodes".equals(cmd)) { System.out.println(refreshNodes); + } else if ("safemode".equals(cmd)) { + System.out.println(safemode); } else if ("help".equals(cmd)) { System.out.println(help); } else { @@ -125,7 +136,8 @@ public class MRAdmin extends Configured System.err.println(" [-refreshQueues]"); System.err.println(" [-refreshUserToGroupsMappings]"); System.err.println(" [-refreshSuperUserGroupsConfiguration]"); - System.err.println(" [-refreshNodes]"); + System.err.println(" [-refreshNodes]"); + System.err.println(" [-safemode <enter | leave | get | wait>]"); System.err.println(" [-help [cmd]]"); System.err.println(); ToolRunner.printGenericCommandUsage(System.err); @@ -208,6 +220,58 @@ public class MRAdmin extends Configured return 0; } + private int setSafeMode(String actionString) throws IOException { + JobTracker.SafeModeAction action; + Boolean waitExitSafe = false; + + if ("leave".equalsIgnoreCase(actionString)) { + action = JobTracker.SafeModeAction.SAFEMODE_LEAVE; + } else if ("enter".equalsIgnoreCase(actionString)) { + action = JobTracker.SafeModeAction.SAFEMODE_ENTER; + } else if ("get".equalsIgnoreCase(actionString)) { + action = JobTracker.SafeModeAction.SAFEMODE_GET; + } else if ("wait".equalsIgnoreCase(actionString)) { + action = JobTracker.SafeModeAction.SAFEMODE_GET; + waitExitSafe = true; + } else { + printUsage("-safemode"); + return -1; + } + + // Get the current configuration + Configuration conf = getConf(); + + // Create the client + AdminOperationsProtocol adminOperationsProtocol = + (AdminOperationsProtocol) + RPC.getProxy(AdminOperationsProtocol.class, + AdminOperationsProtocol.versionID, + JobTracker.getAddress(conf), getUGI(conf), conf, + NetUtils.getSocketFactory(conf, + AdminOperationsProtocol.class)); + + + boolean inSafeMode = adminOperationsProtocol.setSafeMode(action); + + // + // If we are waiting for safemode to exit, then poll and + // sleep till we are out of safemode. + // + if (waitExitSafe) { + while (inSafeMode) { + try { + Thread.sleep(3000); + } catch (java.lang.InterruptedException e) { + throw new IOException("Wait Interrupted"); + } + inSafeMode = adminOperationsProtocol.setSafeMode(action); + } + } + + System.out.println("Safe mode is " + (inSafeMode ? "ON" : "OFF")); + + return 0; + } /** * refreshSuperUserGroupsConfiguration {@link JobTracker}. @@ -297,6 +361,12 @@ public class MRAdmin extends Configured return exitCode; } } + if ("-safemode".equals(cmd)) { + if (args.length != 2) { + printUsage(cmd); + return exitCode; + } + } exitCode = 0; try { @@ -310,6 +380,8 @@ public class MRAdmin extends Configured exitCode = refreshSuperUserGroupsConfiguration(); } else if ("-refreshNodes".equals(cmd)) { exitCode = refreshNodes(); + } else if ("-safemode".equals(cmd)) { + exitCode = setSafeMode(args[i++]); } else if ("-help".equals(cmd)) { if (i < args.length) { printUsage(args[i]); Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original) +++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue Sep 25 17:46:34 2012 @@ -238,6 +238,12 @@ public class TestJobQueueTaskScheduler e status.setRunState(TaskStatus.State.RUNNING); trackers.get(taskTrackerName).getStatus().getTaskReports().add(status); } + + @Override + public boolean isInSafeMode() { + // TODO Auto-generated method stub + return false; + } } Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java (original) +++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java Tue Sep 25 17:46:34 2012 @@ -183,6 +183,12 @@ public class TestParallelInitialization listener.jobAdded(job); } } + + @Override + public boolean isInSafeMode() { + // TODO Auto-generated method stub + return false; + } } protected JobConf jobConf; Modified: hadoop/common/branches/branch-1.1/src/webapps/job/jobtracker.jsp URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/webapps/job/jobtracker.jsp?rev=1390016&r1=1390015&r2=1390016&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/webapps/job/jobtracker.jsp (original) +++ hadoop/common/branches/branch-1.1/src/webapps/job/jobtracker.jsp Tue Sep 25 17:46:34 2012 @@ -107,7 +107,7 @@ <b>Compiled:</b> <%= VersionInfo.getDate()%> by <%= VersionInfo.getUser()%><br> <b>Identifier:</b> <%= tracker.getTrackerIdentifier()%><br> - +<b>SafeMode:</b> <%= tracker.getSafeModeText()%><br> <hr> <h2>Cluster Summary (Heap Size is <%= StringUtils.byteDesc(Runtime.getRuntime().totalMemory()) %>/<%= StringUtils.byteDesc(Runtime.getRuntime().maxMemory()) %>)</h2> <%