Author: acmurthy Date: Tue Sep 25 18:25:45 2012 New Revision: 1390041 URL: http://svn.apache.org/viewvc?rev=1390041&view=rev Log: MAPREDUCE-4603. Add support for JobClient to retry job-submission when JobTracker is in SafeMode. Contributed by Arun C. Murthy.
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1390041&r1=1390040&r2=1390041&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Tue Sep 25 18:25:45 2012 @@ -394,14 +394,15 @@ Release 1.1.0 - unreleased MAPREDUCE-782. Use PureJavaCrc32 in mapreduce spills. (Todd Lipcon, backport by Brandon Li via sseth) - HDFS-3667. Add retry support to WebHdfsFileSystem. (szetszwo) - HADOOP-8748. Refactor DFSClient retry utility methods to a new class in org.apache.hadoop.io.retry. Contributed by Arun C Murthy. HDFS-3871. Change DFSClient to use RetryUtils. (Arun C Murthy via szetszwo) + MAPREDUCE-4603. Add support for JobClient to retry job-submission when + JobTracker is in SafeMode. (acmurthy) + BUG FIXES HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1390041&r1=1390040&r2=1390041&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java Tue Sep 25 18:25:45 2012 @@ -40,6 +40,7 @@ import java.security.PrivilegedException import java.util.Arrays; import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,6 +60,10 @@ import org.apache.hadoop.fs.permission.F import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.io.retry.RetryUtils; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapred.Counters.Counter; @@ -434,7 +439,9 @@ public class JobClient extends Configure } } + private JobSubmissionProtocol rpcJobSubmitClient; private JobSubmissionProtocol jobSubmitClient; + private Path sysDir = null; private Path stagingAreaDir = null; @@ -445,6 +452,15 @@ public class JobClient extends Configure private static final int DEFAULT_TASKLOG_TIMEOUT = 60000; static int tasklogtimeout; + public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY = + "mapreduce.jobclient.retry.policy.enabled"; + public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = + false; + public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY = + "mapreduce.jobclient.retry.policy.spec"; + public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT = + "10000,6,60000,10"; //t1,n1,t2,n2,... + /** * Create a job client. */ @@ -477,16 +493,61 @@ public class JobClient extends Configure conf.setNumMapTasks(1); this.jobSubmitClient = new LocalJobRunner(conf); } else { - this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf); + this.rpcJobSubmitClient = + createRPCProxy(JobTracker.getAddress(conf), conf); + this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf); } } private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, Configuration conf) throws IOException { - return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, addr, - UserGroupInformation.getCurrentUser(), conf, - NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class)); + + JobSubmissionProtocol rpcJobSubmitClient = + (JobSubmissionProtocol)RPC.getProxy( + JobSubmissionProtocol.class, + JobSubmissionProtocol.versionID, addr, + UserGroupInformation.getCurrentUser(), conf, + NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class), + 0, + RetryUtils.getMultipleLinearRandomRetry( + conf, + MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY, + MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, + MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY, + MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT + ) + ); + + return rpcJobSubmitClient; + } + + private static JobSubmissionProtocol createProxy( + JobSubmissionProtocol rpcJobSubmitClient, + Configuration conf) throws IOException { + + RetryPolicy defaultPolicy = + RetryUtils.getDefaultRetryPolicy( + conf, + MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY, + MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, + MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY, + MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT, + SafeModeException.class + ); + + /* + * Method specific retry policies for killJob and killTask... + * + * No retries on any exception including + * ConnectionException and SafeModeException + */ + Map<String,RetryPolicy> methodNameToPolicyMap = + new HashMap<String,RetryPolicy>(); + methodNameToPolicyMap.put("killJob", RetryPolicies.TRY_ONCE_THEN_FAIL); + methodNameToPolicyMap.put("killTask", RetryPolicies.TRY_ONCE_THEN_FAIL); + + return (JobSubmissionProtocol) RetryProxy.create(JobSubmissionProtocol.class, + rpcJobSubmitClient, defaultPolicy, methodNameToPolicyMap); } @InterfaceAudience.Private @@ -502,7 +563,7 @@ public class JobClient extends Configure public long renew(Token<?> token, Configuration conf ) throws IOException, InterruptedException { InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token); - JobSubmissionProtocol jt = createRPCProxy(addr, conf); + JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf); return jt.renewDelegationToken((Token<DelegationTokenIdentifier>) token); } @@ -511,7 +572,7 @@ public class JobClient extends Configure public void cancel(Token<?> token, Configuration conf ) throws IOException, InterruptedException { InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token); - JobSubmissionProtocol jt = createRPCProxy(addr, conf); + JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf); jt.cancelDelegationToken((Token<DelegationTokenIdentifier>) token); } @@ -537,15 +598,16 @@ public class JobClient extends Configure public JobClient(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { this.ugi = UserGroupInformation.getCurrentUser(); - jobSubmitClient = createRPCProxy(jobTrackAddr, conf); + rpcJobSubmitClient = createRPCProxy(jobTrackAddr, conf); + jobSubmitClient = createProxy(rpcJobSubmitClient, conf); } /** * Close the <code>JobClient</code>. */ public synchronized void close() throws IOException { - if (!(jobSubmitClient instanceof LocalJobRunner)) { - RPC.stopProxy(jobSubmitClient); + if (!(rpcJobSubmitClient instanceof LocalJobRunner)) { + RPC.stopProxy(rpcJobSubmitClient); } } Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1390041&r1=1390040&r2=1390041&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Sep 25 18:25:45 2012 @@ -3719,9 +3719,8 @@ public class JobTracker implements MRCon public JobStatus submitJob(JobID jobId, String jobSubmitDir, UserGroupInformation ugi, Credentials ts, boolean recovered) throws IOException { - if (isInSafeMode()) { - throw new IOException("JobTracker in safemode"); - } + // Check for safe-mode + checkSafeMode(); JobInfo jobInfo = null; if (ugi == null) { @@ -3804,6 +3803,9 @@ public class JobTracker implements MRCon * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir() */ public String getStagingAreaDir() throws IOException { + // Check for safe-mode + checkSafeMode(); + try{ final String user = UserGroupInformation.getCurrentUser().getShortUserName(); @@ -3920,6 +3922,9 @@ public class JobTracker implements MRCon return; } + // No 'killJob' in safe-mode + checkSafeMode(); + JobInProgress job = jobs.get(jobid); if (null == job) { @@ -4375,6 +4380,9 @@ public class JobTracker implements MRCon */ public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException { + // No 'killTask' in safe-mode + checkSafeMode(); + TaskInProgress tip = taskidToTIPMap.get(taskid); if(tip != null) { // check both queue-level and job-level access @@ -5266,4 +5274,15 @@ public class JobTracker implements MRCon return "<em>ON - " + safeModeInfo + "</em>"; } + private void checkSafeMode() throws SafeModeException { + if (isInSafeMode()) { + try { + throw new SafeModeException(( + isInAdminSafeMode()) ? adminSafeModeUser : null); + } catch (SafeModeException sfe) { + LOG.info("JobTracker in safe-mode, aborting operation", sfe); + throw sfe; + } + } + } } Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java?rev=1390041&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java (added) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java Tue Sep 25 18:25:45 2012 @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +/** + * This exception is thrown when the JobTracker is in safe mode. + */ +public class SafeModeException extends IOException { + + private static final long serialVersionUID = 1984839257L; + + /** + * SafeModeException + * @param adminUser admin who put JobTracker in safe-mode, + * <code>null</code> if it was automatic + * + */ + public SafeModeException(String adminUser) { + super( + (adminUser == null) ? + "JobTracker is in safe mode" : + "JobTracker is in safe-mode set by admin " + adminUser); + } + +} Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java?rev=1390041&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java (added) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java Tue Sep 25 18:25:45 2012 @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.TestMiniMRWithDFS.TestResult; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class TestJobClientRetries { + + private static final Log LOG = LogFactory.getLog(TestJobClientRetries.class); + + MiniMRCluster mr; + + @Test + public void testJobSubmission() throws Exception { + + // Start MR cluster + mr = new MiniMRCluster(2, "file:///", 3); + + final List<Exception> exceptions = new ArrayList<Exception>(); + + // Get jobConf + final JobConf jobConf = mr.createJobConf(); + + // Stop JobTracker + LOG.info("Stopping JobTracker"); + mr.stopJobTracker(); + + /* + * Submit job *after* setting job-client retries to be *on*... + * the test *should* fail without this config being set + */ + LOG.info("Stopping JobTracker"); + jobConf.setBoolean( + JobClient.MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY, true); + WordCountThread wc = new WordCountThread(jobConf, exceptions); + wc.start(); + + // Restart JobTracker after a little while + Thread.sleep(5000); + LOG.info("Re-starting JobTracker for job-submission to go through"); + mr.startJobTracker(); + + // Wait for the job to complete or for an exception to occur + LOG.info("Waiting for job success/failure ..."); + wc.join(); + + Assert.assertNotNull(wc.result); + Assert.assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" + + "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", wc.result.output); + Assert.assertTrue("exceptions is not empty: " + exceptions, exceptions.isEmpty()); + } + + @After + public void tearDown() throws Exception { + mr.shutdown(); + } + + public static class WordCountThread extends Thread { + JobConf jobConf; + List<Exception> exceptions; + TestResult result; + + public WordCountThread(JobConf jobConf, List<Exception> exceptions) { + super(WordCountThread.class.getName()); + this.jobConf = jobConf; + this.exceptions = exceptions; + } + + @Override + public void run() { + try { + FileSystem fs = FileSystem.getLocal(jobConf); + Path testdir = new Path( + System.getProperty("test.build.data","/tmp")).makeQualified(fs); + final Path inDir = new Path(testdir, "input"); + final Path outDir = new Path(testdir, "output"); + String input = "The quick brown fox\nhas many silly\nred fox sox\n"; + LOG.info("Starting word-count"); + result = + TestMiniMRWithDFS.launchWordCount( + jobConf, inDir, outDir, input, 3, 1); + LOG.info("Finished word-count"); + } catch (Exception e) { + LOG.error("Caught exception during word-count", e); + exceptions.add(e); + result = null; + } + } + } +}