YARN-3672. Create Facade for Federation State and Policy Store. Contributed by Subru Krishnan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1112595a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1112595a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1112595a Branch: refs/heads/YARN-2915 Commit: 1112595afdc4d4d1afddc7ea5c70afc25746d3a0 Parents: dc374d3 Author: Jian He <jia...@apache.org> Authored: Wed Aug 17 11:13:19 2016 +0800 Committer: Subru Krishnan <su...@apache.org> Committed: Tue Jul 25 16:56:31 2017 -0700 ---------------------------------------------------------------------- hadoop-project/pom.xml | 13 + .../hadoop/yarn/conf/YarnConfiguration.java | 13 + .../yarn/conf/TestYarnConfigurationFields.java | 4 + .../src/main/resources/yarn-default.xml | 20 +- .../hadoop-yarn-server-common/pom.xml | 10 + .../utils/FederationStateStoreFacade.java | 532 +++++++++++++++++++ .../server/federation/utils/package-info.java | 17 + .../utils/FederationStateStoreTestUtil.java | 149 ++++++ .../utils/TestFederationStateStoreFacade.java | 148 ++++++ 9 files changed, 905 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1112595a/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index b9819b4..93bbcf8 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -98,6 +98,9 @@ <apacheds.version>2.0.0-M21</apacheds.version> <ldap-api.version>1.0.0-M33</ldap-api.version> + <jcache.version>1.0.0</jcache.version> + <ehcache.version>3.0.3</ehcache.version> + <!-- define the Java language version used by the compiler --> <javac.version>1.8</javac.version> @@ -1265,6 +1268,16 @@ <artifactId>kerb-simplekdc</artifactId> <version>1.0.0</version> </dependency> + <dependency> + <groupId>javax.cache</groupId> + <artifactId>cache-api</artifactId> + <version>${jcache.version}</version> + </dependency> + <dependency> + <groupId>org.ehcache</groupId> + <artifactId>ehcache</artifactId> + <version>${ehcache.version}</version> + </dependency> </dependencies> </dependencyManagement> http://git-wip-us.apache.org/repos/asf/hadoop/blob/1112595a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3e778ee..fe6c7b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2560,6 +2560,19 @@ public class YarnConfiguration extends Configuration { //////////////////////////////// public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation."; + + public static final String FEDERATION_STATESTORE_CLIENT_CLASS = + FEDERATION_PREFIX + "state-store.class"; + + public static final String DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS = + "org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore"; + + public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = + FEDERATION_PREFIX + "cache-ttl.secs"; + + // 5 minutes + public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; + public static final String FEDERATION_MACHINE_LIST = FEDERATION_PREFIX + "machine-list"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1112595a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 3da4bab..bfc2534 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -68,6 +68,10 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { .YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER_PROTOCOL); configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR); + // Federation default configs to be ignored + configurationPropsToSkipCompare + .add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS); + // Ignore blacklisting nodes for AM failures feature since it is still a // "work in progress" configurationPropsToSkipCompare.add(YarnConfiguration. http://git-wip-us.apache.org/repos/asf/hadoop/blob/1112595a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e20aad5..0b0a160 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2686,8 +2686,8 @@ <description>The arguments to pass to the Node label script.</description> <name>yarn.nodemanager.node-labels.provider.script.opts</name> </property> - <!-- Other Configuration --> + <!-- Federation Configuration --> <property> <description> Machine list file to be loaded by the FederationSubCluster Resolver @@ -2696,6 +2696,24 @@ </property> <property> + <description> + Store class name for federation state store + </description> + <name>yarn.federation.state-store.class</name> + <value>org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore</value> + </property> + + <property> + <description> + The time in seconds after which the federation state store local cache + will be refreshed periodically + </description> + <name>yarn.federation.cache-ttl.secs</name> + <value>300</value> + </property> + + <!-- Other Configuration --> + <property> <description>The interval that the yarn client library uses to poll the completion status of the asynchronous API of application client protocol. </description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/1112595a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 89dec30..def5357 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -102,6 +102,16 @@ <groupId>org.fusesource.leveldbjni</groupId> <artifactId>leveldbjni-all</artifactId> </dependency> + <dependency> + <groupId>javax.cache</groupId> + <artifactId>cache-api</artifactId> + <version>${jcache.version}</version> + </dependency> + <dependency> + <groupId>org.ehcache</groupId> + <artifactId>ehcache</artifactId> + <version>${ehcache.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/hadoop/blob/1112595a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java new file mode 100644 index 0000000..f1c8218 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -0,0 +1,532 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.cache.Cache; +import javax.cache.CacheManager; +import javax.cache.Caching; +import javax.cache.configuration.CompleteConfiguration; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableConfiguration; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.integration.CacheLoader; +import javax.cache.integration.CacheLoaderException; +import javax.cache.spi.CachingProvider; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * + * The FederationStateStoreFacade is an utility wrapper that provides singleton + * access to the Federation state store. It abstracts out retries and in + * addition, it also implements the caching for various objects. + * + */ +public final class FederationStateStoreFacade { + private static final Logger LOG = + LoggerFactory.getLogger(FederationStateStoreFacade.class); + + private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters"; + private static final String GET_POLICIES_CONFIGURATIONS_CACHEID = + "getPoliciesConfigurations"; + + private static final FederationStateStoreFacade FACADE = + new FederationStateStoreFacade(); + + private FederationStateStore stateStore; + private int cacheTimeToLive; + private Configuration conf; + private Cache<Object, Object> cache; + + private FederationStateStoreFacade() { + initializeFacadeInternal(new Configuration()); + } + + private void initializeFacadeInternal(Configuration config) { + this.conf = config; + try { + this.stateStore = (FederationStateStore) createRetryInstance(this.conf, + YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS, + FederationStateStore.class, createRetryPolicy(conf)); + this.stateStore.init(conf); + + initCache(); + + } catch (YarnException ex) { + LOG.error("Failed to initialize the FederationStateStoreFacade object", + ex); + throw new RuntimeException(ex); + } + } + + /** + * Delete and re-initialize the cache, to force it to use the given + * configuration. + * + * @param store the {@link FederationStateStore} instance to reinitialize with + * @param config the updated configuration to reinitialize with + */ + @VisibleForTesting + public synchronized void reinitialize(FederationStateStore store, + Configuration config) { + this.conf = config; + this.stateStore = store; + clearCache(); + initCache(); + } + + public static RetryPolicy createRetryPolicy(Configuration conf) { + // Retry settings for StateStore + RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetry( + conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, Integer.SIZE), + conf.getLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS), + TimeUnit.MILLISECONDS); + + return retryPolicy; + } + + private boolean isCachingEnabled() { + return (cacheTimeToLive > 0); + } + + private void initCache() { + // Picking the JCache provider from classpath, need to make sure there's + // no conflict or pick up a specific one in the future + cacheTimeToLive = + conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, + YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS); + if (isCachingEnabled()) { + CachingProvider jcacheProvider = Caching.getCachingProvider(); + CacheManager jcacheManager = jcacheProvider.getCacheManager(); + this.cache = jcacheManager.getCache(this.getClass().getSimpleName()); + if (this.cache == null) { + LOG.info("Creating a JCache Manager with name " + + this.getClass().getSimpleName()); + Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive); + CompleteConfiguration<Object, Object> configuration = + new MutableConfiguration<Object, Object>().setStoreByValue(false) + .setReadThrough(true) + .setExpiryPolicyFactory( + new FactoryBuilder.SingletonFactory<ExpiryPolicy>( + new CreatedExpiryPolicy(cacheExpiry))) + .setCacheLoaderFactory( + new FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>>( + new CacheLoaderImpl<Object, Object>())); + this.cache = jcacheManager.createCache(this.getClass().getSimpleName(), + configuration); + } + } + } + + private void clearCache() { + CachingProvider jcacheProvider = Caching.getCachingProvider(); + CacheManager jcacheManager = jcacheProvider.getCacheManager(); + + jcacheManager.destroyCache(this.getClass().getSimpleName()); + this.cache = null; + } + + /** + * Returns the singleton instance of the FederationStateStoreFacade object. + * + * @return the singleton {@link FederationStateStoreFacade} instance + */ + public static FederationStateStoreFacade getInstance() { + return FACADE; + } + + /** + * Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}. + * + * @param subClusterId the identifier of the sub-cluster + * @return the sub cluster information + * @throws YarnException if the call to the state store is unsuccessful + */ + public SubClusterInfo getSubCluster(final SubClusterId subClusterId) + throws YarnException { + if (isCachingEnabled()) { + return getSubClusters(false).get(subClusterId); + } else { + return stateStore + .getSubCluster(GetSubClusterInfoRequest.newInstance(subClusterId)) + .getSubClusterInfo(); + } + } + + /** + * Updates the cache with the central {@link FederationStateStore} and returns + * the {@link SubClusterInfo} for the specified {@link SubClusterId}. + * + * @param subClusterId the identifier of the sub-cluster + * @param flushCache flag to indicate if the cache should be flushed or not + * @return the sub cluster information + * @throws YarnException if the call to the state store is unsuccessful + */ + public SubClusterInfo getSubCluster(final SubClusterId subClusterId, + final boolean flushCache) throws YarnException { + if (flushCache && isCachingEnabled()) { + LOG.info("Flushing subClusters from cache and rehydrating from store," + + " most likely on account of RM failover."); + cache.remove(buildGetSubClustersCacheRequest(false)); + } + return getSubCluster(subClusterId); + } + + /** + * Returns the {@link SubClusterInfo} of all active sub cluster(s). + * + * @param filterInactiveSubClusters whether to filter out inactive + * sub-clusters + * @return the information of all active sub cluster(s) + * @throws YarnException if the call to the state store is unsuccessful + */ + @SuppressWarnings("unchecked") + public Map<SubClusterId, SubClusterInfo> getSubClusters( + final boolean filterInactiveSubClusters) throws YarnException { + try { + if (isCachingEnabled()) { + return (Map<SubClusterId, SubClusterInfo>) cache + .get(buildGetSubClustersCacheRequest(filterInactiveSubClusters)); + } else { + return buildSubClusterInfoMap(stateStore.getSubClusters( + GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters))); + } + } catch (Throwable ex) { + throw new YarnException(ex); + } + } + + /** + * Returns the {@link SubClusterPolicyConfiguration} for the specified queue. + * + * @param queue the queue whose policy is required + * @return the corresponding configured policy + * @throws YarnException if the call to the state store is unsuccessful + */ + public SubClusterPolicyConfiguration getPolicyConfiguration( + final String queue) throws YarnException { + if (isCachingEnabled()) { + return getPoliciesConfigurations().get(queue); + } else { + return stateStore + .getPolicyConfiguration( + GetSubClusterPolicyConfigurationRequest.newInstance(queue)) + .getPolicyConfiguration(); + } + + } + + /** + * Get the policies that is represented as + * {@link SubClusterPolicyConfiguration} for all currently active queues in + * the system. + * + * @return the policies for all currently active queues in the system + * @throws YarnException if the call to the state store is unsuccessful + */ + @SuppressWarnings("unchecked") + public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations() + throws YarnException { + try { + if (isCachingEnabled()) { + return (Map<String, SubClusterPolicyConfiguration>) cache + .get(buildGetPoliciesConfigurationsCacheRequest()); + } else { + return buildPolicyConfigMap(stateStore.getPoliciesConfigurations( + GetSubClusterPoliciesConfigurationsRequest.newInstance())); + } + } catch (Throwable ex) { + throw new YarnException(ex); + } + } + + /** + * Adds the home {@link SubClusterId} for the specified {@link ApplicationId}. + * + * @param appHomeSubCluster the mapping of the application to it's home + * sub-cluster + * @throws YarnException if the call to the state store is unsuccessful + */ + public void addApplicationHomeSubCluster( + ApplicationHomeSubCluster appHomeSubCluster) throws YarnException { + stateStore.addApplicationHomeSubCluster( + AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster)); + return; + } + + /** + * Updates the home {@link SubClusterId} for the specified + * {@link ApplicationId}. + * + * @param appHomeSubCluster the mapping of the application to it's home + * sub-cluster + * @throws YarnException if the call to the state store is unsuccessful + */ + public void updateApplicationHomeSubCluster( + ApplicationHomeSubCluster appHomeSubCluster) throws YarnException { + stateStore.updateApplicationHomeSubCluster( + UpdateApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster)); + return; + } + + /** + * Returns the home {@link SubClusterId} for the specified + * {@link ApplicationId}. + * + * @param appId the identifier of the application + * @return the home sub cluster identifier + * @throws YarnException if the call to the state store is unsuccessful + */ + public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) + throws YarnException { + GetApplicationHomeSubClusterResponse response = + stateStore.getApplicationHomeSubCluster( + GetApplicationHomeSubClusterRequest.newInstance(appId)); + return response.getApplicationHomeSubCluster().getHomeSubCluster(); + } + + /** + * Helper method to create instances of Object using the class name defined in + * the configuration object. The instances creates {@link RetryProxy} using + * the specific {@link RetryPolicy}. + * + * @param conf the yarn configuration + * @param configuredClassName the configuration provider key + * @param defaultValue the default implementation for fallback + * @param type the class for which a retry proxy is required + * @param retryPolicy the policy for retrying method call failures + * @return a retry proxy for the specified interface + */ + @SuppressWarnings("unchecked") + public static <T> Object createRetryInstance(Configuration conf, + String configuredClassName, String defaultValue, Class<T> type, + RetryPolicy retryPolicy) { + + String className = conf.get(configuredClassName, defaultValue); + try { + Class<?> clusterResolverClass = conf.getClassByName(className); + if (type.isAssignableFrom(clusterResolverClass)) { + return RetryProxy.create(type, + (T) ReflectionUtils.newInstance(clusterResolverClass, conf), + retryPolicy); + } else { + throw new YarnRuntimeException( + "Class: " + className + " not instance of " + type.getSimpleName()); + } + } catch (Exception e) { + throw new YarnRuntimeException("Could not instantiate : " + className, e); + } + } + + private Map<SubClusterId, SubClusterInfo> buildSubClusterInfoMap( + final GetSubClustersInfoResponse response) { + List<SubClusterInfo> subClusters = response.getSubClusters(); + Map<SubClusterId, SubClusterInfo> subClustersMap = + new HashMap<>(subClusters.size()); + for (SubClusterInfo subCluster : subClusters) { + subClustersMap.put(subCluster.getSubClusterId(), subCluster); + } + return subClustersMap; + } + + private Object buildGetSubClustersCacheRequest( + final boolean filterInactiveSubClusters) { + final String cacheKey = buildCacheKey(getClass().getSimpleName(), + GET_SUBCLUSTERS_CACHEID, null); + CacheRequest<String, Map<SubClusterId, SubClusterInfo>> cacheRequest = + new CacheRequest<String, Map<SubClusterId, SubClusterInfo>>(cacheKey, + new Func<String, Map<SubClusterId, SubClusterInfo>>() { + @Override + public Map<SubClusterId, SubClusterInfo> invoke(String key) + throws Exception { + GetSubClustersInfoResponse subClusters = + stateStore.getSubClusters(GetSubClustersInfoRequest + .newInstance(filterInactiveSubClusters)); + return buildSubClusterInfoMap(subClusters); + } + }); + return cacheRequest; + } + + private Map<String, SubClusterPolicyConfiguration> buildPolicyConfigMap( + GetSubClusterPoliciesConfigurationsResponse response) { + List<SubClusterPolicyConfiguration> policyConfigs = + response.getPoliciesConfigs(); + Map<String, SubClusterPolicyConfiguration> queuePolicyConfigs = + new HashMap<>(); + for (SubClusterPolicyConfiguration policyConfig : policyConfigs) { + queuePolicyConfigs.put(policyConfig.getQueue(), policyConfig); + } + return queuePolicyConfigs; + } + + private Object buildGetPoliciesConfigurationsCacheRequest() { + final String cacheKey = buildCacheKey(getClass().getSimpleName(), + GET_POLICIES_CONFIGURATIONS_CACHEID, null); + CacheRequest<String, Map<String, SubClusterPolicyConfiguration>> cacheRequest = + new CacheRequest<String, Map<String, SubClusterPolicyConfiguration>>( + cacheKey, + new Func<String, Map<String, SubClusterPolicyConfiguration>>() { + @Override + public Map<String, SubClusterPolicyConfiguration> invoke( + String key) throws Exception { + GetSubClusterPoliciesConfigurationsResponse policyConfigs = + stateStore.getPoliciesConfigurations( + GetSubClusterPoliciesConfigurationsRequest + .newInstance()); + return buildPolicyConfigMap(policyConfigs); + } + }); + return cacheRequest; + } + + protected String buildCacheKey(String typeName, String methodName, + String argName) { + StringBuilder buffer = new StringBuilder(); + buffer.append(typeName).append("."); + buffer.append(methodName); + if (argName != null) { + buffer.append("::"); + buffer.append(argName); + } + return buffer.toString(); + } + + /** + * Internal class that implements the CacheLoader interface that can be + * plugged into the CacheManager to load objects into the cache for specified + * keys. + */ + private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> { + @SuppressWarnings("unchecked") + @Override + public V load(K key) throws CacheLoaderException { + try { + CacheRequest<K, V> query = (CacheRequest<K, V>) key; + assert query != null; + return query.getValue(); + } catch (Throwable ex) { + throw new CacheLoaderException(ex); + } + } + + @Override + public Map<K, V> loadAll(Iterable<? extends K> keys) + throws CacheLoaderException { + // The FACADE does not use the Cache's getAll API. Hence this is not + // required to be implemented + throw new NotImplementedException(); + } + } + + /** + * Internal class that encapsulates the cache key and a function that returns + * the value for the specified key. + */ + private static class CacheRequest<K, V> { + private K key; + private Func<K, V> func; + + public CacheRequest(K key, Func<K, V> func) { + this.key = key; + this.func = func; + } + + public V getValue() throws Exception { + return func.invoke(key); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((key == null) ? 0 : key.hashCode()); + return result; + } + + @SuppressWarnings("unchecked") + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + CacheRequest<K, V> other = (CacheRequest<K, V>) obj; + if (key == null) { + if (other.key != null) { + return false; + } + } else if (!key.equals(other.key)) { + return false; + } + + return true; + } + } + + /** + * Encapsulates a method that has one parameter and returns a value of the + * type specified by the TResult parameter. + */ + protected interface Func<T, TResult> { + TResult invoke(T input) throws Exception; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1112595a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java new file mode 100644 index 0000000..39a46ec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.utils; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1112595a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java new file mode 100644 index 0000000..c179521 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.util.MonotonicClock; + +/** + * Utility class for FederationStateStore unit tests. + */ +public class FederationStateStoreTestUtil { + + private static final MonotonicClock CLOCK = new MonotonicClock(); + + public static final String SC_PREFIX = "SC-"; + public static final String Q_PREFIX = "queue-"; + public static final String POLICY_PREFIX = "policy-"; + + private FederationStateStore stateStore; + + public FederationStateStoreTestUtil(FederationStateStore stateStore) { + this.stateStore = stateStore; + } + + private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) { + + String amRMAddress = "1.2.3.4:1"; + String clientRMAddress = "1.2.3.4:2"; + String rmAdminAddress = "1.2.3.4:3"; + String webAppAddress = "1.2.3.4:4"; + + return SubClusterInfo.newInstance(subClusterId, amRMAddress, + clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW, + CLOCK.getTime(), "capability"); + } + + private void registerSubCluster(SubClusterId subClusterId) + throws YarnException { + + SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); + stateStore.registerSubCluster( + SubClusterRegisterRequest.newInstance(subClusterInfo)); + } + + public void registerSubClusters(int numSubClusters) throws YarnException { + + for (int i = 0; i < numSubClusters; i++) { + registerSubCluster(SubClusterId.newInstance(SC_PREFIX + i)); + } + } + + private void addApplicationHomeSC(ApplicationId appId, + SubClusterId subClusterId) throws YarnException { + ApplicationHomeSubCluster ahsc = + ApplicationHomeSubCluster.newInstance(appId, subClusterId); + AddApplicationHomeSubClusterRequest request = + AddApplicationHomeSubClusterRequest.newInstance(ahsc); + stateStore.addApplicationHomeSubCluster(request); + } + + public void addAppsHomeSC(long clusterTs, int numApps) throws YarnException { + for (int i = 0; i < numApps; i++) { + addApplicationHomeSC(ApplicationId.newInstance(clusterTs, i), + SubClusterId.newInstance(SC_PREFIX + i)); + } + } + + private SubClusterPolicyConfiguration createSCPolicyConf(String queueName, + String policyType) { + return SubClusterPolicyConfiguration.newInstance(queueName, policyType, + ByteBuffer.allocate(1)); + } + + private void setPolicyConf(String queue, String policyType) + throws YarnException { + SetSubClusterPolicyConfigurationRequest request = + SetSubClusterPolicyConfigurationRequest + .newInstance(createSCPolicyConf(queue, policyType)); + stateStore.setPolicyConfiguration(request); + } + + public void addPolicyConfigs(int numQueues) throws YarnException { + + for (int i = 0; i < numQueues; i++) { + setPolicyConf(Q_PREFIX + i, POLICY_PREFIX + i); + } + } + + public SubClusterInfo querySubClusterInfo(SubClusterId subClusterId) + throws YarnException { + GetSubClusterInfoRequest request = + GetSubClusterInfoRequest.newInstance(subClusterId); + return stateStore.getSubCluster(request).getSubClusterInfo(); + } + + public SubClusterId queryApplicationHomeSC(ApplicationId appId) + throws YarnException { + GetApplicationHomeSubClusterRequest request = + GetApplicationHomeSubClusterRequest.newInstance(appId); + + GetApplicationHomeSubClusterResponse response = + stateStore.getApplicationHomeSubCluster(request); + + return response.getApplicationHomeSubCluster().getHomeSubCluster(); + } + + public SubClusterPolicyConfiguration queryPolicyConfiguration(String queue) + throws YarnException { + GetSubClusterPolicyConfigurationRequest request = + GetSubClusterPolicyConfigurationRequest.newInstance(queue); + + GetSubClusterPolicyConfigurationResponse result = + stateStore.getPolicyConfiguration(request); + return result.getPolicyConfiguration(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1112595a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java new file mode 100644 index 0000000..53f4f84 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * Unit tests for FederationStateStoreFacade. + */ +@RunWith(Parameterized.class) +public class TestFederationStateStoreFacade { + + @Parameters + public static Collection<Boolean[]> getParameters() { + return Arrays + .asList(new Boolean[][] {{Boolean.FALSE }, {Boolean.TRUE } }); + } + + private final long clusterTs = System.currentTimeMillis(); + private final int numSubClusters = 3; + private final int numApps = 5; + private final int numQueues = 2; + + private Configuration conf; + private FederationStateStore stateStore; + private FederationStateStoreTestUtil stateStoreTestUtil; + private FederationStateStoreFacade facade = + FederationStateStoreFacade.getInstance(); + + public TestFederationStateStoreFacade(Boolean isCachingEnabled) { + conf = new Configuration(); + if (!(isCachingEnabled.booleanValue())) { + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + } + } + + @Before + public void setUp() throws IOException, YarnException { + stateStore = new MemoryFederationStateStore(); + stateStore.init(conf); + facade.reinitialize(stateStore, conf); + // hydrate the store + stateStoreTestUtil = new FederationStateStoreTestUtil(stateStore); + stateStoreTestUtil.registerSubClusters(numSubClusters); + stateStoreTestUtil.addAppsHomeSC(clusterTs, numApps); + stateStoreTestUtil.addPolicyConfigs(numQueues); + } + + @After + public void tearDown() throws Exception { + stateStore.close(); + stateStore = null; + } + + @Test + public void testGetSubCluster() throws YarnException { + for (int i = 0; i < numSubClusters; i++) { + SubClusterId subClusterId = + SubClusterId.newInstance(FederationStateStoreTestUtil.SC_PREFIX + i); + Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId), + facade.getSubCluster(subClusterId)); + } + } + + @Test + public void testGetSubClusterFlushCache() throws YarnException { + for (int i = 0; i < numSubClusters; i++) { + SubClusterId subClusterId = + SubClusterId.newInstance(FederationStateStoreTestUtil.SC_PREFIX + i); + Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId), + facade.getSubCluster(subClusterId, true)); + } + } + + @Test + public void testGetSubClusters() throws YarnException { + Map<SubClusterId, SubClusterInfo> subClusters = + facade.getSubClusters(false); + for (SubClusterId subClusterId : subClusters.keySet()) { + Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId), + subClusters.get(subClusterId)); + } + } + + @Test + public void testGetPolicyConfiguration() throws YarnException { + for (int i = 0; i < numQueues; i++) { + String queue = FederationStateStoreTestUtil.Q_PREFIX + i; + Assert.assertEquals(stateStoreTestUtil.queryPolicyConfiguration(queue), + facade.getPolicyConfiguration(queue)); + } + } + + @Test + public void testGetPoliciesConfigurations() throws YarnException { + Map<String, SubClusterPolicyConfiguration> queuePolicies = + facade.getPoliciesConfigurations(); + for (String queue : queuePolicies.keySet()) { + Assert.assertEquals(stateStoreTestUtil.queryPolicyConfiguration(queue), + queuePolicies.get(queue)); + } + } + + @Test + public void testGetHomeSubClusterForApp() throws YarnException { + for (int i = 0; i < numApps; i++) { + ApplicationId appId = ApplicationId.newInstance(clusterTs, i); + Assert.assertEquals(stateStoreTestUtil.queryApplicationHomeSC(appId), + facade.getApplicationHomeSubCluster(appId)); + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org