Author: acmurthy
Date: Tue Sep 25 18:25:45 2012
New Revision: 1390041

URL: http://svn.apache.org/viewvc?rev=1390041&view=rev
Log:
MAPREDUCE-4603. Add support for JobClient to retry job-submission when 
JobTracker is in SafeMode. Contributed by Arun C. Murthy.
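For readers who want to try the new behaviour, here is a minimal sketch of enabling it from client code. The configuration key is the one added to JobClient.java below; the class name and job setup are illustrative placeholders, not part of this patch.

import java.io.IOException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class RetryingSubmitSketch {
  public static void main(String[] args) throws IOException {
    // Assumes mapred.job.tracker points at a real JobTracker (not "local").
    JobConf conf = new JobConf();

    // Opt in to the new behaviour: retry job-submission RPCs while the
    // JobTracker is in safe-mode. The key is added by this patch and the
    // feature is off by default.
    conf.setBoolean(JobClient.MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY, true);

    JobClient client = new JobClient(conf);
    try {
      // ... set up and submit the job as usual, e.g. via client.submitJob(conf) ...
    } finally {
      client.close();
    }
  }
}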

Added:
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1390041&r1=1390040&r2=1390041&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Tue Sep 25 18:25:45 2012
@@ -394,14 +394,15 @@ Release 1.1.0 - unreleased
     MAPREDUCE-782. Use PureJavaCrc32 in mapreduce spills. 
     (Todd Lipcon, backport by Brandon Li via sseth)
 
-    HDFS-3667.  Add retry support to WebHdfsFileSystem.  (szetszwo)
-
     HADOOP-8748. Refactor DFSClient retry utility methods to a new class in
     org.apache.hadoop.io.retry.  Contributed by Arun C Murthy.
 
     HDFS-3871. Change DFSClient to use RetryUtils.  (Arun C Murthy
     via szetszwo)
 
+    MAPREDUCE-4603. Add support for JobClient to retry job-submission when
+    JobTracker is in SafeMode. (acmurthy)
+
   BUG FIXES
 
     HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1390041&r1=1390040&r2=1390041&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java Tue Sep 25 18:25:45 2012
@@ -40,6 +40,7 @@ import java.security.PrivilegedException
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -59,6 +60,10 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.Counters.Counter;
@@ -434,7 +439,9 @@ public class JobClient extends Configure
     }
   }
 
+  private JobSubmissionProtocol rpcJobSubmitClient;
   private JobSubmissionProtocol jobSubmitClient;
+  
   private Path sysDir = null;
   private Path stagingAreaDir = null;
   
@@ -445,6 +452,15 @@ public class JobClient extends Configure
   private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
   static int tasklogtimeout;
 
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
+      "mapreduce.jobclient.retry.policy.enabled";
+  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = 
+      false;
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
+      "mapreduce.jobclient.retry.policy.spec";
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
+      "10000,6,60000,10"; //t1,n1,t2,n2,...
+  
   /**
    * Create a job client.
    */
@@ -477,16 +493,61 @@ public class JobClient extends Configure
       conf.setNumMapTasks(1);
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
-      this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
+      this.rpcJobSubmitClient = 
+          createRPCProxy(JobTracker.getAddress(conf), conf);
+      this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);
     }        
   }
 
   private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
       Configuration conf) throws IOException {
-    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
-        JobSubmissionProtocol.versionID, addr, 
-        UserGroupInformation.getCurrentUser(), conf,
-        NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+    
+    JobSubmissionProtocol rpcJobSubmitClient = 
+        (JobSubmissionProtocol)RPC.getProxy(
+            JobSubmissionProtocol.class,
+            JobSubmissionProtocol.versionID, addr, 
+            UserGroupInformation.getCurrentUser(), conf,
+            NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class), 
+            0,
+            RetryUtils.getMultipleLinearRandomRetry(
+                conf,
+                MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY,
+                MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+                MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY,
+                MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT
+                )
+            );
+    
+    return rpcJobSubmitClient;
+  }
+
+  private static JobSubmissionProtocol createProxy(
+      JobSubmissionProtocol rpcJobSubmitClient,
+      Configuration conf) throws IOException {
+
+    RetryPolicy defaultPolicy = 
+        RetryUtils.getDefaultRetryPolicy(
+            conf,
+            MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY,
+            MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+            MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY,
+            MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+            SafeModeException.class
+            ); 
+    
+    /* 
+     * Method specific retry policies for killJob and killTask...
+     * 
+     * No retries on any exception including 
+     * ConnectionException and SafeModeException
+     */
+    Map<String,RetryPolicy> methodNameToPolicyMap = 
+        new HashMap<String,RetryPolicy>();
+    methodNameToPolicyMap.put("killJob", RetryPolicies.TRY_ONCE_THEN_FAIL);
+    methodNameToPolicyMap.put("killTask", RetryPolicies.TRY_ONCE_THEN_FAIL);
+    
+    return (JobSubmissionProtocol) RetryProxy.create(JobSubmissionProtocol.class,
+        rpcJobSubmitClient, defaultPolicy, methodNameToPolicyMap);
   }
 
   @InterfaceAudience.Private
@@ -502,7 +563,7 @@ public class JobClient extends Configure
     public long renew(Token<?> token, Configuration conf
                       ) throws IOException, InterruptedException {
       InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
-      JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+      JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf);
       return jt.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
     }
 
@@ -511,7 +572,7 @@ public class JobClient extends Configure
     public void cancel(Token<?> token, Configuration conf
                        ) throws IOException, InterruptedException {
       InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
-      JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+      JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf);
       jt.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
     }
 
@@ -537,15 +598,16 @@ public class JobClient extends Configure
   public JobClient(InetSocketAddress jobTrackAddr, 
                    Configuration conf) throws IOException {
     this.ugi = UserGroupInformation.getCurrentUser();
-    jobSubmitClient = createRPCProxy(jobTrackAddr, conf);
+    rpcJobSubmitClient = createRPCProxy(jobTrackAddr, conf); 
+    jobSubmitClient = createProxy(rpcJobSubmitClient, conf);
   }
 
   /**
    * Close the <code>JobClient</code>.
    */
   public synchronized void close() throws IOException {
-    if (!(jobSubmitClient instanceof LocalJobRunner)) {
-      RPC.stopProxy(jobSubmitClient);
+    if (!(rpcJobSubmitClient instanceof LocalJobRunner)) {
+      RPC.stopProxy(rpcJobSubmitClient);
     }
   }
 

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1390041&r1=1390040&r2=1390041&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Sep 25 18:25:45 2012
@@ -3719,9 +3719,8 @@ public class JobTracker implements MRCon
   public JobStatus submitJob(JobID jobId, String jobSubmitDir,
       UserGroupInformation ugi, Credentials ts, boolean recovered)
       throws IOException {
-    if (isInSafeMode()) {
-      throw new IOException("JobTracker in safemode");
-    }
+    // Check for safe-mode
+    checkSafeMode();
     
     JobInfo jobInfo = null;
     if (ugi == null) {
@@ -3804,6 +3803,9 @@ public class JobTracker implements MRCon
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
    */
   public String getStagingAreaDir() throws IOException {
+    // Check for safe-mode
+    checkSafeMode();
+
     try{
       final String user =
         UserGroupInformation.getCurrentUser().getShortUserName();
@@ -3920,6 +3922,9 @@ public class JobTracker implements MRCon
       return;
     }
     
+    // No 'killJob' in safe-mode
+    checkSafeMode();
+    
     JobInProgress job = jobs.get(jobid);
     
     if (null == job) {
@@ -4375,6 +4380,9 @@ public class JobTracker implements MRCon
    */
   public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail)
       throws IOException {
+    // No 'killTask' in safe-mode
+    checkSafeMode();
+
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     if(tip != null) {
       // check both queue-level and job-level access
@@ -5266,4 +5274,15 @@ public class JobTracker implements MRCon
     return "<em>ON - " + safeModeInfo + "</em>";
   }
   
+  private void checkSafeMode() throws SafeModeException {
+    if (isInSafeMode()) {
+      try {
+        throw new SafeModeException((
+            isInAdminSafeMode()) ? adminSafeModeUser : null);
+      } catch (SafeModeException sfe) {
+        LOG.info("JobTracker in safe-mode, aborting operation", sfe);
+        throw sfe;
+      }
+    }
+  }
 }

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java?rev=1390041&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java Tue Sep 25 18:25:45 2012
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * This exception is thrown when the JobTracker is in safe mode.
+ */
+public class SafeModeException extends IOException {
+
+  private static final long serialVersionUID = 1984839257L;
+
+  /**
+   * SafeModeException
+   * @param adminUser admin who put JobTracker in safe-mode, 
+   *                  <code>null</code> if it was automatic
+   * 
+   */
+  public SafeModeException(String adminUser) {
+    super(
+        (adminUser == null) ? 
+            "JobTracker is in safe mode" : 
+              "JobTracker is in safe-mode set by admin " + adminUser);
+  }
+
+}

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java?rev=1390041&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java Tue Sep 25 18:25:45 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TestMiniMRWithDFS.TestResult;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestJobClientRetries {
+  
+  private static final Log LOG = LogFactory.getLog(TestJobClientRetries.class);
+  
+  MiniMRCluster mr;
+  
+  @Test
+  public void testJobSubmission() throws Exception {
+    
+    // Start MR cluster
+    mr = new MiniMRCluster(2, "file:///", 3);
+    
+    final List<Exception> exceptions = new ArrayList<Exception>();
+
+    // Get jobConf
+    final JobConf jobConf = mr.createJobConf();
+    
+    // Stop JobTracker
+    LOG.info("Stopping JobTracker");
+    mr.stopJobTracker();
+    
+    /*
+     * Submit job *after* setting job-client retries to be *on*...
+     * the test *should* fail without this config being set
+     */
+    LOG.info("Submitting job with job-client retries enabled");
+    jobConf.setBoolean(
+        JobClient.MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
+    WordCountThread wc = new WordCountThread(jobConf, exceptions);
+    wc.start();
+    
+    // Restart JobTracker after a little while
+    Thread.sleep(5000);
+    LOG.info("Re-starting JobTracker for job-submission to go through");
+    mr.startJobTracker();
+    
+    // Wait for the job to complete or for an exception to occur
+    LOG.info("Waiting for job success/failure ...");
+    wc.join();
+
+    Assert.assertNotNull(wc.result);
+    Assert.assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+        "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", wc.result.output);
+    Assert.assertTrue("exceptions is not empty: " + exceptions, exceptions.isEmpty());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    mr.shutdown();
+  }
+  
+  public static class WordCountThread extends Thread {
+    JobConf jobConf;
+    List<Exception> exceptions;
+    TestResult result;
+    
+    public WordCountThread(JobConf jobConf, List<Exception> exceptions) {
+      super(WordCountThread.class.getName());
+      this.jobConf = jobConf;
+      this.exceptions = exceptions;
+    }
+
+    @Override
+    public void run() {
+      try {
+        FileSystem fs = FileSystem.getLocal(jobConf);
+        Path testdir = new Path(
+            System.getProperty("test.build.data","/tmp")).makeQualified(fs);
+        final Path inDir = new Path(testdir, "input");
+        final Path outDir = new Path(testdir, "output");
+        String input = "The quick brown fox\nhas many silly\nred fox sox\n";
+        LOG.info("Starting word-count");
+        result = 
+            TestMiniMRWithDFS.launchWordCount(
+                jobConf, inDir, outDir, input, 3, 1);
+        LOG.info("Finished word-count");
+      } catch (Exception e) {
+        LOG.error("Caught exception during word-count", e);
+        exceptions.add(e);
+        result = null;
+      }
+    }
+  }
+}

