kaori-seasons commented on issue #7370:
URL: https://github.com/apache/gravitino/issues/7370#issuecomment-2972351246

   ClassLoader resource isolation solution
   
    Core Optimization Strategy 
   Based on the analysis of CatalogManager.java:957-983, the root cause of the 
problem is that each catalog property update recreates 
the CatalogWrapper and the IsolatedClassLoader.
   1.  New ClassLoader Pool Manager 
   First, create a new class to manage the reuse of IsolatedClassLoader: 
   // New file: 
core/src/main/java/org/apache/gravitino/catalog/ClassLoaderPool.java  
   package org.apache.gravitino.catalog;  
     
   import com.github.benmanes.caffeine.cache.Cache;  
   import com.github.benmanes.caffeine.cache.Caffeine;  
   import com.github.benmanes.caffeine.cache.RemovalListener;  
   import com.google.common.collect.Maps;  
   import java.io.Closeable;  
   import java.util.Map;  
   import java.util.concurrent.TimeUnit;  
   import java.util.concurrent.atomic.AtomicInteger;  
   import org.apache.gravitino.utils.IsolatedClassLoader;  
   import org.slf4j.Logger;  
   import org.slf4j.LoggerFactory;  
     
   /**
    * Pool that lets callers share {@link IsolatedClassLoader} instances instead of building a new
    * one for every request, with the goal of reducing Metaspace consumption.
    *
    * <p>Loaders are keyed by provider plus the subset of properties whose key contains
    * {@code package-path}, {@code conf-path} or {@code authorization}. Every pooled loader carries a
    * reference count: {@link #getOrCreate} increments it and {@link #release} decrements it. When
    * the count drops to zero the entry is invalidated, and the removal listener closes the loader.
    *
    * <p>NOTE(review): the cache also evicts entries by size and by access time, and the removal
    * listener closes the loader regardless of its reference count. Loaders that are still in use
    * may therefore be closed on eviction; confirm the pool limits are sized with that in mind.
    */
   public class ClassLoaderPool implements Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderPool.class);

     /** Cache key made of the provider and the properties that affect class-loader creation. */
     private static class ClassLoaderKey {
       private final String provider;
       private final Map<String, String> criticalProperties;

       public ClassLoaderKey(String provider, Map<String, String> conf) {
         this.provider = provider;
         // Keep only the properties that influence how the ClassLoader is built.
         this.criticalProperties = extractCriticalProperties(conf);
       }

       /** Copies the entries whose key mentions a package path, conf path or authorization. */
       private Map<String, String> extractCriticalProperties(Map<String, String> conf) {
         Map<String, String> critical = Maps.newHashMap();
         conf.entrySet().stream()
             .filter(entry -> entry.getKey().contains("package-path")
                 || entry.getKey().contains("conf-path")
                 || entry.getKey().contains("authorization"))
             .forEach(entry -> critical.put(entry.getKey(), entry.getValue()));
         return critical;
       }

       @Override
       public boolean equals(Object o) {
         if (this == o) {
           return true;
         }
         if (!(o instanceof ClassLoaderKey)) {
           return false;
         }
         ClassLoaderKey that = (ClassLoaderKey) o;
         return provider.equals(that.provider)
             && criticalProperties.equals(that.criticalProperties);
       }

       @Override
       public int hashCode() {
         return provider.hashCode() * 31 + criticalProperties.hashCode();
       }
     }

     /** Pairs a pooled ClassLoader with the number of outstanding acquisitions. */
     private static class ClassLoaderWrapper {
       private final IsolatedClassLoader classLoader;
       private final AtomicInteger refCount;

       public ClassLoaderWrapper(IsolatedClassLoader classLoader) {
         this.classLoader = classLoader;
         // Start at zero: every caller, including the one that triggers creation, goes through
         // acquire(). Starting at one left a phantom reference, so a balanced acquire/release
         // never reached zero and the loader was never released by release().
         this.refCount = new AtomicInteger(0);
       }

       /** Registers one more user and hands out the shared loader. */
       public IsolatedClassLoader acquire() {
         refCount.incrementAndGet();
         return classLoader;
       }

       /**
        * Unregisters one user.
        *
        * @return {@code true} when no user is left
        */
       public boolean release() {
         return refCount.decrementAndGet() <= 0;
       }

       public void close() {
         classLoader.close();
       }
     }

     private final Cache<ClassLoaderKey, ClassLoaderWrapper> classLoaderCache;

     /**
      * Creates a pool.
      *
      * @param maxSize maximum number of cached loaders
      * @param expireAfterAccessMinutes minutes after the last access before an entry expires
      */
     public ClassLoaderPool(long maxSize, long expireAfterAccessMinutes) {
       this.classLoaderCache = Caffeine.newBuilder()
           .maximumSize(maxSize)
           .expireAfterAccess(expireAfterAccessMinutes, TimeUnit.MINUTES)
           .removalListener((RemovalListener<ClassLoaderKey, ClassLoaderWrapper>)
               (key, value, cause) -> {
                 if (value != null) {
                   LOG.info("Closing cached ClassLoader for provider: {}", key.provider);
                   value.close();
                 }
               })
           .build();
     }

     /**
      * Returns the pooled loader matching the provider and configuration, creating it through
      * {@code factory} when none is cached, and counts the caller as a user of that loader.
      *
      * @param provider catalog provider name
      * @param conf catalog configuration; only the critical subset takes part in the lookup
      * @param factory builds a new loader on a cache miss
      * @return the shared loader
      */
     public IsolatedClassLoader getOrCreate(String provider, Map<String, String> conf,
                                           ClassLoaderFactory factory) {
       ClassLoaderKey key = new ClassLoaderKey(provider, conf);
       ClassLoaderWrapper wrapper = classLoaderCache.get(key, k -> {
         LOG.info("Creating new ClassLoader for provider: {}", provider);
         return new ClassLoaderWrapper(factory.create(provider, conf));
       });
       // NOTE(review): lookup and acquire are two separate steps; a concurrent release() of the
       // last reference between them could close the loader returned here.
       return wrapper.acquire();
     }

     /**
      * Drops one reference to the loader matching the provider and configuration. The entry is
      * invalidated once the last reference is gone. Does nothing when no entry is cached.
      */
     public void release(String provider, Map<String, String> conf) {
       ClassLoaderKey key = new ClassLoaderKey(provider, conf);
       ClassLoaderWrapper wrapper = classLoaderCache.getIfPresent(key);
       if (wrapper != null && wrapper.release()) {
         classLoaderCache.invalidate(key);
       }
     }

     /** Invalidates every cached entry. */
     @Override
     public void close() {
       classLoaderCache.invalidateAll();
     }

     /** Builds a new loader for a provider and configuration on a cache miss. */
     @FunctionalInterface
     public interface ClassLoaderFactory {
       IsolatedClassLoader create(String provider, Map<String, String> conf);
     }
   }
   2.  New intelligent attribute update detector 
   Create a class to detect which property changes require a catalog to be 
