YARN-8010. Add config in FederationRMFailoverProxyProvider to not bypass the facade cache when failing over. (Botong Huang via Subru).
(cherry picked from commit 09999d7e014fde717e8b122773b68664f4594106) (cherry picked from commit 304ce1871406a5ee4f1e88e294088d1e01b8de3e) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cb3e5147 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cb3e5147 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cb3e5147 Branch: refs/heads/branch-2.9 Commit: cb3e5147a1eda4036b793d4fca67ad471ba349f1 Parents: ad7d793 Author: Subru Krishnan <su...@apache.org> Authored: Wed Mar 28 11:33:19 2018 -0700 Committer: Subru Krishnan <su...@apache.org> Committed: Wed Mar 28 11:55:18 2018 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 9 ++- .../yarn/conf/TestYarnConfigurationFields.java | 2 + .../TestFederationRMFailoverProxyProvider.java | 83 +++++++++++++++++++- .../FederationRMFailoverProxyProvider.java | 11 ++- 4 files changed, 95 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb3e5147/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index edeec9f..001d02e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2792,15 +2792,18 @@ public class YarnConfiguration extends Configuration { public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = 
FEDERATION_PREFIX + "cache-ttl.secs"; + // 5 minutes + public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; + + public static final String FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = + FEDERATION_PREFIX + "flush-cache-for-rm-addr"; + public static final boolean DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = true; public static final String FEDERATION_REGISTRY_BASE_KEY = FEDERATION_PREFIX + "registry.base-dir"; public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = "yarnfederation/"; - // 5 minutes - public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; - public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS = FEDERATION_PREFIX + "state-store.heartbeat-interval-secs"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb3e5147/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 1d3111c..230d840 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -79,6 +79,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { .add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED); configurationPropsToSkipCompare .add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS); + configurationPrefixToSkipCompare + .add(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR); configurationPropsToSkipCompare .add(YarnConfiguration.RM_EPOCH); configurationPropsToSkipCompare 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb3e5147/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java index e3f9155..7e670c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProvider import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; @@ -49,7 +50,11 @@ import org.junit.Before; import org.junit.Test; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -61,12 +66,20 @@ public class TestFederationRMFailoverProxyProvider { 
private FederationStateStore stateStore; private final String dummyCapability = "cap"; + private GetClusterMetricsResponse threadResponse; + @Before public void setUp() throws IOException, YarnException { conf = new YarnConfiguration(); - stateStore = new MemoryFederationStateStore(); + + // Configure Facade cache to use a very long ttl + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 60 * 60); + + stateStore = spy(new MemoryFederationStateStore()); stateStore.init(conf); FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf); + verify(stateStore, times(0)) + .getSubClusters(any(GetSubClustersInfoRequest.class)); } @After @@ -75,12 +88,25 @@ public class TestFederationRMFailoverProxyProvider { stateStore = null; } - @Test + @Test(timeout = 60000) public void testFederationRMFailoverProxyProvider() throws Exception { + testProxyProvider(true); + } + + @Test (timeout=60000) + public void testFederationRMFailoverProxyProviderWithoutFlushFacadeCache() + throws Exception { + testProxyProvider(false); + } + + private void testProxyProvider(boolean facadeFlushCache) throws Exception { final SubClusterId subClusterId = SubClusterId.newInstance("SC-1"); final MiniYARNCluster cluster = new MiniYARNCluster( "testFederationRMFailoverProxyProvider", 3, 0, 1, 1); + conf.setBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR, + facadeFlushCache); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); @@ -100,14 +126,20 @@ public class TestFederationRMFailoverProxyProvider { // Transition rm3 to active; makeRMActive(subClusterId, cluster, 2); - ApplicationClientProtocol client = FederationProxyProviderUtil + final ApplicationClientProtocol client = FederationProxyProviderUtil .createRMProxy(conf, ApplicationClientProtocol.class, subClusterId, UserGroupInformation.getCurrentUser()); + verify(stateStore, times(1)) + 
.getSubClusters(any(GetSubClustersInfoRequest.class)); + // client will retry until the rm becomes active. GetClusterMetricsResponse response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance()); + verify(stateStore, times(1)) + .getSubClusters(any(GetSubClustersInfoRequest.class)); + // validate response checkResponse(response); @@ -118,7 +150,50 @@ public class TestFederationRMFailoverProxyProvider { // Transition rm2 to active; makeRMActive(subClusterId, cluster, 1); - response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance()); + + verify(stateStore, times(1)) + .getSubClusters(any(GetSubClustersInfoRequest.class)); + + threadResponse = null; + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + // In non flush cache case, we will be hitting the cache with old RM + // address and keep failing before the cache is flushed + threadResponse = + client.getClusterMetrics(GetClusterMetricsRequest.newInstance()); + } catch (YarnException | IOException e) { + e.printStackTrace(); + } + } + }); + thread.start(); + + if (!facadeFlushCache) { + // Add a wait so that hopefully the thread has started hitting old cached + Thread.sleep(500); + + // Should still be hitting cache + verify(stateStore, times(1)) + .getSubClusters(any(GetSubClustersInfoRequest.class)); + + // Force flush cache, so that it will pick up the new RM address + FederationStateStoreFacade.getInstance().getSubCluster(subClusterId, + true); + } + + // Wait for the thread to finish and grab result + thread.join(); + response = threadResponse; + + if (facadeFlushCache) { + verify(stateStore, atLeast(2)) + .getSubClusters(any(GetSubClustersInfoRequest.class)); + } else { + verify(stateStore, times(2)) + .getSubClusters(any(GetSubClustersInfoRequest.class)); + } // validate response checkResponse(response); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb3e5147/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java index c631208..cf6d1ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java @@ -64,7 +64,8 @@ public class FederationRMFailoverProxyProvider<T> private FederationStateStoreFacade facade; private SubClusterId subClusterId; private UserGroupInformation originalUser; - private boolean federationFailoverEnabled = false; + private boolean federationFailoverEnabled; + private boolean flushFacadeCacheForYarnRMAddr; @Override public void init(Configuration configuration, RMProxy<T> proxy, @@ -75,13 +76,16 @@ public class FederationRMFailoverProxyProvider<T> String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID); Preconditions.checkNotNull(clusterId, "Missing RM ClusterId"); this.subClusterId = SubClusterId.newInstance(clusterId); - this.facade = facade.getInstance(); + this.facade = FederationStateStoreFacade.getInstance(); if (configuration instanceof YarnConfiguration) { this.conf = (YarnConfiguration) configuration; } federationFailoverEnabled = 
conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED); + flushFacadeCacheForYarnRMAddr = + conf.getBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR, + YarnConfiguration.DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR); conf.setInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, @@ -119,7 +123,8 @@ public class FederationRMFailoverProxyProvider<T> try { LOG.info("Failing over to the ResourceManager for SubClusterId: {}", subClusterId); - subClusterInfo = facade.getSubCluster(subClusterId, isFailover); + subClusterInfo = facade.getSubCluster(subClusterId, + this.flushFacadeCacheForYarnRMAddr && isFailover); // updating the conf with the refreshed RM addresses as proxy // creations are based out of conf updateRMAddress(subClusterInfo); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org