Author: acmurthy
Date: Sat Apr  6 03:08:47 2013
New Revision: 1465171

URL: http://svn.apache.org/r1465171
Log:
MAPREDUCE-5131. Fix handling of job monitoring APIs during JobTracker restart. 
Contributed by Arun C. Murthy.

Added:
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/retry/RetryUtils.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1465171&r1=1465170&r2=1465171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Sat Apr  6 03:08:47 2013
@@ -579,6 +579,9 @@ Release 1.2.0 - unreleased
     submitted the job rather than JobTracker user. (tomwhite, acmurthy via
     acmurthy) 
 
+    MAPREDUCE-5131. Fix handling of job monitoring APIs during JobTracker
+    restart. (acmurthy) 
+
 Release 1.1.2 - 2013.01.30
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/retry/RetryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/retry/RetryUtils.java?rev=1465171&r1=1465170&r2=1465171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/retry/RetryUtils.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/retry/RetryUtils.java Sat Apr  6 03:08:47 2013
@@ -47,7 +47,7 @@ public class RetryUtils {
    * @param defaultRetryPolicyEnabled default retryPolicyEnabledKey conf value 
    * @param retryPolicySpecKey        conf property key for retry policy spec
    * @param defaultRetryPolicySpec    default retryPolicySpecKey conf value
-   * @param remoteExceptionToRetry    The particular RemoteException to retry
+   * @param remoteExceptionsToRetry   The particular RemoteExceptions to retry
    * @return the default retry policy.
    */
   public static RetryPolicy getDefaultRetryPolicy(
@@ -56,7 +56,7 @@ public class RetryUtils {
       boolean defaultRetryPolicyEnabled,
       String retryPolicySpecKey,
       String defaultRetryPolicySpec,
-      final Class<? extends Exception> remoteExceptionToRetry
+      final Class<? extends Exception> ... remoteExceptionsToRetry
       ) {
     
     final RetryPolicy multipleLinearRandomRetry = 
@@ -81,8 +81,14 @@ public class RetryUtils {
           final RetryPolicy p;
           if (e instanceof RemoteException) {
             final RemoteException re = (RemoteException)e;
-            p = remoteExceptionToRetry.getName().equals(re.getClassName())?
-                multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
+            RetryPolicy found = null;
+            for(Class<? extends Exception> reToRetry : remoteExceptionsToRetry) {
+              if (reToRetry.getName().equals(re.getClassName())) {
+                found = multipleLinearRandomRetry;
+                break;
+              }
+            }
+            p = found != null? found: RetryPolicies.TRY_ONCE_THEN_FAIL;          
           } else if (e instanceof IOException) {
             p = multipleLinearRandomRetry;
           } else { //non-IOException

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1465171&r1=1465170&r2=1465171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Sat Apr  6 03:08:47 2013
@@ -147,6 +147,7 @@ public class DFSClient implements FSCons
   private static ClientProtocol createNamenode(ClientProtocol rpcNamenode,
       Configuration conf) throws IOException {
     //default policy
+    @SuppressWarnings("unchecked")
     final RetryPolicy defaultPolicy = 
         RetryUtils.getDefaultRetryPolicy(
             conf, 

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1465171&r1=1465170&r2=1465171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Sat Apr  6 03:08:47 2013
@@ -154,6 +154,7 @@ public class WebHdfsFileSystem extends F
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public synchronized void initialize(URI uri, Configuration conf
       ) throws IOException {
     super.initialize(uri, conf);

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1465171&r1=1465170&r2=1465171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java Sat Apr  6 03:08:47 2013
@@ -525,6 +525,12 @@ public class JobClient extends Configure
       JobSubmissionProtocol rpcJobSubmitClient,
       Configuration conf) throws IOException {
 
+    /*
+     * Default is to retry on JobTrackerNotYetInitializedException
+     * i.e. wait for JobTracker to get to RUNNING state and for
+     * SafeModeException
+     */
+    @SuppressWarnings("unchecked")
     RetryPolicy defaultPolicy = 
         RetryUtils.getDefaultRetryPolicy(
             conf,
@@ -532,13 +538,14 @@ public class JobClient extends Configure
             MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
             MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY,
             MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+            JobTrackerNotYetInitializedException.class,
             SafeModeException.class
             ); 
-    
-    /* 
+
+    /*
      * Method specific retry policies for killJob and killTask...
-     * 
-     * No retries on any exception including 
+     *
+     * No retries on any exception including
      * ConnectionException and SafeModeException
      */
     Map<String,RetryPolicy> methodNameToPolicyMap = 

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1465171&r1=1465170&r2=1465171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Sat Apr  6 03:08:47 2013
@@ -198,8 +198,11 @@ public class JobTracker implements MRCon
   private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f;
   private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f;
   
+  final static String JT_INIT_CONFIG_KEY_FOR_TESTS = 
+      "mapreduce.jobtracker.init.for.tests";
+  
   public static enum State { INITIALIZING, RUNNING }
-  State state = State.INITIALIZING;
+  volatile State state = State.INITIALIZING;
   private static final int FS_ACCESS_RETRY_PERIOD = 1000;
   static final String JOB_INFO_FILE = "job-info";
   static final String JOB_TOKEN_FILE = "jobToken";
@@ -2029,6 +2032,8 @@ public class JobTracker implements MRCon
             + " could not be started", t);
       }
     }
+    
+    this.initDone.set(conf.getBoolean(JT_INIT_CONFIG_KEY_FOR_TESTS, true));
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -2169,6 +2174,8 @@ public class JobTracker implements MRCon
       completedJobsStoreThread.start();
     }
 
+    // Just for unit-tests 
+    waitForInit();
     synchronized (this) {
       state = State.RUNNING;
     }
@@ -2179,6 +2186,29 @@ public class JobTracker implements MRCon
     LOG.info("Stopped interTrackerServer");
   }
 
+  AtomicBoolean initDone = new AtomicBoolean(true);
+  Object initDoneLock = new Object();
+  
+  private void waitForInit() {
+    synchronized (initDoneLock) {
+      while (!initDone.get()) {
+        try {
+          LOG.debug("About to wait since initDone = false");
+          initDoneLock.wait();
+        } catch (InterruptedException ie) {
+          LOG.debug("Ignoring ", ie);
+        }
+      }
+    }
+  }
+  
+  void setInitDone(boolean done) {
+    synchronized (initDoneLock) {
+      initDone.set(done);
+      initDoneLock.notify();
+    }
+  }
+  
   void close() throws IOException {
     if (plugins != null) {
       for (ServicePlugin p : plugins) {
@@ -3498,6 +3528,9 @@ public class JobTracker implements MRCon
    * Allocates a new JobId string.
    */
   public synchronized JobID getNewJobId() throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     return new JobID(getTrackerIdentifier(), nextJobId++);
   }
   
@@ -3511,6 +3544,9 @@ public class JobTracker implements MRCon
    */
   public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
       throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     return submitJob(jobId, jobSubmitDir, null, ts, false);
   }
 
@@ -3737,6 +3773,9 @@ public class JobTracker implements MRCon
       return;
     }
     
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     // No 'killJob' in safe-mode
     checkSafeMode();
     
@@ -3883,6 +3922,9 @@ public class JobTracker implements MRCon
   public synchronized void setJobPriority(JobID jobid, 
                                           String priority)
                                           throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     JobInProgress job = jobs.get(jobid);
     if (null == job) {
         LOG.info("setJobPriority(): JobId " + jobid.toString()
@@ -3910,7 +3952,10 @@ public class JobTracker implements MRCon
     return job.inited(); 
   }
   
-  public JobProfile getJobProfile(JobID jobid) {
+  public JobProfile getJobProfile(JobID jobid) throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     synchronized (this) {
       JobInProgress job = jobs.get(jobid);
       if (job != null) {
@@ -3927,7 +3972,10 @@ public class JobTracker implements MRCon
     return completedJobStatusStore.readJobProfile(jobid);
   }
   
-  public JobStatus getJobStatus(JobID jobid) {
+  public JobStatus getJobStatus(JobID jobid) throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     if (null == jobid) {
       LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
       return null;
@@ -3950,6 +3998,9 @@ public class JobTracker implements MRCon
   
   private static final Counters EMPTY_COUNTERS = new Counters();
   public Counters getJobCounters(JobID jobid) throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
     synchronized (this) {
       JobInProgress job = jobs.get(jobid);
@@ -3984,6 +4035,9 @@ public class JobTracker implements MRCon
   
   public synchronized TaskReport[] getMapTaskReports(JobID jobid)
       throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
@@ -4012,6 +4066,9 @@ public class JobTracker implements MRCon
 
   public synchronized TaskReport[] getReduceTaskReports(JobID jobid)
       throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
@@ -4038,6 +4095,9 @@ public class JobTracker implements MRCon
 
   public synchronized TaskReport[] getCleanupTaskReports(JobID jobid)
       throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
@@ -4067,6 +4127,9 @@ public class JobTracker implements MRCon
   
   public synchronized TaskReport[] getSetupTaskReports(JobID jobid)
       throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
@@ -4110,6 +4173,9 @@ public class JobTracker implements MRCon
    */
   public TaskCompletionEvent[] getTaskCompletionEvents(
       JobID jobid, int fromEventId, int maxEvents) throws IOException{
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     JobInProgress job = this.jobs.get(jobid);
       
     if (null != job) {
@@ -4131,6 +4197,9 @@ public class JobTracker implements MRCon
    */
   public synchronized String[] getTaskDiagnostics(TaskAttemptID taskId)  
     throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     List<String> taskDiagnosticInfo = null;
     JobID jobId = taskId.getJobID();
     TaskID tipId = taskId.getTaskID();
@@ -4195,6 +4264,9 @@ public class JobTracker implements MRCon
    */
   public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail)
       throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     // No 'killTask' in safe-mode
     checkSafeMode();
 
@@ -5093,13 +5165,23 @@ public class JobTracker implements MRCon
   
   private void checkSafeMode() throws SafeModeException {
     if (isInSafeMode()) {
-      try {
-        throw new SafeModeException((
-            isInAdminSafeMode()) ? adminSafeModeUser : null);
-      } catch (SafeModeException sfe) {
-        LOG.info("JobTracker in safe-mode, aborting operation", sfe);
-        throw sfe;
-      }
+      SafeModeException sme = 
+          new SafeModeException(
+              (isInAdminSafeMode()) ? adminSafeModeUser : null);
+      LOG.info("JobTracker in safe-mode, aborting operation: ", sme); 
+      throw sme;
     }
   }
+  
+  private void checkJobTrackerState() 
+      throws JobTrackerNotYetInitializedException {
+    if (state != State.RUNNING) {
+      JobTrackerNotYetInitializedException jtnyie =
+          new JobTrackerNotYetInitializedException();
+      LOG.info("JobTracker not yet in RUNNING state, aborting operation: ", 
+          jtnyie); 
+      throw jtnyie;
+    }
+  }
+
 }

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java?rev=1465171&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java Sat Apr  6 03:08:47 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * This exception is thrown when the JobTracker is still initializing and
+ * not yet operational.
+ */
+public class JobTrackerNotYetInitializedException extends IOException {
+
+  private static final long serialVersionUID = 1984839357L;
+
+  public JobTrackerNotYetInitializedException() {
+    super("JobTracker is not yet RUNNING");
+  }
+
+}

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1465171&r1=1465170&r2=1465171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Sat Apr  6 03:08:47 2013
@@ -631,6 +631,7 @@ public class TestRecoveryManager {
 
     conf.set("mapreduce.jobtracker.staging.root.dir", "/user");
     conf.set("mapred.system.dir", "/mapred");
+    
     String mapredSysDir =  conf.get("mapred.system.dir");
     mkdirWithPerms(fs, mapredSysDir, (short)0700);
     fs.setOwner(new Path(mapredSysDir),
@@ -679,12 +680,36 @@ public class TestRecoveryManager {
     LOG.info("Stopping jobtracker");
     mr.stopJobTracker();
 
+    // Blocking JT INIT on restart
+    mr.getJobTrackerConf().setBoolean(
+        JobTracker.JT_INIT_CONFIG_KEY_FOR_TESTS, false);
+    
+
     // start the jobtracker
     LOG.info("Starting jobtracker");
-    mr.startJobTracker();
-    UtilsForTests.waitForJobTracker(jc);
+    mr.startJobTracker(false);
 
+    while (!mr.getJobTrackerRunner().isUp()) {
+      Thread.sleep(100);
+    }
     jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    Assert.assertNotNull(jobtracker);
+    
+    // now check for job status ... 
+    // should throw JobTrackerNotYetInitializedException
+    boolean gotJTNYIException = false;
+    try {
+      jobtracker.getJobStatus(rJob1.getID());
+    } catch (JobTrackerNotYetInitializedException jtnyie) {
+      LOG.info("Caught JobTrackerNotYetInitializedException", jtnyie);
+      gotJTNYIException = true;
+    }
+    Assert.assertTrue(gotJTNYIException);
+    
+    jobtracker.setInitDone(true);
+
+    UtilsForTests.waitForJobTracker(jc);
+
     // assert that job is recovered by the jobtracker
     Assert.assertEquals("Resubmission failed ", 1, jobtracker.getAllJobs().length);
     JobInProgress jip = jobtracker.getJob(rJob1.getID());


Reply via email to