recreated:
   // New file: 
core/src/main/java/org/apache/gravitino/catalog/CatalogPropertyAnalyzer.java  
   package org.apache.gravitino.catalog;  
     
   import com.google.common.collect.Sets;  
   import java.util.Map;  
   import java.util.Set;  
   import org.apache.gravitino.CatalogChange;  
     
   /**
    * Classifies catalog changes to decide whether the catalog instance has to be rebuilt or
    * whether the change can be applied in place.
    */
   public class CatalogPropertyAnalyzer {

     // Property prefixes whose modification requires re-creating the catalog.
     private static final Set<String> CRITICAL_PROPERTY_PREFIXES =
         Sets.newHashSet(
             "gravitino.bypass.",
             "authentication.",
             "authorization.",
             "package-path",
             "conf-path",
             "jdbc.url",
             "jdbc.driver",
             "metastore.uris",
             "warehouse");

     // Property prefixes that only need a hot update.
     private static final Set<String> HOT_UPDATABLE_PREFIXES =
         Sets.newHashSet("gravitino.catalog.cache.", "description", "comment");

     /**
      * Inspects every change and reports whether any of them touches a critical property and
      * whether any of them counts as hot-updatable.
      *
      * @param changes the changes requested for a catalog
      * @return the combined classification of all changes
      */
     public static ChangeAnalysisResult analyzeCatalogChanges(CatalogChange... changes) {
       boolean recreate = false;
       boolean hotUpdatable = false;

       for (CatalogChange change : changes) {
         if (change instanceof CatalogChange.SetProperty) {
           String name = ((CatalogChange.SetProperty) change).getProperty();
           if (isCriticalProperty(name)) {
             recreate = true;
           } else if (isHotUpdatableProperty(name)) {
             hotUpdatable = true;
           }
           continue;
         }

         if (change instanceof CatalogChange.RemoveProperty) {
           String name = ((CatalogChange.RemoveProperty) change).getProperty();
           if (isCriticalProperty(name)) {
             recreate = true;
           }
           continue;
         }

         // Any other kind of change (e.g. a rename) is treated as not needing re-creation.
         hotUpdatable = true;
       }

       return new ChangeAnalysisResult(recreate, hotUpdatable);
     }

     private static boolean isCriticalProperty(String property) {
       return startsWithAny(CRITICAL_PROPERTY_PREFIXES, property);
     }

     private static boolean isHotUpdatableProperty(String property) {
       return startsWithAny(HOT_UPDATABLE_PREFIXES, property);
     }

     /** Returns whether the property name begins with at least one of the given prefixes. */
     private static boolean startsWithAny(Set<String> prefixes, String property) {
       for (String prefix : prefixes) {
         if (property.startsWith(prefix)) {
           return true;
         }
       }
       return false;
     }

     /** Outcome of {@link #analyzeCatalogChanges}. */
     public static class ChangeAnalysisResult {
       private final boolean needsRecreation;
       private final boolean hasHotUpdatableChanges;

       public ChangeAnalysisResult(boolean needsRecreation, boolean hasHotUpdatableChanges) {
         this.hasHotUpdatableChanges = hasHotUpdatableChanges;
         this.needsRecreation = needsRecreation;
       }

       /** Whether at least one change touched a critical property. */
       public boolean needsRecreation() {
         return needsRecreation;
       }

       /** Whether at least one change was classified as hot-updatable. */
       public boolean hasHotUpdatableChanges() {
         return hasHotUpdatableChanges;
       }
     }
   }
   3. Modify the CatalogManager class
    Optimize the existing CatalogManager.java:112-264 
   // Modified file: core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java 
 
     
   public class CatalogManager implements CatalogDispatcher, Closeable {  
     // ... 现有代码保持不变 ...  
       
     // 新增ClassLoader池  
     private ClassLoaderPool classLoaderPool;  
       
     // 新增操作频率限制器  
     private final RateLimiter catalogOperationLimiter;  
       
     // 在构造函数中初始化  
     public CatalogManager(Config config, EntityStore store, IdGenerator 
idGenerator) {  
       // ... 现有初始化代码 ...  
         
       // 初始化ClassLoader池  
       long poolMaxSize = 
config.get(Configs.CATALOG_CLASSLOADER_POOL_MAX_SIZE);  
       long poolExpireMinutes = 
config.get(Configs.CATALOG_CLASSLOADER_POOL_EXPIRE_MINUTES);  
       this.classLoaderPool = new ClassLoaderPool(poolMaxSize, 
poolExpireMinutes);  
         
       // 初始化操作频率限制器  
       double operationsPerSecond = 
config.get(Configs.CATALOG_OPERATIONS_PER_SECOND);  
       this.catalogOperationLimiter = RateLimiter.create(operationsPerSecond);  
     }  
       
     // 修改alterCatalog方法,添加智能更新逻辑  
     @Override  
     public Catalog alterCatalog(NameIdentifier ident, CatalogChange... 
changes)  
         throws NoSuchCatalogException, IllegalArgumentException {  
         
       // 添加频率限制  
       if (!catalogOperationLimiter.tryAcquire(1, TimeUnit.SECONDS)) {  
         throw new IllegalStateException("Catalog operation rate limit 
exceeded");  
       }  
         
       return TreeLockUtils.doWithTreeLock(  
           ident,  
           LockType.WRITE,  
           () -> {  
             // 分析变更类型  
             CatalogPropertyAnalyzer.ChangeAnalysisResult analysisResult =   
                 CatalogPropertyAnalyzer.analyzeCatalogChanges(changes);  
               
             CatalogEntity oldEntity = store.get(ident, EntityType.CATALOG, 
CatalogEntity.class);  
               
             if (!analysisResult.needsRecreation()) {  
               // 只需要热更新,不重新创建catalog实例  
               return performHotUpdate(ident, oldEntity, changes);  
             } else {  
               // 需要重新创建catalog实例  
               return performFullUpdate(ident, oldEntity, changes);  
             }  
           });  
     }  
       
     // 新增热更新方法  
     private Catalog performHotUpdate(NameIdentifier ident, CatalogEntity 
oldEntity,   
                                     CatalogChange... changes) throws 
IOException {  
       LOG.info("Performing hot update for catalog: {}", ident);  
         
       // 更新实体属性但不重新创建wrapper  
       Map<String, String> newProps = 
Maps.newHashMap(oldEntity.getProperties());  
       CatalogEntity.Builder builder = newCatalogBuilder(ident.namespace(), 
oldEntity);  
       CatalogEntity updatedEntity = updateEntity(builder, newProps, changes)  
           .withProperties(newProps)  
           .build();  
         
       // 更新存储  
       store.update(ident, CatalogEntity.class, Entity.EntityType.CATALOG, 
updatedEntity);  
         
       // 获取现有的wrapper并更新其配置  
       CatalogWrapper existingWrapper = catalogCache.getIfPresent(ident);  
       if (existingWrapper != null) {  
         // 热更新wrapper的配置  
         
existingWrapper.catalog().withCatalogConf(updatedEntity.getProperties())  
                                  .withCatalogEntity(updatedEntity);  
       }  
         
       return DTOConverters.toCatalog(updatedEntity, 
existingWrapper.catalog().properties());  
     }  
       
     // 修改完整更新方法  
     private Catalog performFullUpdate(NameIdentifier ident, CatalogEntity 
oldEntity,   
                                      CatalogChange... changes) throws 
IOException {  
       LOG.info("Performing full update for catalog: {}", ident);  
         
       // 先释放旧的ClassLoader引用  
       classLoaderPool.release(oldEntity.getProvider(), 
oldEntity.getProperties());  
         
       // 使缓存失效  
       catalogCache.invalidate(ident);  
         
       // 执行原有的完整更新逻辑  
       Map<String, String> newProps = 
Maps.newHashMap(oldEntity.getProperties());  
       CatalogEntity.Builder builder = newCatalogBuilder(ident.namespace(), 
oldEntity);  
       CatalogEntity updatedEntity = updateEntity(builder, newProps, changes)  
           .withProperties(newProps)  
           .build();  
         
       store.update(ident, CatalogEntity.class, Entity.EntityType.CATALOG, 
updatedEntity);  
         
       return DTOConverters.toCatalog(  
           updatedEntity,  
           catalogCache.get(  
               updatedEntity.nameIdentifier(),  
               id -> createCatalogWrapper(updatedEntity, null))  
           .catalog.properties());  
     }  
       
     // 修改createClassLoader方法,使用ClassLoader池  
     private IsolatedClassLoader createClassLoader(String provider, Map<String, 
String> conf) {
       return classLoaderPool.getOrCreate(provider, conf, (p, c) -> {
         // 原有的createClassLoader逻辑
         if (config.get(Configs.CATALOG_LOAD_ISOLATED)) {
           String catalogPkgPath = buildPkgPath(c, p);
           String catalogConfPath = buildConfPath(c, p);
           ArrayList<String> libAndResourcesPaths = 
Lists.newArrayList(catalogPkgPath, catalogConfPath);
           
BaseAuthorization.buildAuthorizationPkgPath(c).ifPresent(libAndResourcesPaths::add);
           return IsolatedClassLoader.buildClassLoader(libAndResourcesPaths);
         } else {
           return new IsolatedClassLoader(
               Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList());
         }
       });
     }
     
     // 修改close方法,确保资源正确释放
     @Override
     public void close() {
       catalogCache.invalidateAll();
       if (classLoaderPool != null) {
         classLoaderPool.close();
       }
     }
     
     // 新增辅助方法用于更新catalog实体
     private CatalogEntity.Builder newCatalogBuilder(Namespace namespace, 
CatalogEntity oldEntity) {
       return CatalogEntity.builder()
           .withId(oldEntity.id())
           .withName(oldEntity.name())
           .withNamespace(namespace)
           .withType(oldEntity.getType())
           .withProvider(oldEntity.getProvider())
           .withAuditInfo(oldEntity.auditInfo());
     }
     
     private CatalogEntity.Builder updateEntity(CatalogEntity.Builder builder, 
                                              Map<String, String> newProps, 
                                              CatalogChange... changes) {
       for (CatalogChange change : changes) {
         if (change instanceof CatalogChange.SetProperty) {
           CatalogChange.SetProperty setProperty = (CatalogChange.SetProperty) 
change;
           newProps.put(setProperty.getProperty(), setProperty.getValue());
         } else if (change instanceof CatalogChange.RemoveProperty) {
           CatalogChange.RemoveProperty removeProperty = 
(CatalogChange.RemoveProperty) change;
           newProps.remove(removeProperty.getProperty());
         } else if (change instanceof CatalogChange.RenameCatalog) {
           CatalogChange.RenameCatalog renameCatalog = 
(CatalogChange.RenameCatalog) change;
           builder.withName(renameCatalog.getNewName());
         } else if (change instanceof CatalogChange.UpdateCatalogComment) {
           CatalogChange.UpdateCatalogComment updateComment = 
(CatalogChange.UpdateCatalogComment) change;
           newProps.put("comment", updateComment.getNewComment());
         }
       }
       return builder;
     }
   }
   
   4. New configuration items
   
   // Modified file: core/src/main/java/org/apache/gravitino/Configs.java
   
   public class Configs {
     // ... existing configuration entries ...
     
     // Configuration entries for the catalog ClassLoader pool.

     /** Maximum number of entries in the catalog ClassLoader pool; defaults to 50. */
     public static final ConfigEntry<Long> CATALOG_CLASSLOADER_POOL_MAX_SIZE =
         new ConfigBuilder("gravitino.catalog.classloader.pool.max-size")
             .doc("Maximum size of the catalog ClassLoader pool")
             .version("1.0.0")
             .longConf()
             .createWithDefault(50L);
     
     /** Expiration time, in minutes, of cached ClassLoaders; defaults to 30. */
     public static final ConfigEntry<Long> 
CATALOG_CLASSLOADER_POOL_EXPIRE_MINUTES =
         new ConfigBuilder("gravitino.catalog.classloader.pool.expire-minutes")
             .doc("Expiration time in minutes for cached ClassLoaders")
             .version("1.0.0")
             .longConf()
             .createWithDefault(30L);
     
     // Configuration entry for the catalog operation rate limit.

     /** Maximum number of catalog operations per second; defaults to 10.0. */
     public static final ConfigEntry<Double> CATALOG_OPERATIONS_PER_SECOND =
         new ConfigBuilder("gravitino.catalog.operations.rate-limit")
             .doc("Maximum catalog operations per second")
             .version("1.0.0")
             .doubleConf()
             .createWithDefault(10.0);
   }
   
   5. New frequency limiter
   
   // New file: core/src/main/java/org/apache/gravitino/catalog/RateLimiter.java
   package org.apache.gravitino.catalog;
   
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicLong;
   
   /**
    * Simple token-bucket rate limiter.
    *
    * <p>The bucket starts full with {@code max(1, permitsPerSecond)} permits (truncated to a whole
    * number) and is refilled continuously at {@code permitsPerSecond}, never exceeding that
    * capacity. Fractional permits are accumulated between calls so that no refill time is lost to
    * truncation. All mutable state is guarded by a private lock.
    */
   public class RateLimiter {
     private static final double NANOS_PER_SECOND = 1_000_000_000.0;

     private final double permitsPerSecond;
     private final long maxPermits;
     private final Object lock = new Object();

     // Both fields are guarded by lock.
     private double storedPermits;
     private long lastRefillNanos;

     private RateLimiter(double permitsPerSecond) {
       this.permitsPerSecond = permitsPerSecond;
       this.maxPermits = (long) Math.max(1, permitsPerSecond);
       this.storedPermits = maxPermits;
       this.lastRefillNanos = System.nanoTime();
     }

     /**
      * Creates a limiter that hands out {@code permitsPerSecond} permits per second.
      *
      * @param permitsPerSecond refill rate; a rate that is not positive yields a limiter that is
      *     never refilled after its initial permits are used
      * @return a new limiter with a full bucket
      */
     public static RateLimiter create(double permitsPerSecond) {
       return new RateLimiter(permitsPerSecond);
     }

     /**
      * Takes one permit, waiting up to the given timeout for one to become available.
      *
      * <p>A failed attempt consumes nothing. Earlier, rejected calls still decremented the counter
      * and the timeout was ignored, so a burst of rejections could starve later callers.
      *
      * @param timeout maximum time to wait; zero or negative means do not wait
      * @param unit unit of {@code timeout}; {@code null} is treated as a zero timeout
      * @return {@code true} if a permit was taken; {@code false} if none can be available within
      *     the timeout or the waiting thread was interrupted (the interrupt flag is restored)
      */
     public boolean tryAcquire(long timeout, TimeUnit unit) {
       long timeoutNanos = unit == null ? 0L : Math.max(0L, unit.toNanos(timeout));
       long startNanos = System.nanoTime();

       while (true) {
         long waitNanos;
         synchronized (lock) {
           refill();
           if (storedPermits >= 1.0) {
             storedPermits -= 1.0;
             return true;
           }
           waitNanos = nanosUntilNextPermit();
         }

         // Measure elapsed time rather than comparing against an absolute deadline, so that a
         // huge timeout cannot overflow.
         long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
         if (waitNanos > remainingNanos) {
           return false;
         }

         try {
           // Sleep outside the lock so other callers are not blocked.
           TimeUnit.NANOSECONDS.sleep(waitNanos);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           return false;
         }
       }
     }

     /** Adds the permits earned since the last refill, capped at the capacity. Needs the lock. */
     private void refill() {
       long now = System.nanoTime();
       long elapsedNanos = now - lastRefillNanos;
       if (elapsedNanos <= 0 || !(permitsPerSecond > 0)) {
         return;
       }
       double earned = elapsedNanos / NANOS_PER_SECOND * permitsPerSecond;
       storedPermits = Math.min(maxPermits, storedPermits + earned);
       lastRefillNanos = now;
     }

     /** Nanoseconds until one whole permit is stored, at least 1. Needs the lock. */
     private long nanosUntilNextPermit() {
       if (!(permitsPerSecond > 0)) {
         return Long.MAX_VALUE;
       }
       double missing = 1.0 - storedPermits;
       long nanos = (long) Math.ceil(missing / permitsPerSecond * NANOS_PER_SECOND);
       return Math.max(1L, nanos);
     }
   }
   
   6. Enhance the CatalogWrapper class
   
   // The CatalogWrapper class inside 
// core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
   
   public static class CatalogWrapper {
     private BaseCatalog catalog;
     private IsolatedClassLoader classLoader;
     private final String provider;
     private final Map<String, String> originalConf;

     /**
      * Wraps a catalog with the ClassLoader it runs in, remembering the catalog's provider and a
      * copy of its properties at construction time.
      */
     public CatalogWrapper(BaseCatalog catalog, IsolatedClassLoader classLoader) {
       this.classLoader = classLoader;
       this.catalog = catalog;
       this.provider = catalog.provider();
       this.originalConf = Maps.newHashMap(catalog.properties());
     }

     // ... existing methods unchanged ...

     /**
      * Applies new properties to the wrapped catalog inside its ClassLoader, without re-creating
      * the ClassLoader.
      *
      * @throws RuntimeException wrapping any exception raised while applying the properties
      */
     public void hotUpdateProperties(Map<String, String> newProperties) {
       try {
         classLoader.withClassLoader(
             loader -> {
               catalog.withCatalogConf(newProperties);
               return null;
             });
       } catch (Exception e) {
         LOG.warn("Failed to hot update catalog properties", e);
         throw new RuntimeException(e);
       }
     }

     /**
      * Tells whether the changes can be applied as a hot update: none of them requires
      * re-creation and at least one is hot-updatable.
      */
     public boolean canHotUpdate(CatalogChange... changes) {
       CatalogPropertyAnalyzer.ChangeAnalysisResult analysis =
           CatalogPropertyAnalyzer.analyzeCatalogChanges(changes);
       if (analysis.needsRecreation()) {
         return false;
       }
       return analysis.hasHotUpdatableChanges();
     }

     public String getProvider() {
       return provider;
     }

     /** Returns a fresh copy of the properties captured at construction time. */
     public Map<String, String> getOriginalConf() {
       Map<String, String> copy = Maps.newHashMap(originalConf);
       return copy;
     }
   }
   
   7. New monitoring and metrics collection
   
    core/src/main/java/org/apache/gravitino/catalog/CatalogMetrics.java
   package org.apache.gravitino.catalog;
   
   import java.util.concurrent.atomic.AtomicLong;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   /**
    * Counters for catalog operations: ClassLoader creations and releases, hot and full updates,
    * and rate-limit hits.
    */
   public class CatalogMetrics {
     private static final Logger LOG = LoggerFactory.getLogger(CatalogMetrics.class);

     private final AtomicLong classLoaderCreations = new AtomicLong(0);
     private final AtomicLong classLoaderReleases = new AtomicLong(0);
     private final AtomicLong hotUpdates = new AtomicLong(0);
     private final AtomicLong fullUpdates = new AtomicLong(0);
     private final AtomicLong rateLimitHits = new AtomicLong(0);

     /** Counts one ClassLoader creation and logs the running total on every 100th one. */
     public void recordClassLoaderCreation() {
       long total = classLoaderCreations.incrementAndGet();
       if (total % 100 != 0) {
         return;
       }
       LOG.info("ClassLoader creations: {}", total);
     }

     /** Counts one ClassLoader release. */
     public void recordClassLoaderRelease() {
       classLoaderReleases.incrementAndGet();
     }

     /** Counts one hot update. */
     public void recordHotUpdate() {
       hotUpdates.incrementAndGet();
     }

     /** Counts one full update. */
     public void recordFullUpdate() {
       fullUpdates.incrementAndGet();
     }

     /** Counts one rejected operation due to rate limiting. */
     public void recordRateLimitHit() {
       rateLimitHits.incrementAndGet();
     }

     /** Logs the current value of every counter in a single line. */
     public void logMetrics() {
       long creations = classLoaderCreations.get();
       long releases = classLoaderReleases.get();
       long hot = hotUpdates.get();
       long full = fullUpdates.get();
       long limited = rateLimitHits.get();
       LOG.info("Catalog Metrics - ClassLoader creations: {}, releases: {}, " +
                "hot updates: {}, full updates: {}, rate limit hits: {}",
                creations, releases, hot, full, limited);
     }
   }
   
   8. Modify the import statement
   
   Add the necessary imports at the top of the CatalogManager.java file:
   
   // Add at the top of core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
   import java.util.concurrent.TimeUnit;
   import org.apache.gravitino.catalog.ClassLoaderPool;
   import org.apache.gravitino.catalog.CatalogPropertyAnalyzer;
   import org.apache.gravitino.catalog.RateLimiter;
   import org.apache.gravitino.catalog.CatalogMetrics;
   import org.apache.gravitino.dto.util.DTOConverters;
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to