Author: vinodkv Date: Thu Sep 19 22:36:58 2013 New Revision: 1524857 URL: http://svn.apache.org/r1524857 Log: MAPREDUCE-5488. Changed MR client to keep trying to reach the application when it sees that an attempt's AM is down. Contributed by Jian He. svn merge --ignore-ancestry -c 1524856 ../../trunk/
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1524857&r1=1524856&r2=1524857&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Sep 19 22:36:58 2013 @@ -56,6 +56,9 @@ Release 2.2.0 - UNRELEASED MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta via tgraves) + MAPREDUCE-5488. Changed MR client to keep trying to reach the application + when it sees that on attempt's AM is down. 
(Jian He via vinodkv) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1524857&r1=1524856&r2=1524857&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Thu Sep 19 22:36:58 2013 @@ -496,6 +496,12 @@ <Field name="sslFileBufferSize" /> <Bug pattern="IS2_INCONSISTENT_SYNC" /> </Match> + + <Match> + <Class name="org.apache.hadoop.mapred.ClientServiceDelegate" /> + <Method name="invoke" /> + <Bug pattern="SWL_SLEEP_WITH_LOCK_HELD" /> + </Match> <Match> <Class name="org.apache.hadoop.mapreduce.util.ProcessTree" /> Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1524857&r1=1524856&r2=1524857&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Sep 19 22:36:58 2013 @@ -352,7 +352,7 @@ public interface MRJobConfig { public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3; /** - * 
The number of client retries to the RM/HS/AM before throwing exception. + * The number of client retries to the RM/HS before throwing exception. */ public static final String MR_CLIENT_MAX_RETRIES = MR_PREFIX + "client.max-retries"; Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1524857&r1=1524856&r2=1524857&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Sep 19 22:36:58 2013 @@ -1416,7 +1416,7 @@ <property> <name>yarn.app.mapreduce.client-am.ipc.max-retries</name> - <value>1</value> + <value>3</value> <description>The number of client retries to the AM - before reconnecting to the RM to fetch Application Status.</description> </property> @@ -1424,7 +1424,7 @@ <property> <name>yarn.app.mapreduce.client.max-retries</name> <value>3</value> - <description>The number of client retries to the RM/HS/AM before + <description>The number of client retries to the RM/HS before throwing exception. 
This is a layer above the ipc.</description> </property> Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1524857&r1=1524856&r2=1524857&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Sep 19 22:36:58 2013 @@ -26,6 +26,7 @@ import java.security.PrivilegedException import java.util.EnumSet; import java.util.HashMap; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -77,6 +78,8 @@ import org.apache.hadoop.yarn.ipc.YarnRP import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; +import com.google.common.annotations.VisibleForTesting; + public class ClientServiceDelegate { private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class); private static final String UNAVAILABLE = "N/A"; @@ -93,7 +96,8 @@ public class ClientServiceDelegate { private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private static String UNKNOWN_USER = "Unknown User"; private String trackingUrl; - + private AtomicBoolean usingAMProxy = new AtomicBoolean(false); + private int maxClientRetry; private boolean 
amAclDisabledStatusLogged = false; public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, @@ -287,6 +291,7 @@ public class ClientServiceDelegate { MRClientProtocol proxy = (MRClientProtocol) rpc.getProxy(MRClientProtocol.class, serviceAddr, conf); + usingAMProxy.set(true); LOG.trace("Connected to ApplicationMaster at: " + serviceAddr); return proxy; } @@ -301,13 +306,15 @@ public class ClientServiceDelegate { } catch (NoSuchMethodException e) { throw new YarnRuntimeException("Method name mismatch", e); } - int maxRetries = this.conf.getInt( + maxClientRetry = this.conf.getInt( MRJobConfig.MR_CLIENT_MAX_RETRIES, MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES); IOException lastException = null; - while (maxRetries > 0) { + while (maxClientRetry > 0) { + MRClientProtocol MRClientProxy = null; try { - return methodOb.invoke(getProxy(), args); + MRClientProxy = getProxy(); + return methodOb.invoke(MRClientProxy, args); } catch (InvocationTargetException e) { // Will not throw out YarnException anymore LOG.debug("Failed to contact AM/History for job " + jobId + @@ -315,22 +322,44 @@ public class ClientServiceDelegate { // Force reconnection by setting the proxy to null. realProxy = null; // HS/AMS shut down - maxRetries--; + // if it's AM shut down, do not decrement maxClientRetry as we wait for + // AM to be restarted. + if (!usingAMProxy.get()) { + maxClientRetry--; + } + usingAMProxy.set(false); lastException = new IOException(e.getTargetException()); - + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.warn("ClientServiceDelegate invoke call interrupted", ie); + throw new YarnRuntimeException(ie); + } } catch (Exception e) { LOG.debug("Failed to contact AM/History for job " + jobId + " Will retry..", e); // Force reconnection by setting the proxy to null. 
realProxy = null; // RM shutdown - maxRetries--; - lastException = new IOException(e.getMessage()); + maxClientRetry--; + lastException = new IOException(e.getMessage()); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.warn("ClientServiceDelegate invoke call interrupted", ie); + throw new YarnRuntimeException(ie); + } } } throw lastException; } + // Only for testing + @VisibleForTesting + public int getMaxClientRetry() { + return this.maxClientRetry; + } + public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0); Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1524857&r1=1524856&r2=1524857&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Thu Sep 19 22:36:58 2013 @@ -141,6 +141,48 @@ public class TestClientServiceDelegate { } @Test + public void testRetriesOnAMConnectionFailures() throws Exception { + if (!isAMReachableFromClient) { + return; + } + + ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); + when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId())) + 
.thenReturn(getRunningApplicationReport("am1", 78)); + + // throw exception in 1st, 2nd, 3rd and 4th call of getJobReport, and + // succeed in the 5th call. + final MRClientProtocol amProxy = mock(MRClientProtocol.class); + when(amProxy.getJobReport(any(GetJobReportRequest.class))) + .thenThrow(new RuntimeException("11")) + .thenThrow(new RuntimeException("22")) + .thenThrow(new RuntimeException("33")) + .thenThrow(new RuntimeException("44")).thenReturn(getJobReportResponse()); + Configuration conf = new YarnConfiguration(); + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); + conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, + !isAMReachableFromClient); + ClientServiceDelegate clientServiceDelegate = + new ClientServiceDelegate(conf, rm, oldJobId, null) { + @Override + MRClientProtocol instantiateAMProxy( + final InetSocketAddress serviceAddr) throws IOException { + super.instantiateAMProxy(serviceAddr); + return amProxy; + } + }; + + JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + + Assert.assertNotNull(jobStatus); + // assert maxClientRetry is not decremented. + Assert.assertEquals(conf.getInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, + MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES), clientServiceDelegate + .getMaxClientRetry()); + verify(amProxy, times(5)).getJobReport(any(GetJobReportRequest.class)); + } + + @Test public void testHistoryServerNotConfigured() throws Exception { //RM doesn't have app report and job History Server is not configured ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(