This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 80662741870a YARN-11663. [Federation] Add Cache Entity Nums Limit. (#6662) Contributed by Shilun Fan. 80662741870a is described below commit 80662741870ae93493e0da53651dcd486cf13dd8 Author: slfan1989 <55643692+slfan1...@users.noreply.github.com> AuthorDate: Tue Apr 2 07:47:59 2024 +0800 YARN-11663. [Federation] Add Cache Entity Nums Limit. (#6662) Contributed by Shilun Fan. Reviewed-by: Dinesh Chitlangia <dine...@apache.org> Signed-off-by: Shilun Fan <slfan1...@apache.org> --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 4 ++ .../src/main/resources/yarn-default.xml | 9 ++++ .../federation/cache/FederationGuavaCache.java | 12 ++++- .../server/federation/cache/FederationJCache.java | 60 +++++++++++----------- .../utils/TestFederationStateStoreFacade.java | 4 +- 5 files changed, 56 insertions(+), 33 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 0ab4107c1320..650e82d67381 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4031,6 +4031,10 @@ public class YarnConfiguration extends Configuration { // 5 minutes public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; + public static final String FEDERATION_CACHE_ENTITY_NUMS = + FEDERATION_PREFIX + "cache-entity.nums"; + public static final int DEFAULT_FEDERATION_CACHE_ENTITY_NUMS = 1000; + public static final String FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = FEDERATION_PREFIX + "flush-cache-for-rm-addr"; public static final boolean DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = true; diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 41e38f601cbd..6b2d2cd817c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3787,6 +3787,15 @@ <value>300</value> </property> + <property> + <description> + The number of entries in the Federation cache. + default is 1000. + </description> + <name>yarn.federation.cache-entity.nums</name> + <value>1000</value> + </property> + <property> <description>The registry base directory for federation.</description> <name>yarn.federation.registry.base-dir</name> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationGuavaCache.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationGuavaCache.java index 5ab0ef77218d..2ba9e2869fe8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationGuavaCache.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationGuavaCache.java @@ -27,15 +27,20 @@ import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.TimeUnit; public class 
FederationGuavaCache extends FederationCache { + private static final Logger LOG = LoggerFactory.getLogger(FederationCache.class); + private Cache<String, CacheRequest<String, ?>> cache; private int cacheTimeToLive; + private long cacheEntityNums; private String className = this.getClass().getSimpleName(); @@ -52,6 +57,8 @@ public class FederationGuavaCache extends FederationCache { // no conflict or pick up a specific one in the future. cacheTimeToLive = pConf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS); + cacheEntityNums = pConf.getLong(YarnConfiguration.FEDERATION_CACHE_ENTITY_NUMS, + YarnConfiguration.DEFAULT_FEDERATION_CACHE_ENTITY_NUMS); if (cacheTimeToLive <= 0) { isCachingEnabled = false; return; @@ -59,8 +66,11 @@ public class FederationGuavaCache extends FederationCache { this.setStateStore(pStateStore); // Initialize Cache. + LOG.info("Creating a JCache Manager with name {}. " + + "Cache TTL Time = {} secs. 
Cache Entity Nums = {}.", className, cacheTimeToLive, + cacheEntityNums); cache = CacheBuilder.newBuilder().expireAfterWrite(cacheTimeToLive, - TimeUnit.MILLISECONDS).build(); + TimeUnit.SECONDS).maximumSize(cacheEntityNums).build(); isCachingEnabled = true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java index 4b530149b48d..b4dbefe1278a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java @@ -26,32 +26,31 @@ import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.ehcache.Cache; +import org.ehcache.CacheManager; +import org.ehcache.config.builders.CacheConfigurationBuilder; +import org.ehcache.config.builders.CacheManagerBuilder; +import org.ehcache.config.builders.ExpiryPolicyBuilder; +import org.ehcache.config.builders.ResourcePoolsBuilder; +import org.ehcache.expiry.ExpiryPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.cache.Cache; -import javax.cache.CacheManager; -import javax.cache.Caching; -import javax.cache.configuration.FactoryBuilder; -import javax.cache.configuration.MutableConfiguration; -import javax.cache.expiry.CreatedExpiryPolicy; -import javax.cache.expiry.Duration; -import 
javax.cache.expiry.ExpiryPolicy; -import javax.cache.spi.CachingProvider; +import java.time.Duration; import java.util.Map; -import java.util.concurrent.TimeUnit; public class FederationJCache extends FederationCache { private static final Logger LOG = LoggerFactory.getLogger(FederationJCache.class); - private Cache<String, CacheRequest<String, ?>> cache; + private Cache<String, CacheRequest> cache; private int cacheTimeToLive; + private long cacheEntityNums; private boolean isCachingEnabled = false; - private String className = this.getClass().getSimpleName(); + private final String className = this.getClass().getSimpleName(); @Override public boolean isCachingEnabled() { @@ -64,33 +63,35 @@ public class FederationJCache extends FederationCache { // no conflict or pick up a specific one in the future cacheTimeToLive = pConf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS); + cacheEntityNums = pConf.getLong(YarnConfiguration.FEDERATION_CACHE_ENTITY_NUMS, + YarnConfiguration.DEFAULT_FEDERATION_CACHE_ENTITY_NUMS); if (cacheTimeToLive <= 0) { isCachingEnabled = false; return; } this.setStateStore(pStateStore); - CachingProvider jcacheProvider = Caching.getCachingProvider(); - CacheManager jcacheManager = jcacheProvider.getCacheManager(); - this.cache = jcacheManager.getCache(className); + CacheManager cacheManager = CacheManagerBuilder.newCacheManagerBuilder().build(true); + if (this.cache == null) { - LOG.info("Creating a JCache Manager with name {}.", className); - Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive); - FactoryBuilder.SingletonFactory<ExpiryPolicy> expiryPolicySingletonFactory = - new FactoryBuilder.SingletonFactory<>(new CreatedExpiryPolicy(cacheExpiry)); - MutableConfiguration<String, CacheRequest<String, ?>> configuration = - new MutableConfiguration<>(); - configuration.setStoreByValue(false); - 
configuration.setExpiryPolicyFactory(expiryPolicySingletonFactory); - this.cache = jcacheManager.createCache(className, configuration); + LOG.info("Creating a JCache Manager with name {}. " + + "Cache TTL Time = {} secs. Cache Entity Nums = {}.", className, cacheTimeToLive, + cacheEntityNums); + // Set the number of caches + ResourcePoolsBuilder poolsBuilder = ResourcePoolsBuilder.heap(cacheEntityNums); + ExpiryPolicy expiryPolicy = ExpiryPolicyBuilder.timeToLiveExpiration( + Duration.ofSeconds(cacheTimeToLive)); + CacheConfigurationBuilder<String, CacheRequest> configurationBuilder = + CacheConfigurationBuilder.newCacheConfigurationBuilder( + String.class, CacheRequest.class, poolsBuilder) + .withExpiry(expiryPolicy); + cache = cacheManager.createCache(className, configurationBuilder); } isCachingEnabled = true; } @Override public void clearCache() { - CachingProvider jcacheProvider = Caching.getCachingProvider(); - CacheManager jcacheManager = jcacheProvider.getCacheManager(); - jcacheManager.destroyCache(className); + this.cache = null; } @@ -142,13 +143,12 @@ public class FederationJCache extends FederationCache { } @VisibleForTesting - public Cache<String, CacheRequest<String, ?>> getCache() { + public Cache<String, CacheRequest> getCache() { return cache; } @VisibleForTesting - public String getAppHomeSubClusterCacheKey(ApplicationId appId) - throws YarnException { + public String getAppHomeSubClusterCacheKey(ApplicationId appId) { return buildCacheKey(className, GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, appId.toString()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java index e2192e8ae7f2..25f47ab7d0d8 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java @@ -57,7 +57,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import javax.cache.Cache; +import org.ehcache.Cache; /** * Unit tests for FederationStateStoreFacade. @@ -245,7 +245,7 @@ public class TestFederationStateStoreFacade { assert fedCache instanceof FederationJCache; FederationJCache jCache = (FederationJCache) fedCache; String cacheKey = jCache.getAppHomeSubClusterCacheKey(appId); - Cache<String, CacheRequest<String, ?>> cache = jCache.getCache(); + Cache<String, CacheRequest> cache = jCache.getCache(); CacheRequest<String, ?> cacheRequest = cache.get(cacheKey); ApplicationHomeSubClusterCacheResponse response = ApplicationHomeSubClusterCacheResponse.class.cast(cacheRequest.getValue()); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org