This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 80662741870a YARN-11663. [Federation] Add Cache Entity Nums Limit. 
(#6662) Contributed by Shilun Fan.
80662741870a is described below

commit 80662741870ae93493e0da53651dcd486cf13dd8
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Tue Apr 2 07:47:59 2024 +0800

    YARN-11663. [Federation] Add Cache Entity Nums Limit. (#6662) Contributed 
by Shilun Fan.
    
    Reviewed-by: Dinesh Chitlangia <dine...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  4 ++
 .../src/main/resources/yarn-default.xml            |  9 ++++
 .../federation/cache/FederationGuavaCache.java     | 12 ++++-
 .../server/federation/cache/FederationJCache.java  | 60 +++++++++++-----------
 .../utils/TestFederationStateStoreFacade.java      |  4 +-
 5 files changed, 56 insertions(+), 33 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0ab4107c1320..650e82d67381 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4031,6 +4031,10 @@ public class YarnConfiguration extends Configuration {
   // 5 minutes
   public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
 
+  public static final String FEDERATION_CACHE_ENTITY_NUMS =
+      FEDERATION_PREFIX + "cache-entity.nums";
+  public static final int DEFAULT_FEDERATION_CACHE_ENTITY_NUMS = 1000;
+
   public static final String FEDERATION_FLUSH_CACHE_FOR_RM_ADDR =
       FEDERATION_PREFIX + "flush-cache-for-rm-addr";
   public static final boolean DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = 
true;
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 41e38f601cbd..6b2d2cd817c6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3787,6 +3787,15 @@
    <value>300</value>
   </property>
 
+  <property>
+    <description>
+      The maximum number of entries held in the Federation cache.
+      The default is 1000.
+    </description>
+    <name>yarn.federation.cache-entity.nums</name>
+    <value>1000</value>
+  </property>
+
   <property>
     <description>The registry base directory for federation.</description>
     <name>yarn.federation.registry.base-dir</name>
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationGuavaCache.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationGuavaCache.java
index 5ab0ef77218d..2ba9e2869fe8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationGuavaCache.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationGuavaCache.java
@@ -27,15 +27,20 @@ import 
org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 public class FederationGuavaCache extends FederationCache {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(FederationCache.class);
+
   private Cache<String, CacheRequest<String, ?>> cache;
 
   private int cacheTimeToLive;
+  private long cacheEntityNums;
 
   private String className = this.getClass().getSimpleName();
 
@@ -52,6 +57,8 @@ public class FederationGuavaCache extends FederationCache {
     // no conflict or pick up a specific one in the future.
     cacheTimeToLive = 
pConf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
         YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
+    cacheEntityNums = 
pConf.getLong(YarnConfiguration.FEDERATION_CACHE_ENTITY_NUMS,
+        YarnConfiguration.DEFAULT_FEDERATION_CACHE_ENTITY_NUMS);
     if (cacheTimeToLive <= 0) {
       isCachingEnabled = false;
       return;
@@ -59,8 +66,11 @@ public class FederationGuavaCache extends FederationCache {
     this.setStateStore(pStateStore);
 
     // Initialize Cache.
+    LOG.info("Creating a JCache Manager with name {}. " +
+        "Cache TTL Time = {} secs. Cache Entity Nums = {}.", className, 
cacheTimeToLive,
+        cacheEntityNums);
     cache = CacheBuilder.newBuilder().expireAfterWrite(cacheTimeToLive,
-        TimeUnit.MILLISECONDS).build();
+        TimeUnit.SECONDS).maximumSize(cacheEntityNums).build();
     isCachingEnabled = true;
   }
 
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java
index 4b530149b48d..b4dbefe1278a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java
@@ -26,32 +26,31 @@ import 
org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.ehcache.Cache;
+import org.ehcache.CacheManager;
+import org.ehcache.config.builders.CacheConfigurationBuilder;
+import org.ehcache.config.builders.CacheManagerBuilder;
+import org.ehcache.config.builders.ExpiryPolicyBuilder;
+import org.ehcache.config.builders.ResourcePoolsBuilder;
+import org.ehcache.expiry.ExpiryPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.cache.Cache;
-import javax.cache.CacheManager;
-import javax.cache.Caching;
-import javax.cache.configuration.FactoryBuilder;
-import javax.cache.configuration.MutableConfiguration;
-import javax.cache.expiry.CreatedExpiryPolicy;
-import javax.cache.expiry.Duration;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.spi.CachingProvider;
+import java.time.Duration;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 public class FederationJCache extends FederationCache {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(FederationJCache.class);
 
-  private Cache<String, CacheRequest<String, ?>> cache;
+  private Cache<String, CacheRequest> cache;
 
   private int cacheTimeToLive;
+  private long cacheEntityNums;
 
   private boolean isCachingEnabled = false;
 
-  private String className = this.getClass().getSimpleName();
+  private final String className = this.getClass().getSimpleName();
 
   @Override
   public boolean isCachingEnabled() {
@@ -64,33 +63,35 @@ public class FederationJCache extends FederationCache {
     // no conflict or pick up a specific one in the future
     cacheTimeToLive = 
pConf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
         YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
+    cacheEntityNums = 
pConf.getLong(YarnConfiguration.FEDERATION_CACHE_ENTITY_NUMS,
+        YarnConfiguration.DEFAULT_FEDERATION_CACHE_ENTITY_NUMS);
     if (cacheTimeToLive <= 0) {
       isCachingEnabled = false;
       return;
     }
     this.setStateStore(pStateStore);
-    CachingProvider jcacheProvider = Caching.getCachingProvider();
-    CacheManager jcacheManager = jcacheProvider.getCacheManager();
-    this.cache = jcacheManager.getCache(className);
+    CacheManager cacheManager = 
CacheManagerBuilder.newCacheManagerBuilder().build(true);
+
     if (this.cache == null) {
-      LOG.info("Creating a JCache Manager with name {}.", className);
-      Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
-      FactoryBuilder.SingletonFactory<ExpiryPolicy> 
expiryPolicySingletonFactory =
-          new FactoryBuilder.SingletonFactory<>(new 
CreatedExpiryPolicy(cacheExpiry));
-      MutableConfiguration<String, CacheRequest<String, ?>> configuration =
-          new MutableConfiguration<>();
-      configuration.setStoreByValue(false);
-      configuration.setExpiryPolicyFactory(expiryPolicySingletonFactory);
-      this.cache = jcacheManager.createCache(className, configuration);
+      LOG.info("Creating a JCache Manager with name {}. " +
+          "Cache TTL Time = {} secs. Cache Entity Nums = {}.", className, 
cacheTimeToLive,
+          cacheEntityNums);
+      // Limit the number of entries the cache may hold on the heap
+      ResourcePoolsBuilder poolsBuilder = 
ResourcePoolsBuilder.heap(cacheEntityNums);
+      ExpiryPolicy expiryPolicy = ExpiryPolicyBuilder.timeToLiveExpiration(
+          Duration.ofSeconds(cacheTimeToLive));
+      CacheConfigurationBuilder<String, CacheRequest> configurationBuilder =
+          CacheConfigurationBuilder.newCacheConfigurationBuilder(
+          String.class, CacheRequest.class, poolsBuilder)
+          .withExpiry(expiryPolicy);
+      cache = cacheManager.createCache(className, configurationBuilder);
     }
     isCachingEnabled = true;
   }
 
   @Override
   public void clearCache() {
-    CachingProvider jcacheProvider = Caching.getCachingProvider();
-    CacheManager jcacheManager = jcacheProvider.getCacheManager();
-    jcacheManager.destroyCache(className);
+
     this.cache = null;
   }
 
@@ -142,13 +143,12 @@ public class FederationJCache extends FederationCache {
   }
 
   @VisibleForTesting
-  public Cache<String, CacheRequest<String, ?>> getCache() {
+  public Cache<String, CacheRequest> getCache() {
     return cache;
   }
 
   @VisibleForTesting
-  public String getAppHomeSubClusterCacheKey(ApplicationId appId)
-      throws YarnException {
+  public String getAppHomeSubClusterCacheKey(ApplicationId appId) {
     return buildCacheKey(className, GET_APPLICATION_HOME_SUBCLUSTER_CACHEID,
         appId.toString());
   }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java
index e2192e8ae7f2..25f47ab7d0d8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java
@@ -57,7 +57,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import javax.cache.Cache;
+import org.ehcache.Cache;
 
 /**
  * Unit tests for FederationStateStoreFacade.
@@ -245,7 +245,7 @@ public class TestFederationStateStoreFacade {
       assert fedCache instanceof FederationJCache;
       FederationJCache jCache = (FederationJCache) fedCache;
       String cacheKey = jCache.getAppHomeSubClusterCacheKey(appId);
-      Cache<String, CacheRequest<String, ?>> cache = jCache.getCache();
+      Cache<String, CacheRequest> cache = jCache.getCache();
       CacheRequest<String, ?> cacheRequest = cache.get(cacheKey);
       ApplicationHomeSubClusterCacheResponse response =
           
ApplicationHomeSubClusterCacheResponse.class.cast(cacheRequest.getValue());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to