This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 87965e9b9ad branch-4.0: [refactor](paimon) Per-catalog Paimon metadata cache with two-level table+snapshot structure #60478 (#60741)
87965e9b9ad is described below

commit 87965e9b9ad2cd5d769dad434ecd39bc7a71f0a6
Author: Socrates <[email protected]>
AuthorDate: Mon Mar 9 09:29:56 2026 +0800

    branch-4.0: [refactor](paimon) Per-catalog Paimon metadata cache with two-level table+snapshot structure #60478 (#60741)
    
    - Cherry-picked from #60478
    - Keep branch-4.0 compatibility in PaimonExternalCatalog without
    introducing #58894
---
 .../doris/datasource/CatalogScopedCacheMgr.java    |  43 +++++
 .../apache/doris/datasource/ExternalCatalog.java   |   1 -
 .../doris/datasource/ExternalMetaCacheMgr.java     | 133 ++++++--------
 .../doris/datasource/ExternalSchemaCache.java      |  13 +-
 .../datasource/iceberg/IcebergExternalCatalog.java |  70 +++-----
 .../datasource/iceberg/IcebergMetadataCache.java   | 135 +++++++-------
 .../doris/datasource/iceberg/IcebergUtils.java     |  14 +-
 .../iceberg/cache/ContentFileEstimator.java        | 194 ---------------------
 .../iceberg/cache/IcebergManifestCache.java        |  27 ++-
 .../iceberg/cache/ManifestCacheValue.java          |  20 +--
 .../doris/datasource/metacache/CacheSpec.java      | 152 ++++++++++++++++
 .../datasource/paimon/PaimonExternalCatalog.java   |  23 +++
 .../datasource/paimon/PaimonExternalTable.java     |  52 +++---
 .../datasource/paimon/PaimonMetadataCache.java     | 152 +++++++++++-----
 .../datasource/paimon/PaimonMetadataCacheMgr.java  |  51 ------
 .../datasource/paimon/PaimonSnapshotCacheKey.java  |  60 -------
 .../datasource/paimon/PaimonTableCacheValue.java   |  61 +++++++
 .../doris/datasource/paimon/PaimonUtils.java       |  64 +++++++
 .../datasource/paimon/source/PaimonScanNode.java   |   5 +-
 .../metastore/AbstractIcebergProperties.java       |  20 +++
 .../org/apache/doris/regression/suite/Suite.groovy |  35 +++-
 .../iceberg/test_iceberg_manifest_cache.groovy     |   5 +-
 .../iceberg/test_iceberg_table_cache.groovy        |   2 +-
 .../iceberg/test_iceberg_table_meta_cache.groovy   |  18 +-
 .../paimon/test_paimon_table_meta_cache.groovy     | 128 ++++++++++++++
 25 files changed, 856 insertions(+), 622 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogScopedCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogScopedCacheMgr.java
new file mode 100644
index 00000000000..6b9afc1b74a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogScopedCacheMgr.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+public class CatalogScopedCacheMgr<T> {
+    private final Map<Long, T> cacheMap = new ConcurrentHashMap<>();
+    private final Function<ExternalCatalog, T> cacheFactory;
+
+    public CatalogScopedCacheMgr(Function<ExternalCatalog, T> cacheFactory) {
+        this.cacheFactory = cacheFactory;
+    }
+
+    public T getCache(ExternalCatalog catalog) {
+        return cacheMap.computeIfAbsent(catalog.getId(), id -> 
cacheFactory.apply(catalog));
+    }
+
+    public T getCache(long catalogId) {
+        return cacheMap.get(catalogId);
+    }
+
+    public T removeCache(long catalogId) {
+        return cacheMap.remove(catalogId);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 3501fcafb5e..1ccb3801b9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -1450,4 +1450,3 @@ public abstract class ExternalCatalog
         }
     }
 }
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 8d6aa5522f1..9b44833f01a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -36,7 +36,6 @@ import 
org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr;
 import org.apache.doris.datasource.metacache.MetaCache;
 import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.paimon.PaimonMetadataCache;
-import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr;
 import org.apache.doris.fs.FileSystemCache;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 
@@ -87,13 +86,10 @@ public class ExternalMetaCacheMgr {
     // This executor is used to schedule the getting split tasks
     private ExecutorService scheduleExecutor;
 
-    // catalog id -> HiveMetaStoreCache
-    private final Map<Long, HiveMetaStoreCache> hiveCacheMap = 
Maps.newConcurrentMap();
-
-    // catalog id -> IcebergMetadataCache
-    private final Map<Long, IcebergMetadataCache> icebergCacheMap = 
Maps.newConcurrentMap();
-    // catalog id -> table schema cache
-    private final Map<Long, ExternalSchemaCache> schemaCacheMap = 
Maps.newHashMap();
+    private final CatalogScopedCacheMgr<HiveMetaStoreCache> 
hiveMetaStoreCacheMgr;
+    private final CatalogScopedCacheMgr<IcebergMetadataCache> 
icebergMetadataCacheMgr;
+    private final CatalogScopedCacheMgr<PaimonMetadataCache> 
paimonMetadataCacheMgr;
+    private final CatalogScopedCacheMgr<ExternalSchemaCache> schemaCacheMgr;
     // hudi partition manager
     private final HudiMetadataCacheMgr hudiMetadataCacheMgr;
     // all catalogs could share the same fsCache.
@@ -101,7 +97,6 @@ public class ExternalMetaCacheMgr {
     // all external table row count cache.
     private ExternalRowCountCache rowCountCache;
     private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
-    private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
     private final DorisExternalMetaCacheMgr dorisExternalMetaCacheMgr;
 
     public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
@@ -132,7 +127,15 @@ public class ExternalMetaCacheMgr {
 
         hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor);
         maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
-        paimonMetadataCacheMgr = new 
PaimonMetadataCacheMgr(commonRefreshExecutor);
+        hiveMetaStoreCacheMgr = new CatalogScopedCacheMgr<>(
+                catalog -> new HiveMetaStoreCache((HMSExternalCatalog) catalog,
+                        commonRefreshExecutor, fileListingExecutor));
+        icebergMetadataCacheMgr = new CatalogScopedCacheMgr<>(
+                catalog -> new IcebergMetadataCache(catalog, 
commonRefreshExecutor));
+        schemaCacheMgr = new CatalogScopedCacheMgr<>(
+                catalog -> new ExternalSchemaCache(catalog, 
commonRefreshExecutor));
+        paimonMetadataCacheMgr = new CatalogScopedCacheMgr<>(
+                catalog -> new PaimonMetadataCache(catalog, 
commonRefreshExecutor));
         dorisExternalMetaCacheMgr = new 
DorisExternalMetaCacheMgr(commonRefreshExecutor);
     }
 
@@ -160,30 +163,11 @@ public class ExternalMetaCacheMgr {
     }
 
     public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
-        HiveMetaStoreCache cache = hiveCacheMap.get(catalog.getId());
-        if (cache == null) {
-            synchronized (hiveCacheMap) {
-                if (!hiveCacheMap.containsKey(catalog.getId())) {
-                    hiveCacheMap.put(catalog.getId(),
-                            new HiveMetaStoreCache(catalog, 
commonRefreshExecutor, fileListingExecutor));
-                }
-                cache = hiveCacheMap.get(catalog.getId());
-            }
-        }
-        return cache;
+        return hiveMetaStoreCacheMgr.getCache(catalog);
     }
 
     public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
-        ExternalSchemaCache cache = schemaCacheMap.get(catalog.getId());
-        if (cache == null) {
-            synchronized (schemaCacheMap) {
-                if (!schemaCacheMap.containsKey(catalog.getId())) {
-                    schemaCacheMap.put(catalog.getId(), new 
ExternalSchemaCache(catalog, commonRefreshExecutor));
-                }
-                cache = schemaCacheMap.get(catalog.getId());
-            }
-        }
-        return cache;
+        return schemaCacheMgr.getCache(catalog);
     }
 
     public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog 
catalog) {
@@ -203,20 +187,11 @@ public class ExternalMetaCacheMgr {
     }
 
     public IcebergMetadataCache getIcebergMetadataCache(ExternalCatalog 
catalog) {
-        IcebergMetadataCache cache = icebergCacheMap.get(catalog.getId());
-        if (cache == null) {
-            synchronized (icebergCacheMap) {
-                if (!icebergCacheMap.containsKey(catalog.getId())) {
-                    icebergCacheMap.put(catalog.getId(), new 
IcebergMetadataCache(catalog, commonRefreshExecutor));
-                }
-                cache = icebergCacheMap.get(catalog.getId());
-            }
-        }
-        return cache;
+        return icebergMetadataCacheMgr.getCache(catalog);
     }
 
-    public PaimonMetadataCache getPaimonMetadataCache() {
-        return paimonMetadataCacheMgr.getPaimonMetadataCache();
+    public PaimonMetadataCache getPaimonMetadataCache(ExternalCatalog catalog) 
{
+        return paimonMetadataCacheMgr.getCache(catalog);
     }
 
     public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
@@ -236,41 +211,43 @@ public class ExternalMetaCacheMgr {
     }
 
     public void removeCache(long catalogId) {
-        if (hiveCacheMap.remove(catalogId) != null) {
+        if (hiveMetaStoreCacheMgr.removeCache(catalogId) != null) {
             LOG.info("remove hive metastore cache for catalog {}", catalogId);
         }
-        synchronized (schemaCacheMap) {
-            if (schemaCacheMap.remove(catalogId) != null) {
-                LOG.info("remove schema cache for catalog {}", catalogId);
-            }
+        if (schemaCacheMgr.removeCache(catalogId) != null) {
+            LOG.info("remove schema cache for catalog {}", catalogId);
         }
-        if (icebergCacheMap.remove(catalogId) != null) {
+        if (icebergMetadataCacheMgr.removeCache(catalogId) != null) {
             LOG.info("remove iceberg meta cache for catalog {}", catalogId);
         }
         hudiMetadataCacheMgr.removeCache(catalogId);
         maxComputeMetadataCacheMgr.removeCache(catalogId);
-        paimonMetadataCacheMgr.removeCache(catalogId);
+        PaimonMetadataCache paimonMetadataCache = 
paimonMetadataCacheMgr.removeCache(catalogId);
+        if (paimonMetadataCache != null) {
+            paimonMetadataCache.invalidateCatalogCache(catalogId);
+        }
         dorisExternalMetaCacheMgr.removeCache(catalogId);
     }
 
     public void invalidateTableCache(ExternalTable dorisTable) {
-        synchronized (schemaCacheMap) {
-            ExternalSchemaCache schemaCache = 
schemaCacheMap.get(dorisTable.getCatalog().getId());
-            if (schemaCache != null) {
-                schemaCache.invalidateTableCache(dorisTable);
-            }
+        ExternalSchemaCache schemaCache = 
schemaCacheMgr.getCache(dorisTable.getCatalog().getId());
+        if (schemaCache != null) {
+            schemaCache.invalidateTableCache(dorisTable);
         }
-        HiveMetaStoreCache hiveMetaCache = 
hiveCacheMap.get(dorisTable.getCatalog().getId());
+        HiveMetaStoreCache hiveMetaCache = 
hiveMetaStoreCacheMgr.getCache(dorisTable.getCatalog().getId());
         if (hiveMetaCache != null) {
             
hiveMetaCache.invalidateTableCache(dorisTable.getOrBuildNameMapping());
         }
-        IcebergMetadataCache icebergMetadataCache = 
icebergCacheMap.get(dorisTable.getCatalog().getId());
+        IcebergMetadataCache icebergMetadataCache = 
icebergMetadataCacheMgr.getCache(dorisTable.getCatalog().getId());
         if (icebergMetadataCache != null) {
             icebergMetadataCache.invalidateTableCache(dorisTable);
         }
         hudiMetadataCacheMgr.invalidateTableCache(dorisTable);
         maxComputeMetadataCacheMgr.invalidateTableCache(dorisTable);
-        paimonMetadataCacheMgr.invalidateTableCache(dorisTable);
+        PaimonMetadataCache paimonMetadataCache = 
paimonMetadataCacheMgr.getCache(dorisTable.getCatalog().getId());
+        if (paimonMetadataCache != null) {
+            paimonMetadataCache.invalidateTableCache(dorisTable);
+        }
         if (LOG.isDebugEnabled()) {
             LOG.debug("invalid table cache for {}.{} in catalog {}", 
dorisTable.getRemoteDbName(),
                     dorisTable.getRemoteName(), 
dorisTable.getCatalog().getName());
@@ -279,43 +256,45 @@ public class ExternalMetaCacheMgr {
 
     public void invalidateDbCache(long catalogId, String dbName) {
         dbName = ClusterNamespace.getNameFromFullName(dbName);
-        synchronized (schemaCacheMap) {
-            ExternalSchemaCache schemaCache = schemaCacheMap.get(catalogId);
-            if (schemaCache != null) {
-                schemaCache.invalidateDbCache(dbName);
-            }
+        ExternalSchemaCache schemaCache = schemaCacheMgr.getCache(catalogId);
+        if (schemaCache != null) {
+            schemaCache.invalidateDbCache(dbName);
         }
-        HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
+        HiveMetaStoreCache metaCache = 
hiveMetaStoreCacheMgr.getCache(catalogId);
         if (metaCache != null) {
             metaCache.invalidateDbCache(dbName);
         }
-        IcebergMetadataCache icebergMetadataCache = 
icebergCacheMap.get(catalogId);
+        IcebergMetadataCache icebergMetadataCache = 
icebergMetadataCacheMgr.getCache(catalogId);
         if (icebergMetadataCache != null) {
             icebergMetadataCache.invalidateDbCache(catalogId, dbName);
         }
         hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
         maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
-        paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
+        PaimonMetadataCache paimonMetadataCache = 
paimonMetadataCacheMgr.getCache(catalogId);
+        if (paimonMetadataCache != null) {
+            paimonMetadataCache.invalidateDbCache(catalogId, dbName);
+        }
         if (LOG.isDebugEnabled()) {
             LOG.debug("invalid db cache for {} in catalog {}", dbName, 
catalogId);
         }
     }
 
     public void invalidateCatalogCache(long catalogId) {
-        synchronized (schemaCacheMap) {
-            schemaCacheMap.remove(catalogId);
-        }
-        HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
+        schemaCacheMgr.removeCache(catalogId);
+        HiveMetaStoreCache metaCache = 
hiveMetaStoreCacheMgr.getCache(catalogId);
         if (metaCache != null) {
             metaCache.invalidateAll();
         }
-        IcebergMetadataCache icebergMetadataCache = 
icebergCacheMap.get(catalogId);
+        IcebergMetadataCache icebergMetadataCache = 
icebergMetadataCacheMgr.getCache(catalogId);
         if (icebergMetadataCache != null) {
             icebergMetadataCache.invalidateCatalogCache(catalogId);
         }
         hudiMetadataCacheMgr.invalidateCatalogCache(catalogId);
         maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
-        paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
+        PaimonMetadataCache paimonMetadataCache = 
paimonMetadataCacheMgr.getCache(catalogId);
+        if (paimonMetadataCache != null) {
+            paimonMetadataCache.invalidateCatalogCache(catalogId);
+        }
         dorisExternalMetaCacheMgr.invalidateCatalogCache(catalogId);
         if (LOG.isDebugEnabled()) {
             LOG.debug("invalid catalog cache for {}", catalogId);
@@ -323,14 +302,12 @@ public class ExternalMetaCacheMgr {
     }
 
     public void invalidSchemaCache(long catalogId) {
-        synchronized (schemaCacheMap) {
-            schemaCacheMap.remove(catalogId);
-        }
+        schemaCacheMgr.removeCache(catalogId);
     }
 
     public void addPartitionsCache(long catalogId, HMSExternalTable table, 
List<String> partitionNames) {
         String dbName = 
ClusterNamespace.getNameFromFullName(table.getDbName());
-        HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
+        HiveMetaStoreCache metaCache = 
hiveMetaStoreCacheMgr.getCache(catalogId);
         if (metaCache != null) {
             List<Type> partitionColumnTypes;
             try {
@@ -348,7 +325,7 @@ public class ExternalMetaCacheMgr {
 
     public void dropPartitionsCache(long catalogId, HMSExternalTable table, 
List<String> partitionNames) {
         String dbName = 
ClusterNamespace.getNameFromFullName(table.getDbName());
-        HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
+        HiveMetaStoreCache metaCache = 
hiveMetaStoreCacheMgr.getCache(catalogId);
         if (metaCache != null) {
             metaCache.dropPartitionsCache(table, partitionNames, true);
         }
@@ -358,7 +335,7 @@ public class ExternalMetaCacheMgr {
     }
 
     public void invalidatePartitionsCache(ExternalTable dorisTable, 
List<String> partitionNames) {
-        HiveMetaStoreCache metaCache = 
hiveCacheMap.get(dorisTable.getCatalog().getId());
+        HiveMetaStoreCache metaCache = 
hiveMetaStoreCacheMgr.getCache(dorisTable.getCatalog().getId());
         if (metaCache != null) {
             for (String partitionName : partitionNames) {
                 metaCache.invalidatePartitionCache(dorisTable, partitionName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index a1c0236eeb4..cf129ea8623 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -19,6 +19,7 @@ package org.apache.doris.datasource;
 
 import org.apache.doris.common.CacheFactory;
 import org.apache.doris.common.Config;
+import org.apache.doris.datasource.metacache.CacheSpec;
 import org.apache.doris.metric.GaugeMetric;
 import org.apache.doris.metric.Metric;
 import org.apache.doris.metric.MetricLabel;
@@ -26,7 +27,6 @@ import org.apache.doris.metric.MetricRepo;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import lombok.Data;
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -49,13 +49,14 @@ public class ExternalSchemaCache {
     }
 
     private void init(ExecutorService executor) {
-        long schemaCacheTtlSecond = NumberUtils.toLong(
-                
(catalog.getProperties().get(ExternalCatalog.SCHEMA_CACHE_TTL_SECOND)), 
ExternalCatalog.CACHE_NO_TTL);
+        CacheSpec cacheSpec = CacheSpec.fromTtlValue(
+                
catalog.getProperties().get(ExternalCatalog.SCHEMA_CACHE_TTL_SECOND),
+                Config.external_cache_expire_time_seconds_after_access,
+                Config.max_external_schema_cache_num);
         CacheFactory schemaCacheFactory = new CacheFactory(
-                OptionalLong.of(schemaCacheTtlSecond >= 
ExternalCatalog.CACHE_TTL_DISABLE_CACHE
-                        ? schemaCacheTtlSecond : 
Config.external_cache_expire_time_seconds_after_access),
+                CacheSpec.toExpireAfterAccess(cacheSpec.getTtlSecond()),
                 OptionalLong.of(Config.external_cache_refresh_time_minutes * 
60),
-                Config.max_external_schema_cache_num,
+                cacheSpec.getCapacity(),
                 false,
                 null);
         schemaCache = schemaCacheFactory.buildCache(this::loadSchema, 
executor);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 039bba4ed0f..ee8ad8b4fc0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -28,11 +28,11 @@ import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.datasource.metacache.CacheSpec;
 import org.apache.doris.datasource.operations.ExternalMetadataOperations;
 import 
org.apache.doris.datasource.property.metastore.AbstractIcebergProperties;
 import org.apache.doris.transaction.TransactionManagerFactory;
 
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -52,12 +52,14 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
     public static final String ICEBERG_DLF = "dlf";
     public static final String ICEBERG_S3_TABLES = "s3tables";
     public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
-    public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND = 
"iceberg.table.meta.cache.ttl-second";
-    public static final String ICEBERG_MANIFEST_CACHE_ENABLE = 
"iceberg.manifest.cache.enable";
-    public static final String ICEBERG_MANIFEST_CACHE_CAPACITY_MB = 
"iceberg.manifest.cache.capacity-mb";
-    public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND = 
"iceberg.manifest.cache.ttl-second";
+    public static final String ICEBERG_TABLE_CACHE_ENABLE = 
"meta.cache.iceberg.table.enable";
+    public static final String ICEBERG_TABLE_CACHE_TTL_SECOND = 
"meta.cache.iceberg.table.ttl-second";
+    public static final String ICEBERG_TABLE_CACHE_CAPACITY = 
"meta.cache.iceberg.table.capacity";
+    public static final String ICEBERG_MANIFEST_CACHE_ENABLE = 
"meta.cache.iceberg.manifest.enable";
+    public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND = 
"meta.cache.iceberg.manifest.ttl-second";
+    public static final String ICEBERG_MANIFEST_CACHE_CAPACITY = 
"meta.cache.iceberg.manifest.capacity";
     public static final boolean DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE = false;
-    public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB = 1024;
+    public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY = 1024;
     public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND = 48 * 
60 * 60;
     protected String icebergCatalogType;
     protected Catalog catalog;
@@ -86,51 +88,33 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
     @Override
     public void checkProperties() throws DdlException {
         super.checkProperties();
-        // check iceberg.table.meta.cache.ttl-second parameter
-        String tableMetaCacheTtlSecond = 
catalogProperty.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null);
-        if (Objects.nonNull(tableMetaCacheTtlSecond) && 
NumberUtils.toInt(tableMetaCacheTtlSecond, CACHE_NO_TTL)
-                < CACHE_TTL_DISABLE_CACHE) {
-            throw new DdlException(
-                    "The parameter " + ICEBERG_TABLE_META_CACHE_TTL_SECOND + " 
is wrong, value is "
-                    + tableMetaCacheTtlSecond);
-        }
-
-        String manifestCacheEnable = 
catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
-        if (Objects.nonNull(manifestCacheEnable)
-                && !(manifestCacheEnable.equalsIgnoreCase("true") || 
manifestCacheEnable.equalsIgnoreCase("false"))) {
-            throw new DdlException(
-                    "The parameter " + ICEBERG_MANIFEST_CACHE_ENABLE + " is 
wrong, value is "
-                    + manifestCacheEnable);
-        }
-
-        String manifestCacheCapacity = 
catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null);
-        if (Objects.nonNull(manifestCacheCapacity) && 
NumberUtils.toLong(manifestCacheCapacity, -1) <= 0) {
-            throw new DdlException(
-                    "The parameter " + ICEBERG_MANIFEST_CACHE_CAPACITY_MB + " 
is wrong, value is "
-                    + manifestCacheCapacity);
-        }
-
-        String manifestCacheTtlSecond = 
catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null);
-        if (Objects.nonNull(manifestCacheTtlSecond)
-                && NumberUtils.toLong(manifestCacheTtlSecond, CACHE_NO_TTL) < 
CACHE_TTL_DISABLE_CACHE) {
-            throw new DdlException(
-                    "The parameter " + ICEBERG_MANIFEST_CACHE_TTL_SECOND + " 
is wrong, value is "
-                    + manifestCacheTtlSecond);
-        }
+        
CacheSpec.checkBooleanProperty(catalogProperty.getOrDefault(ICEBERG_TABLE_CACHE_ENABLE,
 null),
+                ICEBERG_TABLE_CACHE_ENABLE);
+        
CacheSpec.checkLongProperty(catalogProperty.getOrDefault(ICEBERG_TABLE_CACHE_TTL_SECOND,
 null),
+                -1L, ICEBERG_TABLE_CACHE_TTL_SECOND);
+        
CacheSpec.checkLongProperty(catalogProperty.getOrDefault(ICEBERG_TABLE_CACHE_CAPACITY,
 null),
+                0L, ICEBERG_TABLE_CACHE_CAPACITY);
+
+        
CacheSpec.checkBooleanProperty(catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE,
 null),
+                ICEBERG_MANIFEST_CACHE_ENABLE);
+        
CacheSpec.checkLongProperty(catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND,
 null),
+                -1L, ICEBERG_MANIFEST_CACHE_TTL_SECOND);
+        
CacheSpec.checkLongProperty(catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY,
 null),
+                0L, ICEBERG_MANIFEST_CACHE_CAPACITY);
         
catalogProperty.checkMetaStoreAndStorageProperties(AbstractIcebergProperties.class);
     }
 
     @Override
     public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
         super.notifyPropertiesUpdated(updatedProps);
-        String tableMetaCacheTtl = 
updatedProps.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null);
-        if (Objects.nonNull(tableMetaCacheTtl)) {
-            
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
-        }
+        String tableCacheEnable = 
updatedProps.getOrDefault(ICEBERG_TABLE_CACHE_ENABLE, null);
+        String tableCacheTtl = 
updatedProps.getOrDefault(ICEBERG_TABLE_CACHE_TTL_SECOND, null);
+        String tableCacheCapacity = 
updatedProps.getOrDefault(ICEBERG_TABLE_CACHE_CAPACITY, null);
         String manifestCacheEnable = 
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
-        String manifestCacheCapacity = 
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null);
+        String manifestCacheCapacity = 
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY, null);
         String manifestCacheTtl = 
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null);
-        if (Objects.nonNull(manifestCacheEnable) || 
Objects.nonNull(manifestCacheCapacity)
+        if (Objects.nonNull(tableCacheEnable) || 
Objects.nonNull(tableCacheTtl) || Objects.nonNull(tableCacheCapacity)
+                || Objects.nonNull(manifestCacheEnable) || 
Objects.nonNull(manifestCacheCapacity)
                 || Objects.nonNull(manifestCacheTtl)) {
             
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index 5f0c0700efe..40c8ba29184 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -28,6 +28,7 @@ import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.NameMapping;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
+import org.apache.doris.datasource.metacache.CacheSpec;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -35,7 +36,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -65,28 +65,38 @@ public class IcebergMetadataCache {
     }
 
     public void init() {
-        long tableMetaCacheTtlSecond = NumberUtils.toLong(
-                
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_TABLE_META_CACHE_TTL_SECOND),
-                ExternalCatalog.CACHE_NO_TTL);
-
+        CacheSpec tableCacheSpec = resolveTableCacheSpec();
         CacheFactory tableCacheFactory = new CacheFactory(
-                OptionalLong.of(tableMetaCacheTtlSecond >= 
ExternalCatalog.CACHE_TTL_DISABLE_CACHE
-                    ? tableMetaCacheTtlSecond : 
Config.external_cache_expire_time_seconds_after_access),
-                OptionalLong.of(Config.external_cache_refresh_time_minutes * 
60),
-                Config.max_external_table_cache_num,
+                CacheSpec.toExpireAfterAccess(tableCacheSpec.getTtlSecond()),
+                OptionalLong.empty(),
+                tableCacheSpec.getCapacity(),
                 true,
                 null);
         this.tableCache = 
tableCacheFactory.buildCache(this::loadTableCacheValue, executor);
         this.viewCache = tableCacheFactory.buildCache(this::loadView, 
executor);
 
-        long manifestCacheCapacityMb = NumberUtils.toLong(
-                
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY_MB),
-                
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB);
-        manifestCacheCapacityMb = Math.max(manifestCacheCapacityMb, 0L);
-        long manifestCacheTtlSec = NumberUtils.toLong(
-                
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND),
-                
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND);
-        this.manifestCache = new IcebergManifestCache(manifestCacheCapacityMb, 
manifestCacheTtlSec);
+        CacheSpec manifestCacheSpec = resolveManifestCacheSpec();
+        this.manifestCache = new 
IcebergManifestCache(manifestCacheSpec.getCapacity(),
+                manifestCacheSpec.getTtlSecond());
+    }
+
+    private CacheSpec resolveTableCacheSpec() {
+        return CacheSpec.fromProperties(catalog.getProperties(),
+                IcebergExternalCatalog.ICEBERG_TABLE_CACHE_ENABLE, true,
+                IcebergExternalCatalog.ICEBERG_TABLE_CACHE_TTL_SECOND,
+                Config.external_cache_expire_time_seconds_after_access,
+                IcebergExternalCatalog.ICEBERG_TABLE_CACHE_CAPACITY,
+                Config.max_external_table_cache_num);
+    }
+
+    private CacheSpec resolveManifestCacheSpec() {
+        return CacheSpec.fromProperties(catalog.getProperties(),
+                IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE,
+                IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE,
+                IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND,
+                
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND,
+                IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY,
+                
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY);
     }
 
     public Table getIcebergTable(ExternalTable dorisTable) {
@@ -169,74 +179,61 @@ public class IcebergMetadataCache {
     }
 
     public void invalidateCatalogCache(long catalogId) {
-        tableCache.asMap().entrySet().stream()
-                .filter(entry -> entry.getKey().nameMapping.getCtlId() == 
catalogId)
-                .forEach(entry -> {
-                    
ManifestFiles.dropCache(entry.getValue().getIcebergTable().io());
-                    if (LOG.isDebugEnabled()) {
-                        LOG.info("invalidate iceberg table cache {} when invalidating catalog cache",
-                                entry.getKey().nameMapping, new Exception());
-                    }
-                    tableCache.invalidate(entry.getKey());
-                });
-
-        viewCache.asMap().entrySet().stream()
-                .filter(entry -> entry.getKey().nameMapping.getCtlId() == 
catalogId)
-                .forEach(entry -> viewCache.invalidate(entry.getKey()));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("invalidate all iceberg table cache when invalidating catalog {}", catalogId);
+        }
+        // Invalidate all entries related to the catalog
+        tableCache.invalidateAll();
+        viewCache.invalidateAll();
         manifestCache.invalidateAll();
     }
 
     public void invalidateTableCache(ExternalTable dorisTable) {
-        long catalogId = dorisTable.getCatalog().getId();
+        IcebergMetadataCacheKey key = 
IcebergMetadataCacheKey.of(dorisTable.getOrBuildNameMapping());
+        IcebergTableCacheValue tableCacheValue = tableCache.getIfPresent(key);
+        if (tableCacheValue != null) {
+            invalidateTableCache(key, tableCacheValue);
+        } else {
+            invalidateTableCacheByLocalName(dorisTable);
+        }
+    }
+
+    private void invalidateTableCache(IcebergMetadataCacheKey key, 
IcebergTableCacheValue tableCacheValue) {
+        ManifestFiles.dropCache(tableCacheValue.getIcebergTable().io());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("invalidate iceberg table cache {}", key.nameMapping, 
new Exception());
+        }
+        tableCache.invalidate(key);
+        viewCache.invalidate(key);
+    }
+
+    private void invalidateTableCacheByLocalName(ExternalTable dorisTable) {
         String dbName = dorisTable.getDbName();
         String tblName = dorisTable.getName();
         tableCache.asMap().entrySet().stream()
-                .filter(entry -> {
-                    IcebergMetadataCacheKey key = entry.getKey();
-                    return key.nameMapping.getCtlId() == catalogId
-                            && key.nameMapping.getLocalDbName().equals(dbName)
-                            && 
key.nameMapping.getLocalTblName().equals(tblName);
-                })
-                .forEach(entry -> {
-                    
ManifestFiles.dropCache(entry.getValue().getIcebergTable().io());
-                    if (LOG.isDebugEnabled()) {
-                        LOG.info("invalidate iceberg table cache {}",
-                                entry.getKey().nameMapping, new Exception());
-                    }
-                    tableCache.invalidate(entry.getKey());
-                });
-        viewCache.asMap().entrySet().stream()
-                .filter(entry -> {
-                    IcebergMetadataCacheKey key = entry.getKey();
-                    return key.nameMapping.getCtlId() == catalogId
-                            && key.nameMapping.getLocalDbName().equals(dbName)
-                            && 
key.nameMapping.getLocalTblName().equals(tblName);
-                })
-                .forEach(entry -> viewCache.invalidate(entry.getKey()));
+                .filter(entry -> 
entry.getKey().nameMapping.getLocalDbName().equals(dbName)
+                        && 
entry.getKey().nameMapping.getLocalTblName().equals(tblName))
+                .forEach(entry -> invalidateTableCache(entry.getKey(), 
entry.getValue()));
+        viewCache.asMap().keySet().stream()
+                .filter(key -> key.nameMapping.getLocalDbName().equals(dbName)
+                        && key.nameMapping.getLocalTblName().equals(tblName))
+                .forEach(viewCache::invalidate);
     }
 
     public void invalidateDbCache(long catalogId, String dbName) {
         tableCache.asMap().entrySet().stream()
-                .filter(entry -> {
-                    IcebergMetadataCacheKey key = entry.getKey();
-                    return key.nameMapping.getCtlId() == catalogId
-                            && key.nameMapping.getLocalDbName().equals(dbName);
-                })
+                .filter(entry -> 
entry.getKey().nameMapping.getLocalDbName().equals(dbName))
                 .forEach(entry -> {
                     
ManifestFiles.dropCache(entry.getValue().getIcebergTable().io());
                     if (LOG.isDebugEnabled()) {
-                        LOG.info("invalidate iceberg table cache {} when invalidating db cache",
+                        LOG.debug("invalidate iceberg table cache {} when invalidating db cache",
                                 entry.getKey().nameMapping, new Exception());
                     }
                     tableCache.invalidate(entry.getKey());
                 });
-        viewCache.asMap().entrySet().stream()
-                .filter(entry -> {
-                    IcebergMetadataCacheKey key = entry.getKey();
-                    return key.nameMapping.getCtlId() == catalogId
-                            && key.nameMapping.getLocalDbName().equals(dbName);
-                })
-                .forEach(entry -> viewCache.invalidate(entry.getKey()));
+        viewCache.asMap().keySet().stream()
+                .filter(key -> key.nameMapping.getLocalDbName().equals(dbName))
+                .forEach(viewCache::invalidate);
     }
 
     private static void initIcebergTableFileIO(Table table, Map<String, 
String> props) {
@@ -260,6 +257,10 @@ public class IcebergMetadataCache {
             this.nameMapping = nameMapping;
         }
 
+        private static IcebergMetadataCacheKey of(NameMapping nameMapping) {
+            return new IcebergMetadataCacheKey(nameMapping);
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index e14a79cf3ea..75db779d157 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -58,6 +58,7 @@ import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
 import org.apache.doris.datasource.iceberg.source.IcebergTableQueryInfo;
+import org.apache.doris.datasource.metacache.CacheSpec;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
 import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.property.metastore.HMSBaseProperties;
@@ -1588,11 +1589,14 @@ public class IcebergUtils {
     }
 
     public static boolean isManifestCacheEnabled(ExternalCatalog catalog) {
-        String enabled = 
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE);
-        if (enabled == null) {
-            return 
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE;
-        }
-        return Boolean.parseBoolean(enabled);
+        CacheSpec spec = CacheSpec.fromProperties(catalog.getProperties(),
+                IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE,
+                IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE,
+                IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND,
+                
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND,
+                IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY,
+                
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY);
+        return CacheSpec.isCacheEnabled(spec.isEnable(), spec.getTtlSecond(), 
spec.getCapacity());
     }
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java
deleted file mode 100644
index 112f161389b..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java
+++ /dev/null
@@ -1,194 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.iceberg.cache;
-
-import org.apache.iceberg.ContentFile;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.StructLike;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Utility to estimate the JVM weight of Iceberg {@link ContentFile} objects.
- */
-public final class ContentFileEstimator {
-    private static final long LIST_BASE_WEIGHT = 48L;
-    private static final long OBJECT_REFERENCE_WEIGHT = 8L;
-    private static final long CONTENT_FILE_BASE_WEIGHT = 256L;
-    private static final long STRING_BASE_WEIGHT = 40L;
-    private static final long CHAR_BYTES = 2L;
-    private static final long BYTE_BUFFER_BASE_WEIGHT = 16L;
-    private static final long MAP_BASE_WEIGHT = 48L;
-    private static final long MAP_ENTRY_OVERHEAD = 24L;
-    private static final long LONG_OBJECT_WEIGHT = 24L;
-    private static final long INT_OBJECT_WEIGHT = 16L;
-    private static final long PARTITION_BASE_WEIGHT = 48L;
-    private static final long PARTITION_VALUE_BASE_WEIGHT = 8L;
-
-    private ContentFileEstimator() {
-    }
-
-    public static long estimate(List<? extends ContentFile<?>> files) {
-        return listReferenceWeight(files) + estimateContentFilesWeight(files);
-    }
-
-    private static long listReferenceWeight(List<?> files) {
-        if (files == null || files.isEmpty()) {
-            return 0L;
-        }
-        return LIST_BASE_WEIGHT + (long) files.size() * 
OBJECT_REFERENCE_WEIGHT;
-    }
-
-    private static long estimateContentFilesWeight(List<? extends 
ContentFile<?>> files) {
-        long total = 0L;
-        if (files == null) {
-            return 0L;
-        }
-        for (ContentFile<?> file : files) {
-            total += estimateContentFileWeight(file);
-        }
-        return total;
-    }
-
-    private static long estimateContentFileWeight(ContentFile<?> file) {
-        if (file == null) {
-            return 0L;
-        }
-
-        long weight = CONTENT_FILE_BASE_WEIGHT;
-        weight += charSequenceWeight(file.path());
-        weight += stringWeight(file.manifestLocation());
-        weight += byteBufferWeight(file.keyMetadata());
-        weight += partitionWeight(file.partition());
-
-        weight += numericMapWeight(file.columnSizes());
-        weight += numericMapWeight(file.valueCounts());
-        weight += numericMapWeight(file.nullValueCounts());
-        weight += numericMapWeight(file.nanValueCounts());
-        weight += byteBufferMapWeight(file.lowerBounds());
-        weight += byteBufferMapWeight(file.upperBounds());
-
-        weight += listWeight(file.splitOffsets(), LONG_OBJECT_WEIGHT);
-        weight += listWeight(file.equalityFieldIds(), INT_OBJECT_WEIGHT);
-
-        weight += optionalLongWeight(file.pos());
-        weight += optionalLongWeight(file.dataSequenceNumber());
-        weight += optionalLongWeight(file.fileSequenceNumber());
-        weight += optionalLongWeight(file.firstRowId());
-        weight += optionalIntWeight(file.sortOrderId());
-
-        if (file instanceof DeleteFile) {
-            DeleteFile deleteFile = (DeleteFile) file;
-            weight += stringWeight(deleteFile.referencedDataFile());
-            weight += optionalLongWeight(deleteFile.contentOffset());
-            weight += optionalLongWeight(deleteFile.contentSizeInBytes());
-        }
-
-        return weight;
-    }
-
-    private static long listWeight(List<? extends Number> list, long 
elementWeight) {
-        if (list == null || list.isEmpty()) {
-            return 0L;
-        }
-        return LIST_BASE_WEIGHT + (long) list.size() * 
(OBJECT_REFERENCE_WEIGHT + elementWeight);
-    }
-
-    private static long numericMapWeight(Map<Integer, Long> map) {
-        if (map == null || map.isEmpty()) {
-            return 0L;
-        }
-        return MAP_BASE_WEIGHT + (long) map.size() * (MAP_ENTRY_OVERHEAD + 
LONG_OBJECT_WEIGHT);
-    }
-
-    private static long byteBufferMapWeight(Map<Integer, ByteBuffer> map) {
-        if (map == null || map.isEmpty()) {
-            return 0L;
-        }
-        long weight = MAP_BASE_WEIGHT + (long) map.size() * MAP_ENTRY_OVERHEAD;
-        for (ByteBuffer buffer : map.values()) {
-            weight += byteBufferWeight(buffer);
-        }
-        return weight;
-    }
-
-    private static long partitionWeight(StructLike partition) {
-        if (partition == null) {
-            return 0L;
-        }
-        long weight = PARTITION_BASE_WEIGHT + (long) partition.size() * 
PARTITION_VALUE_BASE_WEIGHT;
-        for (int i = 0; i < partition.size(); i++) {
-            Object value = partition.get(i, Object.class);
-            weight += estimateValueWeight(value);
-        }
-        return weight;
-    }
-
-    private static long estimateValueWeight(Object value) {
-        if (value == null) {
-            return 0L;
-        }
-        if (value instanceof CharSequence) {
-            return charSequenceWeight((CharSequence) value);
-        } else if (value instanceof byte[]) {
-            return BYTE_BUFFER_BASE_WEIGHT + ((byte[]) value).length;
-        } else if (value instanceof ByteBuffer) {
-            return byteBufferWeight((ByteBuffer) value);
-        } else if (value instanceof Long || value instanceof Double) {
-            return LONG_OBJECT_WEIGHT;
-        } else if (value instanceof Integer || value instanceof Float) {
-            return INT_OBJECT_WEIGHT;
-        } else if (value instanceof Short || value instanceof Character) {
-            return 4L;
-        } else if (value instanceof Boolean) {
-            return 1L;
-        }
-        return OBJECT_REFERENCE_WEIGHT;
-    }
-
-    private static long charSequenceWeight(CharSequence value) {
-        if (value == null) {
-            return 0L;
-        }
-        return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
-    }
-
-    private static long stringWeight(String value) {
-        if (value == null) {
-            return 0L;
-        }
-        return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
-    }
-
-    private static long byteBufferWeight(ByteBuffer buffer) {
-        if (buffer == null) {
-            return 0L;
-        }
-        return BYTE_BUFFER_BASE_WEIGHT + buffer.remaining();
-    }
-
-    private static long optionalLongWeight(Long value) {
-        return value == null ? 0L : LONG_OBJECT_WEIGHT;
-    }
-
-    private static long optionalIntWeight(Integer value) {
-        return value == null ? 0L : INT_OBJECT_WEIGHT;
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
index 6c5d79ecb69..6016b2ab7e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
@@ -17,16 +17,15 @@
 
 package org.apache.doris.datasource.iceberg.cache;
 
+import org.apache.doris.common.CacheFactory;
 import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.metacache.CacheSpec;
 
 import com.github.benmanes.caffeine.cache.CacheLoader;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
-import com.github.benmanes.caffeine.cache.Weigher;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.Callable;
 
@@ -38,20 +37,14 @@ public class IcebergManifestCache {
 
     private final LoadingCache<ManifestCacheKey, ManifestCacheValue> cache;
 
-    public IcebergManifestCache(long capacityMb, long ttlSec) {
-        long capacityInBytes = capacityMb * 1024L * 1024L;
-        Weigher<ManifestCacheKey, ManifestCacheValue> weigher = (key, value) 
-> {
-            long weight = 
Optional.ofNullable(value).map(ManifestCacheValue::getWeightBytes).orElse(0L);
-            if (weight > Integer.MAX_VALUE) {
-                return Integer.MAX_VALUE;
-            }
-            return (int) weight;
-        };
-        Caffeine<ManifestCacheKey, ManifestCacheValue> builder = 
Caffeine.newBuilder()
-                .maximumWeight(capacityInBytes)
-                .weigher(weigher)
-                .expireAfterAccess(Duration.ofSeconds(ttlSec));
-        cache = builder.build(new CacheLoader<ManifestCacheKey, 
ManifestCacheValue>() {
+    public IcebergManifestCache(long capacity, long ttlSec) {
+        CacheFactory cacheFactory = new CacheFactory(
+                CacheSpec.toExpireAfterAccess(ttlSec),
+                java.util.OptionalLong.empty(),
+                capacity,
+                true,
+                null);
+        cache = cacheFactory.buildCache(new CacheLoader<ManifestCacheKey, 
ManifestCacheValue>() {
             @Override
             public ManifestCacheValue load(ManifestCacheKey key) {
                 throw new CacheException("Manifest cache loader should be provided explicitly for key %s", null, key);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
index 91e2f6db72f..e98ca6b2fb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
@@ -24,27 +24,23 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * Cached manifest payload containing parsed files and an estimated weight.
+ * Cached manifest payload containing parsed files.
  */
 public class ManifestCacheValue {
     private final List<DataFile> dataFiles;
     private final List<DeleteFile> deleteFiles;
-    private final long weightBytes;
 
-    private ManifestCacheValue(List<DataFile> dataFiles, List<DeleteFile> 
deleteFiles, long weightBytes) {
+    private ManifestCacheValue(List<DataFile> dataFiles, List<DeleteFile> 
deleteFiles) {
         this.dataFiles = dataFiles == null ? Collections.emptyList() : 
dataFiles;
         this.deleteFiles = deleteFiles == null ? Collections.emptyList() : 
deleteFiles;
-        this.weightBytes = weightBytes;
     }
 
     public static ManifestCacheValue forDataFiles(List<DataFile> dataFiles) {
-        return new ManifestCacheValue(dataFiles, Collections.emptyList(),
-                estimateWeight(dataFiles, Collections.emptyList()));
+        return new ManifestCacheValue(dataFiles, Collections.emptyList());
     }
 
     public static ManifestCacheValue forDeleteFiles(List<DeleteFile> 
deleteFiles) {
-        return new ManifestCacheValue(Collections.emptyList(), deleteFiles,
-                estimateWeight(Collections.emptyList(), deleteFiles));
+        return new ManifestCacheValue(Collections.emptyList(), deleteFiles);
     }
 
     public List<DataFile> getDataFiles() {
@@ -54,12 +50,4 @@ public class ManifestCacheValue {
     public List<DeleteFile> getDeleteFiles() {
         return deleteFiles;
     }
-
-    public long getWeightBytes() {
-        return weightBytes;
-    }
-
-    private static long estimateWeight(List<DataFile> dataFiles, 
List<DeleteFile> deleteFiles) {
-        return ContentFileEstimator.estimate(dataFiles) + 
ContentFileEstimator.estimate(deleteFiles);
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/CacheSpec.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/CacheSpec.java
new file mode 100644
index 00000000000..ca0f1be330d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/CacheSpec.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.metacache;
+
+import org.apache.doris.common.DdlException;
+
+import org.apache.commons.lang3.math.NumberUtils;
+
+import java.util.Map;
+import java.util.OptionalLong;
+
+/**
+ * Common cache specification for external metadata caches.
+ *
+ * <p>Semantics:
+ * <ul>
+ *   <li>enable=false disables cache</li>
+ *   <li>ttlSecond=0 disables cache, ttlSecond=-1 means no expiration</li>
+ *   <li>capacity=0 disables cache; capacity is count-based</li>
+ * </ul>
+ */
+public final class CacheSpec {
+    public static final long CACHE_NO_TTL = -1L;
+    public static final long CACHE_TTL_DISABLE_CACHE = 0L;
+
+    private final boolean enable;
+    private final long ttlSecond;
+    private final long capacity;
+
+    private CacheSpec(boolean enable, long ttlSecond, long capacity) {
+        this.enable = enable;
+        this.ttlSecond = ttlSecond;
+        this.capacity = capacity;
+    }
+
+    public static CacheSpec fromProperties(Map<String, String> properties,
+            String enableKey, boolean defaultEnable,
+            String ttlKey, long defaultTtlSecond,
+            String capacityKey, long defaultCapacity) {
+        boolean enable = getBooleanProperty(properties, enableKey, 
defaultEnable);
+        long ttlSecond = getLongProperty(properties, ttlKey, defaultTtlSecond);
+        long capacity = getLongProperty(properties, capacityKey, 
defaultCapacity);
+        if (!isCacheEnabled(enable, ttlSecond, capacity)) {
+            capacity = 0;
+        }
+        return new CacheSpec(enable, ttlSecond, capacity);
+    }
+
+    /**
+     * Build a cache spec from a ttl property value and fixed capacity.
+     *
+     * <p>Semantics are compatible with legacy schema cache behavior:
+     * <ul>
+     *   <li>ttlValue is null: use default ttl</li>
+     *   <li>ttl=-1: no expiration</li>
+     *   <li>ttl=0: disable cache by forcing capacity=0</li>
+     *   <li>ttl parse failure: fallback to -1 (no expiration)</li>
+     * </ul>
+     * TODO: Refactor schema cache and its parameters to the unified enable/ttl/capacity model,
+     * then remove this ttl-only adapter.
+     */
+    public static CacheSpec fromTtlValue(String ttlValue, long 
defaultTtlSecond, long defaultCapacity) {
+        long ttlSecond = ttlValue == null ? defaultTtlSecond : 
NumberUtils.toLong(ttlValue, CACHE_NO_TTL);
+        long capacity = defaultCapacity;
+        if (!isCacheEnabled(true, ttlSecond, capacity)) {
+            capacity = 0;
+        }
+        return new CacheSpec(true, ttlSecond, capacity);
+    }
+
+    public static void checkBooleanProperty(String value, String key) throws 
DdlException {
+        if (value == null) {
+            return;
+        }
+        if (!value.equalsIgnoreCase("true") && 
!value.equalsIgnoreCase("false")) {
+            throw new DdlException("The parameter " + key + " is wrong, value is " + value);
+        }
+    }
+
+    public static void checkLongProperty(String value, long minValue, String 
key) throws DdlException {
+        if (value == null) {
+            return;
+        }
+        long parsed;
+        try {
+            parsed = Long.parseLong(value);
+        } catch (NumberFormatException e) {
+            throw new DdlException("The parameter " + key + " is wrong, value is " + value);
+        }
+        if (parsed < minValue) {
+            throw new DdlException("The parameter " + key + " is wrong, value is " + value);
+        }
+    }
+
+    public static boolean isCacheEnabled(boolean enable, long ttlSecond, long 
capacity) {
+        return enable && ttlSecond != 0 && capacity != 0;
+    }
+
+    /**
+     * Convert ttlSecond to OptionalLong for CacheFactory.
+     * ttlSecond=-1 means no expiration; ttlSecond=0 disables cache.
+     */
+    public static OptionalLong toExpireAfterAccess(long ttlSecond) {
+        if (ttlSecond == CACHE_NO_TTL) {
+            return OptionalLong.empty();
+        }
+        return OptionalLong.of(Math.max(ttlSecond, CACHE_TTL_DISABLE_CACHE));
+    }
+
+    private static boolean getBooleanProperty(Map<String, String> properties, 
String key, boolean defaultValue) {
+        String value = properties.get(key);
+        if (value == null) {
+            return defaultValue;
+        }
+        return Boolean.parseBoolean(value);
+    }
+
+    private static long getLongProperty(Map<String, String> properties, String 
key, long defaultValue) {
+        String value = properties.get(key);
+        if (value == null) {
+            return defaultValue;
+        }
+        return NumberUtils.toLong(value, defaultValue);
+    }
+
+    public boolean isEnable() {
+        return enable;
+    }
+
+    public long getTtlSecond() {
+        return ttlSecond;
+    }
+
+    public long getCapacity() {
+        return capacity;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index 2dd3c0c8c6b..b6a06fd4670 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -23,6 +23,7 @@ import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.NameMapping;
 import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.datasource.metacache.CacheSpec;
 import org.apache.doris.datasource.property.metastore.AbstractPaimonProperties;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -36,6 +37,7 @@ import org.apache.paimon.partition.Partition;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 // The subclasses of this class are all deprecated, only for meta persistence compatibility.
 public class PaimonExternalCatalog extends ExternalCatalog {
@@ -45,6 +47,9 @@ public class PaimonExternalCatalog extends ExternalCatalog {
     public static final String PAIMON_HMS = "hms";
     public static final String PAIMON_DLF = "dlf";
     public static final String PAIMON_REST = "rest";
+    public static final String PAIMON_TABLE_CACHE_ENABLE = 
"meta.cache.paimon.table.enable";
+    public static final String PAIMON_TABLE_CACHE_TTL_SECOND = 
"meta.cache.paimon.table.ttl-second";
+    public static final String PAIMON_TABLE_CACHE_CAPACITY = 
"meta.cache.paimon.table.capacity";
     protected String catalogType;
     protected Catalog catalog;
 
@@ -188,9 +193,27 @@ public class PaimonExternalCatalog extends ExternalCatalog {
     @Override
     public void checkProperties() throws DdlException {
         super.checkProperties();
+        
CacheSpec.checkBooleanProperty(catalogProperty.getOrDefault(PAIMON_TABLE_CACHE_ENABLE,
 null),
+                PAIMON_TABLE_CACHE_ENABLE);
+        
CacheSpec.checkLongProperty(catalogProperty.getOrDefault(PAIMON_TABLE_CACHE_TTL_SECOND,
 null),
+                -1L, PAIMON_TABLE_CACHE_TTL_SECOND);
+        
CacheSpec.checkLongProperty(catalogProperty.getOrDefault(PAIMON_TABLE_CACHE_CAPACITY,
 null),
+                0L, PAIMON_TABLE_CACHE_CAPACITY);
         
catalogProperty.checkMetaStoreAndStorageProperties(AbstractPaimonProperties.class);
     }
 
+    @Override
+    public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
+        super.notifyPropertiesUpdated(updatedProps);
+        String tableCacheEnable = 
updatedProps.getOrDefault(PAIMON_TABLE_CACHE_ENABLE, null);
+        String tableCacheTtl = 
updatedProps.getOrDefault(PAIMON_TABLE_CACHE_TTL_SECOND, null);
+        String tableCacheCapacity = 
updatedProps.getOrDefault(PAIMON_TABLE_CACHE_CAPACITY, null);
+        if (Objects.nonNull(tableCacheEnable) || Objects.nonNull(tableCacheTtl)
+                || Objects.nonNull(tableCacheCapacity)) {
+            PaimonUtils.getPaimonMetadataCache(this).init();
+        }
+    }
+
     @Override
     public void onClose() {
         super.onClose();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 174bfa64a2d..1782bae59b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -20,7 +20,6 @@ package org.apache.doris.datasource.paimon;
 import org.apache.doris.analysis.TableScanParams;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionType;
@@ -76,12 +75,9 @@ public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTab
 
     private static final Logger LOG = 
LogManager.getLogger(PaimonExternalTable.class);
 
-    private final Table paimonTable;
-
     public PaimonExternalTable(long id, String name, String remoteName, 
PaimonExternalCatalog catalog,
             PaimonExternalDatabase db) {
         super(id, name, remoteName, catalog, db, 
TableType.PAIMON_EXTERNAL_TABLE);
-        this.paimonTable = catalog.getPaimonTable(getOrBuildNameMapping());
     }
 
     public String getPaimonCatalogType() {
@@ -96,7 +92,13 @@ public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTab
     }
 
     public Table getPaimonTable(Optional<MvccSnapshot> snapshot) {
-        return getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getTable();
+        if (snapshot.isPresent()) {
+            // MTMV scenario: get from snapshot cache
+            return 
getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getTable();
+        } else {
+            // Normal query scenario: get directly from table cache
+            return PaimonUtils.getPaimonTable(this);
+        }
     }
 
     private PaimonSnapshotCacheValue 
getPaimonSnapshotCacheValue(Optional<TableSnapshot> tableSnapshot,
@@ -109,7 +111,8 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
             // use the specified snapshot and the corresponding schema(not the 
latest
             // schema).
             try {
-                DataTable dataTable = (DataTable) paimonTable;
+                Table baseTable = getBasePaimonTable();
+                DataTable dataTable = (DataTable) baseTable;
                 Snapshot snapshot;
                 Map<String, String> scanOptions = new HashMap<>();
 
@@ -135,18 +138,19 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
                     scanOptions.put(CoreOptions.SCAN_TAG_NAME.key(), tagName);
                 }
 
-                Table scanTable = paimonTable.copy(scanOptions);
+                Table scanTable = baseTable.copy(scanOptions);
                 return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY,
                         new PaimonSnapshot(snapshot.id(), snapshot.schemaId(), 
scanTable));
             } catch (Exception e) {
-                LOG.warn("Failed to get Paimon snapshot for table {}", 
paimonTable.name(), e);
+                LOG.warn("Failed to get Paimon snapshot for table {}", 
getOrBuildNameMapping().getFullLocalName(), e);
                 throw new RuntimeException(
                         "Failed to get Paimon snapshot: " + (e.getMessage() == 
null ? "unknown cause" : e.getMessage()),
                         e);
             }
         } else if (scanParams.isPresent() && scanParams.get().isBranch()) {
             try {
-                String branch = 
PaimonUtil.resolvePaimonBranch(scanParams.get(), paimonTable);
+                Table baseTable = getBasePaimonTable();
+                String branch = 
PaimonUtil.resolvePaimonBranch(scanParams.get(), baseTable);
                 Table table = ((PaimonExternalCatalog) 
catalog).getPaimonTable(getOrBuildNameMapping(), branch, null);
                 Optional<Snapshot> latestSnapshot = table.latestSnapshot();
                 long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
@@ -160,15 +164,14 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
                 return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY,
                         new PaimonSnapshot(latestSnapshotId, schemaId, 
dataTable));
             } catch (Exception e) {
-                LOG.warn("Failed to get Paimon branch for table {}", 
paimonTable.name(), e);
+                LOG.warn("Failed to get Paimon branch for table {}", 
getOrBuildNameMapping().getFullLocalName(), e);
                 throw new RuntimeException(
                         "Failed to get Paimon branch: " + (e.getMessage() == 
null ? "unknown cause" : e.getMessage()),
                         e);
             }
         } else {
             // Otherwise, use the latest snapshot and the latest schema.
-            return 
Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
-                    .getPaimonSnapshot(this);
+            return PaimonUtils.getLatestSnapshotCacheValue(this);
         }
     }
 
@@ -201,7 +204,7 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
     public long fetchRowCount() {
         makeSureInitialized();
         long rowCount = 0;
-        List<Split> splits = 
paimonTable.newReadBuilder().newScan().plan().splits();
+        List<Split> splits = 
getBasePaimonTable().newReadBuilder().newScan().plan().splits();
         for (Split split : splits) {
             rowCount += split.rowCount();
         }
@@ -320,7 +323,7 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
         makeSureInitialized();
         PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key;
         try {
-            Table table = ((PaimonExternalCatalog) 
getCatalog()).getPaimonTable(getOrBuildNameMapping());
+            Table table = getBasePaimonTable();
             TableSchema tableSchema = ((DataTable) 
table).schemaManager().schema(paimonSchemaCacheKey.getSchemaId());
             List<DataField> columns = tableSchema.fields();
             List<Column> dorisColumns = 
Lists.newArrayListWithCapacity(columns.size());
@@ -353,15 +356,15 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
 
     private PaimonSchemaCacheValue 
getPaimonSchemaCacheValue(Optional<MvccSnapshot> snapshot) {
         PaimonSnapshotCacheValue snapshotCacheValue = 
getOrFetchSnapshotCacheValue(snapshot);
-        return 
Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
-                .getPaimonSchemaCacheValue(getOrBuildNameMapping(), 
snapshotCacheValue.getSnapshot().getSchemaId());
+        return PaimonUtils.getSchemaCacheValue(this, snapshotCacheValue);
     }
 
     private PaimonSnapshotCacheValue 
getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
         if (snapshot.isPresent()) {
             return ((PaimonMvccSnapshot) 
snapshot.get()).getSnapshotCacheValue();
         } else {
-            return getPaimonSnapshotCacheValue(Optional.empty(), 
Optional.empty());
+            // Use new lazy-loading snapshot cache API
+            return PaimonUtils.getSnapshotCacheValue(snapshot, this);
         }
     }
 
@@ -373,13 +376,14 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
 
     @Override
     public String getComment() {
-        return paimonTable.comment().isPresent() ? paimonTable.comment().get() 
: "";
+        Table table = getBasePaimonTable();
+        return table.comment().isPresent() ? table.comment().get() : "";
     }
 
     public Map<String, String> getTableProperties() {
-
-        if (paimonTable instanceof DataTable) {
-            DataTable dataTable = (DataTable) paimonTable;
+        Table table = getBasePaimonTable();
+        if (table instanceof DataTable) {
+            DataTable dataTable = (DataTable) table;
             Map<String, String> properties = new 
LinkedHashMap<>(dataTable.coreOptions().toMap());
 
             if (!dataTable.primaryKeys().isEmpty()) {
@@ -395,6 +399,10 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
     @Override
     public boolean isPartitionedTable() {
         makeSureInitialized();
-        return !paimonTable.partitionKeys().isEmpty();
+        return !getBasePaimonTable().partitionKeys().isEmpty();
+    }
+
+    private Table getBasePaimonTable() {
+        return PaimonUtils.getPaimonTable(this);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
index fc58a7f15eb..7f118490fdd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
@@ -29,6 +29,7 @@ import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.NameMapping;
 import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.metacache.CacheSpec;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.collect.Maps;
@@ -51,31 +52,65 @@ import java.util.concurrent.ExecutorService;
 
 public class PaimonMetadataCache {
 
-    private final LoadingCache<PaimonSnapshotCacheKey, 
PaimonSnapshotCacheValue> snapshotCache;
+    private final ExecutorService executor;
+    private final ExternalCatalog catalog;
+    private LoadingCache<PaimonTableCacheKey, PaimonTableCacheValue> 
tableCache;
 
-    public PaimonMetadataCache(ExecutorService executor) {
-        CacheFactory snapshotCacheFactory = new CacheFactory(
-                
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
-                OptionalLong.of(Config.external_cache_refresh_time_minutes * 
60),
-                Config.max_external_table_cache_num,
+    public PaimonMetadataCache(ExternalCatalog catalog, ExecutorService 
executor) {
+        this.executor = executor;
+        this.catalog = catalog;
+        init();
+    }
+
+    public void init() {
+        CacheSpec cacheSpec = resolveTableCacheSpec();
+        CacheFactory tableCacheFactory = new CacheFactory(
+                CacheSpec.toExpireAfterAccess(cacheSpec.getTtlSecond()),
+                OptionalLong.empty(),
+                cacheSpec.getCapacity(),
                 true,
                 null);
-        this.snapshotCache = snapshotCacheFactory.buildCache(key -> 
loadSnapshot(key), executor);
+        this.tableCache = tableCacheFactory.buildCache(key -> 
loadTableCacheValue(key), executor);
+    }
+
+    private CacheSpec resolveTableCacheSpec() {
+        return CacheSpec.fromProperties(catalog.getProperties(),
+                PaimonExternalCatalog.PAIMON_TABLE_CACHE_ENABLE, true,
+                PaimonExternalCatalog.PAIMON_TABLE_CACHE_TTL_SECOND,
+                Config.external_cache_expire_time_seconds_after_access,
+                PaimonExternalCatalog.PAIMON_TABLE_CACHE_CAPACITY,
+                Config.max_external_table_cache_num);
     }
 
     @NotNull
-    private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) {
+    private PaimonTableCacheValue loadTableCacheValue(PaimonTableCacheKey key) 
{
         NameMapping nameMapping = key.getNameMapping();
         try {
-            PaimonSnapshot latestSnapshot = loadLatestSnapshot(key);
+            PaimonExternalCatalog externalCatalog = (PaimonExternalCatalog) 
Env.getCurrentEnv().getCatalogMgr()
+                    .getCatalogOrException(nameMapping.getCtlId(),
+                            id -> new IOException("Catalog not found: " + id));
+            Table table = externalCatalog.getPaimonTable(nameMapping);
+            return new PaimonTableCacheValue(table);
+        } catch (Exception e) {
+            throw new CacheException("failed to load paimon table %s.%s.%s: 
%s",
+                    e, nameMapping.getCtlId(), nameMapping.getLocalDbName(),
+                    nameMapping.getLocalTblName(), e.getMessage());
+        }
+    }
+
+    @NotNull
+    private PaimonSnapshotCacheValue loadSnapshot(ExternalTable dorisTable, 
Table paimonTable) {
+        NameMapping nameMapping = dorisTable.getOrBuildNameMapping();
+        try {
+            PaimonSnapshot latestSnapshot = loadLatestSnapshot(paimonTable, 
nameMapping);
             List<Column> partitionColumns = 
getPaimonSchemaCacheValue(nameMapping,
                     latestSnapshot.getSchemaId()).getPartitionColumns();
-            PaimonPartitionInfo partitionInfo = loadPartitionInfo(key, 
partitionColumns);
+            PaimonPartitionInfo partitionInfo = loadPartitionInfo(nameMapping, 
partitionColumns);
             return new PaimonSnapshotCacheValue(partitionInfo, latestSnapshot);
         } catch (Exception e) {
-            throw new CacheException("failed to load paimon snapshot %s.%s.%s 
or reason: %s",
-                    e, nameMapping.getCtlId(), nameMapping.getLocalDbName(), 
nameMapping.getLocalTblName(),
-                    e.getMessage());
+            throw new CacheException("failed to load paimon snapshot %s.%s.%s: 
%s",
+                    e, nameMapping.getCtlId(), nameMapping.getLocalDbName(),
+                    nameMapping.getLocalTblName(), e.getMessage());
         }
     }
 
@@ -97,67 +132,96 @@ public class PaimonMetadataCache {
         return (PaimonSchemaCacheValue) schemaCacheValue.get();
     }
 
-    private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key, 
List<Column> partitionColumns)
+    private PaimonPartitionInfo loadPartitionInfo(NameMapping nameMapping, 
List<Column> partitionColumns)
             throws AnalysisException {
         if (CollectionUtils.isEmpty(partitionColumns)) {
             return PaimonPartitionInfo.EMPTY;
         }
-        NameMapping nameMapping = key.getNameMapping();
         PaimonExternalCatalog externalCatalog = (PaimonExternalCatalog) 
Env.getCurrentEnv().getCatalogMgr()
                 .getCatalogOrAnalysisException(nameMapping.getCtlId());
         List<Partition> paimonPartitions = 
externalCatalog.getPaimonPartitions(nameMapping);
         return PaimonUtil.generatePartitionInfo(partitionColumns, 
paimonPartitions);
     }
 
-    private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) 
throws IOException {
-        NameMapping nameMapping = key.getNameMapping();
-        PaimonExternalCatalog externalCatalog = (PaimonExternalCatalog) 
Env.getCurrentEnv().getCatalogMgr()
-                .getCatalogOrException(nameMapping.getCtlId(), id -> new 
IOException("Catalog not found: " + id));
-        Table table = externalCatalog.getPaimonTable(nameMapping);
-        Table snapshotTable = table;
+    private PaimonSnapshot loadLatestSnapshot(Table paimonTable, NameMapping 
nameMapping) {
+        Table snapshotTable = paimonTable;
         // snapshotId and schemaId
         Long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
-        Optional<Snapshot> optionalSnapshot = table.latestSnapshot();
+        Optional<Snapshot> optionalSnapshot = paimonTable.latestSnapshot();
         if (optionalSnapshot.isPresent()) {
             latestSnapshotId = optionalSnapshot.get().id();
-            snapshotTable =
-                
table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), 
latestSnapshotId.toString()));
+            snapshotTable = paimonTable.copy(
+                    
Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), 
latestSnapshotId.toString()));
         }
-        DataTable dataTable = (DataTable) table;
+        DataTable dataTable = (DataTable) paimonTable;
         long latestSchemaId = 
dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L);
         return new PaimonSnapshot(latestSnapshotId, latestSchemaId, 
snapshotTable);
     }
 
+    public Table getPaimonTable(ExternalTable dorisTable) {
+        PaimonTableCacheKey key = new 
PaimonTableCacheKey(dorisTable.getOrBuildNameMapping());
+        return tableCache.get(key).getPaimonTable();
+    }
+
+    public Table getPaimonTable(PaimonTableCacheKey key) {
+        return tableCache.get(key).getPaimonTable();
+    }
+
+    public PaimonSnapshotCacheValue getSnapshotCache(ExternalTable dorisTable) 
{
+        PaimonTableCacheKey key = new 
PaimonTableCacheKey(dorisTable.getOrBuildNameMapping());
+        PaimonTableCacheValue tableCacheValue = tableCache.get(key);
+        return tableCacheValue.getSnapshotCacheValue(() -> 
loadSnapshot(dorisTable,
+                tableCacheValue.getPaimonTable()));
+    }
+
     public void invalidateCatalogCache(long catalogId) {
-        snapshotCache.asMap().keySet().stream()
-                .filter(key -> key.getNameMapping().getCtlId() == catalogId)
-                .forEach(snapshotCache::invalidate);
+        tableCache.invalidateAll();
     }
 
     public void invalidateTableCache(ExternalTable dorisTable) {
-        snapshotCache.asMap().keySet().stream()
-                .filter(key -> key.getNameMapping().getCtlId() == 
dorisTable.getCatalog().getId()
-                        && 
key.getNameMapping().getLocalDbName().equals(dorisTable.getDbName())
-                        && 
key.getNameMapping().getLocalTblName().equals(dorisTable.getName()))
-                .forEach(snapshotCache::invalidate);
+        PaimonTableCacheKey key = new 
PaimonTableCacheKey(dorisTable.getOrBuildNameMapping());
+        tableCache.invalidate(key);
     }
 
     public void invalidateDbCache(long catalogId, String dbName) {
-        snapshotCache.asMap().keySet().stream()
-                .filter(key -> key.getNameMapping().getCtlId() == catalogId
-                        && 
key.getNameMapping().getLocalTblName().equals(dbName))
-                .forEach(snapshotCache::invalidate);
-    }
-
-    public PaimonSnapshotCacheValue getPaimonSnapshot(ExternalTable 
dorisTable) {
-        PaimonSnapshotCacheKey key = new 
PaimonSnapshotCacheKey(dorisTable.getOrBuildNameMapping());
-        return snapshotCache.get(key);
+        tableCache.asMap().keySet().stream()
+                .filter(key -> 
key.getNameMapping().getLocalDbName().equals(dbName))
+                .forEach(tableCache::invalidate);
     }
 
     public Map<String, Map<String, String>> getCacheStats() {
         Map<String, Map<String, String>> res = Maps.newHashMap();
-        res.put("paimon_snapshot_cache", 
ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(),
-                snapshotCache.estimatedSize()));
+        res.put("paimon_table_cache", 
ExternalMetaCacheMgr.getCacheStats(tableCache.stats(),
+                tableCache.estimatedSize()));
         return res;
     }
+
+    static class PaimonTableCacheKey {
+        private final NameMapping nameMapping;
+
+        public PaimonTableCacheKey(NameMapping nameMapping) {
+            this.nameMapping = nameMapping;
+        }
+
+        public NameMapping getNameMapping() {
+            return nameMapping;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PaimonTableCacheKey that = (PaimonTableCacheKey) o;
+            return nameMapping.equals(that.nameMapping);
+        }
+
+        @Override
+        public int hashCode() {
+            return nameMapping.hashCode();
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
deleted file mode 100644
index 4493f918939..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
+++ /dev/null
@@ -1,51 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.paimon;
-
-import org.apache.doris.datasource.ExternalTable;
-
-import java.util.concurrent.ExecutorService;
-
-public class PaimonMetadataCacheMgr {
-
-    private PaimonMetadataCache paimonMetadataCache;
-
-    public PaimonMetadataCacheMgr(ExecutorService executor) {
-        this.paimonMetadataCache = new PaimonMetadataCache(executor);
-    }
-
-    public PaimonMetadataCache getPaimonMetadataCache() {
-        return paimonMetadataCache;
-    }
-
-    public void removeCache(long catalogId) {
-        paimonMetadataCache.invalidateCatalogCache(catalogId);
-    }
-
-    public void invalidateCatalogCache(long catalogId) {
-        paimonMetadataCache.invalidateCatalogCache(catalogId);
-    }
-
-    public void invalidateTableCache(ExternalTable dorisTable) {
-        paimonMetadataCache.invalidateTableCache(dorisTable);
-    }
-
-    public void invalidateDbCache(long catalogId, String dbName) {
-        paimonMetadataCache.invalidateDbCache(catalogId, dbName);
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
deleted file mode 100644
index 6154d607f0b..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
+++ /dev/null
@@ -1,60 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.paimon;
-
-import org.apache.doris.datasource.NameMapping;
-
-import java.util.StringJoiner;
-
-public class PaimonSnapshotCacheKey {
-    private final NameMapping nameMapping;
-
-    public PaimonSnapshotCacheKey(NameMapping nameMapping) {
-        this.nameMapping = nameMapping;
-    }
-
-    public NameMapping getNameMapping() {
-        return nameMapping;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        PaimonSnapshotCacheKey that = (PaimonSnapshotCacheKey) o;
-        return nameMapping.equals(that.nameMapping);
-    }
-
-    @Override
-    public int hashCode() {
-        return nameMapping.hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return new StringJoiner(", ", 
PaimonSnapshotCacheKey.class.getSimpleName() + "[", "]")
-                .add("catalog=" + nameMapping.getCtlId())
-                .add("dbName='" + nameMapping.getLocalDbName() + "'")
-                .add("tableName='" + nameMapping.getLocalTblName() + "'")
-                .toString();
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonTableCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonTableCacheValue.java
new file mode 100644
index 00000000000..cbbd9076b65
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonTableCacheValue.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.paimon.table.Table;
+
+import java.util.function.Supplier;
+
+/**
+ * Cache value for Paimon table metadata.
+ * Encapsulates the Paimon Table object and provides lazy loading for snapshot 
cache.
+ */
+public class PaimonTableCacheValue {
+    private final Table paimonTable;
+
+    // Lazy-loaded snapshot cache
+    private volatile boolean snapshotCacheLoaded;
+    private volatile PaimonSnapshotCacheValue snapshotCacheValue;
+
+    public PaimonTableCacheValue(Table paimonTable) {
+        this.paimonTable = paimonTable;
+    }
+
+    public Table getPaimonTable() {
+        return paimonTable;
+    }
+
+    /**
+     * Get snapshot cache value with lazy loading.
+     * Uses double-checked locking to ensure thread-safe initialization.
+     *
+     * @param loader Supplier to load snapshot cache value when needed
+     * @return The cached or newly loaded snapshot cache value
+     */
+    public PaimonSnapshotCacheValue 
getSnapshotCacheValue(Supplier<PaimonSnapshotCacheValue> loader) {
+        if (!snapshotCacheLoaded) {
+            synchronized (this) {
+                if (!snapshotCacheLoaded) {
+                    snapshotCacheValue = loader.get();
+                    snapshotCacheLoaded = true;
+                }
+            }
+        }
+        return snapshotCacheValue;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtils.java
new file mode 100644
index 00000000000..30ec4a1185e
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtils.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+
+import org.apache.paimon.table.Table;
+
+import java.util.Optional;
+
+public class PaimonUtils {
+
+    public static Table getPaimonTable(ExternalTable dorisTable) {
+        return 
paimonMetadataCache(dorisTable.getCatalog()).getPaimonTable(dorisTable);
+    }
+
+    public static PaimonSnapshotCacheValue 
getLatestSnapshotCacheValue(ExternalTable dorisTable) {
+        return 
paimonMetadataCache(dorisTable.getCatalog()).getSnapshotCache(dorisTable);
+    }
+
+    public static PaimonSnapshotCacheValue 
getSnapshotCacheValue(Optional<MvccSnapshot> snapshot,
+            ExternalTable dorisTable) {
+        if (snapshot.isPresent() && snapshot.get() instanceof 
PaimonMvccSnapshot) {
+            return ((PaimonMvccSnapshot) 
snapshot.get()).getSnapshotCacheValue();
+        }
+        return getLatestSnapshotCacheValue(dorisTable);
+    }
+
+    public static PaimonSchemaCacheValue getSchemaCacheValue(ExternalTable 
dorisTable,
+            PaimonSnapshotCacheValue snapshotValue) {
+        return getSchemaCacheValue(dorisTable, 
snapshotValue.getSnapshot().getSchemaId());
+    }
+
+    public static PaimonSchemaCacheValue getSchemaCacheValue(ExternalTable 
dorisTable, long schemaId) {
+        return paimonMetadataCache(dorisTable.getCatalog())
+                .getPaimonSchemaCacheValue(dorisTable.getOrBuildNameMapping(), 
schemaId);
+    }
+
+    public static PaimonMetadataCache getPaimonMetadataCache(ExternalCatalog 
catalog) {
+        return paimonMetadataCache(catalog);
+    }
+
+    private static PaimonMetadataCache paimonMetadataCache(ExternalCatalog 
catalog) {
+        return 
Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache(catalog);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 5a7ecec791f..1783477b7ad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -19,7 +19,6 @@ package org.apache.doris.datasource.paimon.source;
 
 import org.apache.doris.analysis.TableScanParams;
 import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -33,6 +32,7 @@ import 
org.apache.doris.datasource.credentials.VendedCredentialsFactory;
 import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonExternalTable;
 import org.apache.doris.datasource.paimon.PaimonUtil;
+import org.apache.doris.datasource.paimon.PaimonUtils;
 import org.apache.doris.datasource.paimon.profile.PaimonMetricRegistry;
 import org.apache.doris.datasource.paimon.profile.PaimonScanMetricsReporter;
 import org.apache.doris.datasource.property.storage.StorageProperties;
@@ -206,8 +206,7 @@ public class PaimonScanNode extends FileQueryScanNode {
     private void putHistorySchemaInfo(Long schemaId) {
         if (currentQuerySchema.putIfAbsent(schemaId, Boolean.TRUE) == null) {
             PaimonExternalTable table = (PaimonExternalTable) 
source.getTargetTable();
-            TableSchema tableSchema = 
Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
-                    .getPaimonSchemaCacheValue(table.getOrBuildNameMapping(), 
schemaId).getTableSchema();
+            TableSchema tableSchema = PaimonUtils.getSchemaCacheValue(table, 
schemaId).getTableSchema();
             params.addToHistorySchemaInfo(
                     PaimonUtil.getSchemaInfo(tableSchema, 
source.getCatalog().getEnableMappingVarbinary(),
                             
source.getCatalog().getEnableMappingTimestampTz()));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
index cb2a3e15816..d9777d10b39 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
@@ -18,6 +18,8 @@
 package org.apache.doris.datasource.property.metastore;
 
 import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.datasource.metacache.CacheSpec;
 import org.apache.doris.datasource.property.ConnectorProperty;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 
@@ -136,6 +138,8 @@ public abstract class AbstractIcebergProperties extends 
MetastoreProperties {
      * @param catalogProps the catalog properties map to add manifest cache 
properties to
      */
     protected void addManifestCacheProperties(Map<String, String> 
catalogProps) {
+        boolean hasIoManifestCacheEnabled = 
StringUtils.isNotBlank(ioManifestCacheEnabled)
+                || 
StringUtils.isNotBlank(catalogProps.get(CatalogProperties.IO_MANIFEST_CACHE_ENABLED));
         if (StringUtils.isNotBlank(ioManifestCacheEnabled)) {
             catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_ENABLED, 
ioManifestCacheEnabled);
         }
@@ -149,6 +153,22 @@ public abstract class AbstractIcebergProperties extends MetastoreProperties {
         if (StringUtils.isNotBlank(ioManifestCacheMaxContentLength)) {
             catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, ioManifestCacheMaxContentLength);
         }
+
+        // default enable io manifest cache if the meta.cache.manifest is enabled
+        if (!hasIoManifestCacheEnabled) {
+            CacheSpec manifestCacheSpec = CacheSpec.fromProperties(catalogProps,
+                    IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE,
+                    IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE,
+                    IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND,
+                    IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND,
+                    IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY,
+                    IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY);
+            if (CacheSpec.isCacheEnabled(manifestCacheSpec.isEnable(),
+                    manifestCacheSpec.getTtlSecond(),
+                    manifestCacheSpec.getCapacity())) {
+                catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_ENABLED, "true");
+            }
+        }
     }
 
     /**
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index a8eaf8bc2f4..2af0aef9710 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1519,16 +1519,47 @@ class Suite implements GroovyInterceptable {
         // Split by semicolon and execute each statement
         def statements = sqlStatements.split(';').collect { it.trim() }.findAll { it }
         def results = []
-        
+
         for (stmt in statements) {
             if (stmt) {
                 results << spark_iceberg(stmt, timeoutSeconds)
             }
         }
-        
+
         return results
     }
 
+    /**
+     * Execute Spark SQL on the spark-iceberg container with Paimon extensions enabled.
+     *
+     * Usage in test suite:
+     *   spark_paimon "CREATE TABLE paimon.test_db.t1 (id INT) USING paimon"
+     *   spark_paimon "INSERT INTO paimon.test_db.t1 VALUES (1)"
+     *   def result = spark_paimon "SELECT * FROM paimon.test_db.t1"
+     */
+    String spark_paimon(String sqlStr, int timeoutSeconds = 120) {
+        String containerName = getSparkIcebergContainerName()
+        if (containerName == null) {
+            throw new RuntimeException("spark-iceberg container not found. Please ensure the container is running.")
+        }
+        String masterUrl = "spark://${containerName}:7077"
+
+        String escapedSql = sqlStr.replaceAll('"', '\\\\"')
+        String command = """docker exec ${containerName} spark-sql --master ${masterUrl} --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions -e "${escapedSql}" """
+
+        logger.info("Executing Spark Paimon SQL: ${sqlStr}".toString())
+        logger.info("Container: ${containerName}".toString())
+
+        try {
+            String result = cmd(command, timeoutSeconds)
+            logger.info("Spark Paimon SQL result: ${result}".toString())
+            return result
+        } catch (Exception e) {
+            logger.error("Spark Paimon SQL failed: ${e.message}".toString())
+            throw e
+        }
+    }
+
     List<List<Object>> db2_docker(String sqlStr, boolean isOrder = false) {
         String cleanedSqlStr = sqlStr.replaceAll("\\s*;\\s*\$", "")
         def (result, meta) = JdbcUtils.executeToList(context.getDB2DockerConnection(), cleanedSqlStr)
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy
index d95f05ae76c..fc7a21785f7 100644
--- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy
@@ -39,7 +39,7 @@ suite("test_iceberg_manifest_cache", "p0,external,doris,external_docker,external
                     "s3.secret_key" = "password",
                     "s3.endpoint" = "http://${externalEnvIp}:${minioPort}",
                     "s3.region" = "us-east-1",
-                    "iceberg.manifest.cache.enable" = "true"
+                    "meta.cache.iceberg.manifest.enable" = "true"
                 );
             """
 
@@ -54,7 +54,7 @@ suite("test_iceberg_manifest_cache", "p0,external,doris,external_docker,external
                     "s3.secret_key" = "password",
                     "s3.endpoint" = "http://${externalEnvIp}:${minioPort}",
                     "s3.region" = "us-east-1",
-                    "iceberg.manifest.cache.enable" = "false"
+                    "meta.cache.iceberg.manifest.enable" = "false"
                 );
             """
 
@@ -116,4 +116,3 @@ suite("test_iceberg_manifest_cache", "p0,external,doris,external_docker,external
         }
     }
 }
-
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy
index 7cc9f6af0b7..cf9ad8145d9 100644
--- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy
@@ -58,7 +58,7 @@ suite("test_iceberg_table_cache", "p0,external,doris,external_docker,external_do
             "s3.secret_key" = "password",
             "s3.endpoint" = "http://${externalEnvIp}:${minioPort}",
             "s3.region" = "us-east-1",
-            "iceberg.table.meta.cache.ttl-second" = "0"
+            "meta.cache.iceberg.table.ttl-second" = "0"
         );
     """
 
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy
index 0f3391805b0..2e2a2ea8e9b 100644
--- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy
@@ -37,7 +37,7 @@ suite("test_iceberg_table_meta_cache", "p0,external,doris,external_docker,extern
                 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
                 'fs.defaultFS' = '${default_fs}',
                 'warehouse' = '${warehouse}',
-                'iceberg.manifest.cache.enable' = 'true'
+                'meta.cache.iceberg.manifest.enable' = 'true'
             );
             """
             sql """switch ${catalog_name}"""
@@ -65,7 +65,7 @@ suite("test_iceberg_table_meta_cache", "p0,external,doris,external_docker,extern
             sql """select * from test_iceberg_meta_cache_db.sales"""
             sql """drop table test_iceberg_meta_cache_db.sales"""
 
-            // 2. test catalog with iceberg.table.meta.cache.ttl-second
+            // 2. test catalog with meta.cache.iceberg.table.ttl-second
             sql """drop catalog if exists ${catalog_name_no_cache};"""
             test {
                 sql """
@@ -75,8 +75,8 @@ suite("test_iceberg_table_meta_cache", "p0,external,doris,external_docker,extern
                     'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
                     'fs.defaultFS' = '${default_fs}',
                     'warehouse' = '${warehouse}',
-                    'iceberg.manifest.cache.enable' = 'false',
-                    'iceberg.table.meta.cache.ttl-second' = '-2'
+                    'meta.cache.iceberg.manifest.enable' = 'false',
+                    'meta.cache.iceberg.table.ttl-second' = '-2'
                 );
                 """
                 exception "is wrong"
@@ -90,8 +90,8 @@ suite("test_iceberg_table_meta_cache", "p0,external,doris,external_docker,extern
                 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
                 'fs.defaultFS' = '${default_fs}',
                 'warehouse' = '${warehouse}',
-                'iceberg.manifest.cache.enable' = 'false',
-                'iceberg.table.meta.cache.ttl-second' = '0'
+                'meta.cache.iceberg.manifest.enable' = 'false',
+                'meta.cache.iceberg.table.ttl-second' = '0'
             );
             """
             sql """switch ${catalog_name_no_cache}"""
@@ -126,7 +126,7 @@ suite("test_iceberg_table_meta_cache", "p0,external,doris,external_docker,extern
                 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
                 'fs.defaultFS' = '${default_fs}',
                 'warehouse' = '${warehouse}',
-                'iceberg.manifest.cache.enable' = 'false'
+                'meta.cache.iceberg.manifest.enable' = 'false'
             );
             """
             sql """switch ${catalog_name_no_cache}"""
@@ -147,11 +147,11 @@ suite("test_iceberg_table_meta_cache", "p0,external,doris,external_docker,extern
             sql """select * from test_iceberg_meta_cache_db.sales"""
             // alter wrong catalog property
             test {
-                sql """alter catalog ${catalog_name_no_cache} set properties ("iceberg.table.meta.cache.ttl-second" = "-2")"""
+                sql """alter catalog ${catalog_name_no_cache} set properties ("meta.cache.iceberg.table.ttl-second" = "-2")"""
                 exception "is wrong"
             }
             // alter catalog property, disable meta cache
-            sql """alter catalog ${catalog_name_no_cache} set properties ("iceberg.table.meta.cache.ttl-second" = "0")"""
+            sql """alter catalog ${catalog_name_no_cache} set properties ("meta.cache.iceberg.table.ttl-second" = "0")"""
             // select 2 rows
             sql """select * from test_iceberg_meta_cache_db.sales"""
             // insert into new value
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_table_meta_cache.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_table_meta_cache.groovy
new file mode 100644
index 00000000000..2a3176688f5
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_table_meta_cache.groovy
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_table_meta_cache", "p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enablePaimonTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable paimon test.")
+        return
+    }
+
+    String minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    String catalogWithCache = "test_paimon_table_cache_with_cache"
+    String catalogNoCache = "test_paimon_table_cache_no_cache"
+    String testDb = "paimon_cache_test_db"
+
+    sql """drop catalog if exists ${catalogWithCache}"""
+    sql """drop catalog if exists ${catalogNoCache}"""
+
+    sql """
+        CREATE CATALOG ${catalogWithCache} PROPERTIES (
+            'type' = 'paimon',
+            'warehouse' = 's3://warehouse/wh',
+            's3.endpoint' = 'http://${externalEnvIp}:${minioPort}',
+            's3.access_key' = 'admin',
+            's3.secret_key' = 'password',
+            's3.path.style.access' = 'true'
+        );
+    """
+
+    sql """
+        CREATE CATALOG ${catalogNoCache} PROPERTIES (
+            'type' = 'paimon',
+            'warehouse' = 's3://warehouse/wh',
+            's3.endpoint' = 'http://${externalEnvIp}:${minioPort}',
+            's3.access_key' = 'admin',
+            's3.secret_key' = 'password',
+            's3.path.style.access' = 'true',
+            'meta.cache.paimon.table.ttl-second' = '0'
+        );
+    """
+
+    try {
+        spark_paimon "CREATE DATABASE IF NOT EXISTS paimon.${testDb}"
+
+        // ==================== Test 1: DML (INSERT) ====================
+        logger.info("========== Test 1: DML (INSERT) ==========")
+        spark_paimon "DROP TABLE IF EXISTS paimon.${testDb}.test_insert"
+        spark_paimon "CREATE TABLE paimon.${testDb}.test_insert (id INT, name STRING) USING paimon"
+        spark_paimon "INSERT INTO paimon.${testDb}.test_insert VALUES (1, 'initial')"
+
+        sql """switch ${catalogWithCache}"""
+        def result1 = sql """select * from ${testDb}.test_insert order by id"""
+        assertEquals(1, result1.size())
+
+        sql """switch ${catalogNoCache}"""
+        def result1NoCache = sql """select * from ${testDb}.test_insert order by id"""
+        assertEquals(1, result1NoCache.size())
+
+        spark_paimon "INSERT INTO paimon.${testDb}.test_insert VALUES (2, 'external_insert')"
+
+        sql """switch ${catalogWithCache}"""
+        def result2 = sql """select * from ${testDb}.test_insert order by id"""
+        assertEquals(1, result2.size())
+
+        sql """switch ${catalogNoCache}"""
+        def result2NoCache = sql """select * from ${testDb}.test_insert order by id"""
+        assertEquals(2, result2NoCache.size())
+
+        sql """switch ${catalogWithCache}"""
+        sql """refresh table ${testDb}.test_insert"""
+        def result3 = sql """select * from ${testDb}.test_insert order by id"""
+        assertEquals(2, result3.size())
+
+        // ==================== Test 2: Schema Change (ADD COLUMN) ====================
+        logger.info("========== Test 2: Schema Change (ADD COLUMN) ==========")
+        spark_paimon "DROP TABLE IF EXISTS paimon.${testDb}.test_add_column"
+        spark_paimon "CREATE TABLE paimon.${testDb}.test_add_column (id INT, name STRING) USING paimon"
+        spark_paimon "INSERT INTO paimon.${testDb}.test_add_column VALUES (1, 'test')"
+
+        sql """switch ${catalogWithCache}"""
+        def addColDesc1 = sql """desc ${testDb}.test_add_column"""
+        assertEquals(2, addColDesc1.size())
+
+        sql """switch ${catalogNoCache}"""
+        def addColDesc1NoCache = sql """desc ${testDb}.test_add_column"""
+        assertEquals(2, addColDesc1NoCache.size())
+
+        spark_paimon "ALTER TABLE paimon.${testDb}.test_add_column ADD COLUMNS (new_col INT)"
+
+        sql """switch ${catalogWithCache}"""
+        def addColDesc2 = sql """desc ${testDb}.test_add_column"""
+        assertEquals(2, addColDesc2.size())
+
+        sql """switch ${catalogNoCache}"""
+        def addColDesc2NoCache = sql """desc ${testDb}.test_add_column"""
+        assertEquals(3, addColDesc2NoCache.size())
+
+        sql """switch ${catalogWithCache}"""
+        sql """refresh table ${testDb}.test_add_column"""
+        def addColDesc3 = sql """desc ${testDb}.test_add_column"""
+        assertEquals(3, addColDesc3.size())
+    } finally {
+        try {
+            spark_paimon "DROP TABLE IF EXISTS paimon.${testDb}.test_insert"
+            spark_paimon "DROP TABLE IF EXISTS paimon.${testDb}.test_add_column"
+        } catch (Exception e) {
+            logger.warn("Failed to drop paimon tables: ${e.message}".toString())
+        }
+        sql """drop catalog if exists ${catalogWithCache}"""
+        sql """drop catalog if exists ${catalogNoCache}"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to